Python基础:之进程 一、进程 11 进程间通信(IPC) 12 进程间通信方式三:共享数据 13 进程池   

Python基础:之进程
一、进程
11 进程间通信(IPC)
12 进程间通信方式三:共享数据
13 进程池 
 

1.什么是进程

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。

正在进行的一个过程或者说一个任务。而负责执行任务则是cpu

2.进程与程序的区别

程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

需要强调的是:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。

3.并发与并行

 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

  并行:同时运行,只有具备多个cpu才能实现并行

      并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)

      Python基础:之进程
一、进程
11 进程间通信(IPC)
12 进程间通信方式三:共享数据
13 进程池 
 

      所有现代计算机经常会在同一时间做很多件事,一个用户的PC(无论是单cpu还是多cpu),都可以同时运行多个任务(一个任务可以理解为一个进程)。

    启动一个进程来杀毒(360软件)

    启动一个进程来看电影(暴风影音)

    启动一个进程来聊天(腾讯QQ)

  所有的这些进程都需被管理,于是一个支持多进程的多道程序系统是至关重要的

      多道技术:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)

4.同步异步

同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去;

    异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。

    举个例子,打电话时就是同步通信,发短息时就是异步通信。

5.进程的创建

 但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

  而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

  2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

  3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)

  4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

  

  无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:

  1. 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)

  2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。

 

  关于创建的子进程,UNIX和windows

  1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。

  2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。

6.进程的终止

  • 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
  • 出错退出(自愿,python a.py中a.py不存在)
  • 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
  • 被其他进程杀死(非自愿,如kill -9)

 7.进程的层次结构

 无论UNIX还是windows,进程只有一个父进程,不同的是:

  1. 在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的所有成员。

  2. 在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。

8.进程状态

 tail -f access.log |grep '404'

  执行程序tail,开启一个子进程,执行程序grep,开启另外一个子进程,两个进程之间基于管道'|'通讯,将tail的结果作为grep的输入。

  进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都无法运行

  其实在两种情况下会导致一个进程在逻辑上不能运行,

  1. 进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作

  2. 与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

  因而一个进程由三种状态

Python基础:之进程
一、进程
11 进程间通信(IPC)
12 进程间通信方式三:共享数据
13 进程池 
 

9.进程并发实现

 进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(process table),每个进程占用一个进程表项(这些表项也称为进程控制块)

Python基础:之进程
一、进程
11 进程间通信(IPC)
12 进程间通信方式三:共享数据
13 进程池 
 

  该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配状况、所有打开文件的状态、帐号和调度信息,以及其他在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过一样。

10.python多进程模块

10.1 multiprocessing模块介绍

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

10.2 Process创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍
1.group参数未使用,值始终为None()
2.target表示调用对象,即子进程要执行的任务
3.args表示调用对象的位置参数元组,args=(1,2,'egon',)
4.kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5.name为子进程的名称

方法介绍

1.p.start():启动进程,并调用该子进程中的p.run() 

2.p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  

3.p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

4.p.is_alive():如果p仍然运行,返回True

5.p.join([timeout]):主进程等待p终止(强调:是主进程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍

1.p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

2.p.name:进程的名称

3.p.pid:进程的pid

4.p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

5.p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

10.3 Python开启进程的两种方式

注意:在windows中Process()必须放到# if __name__ == '__main__':下

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.

由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

方式一

def piao(name):
print("%s is piaoing"%name)
time.sleep(2)
print("%s is piao end"%name)
if __name__ == '__main__':
p1 = Process(target=piao,args=('egon',),name='p1')
p1.start()#启动主进程调用子进程中的run方法
print('p1 name is %s'%p1.name)#此处代码和start同时运行的并行
print('父类')##此处代码和start同时运行的并行

方式二

继承Process,重写run方法,继承Process类一定要重写run方法,init初始化时super父类init方法在定义自己得

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print("%s is piaoing" % self.name)
        # time.sleep(1)
        print("%s is piao end"%self.name)
if __name__ == '__main__':
    p1=Piao('han')
    p1.start()#自动化调用run方法
    print('父类1')
    p2=Piao('cai')
    p2.start()
    print("父类2")

 练习socket并发通讯

方式一

SERVER
from multiprocessing import Process from socket import * server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,addr): while True: #通讯循环 try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': while True: #链接循环 conn,addr=server.accept() p=Process(target=talk,args=(conn,addr)) p.start()

