Python线程池的实现 发表于 2017-02-07 | 面试了Python开发,对于多线程的理解还是有不少问题。恶补了一下午,跟着Demo做了一个线程池,还是能达到效果。 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231#coding:utf8import Queueimport sysimport threadingimport traceback# 定义一些Exception,用于自定义异常处理class NoResultsPending(Exception): """All works requests have been processed""" passclass NoWorkersAvailable(Exception): """No worket threads available to process remaining requests.""" passdef _handle_thread_exception(request, exc_info): """默认的异常处理函数,只是简单的打印""" traceback.print_exception(*exc_info)# classesclass WorkerThread(threading.Thread): """后台线程,真正的工作线程,从请求队列(requestQueue)中获取work, 并将执行后的结果添加到结果队列(resultQueue)""" def __init__(self, requestQueue, resultQueue, poll_timeout=5, **kwds): threading.Thread.__init__(self, **kwds) '''设置为守护进行''' self.setDaemon(True) self._requestQueue = requestQueue self._resultQueue = resultQueue self._poll_timeout = poll_timeout '''设置一个flag信号,用来表示该线程是否还被dismiss,默认为false''' self._dismissed = threading.Event() self.start() def run(self): '''每个线程尽可能多的执行work,所以采用loop, 只要线程可用,并且requestQueue有work未完成,则一直loop''' while True: if self._dismissed.is_set(): break try: ''' Queue.Queue队列设置了线程同步策略,并且可以设置timeout。 一直block,直到requestQueue有值,或者超时 ''' request = self._requestQueue.get(True, self._poll_timeout) except Queue.Empty: continue else: '''之所以在这里再次判断dimissed,是因为之前的timeout时间里,很有可能,该线程被dismiss掉了''' if self._dismissed.is_set(): self._requestQueue.put(request) break try: '''执行callable,讲请求和结果以tuple的方式放入requestQueue''' result = request.callable(*request.args, **request.kwds) print self.getName() self._resultQueue.put((request, result)) except: '''异常处理''' request.exception = True self._resultQueue.put((request, sys.exc_info())) def dismiss(self): '''设置一个标志,表示完成当前work之后,退出''' self._dismissed.set()class WorkRequest: ''' @param callable_:,可定制的,执行work的函数 @param args: 列表参数 @param kwds: 字典参数 @param requestID: id @param callback: 可定制的,处理resultQueue队列元素的函数 @param exc_callback:可定制的,处理异常的函数 ''' def __init__(self, callable_, args=None, kwds=None, requestID=None, callback=None, exc_callback=_handle_thread_exception): if requestID == None: self.requestID = id(self) else: try: self.requestID = hash(requestID) except TypeError: raise TypeError("requestId must be hashable") self.exception = False self.callback = callback self.exc_callback = exc_callback self.callable = callable_ self.args = args or [] self.kwds = kwds or {} def __str__(self): return "WorkRequest id=%s args=%r kwargs=%r exception=%s" % \ (self.requestID, self.args, self.kwds, self.exception)class ThreadPool: ''' @param num_workers:初始化的线程数量 @param q_size,resq_size: requestQueue和result队列的初始大小 @param poll_timeout: 设置工作线程WorkerThread的timeout,也就是等待requestQueue的timeout ''' def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): self._requestQueue = Queue.Queue(q_size) self._resultQueue = Queue.Queue(resq_size) self.workers = [] self.dismissedWorkers = [] self.workRequests = {} # 设置个字典,方便使用 self.createWorkers(num_workers, poll_timeout) def createWorkers(self, num_workers, poll_timeout=5): '''创建num_workers个WorkThread,默认timeout为5''' for i in range(num_workers): self.workers.append(WorkerThread(self._requestQueue, self._resultQueue, poll_timeout=poll_timeout)) def dismissWorkers(self, num_workers, do_join=False): '''停用num_workers数量的线程,并加入dismiss_list''' dismiss_list = [] for i in range(min(num_workers, len(self.workers))): worker = self.workers.pop() worker.dismiss() dismiss_list.append(worker) if do_join: for worker in dismiss_list: worker.join() else: self.dismissedWorkers.extend(dismiss_list) def joinAllDismissedWorkers(self): '''join 所有停用的thread''' # print len(self.dismissedWorkers) for worker in self.dismissedWorkers: worker.join() self.dismissedWorkers = [] def putRequest(self, request, block=True, timeout=None): assert isinstance(request, WorkRequest) assert not getattr(request, 'exception', None) '''当queue满了,也就是容量达到了前面设定的q_size,它将一直阻塞,直到有空余位置,或是timeout''' self._requestQueue.put(request, block, timeout) self.workRequests[request.requestID] = request def poll(self, block=False): while True: if not self.workRequests: raise NoResultsPending elif block and not self.workers: raise NoWorkersAvailable try: '''默认只要resultQueue有值,则取出,否则一直block''' request, result = self._resultQueue.get(block=block) if request.exception and request.exc_callback: request.exc_callback(request, result) if request.callback and not (request.exception and request.exc_callback): request.callback(request, result) del self.workRequests[request.requestID] except Queue.Empty: break def wait(self): while True: try: self.poll(True) except NoResultsPending: break def workersize(self): return len(self.workers) def stop(self): '''join 所有的thread,确保所有的线程都执行完毕''' self.dismissWorkers(self.workersize(), True) self.joinAllDismissedWorkers()# 测试代码if __name__ == '__main__': import random import time import datetime def do_work(data): time.sleep(random.randint(1, 3)) res = str(datetime.datetime.now()) + "" + str(data) return res def print_result(request, result): print "---Result from request %s : %r" % (request.requestID, result) main = ThreadPool(3) for i in range(40): req = WorkRequest(do_work, args=[i], kwds={}, callback=print_result) main.putRequest(req) print "work request #%s added." % req.requestID print '-' * 20, main.workersize(), '-' * 20 counter = 0 while True: try: time.sleep(0.5) main.poll() if (counter == 5): print "Add 3 more workers threads" main.createWorkers(3) print '-' * 20, main.workersize(), '-' * 20 if (counter == 10): print "dismiss 2 workers threads" main.dismissWorkers(2) print '-' * 20, main.workersize(), '-' * 20 counter += 1 except NoResultsPending: print "no pending results" break main.stop() print "Stop"