Python之路【第八篇】(二)python基础 之进程、线程、协程 Python(八)进程、线程、协程篇

Python之路【第八篇】(二)python基础 之进程、线程、协程
Python(八)进程、线程、协程篇

Python之路【第八篇】(二)python基础 之进程、线程、协程
Python(八)进程、线程、协程篇

进程和线程的关系                                                                                                       

进程定义
    进程就是一个程序在一个数据集上的一次动态执行过程。
    进程一般由程序、数据集、进程控制块三部分组成。
    我们编写的程序用来描述进程要完成哪些功能以及如何完成;
    数据集则是程序在执行过程中所需要使用的资源;
    进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,使到进程内并发成为可能。
  假设,一个文本程序,需要接受键盘输入,将内容显示在屏幕上,还需要保存信息到硬盘中。若只有
  一个进程,势必造成同一时间只能干一样事的尴尬(当保存时,就不能通过键盘输入内容)。若有多
  个进程,每个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的
  任务,进程C负责保存内容到硬盘中的任务。这里进程A,B,C间的协作涉及到了进程通信问题,而且
  有共同都需要拥有的东西-------文本内容,不停的切换造成性能上的损失。若有一种机制,可以使
  任务A,B,C共享资源,这样上下文切换所需要保存和恢复的内容就少了,同时又可以减少通信所带
  来的性能损耗,那就好了。是的,这种机制就是线程。

  线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序
  计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发
  性能。线程没有自己的系统资源。

两者之间的区别
  • Threads share the address space of the process that created it; processes have their own address space.
  • 线程的地址空间共享,每个进程有自己的地址空间。
  • Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  • 一个进程中的线程直接接入他的进程的数据段,但是每个进程都有他们自己的从父进程拷贝过来的数据段
  • Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  • 一个进程内部的线程之间能够直接通信,进程之间必须使用进程间通信实现通信
  • New threads are easily created; new processes require duplication of the parent process.
  • 新的线程很容易被创建,新的进程需要从父进程复制
  • Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  • 一个进程中的线程间能够有相当大的控制力度,进程仅仅只能控制他的子进程
  • Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
  • 改变主线程(删除,优先级改变等)可能影响这个进程中的其他线程;修改父进程不会影响子进程
并发和并行、同步和异步 简单定义                               
 并发和并行

  并发:系统具有处理多个任务(动作)的能力

  并行:系统具有 (同时) 处理多个任务(动作)的能力

  关系:并行是并发的一个子集

 同步和异步
同步:当一个进程执行到一个io(需要接受外部的数据的时候)----等:同步
异步:当一个进程执行到一个io(需要接受外部的数据的时候)----不等:异步 (效率高)
例:
打电话:同步
发短信:异步

Python的 GIL                                                                                           

无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

线程                                                                                                               

Threading用于提供线程相关的操作。线程是应用程序中工作的最小单元,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

threading 模块建立在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

一、线程条用的两种方式:

1、直接导入

import threading  #导入线程模块
import time
 
def worker(num):
    time.sleep(1)
    print(num)
    return
 
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))#创建线程     target 后面接收的是函数  args和面接收的是函数里面的参数,如果函数没有参数,可以不用写

    t.start()  # 自动调用内置函数的run方法

2、继承方法

import threading
import time
  
  
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
  
    def run(self):    #定义每个线程要运行的函数
  
        print("running on number:%s" %self.num)
  
        time.sleep(2)
  
if __name__ == '__main__':
  
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()  # 直接调用run函数
    t2.start()

thread方法:   t.start() : 激活线程   t.getName() : 获取线程的名称   t.setName() : 设置线程的名称   t.name : 获取或设置线程的名称   t.is_alive() : 判断线程是否为激活状态   t.isAlive() :判断线程是否为激活状态   t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,
  必须在执行start()方法之前才可以使用。
  如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;
  如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止   t.isDaemon() : 判断是否为守护线程   t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None   t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义   t.run() :线程被cpu调度后自动执行线程对象的run方法

t.jion

import threading
from time import ctime,sleep
import time

