Python multiprocessing programming

Because of the GIL, Python multithreading cannot run computations on multiple cores in parallel, so for CPU-bound work the multiprocessing module can be used to write multi-process programs instead. Compared with multithreading, multiprocessing has the following advantages:

  • Not limited by the GIL
  • Efficient process creation on Mac and Linux

1. The Process class

In multiprocessing, processes are spawned by creating a Process object and then calling its start() method. Process follows the API of threading.Thread.

class multiprocessing.Process()
pid # Return the process ID
exitcode # The child's exit code; None if the process has not yet terminated
terminate() # Terminate the process
daemon # The process's daemon flag; when a process exits, it attempts to terminate all of its daemonic child processes

The multiprocessing module also introduces APIs which do not have analogs in the threading module.

  • Queue and Pipe
  • Pool

Note: on Windows, the if __name__ == '__main__' guard is especially important, because on Windows this module spawns processes by starting a new Python interpreter, unlike the fork() call used on Unix. Because of this, Windows takes more time and resources to spawn processes than Unix does.

Use multiprocessing.set_start_method('spawn') to select the start method explicitly.
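A minimal sketch (the worker function is illustrative); set_start_method() must be called at most once, inside the __main__ guard, before any process objects are created:

```python
import multiprocessing as mp

def worker(q):
    # Report which start method created this process.
    q.put(mp.get_start_method())

if __name__ == '__main__':
    # Must run before any Process or Pool is created.
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # spawn
    p.join()
```

Note that under 'spawn' the child re-imports the main module, so everything the child needs (here, worker) must be defined at module level.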

2. Communication between processes

When exchanging data between processes, prefer pipes and queues; this avoids having to use any synchronization primitives such as locks.

Queue (FIFO)

class multiprocessing.Queue([maxsize])

qsize()
empty()/full()
put(obj[, block[, timeout]])
get([block[, timeout]]) # Remove and return an item from the queue
close() # Indicate that no more data will be put on this queue by the current process
join_thread() # Join the background thread; can only be used after calling close()
cancel_join_thread() # Prevent join_thread() from blocking

Queue example:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

3. Pool

The Pool class represents a pool of worker processes. It has methods which allow tasks to be offloaded to the worker processes in a few different ways.

One can create a pool of processes which will carry out tasks submitted to it with the Pool class.

class multiprocessing.pool.Pool()

If you need to run a function in a separate process but want the current process to block until that function returns, use map() or apply(); otherwise, use map_async() or apply_async().

map(func, iterable[, chunksize]): like the built-in map function, it applies the same function to many arguments, distributing the work across the pool's worker processes.

map_async(func, iterable[, chunksize[, callback[, error_callback]]]): the call returns an AsyncResult immediately instead of waiting for the results. Call its get() method to retrieve the results. If callback is supplied, it is called once the results are ready; this can be used instead of calling get().

An example of callback function:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x * x

result_list = []

def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args=(i,), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]): like map_async() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.

An example of starmap_async:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1, 2, 3]
    second_arg = 1
    with Pool() as pool:
        # use starmap
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        # use a partial function with map
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__ == "__main__":
    freeze_support()
    main()

4. Sharing state between processes

Shared memory

Using Value and Array to share data:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)        # 'd': double-precision float
    arr = Array('i', range(10))  # 'i': signed int
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

Server process

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

5. The multiprocessing.dummy module

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

Test the efficiency of multithreading and multiprocessing

from urllib.request import urlopen
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool
import time

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
]

# Return the body as bytes rather than the response object:
# HTTPResponse instances cannot be pickled and sent back
# from worker processes.
def fetch(url):
    return urlopen(url).read()

if __name__ == '__main__':
    t1 = time.time()
    # Make the pool of worker threads; map() blocks until all
    # downloads finish, which is what we want for timing.
    with ThreadPool(8) as thread_pool:
        results = thread_pool.map(fetch, urls)
    t2 = time.time()
    print("threading use {} seconds".format(t2 - t1))

    t3 = time.time()
    with Pool(8) as process_pool:
        results = process_pool.map(fetch, urls)
    t4 = time.time()
    print("multiprocessing use {} seconds".format(t4 - t3))

Result:

threading use 0.10493803024291992 seconds
multiprocessing use 0.1377570629119873 seconds

Downloading URLs is I/O-bound, and the GIL is released while a thread waits on the network, so the thread pool is no slower than the process pool here; the process pool also pays the extra cost of spawning processes and pickling the results.