线程的补充(锁、信号量、事件、条件、定时器、队列)

线程的补充(锁、信号量、事件、条件、定时器、队列)
线程的补充

一、锁
1、数据安全问题
# 线程为什么要有锁
    # 线程之间的数据安全问题 :
        # += -= 赋值操作不安全
        
    # 线程安全的数据类型有:
        # pop append 都是线程安全的
        # 队列也是数据安全的
# logging 例子:数据不安全
线程的补充(锁、信号量、事件、条件、定时器、队列)
from threading import Thread n = 0 def func(): global n for i in range(150000): n -= 1 def func2(): global n for i in range(150000): n += 1 t_lst = [] for i in range(10): t2 = Thread(target=func2) t = Thread(target=func) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('--->',n) # --->-17665 因此在进行 += -=操作的时候应该加上锁: from threading import Thread,Lock n = 0 def func(lock): global n for i in range(150000): lock.acquire() n -= 1 lock.release() def func2(lock): global n for i in range(150000): lock.acquire() n += 1 lock.release() lock = Lock() t_lst = [] for i in range(10): t2 = Thread(target=func2,args=(lock,)) t = Thread(target=func,args=(lock,)) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('--->',n) # --->0 2、死锁问题(互斥锁才会出现死锁Lock,递归锁可以快速解决死锁问题RLock) 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法运行下去。 此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。即:操作的时候,抢到一把锁之后还要再去抢第二把锁, 但是由于一个线程抢到一把锁,另一个线程抢到了另一把锁,所以导致死锁。 例如:(互斥锁才会出现死锁) import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃面' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了'%name) print('%s吃面'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面'%name) fork_lock.release() print('%s放下叉子' % name) name_list = ['小明','小红'] name_list2 = ['小白','小黑'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() 结果: 线程的补充(锁、信号量、事件、条件、定时器、队列) 解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。 上面的例子如果使用RLock代替Lock,则不会发生死锁: import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃面' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了'%name) print('%s吃面'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面'%name) fork_lock.release() print('%s放下叉子' % name) name_list = ['小明','小红'] name_list2 = ['小白','小黑'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() 3、死锁总结 互斥锁:每释放一把钥匙,别人立刻就可以抢 递归锁:要等全部钥匙释放完了,别人才能去抢,实际上是一串钥匙,只有一个人能得到(图递归锁) 递归锁可以解决互斥锁的死锁问题 互斥锁 两把锁(一个门需要两把钥匙才能开,但可能出现一个人抢了一把钥匙,另一个人抢了另一把钥匙,导致大家都进不去) 多个线程抢 递归锁 一把锁(抢一串钥匙,第一把进了一个门,另一把钥匙再进第二个门...然后出第二个门,再出第一个门,完全出来后其他人才能抢这个钥匙串) 多个线程抢 递归锁好不好? 递归锁并不是一个好的解决方案 死锁现象的发生不是互斥锁的问题 而是程序员的逻辑有问题导致的 递归锁能够快速的解决死锁问题 递归锁 迅速恢复服务 递归锁替换互斥锁 在接下来的时间中慢慢把递归锁替换成互斥锁 能够完善代码的逻辑 提高代码的效率 多个线程之间,用完一个资源再用另外一个资源 先释放一个资源,再去获取一个资源的锁 二、信号量 跟进程的一样 Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器-1; 调用release() 时内置计数器+1; 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。 import time from threading import Semaphore,Thread def func(index,sem): sem.acquire() print(index) time.sleep(1) sem.release() sem = Semaphore(5) for i in range(10): Thread(target=func,args=(i,sem)).start() 三、事件 跟进程的一样 线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。 为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为False。 如果有线程等待一个Event对象, 而这个Event对象的标志为False,那么这个线程将会被一直阻塞直至该标志为True。一个线程如果将一个Event对象的信号标志设置为True,它将唤醒所有等待这个Event对象的线程。 如果一个线程等待一个已经被设置为True的Event对象,那么它将忽略这个事件, 继续执行。 event.wait() 信号为False的时候就阻塞,直到事件内的信号变成True才不阻塞 event.set() 把信号变成True event.clear() 把信号变成False event.is_set() 查看信号状态是否为True 例如:检测数据库连接 import time import random from threading import Thread,Event def check(e): print('开始检测数据库连接') time.sleep(random.randint(1,3)) # 检测数据库连接(这里只是模拟,应该是真正检测的) e.set() # 成功了 def connect(e): for i in range(3): # 尝试三次去连接数据库 e.wait(0.5) # 超时就向下继续执行 if e.is_set(): # 如果True才显示连接成功 print('数据库连接成功') break else: print('尝试连接数据库%s次失败' % (i + 1)) else: raise TimeoutError e = Event() Thread(target=connect,args=(e,)).start() Thread(target=check,args=(e,)).start() 四、条件 Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。 如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。 就像排队进火车站一样 notify 控制流量 通知有多少人可以通过了 wait 在门口等待的所有人 acquire wait 使用前后都需要加锁 做的事情 release acquire notify 使用前后都需要加锁 release 例子: from threading import Thread,Condition def func(con,index): print('%s在等待'%index) con.acquire() con.wait() print('%s do something'%index) con.release() con = Condition() for i in range(10): t = Thread(target=func,args=(con,i)) t.start() # con.acquire() # con.notify_all() # 让所有的线程通过 # con.release() count = 10 while count > 0: num = int(input('>>>:')) con.acquire() con.notify(num) # 选择让多少个线程通过 count -= num con.release() 五、定时器 定时器,指定n秒后执行某个操作 from threading import Timer def func(): print('执行我啦') t = Timer(5,func) # 起一个定时器任务,设置多少秒后执行 t.start() print('主线程') 六、队列 queue队列 :使用import queue,用法与进程Queue一样
注意:
在进程模块multiprocessing中导入的队列Queue是IPC进程之间通信的,基于socket的,是在文件级别的
而queue模块的Queue队列是在内存级别的,不能用于IPC通信
1、先进先出(普通的队列)
from queue import Queue