def ListenMusic(name):

        print ("Begin listening to %s. %s" %(name,ctime()))
        sleep(3)
        print("end listening %s"%ctime())

def RecordBlog(title):

        print ("Begin recording the %s! %s" %(title,ctime()))
        sleep(5)
        print('end recording %s'%ctime())


threads = []


t1 = threading.Thread(target=ListenMusic,args=('水手',))
t2 = threading.Thread(target=RecordBlog,args=('python线程',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    for t in threads:
        #t.setDaemon(True) #注意:一定在start之前设置
        t.start()
        # t.join()
    # t1.join()
    t1.setDaemon(True)

    #t2.join()########考虑这三种join位置下的结果?
    print ("all over %s" %ctime())

  

join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

setDaemon(True):

         将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。

         当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成

         想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程

         完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦

线程锁

threading.RLock & threading.Lock

我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,

可能会出现不可预料的结果,为了保证数据的准确性,引入了锁的概念。

 1 import threading,time
 2 
 3 def sub():
 4     global num  # z在每个线程中都获取这个全局变量
 5 
 6     temp = num
 7     print("-- get num", num)
 8     time.sleep(0.001)
 9     num = temp - 1  # 对此公共的变量进行-1操作
10 
11 num = 100  # 设置一个共享变量
12 
13 l = []
14 for i in range(100):
15     t = threading.Thread(target=sub)
16     t.start()
17     l.append(t)
18 for t in l:  # 等所有线程执行完毕
19     t.join()
20 
21 print("finall num ",num)
不设置锁的情况
  1 -- get num 100
  2 -- get num 100
  3 -- get num 100
  4 -- get num 100
  5 -- get num 100
  6 -- get num 100
  7 -- get num 100
  8 -- get num 100
  9 -- get num 100
 10 -- get num 100
 11 -- get num 99
 12 -- get num 99
 13 -- get num 99
 14 -- get num 99
 15 -- get num 99
 16 -- get num 99
 17 -- get num 99
 18 -- get num 99
 19 -- get num 99
 20 -- get num 98
 21 -- get num 98
 22 -- get num 98
 23 -- get num 98
 24 -- get num 98
 25 -- get num 98
 26 -- get num 98
 27 -- get num 98
 28 -- get num 98
 29 -- get num 98
 30 -- get num 98
 31 -- get num 98
 32 -- get num 97
 33 -- get num 97
 34 -- get num 97
 35 -- get num 97
 36 -- get num 97
 37 -- get num 97
 38 -- get num 97
 39 -- get num 97
 40 -- get num 97
 41 -- get num 96
 42 -- get num 96
 43 -- get num 96
 44 -- get num 96
 45 -- get num 96
 46 -- get num 96
 47 -- get num 96
 48 -- get num 96
 49 -- get num 95
 50 -- get num 95
 51 -- get num 95
 52 -- get num 95
 53 -- get num 95
 54 -- get num 95
 55 -- get num 94
 56 -- get num 94
 57 -- get num 94
 58 -- get num 94
 59 -- get num 94
 60 -- get num 94
 61 -- get num 94
 62 -- get num 94
 63 -- get num 94
 64 -- get num 94
 65 -- get num 93
 66 -- get num 93
 67 -- get num 93
 68 -- get num 93
 69 -- get num 93
 70 -- get num 93
 71 -- get num 93
 72 -- get num 93
 73 -- get num 93
 74 -- get num 93
 75 -- get num 93
 76 -- get num 92
 77 -- get num 92
 78 -- get num 92
 79 -- get num 92
 80 -- get num 92
 81 -- get num 92
 82 -- get num 92
 83 -- get num 92
 84 -- get num 92
 85 -- get num 92
 86 -- get num 92
 87 -- get num 92
 88 -- get num 91
 89 -- get num 91
 90 -- get num 91
 91 -- get num 91
 92 -- get num 91
 93 -- get num 91
 94 -- get num 91
 95 -- get num 91
 96 -- get num 91
 97 -- get num 91
 98 -- get num 91
 99 -- get num 91
100 -- get num 91
101 finall num  90
102 
103 Process finished with exit code 0
运行结果

 Python之路【第八篇】(二)python基础 之进程、线程、协程
Python(八)进程、线程、协程篇

同步锁(threading.Lock)

# 加锁
import threading,time
def sub():
    global num  # z在每个线程中都获取这个全局变量

    lock.acquire() # 加锁
    temp = num
    print("-- get num", num)
    time.sleep(0.1)
    num = temp - 1  # 对此公共的变量进行-1操作
    lock.release() #接锁


num = 100  # 设置一个共享变量

l = []
lock = threading.Lock()  # 生成全局锁
for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)
for t in l:  # 等所有线程执行完毕
    t.join()

print("finall num ", num)

死锁

 1 import threading,time
 2 
 3 class MyThread(threading.Thread):
 4 
 5 
 6     def actionA(self):
 7         A.acquire()  # 加锁
 8         print(self.name,"actionA gotA",time.ctime())
 9         time.sleep(2)
10 
11         B.acquire()  # 加锁
12         print(self.name,"actionA gotB",time.ctime())
13         time.sleep(1)
14 
15         B.release() # 解锁
16         A.release()# 解锁
17 
18     def actionB(self):
19 
20         B.acquire() # 加锁
21         print(self.name, "actionB gotA", time.ctime())
22         time.sleep(2)
23         A.acquire() # 加锁
24 
25         print(self.name, "actionB gotB", time.ctime())
26         time.sleep(1)
27 
28         A.release()# 解锁
29         B.release()# 解锁
30 
31     def run(self):
32         self.actionA()
33         self.actionB()
34 
35 if __name__ == '__main__':
36 
37     A = threading.Lock()
38     B = threading.Lock()
39 
40     L = []
41 
42     for i in range(5):
43         t = MyThread()
44         t.start()
45         L.append(t)
46     for i in L:
47         i.join()
48     print("ending...")
死锁 

递归锁(threading.RLock)

 1 import threading,time
 2 
 3 class MyThread(threading.Thread):
 4 
 5 
 6     def actionA(self):
 7         r_lock.acquire()
 8         print(self.name,"actionA gotA",time.ctime())
 9         time.sleep(2)
10 
11         r_lock.acquire()
12         print(self.name,"actionA gotB",time.ctime())
13         time.sleep(1)
14 
15         r_lock.release()
16         r_lock.release()
17 
18     def actionB(self):
19         r_lock.acquire()
20         print(self.name, "actionB gotA", time.ctime())
21         time.sleep(2)
22         r_lock.acquire()
23 
24         print(self.name, "actionB gotB", time.ctime())
25         time.sleep(1)
26 
27         r_lock.release()
28         r_lock.release()
29 
30     def run(self):
31         self.actionA()
32         self.actionB()
33 
34 if __name__ == '__main__':
35 
36     # A = threading.Lock()
37     # B = threading.Lock()
38     r_lock = threading.RLock() # 设置递归锁
39     L = []
40 
41     for i in range(5):
42         t = MyThread()
43         t.start()
44         L.append(t)
45     for i in L:
46         i.join()
47     print("ending...")
递归锁

threading.RLock和threading.Lock 的区别

RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁

同步条件threading.Event

Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。

  • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)
  • Event.set() :将标识位设为Ture
  • Event.clear() : 将标识位设为False
  • Event.isSet() :判断标识位是否为Ture
 1 # event 同步条件 简单的同步对象
 2 
 3 # import threading
 4 # event = threading.Event()  # 创建一个同步对象
 5 # event.wait() # 等 阻塞线程 等待标志位flag 被设定 如果被设定就开始执行
 6 # event.set() # 设定
 7 # event.clear() # 清除 被清除就会阻塞
 8 #
 9 # # 一个event对象可以用到多个线程里面
