多进程、多线程

在学习 Python 的中多进程和多线程之前,要弄清楚什么是进程? 而什么又是线程?

什么是进程(Process)

对于操作系统来说,一个任务就是一个进程,比如打开 PyCharm 就是启动了 PyCharm 进程,打开 QQ 就是启动了 QQ 进程,每一个进程的启动都会先产生一个线程,即主线程,然后由这个主线程去创建其它子线程

什么是线程(Thread)

线程被包含在进程中,是进程实际的运作单位,被称为微进程,一个进程中可以有多个线程,一个进程可以同时并发多个线程,每条线程并行执行不同的任务

进程和线程之间的区别

  • 进程之间是相互独立的,多进程中的,进程之间的变量是不会相互影响的,但是一个进程内的多个线程是内存共享的,所以线程间的变量是共享的
  • 因为进程之间是相互独立的,所以进程间的崩溃不会影响到其它进程,但是由于线程是包含在进程内部的,所以线程的崩溃会引发进程的崩溃,继而造成同一进程中的其它线程也崩溃
  • 进程的启动速度慢,线程的启动速度快,但是两者运行速度没有可比性

并行和并发的理解

并行是并发,但是并发不一定是并行,并行需要满足的要求是:CPU的核心数大于进程的数量,这才是并行,如果 CPU 的核心数小于进程的数量,那么就是并发了,不能称为并行,所以说,并行是并发,但是并发不是并行

创建进程(Process)

os模块的fork方法可以在进程,但是这个只能在Linux/unix/mac上使用,而且比较特殊,别的函数,调用一次,返回一次,但是 fork() 调用一次,返回两次,因为操作系统自动把当前进程(也就是父进程)复制了一份进程(就是子进程),然后分别在父进程和子进程内返回

子进程永远返回 0,而父进程返回子进程的 Pid ,原因是因为,一个父进程可以 fork 出很多子进程,所以父进程要记录下每一个子进程的 Pid, 子进程调用 getppid 则可以获取父进程的 pid

1
2
3
4
5
6
7
8
9
10
11
12
import os
"""
getpid 可以获取当前进程的进程号
getppid 可以获取父级进程的进程号
"""
print('父进程', os.getpid())
pid = os.fork()
# 子进程永远都是返回 0
if pid == 0:
print('子进程{}, 父进程{}'.format(os.getpid(), os.getppid()))
else:
print('父进程{}, 子进程{}'.format(os.getpid(), pid))

运行结果如下:

1
2
3
父进程 2831
父进程2831, 子进程2832
子进程2832, 父进程2831

因为 fork() 函数在 Windows 上是无效的,所以有了 multprocessing 跨平台的多进程模块,这个模块的 Process 类提供了进程对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing
import time
import os

def func():
time.sleep(3)
print('子进程{}'.format(os.getpid()))

if __name__ == '__main__':
print('父进程{}'.format(os.getpid()))
print('子进程开始...', time.strftime('%H:%M:%S', time.localtime()))
p = multiprocessing.Process(target=func)
p.start() # 开始启动进程
p.join() # 等待进程执行完毕
print('子进程结束...', time.strftime('%H:%M:%S', time.localtime()))

运行结果如下:

1
2
3
4
父进程2879
子进程开始... 18:39:39
子进程2880
子进程结束... 18:39:42

通过 Process 创建子进程实例的时候,需要通过 target 参数来传入一个 执行函数,当然, Process 不仅仅如此,还可以传入 args (位置参数)和 kwargs (关键字参数),创建完进程后,使用 start() 来启动,这种方式创建进程比 fork() 要简单
join() 方法,就是父进程只有等待子进程执行完毕后,才会继续往下执行,一般用于进程间的同步

创建线程(Thread)

和进程的 multprocessing 模块一样,线程也有一个属于自己的模块 threading, threading 模块提供了一个 Thread 用于创建线程

1
2
3
4
5
6
7
8
9
10
import threading

print('主线程开始', threading.current_thread().name)
def func():
print('子线程',threading.current_thread().name)