q = Queue()
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

结果:
1
2
3


2、后进先出(栈)
from queue import LifoQueue
lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put(3)

print(lq.get())
print(lq.get())
print(lq.get())

结果:
3
2
1


3、优先级队列
数字越小优先级越高
字母ASCII越小优先级越高
注意:不能数字和字母分别put

from queue import PriorityQueue
pq = PriorityQueue()
pq.put(10)
pq.put(5)
pq.put(6)

print(pq.get())
print(pq.get())
print(pq.get())

结果:
5
6
10



from queue import PriorityQueue
pq = PriorityQueue()
pq.put('a')
pq.put('A')
pq.put('b')

print(pq.get())
print(pq.get())
print(pq.get())

结果:
A
a
b

线程的补充(锁、信号量、事件、条件、定时器、队列)
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((15,'abc'))  # 先按元组第一个元素比较,若相同则按第二个元素比较
pq.put((5,'ghi'))
pq.put((12,'def'))
pq.put((12,'aaa'))

print(pq.get())
print(pq.get())
print(pq.get())

结果:
(5, 'ghi')
(12, 'aaa')
(12, 'def')
线程的补充(锁、信号量、事件、条件、定时器、队列)


 
线程的补充(锁、信号量、事件、条件、定时器、队列)
线程的补充(锁、信号量、事件、条件、定时器、队列)
线程的补充

一、锁
1、数据安全问题
# 线程为什么要有锁
    # 线程之间的数据安全问题 :
        # += -= 赋值操作不安全
        
    # 线程安全的数据类型有:
        # pop append 都是线程安全的
        # 队列也是数据安全的
