python之并发编程进阶篇9

一、守护进程和守护线程

1)守护进程的概念

什么是守护进程:
    守护: 在主进程代码结束情况下,就立即死掉
    守护进程本质就是一个子进程,该子进程守护着主进程

为何要用守护进程
    守护进程本质就是一个子进程,所以在主进程需要将任务并发执行的时候需要开启子进程
    当该子进程执行的任务生命周期伴随主进程整个生命周期的时候,就需要将该子进程做成守护的进程

2)创建守护进程

from multiprocessing import Process
import time

def task(x):
    print('%s is running' %x)
    time.sleep(1)
    print('%s is done' % x)

if __name__ == '__main__':
    p=Process(target=task,args=('守护进程',))
    p.daemon=True # 必须放到p.start()之前
    p.start()
    time.sleep(3)
    print('')
View Code

3)守护线程的概念

主线程要等到该进程内所有非守护线程(子线程)都死掉才算死掉,因为主线程的生命周期
代表了该进程的生命周期,该进程一定是要等到所有非守护的线程都干完活才应该死掉

可以简单理解为:
    守护线程是要等待该进程内所有非守护的线程都运行完毕才死掉

4)创建守护线程

from threading import Thread
import time

def task(x):
    print('%s is running' %x)
    time.sleep(3)
    print('%s is done' % x)

if __name__ == '__main__':
    t=Thread(target=task,args=('守护线程',))
    t.daemon=True # 必须放到p.start()之前
    t.start()
    print('')
View Code

二、互斥锁和信号量与GIL全局解释器锁,死锁及递归锁

1)互斥锁的意义

互斥锁的原理是将进程/线程内执行的部分代码由并发执行变成穿行执行,牺牲了效率但保证数据安全
互斥锁不能连续低执行mutex.acquire()操作,必须等到拿着锁的进程释放锁mutex.release()其他进程才能抢到

2)进程mutex=Lock()

from multiprocessing import Process,Lock
import json
import os
import time
import random

mutex=Lock()

def check():
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    print('%s 剩余票数:%s' %(os.getpid(),dic['count']))

def get():
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    time.sleep(1)
    if dic['count']  > 0:
        dic['count']-=1
        time.sleep(random.randint(1,3)) #模拟网络延迟
        with open('db.json','wt',encoding='utf-8') as f:
            json.dump(dic,f)
            print('%s 抢票成功' %os.getpid())

def task(mutex):
    # 并发查看
    check()
    # 串行购票
    mutex.acquire()
    get()
    mutex.release()

if __name__ == '__main__':
    for i in range(7):
        p=Process(target=task,args=(mutex,))
        p.start()
        # p.join() # 将p内的代码变成整体串行
View Code

3)信号量。设置能同时执行任务的数量

# 比如公共厕所,能同时上厕所的有4个位置
from multiprocessing import Process,Semaphore
import os
import time
import random

sm=Semaphore(4)

def go_wc(sm):
    sm.acquire()
    print('%s is wcing' %os.getpid())
    time.sleep(random.randint(1,3))
    sm.release()

if __name__ == '__main__':
    for i in range(20):
        p=Process(target=go_wc,args=(sm,))
        p.start()
View Code

4)线程问题版。线程中修改同一个数据,数据存在不安全,故障

from threading import Thread
import time
n = 100
def task():
    global n
    temp = n
    time.sleep(0.1)
    n = temp -1

if __name__ == '__main__':
    t_l  = []
    for i in range(100):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)
View Code

5)线程互斥锁修改版

from threading import Thread,Lock
import time

mutex = Lock()
n = 100
def task():
    global n
    with mutex:     # 拿到锁,自动释放锁
        temp = n
        time.sleep(0.1)
        n = temp -1

if __name__ == '__main__':
    t_l = []
    start_time = time.time()
    for i in range(50):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)
    print(time.time()- start_time)
View Code

6)GIL的意义。判断什么情况下使用线程和进程

1 GIL是什么
    GIL是全局解释器锁,本质就是一把互斥锁
    GIL是Cpython解释器的特性,而不是python的特性
    每启动一个进程,该进程就会有一个GIL锁,用来控制该进程内的多个线程同一时间只有一个执行
    这意味着Cpython解释器的多线程没有并行的效果,但是有并发的效果

2、为什么要有GIL
    因为Cpython解释器的垃圾回收机制不是线程安全的

3、GIL vs 自定义互斥锁
    在一个进程内的多个线程要想执行,首先需要抢的是GIL,GIL就相当于执行权限

    python的多进程用于计算密集型
    python的多线程用于IO密集型