方式二 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
from socket import *
class socket_server(Process):
    def __init__(self):
        super(socket_server, self).__init__()

    def run(self):
        server = socket(AF_INET, SOCK_STREAM)
        server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server.bind(('127.0.0.1', 8081))
        server.listen(5)
        conn, addr = server.accept()
        while True: #通讯循环
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
if __name__ == '__main__':
    p1 = socket_server()
    p1.start()

客户端  

#!/usr/bin/python
# -*- coding:utf-8 -*-
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))
while True:
    msg=input('>>').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

10.4进程方法属性

#p.join(),是父进程在等p的结束,是父进程阻塞在原地,而p仍然在后台运行
def piao(name): print('%s is piaoing' % name) time.sleep(random.randint(1,3)) print('%s is piao end' % name) if __name__ == '__main__': p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeiqi',)) p4=Process(target=piao,args=('yuanhao',)) p_l=[p1,p2,p3,p4] for p in p_l: p.start() for p in p_l: p.join()#父进程等待p运行结束后才执行住进程 print('主进程')
#有的同学会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
#当然不是了,必须明确:p.join()是让谁等?
#很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,

#详细解析如下:
#进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
#而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
# 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
#进程对象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) #结果为True

print('开始')
print(p1.is_alive()) #结果为False

  

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing  import Process
import time
import random
import os
class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

if __name__ == '__main__':

    p=Piao('egon')
    p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程死,p跟着一起死
    p.start()
    p.join(0.0001) #等待p停止,等0.0001秒就不再等了
    print('开始')
    print(1)
    print(p.is_alive())

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        #为我们开启的进程设置名字的做法
        super().__init__()

        self.name = name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)
if __name__ == '__main__':

    p=Piao('egon')
    p.start()
    print('开始')
    print(p.pid) #查看pid

 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
