《python》join、守护进程、锁/信号量/事件、进程队列 一、multiprocess.process模块 二、进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

1、join方法

  阻塞主进程,等待子进程执行完毕再放开阻塞

import time
import random
from multiprocessing import Process

# 单个子进程
def func(index):
    time.sleep(random.randint(1, 3))
    print('发送完毕')

if __name__ == '__main__':
    p = Process(target=func, args=(1,))
    p.start()
    p.join()    # 阻塞 直到p进程执行完毕就结束阻塞
    print('邮件已经发送完毕')
'''
发送完毕
邮件已经发送完毕
'''


# 多个子进程
def func(index):
    time.sleep(random.randint(1,3))
    print('第%s个邮件已经发送完毕' % index)

if __name__ == '__main__':
    p_lst = []
    for i in range(10):
        p = Process(target=func, args=(i,))
        p.start()
        p_lst.append(p)
    for p in p_lst:
        p.join()    # 等待每个子进程执行完毕
    print('10个邮件已经发送完毕')
'''
第2个邮件已经发送完毕
第4个邮件已经发送完毕
第7个邮件已经发送完毕
第5个邮件已经发送完毕
第1个邮件已经发送完毕
第9个邮件已经发送完毕
第0个邮件已经发送完毕
第3个邮件已经发送完毕
第8个邮件已经发送完毕
第6个邮件已经发送完毕
10个邮件已经发送完毕
'''

2、第二种开启进程的方法

  以继承Process类的形式开启进程的方式

import os
from multiprocessing import Process

class MyProcess(Process):
    def run(self):
        print('子进程: ', os.getpid(), os.getppid())

if __name__ == '__main__':
    p = MyProcess()
    p.start()   # 开启一个子进程,让这个子进程执行run方法
    print('主进程:', os.getpid())
'''
主进程: 6852
子进程:  6644 6852
'''


# 给子进程传参数


class MyProcess(Process):
    def __init__(self, arg):
        super().__init__()
        self.arg = arg
    def run(self):
        time.sleep(1)
        print('子进程: ', os.getpid(), os.getppid(), self.arg)

if __name__ == '__main__':
    # 开启单个子进程
    p = MyProcess('参数')
    p.start()   # 开启一个子进程,让这个子进程执行run方法
    p.join()
    print('主进程:', os.getpid())
'''
子进程:  6552 7784 参数
主进程: 7784
'''

if __name__ == '__main__':
    # 开启多个子进程
    for i in range(10):
        p = MyProcess('参数%s' % i)
        p.start()   # 开启一个子进程,让这个子进程执行run方法
    print('主进程:', os.getpid())
'''
主进程: 7340
子进程:  6540 7340 参数0
子进程:  6512 7340 参数3
子进程:  7648 7340 参数1
子进程:  7460 7340 参数2
子进程:  8048 7340 参数4
子进程:  5108 7340 参数8
子进程:  7868 7340 参数7
子进程:  7892 7340 参数6
子进程:  4172 7340 参数9
子进程:  7224 7340 参数5
'''

3、守护进程

  主要功能:每隔一段时间就向一台机器汇报自己的状态(程序的报活)

  特点:会随着主进程的结束而结束。

import time
from multiprocessing import Process

def func():
    print('子进程 start')
    time.sleep(3)
    print('子进程 end')

if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True     # 设置p为一个守护进程,必须在start之前完成
    p.start()
    time.sleep(2)
    print('主进程')
'''
子进程 start
主进程
'''
# 主进程会等待子进程完全结束才结束
# 守护进程会随着主进程的代码执行完毕而结束


def func1():
    count = 1
    while 1:
        print(count * '*')
        time.sleep(0.5)
        count += 1

def func2():
    print('func2 start')
    time.sleep(5)
    print('func2 end')

if __name__ == '__main__':
    p1 = Process(target=func1)
    p1.daemon = True
    p1.start()  # p1是守护进程
    p2 = Process(target=func2)
    p2.start()
    time.sleep(3)
    print('主进程')
