线程锁、threading.local(flask源码中用的到)、线程池、生产者消费者模型
一、线程锁
线程安全,多线程操作时,内部会让所有线程排队处理。如:list/dict/Queue
线程不安全 + 人(锁) => 排队处理
1、RLock/Lock:一次放一个
a、创建10个线程,在列表中追加自己,如下代码:
import threading v = [] def func(arg): v.append(arg) print(v) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
b、创建10个线程,把自己添加到列表中,再读取列表的最后一个,如下代码:
import threading import time v = [] lock = threading.Lock() def func(arg): lock.acquire() # 加锁 v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) lock.release() # 释放锁 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
注意:RLock和Lock用法一样,只是Lock只能锁一次解一次,RLock支持锁多次解多次,以后用RLock。
2、BoundedSemaphore(n) ,信号量, 一次放n个,如下代码:
import threading import time lock = threading.BoundedSemaphore(3) def func(arg): lock.acquire() # 加锁 time.sleep(1) print(arg) lock.release() # 释放锁 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
3、condition(),一次放x个,x可由用户动态输入,代码如下:
1)方式一:
import time import threading lock = threading.Condition() def func(arg): print('线程进来了') lock.acquire() lock.wait() # 加锁 print(arg) time.sleep(1) lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() while True: inp = int(input('>>>')) lock.acquire() lock.notify(inp) lock.release()
2)方式二:
import time import threading lock = threading.Condition() def f1(): print('来执行函数了') input(">>>") return True def func(arg): print('线程进来了') lock.wait_for(f1) # 等函数f1执行完毕后继续往下走 print(arg) time.sleep(1) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
4、Event,一次放所有,如下示例:
import threading lock = threading.Event() def func(arg): print('线程来了') lock.wait() # 加锁:红灯 print(arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>") lock.set() # 绿灯 lock.clear() # 再次变红灯 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>") lock.set()
总结:
线程安全,列表和字典线程安全;
为什么要加锁? 非线程安全,控制一段代码;
二、threading.local()
作用:内部自动为每个线程维护一个空间(本质是字典),用于当前线程存取属于自己的值,保证线程之间的数据隔离。
{
线程ID : { . . . },
线程ID : { . . . },
线程ID : { . . . },
线程ID : { . . . },
}
""" 以后:Flask框架内部看到源码 上下文管理 """ import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 调用对象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone,arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
import threading import time v = threading.local() def func(arg): v.phone = arg # 内部会为当前线程创建一个空间用于存储:phone = 自己的值 time.sleep(2) print(v.phone,arg) # 去当前线程自己空间取值 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()