复建第一次写的线程池
重构第一次写的线程池
最近没有什么学习欲望,修改之前的线程池的计划一直搁置,这几天比较闲,还是做了一次重构,由之前的2个类拆分为现在的4个类.
1、首先是工作线程类:TaskThread,此类为一个工作线程,用于完成一个工作任务,提供等待(wait),继续(proceed),绑定任务(bindTask)等方法
2、线程池类(ThreadPool),此类是一个单例,用于模拟一个池,并提一个同步方法getThread用于获取池中线程,若池已经空了,返回None
3、任务队列类(TaskQueue),此类是一个单例,对Queue进行了简单的封装
4、线程池管理类(ThreadPoolManager),此类负责从任务队列中获取任务,然后绑定到线程池中获取空闲线程中执行.
第一次重构算是完成了,目前就差一个调节负载的轮询线程,不过暂时没有想到一个好点的调节策略,只有搁置...
最近没有什么学习欲望,修改之前的线程池的计划一直搁置,这几天比较闲,还是做了一次重构,由之前的2个类拆分为现在的4个类.
1、首先是工作线程类:TaskThread,此类为一个工作线程,用于完成一个工作任务,提供等待(wait),继续(proceed),绑定任务(bindTask)等方法
#!/usr/bin/env python # -*- coding:utf8 -*- import threading class TaskThread(threading.Thread): def __init__(self): super(TaskThread,self).__init__() self._e = threading.Event() self.setDaemon(True) self.isReady(True) self.isActive(True) def run(self): while self.isActive(): try: self.task() except: pass finally: self.wait() def wait(self): self.isReady(True) self._e.clear() self._e.wait() def proceed(self): self.isReady(False) self._e.set() if not self.isAlive() and self.isActive(): self.start() def isReady(self,flag = None): if isinstance(flag,bool): self.is_ready = flag return self.is_ready def isActive(self,flag = None): if isinstance(flag,bool): self.is_active = flag return self.is_active def bindTask(self,task): self.task = task
2、线程池类(ThreadPool),此类是一个单例,用于模拟一个池,并提一个同步方法getThread用于获取池中线程,若池已经空了,返回None
#!/usr/bin/env python # -*- coding:utf8 -*- import threading from TaskThread import TaskThread class ThreadPool(object): __instance = None __lock = threading.Lock() def __init__(): pass @classmethod def getInstance(self): self.__lock.acquire() if not self.__instance: self.__instance = super(ThreadPool,self).__new__(self) self.__lock.release() return self.__instance def initPool(self,pool_min_size = 5,pool_max_size = 10): self.pool_min_size = pool_min_size self.pool_max_size = pool_max_size self.pool = [] for i in range(self.pool_min_size): self.pool.append(TaskThread()) def getThread(self): th = None self.__lock.acquire() for t in self.pool: if not t._e.isSet() and t.isReady(): t.isReady(False) th = t break self.__lock.release() if th is None and len(self.pool) < self.pool_max_size: th = TaskThread() self.pool.append(th) return th
3、任务队列类(TaskQueue),此类是一个单例,对Queue进行了简单的封装
#!/usr/bin/env python # -*- coding:utf8 -*- import threading from Queue import Queue class TaskQueue(object): __instance = None __lock = threading.Lock() def __init__(): pass @classmethod def getInstance(self): self.__lock.acquire() if not self.__instance: self.__instance = super(TaskQueue,self).__new__(self) self.__lock.release() return self.__instance def initQueue(self,task_queue_size = 100): self.tasks = Queue(task_queue_size) def getTask(self): try: return self.tasks.get(0) except: raise Exception,'This queue is empty.' def addTask(self,task): try: self.tasks.put(task,0) except: raise Exception,'This queue is full.'
4、线程池管理类(ThreadPoolManager),此类负责从任务队列中获取任务,然后绑定到线程池中获取空闲线程中执行.
#!/usr/bin/env python # -*- coding:utf8 -*- import threading class ThreadPoolManager(threading.Thread): def __init__(self,pool,tasks): super(ThreadPoolManager,self).__init__() self.setDaemon(True) self.pool = pool self.tasks = tasks def run(self): while True: t = self.pool.getThread() if t is not None: try: task = self.tasks.getTask() except: t.isReady(True) else: t.bindTask(task) t.proceed()
第一次重构算是完成了,目前就差一个调节负载的轮询线程,不过暂时没有想到一个好点的调节策略,只有搁置...