'''
func2 start
*
**
***
****
*****
******
主进程
func2 end
'''
# 如果主进程代码已经执行完毕,但是子进程还没执行完,守护进程都不会继续执行
# 守护进程会随着主进程的代码执行完毕而结束
# 主进程会等待子进程结束,守护进程只等待主进程代码结束就结束了

二、进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

1、锁---multiprocess.Lock

  加锁降低了程序的效率,让原来能够同时执行的代码变成顺序执行了,异步变同步的过程

  好处:保证了数据的安全

import time
import json
from multiprocessing import Process, Lock

# 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

# 多进程抢占输出资源
def search(person):
    with open('ticket') as f:
        dic = json.load(f)
    time.sleep(0.2)     # 模拟网络延迟
    print('%s查询余票:' % person, dic['count'])

def get_ticket(person):
    with open('ticket') as f:
        dic = json.load(f)
    time.sleep(0.2)     # 模拟网络延迟
    if dic['count'] > 0:
        print('%s买到票了' % person)
        dic['count'] -= 1   # 买到票,数量减1
        time.sleep(0.2)     # 模拟网络延迟
        with open('ticket', 'w') as f:
            json.dump(dic, f)   # 把剩余票数写回文件
    else:
        print('%s没买到票' % person)

def ticket(person):
    search(person)  # 查票
    get_ticket(person)  # 抢票

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=ticket, args=('person%s' % i,))
        p.start()
'''
person0查询余票: 5
person4查询余票: 5
person3查询余票: 5
person0买到票了
person8查询余票: 5
person2查询余票: 5
person7查询余票: 5
person4买到票了
person1查询余票: 5
person5查询余票: 5
person3买到票了
person9查询余票: 5
person6查询余票: 5
person8买到票了
person2买到票了
person7买到票了
person1买到票了
person9买到票了
person6买到票了
person5买到票了
'''

# 使用锁维护执行顺序
def search(person):
    with open('ticket') as f:
        dic = json.load(f)
    time.sleep(0.2)     # 模拟网络延迟
    print('%s查询余票:' % person, dic['count'])

def get_ticket(person):
    with open('ticket') as f:
        dic = json.load(f)
    time.sleep(0.2)     # 模拟网络延迟
    if dic['count'] > 0:
        print('%s买到票了' % person)
        dic['count'] -= 1   # 买到票,数量减1
        time.sleep(0.2)     # 模拟网络延迟
        with open('ticket', 'w') as f:
            json.dump(dic, f)   # 把剩余票数写回文件
    else:
        print('%s没买到票' % person)

def ticket(person, lock):
    search(person)
    lock.acquire()  # 加锁
    get_ticket(person)
    lock.release()  # 解锁

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=ticket, args=('person%s' % i, lock))
        p.start()
'''
person0查询余票: 5
person1查询余票: 5
person2查询余票: 5
person0买到票了
person5查询余票: 5
person6查询余票: 5
person3查询余票: 5
person9查询余票: 5
person4查询余票: 5
person8查询余票: 5
person7查询余票: 5
person1买到票了
person2买到票了
person5买到票了
person6买到票了
person3没买到票
person9没买到票
person4没买到票
person8没买到票
person7没买到票
'''
# 为了保证数据的安全
# 在异步的情况下,多个进程有可能同时修改同一份资源
# 就给这个修改的过程加上锁
import time
from multiprocessing import Process, Lock

# 加了锁就把异步变成同步了
def func(num, lock):
    time.sleep(1)
    print('异步执行', num)  # 异步会同时开始

    lock.acquire()
    time.sleep(0.5)
    print('同步执行', num)  # 同步要一个结束才开始下一个
    lock.release()
    
if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=func, args=(i, lock))
        p.start()
from multiprocessing import Process, Lock

# 互斥锁
lock = Lock()
lock.acquire()
print('456')
lock.acquire()
print('123')
# 只打印456,123不打印

 2、信号量---multiprocess.Semaphore

  信号量的实现机制:计数器 + 锁 实现的

import time
import random
from multiprocessing import Process, Semaphore

def ktv(person, sem):
    sem.acquire()
    print('