python之并发编程进阶篇9
分类:
IT文章
•
2023-11-07 10:31:27
一、守护进程和守护线程
1)守护进程的概念
什么是守护进程:
守护: 在主进程代码结束情况下,就立即死掉
守护进程本质就是一个子进程,该子进程守护着主进程
为何要用守护进程
守护进程本质就是一个子进程,所以在主进程需要将任务并发执行的时候需要开启子进程
当该子进程执行的任务生命周期伴随主进程整个生命周期的时候,就需要将该子进程做成守护的进程
2)创建守护进程
from multiprocessing import Process
import time
def task(x):
print('%s is running' %x)
time.sleep(1)
print('%s is done' % x)
if __name__ == '__main__':
p=Process(target=task,args=('守护进程',))
p.daemon=True # 必须放到p.start()之前
p.start()
time.sleep(3)
print('主')
View Code
3)守护线程的概念
主线程要等到该进程内所有非守护线程(子线程)都死掉才算死掉,因为主线程的生命周期
代表了该进程的生命周期,该进程一定是要等到所有非守护的线程都干完活才应该死掉
可以简单理解为:
守护线程是要等待该进程内所有非守护的线程都运行完毕才死掉
4)创建守护线程
from threading import Thread
import time
def task(x):
print('%s is running' %x)
time.sleep(3)
print('%s is done' % x)
if __name__ == '__main__':
t=Thread(target=task,args=('守护线程',))
t.daemon=True # 必须放到p.start()之前
t.start()
print('主')
View Code
二、互斥锁和信号量与GIL全局解释器锁,死锁及递归锁
1)互斥锁的意义
互斥锁的原理是将进程/线程内执行的部分代码由并发执行变成穿行执行,牺牲了效率但保证数据安全
互斥锁不能连续低执行mutex.acquire()操作,必须等到拿着锁的进程释放锁mutex.release()其他进程才能抢到
2)进程mutex=Lock()
from multiprocessing import Process,Lock
import json
import os
import time
import random
mutex=Lock()
def check():
with open('db.json','rt',encoding='utf-8') as f:
dic=json.load(f)
print('%s 剩余票数:%s' %(os.getpid(),dic['count']))
def get():
with open('db.json','rt',encoding='utf-8') as f:
dic=json.load(f)
time.sleep(1)
if dic['count'] > 0:
dic['count']-=1
time.sleep(random.randint(1,3)) #模拟网络延迟
with open('db.json','wt',encoding='utf-8') as f:
json.dump(dic,f)
print('%s 抢票成功' %os.getpid())
def task(mutex):
# 并发查看
check()
# 串行购票
mutex.acquire()
get()
mutex.release()
if __name__ == '__main__':
for i in range(7):
p=Process(target=task,args=(mutex,))
p.start()
# p.join() # 将p内的代码变成整体串行
View Code
3)信号量。设置能同时执行任务的数量
# 比如公共厕所,能同时上厕所的有4个位置
from multiprocessing import Process,Semaphore
import os
import time
import random
sm=Semaphore(4)
def go_wc(sm):
sm.acquire()
print('%s is wcing' %os.getpid())
time.sleep(random.randint(1,3))
sm.release()
if __name__ == '__main__':
for i in range(20):
p=Process(target=go_wc,args=(sm,))
p.start()
View Code
4)线程问题版。线程中修改同一个数据,数据存在不安全,故障
from threading import Thread
import time
n = 100
def task():
global n
temp = n
time.sleep(0.1)
n = temp -1
if __name__ == '__main__':
t_l = []
for i in range(100):
t = Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)
View Code
5)线程互斥锁修改版
from threading import Thread,Lock
import time
mutex = Lock()
n = 100
def task():
global n
with mutex: # 拿到锁,自动释放锁
temp = n
time.sleep(0.1)
n = temp -1
if __name__ == '__main__':
t_l = []
start_time = time.time()
for i in range(50):
t = Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)
print(time.time()- start_time)
View Code
6)GIL的意义。判断什么情况下使用线程和进程
1 GIL是什么
GIL是全局解释器锁,本质就是一把互斥锁
GIL是Cpython解释器的特性,而不是python的特性
每启动一个进程,该进程就会有一个GIL锁,用来控制该进程内的多个线程同一时间只有一个执行
这意味着Cpython解释器的多线程没有并行的效果,但是有并发的效果
2、为什么要有GIL
因为Cpython解释器的垃圾回收机制不是线程安全的
3、GIL vs 自定义互斥锁
在一个进程内的多个线程要想执行,首先需要抢的是GIL,GIL就相当于执行权限
python的多进程用于计算密集型
python的多线程用于IO密集型
7)计算密集型中,进程计算和线程计算对比
进程计算,计算密集型。利用多核cpu的优势,但进程数不能超过核数的2倍,会大量消耗cpu资源。计算时间 27.400567293167114
from multiprocessing import Process
import time
def task1():
res=1
for i in range(100000000):
res*=i
def task2():
res = 1
for i in range(100000000):
res += i
def task3():
res = 1
for i in range(100000000):
res -= i
def task4():
res = 1
for i in range(100000000):
res += i
if __name__ == '__main__':
start_time=time.time()
p1=Process(target=task1)
p2=Process(target=task2)
p3=Process(target=task3)
p4=Process(target=task4)
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop_time=time.time()
print(stop_time-start_time) #27.400567293167114
计算密集型任务用多进程
线程进行密集计算。计算时间 86.84396719932556
import time
from threading import Thread
def task1():
res=1
for i in range(100000000):
res*=i
def task2():
res = 1
for i in range(100000000):
res += i
def task3():
res = 1
for i in range(100000000):
res -= i
def task4():
res = 1
for i in range(100000000):
res += i
if __name__ == '__main__':
start_time=time.time()
p1=Thread(target=task1)
p2=Thread(target=task2)
p3=Thread(target=task3)
p4=Thread(target=task4)
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop_time=time.time()
print(stop_time-start_time) # 86.84396719932556
View Code
8)IO密集型中。进程与线程对比
进程完成时间 3.5172011852264404
import time
from multiprocessing import Process
def task1():
time.sleep(3)
def task2():
time.sleep(3)
def task3():
time.sleep(3)
def task4():
time.sleep(3)
if __name__ == '__main__':
start_time=time.time()
p1=Process(target=task1)
p2=Process(target=task2)
p3=Process(target=task3)
p4=Process(target=task4)
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop_time=time.time()
print(stop_time-start_time) # 3.5172011852264404
View Code
线程优势,完成时间 3.003171443939209
import time
from threading import Thread
def task1():
time.sleep(3)
def task2():
time.sleep(3)
def task3():
time.sleep(3)
def task4():
time.sleep(3)
if __name__ == '__main__':
start_time=time.time()
p1=Thread(target=task1)
p2=Thread(target=task2)
p3=Thread(target=task3)
p4=Thread(target=task4)
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop_time=time.time()
print(stop_time-start_time) # 3.003171443939209
View Code
9)死锁现象。释放锁之前,都需要获取到对方的锁,造成了无法释放锁
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()
class Mythread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 抢到了A锁' %self.name)
mutexB.acquire()
print('%s 抢到了B锁' % self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 抢到了B锁' % self.name)
time.sleep(1)
mutexA.acquire()
print('%s 抢到了A锁' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(2):
t = Mythread()
t.start()
View Code
10)递归锁。RLock,解决死锁现象。递归锁可以连续acquire()
from threading import Thread,RLock
import time
mutexA = mutexB = RLock()
class Mythread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 抢到了A锁' %self.name)
mutexB.acquire()
print('%s 抢到了B锁' % self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 抢到了B锁' % self.name)
time.sleep(1)
mutexA.acquire()
print('%s 抢到了A锁' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(2):
t = Mythread()
t.start()
View Code
三、IPC机制或队列和生产者模型
IPC:进程间通信,有两种解决方案:队列、管道
1)队列,先进先出。应用于生产者模型
from multiprocessing import Queue
q=Queue(maxsize=3)
q.put({'x':1})
q.put(2)
q.put('third')
print(q.get())
print(q.get())
print(q.get())
View Code
默认不加参数,超过队列最大值会堵塞。 q.put(1,block=False) 超过最大值,程序中断。效果等同于 q.put_nowait(1)。
timeout=3 超时时间,block=True的时候,才有意义
2)生产者模型的意义
1、什么是生产者消费者模型
生产者消费者模型指的是一种解决问题的思路
该模型中包含两类明确的角色:
1、生产者:创造数据的任务
2、消费者:处理数据的任务
2、为什么要用生产者消费者模型?
1、实现生产者与消费者任务的解耦和
2、平衡了生产者的生产力与消费者消费力
一旦程序中出现明显的两类需要并发执行的任务,一类是负责数据的,另外一类是负责处理数据的
那么就可以使用生产者消费者模型来提升执行效率
3、如何用
生产者----》队列《-------消费者
队列
1、队列占用的是内存控制,即便是不指定队列的大小也不可能无限制地放数据
2、队列是用来传递消息的介质,即队列内存放的是数据量较小的数据
2)生产者模型Queue,消费者卡住的不完善版本
from multiprocessing import Queue,Process
import time
def producer(name,q):
for i in range(5):
res = '包子%s' %i
time.sleep(0.5)
print('