进程/线程/协程

概念:

进程:运行中的程序    有生命周期 关掉程序就销毁了
进程调度:1)先来先服务算法
     2)短作业优先调度算法
     3)时间片轮转法
     4)多级反馈队列
并发:多个程序一起运行,轮流交替使用资源,独木桥 a先走一段 然后让给b走
并行:指两者同时执行,赛跑,两个人同时往前跑
阻塞:input sleep 文件的输入输出 网络请求 recv accept
同步:一个任务的完成需要依赖另一个任务,等到被依赖的任务完成后,依赖的任务才算完成 等待的过程什么都不干
异步:不需要等待被依赖的任务完成,当等待被依赖的任务完成时,立即执行 等待的过程可以干别的事
同步阻塞:一直等待
异步阻塞:可以干别的事
同步非阻塞:来回切换观察
异步非阻塞:可以干别的事。等待别人通知
多进程 :同时开启多个运行程序

进程:

1.创建进程:

from multiprocessing import Process
import os

def func(m,n):
    print(m,n)
    print('获取子进程的pid:', os.getpid())
    print('获取子进程的父进程pid:', os.getppid())

if __name__ == '__main__':
    p = Process(target=func,args=('参数1','参数2'))     #创建一个对象   传递一个参数是需要后面加,
    p.start()                              #启动一个子进程    后面的print也同时执行  所以是异步的
    print('获取此进程的pid:',os.getpid())
    print('获取此进程的父进程pid:', os.getppid())
方法一
from multiprocessing import Process

class MyProcess(Process):       #创建一个类,继承Process类
    def __init__(self,args1,args2):
        super().__init__()
        self.args1 = args1
        self.args2 = args2

    def run(self):              #必须实现一个run方法,run方法中是在子进程中的代码
        print(self.args1)
        print(self.args2)
if __name__ == '__main__':
    p = MyProcess(1,2)
    p.start()
View Code

2.进程的几个方法:

join
from multiprocessing import Process
import time

def func(n):
    print(n)
    time.sleep(2)


if __name__ == '__main__':
    p = Process(target=func,args=(1,))
    p.start()
    p.join()                   #join方法  感知子进程的结束,才执行之后的print
    print('hello')
View Code

3.开启多个子进程:

from multiprocessing import Process
import time

def func(s):
    print('子进程%s'%s)
    time.sleep(2)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=func,args=(i,))
        p.start()
View Code

>>基于多进程实现scoket实验(可以重复开启多个client)

server端

import socket
from multiprocessing import Process
def serve(conn):
    ret = '你好'.encode('utf-8')
    conn.send(ret)
    msg = conn.recv(1024).decode('utf-8')
    print(msg)
    conn.close()
if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        p = Process(target=serve,args=(conn,))
        p.start()

    sk.close()

client端 

import socket

sk=socket.socket()

sk.connect(('127.0.0.1',8080))
msg = sk.recv(1024).decode('utf-8')
print(msg)
msg2= input('>>>').encode('utf-8')
sk.send(msg2)


sk.close()

4. 多进程之间数据隔离问题

from  multiprocessing import  Process

n = 100
def func():
    global n
    n = n-1
    return 111

if __name__ == '__main__':
    n_l = []
    for i in range(100):
        p = Process(target=func)
        p.start()
        n_l.append(p)
    for p in n_l : p.join()
    print(n)

#多进程之间的数据是隔离的不共享的
View Code

5.守护进程

#守护进程随着主进程代码执行结束而结束
#守护线程会在主线程结束之后等待子线程结束而结束
from multiprocessing import Process
import time
def func():
    while True:
        time.sleep(0.2)
        print('我还活着')

if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True             #设置子线程为守护进程
    p.start()
    time.sleep(2)
    print('主进程结束')
#守护进程会随着 主进程的代码执行完毕而结束
#在主进程结束一个子进程 p.terminate()   结束一个进程并不是执行方法后立即生效,需要一个操作系统效应的过程
#p.name获取进程的名字
View Code

6.进程锁

lock = Lock() # 创造了一把锁
lock.acquire() # 获取了这把锁的钥匙
lock.release() # 归还这把锁的钥匙

import json
import time
from multiprocessing import Process
from multiprocessing import Lock,Manager

def show(i):
    with open('ticket') as f:
        dic=json.load(f)
    print("余票 %s"%dic['ticket'])

