python 并发并行,多线程,队列 多任务系统 并发 并行 同步与异步 python中的线程模块

多任务系统可以同时运行多个任务。

单核cpu也可以执行多任务,由于cpu执行代码都是顺序执行的,那么cpu是怎么执行多任务的?

答案是操作系统轮流让各个任务交替执行

任务1执行0.01s切换任务2,任务2执行0.01s切换任务3.

依次类推,表面上看,每个任务都是交替执行的,但是由于cpu执行速度实在太快,感觉上就是所有任务同时执行。

并发

并发 任务数多于cpu核数,通过操作系统的各种任务调度算法,实现多个任务一起执行,而实际上总有一些任务不在执行,因为切换速度够快,看上去一起执行

并行

并行 任务数量小于cpu核数,任务是真正的一起执行的

并发 10个客人点餐,1个服务员应对

并行 10个客人点餐,10个服务员应对

串行

先执行a,再执行b

同步与异步

同步

同步协调,指线程在访问某一资源类的时候,获得了该资源的返回结果之后才会执行其他操作,先做某件事,在做某件事,一步一步来

异步

异步 提交注册数据----校验数据----注册成功---登陆,校验校验数据之后即发送账号激活---邮件激活,两者同时发生,不用的等待注册总流程跑完再进行激活

python中的线程模块

threading

python的thread是较底层的模块,threading是对thread的一些封装。

通过target=指定开启线程的函数创建线程对象

def func1():
   for i in range(5):
      print(F"--{threading.current_thread()}---正在执行func1")
      time.sleep(1)

def func2():
   for i in range(6):
      print(F"--{threading.current_thread()}--------整在执行func2")
      time.sleep(1)

def main():
   t1 = threading.Thread(target=func1)
   t2 = threading.Thread(target=func2)
   s_time = time.time()

   print(t1.name)  # 可以直接根据name去获取,getName setName 取名字,设置线程名字
   t1.start()  # 开始执行线程1
   t2.start()  # 开始执行线程2
   # func1()
   # func2()
   print(threading.enumerate())
   print(threading.active_count())
   # 让主线程等待子线程执行完毕之后再继续往下执行
   t1.join()  # 主线程等待1秒之后再继续执行  不写,主线程等到t1完了之后再执行
   t2.join()
   end_time = time.time()
   print("总耗时:", end_time - s_time)

if __name__ == '__main__':
   main()

调用threading.Thread(target=开启线程的目标函数) 返回为一个线程对象,

通过继承Thread类重写run方法来开启线程

编写一个类,之后继承threading.Thread 并重写run方法。

至于为什么重写,来看一段thread的原码

def run(self):
    """Method representing the thread's activity.

    You may override this method in a subclass. The standard run() method
    invokes the callable object passed to the object's constructor as the
    target argument, if any, with sequential and keyword arguments taken
    from the args and kwargs arguments, respectively.

    """
    try:
        if self._target:
            self._target(*self._args, **self._kwargs)
    finally:
        # Avoid a refcycle if the thread is running a function with
        # an argument that has a member that points to the thread.
        del self._target, self._args, self._kwargs

这其中的self._target是是thread的类变量,在thread类被初始化的时候由target传入,if判断如果被传入,即调用被传入的目标函数。

而在写自己的方法类时,对run方法进行重写之后,便不在需要传入参数target,

第二种方式创建的实例代码。

# 第二种创建多线程的方法
# 继承thread类然后重写run方法
class RequestThread(threading.Thread):
   '''发送request请求'''
   def __init__(self,url):  # 可以通过重写init方法来做到参数化,
      self.url = url
      super().__init__()  # 但是不能忘掉要执行父类的init方法

   def run(self):
      for i in range(10):
         res = requests.get(self.url)
         print(F'线程:{threading.current_thread()}返回的状态嘛{res.status_code}')


s_time = time.time()
for i in range(5):
   t = RequestThread(url="")  # 调用的时候需要传入参数进去
   t.start()
t.join()
end_time = time.time()
print("耗时:",end_time-s_time)

补充:如果想要对线程操作类实现参数化进行复杂操作。

编写init方法时声明参数,同时不能忘记继承并执行父类的init方法,父类thread在类初始化时做了相当多的配置达到多线程运行的目的,不写的话会报错

线程类常用方法

run() 用以表示线程活动的方法

start() 启动线程活动

join([time]) 设置主线程会等待time秒后再往下执行,time默认为子线程结束

多个子线程之间的设置会叠加

isAlive() 返回线程是否活动的

getName() 返回线程名