def piao(name):
    print('%s is piaoing' % name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' % name)
if __name__ == '__main__':
    p1=Process(target=piao,args=('egon',))
    p1.name = 'hanjialong'#修改进程名称
    # p1.daemon=True
    p1.start()

    p1.terminate()
    print(p1.is_alive())
    time.sleep(1)
    print(p1.is_alive())

    print('主进程')

    print(p1.name)
    print(p1.pid)

循环开启进程

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import random
def foo(name):
    print(name)

if __name__ == '__main__':
    for i in range(1,100):
        p = Process(target=foo, args=('进程%s' % i,))
        p.start()
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
part1:共享同一打印终端,发现会有多行内容打印到一行的现象(多个进程共享并抢占同一个打印终端,乱了)
#多进程共享一个打印终端(用python2测试看两个进程同时往一个终端打印,出现打印到一行的错误)
from multiprocessing import Process
import time
class Logger(Process):
    def __init__(self):
        super(Logger,self).__init__()
    def run(self):
        print(self.name)


for i in range(1000000):
    l=Logger()
    l.start()

 线程共享同一个文件,实验:

既然可以用文件共享数据,那么为什么通讯不用文件作为介质呢?

1.效率问题,使用文件速度太慢

2.需要自己加锁处理

from multiprocessing import Process
import time
import random
#多进程共享一套文件系统

def work(filename,msg):
    with open(filename,'a',encoding='utf-8') as f:
        f.write(msg)

if __name__ == '__main__':

    for i in range(5):
        p=Process(target=work,args=('a.txt','进程%s
' %str(i)))
        p.start()
        p.join()
        work('a.txt',"主进程%s
"%str(i))

11 进程间通信(IPC)

1.队列方式

 进程彼此之间互相隔离,要实现进程间通信,即IPC,multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

创建队列的类(底层就是以管道和锁定的方式实现)

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

参数介绍

maxsize是队列中允许最大项数,省略则无大小限制。 

方法介绍 

1.q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

2.q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

3.q.get_nowait():同q.get(False) q.put_nowait():同q.put(False)

4.q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目

5.q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。

6.q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

了解

1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞

2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用
4.q.cancel_join_thread方法可以禁止这种行为

应用 


multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
#队列,先进先出
q=Queue(3)#设置队列长度为3

q.put({'a':1})
q.put('b')
q.put('c')
print(q.full())#此时队列已满所以返回True
# q.put('d',False) #等同于q.put_nowait('d')
print(q.get())
q.put('d',timeout=1)#当超过1秒还没有空间存储就会报错
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
# print(q.get(timeout=2))#在2秒内还没有可取的就会报错
# print(q.get())
# print(q.get())
# print(q.empty())
print(q.get(block=False))
# print(q.get_nowait())

10.6 生产者消费者模型 

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        print(" 33[45m消费者拿到了:%s 33[0m"%res)
def producer(seq,q):
    for item in seq:
        time.sleep(random.randint(1,3))
        print(" 33[46m生产者生产了:%s 33[0m"%item)
        q.put(item)
if __name__ == '__main__':
    q=Queue()
    seq=('包子%s' %i for i in range(10))
    producer(seq,q)
    print('主线程')
    c =Process(target=consumer,args=(q,))
    c.start()

主线程等待消费者结束(生产者发送结束信号给消费者)

#!/usr/bin/Python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        # time.sleep(random.randint(1,3))
        res=q.get()
        print(" 33[45m消费者拿到了:%s 33[0m"%res)
def producer(seq,q):
    for item in seq:
        # time.sleep(random.randint(1,3))
        print(" 33[46m生产者生产了:%s 33[0m"%item)
        q.put(item)
if __name__ == '__main__':
    q=Queue()
    c =Process(target=consumer,args=(q,))
    c.start()
    seq=('包子%s' %i for i in range(10))
    producer(seq,q)
    c.join()
    print('主线程')

JoinableQueue类

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

    参数介绍:

    maxsize是队列中允许最大项数,省略则无大小限制。    

  方法介绍:

    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
from multiprocessing import Process,JoinableQueue
import time
import random

def consumer(q,name):
    while True:
        # time.sleep(random.randint(1,3))
        res=q.get()
        q.task_done()
        print(' 33[41m消费者%s拿到了%s 33[0m' %(name,res))

def producer(seq,q,name):
    for item in seq:
        # time.sleep(random.randint(1,3))
        q.put(item)
        print(' 33[42m生产者%s生产了%s 33[0m' %(name,item))
    q.join()
    print('============>>')

if __name__ == '__main__':
    q=JoinableQueue()
    c=Process(target=consumer,args=(q,'egon'),)
    c.daemon=True #设置守护进程,主进程结束c就结束
    c.start()

    seq=['包子%s' %i for i in range(10)]
    p=Process(target=producer,args=(seq,q,'厨师1'))
    p.start()

    # master--->producer----->q--->consumer(10次task_done)
    p.join() #主进程等待p结束,p等待c把数据都取完,c一旦取完数据,p.join就是不再阻塞,进
    # 而主进程结束,主进程结束会回收守护进程c,而且c此时也没有存在的必要了
    print('主进程')

 一个生产者多个消费者

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        time.sleep(random.randint(1,2))
        res=q.get()
        print(' 33[45m%s拿到了 %s 33[0m' %(name,res))
        q.task_done()


def producer(seq,q):
    for item in seq:
        time.sleep(random.randrange(1,2))
        q.put(item)
        print(' 33[46m生产者做好了 %s 33[0m' %item)
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=('包子%s' %i for i in range(10))

    p1=Process(target=consumer,args=('消费者1',q,))
    p2=Process(target=consumer,args=('消费者2',q,))
    p3=Process(target=consumer,args=('消费者3',q,))
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()

    producer(seq,q)

    print('主线程')

也可以开启一个新的子进程当生产者,不用主线程当生产者

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(name,q):
    while True:
        # time.sleep(random.randint(1,2))
        res=q.get()
        print(' 33[45m%s拿到了 %s 33[0m' %(name,res))
        q.task_done()


def producer(seq,q):
    for item in seq:
        # time.sleep(random.randrange(1,2))
        q.put(item)
        print(' 33[46m生产者做好了 %s 33[0m' %item)
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=['包子%s' %i for i in range(10)] #在windows下无法传入生成器,我们可以用列表解析测试

    p1=Process(target=consumer,args=('消费者1',q,))
    p2=Process(target=consumer,args=('消费者2',q,))
    p3=Process(target=consumer,args=('消费者3',q,))
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()

    # producer(seq,q) #也可以是下面三行的形式,开启一个新的子进程当生产者,不用主线程当生产者
    p4=Process(target=producer,args=(seq,q))
    p4.start()
    p4.join()
    print('主线程')

2.管道方式

管道也可以说是队列的另外一种形式,下面我们就开始介绍基于管道实现金城之间的消息传递

创建管道的类:

Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

参数介绍:

dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

 方法介绍:

 conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。

 conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
1.conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
2.conn1.fileno():返回连接使用的整数文件描述符
3.conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。 4.conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
5.conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 6.conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的):

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')

注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

管道可以用于双向通信,利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,如下

#注意:send()和recv()方法使用pickle模块对对象进行序列化。
from multiprocessing import Process,Pipe import time,os def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() except EOFError: server.close() break res=x+y server.send(res) print('server done') if __name__ == '__main__': server,client=Pipe() c1=Process(target=adder,args=((server,client),'c1')) c1.start() server.close() client.send((10,20)) print(client.recv()) client.close() c1.join() print('主进程')

12 进程间通信方式三:共享数据

12 .1进程共享数据

from multiprocessing import Manager,Process
import os
def work(d,l):
    l.append(os.getpid())
    d[os.getpid()]=os.getpid()

if __name__ == '__main__':
    m=Manager()
    l=m.list(['init',])
    d=m.dict({'name':'egon'})


    # p1=Process(target=work,args=(d,l))
    # p2=Process(target=work,args=(d,l))
    # p3=Process(target=work,args=(d,l))
    # p4=Process(target=work,args=(d,l))
    # p5=Process(target=work,args=(d,l))
    #
    # p_l=[p1,p2,p3,p4,p5]
    # for p in p_l:
    #     p.start()
    #
    # for p in p_l:
    #     p.join()

    p_l=[]
    for i in range(5):
        p=Process(target=work,args=(d,l))
        p_l.append(p)
        p.start()

    for p in p_l:
        p.join()
    print(d)
    print(l)

12.2  进程同步锁 

加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全。

在4.4小节我们学习了队列,队列是管道+锁实现的,因而我们无需考虑复杂的锁问题。

但是在4.3小节中我们介绍到,进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理

所以,就让我们帮文件当做数据库,模拟抢票(Lock互斥锁)

#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #买票
    # lock.acquire()
    with lock:
        with open(filename,encoding='utf-8') as f:
            dic=json.loads(f.read())
            # print('剩余票数: %s' % dic['count'])
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模拟网络延迟
            with open(filename,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('%s 购票成功' %os.getpid())
        else:
            print('%s 购票失败' %os.getpid())
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=('db',lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print('主线程')

13 进程池 

13.1 Pool类

在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。 
Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
下面介绍一下multiprocessing 模块下的Pool类下的几个方法

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Lock
import json
import time
import random
def work(dbfile,name,lock):
    # lock.acquire()
    with lock:
        with open(dbfile,encoding='utf-8') as f:
            dic=json.loads(f.read())

        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模拟网络延迟
            with open(dbfile,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print(' 33[43m%s 抢票成功 33[0m' %name)
        else:
            print(' 33[45m%s 抢票失败 33[0m' %name)
    # lock.release()


if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=('a.txt','用户%s' %i,lock))
        p_l.append(p)
        p.start()


    for p in p_l:
        p.join()
    print('主进程')

使用进程池进程socket通讯 

客户端

#客户端
#!/usr/bin/Python # -*- coding:utf-8 -*- from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

服务端

#!/usr/bin/python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool
from socket import *
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8081))
server.listen(5)

def talk(conn,addr):
print(os.getpid())
while True: #通讯循环
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
pool=Pool()
res_l=[]
while True: #链接循环
conn,addr=server.accept()
# print(addr)
# pool.apply(talk,args=(conn,addr))
res=pool.apply_async(talk,args=(conn,addr))
res_l.append(res)
# print(res_l)

13.2 回调函数