7)计算密集型中,进程计算和线程计算对比

  进程计算,计算密集型。利用多核cpu的优势,但进程数不能超过核数的2倍,会大量消耗cpu资源。计算时间  27.400567293167114

from multiprocessing import Process
import time

def task1():
    res=1
    for i in range(100000000):
        res*=i

def task2():
    res = 1
    for i in range(100000000):
        res += i

def task3():
    res = 1
    for i in range(100000000):
        res -= i

def task4():
    res = 1
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    start_time=time.time()
    p1=Process(target=task1)
    p2=Process(target=task2)
    p3=Process(target=task3)
    p4=Process(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) #27.400567293167114
计算密集型任务用多进程

  线程进行密集计算。计算时间 86.84396719932556

import time
from threading import Thread
def task1():
    res=1
    for i in range(100000000):
        res*=i

def task2():
    res = 1
    for i in range(100000000):
        res += i

def task3():
    res = 1
    for i in range(100000000):
        res -= i

def task4():
    res = 1
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    start_time=time.time()
    p1=Thread(target=task1)
    p2=Thread(target=task2)
    p3=Thread(target=task3)
    p4=Thread(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) # 86.84396719932556
View Code

 8)IO密集型中。进程与线程对比

   进程完成时间 3.5172011852264404

import time
from multiprocessing import Process
def task1():
    time.sleep(3)
def task2():
    time.sleep(3)

def task3():
    time.sleep(3)

def task4():
    time.sleep(3)

if __name__ == '__main__':
    start_time=time.time()
    p1=Process(target=task1)
    p2=Process(target=task2)
    p3=Process(target=task3)
    p4=Process(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) # 3.5172011852264404
View Code

   线程优势,完成时间 3.003171443939209

import time
from threading import Thread
def task1():
    time.sleep(3)
def task2():
    time.sleep(3)

def task3():
    time.sleep(3)

def task4():
    time.sleep(3)

if __name__ == '__main__':
    start_time=time.time()
    p1=Thread(target=task1)
    p2=Thread(target=task2)
    p3=Thread(target=task3)
    p4=Thread(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) # 3.003171443939209
View Code

9)死锁现象。释放锁之前,都需要获取到对方的锁,造成了无法释放锁

from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 抢到了A锁' %self.name)

        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        mutexB.release()

        mutexA.release()
    def f2(self):
        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        time.sleep(1)

        mutexA.acquire()
        print('%s 抢到了A锁' % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(2):
        t =  Mythread()
        t.start()
View Code

10)递归锁。RLock,解决死锁现象。递归锁可以连续acquire()

from threading import Thread,RLock
import time
mutexA = mutexB = RLock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 抢到了A锁' %self.name)

        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        mutexB.release()

        mutexA.release()
    def f2(self):
        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        time.sleep(1)

        mutexA.acquire()
        print('%s 抢到了A锁' % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(2):
        t =  Mythread()
        t.start()
View Code

三、IPC机制或队列和生产者模型

  IPC:进程间通信,有两种解决方案:队列、管道

1)队列,先进先出。应用于生产者模型

from multiprocessing import Queue
q=Queue(maxsize=3)

q.put({'x':1})
q.put(2)
q.put('third')

print(q.get())
print(q.get())
print(q.get())
View Code

  默认不加参数,超过队列最大值会堵塞。 q.put(1,block=False)  超过最大值,程序中断。效果等同于 q.put_nowait(1)。

 timeout=3 超时时间,block=True的时候,才有意义

2)生产者模型的意义

1、什么是生产者消费者模型
    生产者消费者模型指的是一种解决问题的思路
    该模型中包含两类明确的角色:
        1、生产者:创造数据的任务
        2、消费者:处理数据的任务

2、为什么要用生产者消费者模型?
    1、实现生产者与消费者任务的解耦和
    2、平衡了生产者的生产力与消费者消费力
    一旦程序中出现明显的两类需要并发执行的任务,一类是负责数据的,另外一类是负责处理数据的
    那么就可以使用生产者消费者模型来提升执行效率

3、如何用
    生产者----》队列《-------消费者
    队列
        1、队列占用的是内存控制,即便是不指定队列的大小也不可能无限制地放数据
        2、队列是用来传递消息的介质,即队列内存放的是数据量较小的数据

2)生产者模型Queue,消费者卡住的不完善版本

from multiprocessing import Queue,Process
import time

def producer(name,q):
    for i in range(5):
        res = '包子%s' %i
        time.sleep(0.5)
        print('

相关推荐