day37 GIL 和线程池 ,同步异步,回调函数 GIL 和线程池 ,同步异步,回调函数 GIL带来的问题 为什么需要GIL 进程池与线程池 同步异步-阻塞非阻塞 异步回调

GIL:

释义:
在CPython中,这个全局解释器锁,也称为GIL,是一个互斥锁,防止多个线程在同一时间执行Python字节码,这个锁是非常重要的,因为CPython的内存管理非线程安全的,很多其他的特性依赖于GIL,所以即使它影响了程序效率也无法将其直接去除

总结:
在CPython中,GIL会把线程的并行变成串行,导致效率降低

GIL带来的问题

首先必须明确执行一个py文件,分为三个步骤

  1. 从硬盘加载Python解释器到内存

  2. 从硬盘加载py文件到内存

  3. 解释器解析py文件内容,交给CPU执行

其次需要明确的是每当执行一个py文件,就会立即启动一个python解释器

day37 GIL 和线程池 ,同步异步,回调函数
GIL 和线程池 ,同步异步,回调函数
GIL带来的问题
为什么需要GIL
进程池与线程池
同步异步-阻塞非阻塞
异步回调

GIL,叫做全局解释器锁,加到了解释器上,并且是一把互斥锁,那么这把锁对应用程序到底有什么影响?

这就需要知道解释器的作用,以及解释器与应用程序代码之间的关系

py文件中的内容本质都是字符串,只有在被解释器解释时,才具备语法意义,解释器会将py代码翻译为当前系统支持的指令交给系统执行。

当进程中仅存在一条线程时,GIL锁的存在没有不会有任何影响,但是如果进程中有多个线程时,GIL锁就开始发挥作用了。如下图:

day37 GIL 和线程池 ,同步异步,回调函数
GIL 和线程池 ,同步异步,回调函数
GIL带来的问题
为什么需要GIL
进程池与线程池
同步异步-阻塞非阻塞
异步回调

开启子线程时,给子线程指定了一个target表示该子线程要处理的任务即要执行的代码。代码要执行则必须交由解释器,即多个线程之间就需要共享解释器,为了避免共享带来的数据竞争问题,于是就给解释器加上了互斥锁!

由于互斥锁的特性,程序串行,保证数据安全,降低执行效率,GIL将使得程序整体效率降低!

为什么需要GIL

day37 GIL 和线程池 ,同步异步,回调函数
GIL 和线程池 ,同步异步,回调函数
GIL带来的问题
为什么需要GIL
进程池与线程池
同步异步-阻塞非阻塞
异步回调

 

GIL与GC的孽缘

在使用Python中进行编程时,程序员无需参与内存的管理工作,这是因为Python有自带的内存管理机制,简称GC。那么GC与GIL有什么关联?

要搞清楚这个问题,需先了解GC的工作原理,Python中内存管理使用的是引用计数,每个数会被加上一个整型的计数器,表示这个数据被引用的次数,当这个整数变为0时则表示该数据已经没有人使用,成了垃圾数据。

当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作,垃圾清理也是一串代码,也就需要一条线程来执行。

 

from threading import  Thread
def task():
    a = 10
    print(a)

# 开启三个子线程执行task函数
Thread(target=task).start()
Thread(target=task).start()
Thread(target=task).start()

GIL的加锁与解锁时机

加锁的时机:在调用解释器时立即加锁

解锁时机:

  • 当前线程遇到了IO时释放

  • 当前线程执行时间超过设定值时释放

进程池与线程池

什么是进程/线程池?

池表示一个容器,本质上就是一个存储进程或线程的列表

池子中存储线程还是进程?

如果是IO密集型任务使用线程池,如果是计算密集任务则使用进程池

为什么需要进程/线程池?

在很多情况下需要控制进程或线程的数量在一个合理的范围,例如TCP程序中,一个客户端对应一个线程,虽然线程的开销小,但肯定不能无限的开,否则系统资源迟早被耗尽,解决的办法就是控制线程的数量。

线程/进程池不仅帮我们控制线程/进程的数量,还帮我们完成了线程/进程的创建,销毁,以及任务的分配

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,os

# 创建进程池,指定最大进程数为3,此时不会创建进程,不指定数量时,默认为CPU和核数
pool = ProcessPoolExecutor(3)