10 
11 import threading,time
12 
13 class Boss(threading.Thread):
14     def run(self):
15         print("Boss:今晚大家加班到22:00")
16         print(event.isSet) # 判断标志位是否是被设定
17         event.set() # 设定标志位
18         time.sleep(5)
19         print("Boss:<今天可以提早下班了>")
20         print(event.isSet)
21         event.set()
22 
23 class Worker(threading.Thread):
24     def run(self):
25         event.wait()
26         print("Worker:哎,又加班,命苦啊。。。")
27         time.sleep(1)
28         event.clear()
29         event.wait()
30         print("Workwe: 好....")
31 
32 if __name__ == '__main__':
33     event = threading.Event()
34 
35     threads = []
36     for i in range(5):
37         threads.append(Worker())
38     threads.append(Boss())
39     for t in threads:
40         t.start()
41     for r in threads:
42         r.join()
event 同步条件 简单的同步对象

当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

信号量(Semaphore)

   信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

      计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

      BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

 1 import threading,time
 2 class myThread(threading.Thread):
 3     def run(self):
 4         if semaphore.acquire():
 5             print(self.name)
 6             time.sleep(5)
 7             semaphore.release()
 8 if __name__=="__main__":
 9     semaphore=threading.Semaphore(50)
10     thrs=[]
11     for i in range(100):
12         thrs.append(myThread())
13     for t in thrs:
14         t.start()
信号量

