在学习 Python 的中多进程和多线程之前,要弄清楚什么是进程? 而什么又是线程?
什么是进程(Process
)
对于操作系统来说,一个任务就是一个进程,比如打开 PyCharm 就是启动了 PyCharm 进程,打开 QQ 就是启动了 QQ 进程,每一个进程的启动都会先产生一个线程,即主线程,然后由这个主线程去创建其它子线程
什么是线程(Thread
)
线程被包含在进程中,是进程实际的运作单位,被称为微进程,一个进程中可以有多个线程,一个进程可以同时并发多个线程,每条线程并行执行不同的任务
进程和线程之间的区别
- 进程之间是相互独立的,多进程中的,进程之间的变量是不会相互影响的,但是一个进程内的多个线程是内存共享的,所以线程间的变量是共享的
- 因为进程之间是相互独立的,所以进程间的崩溃不会影响到其它进程,但是由于线程是包含在进程内部的,所以线程的崩溃会引发进程的崩溃,继而造成同一进程中的其它线程也崩溃
- 进程的启动速度慢,线程的启动速度快,但是两者运行速度没有可比性
并行和并发的理解
并行是并发,但是并发不一定是并行,并行需要满足的要求是:CPU的核心数大于进程的数量,这才是并行,如果 CPU 的核心数小于进程的数量,那么就是并发了,不能称为并行,所以说,并行是并发,但是并发不是并行
创建进程(Process
)
os模块的fork方法可以在进程,但是这个只能在Linux/unix/mac上使用,而且比较特殊,别的函数,调用一次,返回一次,但是 fork() 调用一次,返回两次,因为操作系统自动把当前进程(也就是父进程)复制了一份进程(就是子进程),然后分别在父进程和子进程内返回
子进程永远返回 0,而父进程返回子进程的 Pid ,原因是因为,一个父进程可以 fork 出很多子进程,所以父进程要记录下每一个子进程的 Pid, 子进程调用
getppid
则可以获取父进程的 pid
1 | import os |
运行结果如下:
1 | 父进程 2831 |
因为 fork() 函数在 Windows 上是无效的,所以有了 multprocessing
跨平台的多进程模块,这个模块的 Process
类提供了进程对象1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import multiprocessing
import time
import os
def func():
time.sleep(3)
print('子进程{}'.format(os.getpid()))
if __name__ == '__main__':
print('父进程{}'.format(os.getpid()))
print('子进程开始...', time.strftime('%H:%M:%S', time.localtime()))
p = multiprocessing.Process(target=func)
p.start() # 开始启动进程
p.join() # 等待进程执行完毕
print('子进程结束...', time.strftime('%H:%M:%S', time.localtime()))
运行结果如下:1
2
3
4父进程2879
子进程开始... 18:39:39
子进程2880
子进程结束... 18:39:42
通过 Process 创建子进程实例的时候,需要通过 target 参数来传入一个 执行函数,当然, Process 不仅仅如此,还可以传入 args (位置参数)和 kwargs (关键字参数),创建完进程后,使用 start() 来启动,这种方式创建进程比 fork() 要简单
join() 方法,就是父进程只有等待子进程执行完毕后,才会继续往下执行,一般用于进程间的同步
创建线程(Thread
)
和进程的 multprocessing
模块一样,线程也有一个属于自己的模块 threading
, threading
模块提供了一个 Thread
用于创建线程
1 | import threading |
线程的创建和进程区别并不大,线程是通过一个 Thread 类实例化一个线程, 然后调用 start() 启动线程,调用 join() 将子线程挂起,让主线程等待子线程结束,然后主线程才结束,
计算密集型任务和 IO密集任务
计算密集(耗时任务)用进程,IO密集(阻塞任务)用线程,因为线程遇到阻塞会自动切换,耗时和阻塞的区别在于,耗时就是会一直占用 CPU 进行做运算, 阻塞则是指的是 让 CPU 休眠
1 | # 阻塞 time.sleep 表示阻塞任务 |
使用多进程实现并行
1 | import multiprocessing |
使用多线程避免阻塞
多线程避免阻塞,使用 socket 本身的阻塞做为例子
服务端:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import socket
from threading import Thread
server = socket.socket()
server.bind(('', 8888))
server.listen(1000)
# 接收数据
def read(conn, raddr):
while True:
data = conn.recv(1000)
if data:
print(data.decode('utf-8'))
conn.send(data)
else:
conn.close()
break
while True:
connection, remote_address = server.accept()
# 使用多线程防止阻塞
t = Thread(target=read, args=(connection, remote_address))
t.start()
客户端:1
2
3
4
5
6
7
8
9
10
11
12
13import socket
client = socket.socket()
client.connect(('localhost', 8888))
while True:
msg = input('>>')
client.send(msg.encode('utf8'))
data = client.recv(1000)
if data:
print(data.decode('utf-8'))
else:
client.close()
break
这个就是使用多线程实现并发服务器的一个典型例子,每接收到一个客户端的连接,就分配一个线程去接收,这里要注意,不要使用 join(),因为一旦使用了,就表示只有当前线程结束后,才会开始下一次线程,当然,这是多线程实现非阻塞的套接字,使用多进程同样也是可以的,但是,使用多进程,比较损耗 CPU ,所以当遇到阻塞,要使用线程,因为线程遇到阻塞会自动切换
操作补充
1 | import multiprocessing |
join 方法就是,由调用线程等待对应子进程结束
打印当前进程1
2
3
4
5
6
7
8
9
10
11
12import multiprocessing
print(multiprocessing.current_process())
def func():
print('func start')
print(multiprocessing.current_process())
print('func end')
p = multiprocessing.Process(target=func)
p.start() # 没有先后顺序
print('main end')
current_process() 是打印出当前进程,可以用于调试 注意父进程会等待子进程结束在退出
进程可以中途停止,但是线程是不可以中途停止,进程使用terminate()
就可以中止1
2
3
4
5
6
7
8
9
10
11
12
13import multiprocessing
import time
print(multiprocessing.current_process())
def func():
print('func start')
time.sleep(4)
print('func end')
p = multiprocessing.Process(target=func)
p.start()
time.sleep(2)
p.terminate() # 中止
进程和进程的标识
进程的标识是 pid 号,线程的标识是 ident
先看下面这段代码1
2
3
4
5import multiprocessing
import time
print(multiprocessing.current_process().pid)
time.sleep(10)
以上代码运行之后,在控制台输出1
31819
我们使用 ps aux|grep python3
来查看进程
如果是使用是 Pycharm 连接 Linux 运行以上代码,则会多出一个 Pycharm 的进程,然后加上 grep 的进程,所以有 3个进程
接下来继续看子进程的1
2
3
4
5
6
7
8
9print(multiprocessing.current_process().pid)
def func():
print('func start')
print(multiprocessing.current_process().pid)
print('func end')
p = multiprocessing.Process(target=func) # 只是定义了一个 进程对象 并没有生产进程
print(p.pid) # None
p.start() # 这才是生成进程的
然后我们来看看 线程的 ident
1 | import threading |
代码输出1
2
3
4main thread 140104521996096
sub-thread before stat None
func inner 140104378394368
sub-thread after stat 140104378394368
线程和进程的名字
设置进程名字:1
2
3
4
5
6
7
8
9
10import multiprocessing
print(multiprocessing.current_process())
def func():
print('func start')
print(multiprocessing.current_process())
print('func end')
p = multiprocessing.Process(target=func, name='handsomeFu') # 通过 name 可以设置进程的名字
p.start()
输出结果:1
2
3
4<_MainProcess(MainProcess, started)>
func start
<Process(handsomeFu, started)>
func end
当然不止可以在实例的时候设置,也可以通过,实例.属性进行设置
设置线程的名字1
2
3
4
5
6
7
8
9
10import threading
def func():
print('func ...')
t = threading.Thread(target=func, name='handsome')
# t.setName('付帅帅') # 老式接口 没有使用 property
# t.name = 'handsomeFu'
t.start()
print(t)
方法和进程几乎一模一样,但是多了一个 setName用来设置名字,但是这个 setName 是老式接口,不需要使用
检测进程活性 is_active
1
2
3
4
5
6
7
8
9
10
11
12import multiprocessing
def func():
time.sleep(2)
p = multiprocessing.Process(target=func)
print(p.is_alive(), p)
p.start()
print(p.is_alive(), p)
p.join()
print(p.is_alive(), p)
输出1
2
3
4False <Process(Process-1, initial)>
True <Process(Process-1, started)>
func ...
False <Process(Process-1, stopped)>
我们可以看到在 start 之前,打印为 False ,start 之后为 True ,join 之后又变成了 False,所以可以用于调试,同样,线程也是有 is_alive() 的方法用来判断线程的一个状态
守护模式
举个例子,如果你的 Pycharm 里面同时打开了多个文件,那么当你对 Pycharm 进行操作的时候,那么正在被编辑的文件,则一定会被关闭,就是子进程会随着父进程的关闭而关闭
1 | def func(): |
输出1
func start
一旦主进程没有执行了,那么就会退出,先 terminate 所有的 daemon 子进程 然后等待所有的普通子进程结束
面向对象使用进程和线程
面向对象的方法,就是继承类,重写方法
1 | from threading import Thread |
输出
1 | 这是重写之后的 run 方法 |
传参的版本1
2
3
4
5
6
7
8
9
10
11
12
13from threading import Thread
class MyThread(Thread):
def __init__(self, a):
super().__init__()
self.a = a
def run(self):
print(self.a)
t = MyThread(666)
t.start()
输出1
666