进程通信及线程安全
什么是进程间的通信,以及如何解决进程间的通信,线程共享同一片内存空间,那么这样又有什么危害,线程锁是什么,它能干嘛,队列又应该怎么用,以及生产者和消费者模式
进程间的通信
先看代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from multiprocessing import Process
age = 10 # 全局变量
def func():
global age
age = 20
# func()
p = Process(target=func)
p.start()
p.join()
print(age)
打印输出1
10
通过上面的代码的运行结果,我们可以看出,进程和进程之间的内存地址是相互独立的,如下图
那么既然进程间内存地址是独立的,那么线程之间内存地址是不是独立的呢? 这个还有想吗,肯定不是独立的嘛,注意 线程是通过进程创建的,所有的线程位于同一进程,所以线程共享一块内存地址,看代码1
2
3
4
5
6
7
8
9
10
11
12
13
14from threading import Thread
age = 10 # 全局变量
def func():
global age
age = 20
# func()
t = Thread(target=func)
t.start()
t.join()
print(age)
打印输出1
20
上面的例子可以说明,进程间的内存地址是相互独立的,而线程间的则是共享同一个内存地址,那么我们首先解决进程间通信的这么一个障碍
Manager
先看图:
然后看代码1
2
3
4
5
6
7
8
9
10
11
12
13
14from multiprocessing import Process, Manager
# 先生成一个管理器 生成管理器的同时 也生成了一个公共的进程
mgr = Manager()
tmp_list = mgr.list() # 从公共空间中开辟一个 list 空间
def func():
tmp_list.append(1)
tmp_list.append(2)
p = Process(target=func)
p.start()
p.join()
print(tmp_list)
输出结果1
[1, 2] # 是一个列表
上述代码说明了,通过这个 Manager 我们可以在进程间通信,当然,不仅仅只有 list 还有 dict 以及 queue1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25from multiprocessing import Process, Manager
mgr = Manager() # 生成一个服务器进程,并返回与其通信的管理器
tmp_list = mgr.list()
tmp_dict = mgr.dict()
tmp_queue = mgr.Queue()
print(tmp_list)
print(type(tmp_list))
print(type(tmp_dict))
print(type(tmp_queue))
def b():
tmp_list.append('a')
tmp_dict.update({"a": 1})
tmp_queue.put(1)
t = Process(target=b)
t.start()
t.join()
print(tmp_list)
print(tmp_dict)
print(tmp_queue.get())
讲完了,进程间的通信,接下来我们来看看,线程间的问题应该怎么解决呢, 什么? 线程间还需要解决通信吗,不是说线程共用一块内存地址,那还需要解决说什么问题呢?线程其实需要解决,共用同一片内存地址的问题,就是因为线程共有同一片内存空间,所以,有时候会造成当两个线程同时对一个变量进行修改的同时就会出现问题,那么应该怎么解决呢?别急,请往下看
线程锁
因为线程共享一块内存地址,所以当多个线程修改共享资源的时候,就会出现各种各样的问题
1 | from threading import Thread |
输出结果1
2
3第一次--> 13417734
第二次--> 13641278
第三次--> 23328333
为什么每次的结果会不一样呢,如果你给的值较小的话,是不会出现这种问题的,只有你的值给的比较大的话,那么就会出现这种问题了,首先我们分析一下,当 Python 解释器遇到 count += 1
的时候,其实的分了这么几个步骤的
- 先将
count += 1
变成count = count + 1
- 然后先计算
count + 1
- 将
count + 1
得到的值重新赋值给count
那么应该怎么解决这个问题呢, Python 给我们提供了一个 锁 的东西,那么这个锁应该怎么用,其实也很简单
1 | from threading import Thread, Lock |
运行结果:1
2
3第一个运行结果--> 20000000
第二个运行结果--> 19999999
第三个运行结果--> 20000000
这样就算是解决了线程之间的资源竞争问题,锁的作用就是保证,在任何一个时刻,都只有一个线程在运行,但是,锁重要吗,其实在项目中,几乎不会用到,大家知道有这么个东西就可以了,注意: 锁可能会降低一点效率,但是,如果去了锁, 又要保持公共资源前提下,效率会降低 10 ~ 20 倍。而且多线程有个特性,就是会避开阻塞,所以大家就不要考虑怎么把锁去掉
对于给公共资源上锁,还有一种更优雅的写法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20from threading import Thread, Lock
count = 0
n = 10000000
lock = Lock() # 生成锁
def calc(n):
global count
for i in range(n):
with lock:
count += 1
t = Thread(target=calc, args=(n, ))
t1 = Thread(target=calc, args=(n, ))
t.start()
t1.start()
t.join()
t1.join()
print(count)
然后接下来,我们再来看看队列这么一个知识点
队列
队列,是一种数据结构,因为队列底层已经实现了锁,所以我们只需要会用就行了,所以队列就是用来解决安全问题的,队列一般是用在生产者和消费者直接的信息传递
我们在学习队列之前,先看看队列的原理
队列只有一个入口,一个出口,遵循先出先出(FIFO:First in First Out
)
先看代码:1
2
3
4
5
6
7from queue import Queue
q = Queue() # 实例一个队列对象 里面有一个 maxsize 参数,如果不传,那么则是无穷大,传了则表示 队列的一个最大值
q.put('a') # 向队列里面塞东西 入队
print(q.get()) # 向队列里面取东西 出队
上面就是一个最简单的队列,那么队列当然不止只有这么两个方法,还有比如说 full()
、empty()
、qsize()
、task_done()
和join()
,当然需要注意的是,full()
和empty()
是近似值,就是说,不是很正确,然后join()
和task_done()
这两个方法在下面会讲到,我们回过头来看看,get()
和put()
这么两个方法,一个就是塞,一个取,但是要注意,这玩意也会阻塞,比如当你 put
超过了,当初定义队列的最大值,那么就会阻塞,另外 get()
如果取出超过队列长度,它也会阻塞,接下来操作一下队列的其它一些方法1
2
3
4
5
6
7
8
9
10
11from queue import Queue
q = Queue(3) # 这个队列最大长度为3
q.put('a') # 向队列里面塞东西
q.put('b')
q.put('a')
print(q.empty()) # False
print(q.full()) # True
print(q.qsize()) # 3
print(q.get()) # 向队列里面取东西
接下来我们看下task_done
和join
这两个方法,先看代码1
2
3
4
5
6
7
8from queue import Queue
q = Queue(3) # 这个队列最大长度为3
q.put(1) # 往队列添加数据 计数器 +1
print(q.get()) # 仅仅取数据,不做任何操作
q.task_done() # 告诉队列 当前任务结束 计数器 -1
q.join() # 阻塞,一直到计数器为 0 才会停止阻塞
首先,我们要了解一个概念,在队列里面,存在着一个计数器的东西,当对队列进行 put
的时候,这个计数器就会 +1 ,但是,当对队列进行 get
的时候,计数器,并不会做任何操作,只有使用了 task_done
,这个计算器才会 -1 ,而 join
表示,只有当计算器为 0的时候,才不阻塞,否则就阻塞
除了 Python 自带的 queue 这么一个队列,还记得前面说过的 manager 本身带的一个 queue 吗,用法和使用本身自带的是一模一样的,只不过一个是针对线程,一个是进程,自带的是针对线程,manager 是针对进程1
2
3
4
5
6
7from multiprocessing import Manager
mgr = Manager()
q = mgr.Queue()
q.put('1')
print(q.get())
生产者和消费者
怎么说呢,其实生产者和消费者模式,就是一个负责生产,一个负责消费,它们之间通过队列进行传输数据,先看张图
可以把生产者和消费者都看成进程或线程
多线程实现生产者和消费者
1 | from threading import Thread |
因为不是守护线程,所以不需要加 join
代码输出1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63消费者,消费了987
生产者,生产了203
生产者,生产了540
消费者,消费了775
消费者,消费了432
生产者,生产了190
生产者,生产了779
消费者,消费了981
消费者,消费了203
消费者,消费了540
生产者,生产了198
生产者,生产了399
生产者,生产了951
消费者,消费了190
消费者,消费了779
生产者,生产了465
生产者,生产了318
消费者,消费了198
消费者,消费了399
生产者,生产了945
生产者,生产了947
消费者,消费了951
消费者,消费了465
生产者,生产了416
生产者,生产了776
消费者,消费了318
消费者,消费了945
生产者,生产了652
生产者,生产了304
消费者,消费了947
消费者,消费了416
生产者,生产了287
生产者,生产了449
消费者,消费了776
消费者,消费了652
消费者,消费了304
消费者,消费了287
生产者,生产了456
生产者,生产了883
生产者,生产了262
生产者,生产了703
消费者,消费了449
消费者,消费了456
生产者,生产了970
生产者,生产了517
消费者,消费了883
消费者,消费了262
生产者,生产了310
生产者,生产了563
消费者,消费了703
消费者,消费了970
生产者,生产了991
生产者,生产了743
消费者,消费了517
消费者,消费了310
消费者,消费了563
消费者,消费了991
生产者,生产了276
生产者,生产了326
生产者,生产了519
生产者,生产了292
消费者,消费了743
消费者,消费了276
为什么会有时候生产一个,获取两个,生产两个,获取一个等等,线程不仅仅阻塞会切换,还有轮询的切换,所以就会出现上述的情况,但是我们这个我们不用管为什么会这样,因为生产者就负责生产,消费者就负责消费,知道这个就够了
多进程实现生产者和消费者
1 | from multiprocessing import Process, Manager |
运行结果1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47生产者,生产了576
消费者,消费了786
生产者,生产了786
消费者,消费了971
生产者,生产了971
消费者,消费了989
生产者,生产了989
消费者,消费了290
生产者,生产了290
生产者,生产了695
消费者,消费了695
生产者,生产了988
消费者,消费了988
生产者,生产了300
消费者,消费了300
消费者,消费了830
生产者,生产了830
消费者,消费了728
生产者,生产了728
消费者,消费了626
生产者,生产了626
消费者,消费了315
生产者,生产了315
生产者,生产了945
消费者,消费了945
生产者,生产了197
消费者,消费了197
消费者,消费了755
生产者,生产了755
生产者,生产了691
消费者,消费了691
消费者,消费了438
生产者,生产了438
消费者,消费了296
生产者,生产了296
消费者,消费了806
生产者,生产了806
消费者,消费了957
生产者,生产了957
消费者,消费了916
生产者,生产了916
消费者,消费了231
生产者,生产了231
生产者,生产了325
消费者,消费了325
生产者,生产了904
消费者,消费了904
注意:为什么多进程的一定要加 Join ,因为 Manger 的是守护进程,所以要使用 join 来防止子进程会随着父进程的结束而终止