queue 队列

适用于多线程编程的先进先出数据结构,可以用来安全的传递多线程信息。

queue 方法:

  • q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
  • q.join()   # 等到队列为kong的时候,在执行别的操作
  • q.qsize()   # 返回队列的大小 (不可靠)
  • q.empty()    # 当队列为空的时候,返回True 否则返回False (不可靠)
  • q.full()     # 当队列满的时候,返回True,否则返回False (不可靠)
  • q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,如果队列无法给出放入item的位置,则引发 queue.Full 异常
  • q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
  • q.put_nowait(item) # 等效于 put(item,block=False)
  • q.get_nowait()     # 等效于 get(item,block=False)
 1 # import queue  # 线程 队列 数据结构
 2 #
 3 # q = queue.Queue(3) # FIFO先进先出的模式
 4 #
 5 # q.put("yangjian")
 6 # q.put(20)
 7 # q.put({"name":"yuan"})
 8 # # q.put_nowait(56) # q.put(block=Flase)
 9 #
10 # print(q.qsize()) #返回队列的大小
11 # print(q.empty()) #如果队列为空,返回True,反之False
12 # print(q.full()) #如果队列满了,返回True,反之False
13 # # q.put(34,False)
14 #
15 # while True:
16 #     data = q.get()
17 #     print(data)
18 #     print("-------------")
19 
20 # 先进后出
21 # import queue
22 # q = queue.LifoQueue()
23 #
24 # q.put(12)
25 # q.put("hello")
26 # q.put({"name":"yangjian"})
27 #
28 # while True:
29 #     data = q.get()
30 #     print(data)
31 #     print("-------")
32 
33 # 优先集
34 
35 # import queue
36 # q = queue.PriorityQueue()
37 # q.put([3,12])
38 # q.put([2,"hello"])
39 # q.put([4,{"name":"yangjian"}])
40 #
41 # while True:
42 #     data = q.get()
43 #     print(data)
44 #     print("-------")
队列三种模式
import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 1
  while count <11:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)  # 在队列中放数据
    print('生产者 %s 生产 %s 包子..' %(name, count)) # 生产者生产包子
    count +=1
    q.task_done()  # 发送数据
    #q.join()
    # print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():  # 判断队列中是否为空
        data = q.get()  #在队列中取数据
        #q.task_done()
        q.join()  # 做了包子才鞥吃
        # print(data)
        print(' 33[32;1m消费者 %s 已经吃了 %s 包子... 33[0m' %(name, data))  # 吃包子
    else:
        print("-----没有包子了----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A厨师',)) # 创建线程对象
c1 = threading.Thread(target=Consumer, args=('小明',))
c2 = threading.Thread(target=Consumer, args=('小红',))
c3 = threading.Thread(target=Consumer, args=('小刚',))
p1.start() # 启用
c1.start()
c2.start()
c3.start()  

进程                                                                            

 1 # 进程 并行
 2 # 进程调用第一种方式
 3 # from multiprocessing import Process
 4 #
 5 # import time
 6 #
 7 # def f(name):
 8 #     time.sleep(1)
 9 #     print("hello",name,time.ctime())
10 #
11 # if __name__ == '__main__':
12 #     p_list = []
13 #     for i in range(3):
14 #         p = Process(target=f,args=("alvin",))
15 #         p_list.append(p)
16 #         p.start()
17 #     for i in p_list:
18 #         i.join()
19 #     print("end")
20 
21 # 进程调用第二种方式
22 from multiprocessing import Process
23 import time
24 class MyProcess(Process):
25     def __init__(self,name):
26         super(MyProcess,self).__init__()
27         self.name = name
28 
29     def run(self):
30         time.sleep(1)
31         print("hello",self.name,time.ctime())
32 
33 if __name__ == '__main__':
34     p_list = []
35     for i in range(3):
36         p = MyProcess("xiaoming")
37         p.start()
38         p_list.append(p)
39 
40     for p in p_list:
41         p.join()
42     print("end")
进程的两种调用方式

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

 进程间的通信

1、进程队列Queue

from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

2、管道pipe

from multiprocessing import Process,Pipe

def f(conn):
    conn.send("美女约吗?")
    response = conn.recv()
    print("杨建",response)
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn = Pipe() # 双向管道

    p = Process(target=f,args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    parent_conn.send("约")
    parent_conn.close()  

3、数据共享Managers

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

from multiprocessing import Process,Manager

def f(d,l,n):
    d[n] = "1"
    d["name"] = "yangjian"
    l.append(n)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()

        l = manager.list(range(1,6))

        p_list = []

        for i in range(10):
            p = Process(target=f,args=(d,l,i))

            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)

 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

方法:

  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

  • close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  • terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

  • join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程

from multiprocessing import Process,Pool
import time,os

def foo(i):
    time.sleep(1)
    print(i)
    print("son",os.getpid())

    return "HELLO %s" %i  # 返回值给bar的参数

def bar(arg):  # 主进程调用
    print(arg)

if __name__ == '__main__':
    pool = Pool(10) # 进程池数最多允许几个线程 如果不写按默认CPU核数来

    print("main pid",os.getpid())
    for i in range(100):
        #pool.apply(func=foo,args=(i,)) # 同步接口
        #pool.apply_async(func=foo,args=(i,))# 异步接口

        # 回调函数 :callback 就是某个动作或者函数执行成功之后再去执行的函数
        pool.apply_async(func=foo,args=(i,),callback=bar)
    pool.close()
    pool.join()  # join 与 close调用顺序是固定的不可调换

    print("end") 

协程                                                                             

 协程又叫微线程,从技术的角度来说,“协程就是你可以暂停执行的函数”。如果你把它理解成“就像生成器一样”,那么你就想对了。 线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

  协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

  协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程

协程,又称微线程,纤程。英文名Coroutine。

优点1: 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

优点2: 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

1、利用yield自定义协程

import time
import queue

def consumer(name):

    print("---->ready to eat baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" %(name,new_baozi))
        # time.sleep(1)

def prouducer():
    r = con.__next__()
    r = con2.__next__()

    n = 0
    while 1:
        time.sleep(1)
        print(" 33[32;1m[producer] 33[0m is making baozi %s and %s"%(n,n+1))
        con.send(n) # 发送数据赋值给yield
        con2.send(n+1)

if __name__ == '__main__':

    con = consumer("c1")
    con2 = consumer("c2")
    prouducer()

2、利用 greenlet模块实现协程

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)
    gr1.switch()

gr1 = greenlet(test1)
gr2 = greenlet(test2)

gr2.switch()

3、gevent

# gevent
import gevent
 
def foo():
    print("Running in foo")
    gevent.sleep(0)
    print("Explicit context switch to foo angin")
 
def bar():
    print("Explicit context to bar")
    gevent.sleep(0)
    print("Implicit context swich back to bar")
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
 
# 输出结果:
Running in foo
Explicit context to bar
Explicit context switch to foo angin
Implicit context swich back to bar