# logging 例子:数据不安全
线程的补充(锁、信号量、事件、条件、定时器、队列)
from threading import Thread n = 0 def func(): global n for i in range(150000): n -= 1 def func2(): global n for i in range(150000): n += 1 t_lst = [] for i in range(10): t2 = Thread(target=func2) t = Thread(target=func) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('--->',n) # --->-17665 因此在进行 += -=操作的时候应该加上锁: from threading import Thread,Lock n = 0 def func(lock): global n for i in range(150000): lock.acquire() n -= 1 lock.release() def func2(lock): global n for i in range(150000): lock.acquire() n += 1 lock.release() lock = Lock() t_lst = [] for i in range(10): t2 = Thread(target=func2,args=(lock,)) t = Thread(target=func,args=(lock,)) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('--->',n) # --->0 2、死锁问题(互斥锁才会出现死锁Lock,递归锁可以快速解决死锁问题RLock) 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法运行下去。 此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。即:操作的时候,抢到一把锁之后还要再去抢第二把锁, 但是由于一个线程抢到一把锁,另一个线程抢到了另一把锁,所以导致死锁。 例如:(互斥锁才会出现死锁) import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃面' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了'%name) print('%s吃面'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面'%name) fork_lock.release() print('%s放下叉子' % name) name_list = ['小明','小红'] name_list2 = ['小白','小黑'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() 结果: 线程的补充(锁、信号量、事件、条件、定时器、队列) 解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。 上面的例子如果使用RLock代替Lock,则不会发生死锁: import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃面' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面'%name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了'%name) print('%s吃面'%name) time.sleep(0.3) noodle_lock.release() print('%s放下面'%name) fork_lock.release() print('%s放下叉子' % name) name_list = ['小明','小红'] name_list2 = ['小白','小黑'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start() 3、死锁总结 互斥锁:每释放一把钥匙,别人立刻就可以抢 递归锁:要等全部钥匙释放完了,别人才能去抢,实际上是一串钥匙,只有一个人能得到(图递归锁) 递归锁可以解决互斥锁的死锁问题 互斥锁 两把锁(一个门需要两把钥匙才能开,但可能出现一个人抢了一把钥匙,另一个人抢了另一把钥匙,导致大家都进不去) 多个线程抢 递归锁 一把锁(抢一串钥匙,第一把进了一个门,另一把钥匙再进第二个门...然后出第二个门,再出第一个门,完全出来后其他人才能抢这个钥匙串) 多个线程抢 递归锁好不好? 递归锁并不是一个好的解决方案 死锁现象的发生不是互斥锁的问题 而是程序员的逻辑有问题导致的 递归锁能够快速的解决死锁问题 递归锁 迅速恢复服务 递归锁替换互斥锁 在接下来的时间中慢慢把递归锁替换成互斥锁 能够完善代码的逻辑 提高代码的效率 多个线程之间,用完一个资源再用另外一个资源 先释放一个资源,再去获取一个资源的锁 二、信号量 跟进程的一样 Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器-1; 调用release() 时内置计数器+1; 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。 import time from threading import Semaphore,Thread def func(index,sem): sem.acquire() print(index) time.sleep(1) sem.release() sem = Semaphore(5) for i in range(10): Thread(target=func,args=(i,sem)).start() 三、事件 跟进程的一样 线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。 为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为False。 如果有线程等待一个Event对象, 而这个Event对象的标志为False,那么这个线程将会被一直阻塞直至该标志为True。一个线程如果将一个Event对象的信号标志设置为True,它将唤醒所有等待这个Event对象的线程。 如果一个线程等待一个已经被设置为True的Event对象,那么它将忽略这个事件, 继续执行。 event.wait() 信号为False的时候就阻塞,直到事件内的信号变成True才不阻塞 event.set() 把信号变成True event.clear() 把信号变成False event.is_set() 查看信号状态是否为True 例如:检测数据库连接 import time import random from threading import Thread,Event def check(e): print('开始检测数据库连接') time.sleep(random.randint(1,3)) # 检测数据库连接(这里只是模拟,应该是真正检测的) e.set() # 成功了 def connect(e): for i in range(3): # 尝试三次去连接数据库 e.wait(0.5) # 超时就向下继续执行 if e.is_set(): # 如果True才显示连接成功 print('数据库连接成功') break else: print('尝试连接数据库%s次失败' % (i + 1)) else: raise TimeoutError e = Event() Thread(target=connect,args=(e,)).start() Thread(target=check,args=(e,)).start() 四、条件 Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。 如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。 就像排队进火车站一样 notify 控制流量 通知有多少人可以通过了 wait 在门口等待的所有人 acquire wait 使用前后都需要加锁 做的事情 release acquire notify 使用前后都需要加锁 release 例子: from threading import Thread,Condition def func(con,index): print('%s在等待'%index) con.acquire() con.wait() print('%s do something'%index) con.release() con = Condition() for i in range(10): t = Thread(target=func,args=(con,i)) t.start() # con.acquire() # con.notify_all() # 让所有的线程通过 # con.release() count = 10 while count > 0: num = int(input('>>>:')) con.acquire() con.notify(num) # 选择让多少个线程通过 count -= num con.release() 五、定时器 定时器,指定n秒后执行某个操作 from threading import Timer def func(): print('执行我啦') t = Timer(5,func) # 起一个定时器任务,设置多少秒后执行 t.start() print('主线程') 六、队列 queue队列 :使用import queue,用法与进程Queue一样
注意:
在进程模块multiprocessing中导入的队列Queue是IPC进程之间通信的,基于socket的,是在文件级别的
而queue模块的Queue队列是在内存级别的,不能用于IPC通信
1、先进先出(普通的队列)
from queue import Queue

q = Queue()
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

结果:
1
2
3


2、后进先出(栈)
from queue import LifoQueue
lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put(3)

print(lq.get())
print(lq.get())
print(lq.get())

结果:
3
2
1


3、优先级队列
数字越小优先级越高
字母ASCII越小优先级越高
注意:不能数字和字母分别put

from queue import PriorityQueue
pq = PriorityQueue()
pq.put(10)
pq.put(5)
pq.put(6)

print(pq.get())
print(pq.get())
print(pq.get())

结果:
5
6
10



from queue import PriorityQueue
pq = PriorityQueue()
pq.put('a')
pq.put('A')
pq.put('b')

print(pq.get())
print(pq.get())
print(pq.get())

结果:
A
a
b

线程的补充(锁、信号量、事件、条件、定时器、队列)
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((15,'abc'))  # 先按元组第一个元素比较,若相同则按第二个元素比较
pq.put((5,'ghi'))
pq.put((12,'def'))
pq.put((12,'aaa'))

print(pq.get())
print(pq.get())
print(pq.get())

结果:
(5, 'ghi')
(12, 'aaa')
(12, 'def')
线程的补充(锁、信号量、事件、条件、定时器、队列)


 
线程的补充(锁、信号量、事件、条件、定时器、队列)