def task():
    time.sleep(1)
    print(os.getpid(),"working..")

if __name__ == '__main__':
    for i in range(10):
        pool.submit(task) # 提交任务时立即创建进程

    # 任务执行完成后也不会立即销毁进程
    time.sleep(2)

    for i in range(10):
        pool.submit(task) #再有新任务是 直接使用之前已经创建好的进程来执行

线程池的使用:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread,active_count
import time,os

# 创建进程池,指定最大线程数为3,此时不会创建线程,不指定数量时,默认为CPU和核数*5
pool = ThreadPoolExecutor(3)
print(active_count()) # 只有一个主线

def task():
    time.sleep(1)
    print(current_thread().name,"working..")

if __name__ == '__main__':
    for i in range(10):
        pool.submit(task) # 第一次提交任务时立即创建线程

    # 任务执行完成后也不会立即销毁
    time.sleep(2)

    for i in range(10):
        pool.submit(task) #再有新任务时 直接使用之前已经创建好的线程来执行

同步异步-阻塞非阻塞

同步异步-阻塞非阻塞,经常会被程序员提及,并且概念非常容易混淆!

阻塞非阻塞指的是程序的运行状态

阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称为阻塞!

非阻塞:程序在正常运行没有遇到IO操作,或者通过某种方式使程序即时遇到了也不会停在原地,还可以执行其他操作,以提高CPU的占用率

同步-异步 指的是提交任务的方式

同步指调用:发起任务后必须在原地等待任务执行完成,才能继续执行

异步指调用:发起任务后必须不用等待任务执行,可以立即开启执行其他操作

同步会有等待的效果但是这和阻塞是完全不同的,阻塞时程序会被剥夺CPU执行权,而同步调用则不会!

很明显异步调用效率更高,但是任务的执行结果如何获取呢?

程序中的异步调用并获取结果方式1:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

pool = ThreadPoolExecutor(3)
def task(i):
    time.sleep(0.01)
    print(current_thread().name,"working..")
    return i ** i

if __name__ == '__main__':
    objs = []
    for i in range(3):
        res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果
        objs.append(res_obj)

# 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行
pool.shutdown(wait=True)

# 从结果对象中取出执行结果
for res_obj in objs:
    print(res_obj.result())
print("over")

程序中的异步调用并获取结果方式2:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

pool = ThreadPoolExecutor(3)
def task(i):
    time.sleep(0.01)
    print(current_thread().name,"working..")
    return i ** i

if __name__ == '__main__':
    objs = []
    for i in range(3):
        res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果
        print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果
print("over")

异步回调

什么是异步回调

异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数

为什么需要异步回调

之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调

import requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor

def get_data(url):
    print("%s 正在请求%s" % (os.getpid(),url))
    time.sleep(random.randint(1,2))
    response = requests.get(url)
    print(os.getpid(),"请求成功 数据长度",len(response.content))
    #parser(response) # 3.直接调用解析方法  哪个进程请求完成就那个进程解析数据  强行使两个操作耦合到一起了
    return response

def parser(obj):
    data = obj.result()
    htm = data.content.decode("utf-8")
    ls = re.findall("href=.*?com",htm)
    print(os.getpid(),"解析成功",len(ls),"个链接")

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    urls = ["https://www.baidu.com",
            "https://www.sina.com",
            "https://www.python.org",
            "https://www.tmall.com",
            "https://www.mysql.com",
            "https://www.apple.com.cn"]
    # objs = []
    for url in urls:
        # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
        # parser(res)

        obj = pool.submit(get_data,url) # 
        obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
        # objs.append(obj)
        
    # pool.shutdown() # 2.等待所有任务执行结束在统一的解析
    # for obj in objs:
    #     res = obj.result()
    #     parser(res)
    # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
    # 2.解析任务变成了串行,

总结:异步回调使用方法就是在提交任务后得到一个Futures对象,调用对象的add_done_callback来指定一个回调函数,

如果把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调相当于换了一个会响的水壶,烧水期间可用作其他的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调。

注意:

  1. 使用进程池时,回调函数都是主进程中执行执行

  2. 使用线程池时,回调函数的执行线程是不确定的,哪个线程空闲就交给哪个线程

  3. 回调函数默认接收一个参数就是这个任务对象自己,再通过对象的result函数来获取任务的处理结果