setName() 设置线程名

threading.currentThread() 返回当前执行的线程

threading.enumerate() 返回正在运行的所有线程(list),正在运行 指的是线程启动后,结束前,不包括启动前和终止之后的线程

threading.activateCount() 返回正在运行的线程数量。。

多线程的bug---数据安全

多线程中如果使用全局变量,数据会变的不安全不稳定

原因: 子线程和总线程使用的是同一块内存空间,在主线程中存在的变量为全局变量,各个线程都可以使用,

python只支持单核,通过多任务之间快速切换,任务量少的时候是完全没问题的。

但是在任务量级特别大的时候,线程之间的切换就会发生问题,数据产生偏差

从10W量级时开始出现问题。

下面看代码实例

# 全局变量
a = 100

# 10W的时候就已经开始出现问题
# 为什么?
# python里面线程只支持单核
# 多任务之间快速切换,任务量少可以杜绝误差
def func1():
   global a
   for i in range(1000000):
      a += 1
   print("线程1修改玩A:",a)

print(a)
# 并发,但是python中只有一个核,是不可能完全并行的
# 所以线程之间必定存在切换,量级十分巨大时切换就会发生问题


def func2():
   global a
   for i in range(1000000):
      a += 1
   print("线程2修改玩A:",a)
t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2)
t1.start()
t2.start()

t1.join()
t2.join()
print(a)
# 执行结果
# 100
# 线程1修改玩A: 1203921
# 线程2修改玩A: 1287478
# 1287478

互斥锁

为了解决这样的线程互相竞争资源导致数据不稳定不安全的问题,引入锁的概念。

线程同步能够邦正多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。

互斥锁为资源引入一个状态,锁定/非锁定

某个线程要更改共享数据时,先将其锁定,此时资源状态为锁定,其他线程不能更改知道该线程释放资源。

将资源的状态改变成 非锁定 其他的线程才能再次锁定该资源进行操作。,

互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程下数据的正确性。

threading 模块中定义了Lock类,可以方便的处理锁定

# 创建锁
mutex = threading.Lock() 
# 锁定方法
mutex.acquire()
# 解锁
mutex.release()
# 上锁之后不会切换到其他线程,只有被释放之后才可以切换到其他线程

上锁的过程

当一个线程调用锁的acquire(0方法获得锁时,锁进入locked状态

每次只有一个线程可以获得锁,如果此时另一个线程视图获得这个锁,该线程就会变成blocked被阻塞,知道拥有锁的线程调用锁的release()释放锁之后,锁进入unlocked状态。

线程调度程序从处于通同步阻塞状态的线程中选一个来获得锁,并使得该线程进入 running状态。

总结

锁的好处

确保了某段关键代码只能由一个线程从头到尾完整执行

锁的坏处

组织了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率大大下降,犹豫可以存在多个锁,不同线程持有不同的锁,并试图跟获取对象持有的锁时,可能会造成线程死锁,导致程序直接阻塞不向下运行

正案例

来看一个线程锁的应用实例。

# python里面线程只支持单核
# 多任务之间快速切换,任务量少可以杜绝误差
def func1():
   global a
   for i in range(1000000):
      meta.acquire()  # 上锁
      a += 1
      meta.release()  # 释放锁
   print("线程1修改玩A:", a)
# 上锁之后必须要release,
# 互斥锁,只有一个人能上  加上锁之后,任务运行速度会下降
# ,因为必须要等一个线程release之后才可以
# 让下一个线程进行使用
def func2():
   global a
   for i in range(1000000):
      meta.acquire()  # 上锁
      # 线程锁,执行完成之后,才会进入另外一个线程 ,从而保证数据在执行的时候不会造成数据影响
      a += 1
      meta.release()  # 释放锁

   print("线程2修改玩A:", a)


# 创建锁
meta = threading.Lock()


t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2)
t1.start()
t2.start()

t1.join()
t2.join()
print(a)

死锁案例

def func1():
   global a
   for i in range(1000000):
      metaA.acquire()  # 上锁A
      metaB.acquire()  # 上锁B
      print("-----------------1")
      a += 1
      metaB.release()  # 解锁B
      metaA.release()  # 解锁A
   print("线程1修改玩A:", a)
# 两边互相等待对方线程释放资源,就会无限等待,两把锁互相等待的情况下会出现死锁
def func2():
   global a
   for i in range(1000000):
      metaB.acquire()  # 上锁B
      metaA.acquire()  # 上锁A
      print("-----------------2")

      a += 1
      metaA.release()  # 解锁A
      metaB.release()  # 解锁B
   print("线程2修改玩A:", a)
   # 线程死锁案例
