进程池和线程池

进程池和线程池是什么, 我们可以直接想成是一个池子,那么为什么会有这么一个进程池或者线程池的诞生呢,是由于每次创建进程/线程的成本太大,所以就有这么一个池子里面放了一些已经提前创建好的进程/线程,供我们使用

在使用进程池和线程池之前,我们先来自己实现一个可以重复利用的线程和线程池,然后我们再来了解如何使用 Python 自带的池

重复利用一个线程

首先来看一段代码

1
2
3
4
5
6
7
8
9
10
11
from threading import Thread


def fn():
print('qwe')


t = Thread(target=fn)
t.start()
t.join() # 表示主线程阻塞等待子线程线程
t.start() # 在join之后再次调用 start

以上的代码是会报错,这就是默认情况下,一个线程如果你执行了 join ,那就表示这个线程已经结束了,那么应该怎么实现一个可以重复利用的线程,我们先回顾下面向对象的方式用线程,那么先看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from threading import Thread, current_thread


class MyThread(Thread):
def __init__(self):
super().__init__()
print('init method {}'.format(current_thread()))

# 只有重写的 run 方法里面才是子线程
def run(self):
print('run method {}'.format(current_thread()))

# 这是主线程的
def apply_async(self):
print('apply method {}'.format(current_thread()))


t = MyThread()
t.start()
t.apply_async()

###### 代码输出
init method <_MainThread(MainThread, started -1219012864)>
apply method <_MainThread(MainThread, started -1219012864)>
run method <MyThread(Thread-1, started -1222968512)>

以上表明,只有重写的 run 方法,才是属于子线程的内容,其她全部都是属于主线程的,这一点很重要,我们就可以通过这种方式来达到一个可以不断重复使用的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from threading import Thread
from queue import Queue


class MyThread(Thread):
def __init__(self):
super().__init__()
# 设置为守护线程 会在主线程执行完所以代码就被干掉
self.daemon = True
# 直接生成一个队列
self.queue = Queue()
self.start()

# 只有重写的 run 方法里面才是子线程
def run(self):
# 不断的拿任务
while True:
task = self.queue.get()
task()
self.queue.task_done() # 表示计数器 -1

# 这是主线程的 不断的丢任务
def apply_async(self, task):
self.queue.put(task)

# 因为设置了守护线程,所以需要设置 join 用来执行子线程的代码
def join(self):
self.queue.join()


def fn1():
print(1)


def fn2():
print(2)


t = MyThread()
# t.start() # 可以直接写到 init 里面 在初始化的时候就默认调用 start
t.apply_async(fn1)
t.apply_async(fn2)
t.apply_async(fn1)
t.join() # 阻塞到计数器为 0

######## 执行结果
1
2
1

上面就是一个可以重复使用的线程,现在主要来逻辑就是利用队列和生产者和消费者模式实现的,那么接下来我们如何自己实现一个线程池

自定义线程池

Talk is cheap. Show me the code 直接看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from threading import Thread, current_thread
from queue import Queue
import time


class ThreadPool:
def __init__(self, n):
self.queue = Queue()
for i in range(n):
# 把队列传到每个线程中
Thread(target=self.func, daemon=True).start()

# 就是线程的内容
def func(self):
while True:
task = self.queue.get()
task()
print(current_thread())
self.queue.task_done()

def apply_async(self, task):
self.queue.put(task)

def join(self):
self.queue.join()


def func():
time.sleep(5)


tp = ThreadPool(3)
tp.apply_async(func)
tp.apply_async(func)
tp.apply_async(func)
tp.join()

上面就是自己实现的一个线程池,但是其实Python内部本身就定义定义了线程和进程池,我们前面的目的只是为了让大家理解有这么一个东西

自带的池子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time

def fn(i):
print('{}===='.format(i))
time.sleep(2)

if __name__ == '__main__':
start_time = time.time()
pool = ThreadPool(100)
for i in range(100):
pool.apply_async(func=fn,args=(i,))
pool.close()
pool.join()
print('运行时间为 {}'.format(time.time()-start_time))

通过这个返回的值,我们可以知道,创建线程的消耗比创建进程低很多,所以在能使用线程的时候,就别用进程

自带的池可以获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing.pool import ThreadPool
import time


def func():
time.sleep(5)
return '1111'


tp = ThreadPool(4)
print(1, time.strftime('%H:%M:%S', time.localtime()))
result = tp.apply_async(func)
print(2, time.strftime('%H:%M:%S', time.localtime()))
print(result.get())
print(3, time.strftime('%H:%M:%S', time.localtime()))

使用池来实现并发服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import socket
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool

server = socket.socket()
server.bind(('', 9000))
server.listen(1000)
print('开始监听....')

def accept(server):
tp = ThreadPool(10)
while True:
con, addr = server.accept()
print('客户端 {}已经连接'.format(addr))
tp.apply_async(func=work_thread, args=(con, addr ))


def work_thread(con, addr):
while True:
data = con.recv(1000)
if data:
print('来自{}客户端的{}'.format(addr, data.decode()))
con.send(data)
else:
con.close()
print('客户端{}已经断开连接'.format(addr))
break


n = cpu_count()
pool = Pool(n)
for i in range(n):
print('等待客户端连接')
pool.apply_async(func=accept, args=(server, ))

pool.close()
pool.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import socket
from multiprocessing import Pool

server = socket.socket()
server.bind(('', 8888))
server.listen(5)
print('listen ....')

def read(con):
while True:
data = con.recv(1000)
if data:
print(data)
con.send(data)
else:
con.close()
break


pool = Pool(4)
while True:
con, addr = server.accept()
pool.apply_async(func=read, args=(con, ))
---------------- The   End ----------------