t = threading.Thread(target=func)
t.start()
t.join()
print('主线程结束',threading.current_thread().name)

线程的创建和进程区别并不大,线程是通过一个 Thread 类实例化一个线程, 然后调用 start() 启动线程,调用 join() 将子线程挂起,让主线程等待子线程结束,然后主线程才结束,

计算密集型任务和 IO密集任务

计算密集(耗时任务)用进程,IO密集(阻塞任务)用线程,因为线程遇到阻塞会自动切换,耗时和阻塞的区别在于,耗时就是会一直占用 CPU 进行做运算, 阻塞则是指的是 让 CPU 休眠

1
2
3
4
5
6
7
8
9
# 阻塞 time.sleep 表示阻塞任务
def func():
time.sleep(5)

# 耗时任务 fib 表示耗时任务
def fib(n):
if n <=2 :
return 1
return fib(n-1) + fib(n-2)

使用多进程实现并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import multiprocessing
import time

print('fib start...', time.strftime('%H:%M:%S', time.localtime()))
def fib(n):
if n == 0:
return 1
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)

p = multiprocessing.Process(target=fib, args=(35,))
p.start()
print(fib(35))
p.join()
print('fib end...', time.strftime('%H:%M:%S', time.localtime()))

使用多线程避免阻塞

多线程避免阻塞,使用 socket 本身的阻塞做为例子

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import socket
from threading import Thread

server = socket.socket()
server.bind(('', 8888))
server.listen(1000)

# 接收数据
def read(conn, raddr):
while True:
data = conn.recv(1000)
if data:
print(data.decode('utf-8'))
conn.send(data)
else:
conn.close()
break

while True:
connection, remote_address = server.accept()
# 使用多线程防止阻塞
t = Thread(target=read, args=(connection, remote_address))
t.start()

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket

client = socket.socket()
client.connect(('localhost', 8888))
while True:
msg = input('>>')
client.send(msg.encode('utf8'))
data = client.recv(1000)
if data:
print(data.decode('utf-8'))
else:
client.close()
break

这个就是使用多线程实现并发服务器的一个典型例子,每接收到一个客户端的连接,就分配一个线程去接收,这里要注意,不要使用 join(),因为一旦使用了,就表示只有当前线程结束后,才会开始下一次线程,当然,这是多线程实现非阻塞的套接字,使用多进程同样也是可以的,但是,使用多进程,比较损耗 CPU ,所以当遇到阻塞,要使用线程,因为线程遇到阻塞会自动切换

操作补充

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing
import time


print('outer start', time.strftime('%H:%M:%S', time.localtime()))
def func():
print('func start', time.strftime('%H:%M:%S', time.localtime()))
time.sleep(4)
print('func end', time.strftime('%H:%M:%S', time.localtime()))

p = multiprocessing.Process(target=func) # 只是定义进程 没有生成进程
p.start() # 生产进程 调用 run 方法,间接调用 target 对象
p.join() # 由调用进程 阻塞等待对应子进程结束在 (主进程调用)
time.sleep(4)
print('outer end', time.strftime('%H:%M:%S', time.localtime()))

join 方法就是,由调用线程等待对应子进程结束
打印当前进程

1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing


print(multiprocessing.current_process())
def func():
print('func start')
print(multiprocessing.current_process())
print('func end')

p = multiprocessing.Process(target=func)
p.start() # 没有先后顺序
print('main end')

current_process() 是打印出当前进程,可以用于调试 注意父进程会等待子进程结束在退出

进程可以中途停止,但是线程是不可以中途停止,进程使用terminate()就可以中止

1
2
3
4
5
6
7
8
9
10
11
12
13
import multiprocessing
import time

print(multiprocessing.current_process())
def func():
print('func start')
time.sleep(4)
print('func end')

p = multiprocessing.Process(target=func)
p.start()
time.sleep(2)
p.terminate() # 中止

进程和进程的标识