metaA = threading.Lock()
metaB = threading.Lock()

该案例中,两边互相等待对方线程释放资源,就会造成无限等待,行程死锁。

全局解释器锁 GIL

官方文档中,搜索关键字 怎样修改全局变量是线程安全的?

不用自己操作跟定义, 控制线程运行的

描述 gil的概念,以及它对python多线程的影响

参考哦

1,python语言和GIL没有关系,仅仅是犹豫历史原因在Cpython虚拟机(解释器)难以移除GIL

2,GIL 全局解释器,每个线程在执行的过程都需要先获取GIL,保证同一时刻只有一个线程可以执行代码

3,线程释放GIL锁的情况,

---在io操作等可能会引起阻塞的 system call之前,可以暂时释放GIL,但在执行完毕后

必须重新获取 GIL

--python 3使用计时器,执行时间达到阈值之后 当前线程释放GIL 或python 2.x 技tickets技术达到100

4 python使用多进程是可以利用多核的cpu资源的

多线程与单线程分别更适合什么

1.io密集型:涉及到网络、磁盘io的任务都是io密集型任务,这类任务的特点是cpu消耗很少,任务的大部分的

时间都在等待io操作完成(因为io的速度远远低于cpu和内训的速度)

结论:io密集型操作,多线程比单线程要快

2.cpu密集型:cpu密集型也称为计算密集型,任务的特点是要进行大量的计算,消耗cpu资源,比如

计算圆周率、对视频进行高清解码等等,全靠cpu的运算能力

结论:cpu密集型操作,单线程比多线程要快

队列

python的queue模块中提供了同步的,线程安全的队列类,

1,FIFO 先进先出 queue

2,LIFO 后进先出队列 LifoQueue

3,PriorityQueue 按照优先级

能够在多线程中直接使用,

可以用队列来实现线程间的同步。

 

初始化queue() 对象时, q = queue()

如果参数中没有指定最大可接受消息量,货数量为负值

则代表可接受的消息没有上限

import queue
# 队列中的方法
# def task_done()  表示在完成一项工作之后,使用Queue.task.done()方法可以向队列发送一个信号
# 表示该任务执行完毕
# def join()  实际上意味着等到队列中所有的任务(数据) 执行完毕之后,再往下,否则一直等待
#### join是判断的依据,不单单指的是队列中没有数据,数据get出去之后,要使用task_done()
#### 向队列发送一个信号,表示该任务执行(数据使用)完毕

# def qsize()  返回队列包含信息数量
# def empty()  判断队列是否为空, 为空返回true ,不为空返回fasle
# def full()   判断队列是否已满,满为true 不满为false
# def put()
   # param1 block=表示是否等待True/false 默认true  timeout = 等待时间 默认None

# def get()  从队列中获取数据
   # param1 block=表示是否等待True/false 默认true  timeout = 等待时间 默认None
# def put_nowait()  相当于put方法,设置block=false 不进行等待
# def get_nowait()  相当于get方法,设置blcok=false 不进行等待

# 三种队列
# 1,先进先出 FIFO
# 先进去的数据先出来
# 有点类似于竹筒塞球
q = queue.Queue(4)  # 创建队列的时候可以指定长度,如果不写可以无限塞
# 往队列中添加数据
q.put(1)
q.put(11)
q.put(33)
# q.put(55,block=False)  # 队列满了之后会等待吧数据拿出去之后再添加
# 队列已满的话会报错,
# queue.Full 如果设置不等待,满了之后就会报错,异常为queue.Full
# print(q.get())
# print(q.get())
# print(q.get())
# # print(q.get(block=False)) # get方法默认等待,与添加的时候一样 除非修改block的参数值
# print(q.get_nowait())  # 简便写法

q.put(12)
print(q.qsize())  # 获取队列中的数据量
print(q.full())  # 判断队列是否已满
print(q.empty())  # 判断队列是否为空
q.task_done()
q.task_done()
q.task_done()
q.task_done()
# join 判断队列中的任务是否执行完毕
q.join() # 队列里的所有任务全部执行完毕才会接着向下执行,否则会阻塞
print("join之后的代码")  # 比如之前用了4个put,对应的要写4个taskdone来通知队列操作完成
# 阻塞的原因,python认为队列中的任务没有执行完
             # 解决方法,使用taskdone方法,通知队列任务执行完毕

# 2,后入先出 LIFO
# 后塞进去的数据先出来
# 有点类似弹夹上弹