python-并发编程
操作系统相关
操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序
多道技术
1 | 一 操作系统的作用: |
进程相关
- 进程:程序运行的过程,是一个动态的概念
- 程序:是一系列的代码文件,是一个静态的概念
并发、并行和串行
- 并发:是伪并行,多个任务看起来同时运行,单个CPU+多道技术就可以实现并发(并行也属于并发)
- 并行:多个任务真正意义上的同时运行,只有具备多个CPU才能实现并行
- 串行:一个任务运行完毕后才能开启下一个任务
提交任务的两种方式
同步:发出一个功能调用时,在没有得到结果之前,该调用就不会返回
异步:当一个异步功能调用发出之后,调用者不能立刻得到结果,当该异步功能完成后,通过状态、通知或回调来通知调用者
一个任务运行的三种状态
运行态:当前进程正在被CPU执行
阻塞态:正在执行的进程,由于等待某个事件而无法执行时,如遇到I/O
就绪态:当前进程没有被CPU执行
multiprocessing模块
python中的多线程无法利用多核优势(os.cpu_count()
查看),在python大部分情况使用多进程,python提供了multipprocessing模块
multiprocess模块功能众多,支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件
与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内
Process类
介绍
创建进程的类
1 | # 由改类实例化的对象,表示一个子进程中的任务,还没有启动 |
参数介绍
1 | group 参数未使用,值始终为None |
方法介绍
1 | p.start() 启动进程 |
属性介绍
1 | p.daemon 默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且此时 p不能创建自己的新进程,必须在p.start()之前设置 |
使用
在Windows中
Process()
必须放到if name == 'main':
下
开启进程方式一
1 | from multiprocessing import Process |
开启进程方式二
1 | from multiprocessing import Process |
os.getpid()
获取当前进程pid
os.getppid()
获取当前进程的父进程pid
进程之间的内存空间是隔离的
1 | from multiprocessing import Process |
进程对象的方法
join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23from multiprocessing import Process
import os
import time
import random
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self) -> None:
print('%s 正在运行, 进程号是 %s' % (self.name, os.getpid()))
time.sleep(random.randint(1, 3))
print('%s 运行结束, 进程号是 %s ' % (self.name, os.getpid()))
if __name__ == '__main__':
p = MyProcess('p1')
p.start()
p.join() # 保证子进程结束后才会向下执行,当前主线程处于等的状态,而p是处于运行的状态
# p.join(2) # 指定等待p子进程的时间,如果子进程p运行完直接往下执行,如果等了2s之后还没执行完也会向下执行
print('开始 主进程 %s ' % os.getpid())该方法并不是串行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46from multiprocessing import Process
import os
import time
import random
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self) -> None:
print('%s 正在运行, 进程号是 %s' % (self.name, os.getpid()))
time.sleep(random.randint(1, 3))
print('%s 运行结束, 进程号是 %s ' % (self.name, os.getpid()))
if __name__ == '__main__':
p1 = MyProcess('p1')
p2 = MyProcess('p2')
p3 = MyProcess('p3')
p4 = MyProcess('p4')
p5 = MyProcess('p5')
# 这几个进程是差不多一起一起的,并不是启动一个执行完之后再运行第二个进程,是让主进程等,而不是让后面的子进程等
p1.start()
p2.start()
p3.start()
p4.start()
p5.start()
# p_list = [p1, p2, p3, p4, p5]
# for p in p_list:
# p.start()
# 但是当 p1 执行完成后确实要等后面的 p2-p5 进程执行完成后才能继续往后
p1.join()
p2.join()
p3.join()
p4.join()
p5.join()
#for p in p_list:
# p.join()
print('主进程 %s ' % os.getpid())terminate()和is_alive()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18from multiprocessing import Process
import os
import time
import random
def task(name):
print('%s is run, task is %s ' % (name, os.getppid()))
time.sleep(random.randint(1, 3))
print('%s is end,task is %s ' % (name, os.getpid()))
if __name__ == '__main__':
p = Process(target=task, args=('test',))
p.start()
p.terminate() # 关闭进程,不会立即关闭
print(p.is_alive()) # 所以此时查看进程是否存活时为True
print('main is start ')
print(p.is_alive()) # 子进程已经关闭了,此时为Falsename和pid
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23class MyProcess(Process):
def __init__(self, name):
# self.name=name
# super().__init__() #Process的__init__方法会执行self.name=Piao-1,
# #所以加到这里,会覆盖我们的self.name=name
#为我们开启的进程设置名字的做法
super().__init__()
self.name = name
def run(self) -> None:
print('%s is run' % self.name)
time.sleep(random.randint(1, 3))
print('%s is end' % self.name)
if __name__ == '__main__':
p = MyProcess('test')
p.start()
print('main is run')
print(p.pid) # 查看pid
进程相关
僵尸进程
1 | 僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程 |
产生僵尸进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14from multiprocessing import Process
import os
import time
def run():
print('子', os.getpid())
if __name__ == '__main__':
p = Process(target=run)
p.start()
print('主', os.getpid())
time.sleep(1000)查看僵尸进程
1
ps aux|grep Z # SATA 显示 Z 就是僵尸进程
解决办法
1
2
3
4
51. 杀死父进程
kill -CHLD 父进程的pid
kill -9 父进程的pid
2. 对开启的子进程应该记得使用join,join会回收僵尸进程
3. https://blog.csdn.net/u010571844/article/details/50419798问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17from multiprocessing import Process
import time,os
def task():
print('%s is running' %os.getpid())
time.sleep(3)
if __name__ == '__main__':
p=Process(target=task)
p.start()
p.join() # 等待进程p结束后,join函数内部会发送系统调用wait,去告诉操作系统回收掉进程p的id号
print(p.pid) #???此时能否看到子进程p的id号
print('主')
# p.join()是像操作系统发送请求,告知操作系统p的id号不需要再占用了,回收就可以,
# 此时在父进程内还可以看到p.pid,但此时的p.pid是一个无意义的id号,因为操作系统已经将该编号回收
孤儿进程
1 | 当父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程,由于进程不可能脱离进程树而独立存在,孤儿进程将被PID为1的init进程所收养,并由init进程对它们完成状态收集工作。孤儿进程被收养后进行正常的释放,没有危害 |
演示代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16from multiprocessing import Process
import os
import time
def run():
print('子', os.getpid())
time.sleep(50)
if __name__ == '__main__':
p1 = Process(target=run)
p2 = Process(target=run)
p1.start()
p2.start()
print('主', os.getpid())现象
两个子进程并没有退出,此时两个子进程的父进程由 1 接管,当时间久了之后会被释放掉
守护进程
主进程创建守护进程
- 守护进程会在主进程代码执行结束后就终止
- 守护进程内无法再开启子进程,否则抛出异常: AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
实例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import random
from multiprocessing import Process
import os
import time
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self) -> None:
print('%s is run' % self.name)
time.sleep(random.randint(1, 3))
print('%s is end ' % self.name)
p1 = MyProcess('p1')
p1.daemon = True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p1.start()
print('main is run', os.getpid())
# 结果:main is run 可以看到子线程没有执行
互斥锁
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的
而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理
代码一
没加锁的情况
1 | from multiprocessing import Process |
加锁之后的情况
1 | #由并发变成了串行,牺牲了运行效率,但避免了竞争 |
代码二
文件当数据库,模拟抢票
不加锁的情况
1 | # 并发运行,效率高,但是在竞争一个文件,数据写入错乱 |
加锁之后
1 | # 查票还是并发,但是在购票的时候由并发变成了串行,牺牲了运行效率,但保证了数据安全 |
总结
1 | #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 |
IPC机制
进程彼此之间互相隔离,要实现进程之间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
管道
ps -ef |grep xx 前面的进程产生的数据交给后面的进程
队列
底层就是以管道和锁定的方式实现
创建队列的类
1 | Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递 |
主要方法
1 | q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。blocked为True(默认值)如果 |
其他方法
1 | q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞 |
应用
1 | from multiprocessing import Process, Queue |
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度
为什么要使用生产者和消费者模式
- 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式
什么是生产者消费者模式
- 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
总结
1
2
3
4
5
6
7
8
9
10#程序中有两类角色
一类负责生产数据(生产者)
一类负责处理数据(消费者)
#引入生产者消费者模型为了解决的问题是
平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
#如何实现
生产者<-->队列<——>消费者
#生产者消费者模型实现类程序的解耦和基于队列实现生产者消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41from multiprocessing import Process, Queue
import time, os, random
def producer(q, name, courier):
for i in range(3):
res = '%s %s ' % (courier, i)
time.sleep(random.randint(1, 3))
q.put(res)
print('%s 送来 %s ' % (name, res))
q.put(None) # 结束之后发送None信息到队里里面,有几个消费者就发几个None
q.put(None)
def consumer(q, name):
while True:
res = q.get()
if res is None:
break
time.sleep(random.randint(1,3))
print('%s 拿到了 %s' % (name, res))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, '快递员1', 'sf'))
p2 = Process(target=producer, args=(q, '快递员2', 'yz'))
p3 = Process(target=producer, args=(q, '快递员3', 'jd'))
c1 = Process(target=consumer, args=(q, '拿货人1'))
c2 = Process(target=consumer, args=(q, '拿货人2'))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
print('%s is run ' % os.getpid())
JoinableQueue([maxsize])`
这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
介绍
1 | #参数介绍: |
优化上面队列代码
1 | from multiprocessing import Process, JoinableQueue |
信号量
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
1 | from multiprocessing import Process, Semaphore |
线程相关
线程是进程内代码运行的过程,线程是一个执行单位,CPU执行的就是线程。进程是一个资源单位
线程和进程的区别
- 同一进程下的多个线程共享该进程的内存资源,线程之间可以互相通信
- 开启子线程的开销要远远小于开启子线程
线程相关的方法
1 | Thread实例对象的方法 |
开启线程的两种方式
方式一
1
2
3
4
5
6
7
8
9from threading import Thread, current_thread
def task():
print('%s is running ' % current_thread().name)
if __name__ == '__main__':
t = Thread(target=task)
t.start()
print('主线程', current_thread().name)方式二
1
2
3
4
5
6
7
8
9
10
11
12
13from threading import Thread, current_thread
class MyThread(Thread):
def __init__(self):
super().__init__()
def run(self) -> None:
print('%s is running ' % current_thread().name) # 打印当前线程名
if __name__ == '__main__':
t = MyThread()
t.start()
print('主线程', current_thread().name)
- 线程之间数据相互影响
1 | from threading import Thread, current_thread |
守护线程
无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
需要强调的是:运行完毕并非终止运行
1 | 1. 对主进程来说,运行完毕指的是主进程代码运行完毕 |
1 | 1. 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束 |
代码案例
1 | from threading import Thread, current_thread |
互斥锁
现象:
1 | from threading import Thread, current_thread |
加锁
1 | from threading import Thread, current_thread,Lock |
信号量
1 | import random |
Event
同进程的一样
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
1 | event.isSet():返回event的状态值; |
案例代码一
1 | from threading import Event, Thread, current_thread |
模拟红绿灯
1 | from threading import Event, Thread, current_thread |
定时器
定时器Timer类是Thread的派生类,用于在指定时间后调用一个方法。
指定n秒后执行某操作
1 | from threading import Timer |
线程queue
queue队列 :使用import queue,用法与进程Queue一样
当信息必须在多个线程之间安全交换时,队列在线程编程中特别有用
基本方法
1
2
3
4put 往线程队列里防止,超过队列长度,直接阻塞
get 从队列中取值,如果获取不到,直接阻塞
put_nowait: 如果放入的值超过队列长度,直接报错(linux)
get_nowait: 如果获取的值已经没有了,直接报错用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54import queue
# 队列:先进先出
q = queue.Queue(3) # 指定队列的大小
q.put(111) # 整型
q.put("aaa") # 字符串
q.put((1,2,3)) # 元组
print(q.get())
print(q.get())
print(q.get())
'''
111
aaa
(1, 2, 3)
'''
# 堆栈:后进先出
q = queue.LifoQueue(3)
q.put(111)
q.put("aaa")
q.put((1,2,3))
print(q.get())
print(q.get())
print(q.get())
'''
(1, 2, 3)
aaa
111
'''
# 优先级队列:
# 1.默认按照数字大小排序,然后会按照ascii编码在从小到大排序
# 2.先写先排,后写后排
q = queue.PriorityQueue(3)
q.put((10,111)) # 第一个值是优先级,第二值才是要放的元素
q.put((11,"aaa"))
q.put((-1,(1,2,3)))
print(q.get())
print(q.get())
print(q.get())
'''
(-1, (1, 2, 3)) # 数越小优先级越高
(10, 111)
(11, 'aaa')
'''
死锁和递归锁
死锁是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
代码演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def f1(self):
mutexA.acquire()
print('%s 抢到了A锁 ' % self.name)
mutexB.acquire()
print('%s 抢到了B锁 ' % self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 抢到了B锁 ' % self.name)
time.sleep(0.1)
mutexA.acquire()
print('%s 抢到了A锁 ' % self.name)
mutexA.release()
mutexB.release()
def run(self) -> None:
self.f1()
self.f2()
if __name__ == '__main__':
t1 = MyThread('线程1')
t2 = MyThread('线程2')
t3 = MyThread('线程3')
t4 = MyThread('线程4')
t1.start()
t2.start()
t3.start()
t4.start()
print('主线程')
# 线程1 抢到了A锁
# 线程1 抢到了B锁
# 线程1 抢到了B锁
# 线程2 抢到了A锁
# 主线程
# 此时卡在这了解决方法
递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock
这个RLock内部维护着一个Lock和一个计数(counter)变量,计数记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48from threading import Thread, Lock, RLock
import time
mutexA = mutexB = RLock()
class MyThread(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def f1(self):
mutexA.acquire()
print('%s 抢到了A锁 ' % self.name)
mutexB.acquire()
print('%s 抢到了B锁 ' % self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 抢到了B锁 ' % self.name)
time.sleep(0.1)
mutexA.acquire()
print('%s 抢到了A锁 ' % self.name)
mutexA.release()
mutexB.release()
def run(self) -> None:
self.f1()
self.f2()
if __name__ == '__main__':
t1 = MyThread('线程1')
t2 = MyThread('线程2')
t3 = MyThread('线程3')
t4 = MyThread('线程4')
t1.start()
t2.start()
t3.start()
t4.start()
print('主线程')多线程实现TCP并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41# 服务端
import socket
from multiprocessing import Process
from threading import Thread
s = socket.socket()
s.bind(('127.0.0.1', 8080))
s.listen(5)
def task(sock):
while True:
try:
res = sock.recv(1024)
if len(res) == 0: break
data = res.upper()
sock.send(data)
except Exception:
break
sock.close()
while True:
sock, address = s.accept()
print(address)
t = Thread(target=task, args=(sock,))
t.start()
# 客户端
import socket
c = socket.socket()
c.connect(('127.0.0.1', 8080))
while True:
cmd = input('>>>:').strip()
if len(cmd) == 0: continue
c.send(cmd.encode('utf8'))
data = c.recv(1024)
print(data.decode('utf8'))
GIL全局解释器锁
介绍
GIL的全称是:Global Interpreter Lock,意思就是全局解释器锁
1 | ''' |
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL
归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全
综上:
如果多个线程的target=work,那么执行流程是
多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行
解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码
GIL与Lock
只要在一个进程里就一定有GIL锁的存在,GIL锁不能保证python数据的安全,它保证的是解释器级别(内存管理)的安全,也可以说是背后存在的一种机制。可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。
GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图:
1 | from threading import Thread,Lock |
GIL与多线程
对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用
对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地
场景:
1 | 分析: |
多线程性能测试
计算密集型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24from multiprocessing import Process
from threading import Thread
import os, time
def work():
res = 0
for i in range(100000000):
res *= 1
if __name__ == '__main__':
l = []
print(os.cpu_count()) # 查看cpu核数
start_time = time.time()
for i in range(8):
p = Process(target=work) # 进程 7.7s多
# p = Thread(target=work) # 线程 28s多
l.append(p)
p.start()
for p in l:
p.join()
stop_time = time.time()
print('run time is %s ' % (stop_time - start_time))I/O密集型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21from multiprocessing import Process
from threading import Thread
import os, time
def work():
time.sleep(2)
if __name__ == '__main__':
l = []
# print(os.cpu_count()) # 查看CPU核数
start = time.time()
for i in range(1000):
p=Process(target=work) # 使用进程
# p = Thread(target=work) # 使用线程比进程效率稍高
l.append(p)
p.start()
for p in l:
p.join()
stop = time.time()
print('run time is %s' % (stop - start))结论
多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分
进程池与线程池
在刚开始接触多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪。于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制
Python标准模块concurrent.futures
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26# 1、介绍
concurrent.futures模块是用来创建并行的任务,提供了高度封装的异步调用接口
concurent.future这个模块用起来非常方便,它的接口也封装的非常简单,既可以实现进程池,也可以实现线程池
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
两者都实现了同一个接口,这个接口是由抽象Executor类定义的。
# 2、基本方法
submit(fn, *args, **kwargs)
异步提交任务
map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作
shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
result(timeout=None)
取得结果
add_done_callback(fn)
回调函数
进程池
1 | """ |
线程池
1 | """ |
协程
介绍
协程是单线程下实现的并发,协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的
对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
特点: 自己的应用程序实现多个人的调度
遇到I/O切换,可以将单线程的I/O降到最低,因此可以将单线程的威力发挥到最大
缺点: 不能实现并行
单线程下的多个任务一旦遇到I/O,整个线程都会阻塞,所有的任务都停滞
总结
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))**
yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
Gevent模块
Gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
Gevent内部会用到greenlet这个模块,这个模块就是多个任务之间来回的切,切走之前把一个任务的状态保留下来,它们的底层都会用到yield,其实就是层层帮我们封装好了。greenlet内部会封装yield,Gevent就是对greenlet进行了进一步的封装,封装后greenlet会帮忙检测I/O,实现遇到I/O切换,这个才是我们所追求的协程
使用方法
1
2
3
4
5
6
7
8
9
10
11
12g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,
如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
g2=gevent.spawn(func2)
g1.join() 等待g1结束
g2.join() 等待g2结束
或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值遇到IO阻塞时自动切换任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import gevent
def eat(name):
print('%s eat 1' %name) # 1.吃了一口饭
gevent.sleep(2) # 2.原地睡了2秒,相当于模拟遇到I/O了
print('%s eat 2' %name) # 6.接着打印又回来吃了一口饭
def play(name):
print('%s play 1' %name) # 3.遇到I/O以后就切到了另外一个任务,玩了一下
gevent.sleep(1) # 4.又遇到I/O了,睡了1秒,它先睡完
print('%s play 2' %name) # 5.接着又玩了一下,原本应该切到eat 2,但是仍在阻塞中
g1=gevent.spawn(eat,'egon') # spawn提交eat任务,然后提交一个人名。协程1
g2=gevent.spawn(play,name='egon')# spawn提交playt任务。协程2
g1.join() # 等着协程对象g1结束
g2.join() # 等着协程对象g2结束
#或者gevent.joinall([g1,g2])
print('主')
'''
上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
'''打补丁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26'''
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
'''
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('主')
"""
单线程下能抗住的并发已经非常非常高了,因为现在接触的软件大部分都是I/O密集型的
其实单线程下完全可以一个任务运行完以后(它真正运行完花的时间是非常短的,大量时间都在做I/O)
可以利用运行一段时间遇到I/O操作了就快速切换另一个任务再运行,在多任务之间快速的切
"""基于协程实现并发
通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40# 首先导了猴子补丁,打了补丁保证下面所有模块的I/O行为都能监测到
from gevent import monkey;monkey.patch_all()
from socket import * # 然后导了socket模块,准备写套接字
import gevent # 最后导入gevent模块, 用来单线程下实现并发
def server(server_ip,port): # 套接字服务端任务1:建链接
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port)) # 绑定ip和端口
s.listen(5) # 监听
while True:
conn,addr=s.accept() # 等待链接请求
# 每建成一个链接,就提交一个协程对象进行通信,异步提交
gevent.spawn(talk,conn,addr)
def talk(conn,addr): # 套接字服务端任务2:建通信
try:
while True:
res=conn.recv(1024) # 收消息
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper()) # 回消息,大写回
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080) # 把ip和端口传进去
# 注:没必要join在原地等了,因为服务端在启动运行起来后,服务端函数是一个死循环,
# 不会结束,既然主进程不会结束那就不用再等了
"""
整体逻辑:就一个线程server,没有多线程也没有多进程,这个线程每建成一个链接就提交
一个协程对象,gevent会帮你在多个任务之间遇到I/O来回快速的切换,从而实现并发效果
如何证明并发的效果?
服务端启动起来后,同时多个客户端连接过去,如果多个客户端能同时得到结果,并发效果
就实现了
"""客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# 可同时开多个客户端(客户端1、客户端2、客户端3)
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
client.send("hello".encode('utf-8')) # 在不停的向服务端发送“hello”
msg=client.recv(1024) # 收消息,在不停的收HELLO
print(msg.decode('utf-8'))
"""
解析:
三个客户端都能同时不停的发消息和收消息,都有并发效果,但服务端没有开多线程,事实上
就是服务端在多个任务之间来回的切换
其实就是给第一个客户端执行一个seed来发送I/O请求,只要seed发出之后运行完就是操作
系统的任务了,seed负责发消息,操作系统负责做I/O。gevent模块会利用你seed的过程
直接切到下一个任务,再切到下下一个任务,一直往下切,给客户端的感觉就是每一个客户端
都能被服务,并发就实现了
"""
IO模型
简介
IO模型研究的主要是网络IO(linux系统)
- 同步(synchronous) 大部分情况下会采用缩写的形式 sync
- 异步(asynchronous) async
- 阻塞(blocking)
- 非阻塞(non-blocking)
五种IO模型:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路复用
* signal driven IO 信号驱动IO
* asynchronous IO 异步IO
由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model
四种IO模型简介
阻塞IO
最为常见的一种IO模型 有两个等待的阶段(wait for data、copy data)
非阻塞IO
系统调用阶段变为了非阻塞(轮训) 有一个等待的阶段(copy data)
轮训的阶段是比较消耗资源的
多路复用IO
利用select或者epoll来监管多个程序 一旦某个程序需要的数据存在于内存中了 那么立刻通知该程序去取即可
异步IO
只需要发起一次系统调用 之后无需频繁发送 有结果并准备好之后会通过异步回调机制反馈给调用者