线程锁、threading.local(flask源码中用的到)、线程池、生产者消费者模型

一、线程锁

  线程安全,多线程操作时,内部会让所有线程排队处理。如:list/dict/Queue

  线程不安全 + 人(锁) => 排队处理

1、RLock/Lock:一次放一个

  a、创建10个线程,在列表中追加自己,如下代码:

    import threading
    v = []
    def func(arg):
        v.append(arg)
        print(v)
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()

  b、创建10个线程,把自己添加到列表中,再读取列表的最后一个,如下代码:

    import threading
    import time

    v = []
    lock = threading.Lock()

    def func(arg):
        lock.acquire()   # 加锁
        v.append(arg)
        time.sleep(0.01)
        m = v[-1]
        print(arg,m)
        lock.release()   # 释放锁

    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()

  注意:RLock和Lock用法一样,只是Lock只能锁一次解一次,RLock支持锁多次解多次,以后用RLock。

2、BoundedSemaphore(n) ,信号量, 一次放n个,如下代码:

    import threading
    import time

    lock = threading.BoundedSemaphore(3)

    def func(arg):
        lock.acquire()  # 加锁
        time.sleep(1)
        print(arg)
        lock.release()  # 释放锁

    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()

3、condition(),一次放x个,x可由用户动态输入,代码如下:

  1)方式一:

    import time
    import threading

    lock = threading.Condition()

    def func(arg):
        print('线程进来了')
        lock.acquire()
        lock.wait() # 加锁
        print(arg)
        time.sleep(1)
        lock.release()

    for i in range(10):
        t =threading.Thread(target=func,args=(i,))
        t.start()

    while True:
        inp = int(input('>>>'))
        lock.acquire()
        lock.notify(inp)
        lock.release()

  2)方式二:

    import time
    import threading

    lock = threading.Condition()
    def f1():
        print('来执行函数了')
        input(">>>")
        return True

    def func(arg):
        print('线程进来了')
        lock.wait_for(f1)  # 等函数f1执行完毕后继续往下走
        print(arg)
        time.sleep(1)

    for i in range(10):
        t =threading.Thread(target=func,args=(i,))
        t.start()

4、Event,一次放所有,如下示例:

    import threading

    lock = threading.Event()

    def func(arg):
        print('线程来了')
        lock.wait() # 加锁:红灯
        print(arg)

    for i in range(10):
        t =threading.Thread(target=func,args=(i,))
        t.start()

    input(">>>")
    lock.set() # 绿灯

    lock.clear() # 再次变红灯

    for i in range(10):
        t =threading.Thread(target=func,args=(i,))
        t.start()

    input(">>>")
    lock.set()

总结:

       线程安全,列表和字典线程安全;

       为什么要加锁?    非线程安全,控制一段代码;

二、threading.local()

         作用:内部自动为每个线程维护一个空间(本质是字典),用于当前线程存取属于自己的值,保证线程之间的数据隔离。

    {

                     线程ID : { . . . },

                     线程ID : { . . . },

                     线程ID : { . . . },

                     线程ID : { . . . },

    }

    """
    以后:Flask框架内部看到源码 上下文管理

    """
    import time
    import threading
    INFO = {}
    class Local(object):
        def __getattr__(self, item):
            ident = threading.get_ident()
            return INFO[ident][item]

        def __setattr__(self, key, value):
            ident = threading.get_ident()
            if ident in INFO:
                INFO[ident][key] = value
            else:
                INFO[ident] = {key:value}

    obj = Local()

    def func(arg):
        obj.phone = arg # 调用对象的 __setattr__方法(“phone”,1)
        time.sleep(2)
        print(obj.phone,arg)

    for i in range(10):
        t =threading.Thread(target=func,args=(i,))
        t.start()
threading.local()的原理:
    import threading
    import time

    v = threading.local()

    def func(arg):
        v.phone = arg  # 内部会为当前线程创建一个空间用于存储:phone = 自己的值
        time.sleep(2)
        print(v.phone,arg)  # 去当前线程自己空间取值

    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
threading.local()的使用:

三、线程池

       以后写代码不要一个一个创建线程,而是创建一个线程池,再去线程池申请线程去执行任务,如下示例:

    from concurrent.futures import ThreadPoolExecutor
    import time

    def task(a1,a2):
        time.sleep(2)
        print(a1,a2)

    # 创建了一个线程池(最多5个线程)
    pool = ThreadPoolExecutor(5)

    for i in range(40):
        # 去线程池中申请一个线程,让线程执行task函数。
        pool.submit(task,i,8)
四、生产者消费者模型

       三部分:生产者,消费者,队列

       队列:先进先出

       栈:后进先出

  问题1:生产者消费者模型解决了什么问题?不用一直等待的问题。如下示例:

    import time
    import queue
    import threading
    q = queue.Queue() # 线程安全

    def producer(id):
        """
        生产者
        :return:
        """
        while True:
            time.sleep(2)
            q.put('包子')
            print('厨师%s 生产了一个包子' %id )

    for i in range(1,4):
        t = threading.Thread(target=producer,args=(i,))
        t.start()

    def consumer(id):
        """
        消费者
        :return:
        """
        while True:
            time.sleep(1)
            v = q.get()
            print('顾客 %s 吃了一个%s' % (id,v))

    for i in range(1,3):
        t = threading.Thread(target=consumer,args=(i,))
        t.start()

五、面向对象补充(了解,以后不会写,flask源码中会遇到)

    class Foo(object):
        def __init__(self):
            self.name = 'alex'
        def __setattr__(self, key, value):
            print(key,value)
    obj = Foo()   # 结果为:name alex   (说明执行了Foo的__setattr__方法)
    # 分析:因为obj.x自动执行__setattr__
    print(obj.name)  # 报错
    # 分析:__setattr__方法中没有设置的操作,只有打印
示例一:
    class Foo(object):
        def __init__(self):
            object.__setattr__(self, 'info', {})  # 在对象中设置值的本质
        def __setattr__(self, key, value):
            self.info[key] = value
        def __getattr__(self, item):
            return self.info[item]
    obj = Foo()  
    obj.name = 'alex'
    print(obj.name)
示例二:

相关推荐