进程/线程/协程
分类:
IT文章
•
2024-03-18 22:09:55
概念:
进程:运行中的程序 有生命周期 关掉程序就销毁了
进程调度:1)先来先服务算法
2)短作业优先调度算法
3)时间片轮转法
4)多级反馈队列
并发:多个程序一起运行,轮流交替使用资源,独木桥 a先走一段 然后让给b走
并行:指两者同时执行,赛跑,两个人同时往前跑
阻塞:input sleep 文件的输入输出 网络请求 recv accept
同步:一个任务的完成需要依赖另一个任务,等到被依赖的任务完成后,依赖的任务才算完成 等待的过程什么都不干
异步:不需要等待被依赖的任务完成,当等待被依赖的任务完成时,立即执行 等待的过程可以干别的事
同步阻塞:一直等待
异步阻塞:可以干别的事
同步非阻塞:来回切换观察
异步非阻塞:可以干别的事。等待别人通知
多进程 :同时开启多个运行程序
进程:
1.创建进程:
from multiprocessing import Process
import os
def func(m,n):
print(m,n)
print('获取子进程的pid:', os.getpid())
print('获取子进程的父进程pid:', os.getppid())
if __name__ == '__main__':
p = Process(target=func,args=('参数1','参数2')) #创建一个对象 传递一个参数是需要后面加,
p.start() #启动一个子进程 后面的print也同时执行 所以是异步的
print('获取此进程的pid:',os.getpid())
print('获取此进程的父进程pid:', os.getppid())
方法一
from multiprocessing import Process
class MyProcess(Process): #创建一个类,继承Process类
def __init__(self,args1,args2):
super().__init__()
self.args1 = args1
self.args2 = args2
def run(self): #必须实现一个run方法,run方法中是在子进程中的代码
print(self.args1)
print(self.args2)
if __name__ == '__main__':
p = MyProcess(1,2)
p.start()
View Code
2.进程的几个方法:
from multiprocessing import Process
import time
def func(n):
print(n)
time.sleep(2)
if __name__ == '__main__':
p = Process(target=func,args=(1,))
p.start()
p.join() #join方法 感知子进程的结束,才执行之后的print
print('hello')
View Code
3.开启多个子进程:
from multiprocessing import Process
import time
def func(s):
print('子进程%s'%s)
time.sleep(2)
if __name__ == '__main__':
for i in range(10):
p = Process(target=func,args=(i,))
p.start()
View Code
>>基于多进程实现scoket实验(可以重复开启多个client)
server端
import socket
from multiprocessing import Process
def serve(conn):
ret = '你好'.encode('utf-8')
conn.send(ret)
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close()
if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn,addr = sk.accept()
p = Process(target=serve,args=(conn,))
p.start()
sk.close()
client端
import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8080))
msg = sk.recv(1024).decode('utf-8')
print(msg)
msg2= input('>>>').encode('utf-8')
sk.send(msg2)
sk.close()
4. 多进程之间数据隔离问题
from multiprocessing import Process
n = 100
def func():
global n
n = n-1
return 111
if __name__ == '__main__':
n_l = []
for i in range(100):
p = Process(target=func)
p.start()
n_l.append(p)
for p in n_l : p.join()
print(n)
#多进程之间的数据是隔离的不共享的
View Code
5.守护进程
#守护进程随着主进程代码执行结束而结束
#守护线程会在主线程结束之后等待子线程结束而结束
from multiprocessing import Process
import time
def func():
while True:
time.sleep(0.2)
print('我还活着')
if __name__ == '__main__':
p = Process(target=func)
p.daemon = True #设置子线程为守护进程
p.start()
time.sleep(2)
print('主进程结束')
#守护进程会随着 主进程的代码执行完毕而结束
#在主进程结束一个子进程 p.terminate() 结束一个进程并不是执行方法后立即生效,需要一个操作系统效应的过程
#p.name获取进程的名字
View Code
6.进程锁
lock = Lock() # 创造了一把锁
lock.acquire() # 获取了这把锁的钥匙
lock.release() # 归还这把锁的钥匙
import json
import time
from multiprocessing import Process
from multiprocessing import Lock,Manager
def show(i):
with open('ticket') as f:
dic=json.load(f)
print("余票 %s"%dic['ticket'])
def buy_ticket(i,lock):
lock.acquire()
with open('ticket') as f:
dic = json.load(f) #文件用load 字符串用loads
time.sleep(0.1)
if dic['ticket']>0:
dic['ticket'] -= 1
print("买到票了%s"%i)
else:
print("票卖完了%s"%i)
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
lock.release()
if __name__ == '__main__':
for i in range(10):
p = Process(target=show,args=(i,))
p.start()
lock = Lock()
for i in range(10):
p = Process(target=buy_ticket,args=(i,lock))
p.start()
---------------------------------------------------
ticket
{"ticket": 1}
View Code
7.信号量
from multiprocessing import Process,Semaphore
import time
def func(i,sem):
sem.acquire()
print('进程%s'%i)
time.sleep(4)
sem.release()
if __name__ == '__main__':
sem = Semaphore(4) #允许开启四个进程每一次
for i in range(10):
p = Process(target=func,args=(i,sem))
p.start()
View Code
8.事件(红绿灯案例)
'''
from multiprocessing import Event
e = Event() #创建一个事件
print(e.is_set()) #查看一个事件
e.set() #更改事件的状态为true
print(e.is_set())
e.wait() #依据e.is_set的值来决定是否阻塞
print(123)
e.clear()#将事件的状态更改为false
'''
from multiprocessing import Event,Process
import time
import random
def cars(e,i):
if not e.is_set():
print('car %i在等待通行'%i)
e.wait() #阻塞 直到一个事件状态改变
else:
print('car %i可通行' % i)
def light(e):
while True:
if e.is_set():
print('绿灯亮了')
time.sleep(2)
e.clear()
else:
print('红灯亮了')
time.sleep(2)
e.set()
if __name__ == '__main__':
e = Event()
p = Process(target=light,args=(e,))
p.start()
for i in range(20):
car = Process(target=cars,args=(e,i))
car.start()
time.sleep(random.random())
View Code
9.队列
from multiprocessing import Queue,Process
def produce(q):
q.put('hello')
def consume(q):
print(q.get())
if __name__ == '__main__':
q = Queue()
p = Process(target=produce,args=(q,))
p.start()
c = Process(target=consume, args=(q,))
c.start()
View Code
10.管道
from multiprocessing import Pipe,Process
def func(conn1,conn2):
conn2.close()
while True:
try:
msg = conn1.recv()
print(msg)
except EOFError:
conn1.close()
break
if __name__ == '__main__':
conn1, conn2 = Pipe()
Process(target=func,args=(conn1, conn2)).start()
conn1.close()
for i in range(20):
conn2.send('吃了么')
conn2.close()
View Code
11.进程池
import os
from multiprocessing import Pool
import time
def func(n):
print('start func%s'%n,os.getpid())
time.sleep(1)
print('end func%s'%n,os.getpid())
if __name__ == '__main__':
p = Pool(5)
for i in range(10):
p.apply(func,args=(i,)) #同步
-------------------------------------------------------------
import os
from multiprocessing import Pool
import time
def func(n):
print('start func%s'%n,os.getpid())
time.sleep(1)
print('end func%s'%n,os.getpid())
if __name__ == '__main__':
p = Pool(5)
for i in range(10):
p.apply(func,args=(i,)) #异步
p.close() #结束进程池接受任务
p.join() #感知进程池中的任务执行结束
View Code
>>基于进程池实现scoket实验(可以重复开启多个client)
server端
import socket
from multiprocessing import Pool
def func(conn):
conn.send(b'hello')
print(conn.recv(1024).decode('utf-8'))
conn.close()
if __name__ == '__main__':
p = Pool(5)
sk= socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn, addr = sk.accept()
p.apply_async(func,args=(conn,))
sk.close()
client端
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
ret = sk.recv(1024).decode('utf-8')
print(ret)
msg = input('>>>').encode('utf-8')
sk.send(msg)
sk.close()
12.进程池回调函数
from multiprocessing import Pool
import os
def func1(n):
print('in func1',os.getpid())
return n*n #子进程
def func2(nn):
print('in func2',os.getpid())
print(nn) #主进程
if __name__ == '__main__':
print('主进程:',os.getpid())
p = Pool(5)
for i in range(10):
p.apply_async(func1,args=(10,),callback=func2) #callback回调
p.close()
p.join()
View Code
13.生产者消费者模型
import time
import random
from multiprocessing import Process,Queue
def produce(q,name,food):
for i in range(4):
time.sleep(random.randint(1, 3))
f = '%s生产%s%s' % (name, food, i)
print(f)
q.put(f)
def consumer(q,name):
while True:
food = q.get()
if food is None:
print("%s获得一个空"%name)
break
print('%s吃了%s'%(name,food))
if __name__ == '__main__':
q = Queue(20)
p = Process(target=produce,args=(q,'小白','肉包子'))
p2 = Process(target=produce, args=(q, '小黑', '菜包子'))
c1 = Process(target=consumer,args=(q,'狗'))
c2 = Process(target=consumer, args=(q,'猫'))
p.start()
p2.start()
c1.start()
c2.start()
p.join()
p2.join()
q.put(None)
q.put(None)
View Code
线程:
1.创建线程
from threading import Thread
import time
def func(n):
time.sleep(1)
print(n)
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
方法一
from threading import Thread
import time
class MyThread(Thread):
def __init__(self,arg):
super().__init__()
self.arg = arg
def run(self):
time.sleep(1)
print(self.arg)
t=MyThread(10)
t.start()
方法二
2.数据共享问题
from threading import Thread
import os
def func(n):
global g
g = 0
print(g,os.getpid())
g=100
t_list=[]
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(g)
#多线程的数据是共享的
View Code
3.>>基于多线程实现scoket实验(可以重复开启多个client)
server端
import socket
from threading import Thread
def chat(conn):
conn.send(b'hello')
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close()
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn,addr = sk.accept()
Thread(target=chat,args=(conn,)).start()
sk.close()
client端
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
msg = sk.recv(1024).decode('utf-8')
print(msg)
info = input('>>>').encode('utf-8')
sk.send(info)
sk.close()
4.守护线程
#守护进程随着主进程代码执行结束而结束
#守护线程会在主线程结束之后等待子线程结束而结束
from threading import Thread
import time
def func1():
print(666)
time.sleep(2)
def func2():
print('in func2')
time.sleep(2)
t = Thread(target=func1,)
t.daemon = True
t.start()
print('主线程')
t2 = Thread(target=func2,)
t2.start()
View Code
5.线程锁
#死锁现象
from threading import Thread,Lock
import time
noodle_lock = Lock()
fork_lock = Lock()
def func(name):
noodle_lock.acquire()
print('%s拿到面条'%name)
fork_lock.acquire()
print('%s拿到叉子'% name)
print('%s吃面'%name)
fork_lock.release()
noodle_lock.release()
def func1(name):
fork_lock.acquire()
print('%s拿到叉子' % name)
time.sleep(5)
noodle_lock.acquire()
print('%s拿到面条' % name)
print('%s吃面' % name)
noodle_lock.release()
fork_lock.release()
Thread(target=func,args=('1号顾客',)).start()
Thread(target=func1,args=('2号顾客',)).start()
Thread(target=func,args=('3号顾客',)).start()
Thread(target=func1,args=('4号顾客',)).start()
死锁现象(互斥锁lock)
from threading import Thread,RLock
import time
noodle_lock = fork_lock = RLock()
def func(name):
noodle_lock.acquire()
print('%s拿到面条'%name)
fork_lock.acquire()
print('%s拿到叉子'% name)
print('%s吃面'%name)
time.sleep(2)
fork_lock.release()
noodle_lock.release()
def func1(name):
fork_lock.acquire()
print('%s拿到叉子' % name)
noodle_lock.acquire()
print('%s拿到面条' % name)
print('%s吃面' % name)
time.sleep(2)
noodle_lock.release()
fork_lock.release()
Thread(target=func,args=('1号顾客',)).start()
Thread(target=func1,args=('2号顾客',)).start()
Thread(target=func,args=('3号顾客',)).start()
Thread(target=func1,args=('4号顾客',)).start()
递归锁(rlock)
6.信号量
from threading import Thread,Semaphore
import time
def func(sem,a,b):
sem.acquire()
time.sleep(1)
print(a+b)
sem.release()
sem = Semaphore(4)
for i in range(10):
t = Thread(target=func, args=(sem,i, i + 5))
t.start()
View Code
7.事件
#事件被创建的时候是false状态
#flase状态 wait()阻塞
#true状态 wait()非阻塞
#set false状态设置为true
#clear 设置状态为false
#连接数据库
#检测数据库的可连接情况
from threading import Thread,Event
import time
import random
def connect_db(e):
count = 0
while count<3:
e.wait(0.5) #状态为flase的时候 只等待0.5s就结束
if e.is_set() == True:
print('连接成功')
break
else:
count += 1
print('第%s次连接失败'%count)
else:
raise TimeoutError
def check_web(e):
time.sleep(random.randint(0,3))
e.set()
e = Event()
t1 = Thread(target=connect_db,args=(e,))
t2 = Thread(target=check_web,args=(e,))
t1.start()
t2.start()
View Code
8.条件
#notify(int数据类型) 造钥匙
from threading import Condition,Thread
def func(con,i):
con.acquire()
con.wait() #等钥匙
print('在第%s个循环里'%i)
con.release()
con = Condition()
for i in range(10):
Thread(target=func,args=(con,i)).start()
while True:
num = int(input('>>>'))
con.acquire()
con.notify(num) #造钥匙
con.release()
View Code
9.定时器
from threading import Timer
import time
def func():
print('时间同步')
while True:
Timer(5,func).start()
time.sleep(5)
View Code
10.队列
import queue
q = queue.Queue() # 队列 先进先出
q.put(1)
q.put(2)
print(q.get())
print(q.get())
# q.put_nowait()
# q.get_nowait()
q = queue.LifoQueue() #栈 先进后出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
q = queue.PriorityQueue() #优先级队列 数字越小优先级越高 相同优先级按ascii排
q.put((20,'a'))
q.put((10,'b'))
q.put((1,'c'))
q.put((1,'q'))
print(q.get())
View Code
11.线程池
from concurrent.futures import ThreadPoolExecutor
import time
def func(n):
time.sleep(2)
print(n)
return n*n
tpool = ThreadPoolExecutor(max_workers=5)
t_list = []
for i in range(20): #异步的
t = tpool.submit(func,i)
t_list.append(t)
tpool.shutdown() #close+join
print('主线程')
for t in t_list: print('***',t.result())
View Code
12.回调函数
from concurrent.futures import ThreadPoolExecutor
import time
def func(n):
time.sleep(2)
print(n)
return n*n
def callback(m):
print('结果是%s'%m.result())
tpool = ThreadPoolExecutor(max_workers=5)
for i in range(20):
tpool.submit(func,i).add_done_callback(callback)
View Code
协程
1.概念
#进程 启动多个进程 进程之间是由操作系统负责调用
#线程 启动多个线程 真正被cpu执行的最小单位是线程
#开启一个线程 创建一个线程 寄存器 堆栈
#关闭一个线程
#协程
#本质上是一个线程
#能够在多个任务之间切换来节省一些io时间
#协程中任务之间的切换时间消耗的时间 远远小于进程和线程之间的切换
#实现并发的手段
#进程和线程的任务切换由操作系统完成
#协程任务之间的切换由代码完成,只有遇到协程模块可识别的io操作,程序才会进程任务切换实现并发效果
from gevent import monkey;monkey.patch_all()
import time
import gevent
def task():
time.sleep(1)
print(123)
def sync():
for i in range(10):
task()
def asynct():
g_list = []
for i in range(10):
g = gevent.spawn(task)
g_list.append(g)
gevent.joinall(g_list)
sync()
asynct()
协程的概念
2.基于协程实现socket实验(可以重复开启多个client)
server端
import socket
from gevent import monkey;monkey.patch_all()
import gevent
def func(conn):
conn.send(b'hello')
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close()
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn,addr = sk.accept()
gevent.spawn(func,conn)
sk.close()
client端
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
msg = sk.recv(1024).decode('utf-8')
print(msg)
info = input('>>>').encode('utf-8')
sk.send(info)
sk.close()