一段老外写的python线程池代码

最近在看Python线程,它的锁和GIL实在繁琐,让人恶心至极!

看了iteye的一篇帖子,记录一下国外友人写的线程池代码,简单易读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import threading
class WorkerTask(object):
    """A task to be performed by the ThreadPool.

    Bundles a callable with the positional and keyword arguments it should
    be invoked with, so the whole unit can be queued and called later.

    Args:
        function: The callable to run.
        args: Positional arguments passed to ``function``.
        kwargs: Keyword arguments passed to ``function``. ``None`` (the
            default) means "no keyword arguments".
    """

    def __init__(self, function, args=(), kwargs=None):
        self.function = function
        self.args = args
        # A fresh dict per task: a ``kwargs={}`` default would be one dict
        # shared by every task created without explicit kwargs.
        self.kwargs = {} if kwargs is None else kwargs

    def __call__(self):
        """Run the wrapped callable and return whatever it returns."""
        return self.function(*self.args, **self.kwargs)
class WorkerThread(threading.Thread):
    """A daemon thread managed by a thread pool.

    The worker repeatedly drains ``pool._tasks`` and then sleeps on an
    internal event until :meth:`work` wakes it again.

    Attributes:
        pool: The owning pool; only its ``_tasks`` list is accessed here.
        busy: True while the worker is draining the task list, False before
            it has started and while it is sleeping.
    """

    def __init__(self, pool):
        threading.Thread.__init__(self)
        # ``setDaemon()`` is deprecated; the attribute is the supported form.
        self.daemon = True
        self.pool = pool
        self.busy = False
        # Must not be named ``_started``: threading.Thread keeps its own
        # Event under that name, and start() fails once it is replaced by
        # a bool.
        self._work_started = False
        # Created eagerly so work() never finds the wake-up event missing.
        self._event = threading.Event()

    def work(self):
        """Start the thread on first use; afterwards wake it up.

        NOTE(review): not guarded against two threads calling this at the
        same time before the worker has started.
        """
        if self._work_started:
            self._event.set()
        else:
            self._work_started = True
            self.start()

    def run(self):
        """Drain the pool's task list forever, sleeping while it is empty."""
        import traceback

        while True:
            self.busy = True
            while self.pool._tasks:
                try:
                    task = self.pool._tasks.pop()
                except IndexError:
                    # Another thread grabbed the last task first; re-check.
                    continue
                # The task runs outside the IndexError handler so that an
                # IndexError raised by the task itself is not mistaken for
                # an empty queue.
                try:
                    task()
                except Exception:
                    # Report and keep going: a dead worker would stay marked
                    # busy and never be reused by the pool.
                    traceback.print_exc()
            # Sleep until needed again. Waiting first and clearing afterwards
            # means a wake-up sent at any point after the drain above is
            # never lost: the next drain always follows the clear().
            self.busy = False
            self._event.wait()
            self._event.clear()
class ThreadPool(object):
    """Executes queued tasks in the background.

    Tasks are appended to a shared list. Each submission wakes one idle
    worker if there is one; otherwise a new worker is created, as long as
    the pool holds fewer than ``max_pool_size`` workers.

    Args:
        max_pool_size: Upper bound on the number of worker threads.
    """

    def __init__(self, max_pool_size=10):
        self.max_pool_size = max_pool_size
        self._threads = []
        self._tasks = []

    def _addTask(self, task):
        """Queue ``task`` and make sure some worker will look at the queue."""
        self._tasks.append(task)
        # Prefer re-using the first worker that is not currently busy.
        worker_thread = next(
            (thread for thread in self._threads if not thread.busy), None)
        # Strictly less-than: ``<=`` would let the pool grow to
        # max_pool_size + 1 workers.
        if worker_thread is None and len(self._threads) < self.max_pool_size:
            worker_thread = WorkerThread(self)
            self._threads.append(worker_thread)
        # With every worker busy and the pool full, the task just stays in
        # the list for a busy worker to pick up while draining.
        if worker_thread is not None:
            worker_thread.work()

    def addTask(self, function, args=(), kwargs=None):
        """Queue ``function(*args, **kwargs)`` for background execution.

        Args:
            function: The callable to run.
            args: Positional arguments for ``function``.
            kwargs: Keyword arguments for ``function``; ``None`` means none.
        """
        # A new dict per call avoids sharing one default dict across tasks.
        task_kwargs = {} if kwargs is None else kwargs
        self._addTask(WorkerTask(function, args, task_kwargs))
class GlobalThreadPool(object):
    """Process-wide handle onto a single shared ThreadPool.

    Every instance forwards attribute reads and writes to the one pool
    object stored on the class, so all instances behave as the same pool.
    """

    # The shared pool; filled in by the first instantiation.
    _instance = None

    def __init__(self):
        """Create the shared pool the first time any instance is built."""
        if GlobalThreadPool._instance is not None:
            return
        # Assigned on the class, which bypasses the delegating __setattr__.
        GlobalThreadPool._instance = ThreadPool()

    def __getattr__(self, attr):
        """Forward attribute lookups that miss on this object to the pool."""
        shared_pool = self._instance
        return getattr(shared_pool, attr)

    def __setattr__(self, attr, val):
        """Forward attribute assignment to the pool."""
        shared_pool = self._instance
        return setattr(shared_pool, attr, val)