python之路之线程,进程,协程2 一、线程  二、生产者消费者模型及队列 三、进程 四、线程池的实现 五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

  1、创建线程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

  2、主线程是否等待子线程

     t.setDaemon(Ture/False):默认是false,等待子线程完成,ture,表示不等待子线程结束

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

   3、主线程等待,子线程执行

    join(),一直等到子线程结束

    join(3),最多等待3秒,如果子线程需要两秒,则等待2秒。

  4、线程锁

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

    R.rlock()

 1 #!/usr/bin/env python
 2 #coding:utf-8
 3    
 4 import threading
 5 import time
 6    
 7 gl_num = 0
 8    
 9 lock = threading.RLock()
10    
11 def Func():
12     lock.acquire()
13     global gl_num
14     gl_num +=1
15     time.sleep(1)
16     print gl_num
17     lock.release()
18        
19 for i in range(10):
20     t = threading.Thread(target=Func)
21     t.start()
线程锁

  5、线程事件

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3  
 4 import threading
 5  
 6  
 7 def do(event):
 8     print 'start'
 9     event.wait()
10     print 'execute'
11  
12  
13 event_obj = threading.Event()
14 for i in range(10):
15     t = threading.Thread(target=do, args=(event_obj,))
16     t.start()
17  
18 event_obj.clear()
19 inp = raw_input('input:')
20 if inp == 'true':
21     event_obj.set()
线程事件

  6、线程池

    python内部没有提供

    需要自定义

 二、生产者消费者模型及队列

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

三、进程

  1、创建进程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

  2、daemon

      默认false,歘歘

  3、jion()等待

    python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

  4、进程之间数据不能共享

 1 #!/usr/bin/env python
 2 #coding:utf-8
 3  
 4 from multiprocessing import Process
 5 from multiprocessing import Manager
 6  
 7 import time
 8  
 9 li = []
10  
11 def foo(i):
12     li.append(i)
13     print 'say hi',li
14   
15 for i in range(10):
16     p = Process(target=foo,args=(i,))
17     p.start()
18      
19 print 'ending',li
进程之间数据不共享

          ······线程之间数据是共享的·············

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

      ·············进程数据不能共享(默认)············

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

~~~~~~~~~~~~~~~~~~~~~~~~~进程之间数据共享~~~~~~~~~~~~~~~~~~~~

 1 #方法一,Array
 2 from multiprocessing import Process,Array
 3 temp = Array('i', [11,22,33,44])
 4  
 5 def Foo(i):
 6     temp[i] = 100+i
 7     for item in temp:
 8         print i,'----->',item
 9  
10 for i in range(2):
11     p = Process(target=Foo,args=(i,))
12     p.start()
13  
14 #方法二:manage.dict()共享数据
15 from multiprocessing import Process,Manager
16  
17 manage = Manager()
18 dic = manage.dict()
19  
20 def Foo(i):
21     dic[i] = 100+i
22     print dic.values()
23  
24 for i in range(2):
25     p = Process(target=Foo,args=(i,))
26     p.start()
27     p.join()
进程数据共享py2.7(两个方法)

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

  5、进程池

    p  = Pool(5)

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from  multiprocessing import Process,Pool
 4 import time
 5   
 6 def Foo(i):
 7     time.sleep(2)
 8     return i+100
 9   
10 def Bar(arg):
11     print arg
12   
13 pool = Pool(5)
14 #print pool.apply(Foo,(1,))
15 #print pool.apply_async(func =Foo, args=(1,)).get()
16   
17 for i in range(10):
18     pool.apply_async(func=Foo, args=(i,),callback=Bar)
19   
20 print 'end'
21 pool.close()
22 pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
进程池

 python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

~~~~~~~~~~~~进程池基础之apply和apply_async方法区别~~~~~~~~~~~~~~~~

    p.apply()  每一个任务是排队进行,进程.join()

    p.apply_async()  每一个任务并发进行,可以设置回调函数,进程无.join(),daemon=True

