进程池和线程池是什么, 我们可以直接想成是一个池子,那么为什么会有这么一个进程池或者线程池的诞生呢,是由于每次创建进程/线程的成本太大,所以就有这么一个池子里面放了一些已经提前创建好的进程/线程,供我们使用
在使用进程池和线程池之前,我们先来自己实现一个可以重复利用的线程和线程池,然后我们再来了解如何使用 Python 自带的池
重复利用一个线程
首先来看一段代码
1
2
3
4
5
6
7
8
9
10
11 from threading import Thread
def fn():
print('qwe')
t = Thread(target=fn)
t.start()
t.join() # 表示主线程阻塞等待子线程线程
t.start() # 在join之后再次调用 start
以上的代码是会报错,这就是默认情况下,一个线程如果你执行了 join ,那就表示这个线程已经结束了,那么应该怎么实现一个可以重复利用的线程,我们先回顾下面向对象的方式用线程,那么先看代码
1 | from threading import Thread, current_thread |
以上表明,只有重写的 run 方法,才是属于子线程的内容,其她全部都是属于主线程的,这一点很重要,我们就可以通过这种方式来达到一个可以不断重复使用的线程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49from threading import Thread
from queue import Queue
class MyThread(Thread):
def __init__(self):
super().__init__()
# 设置为守护线程 会在主线程执行完所以代码就被干掉
self.daemon = True
# 直接生成一个队列
self.queue = Queue()
self.start()
# 只有重写的 run 方法里面才是子线程
def run(self):
# 不断的拿任务
while True:
task = self.queue.get()
task()
self.queue.task_done() # 表示计数器 -1
# 这是主线程的 不断的丢任务
def apply_async(self, task):
self.queue.put(task)
# 因为设置了守护线程,所以需要设置 join 用来执行子线程的代码
def join(self):
self.queue.join()
def fn1():
print(1)
def fn2():
print(2)
t = MyThread()
# t.start() # 可以直接写到 init 里面 在初始化的时候就默认调用 start
t.apply_async(fn1)
t.apply_async(fn2)
t.apply_async(fn1)
t.join() # 阻塞到计数器为 0
######## 执行结果
1
2
1
上面就是一个可以重复使用的线程,现在主要来逻辑就是利用队列和生产者和消费者模式实现的,那么接下来我们如何自己实现一个线程池
自定义线程池
Talk is cheap. Show me the code
直接看代码
1 | from threading import Thread, current_thread |
上面就是自己实现的一个线程池,但是其实Python内部本身就定义定义了线程和进程池,我们前面的目的只是为了让大家理解有这么一个东西
自带的池子
1 | from multiprocessing import Pool |
通过这个返回的值,我们可以知道,创建线程的消耗比创建进程低很多,所以在能使用线程的时候,就别用进程
自带的池可以获取返回值1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from multiprocessing.pool import ThreadPool
import time
def func():
time.sleep(5)
return '1111'
tp = ThreadPool(4)
print(1, time.strftime('%H:%M:%S', time.localtime()))
result = tp.apply_async(func)
print(2, time.strftime('%H:%M:%S', time.localtime()))
print(result.get())
print(3, time.strftime('%H:%M:%S', time.localtime()))
使用池来实现并发服务器
1 | import socket |
1 | import socket |