进程的标识是 pid 号,线程的标识是 ident

先看下面这段代码

1
2
3
4
5
import multiprocessing
import time

print(multiprocessing.current_process().pid)
time.sleep(10)

以上代码运行之后,在控制台输出

1
31819

我们使用 ps aux|grep python3 来查看进程

如果是使用是 Pycharm 连接 Linux 运行以上代码,则会多出一个 Pycharm 的进程,然后加上 grep 的进程,所以有 3个进程

接下来继续看子进程的

1
2
3
4
5
6
7
8
9
print(multiprocessing.current_process().pid)
def func():
print('func start')
print(multiprocessing.current_process().pid)
print('func end')

p = multiprocessing.Process(target=func) # 只是定义了一个 进程对象 并没有生产进程
print(p.pid) # None
p.start() # 这才是生成进程的

然后我们来看看 线程的 ident

1
2
3
4
5
6
7
8
9
10
11
import threading

print('main thread',threading.current_thread().ident)
def func():
print('func inner', threading.current_thread().ident)
time.sleep(10)

t = threading.Thread(target=func)
print('sub-thread before stat', t.ident)
t.start()
print('sub-thread after stat', t.ident)

代码输出

1
2
3
4
main thread 140104521996096
sub-thread before stat None
func inner 140104378394368
sub-thread after stat 140104378394368

线程和进程的名字

设置进程名字:

1
2
3
4
5
6
7
8
9
10
import multiprocessing

print(multiprocessing.current_process())
def func():
print('func start')
print(multiprocessing.current_process())
print('func end')

p = multiprocessing.Process(target=func, name='handsomeFu') # 通过 name 可以设置进程的名字
p.start()

输出结果:

1
2
3
4
<_MainProcess(MainProcess, started)>
func start
<Process(handsomeFu, started)>
func end

当然不止可以在实例的时候设置,也可以通过,实例.属性进行设置

设置线程的名字

1
2
3
4
5
6
7
8
9
10
import threading

def func():
print('func ...')

t = threading.Thread(target=func, name='handsome')
# t.setName('付帅帅') # 老式接口 没有使用 property
# t.name = 'handsomeFu'
t.start()
print(t)

方法和进程几乎一模一样,但是多了一个 setName用来设置名字,但是这个 setName 是老式接口,不需要使用

检测进程活性 is_active

1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing


def func():
time.sleep(2)

p = multiprocessing.Process(target=func)
print(p.is_alive(), p)
p.start()
print(p.is_alive(), p)
p.join()
print(p.is_alive(), p)

输出

1
2
3
4
False <Process(Process-1, initial)>
True <Process(Process-1, started)>
func ...
False <Process(Process-1, stopped)>

我们可以看到在 start 之前,打印为 False ,start 之后为 True ,join 之后又变成了 False,所以可以用于调试,同样,线程也是有 is_alive() 的方法用来判断线程的一个状态

守护模式

举个例子,如果你的 Pycharm 里面同时打开了多个文件,那么当你对 Pycharm 进行操作的时候,那么正在被编辑的文件,则一定会被关闭,就是子进程会随着父进程的关闭而关闭

1
2
3
4
5
6
7
8
9
def func():
print('func start')
time.sleep(5)
print('func end')


p = multiprocessing.Process(target=func, daemon=True)
p.start()
time.sleep(3)

输出

1
func start

一旦主进程没有执行了,那么就会退出,先 terminate 所有的 daemon 子进程 然后等待所有的普通子进程结束

面向对象使用进程和线程

面向对象的方法,就是继承类,重写方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from threading import Thread



class MyThread(Thread):
def __init__(self):
super().__init__()

def run(self):
print('这是重写之后的 run 方法')


t = MyThread()
t.start()

输出

1
这是重写之后的 run 方法

传参的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
from threading import Thread

class MyThread(Thread):
def __init__(self, a):
super().__init__()
self.a = a

def run(self):
print(self.a)


t = MyThread(666)
t.start()

输出

1
666

---------------- The   End ----------------