四、线程池的实现

  1、低配版线程池

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

 python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

   2、高配版线程池

       (1)、设计思路

  1 复制代码
  2 
  3 #!/usr/bin/env python
  4 # -*- coding:utf-8 -*-
  5 
  6 import queue
  7 import threading
  8 import contextlib
  9 import time
 10 
 11 StopEvent = object()
 12 
 13 
 14 class ThreadPool(object):
 15 
 16     def __init__(self, max_num, max_task_num = None):
 17         if max_task_num:
 18             self.q = queue.Queue(max_task_num)
 19         else:
 20             self.q = queue.Queue()
 21         self.max_num = max_num
 22         self.cancel = False
 23         self.terminal = False
 24         self.generate_list = []
 25         self.free_list = []
 26 
 27     def run(self, func, args, callback=None):
 28         """
 29         线程池执行一个任务
 30         :param func: 任务函数
 31         :param args: 任务函数所需参数
 32         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 33         :return: 如果线程池已经终止,则返回True否则None
 34         """
 35         if self.cancel:
 36             return
 37         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 38             self.generate_thread()
 39         w = (func, args, callback,)
 40         self.q.put(w)
 41 
 42     def generate_thread(self):
 43         """
 44         创建一个线程
 45         """
 46         t = threading.Thread(target=self.call)
 47         t.start()
 48 
 49     def call(self):
 50         """
 51         循环去获取任务函数并执行任务函数
 52         """
 53         current_thread = threading.currentThread()
 54         self.generate_list.append(current_thread)
 55 
 56         event = self.q.get()
 57         while event != StopEvent:
 58 
 59             func, arguments, callback = event
 60             try:
 61                 result = func(*arguments)
 62                 success = True
 63             except Exception as e:
 64                 success = False
 65                 result = None
 66 
 67             if callback is not None:
 68                 try:
 69                     callback(success, result)
 70                 except Exception as e:
 71                     pass
 72 
 73             with self.worker_state(self.free_list, current_thread):
 74                 if self.terminal:
 75                     event = StopEvent
 76                 else:
 77                     event = self.q.get()
 78         else:
 79 
 80             self.generate_list.remove(current_thread)
 81 
 82     def close(self):
 83         """
 84         执行完所有的任务后,所有线程停止
 85         """
 86         self.cancel = True
 87         full_size = len(self.generate_list)
 88         while full_size:
 89             self.q.put(StopEvent)
 90             full_size -= 1
 91 
 92     def terminate(self):
 93         """
 94         无论是否还有任务,终止线程
 95         """
 96         self.terminal = True
 97 
 98         while self.generate_list:
 99             self.q.put(StopEvent)
100 
101         self.q.queue.clear()
102 
103     @contextlib.contextmanager
104     def worker_state(self, state_list, worker_thread):
105         """
106         用于记录线程中正在等待的线程数
107         """
108         state_list.append(worker_thread)
109         try:
110             yield
111         finally:
112             state_list.remove(worker_thread)
113 
114 
115 
116 # How to use
117 
118 
119 pool = ThreadPool(5)
120 
121 def callback(status, result):
122     # status, execute action status
123     # result, execute action return value
124     pass
125 
126 
127 def action(i):
128     print(i)
129 
130 for i in range(30):
131     ret = pool.run(action, (i,), callback)
132 
133 time.sleep(5)
134 print(len(pool.generate_list), len(pool.free_list))
135 print(len(pool.generate_list), len(pool.free_list))
136 # pool.close()
137 # pool.terminate()
View Code

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

    2、上下文管理基础

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

 python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

    3、上下文管理之with自定义open

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 
  4 import queue
  5 import threading
  6 import contextlib
  7 import time
  8 
  9 StopEvent = object()
 10 
 11 
 12 class ThreadPool(object):
 13 
 14     def __init__(self, max_num, max_task_num = None):
 15         if max_task_num:
 16             self.q = queue.Queue(max_task_num)
 17         else:
 18             self.q = queue.Queue()
 19             # 最多创建的线程数(线程池最大容量)
 20         self.max_num = max_num
 21         self.cancel = False
 22         self.terminal = False
 23         # 真实创建的线程列表
 24         self.generate_list = []
 25         # 空闲线程数量
 26         self.free_list = []
 27 
 28     def run(self, func, args, callback=None):
 29         """
 30         线程池执行一个任务
 31         :param func: 任务函数
 32         :param args: 任务函数所需参数
 33         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 34         :return: 如果线程池已经终止,则返回True否则None
 35         """
 36         w = (func, args, callback,)
 37         self.q.put(w)
 38         # 把任务放在一个元组里
 39 
 40         if self.cancel:
 41             return
 42         # 如果没有空闲线程,且创建的线程数目小于线程池最大创建数目
 43         # 则创建线程
 44         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 45             self.generate_thread()
 46 
 47     def generate_thread(self):
 48         """
 49         创建一个线程
 50         """
 51         t = threading.Thread(target=self.call)
 52         t.start()
 53 
 54     def call(self):
 55         """
 56         循环去获取任务函数并执行任务函数
 57         """
 58         # 获取当前线程
 59         current_thread = threading.currentThread()
 60         self.generate_list.append(current_thread)
 61         # 去任务并执行
 62         event = self.q.get()
 63         while event != StopEvent:
 64             # 是任务
 65             # 解开任务包
 66             func, arguments, callback = event
 67             # 执行任务
 68             try:
 69                 result = func(*arguments)
 70                 success = True
 71             except Exception as e:
 72                 success = False
 73                 result = e
 74 
 75             if callback is not None:
 76                 try:
 77                     callback(success, result)
 78                 except Exception as e:
 79                     pass
 80             # 标记 空闲了
 81             with self.worker_state(self.free_list, current_thread):
 82                 if self.terminal:
 83                     event = StopEvent
 84                 else:
 85                     # 取任务
 86                     event = self.q.get()
 87         else:
 88             # 不是元组,不是任务
 89             self.generate_list.remove(current_thread)
 90 # 想要终止
 91     # 1、让正在从队列中取任务的线程挂掉
 92     # 2、主线程,你跳我就跳
 93 
 94     def close(self):
 95         """
 96         执行完所有的任务后,所有线程停止
 97         """
 98         self.cancel = True
 99         full_size = len(self.generate_list)