def buy_ticket(i,lock):
    lock.acquire()
    with open('ticket') as f:
        dic = json.load(f)  #文件用load 字符串用loads
        time.sleep(0.1)
    if dic['ticket']>0:
        dic['ticket'] -= 1
        print("买到票了%s"%i)
    else:
        print("票卖完了%s"%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
    lock.release()
if __name__ == '__main__':
    for i in range(10):
        p = Process(target=show,args=(i,))
        p.start()
    lock = Lock()
    for i in range(10):
        p = Process(target=buy_ticket,args=(i,lock))
        p.start()
---------------------------------------------------
ticket

{"ticket": 1}
View Code

7.信号量

from multiprocessing import Process,Semaphore
import time

def func(i,sem):
    sem.acquire()
    print('进程%s'%i)
    time.sleep(4)
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(4)     #允许开启四个进程每一次
    for i in range(10):
        p = Process(target=func,args=(i,sem))
        p.start()
View Code

8.事件(红绿灯案例)

'''
from multiprocessing import Event

e = Event()   #创建一个事件
print(e.is_set())  #查看一个事件
e.set()   #更改事件的状态为true
print(e.is_set())
e.wait()     #依据e.is_set的值来决定是否阻塞
print(123)
e.clear()#将事件的状态更改为false

'''

from multiprocessing import Event,Process
import time
import random
def cars(e,i):
    if not e.is_set():
        print('car %i在等待通行'%i)
        e.wait()    #阻塞 直到一个事件状态改变

    else:
        print('car %i可通行' % i)
def light(e):
    while True:
        if e.is_set():
            print('绿灯亮了')
            time.sleep(2)
            e.clear()
        else:
            print('红灯亮了')
            time.sleep(2)
            e.set()

if __name__ == '__main__':
    e = Event()
    p = Process(target=light,args=(e,))
    p.start()
    for i in range(20):
        car = Process(target=cars,args=(e,i))
        car.start()
        time.sleep(random.random())
View Code

9.队列

from multiprocessing import Queue,Process
def produce(q):
    q.put('hello')

def consume(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume, args=(q,))
    c.start()
View Code

10.管道

from multiprocessing import Pipe,Process

def func(conn1,conn2):
    conn2.close()
    while True:
        try:
            msg = conn1.recv()
            print(msg)
        except EOFError:
            conn1.close()
            break


if __name__ == '__main__':
    conn1, conn2 = Pipe()
    Process(target=func,args=(conn1, conn2)).start()
    conn1.close()
    for i in range(20):
        conn2.send('吃了么')
    conn2.close()
View Code

11.进程池

import os
from multiprocessing import Pool
import time
def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s'%n,os.getpid())

if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        p.apply(func,args=(i,))   #同步
-------------------------------------------------------------
import os
from multiprocessing import Pool
import time
def func(n):
    print('start func%s'%n,os.getpid())
    time.sleep(1)
    print('end func%s'%n,os.getpid())

if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        p.apply(func,args=(i,))   #异步
    p.close()   #结束进程池接受任务
    p.join()   #感知进程池中的任务执行结束
View Code

>>基于进程池实现scoket实验(可以重复开启多个client)

server端

import socket
from multiprocessing import Pool

def func(conn):
    conn.send(b'hello')
    print(conn.recv(1024).decode('utf-8'))
    conn.close()

if __name__ == '__main__':
    p = Pool(5)
    sk= socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()

    while True:
        conn, addr = sk.accept()
        p.apply_async(func,args=(conn,))

    sk.close()

client端

import socket

sk = socket.socket()
sk.connect(('127.0.0.1',8080))

ret = sk.recv(1024).decode('utf-8')
print(ret)

msg = input('>>>').encode('utf-8')
sk.send(msg)

sk.close()

12.进程池回调函数

from multiprocessing import Pool
import os

def func1(n):
    print('in func1',os.getpid())
    return n*n                                          #子进程

def func2(nn):
    print('in func2',os.getpid())
    print(nn)                                            #主进程

if __name__ == '__main__':
    print('主进程:',os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply_async(func1,args=(10,),callback=func2)   #callback回调
    p.close()
    p.join()
View Code

13.生产者消费者模型

import time
import random
from multiprocessing import Process,Queue

def produce(q,name,food):
    for i in range(4):
        time.sleep(random.randint(1, 3))
        f = '%s生产%s%s' % (name, food, i)
        print(f)
        q.put(f)


def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print("%s获得一个空"%name)
            break
        print('%s吃了%s'%(name,food))

if __name__ == '__main__':

    q = Queue(20)
    p = Process(target=produce,args=(q,'小白','肉包子'))
    p2 = Process(target=produce, args=(q, '小黑', '菜包子'))
    c1 = Process(target=consumer,args=(q,''))
    c2 = Process(target=consumer, args=(q,''))
    p.start()
    p2.start()
    c1.start()
    c2.start()
    p.join()
    p2.join()
    q.put(None)
    q.put(None)
View Code

线程:

1.创建线程

from threading import Thread
import time

def func(n):
    time.sleep(1)
    print(n)

for i in range(10):
    t = Thread(target=func, args=(i,))
    t.start()
方法一
from threading import Thread
import time

class MyThread(Thread):
    def __init__(self,arg):
        super().__init__()
        self.arg = arg

    def run(self):
        time.sleep(1)
        print(self.arg)

t=MyThread(10)
t.start()
方法二

2.数据共享问题

from threading import Thread
import os

def func(n):
    global g
    g = 0
    print(g,os.getpid())

g=100
t_list=[]
for i in range(10):
    t = Thread(target=func, args=(i,))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(g)
#多线程的数据是共享的
View Code

3.>>基于多线程实现scoket实验(可以重复开启多个client)

server端

import socket
from threading import Thread

def chat(conn):
    conn.send(b'hello')
    msg = conn.recv(1024).decode('utf-8')
    print(msg)
    conn.close()

sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
    conn,addr = sk.accept()
    Thread(target=chat,args=(conn,)).start()

sk.close()

client端

import socket

sk = socket.socket()
sk.connect(('127.0.0.1',8080))


msg = sk.recv(1024).decode('utf-8')
print(msg)
info = input('>>>').encode('utf-8')
sk.send(info)

sk.close()

4.守护线程

#守护进程随着主进程代码执行结束而结束
#守护线程会在主线程结束之后等待子线程结束而结束
from threading import Thread
import time

def func1():
    print(666)
    time.sleep(2)
def func2():
    print('in func2')
    time.sleep(2)

t = Thread(target=func1,)
t.daemon = True
t.start()
print('主线程')
t2 = Thread(target=func2,)
t2.start()
View Code

5.线程锁

#死锁现象
from threading import Thread,Lock
import time
noodle_lock = Lock()
fork_lock = Lock()
def func(name):
    noodle_lock.acquire()
    print('%s拿到面条'%name)
    fork_lock.acquire()
    print('%s拿到叉子'% name)
    print('%s吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def func1(name):
    fork_lock.acquire()
    print('%s拿到叉子' % name)
    time.sleep(5)
    noodle_lock.acquire()
    print('%s拿到面条' % name)
    print('%s吃面' % name)
    noodle_lock.release()
    fork_lock.release()
Thread(target=func,args=('1号顾客',)).start()
Thread(target=func1,args=('2号顾客',)).start()
Thread(target=func,args=('3号顾客',)).start()
Thread(target=func1,args=('4号顾客',)).start()
死锁现象(互斥锁lock)
from threading import Thread,RLock
import time
noodle_lock = fork_lock = RLock()

def func(name):
    noodle_lock.acquire()
    print('%s拿到面条'%name)
    fork_lock.acquire()
    print('%s拿到叉子'% name)
    print('%s吃面'%name)
    time.sleep(2)
    fork_lock.release()
    noodle_lock.release()

def func1(name):
    fork_lock.acquire()
    print('%s拿到叉子' % name)
    noodle_lock.acquire()
    print('%s拿到面条' % name)
    print('%s吃面' % name)
    time.sleep(2)
    noodle_lock.release()
    fork_lock.release()
Thread(target=func,args=('1号顾客',)).start()
Thread(target=func1,args=('2号顾客',)).start()
Thread(target=func,args=('3号顾客',)).start()
Thread(target=func1,args=('4号顾客',)).start()
递归锁(rlock)

6.信号量

from  threading import Thread,Semaphore
import time

def func(sem,a,b):
    sem.acquire()
    time.sleep(1)
    print(a+b)
    sem.release()

sem = Semaphore(4)
for i in range(10):
    t = Thread(target=func, args=(sem,i, i + 5))
    t.start()
View Code

7.事件 

#事件被创建的时候是false状态
#flase状态 wait()阻塞
#true状态 wait()非阻塞
#set false状态设置为true
#clear 设置状态为false

#连接数据库
#检测数据库的可连接情况

from threading import Thread,Event
import time
import random

def connect_db(e):
    count = 0
    while count<3:
        e.wait(0.5)   #状态为flase的时候 只等待0.5s就结束
        if e.is_set() == True:
            print('连接成功')
            break
        else:
            count += 1
            print('第%s次连接失败'%count)
    else:
        raise TimeoutError
def check_web(e):
    time.sleep(random.randint(0,3))
    e.set()


e = Event()
t1 = Thread(target=connect_db,args=(e,))
t2 = Thread(target=check_web,args=(e,))
t1.start()
t2.start()
View Code

 8.条件

#notify(int数据类型) 造钥匙
from threading import Condition,Thread

def func(con,i):
    con.acquire()
    con.wait()             #等钥匙
    print('在第%s个循环里'%i)
    con.release()
con = Condition()
for i in range(10):
    Thread(target=func,args=(con,i)).start()

while True:
    num = int(input('>>>'))
    con.acquire()
    con.notify(num)          #造钥匙
    con.release()
View Code

 9.定时器

from  threading import Timer
import time
def func():
    print('时间同步')

while True:
    Timer(5,func).start()
    time.sleep(5)
View Code

10.队列

import queue
q = queue.Queue() # 队列 先进先出
q.put(1)
q.put(2)
print(q.get())
print(q.get())
# q.put_nowait()
# q.get_nowait()

q = queue.LifoQueue() #栈 先进后出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

q = queue.PriorityQueue()  #优先级队列 数字越小优先级越高 相同优先级按ascii排
q.put((20,'a'))
q.put((10,'b'))
q.put((1,'c'))
q.put((1,'q'))
print(q.get())
View Code

11.线程池

from concurrent.futures import ThreadPoolExecutor
import time

def func(n):
   time.sleep(2)
   print(n)
   return n*n

tpool = ThreadPoolExecutor(max_workers=5)
t_list = []
for i in range(20):     #异步的
    t = tpool.submit(func,i)
    t_list.append(t)
tpool.shutdown()   #close+join
print('主线程')
for t in t_list: print('***',t.result())
View Code

12.回调函数

from concurrent.futures import ThreadPoolExecutor
import time

def func(n):
   time.sleep(2)
   print(n)
   return n*n
def callback(m):
    print('结果是%s'%m.result())
tpool = ThreadPoolExecutor(max_workers=5)
for i in range(20):
    tpool.submit(func,i).add_done_callback(callback)
View Code

协程

1.概念

#进程 启动多个进程 进程之间是由操作系统负责调用
#线程 启动多个线程 真正被cpu执行的最小单位是线程
#开启一个线程 创建一个线程 寄存器 堆栈
#关闭一个线程

#协程
#本质上是一个线程
#能够在多个任务之间切换来节省一些io时间
#协程中任务之间的切换时间消耗的时间 远远小于进程和线程之间的切换
#实现并发的手段
#进程和线程的任务切换由操作系统完成
#协程任务之间的切换由代码完成,只有遇到协程模块可识别的io操作,程序才会进程任务切换实现并发效果

from gevent import monkey;monkey.patch_all()
import time
import gevent

def task():
    time.sleep(1)
    print(123)

def sync():
    for i in range(10):
        task()

def asynct():
    g_list = []
    for i in range(10):
        g = gevent.spawn(task)
        g_list.append(g)
    gevent.joinall(g_list)

sync()
asynct()
协程的概念

2.基于协程实现socket实验(可以重复开启多个client)

server端

import socket
from gevent import monkey;monkey.patch_all()
import gevent

def func(conn):
    conn.send(b'hello')
    msg = conn.recv(1024).decode('utf-8')
    print(msg)
    conn.close()

sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
    conn,addr = sk.accept()
    gevent.spawn(func,conn)
sk.close()

client端

import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))

msg = sk.recv(1024).decode('utf-8')
print(msg)
info = input('>>>').encode('utf-8')
sk.send(info)
sk.close()

相关推荐