疫情环境下的网络学习笔记 python 4.27

疫情环境下的网络学习笔记 python 4.27

上节课回顾

  • 开启线程的两种方式
  • tcp服务端实现并发
  • 线程对象的join方法
  • 同一个进程内多个线程数据是共享的
  • 线程对象属性和方法
  • 守护线程
  • 线程互斥锁
  • gil全局解释器锁
    • Cpython中解释器内存管理不是线程安全的
  • python多线程是否有用
    • IO密集型:多线程更节省资源
    • 计算密集型:多进程效率更高

今日内容

  • 死锁与递归锁
  • 信号量
  • Event事件
  • 线程q
  • 进程池与线程池
  • 协程
  • 协程实现tcp服务端并发效果

死锁与递归锁

死锁

from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()
# A和B产生的是不同的对象,要想生成一样的对象,要写单例模式
class MyThread(Thread):
	def run(self):
		self.func1()
		self.func2()
		
	def func1(self):
		mutexA.acquire()
		print(self.name,'抢到A')
		mutexB.acquire()
		print('self.name,'抢到B'')
		nutexB.release()
		mutexA.release()
		
	def func2(self):
		mutexB.acquire()
		print('self.name,'抢到B'')
		time.sleep(2)
		mutexA.acquire()
		print(self.name,'抢到A')
		nutexA.release()
		mutexB.release()
		
if __name__ == '__main__':
	for i in range(10):
		t = MyThread()
		t.start()
        # 各自拿了对方的锁,关在另外的房间里,谁都出不去

两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

递归锁

可以连续的acquire和release,但是只能被第一个抢到锁执行操作,连续acquire和release

内部有一个计数器,每acquire一次计数+1,每release一次计数-1,只要计数不为0,其他人都不能抢

python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁
mutexA = mutexB = RLock()

信号量

信号量在不同阶段对应的是不同的技术,并发编程中是锁,可以抢的锁的个数

在threading模块中导入 Semaphore ,用这个来生成一个锁

from threading import Thread,Semaphore
import threading
import time
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    # 生成一个锁,最多有5个线程可以获得锁,超出的就等待这5个线程释放了,再一起抢
    for i in range(23):
        t=Thread(target=func)
        t.start()

与进程池不同的地方在:每次抢锁都是不同的线程。进程池则是规定了线程量,来来回回都是这几个线程在工作,不会变

Event事件

一些进程 / 线程需要等待另外一些进程/线程运行完毕后才能运行

from threading import Thread, Event
import time

def light():
    print('红灯')
    time.sleep(3)
    print('绿灯')
    # 告诉等待红灯的人可以走了
    event.set()


def car(name):
    print(f'伞兵{name}号,准备就绪!')
    event.wait()  # 阻塞,等待接收信号
    print(f'伞兵{name}号,起飞')


if __name__ == '__main__':
    event = Event()   # 造了一个红绿灯,红绿灯必须在线程公共的进程下,即能被每个线程找到
    t = Thread(target=light)
    t.start()
    for i in range(10):
        t1 = Thread(target=car, args=(str(i),))
        t1.start()

线程q

# 先进先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''
# 后进先出
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
后进先出
third
second
first
'''
# 优先级q,可以给放入队列的数据设置进出的优先级
import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
# 数字越小优先级越高,优先级高的优先出队
'''
(10, 'b')
(20, 'a')
(30, 'c')
'''

进程池与线程池 重要

回顾tcp服务端实现并发:每来一个人,就开启一个线程 / 进程

先回顾一下tcp并发

import socket
from threading import Thread
def comm(conn):
	while True:
		try:
			data = conn.recv(1024)
			if len(data) == 0:break
			conn.send(data.upper())
		except:
			break
	conn.close()
	
def server():
	server = socket.socket()
	server.bind(('1270.0.0.1',8080))
	server.listen(5)
	while True:
		conn,addr = server.accept()
		t = Thread(target=comm,args=(conn,))
		t.start
		
if __name__ == '__main__':
	s = Thread(target=server)
	s.start
	

开设进程或线程,都需要消耗资源,线程开设的消耗小一点。不能左到无限制开设进程和线程,因为计算机硬件资源跟不上,应该在硬件能够正常工作的情况下最大限度利用它

:用来保证计算机硬件安全的情况下最大利用计算机,降低了程序的运行效率,但是保证了计算机硬件的安全,从而稳定运行程序

基本使用

# 线程池
from concurrent.futures import ThreadPoolExecutor ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
# 括号内可以传数字,不传回默认开设当前cpu个数5倍的线程,传了数字,池子里就固定有几个线程,这些线程不会销毁或重复

# 池子的使用很简单,只需要把任务往池子中submit即可
def task(n):
	print(n)
	time.sleep(1)
	return n**n
	
# 向池子中异步提交任务task
# 由于设置了池5,所以一次只有5个线程
# 结果通过回调得到,执行得到一个Future类,类中有一个result方法,得到结果
for i in range(20):
	res = pool.submit(task,i)
	print(res.result()) # None
	# 使用result,并发变成了串行,拿到了异步的结果
	# 可以定义一个列表,把future保存进列表,在另一个for 循环里一个个拿到res

pool.shutdown()
# 关闭线程池,等待线程池中所有的任务运行完毕
# 进程池
pool = ProcessPoolExecutor()
# 括号内可以传数字,不传默认以当前cpu核数开启进程,其他与线程一样
# 开启进程的代码一定要在main下面写
res = pool.submit(task,i).add_done_callback(回调函数)
# 运行结束之后自动触发回调函数,返回一个Future对象传给回调函数,可以通过result得到返回值

总结

  1. 导入模块 from concurrent.futures import ThreadPoolExecutor ProcessPoolExecutor
  2. 生成一个池子 pool = ThreadPoolExecutor(5)
  3. 得到回调 pool.submit(task,i).add_done_callback(回调函数),自动触发回调函数
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(task, i).add_done_callback(call_back)

协程

  • 进程:资源单位

  • 线程:执行单位

  • 协程:单线程下实现并发

    代码层面上检测所有IO操作,一旦遇到IO,在代码层切换,这样给CPU的感觉是程序一直在运行,没有IO,从而提升程序的运行效率

切换 + 保存状态

遇到io,切:导入模块

from gevent import monkey
monkey.patch_all()
# from gevent import monkey; monkey.patch_all()
from gevent import spawn
# 本身无法监测常见的一些io操作,使用的时候需要额外导入一句话
# 又由于上面两句话在使用gevent模块是肯定要导入的,所以支持简写
from gevent import monkey; monkey.patch_all()

协程实现tcp服务端的并发

单线程实现并发

# 服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()
# 客户端
from threading import Thread, current_thread
import socket


def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 0
    while True:
        msg = '%s say hello %s'%(current_thread().name,n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=x_client)
        t.start()

总结

可以通过:多进程下面开设多线程,多线程下开设协程,从而使执行效率提升