100         while full_size:
101             self.q.put(StopEvent)  # 给队列加终止符,有几个加几个
102             full_size -= 1
103 
104     def terminate(self):
105         """
106         无论是否还有任务,终止线程
107         """
108         self.terminal = True
109 
110         while self.generate_list:
111             self.q.put(StopEvent)
112 
113         self.q.queue.clear()
114 
115     @contextlib.contextmanager # 装饰器 处理上下文
116     def worker_state(self, state_list, worker_thread):
117         """
118         用于记录线程中正在等待的线程数
119         """
120         state_list.append(worker_thread)
121         try:
122             yield
123         finally:
124             state_list.remove(worker_thread)
125 
126 
127 pool = ThreadPool(5)
128 
129 def callback(status, result):
130     # status, execute action status
131     # result, execute action return value
132     pass
133 
134 
135 def action(i):
136     print(i)
137 
138 
139 for i in range(30):
140     # 将任务放在队列中
141     # 着手开始处理任务
142     #           -创建线程
143                 #   -有空闲线程,则不再创建线程
144                 #     -不能高于线程池的限制
145                 #     -根据任务个数判断
146 
147     #           -线程去队列中取任务
148     ret = pool.run(action, (i,), callback)
149 
150 time.sleep(5)
151 print(len(pool.generate_list), len(pool.free_list))
152 print(len(pool.generate_list), len(pool.free_list))
153 pool.close()
154 pool.terminate()
最终代码

五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

  greenlet

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3  
 4  
 5 from greenlet import greenlet
 6  
 7  
 8 def test1():
 9     print 12
10     gr2.switch()
11     print 34
12     gr2.switch()
13  
14  
15 def test2():
16     print 56
17     gr1.switch()
18     print 78
19  
20 gr1 = greenlet(test1)
21 gr2 = greenlet(test2)
22 gr1.switch()

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程

 1 import gevent
 2  
 3 def foo():
 4     print('Running in foo')
 5     gevent.sleep(0)
 6     print('Explicit context switch to foo again')
 7  
 8 def bar():
 9     print('Explicit context to bar')
10     gevent.sleep(0)
11     print('Implicit context switch back to bar')
12  
13 gevent.joinall([
14     gevent.spawn(foo),
15     gevent.spawn(bar),
16 ])

 遇到io操作自动切换:

 1 from gevent import monkey; monkey.patch_all()
 2 import gevent
 3 import urllib2
 4 
 5 def f(url):
 6     print('GET: %s' % url)
 7     resp = urllib2.urlopen(url)
 8     data = resp.read()
 9     print('%d bytes received from %s.' % (len(data), url))
10 
11 gevent.joinall([
12         gevent.spawn(f, 'https://www.python.org/'),
13         gevent.spawn(f, 'https://www.yahoo.com/'),
14         gevent.spawn(f, 'https://github.com/'),
15 ])
View Code

python之路之线程,进程,协程2
一、线程
 二、生产者消费者模型及队列
三、进程
四、线程池的实现
五、 协程