multiprocessing — Process-based parallelism

Source code: Lib/multiprocessing/


Availability: not Android, not iOS, not WASI.

This module is not supported on mobile platforms or WebAssembly platforms.

Introduction

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both POSIX and Windows.

The multiprocessing module also introduces APIs which do not have analogs in the threading module. A prime example of this is the Pool object which offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism). The following example demonstrates the common practice of defining such functions in a module so that child processes can successfully import that module. This basic example of data parallelism using Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

will print to standard output

[1, 4, 9]

See also

concurrent.futures.ProcessPoolExecutor offers a higher level interface to push tasks to a background process without blocking execution of the calling process. Compared to using the Pool interface directly, the concurrent.futures API more readily allows the submission of work to the underlying process pool to be separated from waiting for the results.
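A minimal sketch of that separation, assuming a hypothetical module-level work function square(): submit() hands each call to the pool and returns a Future straight away, and the results are collected later with as_completed(). The worker count of 2 is an arbitrary choice.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def square(x):
    # Hypothetical work function; defined at module level so workers can import it.
    return x * x

def run_batch(values):
    results = {}
    with ProcessPoolExecutor(max_workers=2) as executor:
        # submit() returns a Future immediately; the caller is not blocked here.
        futures = {executor.submit(square, v): v for v in values}
        # ... the caller is free to do other work at this point ...
        # Waiting for the results is a separate, later step.
        for fut in as_completed(futures):
            results[futures[fut]] = fut.result()
    return results

if __name__ == '__main__':
    print(run_batch([1, 2, 3]))
```

Because as_completed() yields futures in completion order, the sketch keys each result by its input rather than relying on submission order.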

The Process class

In multiprocessing, processes are spawned by creating a Process object and then calling its start() method. Process follows the API of threading.Thread. A trivial example of a multiprocess program is

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

To show the individual process IDs involved, here is an expanded example:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

For an explanation of why the if __name__ == '__main__' part is necessary, see Programming guidelines.

Contexts and start methods

Depending on the platform, multiprocessing supports three ways to start a process. These start methods are

spawn

The parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object’s run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.

Available on POSIX and Windows platforms. The default on Windows and macOS.

fork

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

Available on POSIX systems. Currently the default on POSIX except macOS.

Note

The default start method will change away from fork in Python 3.14. Code that requires fork should explicitly specify that via get_context() or set_start_method().

Changed in version 3.12: If Python is able to detect that your process has multiple threads, the os.fork() function that this start method calls internally will raise a DeprecationWarning. Use a different start method. See the os.fork() documentation for further explanation.

forkserver

When the program starts and selects the forkserver start method, a server process is spawned. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single-threaded unless system libraries or preloaded imports spawn threads as a side-effect, so it is generally safe for it to use os.fork(). No unnecessary resources are inherited.

Available on POSIX platforms which support passing file descriptors over Unix pipes such as Linux.

Changed in version 3.4: spawn added on all POSIX platforms, and forkserver added for some POSIX platforms. Child processes no longer inherit all of the parent’s inheritable handles on Windows.

Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess as macOS system libraries may start threads. See bpo-33725.

On POSIX, using the spawn or forkserver start method will also start a resource tracker process which tracks the unlinked named system resources (such as named semaphores or SharedMemory objects) created by processes of the program. When all processes have exited, the resource tracker unlinks any remaining tracked object. Usually there should be none, but if a process was killed by a signal there may be some “leaked” resources. (Neither leaked semaphores nor shared memory segments will be automatically unlinked until the next reboot. This is problematic for both objects because the system allows only a limited number of named semaphores, and shared memory segments occupy some space in the main memory.)
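To leave the resource tracker nothing to clean up, a process that creates a named shared memory block can release it explicitly. The sketch below uses multiprocessing.shared_memory.SharedMemory in a single process for brevity. write_and_release() is a hypothetical helper. In real multi-process code the unlink() call would normally happen once, after every process has finished with the block.

```python
from multiprocessing import shared_memory

def write_and_release(payload):
    """Create a named block, round-trip `payload` through it, then release it."""
    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    name = shm.name
    try:
        shm.buf[:len(payload)] = payload        # write through the memoryview
        data = bytes(shm.buf[:len(payload)])    # copy the bytes back out
    finally:
        shm.close()    # release this process's view of the block
        shm.unlink()   # remove the name itself; call once, from one process
    return name, data

if __name__ == '__main__':
    name, data = write_and_release(b'hello')
    print(name, data)
```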

To select a start method, use set_start_method() in the if __name__ == '__main__' clause of the main module. For example:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() should not be used more than once in the program.

Alternatively, you can use get_context() to obtain a context object. Context objects have the same API as the multiprocessing module, and allow one to use multiple start methods in the same program.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Note that objects related to one context may not be compatible with processes for a different context. In particular, locks created using the fork context cannot be passed to processes started using the spawn or forkserver start methods.

A library which wants to use a particular start method should probably use get_context() to avoid interfering with the choice of the library user.
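As a rough sketch of that advice, a hypothetical library helper can accept an optional context from its caller and otherwise pick its own with get_context(), without ever calling set_start_method(). Using 'spawn' as the fallback is an assumption for illustration.

```python
import multiprocessing as mp

def make_work_queue(ctx=None):
    """Hypothetical library helper that leaves the global start method alone.

    Callers may pass their own context; otherwise the library falls back to
    a 'spawn' context of its own choosing (an illustrative assumption).
    """
    if ctx is None:
        ctx = mp.get_context('spawn')
    return ctx, ctx.Queue()

if __name__ == '__main__':
    ctx, q = make_work_queue()
    q.put('job-1')
    print(ctx.get_start_method(), q.get(timeout=5))   # spawn job-1
```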

Warning

The 'spawn' and 'forkserver' start methods generally cannot be used with “frozen” executables (i.e., binaries produced by packages like PyInstaller and cx_Freeze) on POSIX systems. The 'fork' start method may work if code does not use threads.

Exchanging objects between processes

multiprocessing supports two types of communication channel between processes:

Queues

The Queue class is a near clone of queue.Queue. For example:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe. Any object put into a multiprocessing queue will be serialized.

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

The send() method serializes the object and recv() re-creates the object.

Synchronization between processes

multiprocessing contains equivalents of all the synchronization primitives from threading. For instance one can use a lock to ensure that only one process prints to standard output at a time:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Without the lock, output from the different processes is liable to get all mixed up.

Sharing state between processes

As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.

However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

Shared memory

Data can be stored in a shared memory map using Value or Array. For example, the following code

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

will print

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

The 'd' and 'i' arguments used when creating num and arr are typecodes of the kind used by the array module: 'd' indicates a double precision float and 'i' indicates a signed integer. These shared objects will be process and thread-safe.
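One caveat is worth spelling out. The lock inside a Value makes each individual read or write of .value safe, but a read-modify-write such as counter.value += 1 is two separate operations. The sketch below shows the usual pattern, holding the object's own lock via get_lock(). The add_many() and run() names and the worker and iteration counts are hypothetical.

```python
from multiprocessing import Process, Value

def add_many(counter, times):
    for _ in range(times):
        # `+=` reads then writes; hold the Value's lock across both steps.
        with counter.get_lock():
            counter.value += 1

def run(workers=4, times=1000):
    counter = Value('i', 0)   # synchronized wrapper with its own lock (lock=True by default)
    procs = [Process(target=add_many, args=(counter, times)) for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == '__main__':
    print(run())   # 4000
```

Without the with counter.get_lock(): line, increments from different processes may interleave and the final total may come out lower than expected.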

For more flexibility in using shared memory one can use the multiprocessing.sharedctypes module which supports the creation of arbitrary ctypes objects allocated from shared memory.

Server process

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

will print

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

Using a pool of workers

The Pool class represents a pool of worker processes. It has methods which allow tasks to be offloaded to the worker processes in a few different ways.

For example:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))       # runs in *only* one process
        print(res.get(timeout=1))              # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ())  # runs in *only* one process
        print(res.get(timeout=1))              # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Note that the methods of a pool should only ever be used by the process which created it.

Note

Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines, however it is worth pointing out here. This means that some examples, such as the multiprocessing.pool.Pool examples, will not work in the interactive interpreter. For example:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1, 2, 3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(If you try this it will actually output three full tracebacks interleaved in a semi-random fashion, and then you may have to stop the parent process somehow.)

Reference

The multiprocessing package mostly replicates the API of the threading module.

Process and exceptions

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process objects represent activity that is run in a separate process. The Process class has equivalents of all the methods of threading.Thread.

The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name (see name for more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the process daemon flag to True or False. If None (the default), this flag will be inherited from the creating process.

By default, no arguments are passed to target. The args argument, which defaults to (), can be used to specify a list or tuple of the arguments to pass to target.

If a subclass overrides the constructor, it must make sure it invokes the base class constructor (Process.__init__()) before doing anything else to the process.

Changed in version 3.3: Added the daemon parameter.

run()

Method representing the process’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Using a list or tuple as the args argument passed to Process achieves the same effect.

Example:

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
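Overriding run() in a subclass might look like the sketch below. The Greeter class is hypothetical, and it calls the base constructor first, as required above. After start(), run() executes in the child process, so any attributes it sets there are not visible in the parent's copy of the object.

```python
from multiprocessing import Process

class Greeter(Process):
    """Hypothetical Process subclass that overrides run()."""

    def __init__(self, who):
        super().__init__()        # invoke Process.__init__() before anything else
        self.who = who
        self.message = None

    def run(self):
        # Runs in the child after start(); calling run() directly runs it here instead.
        self.message = 'hello ' + self.who
        print(self.message)

if __name__ == '__main__':
    g = Greeter('bob')
    g.start()
    g.join()
    # The child set its own copy of `message`; the parent's copy is unchanged.
    print(g.exitcode, g.message)
```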
start()

Start the process’s activity.

This must be called at most once per process object. It arranges for the object’s run() method to be invoked in a separate process.

join([timeout])

If the optional argument timeout is None (the default), the method blocks until the process whose join() method is called terminates. If timeout is a positive number, it blocks at most timeout seconds. Note that the method returns None if its process terminates or if the method times out. Check the process’s exitcode to determine if it terminated.

A process can be joined many times.

A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.

name

The process’s name. The name is a string used for identification purposes only. It has no semantics. Multiple processes may be given the same name.

The initial name is set by the constructor. If no explicit name is provided to the constructor, a name of the form ‘Process-N1:N2:…:Nk’ is constructed, where each Nk is the N-th child of its parent.

is_alive()

Return whether the process is alive.

Roughly, a process object is alive from the moment the start() method returns until the child process terminates.

daemon

The process’s daemon flag, a Boolean value. This must be set before start() is called.

The initial value is inherited from the creating process.

When a process exits, it attempts to terminate all of its daemonic child processes.

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. Additionally, these are not Unix daemons or services, they are normal processes that will be terminated (and not joined) if non-daemonic processes have exited.

In addition to the threading.Thread API, Process objects also support the following attributes and methods:

pid

Return the process ID. Before the process is spawned, this will be None.

exitcode

The child’s exit code. This will be None if the process has not yet terminated.

If the child’s run() method returned normally, the exit code will be 0. If it terminated via sys.exit() with an integer argument N, the exit code will be N.

If the child terminated due to an exception not caught within run(), the exit code will be 1. If it was terminated by signal N, the exit code will be the negative value -N.
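The rules above can be checked with a small sketch that starts one child per case and reads exitcode after join(). The three target functions are hypothetical. The child that raises will also print its traceback to standard error.

```python
import sys
from multiprocessing import Process

def returns_normally():
    pass                          # run() returns normally -> 0

def exits_with_3():
    sys.exit(3)                   # sys.exit(N) -> N

def raises():
    raise ValueError('boom')      # uncaught exception -> 1 (traceback on stderr)

def collect_exit_codes():
    codes = {}
    for target in (returns_normally, exits_with_3, raises):
        p = Process(target=target)
        assert p.exitcode is None     # not started yet
        p.start()
        p.join()
        codes[target.__name__] = p.exitcode
    return codes

if __name__ == '__main__':
    print(collect_exit_codes())   # {'returns_normally': 0, 'exits_with_3': 3, 'raises': 1}
```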

authkey

The process’s authentication key (a byte string).

When multiprocessing is initialized the main process is assigned a random string using os.urandom().

When a Process object is created, it will inherit the authentication key of its parent process, although this may be changed by setting authkey to another byte string.

See Authentication keys.

sentinel

A numeric handle of a system object which will become “ready” when the process ends.

You can use this value if you want to wait on several events at once using multiprocessing.connection.wait(). Otherwise calling join() is simpler.

On Windows, this is an OS handle usable with the WaitForSingleObject and WaitForMultipleObjects family of API calls. On POSIX, this is a file descriptor usable with primitives from the select module.

Added in version 3.3.
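The sketch below waits on several sentinels at once with multiprocessing.connection.wait(). nap() and first_finished() are hypothetical helpers, and the delays are arbitrary. wait() returns as soon as at least one sentinel is ready, so the caller learns which child ended first without joining each one in turn.

```python
import time
from multiprocessing import Process
from multiprocessing.connection import wait

def nap(seconds):
    time.sleep(seconds)

def first_finished(delays):
    """Start one child per delay; return the index of the first to exit."""
    procs = [Process(target=nap, args=(d,)) for d in delays]
    for p in procs:
        p.start()
    by_sentinel = {p.sentinel: p for p in procs}
    # Blocks until at least one sentinel is ready, i.e. one child has ended.
    ready = wait(list(by_sentinel))
    first = by_sentinel[ready[0]]
    for p in procs:
        p.join()
    return procs.index(first)

if __name__ == '__main__':
    print(first_finished([3.0, 0.0]))
```

With delays of 3.0 and 0.0 seconds, the second child should normally be reported first. The timing is not guaranteed on a heavily loaded machine.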

terminate()

Terminate the process. On POSIX this is done using the SIGTERM signal; on Windows TerminateProcess() is used. Note that exit handlers and finally clauses, etc., will not be executed.

Note that descendant processes of the process will not be terminated – they will simply become orphaned.

Warning

If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other processes. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

kill()

Same as terminate() but using the SIGKILL signal on POSIX.

Added in version 3.7.

close()

Close the Process object, releasing all resources associated with it. ValueError is raised if the underlying process is still running. Once close() returns successfully, most other methods and attributes of the Process object will raise ValueError.

Added in version 3.7.

Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.

Example usage of some of the methods of Process:

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True

exception multiprocessing.ProcessError

The base class of all multiprocessing exceptions.

exception multiprocessing.BufferTooShort

Exception raised by Connection.recv_bytes_into() when the supplied buffer object is too small for the message read.

If e is an instance of BufferTooShort then e.args[0] will give the message as a byte string.
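The following single-process sketch shows how BufferTooShort surfaces with Pipe(). read_into() is a hypothetical helper. When the bytearray is too small, the exception still carries the complete message in e.args[0].

```python
from multiprocessing import Pipe, BufferTooShort

def read_into(message, size):
    """Send `message` through a pipe and read it into a buffer of `size` bytes."""
    reader, writer = Pipe(duplex=False)
    try:
        writer.send_bytes(message)
        buf = bytearray(size)
        try:
            n = reader.recv_bytes_into(buf)   # returns the number of bytes read
            return 'fit', bytes(buf[:n])
        except BufferTooShort as e:
            # The full message is still available as a byte string.
            return 'too short', e.args[0]
    finally:
        reader.close()
        writer.close()

if __name__ == '__main__':
    print(read_into(b'hello world', 4))   # ('too short', b'hello world')
    print(read_into(b'hi', 4))            # ('fit', b'hi')
```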

exception multiprocessing.AuthenticationError

Raised when there is an authentication error.

exception multiprocessing.TimeoutError

Raised by methods with a timeout when the timeout expires.

Pipes and Queues

When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.

For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).

The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s queue.Queue class.

If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

One difference from other Python queue implementations is that multiprocessing queues serialize all objects that are put into them using pickle. The object returned by the get method is a re-created object that does not share memory with the original object.
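A quick single-process illustration of these copy semantics follows. round_trip() is a hypothetical helper. The object returned by get() compares equal to what was put, but it is a distinct object, and changing it leaves the original untouched.

```python
import multiprocessing as mp

def round_trip(obj):
    """Put `obj` on a Queue and get it back in the same process."""
    q = mp.Queue()
    q.put(obj)                  # pickled by the queue's feeder thread
    result = q.get(timeout=5)   # unpickled into a new object
    q.close()
    q.join_thread()
    return result

if __name__ == '__main__':
    original = {'items': [1, 2, 3]}
    copy = round_trip(original)
    print(copy == original, copy is original)   # True False
    copy['items'].append(4)
    print(original)                             # {'items': [1, 2, 3]}
```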

Note that one can also create a shared queue by using a manager object – see Managers.

Note

multiprocessing uses the usual queue.Empty and queue.Full exceptions to signal a timeout. They are not available in the multiprocessing namespace so you need to import them from queue.
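The sketch below triggers both exceptions on a Queue with room for one item. show_timeouts() is a hypothetical name and the 0.1 second timeout is arbitrary.

```python
import queue
import multiprocessing as mp

def show_timeouts():
    q = mp.Queue(maxsize=1)
    outcomes = []
    try:
        q.get(timeout=0.1)                # nothing has been queued yet
    except queue.Empty:
        outcomes.append('empty')
    q.put('first')
    try:
        q.put('second', block=False)      # the single slot is already taken
    except queue.Full:
        outcomes.append('full')
    outcomes.append(q.get(timeout=5))
    return outcomes

if __name__ == '__main__':
    print(show_timeouts())   # ['empty', 'full', 'first']
```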

Note

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.

  1. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.

  2. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.

Warning

If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

For an example of the usage of queues for interprocess communication see Examples.

multiprocessing.Pipe([duplex])

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.

The send() method serializes the object using pickle and the recv() re-creates the object.
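A small single-process sketch of a one-way pipe follows. one_way_demo() is a hypothetical helper. The first connection only receives and the second only sends, so attempting send() on the receiving end raises OSError.

```python
from multiprocessing import Pipe

def one_way_demo():
    receiver, sender = Pipe(duplex=False)   # conn1 receives, conn2 sends
    sender.send({'n': 1})
    received = receiver.recv()              # a re-created (unpickled) copy
    try:
        receiver.send('reply')              # the receiving end is read-only
        reply_allowed = True
    except OSError:
        reply_allowed = False
    receiver.close()
    sender.close()
    return received, reply_allowed

if __name__ == '__main__':
    print(one_way_demo())   # ({'n': 1}, False)
```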

class multiprocessing.Queue([maxsize])

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

The usual queue.Empty and queue.Full exceptions from the standard library’s queue module are raised to signal timeouts.

Queue implements all the methods of queue.Queue except for task_done() and join().

qsize()

Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.

Note that this may raise NotImplementedError on platforms like macOS where sem_getvalue() is not implemented.

empty()

Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

May raise an OSError on closed queues. (not guaranteed)

full()

Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

put(obj[, block[, timeout]])

Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case).

Changed in version 3.8: If the queue is closed, ValueError is raised instead of AssertionError.

put_nowait(obj)

Equivalent to put(obj, False).

get([block[, timeout]])

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

Changed in version 3.8: If the queue is closed, ValueError is raised instead of OSError.

get_nowait()

Equivalent to get(False).

multiprocessing.Queue has a few additional methods not found in queue.Queue. These methods are usually unnecessary for most code:

close()

Indicate that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.

join_thread()

Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.

By default if a process is not the creator of the queue then on exit it will attempt to join the queue’s background thread. The process can call cancel_join_thread() to make join_thread() do nothing.

cancel_join_thread()

Prevent join_thread() from blocking. In particular, this prevents the background thread from being joined automatically when the process exits – see join_thread().

A better name for this method might be allow_exit_without_flush(). It is likely to cause enqueued data to be lost, and you almost certainly will not need to use it. It is really only there if you need the current process to exit immediately without waiting to flush enqueued data to the underlying pipe, and you don’t care about lost data.

Note

This class’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the functionality in this class will be disabled, and attempts to instantiate a Queue will result in an ImportError. See bpo-3770 for additional information. The same holds true for any of the specialized queue types listed below.

class multiprocessing.SimpleQueue

It is a simplified Queue type, very close to a locked Pipe.

close()

Close the queue: release internal resources.

A queue must not be used anymore after it is closed. For example, get(), put() and empty() methods must no longer be called.

Added in version 3.9.

empty()

Return True if the queue is empty, False otherwise.

Always raises an OSError if the SimpleQueue is closed.

get()

Remove and return an item from the queue.

put(item)

Put item into the queue.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
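The following sketch shows the task_done()/join() protocol with worker processes. worker(), process_all() and the use of None as a stop marker are assumptions for illustration, not part of the API. Each worker calls task_done() exactly once per get(), including for the stop marker, so both tasks.join() calls can return.

```python
from multiprocessing import JoinableQueue, Process, Queue

STOP = None   # hypothetical stop marker

def worker(tasks, results):
    while True:
        item = tasks.get()
        try:
            if item is STOP:
                return
            results.put(item * 2)
        finally:
            tasks.task_done()     # exactly one task_done() per get(), stop marker included

def process_all(items, n_workers=2):
    tasks, results = JoinableQueue(), Queue()
    workers = [Process(target=worker, args=(tasks, results)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for item in items:
        tasks.put(item)
    tasks.join()                  # unblocks once every item has been task_done()'d
    out = sorted(results.get(timeout=5) for _ in items)
    for _ in workers:
        tasks.put(STOP)           # one stop marker per worker
    tasks.join()
    for w in workers:
        w.join()
    return out

if __name__ == '__main__':
    print(process_all([1, 2, 3, 4]))   # [2, 4, 6, 8]
```

The results are drained from the results queue before the workers are joined. This avoids the join deadlock described in the queue warnings earlier in this section.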

Miscellaneous

multiprocessing.active_children()

Return list of all live children of the current process.

Calling this has the side effect of “joining” any processes which have already finished.

multiprocessing.cpu_count()

Return the number of CPUs in the system.

This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with os.process_cpu_count() (or len(os.sched_getaffinity(0))).

When the number of CPUs cannot be determined a NotImplementedError is raised.

Changed in version 3.13: The return value can also be overridden using the -X cpu_count flag or PYTHON_CPU_COUNT as this is merely a wrapper around the os cpu count APIs.
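When sizing a pool, the usable count is often what matters. The best-effort sketch below (usable_cpu_count() is hypothetical) prefers os.process_cpu_count() where it exists (Python 3.13+). Otherwise it falls back to os.sched_getaffinity() on platforms that provide it, and finally to cpu_count().

```python
import multiprocessing
import os

def usable_cpu_count():
    """Best-effort number of CPUs the current process may run on."""
    count = None
    if hasattr(os, 'process_cpu_count'):       # Python 3.13+
        count = os.process_cpu_count()
    elif hasattr(os, 'sched_getaffinity'):     # only on some POSIX platforms
        count = len(os.sched_getaffinity(0))
    if not count:
        # Fall back to the system-wide count; may raise NotImplementedError.
        count = multiprocessing.cpu_count()
    return count

if __name__ == '__main__':
    print('system:', multiprocessing.cpu_count(), 'usable:', usable_cpu_count())
```

The result could then be passed as Pool(processes=usable_cpu_count()).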

multiprocessing.current_process()

Return the Process object corresponding to the current process.

An analogue of threading.current_thread().

multiprocessing.parent_process()

Return the Process object corresponding to the parent process of the current_process(). For the main process, parent_process will be None.

Added in version 3.8.

multiprocessing.freeze_support()

Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. (Has been tested with py2exe, PyInstaller and cx_Freeze.)

One needs to call this function straight after the if __name__ == '__main__' line of the main module. For example:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

If the freeze_support() line is omitted then trying to run the frozen executable will raise RuntimeError.

Calling freeze_support() has no effect when invoked on any operating system other than Windows. In addition, if the module is being run normally by the Python interpreter on Windows (the program has not been frozen), then freeze_support() has no effect.

multiprocessing.get_all_start_methods()

Returns a list of the supported start methods, the first of which is the default. The possible start methods are 'fork', 'spawn' and 'forkserver'. Not all platforms support all methods. See Contexts and start methods.

Added in version 3.4.
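The sketch below chooses a start method from what the platform offers, without touching the global default. preferred_context() and its preference order are illustrative assumptions rather than a recommendation.

```python
import multiprocessing as mp

def preferred_context(preference=('forkserver', 'spawn')):
    """Return a context for the first method in `preference` that is available.

    The default preference order is an illustrative assumption.
    """
    available = mp.get_all_start_methods()
    for method in preference:
        if method in available:
            return mp.get_context(method)
    return mp.get_context()   # otherwise use the platform default

if __name__ == '__main__':
    print('available:', mp.get_all_start_methods())
    print('default:', mp.get_all_start_methods()[0])
    print('chosen:', preferred_context().get_start_method())
```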

multiprocessing.get_context(method=None)

Return a context object which has the same attributes as the multiprocessing module.

If method is None then the default context is returned. Otherwise method should be 'fork', 'spawn' or 'forkserver'. ValueError is raised if the specified start method is not available. See Contexts and start methods.

Added in version 3.4.

multiprocessing.get_start_method(allow_none=False)

Return the name of the start method used for starting processes.

If the start method has not been fixed and allow_none is false, then the start method is fixed to the default and the name is returned. If the start method has not been fixed and allow_none is true then None is returned.

The return value can be 'fork', 'spawn', 'forkserver' or None. See Contexts and start methods.

Added in version 3.4.

Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.

multiprocessing.set_executable(executable)

Set the path of the Python interpreter to use when starting a child process. (By default sys.executable is used.) Embedders will probably need to do something like

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

before they can create child processes.

Changed in version 3.4: Now supported on POSIX when the 'spawn' start method is used.

Changed in version 3.11: Accepts a path-like object.

multiprocessing.set_forkserver_preload(module_names)

Set a list of module names for the forkserver main process to attempt to import so that their already imported state is inherited by forked processes. Any ImportError when doing so is silently ignored. This can be used as a performance enhancement to avoid repeated work in every process.

For this to work, it must be called before the forkserver process has been launched (before creating a Pool or starting a Process).

Only meaningful when using the 'forkserver' start method. See Contexts and start methods.

Added in version 3.4.

multiprocessing.set_start_method(method, force=False)

Set the method which should be used to start child processes. The method argument can be 'fork', 'spawn' or 'forkserver'. Raises RuntimeError if the start method has already been set and force is not True. If method is None and force is True then the start method is set to None. If method is None and force is False then the context is set to the default context.

Note that this should be called at most once, and it should be protected inside the if __name__ == '__main__' clause of the main module.

See Contexts and start methods.

Added in version 3.4.
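set_start_method() raises RuntimeError when a method has already been fixed, so code that may run after some other component has chosen a method can check first. The sketch below does this. The helper name ensure_start_method is hypothetical.

```python
import multiprocessing as mp


def ensure_start_method(preferred='spawn'):
    """Hypothetical helper: fix the start method once, keeping any earlier choice.

    get_start_method(allow_none=True) returns None while nothing is fixed,
    so set_start_method() is only called when it cannot raise RuntimeError.
    """
    current = mp.get_start_method(allow_none=True)
    if current is None:
        mp.set_start_method(preferred)
        return preferred
    return current


if __name__ == '__main__':
    # Call this before creating any Pool or Process.
    print(ensure_start_method())
```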

Connection Objects

Connection objects allow the sending and receiving of picklable objects or strings. They can be thought of as message-oriented connected sockets.

Connection objects are usually created using Pipe(); see also Listeners and Clients.

class multiprocessing.connection.Connection

send(obj)

Send an object to the other end of the connection which should be read using recv().

The object must be picklable. Very large pickles (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.

recv()

Return an object sent from the other end of the connection using send(). Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.

fileno()

Return the file descriptor or handle used by the connection.

close()

Close the connection.

This is called automatically when the connection is garbage collected.

poll([timeout])

Return whether there is any data available to be read.

If timeout is not specified then it will return immediately. If timeout is a number then this specifies the maximum time in seconds to block. If timeout is None then an infinite timeout is used.

Note that multiple connection objects may be polled at once by using multiprocessing.connection.wait().
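The following in-process sketch illustrates poll() and multiprocessing.connection.wait(). Both ends of the Pipe live in the same process only to keep the example self-contained.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait

a, b = Pipe()
with a, b:                          # connections support the with statement
    before = b.poll()               # nothing sent yet: returns False immediately
    a.send('ping')
    after = b.poll(1.0)             # waits up to 1 second; the data is already there
    ready = wait([a, b], timeout=0) # check several connections in one call
    message = b.recv()
print(before, after, ready == [b], message)  # False True True ping
```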

send_bytes(buffer[, offset[, size]])

Send byte data from a bytes-like object as a complete message.

If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.
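The offset and size arguments select a slice of the buffer to send. In the sketch below the buffer is a bytes object whose items are single bytes, so both arguments count bytes.

```python
from multiprocessing import Pipe

data = b'hello world'
a, b = Pipe()
with a, b:
    a.send_bytes(data, 6)     # offset only: send everything from index 6 onwards
    a.send_bytes(data, 0, 5)  # offset 0 and size 5: send the first five bytes
    tail = b.recv_bytes()     # each send_bytes() call arrives as one message
    head = b.recv_bytes()
print(tail, head)             # b'world' b'hello'
```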

recv_bytes([maxlength])

Return a complete message of byte data sent from the other end of the connection as a bytes object. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end has closed.

If maxlength is specified and the message is longer than maxlength then OSError is raised and the connection will no longer be readable.

Changed in version 3.3: This function used to raise IOError, which is now an alias of OSError.

recv_bytes_into(buffer[, offset])

Read into buffer a complete message of byte data sent from the other end of the connection and return the number of bytes in the message. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.

buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).

If the buffer is too short then a BufferTooShort exception is raised and the complete message is available as e.args[0] where e is the exception instance.
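The sketch below shows recv_bytes_into() writing at an offset, and then the BufferTooShort path, which still delivers the whole message. Both Pipe ends are in one process only for brevity.

```python
from multiprocessing import BufferTooShort, Pipe

a, b = Pipe()
with a, b:
    a.send_bytes(b'0123456789')
    buf = bytearray(12)
    n = b.recv_bytes_into(buf, 2)    # the message is written starting at byte 2
    print(n, bytes(buf))             # 10 b'\x00\x000123456789'

    a.send_bytes(b'0123456789')
    small = bytearray(4)             # too short for a 10-byte message
    try:
        b.recv_bytes_into(small)
    except BufferTooShort as e:
        full = e.args[0]             # the complete message is still available
print(full)                          # b'0123456789'
```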

Changed in version 3.3: Connection objects themselves can now be transferred between processes using Connection.send() and Connection.recv().

Connection objects also now support the context management protocol (see Context Manager Types). __enter__() returns the connection object, and __exit__() calls close().

For example:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Warning

The Connection.recv() method automatically unpickles the data it receives, which can be a security risk unless you can trust the process which sent the message.

Therefore, unless the connection object was produced using Pipe() you should only use the recv() and send() methods after performing some sort of authentication. See Authentication keys.
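For connections that do not come from Pipe(), the Listener and Client classes in multiprocessing.connection perform the authentication handshake when they are given an authkey. The sketch below assumes that a loopback address is usable and uses a thread in place of a separate server process. The key value and the helper name round_trip are hypothetical.

```python
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b'example-secret'  # hypothetical key; both sides must use the same bytes


def round_trip(payload):
    """Hypothetical helper: send payload over an authenticated loopback connection."""
    received = []
    # Port 0 asks the OS for any free port; listener.address reports the real one.
    with Listener(('127.0.0.1', 0), authkey=AUTHKEY) as listener:

        def serve():
            # accept() runs the challenge/response handshake before returning.
            with listener.accept() as conn:
                received.append(conn.recv())  # unpickle only after authentication

        server = threading.Thread(target=serve)
        server.start()
        with Client(listener.address, authkey=AUTHKEY) as conn:
            conn.send(payload)
        server.join()
    return received[0]


print(round_trip({'answer': 42}))  # {'answer': 42}
```

If the two sides use different keys, the handshake should fail with multiprocessing.AuthenticationError before any pickled data is exchanged.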

Warning

If a process is killed while it is trying to read or write to a pipe then the data in the pipe is likely to become corrupted, because it may become impossible to be sure where the message boundaries lie.

Synchronization primitives

Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program. See the documentation for the threading module.

Note that one can also create synchronization primitives by using a manager object; see Managers.

class multiprocessing.Barrier(parties[, action[, timeout]])

A barrier object: a clone of threading.Barrier.

Added in version 3.3.

class multiprocessing.BoundedSemaphore([value])

A bounded semaphore object: a close analog of threading.BoundedSemaphore.

A solitary difference from its close analog exists: its acquire method’s first argument is named block, as is consistent with Lock.acquire().

Note

On macOS, this is indistinguishable from Semaphore because sem_getvalue() is not implemented on that platform.

class multiprocessing.Condition([lock])

A condition variable: an alias for threading.Condition.

If lock is specified then it should be a Lock or RLock object from multiprocessing.

Changed in version 3.3: The wait_for() method was added.

class multiprocessing.Event

A clone of threading.Event.

class multiprocessing.Lock

A non-recursive lock object: a close analog of threading.Lock. Once a process or thread has acquired a lock, subsequent attempts to acquire it from any process or thread will block until it is released; any process or thread may release it. The concepts and behaviors of threading.Lock as they apply to threads are replicated here in multiprocessing.Lock as they apply to either processes or threads, except as noted.

Note that Lock is actually a factory function which returns an instance of multiprocessing.synchronize.Lock initialized with a default context.

Lock supports the context manager protocol and thus may be used in with statements.

acquire(block=True, timeout=None)

Acquire a lock, blocking or non-blocking.

With the block argument set to True (the default), the method call will block until the lock is in an unlocked state, then set it to locked and return True. Note that the name of this first argument differs from that in threading.Lock.acquire().

With the block argument set to False, the method call does not block. If the lock is currently in a locked state, return False; otherwise set the lock to a locked state and return True.

When invoked with a positive, floating-point value for timeout, block for at most the number of seconds specified by timeout as long as the lock cannot be acquired. Invocations with a negative value for timeout are equivalent to a timeout of zero. Invocations with a timeout value of None (the default) set the timeout period to infinite. Note that the treatment of negative or None values for timeout differs from the implemented behavior in threading.Lock.acquire(). The timeout argument has no practical implications if the block argument is set to False and is thus ignored. Returns True if the lock has been acquired or False if the timeout period has elapsed.

release()

Release a lock. This can be called from any process or thread, not only the process or thread which originally acquired the lock.

Behavior is the same as in threading.Lock.release() except that when invoked on an unlocked lock, a ValueError is raised.
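The following single-process sketch exercises the Lock.acquire() and release() rules described above. A thread stands in for a second process when releasing.

```python
import multiprocessing as mp
import threading

lock = mp.Lock()
results = {}

results['first'] = lock.acquire()                        # True: the lock was free
results['nonblocking'] = lock.acquire(block=False)       # False: already held
# With threading.Lock, timeout=-1 would wait forever; here it acts like timeout=0.
results['negative_timeout'] = lock.acquire(timeout=-1)   # False, returned at once

# Any thread or process may release the lock, not only the one that acquired it.
releaser = threading.Thread(target=lock.release)
releaser.start()
releaser.join()

try:
    lock.release()                                       # the lock is now unlocked
except ValueError as exc:
    results['double_release'] = type(exc).__name__

print(results)
```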

class multiprocessing.RLock

A recursive lock object: a close analog of threading.RLock. A recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.

Note that RLock is actually a factory function which returns an instance of multiprocessing.synchronize.RLock initialized with a default context.

RLock supports the context manager protocol and thus may be used in with statements.

acquire(block=True, timeout=None)

Acquire a lock, blocking or non-blocking.

When invoked with the block argument set to True, block until the lock is in an unlocked state (not owned by any process or thread) unless the lock is already owned by the current process or thread. The current process or thread then takes ownership of the lock (if it does not already have ownership) and the recursion level inside the lock increments by one, resulting in a return value of True. Note that there are several differences in this first argument’s behavior compared to the implementation of threading.RLock.acquire(), starting with the name of the argument itself.

When invoked with the block argument set to False, do not block. If the lock has already been acquired (and thus is owned) by another process or thread, the current process or thread does not take ownership and the recursion level within the lock is not changed, resulting in a return value of False. If the lock is in an unlocked state, the current process or thread takes ownership and the recursion level is incremented, resulting in a return value of True.

Use and behaviors of the timeout argument are the same as in Lock.acquire(). Note that some of these behaviors of timeout differ from the implemented behaviors in threading.RLock.acquire().

release()

Release a lock, decrementing the recursion level. If after the decrement the recursion level is zero, reset the lock to unlocked (not owned by any process or thread) and if any other processes or threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling process or thread.

Only call this method when the calling process or thread owns the lock. An AssertionError is raised if this method is called by a process or thread other than the owner or if the lock is in an unlocked (unowned) state. Note that the type of exception raised in this situation differs from the implemented behavior in threading.RLock.release().
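The following single-process sketch demonstrates the RLock ownership rules. A thread plays the role of a competing process or thread.

```python
import multiprocessing as mp
import threading

rlock = mp.RLock()
outcome = []

rlock.acquire()
rlock.acquire()                     # same thread again: recursion level is now 2

# Another thread cannot take ownership while this thread holds the lock.
rival = threading.Thread(target=lambda: outcome.append(rlock.acquire(block=False)))
rival.start()
rival.join()

rlock.release()
rlock.release()                     # recursion level back to zero: unlocked

try:
    rlock.release()                 # no longer owned by anyone
except AssertionError:
    outcome.append('AssertionError')  # threading.RLock would raise RuntimeError here

print(outcome)                      # [False, 'AssertionError']
```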

class multiprocessing.Semaphore([value])

A semaphore object: a close analog of threading.Semaphore.

A solitary difference from its close analog exists: its acquire method’s first argument is named block, as is consistent with Lock.acquire().

Note

On macOS, sem_timedwait is unsupported, so calling acquire() with a timeout will emulate that function’s behavior using a sleeping loop.
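The sketch below uses a Semaphore as a counter of free slots, passing the block keyword described above.

```python
import multiprocessing as mp

slots = mp.Semaphore(2)                                 # at most two concurrent holders
taken = [slots.acquire(block=False) for _ in range(3)]  # the third attempt fails
print(taken)                                            # [True, True, False]

slots.release()                                         # free one slot
again = slots.acquire(timeout=0.1)                      # succeeds without waiting
print(again)                                            # True
```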

Note

Some of this package’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the multiprocessing.synchronize module will be disabled, and attempts to import it will result in an ImportError. See bpo-3770 for additional information.

Shared ctypes Objects

It is possible to create shared objects using shared memory which can be inherited by child processes.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object. The object itself can be accessed via the value attribute of a Value.

typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.

If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do

counter.value += 1

Assuming the associated lock is recursive (which it is by default) you can instead do

with counter.get_lock():
    counter.value += 1

Note that lock is a keyword-only argument.
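The locked increment above can be exercised with the sketch below. Threads stand in for processes so the example runs in a single interpreter. add_many is a hypothetical helper; the same function could be the target of a Process if the Value is passed as an argument.

```python
import threading
from multiprocessing import Value


def add_many(counter, times):
    """Hypothetical helper: increment a shared Value, one locked step at a time."""
    for _ in range(times):
        with counter.get_lock():   # makes the read-modify-write of += atomic
            counter.value += 1


counter = Value('i', 0)            # synchronized wrapper around a c_int, initially 0
workers = [threading.Thread(target=add_many, args=(counter, 1000)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(counter.value)               # 4000

raw = Value('i', 0, lock=False)    # lock is keyword-only; this returns a plain c_int
print(hasattr(raw, 'get_lock'))    # False
```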

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Return a ctypes array allocated from shared memory. By default the return value is actually a synchronized wrapper for the array.

typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module. If size_or_initializer is an integer, then it determines the length of the array, and the array will be initially zeroed. Otherwise, size_or_initializer is a sequence which is used to initialize the array and whose length determines the length of the array.

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Note that lock is a keyword-only argument.

Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings.
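The sketch below shows the two forms of size_or_initializer and the value and raw attributes of a c_char array.

```python
from multiprocessing import Array

nums = Array('i', 3)               # integer size: three zero-initialised c_int items
print(nums[:])                     # [0, 0, 0]

squares = Array('i', [1, 4, 9])    # sequence: its length fixes the array length
print(len(squares), squares[:])    # 3 [1, 4, 9]

text = Array('c', 8)               # eight zeroed c_char slots
text.value = b'hi'                 # .value treats the array as a NUL-terminated string
print(text.value)                  # b'hi'
print(text.raw)                    # b'hi\x00\x00\x00\x00\x00\x00' (all eight bytes)
```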

The multiprocessing.sharedctypes module

The multiprocessing.sharedctypes module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes.

Note

Although it is possible to store a pointer in shared memory, remember that this will refer to a location in the address space of a specific process. The pointer is quite likely to be invalid in the context of a second process, and trying to dereference the pointer from the second process may cause a crash.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Return a ctypes array allocated from shared memory.

typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module. If size_or_initializer is an integer then it determines the length of the array, and the array will be initially zeroed. Otherwise size_or_initializer is a sequence which is used to initialize the array and whose length determines the length of the array.

Note that setting and getting an element is potentially non-atomic; use Array() instead to make sure that access is automatically synchronized using a lock.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Return a ctypes object allocated from shared memory.

typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.

Note that setting and getting the value is potentially non-atomic; use Value() instead to make sure that access is automatically synchronized using a lock.

Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings; see the documentation for ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

The same as RawArray() except that depending on the value of lock a process-safe synchronization wrapper may be returned instead of a raw ctypes array.

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Note that lock is a keyword-only argument.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

The same as RawValue() except that depending on the value of lock a process-safe synchronization wrapper may be returned instead of a raw ctypes object.

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Note that lock is a keyword-only argument.

multiprocessing.sharedctypes.copy(obj)

Return a ctypes object allocated from shared memory which is a copy of the ctypes object obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Return a process-safe wrapper object for a ctypes object which uses lock to synchronize access. If lock is None (the default) then a multiprocessing.RLock object is created automatically.

A synchronized wrapper will have two methods in addition to those of the object it wraps: get_obj() returns the wrapped object and get_lock() returns the lock object used for synchronization.

Note that accessing the ctypes object through the wrapper can be a lot slower than accessing the raw ctypes object.

Changed in version 3.5: Synchronized objects support the context manager protocol.
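The sketch below combines several of these functions. copy() produces an independent shared copy of a value. synchronized() wraps a raw array so that its lock can be held across several operations in a with statement.

```python
from ctypes import c_double
from multiprocessing.sharedctypes import RawArray, RawValue, copy, synchronized

original = RawValue(c_double, 2.4)    # unsynchronized c_double in shared memory
clone = copy(original)                # a separate shared-memory copy of it
clone.value = 5.0                     # changing the copy leaves the original alone
print(original.value, clone.value)    # 2.4 5.0

safe = synchronized(RawArray('i', (9, 2, 8)))  # wrapper with a new RLock
with safe:                            # hold the wrapper's lock for the whole update
    safe[0] = safe[0] + 1             # item access re-acquires the (recursive) lock
print(safe.get_obj()[:])              # [10, 2, 8]
```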

The table below compares the syntax for creating shared ctypes objects from shared memory with the normal ctypes syntax. (In the table MyStruct is some subclass of ctypes.Structure.)

ctypes                 sharedctypes using type       sharedctypes using typecode
c_double(2.4)          RawValue(c_double, 2.4)       RawValue('d', 2.4)
MyStruct(4, 6)         RawValue(MyStruct, 4, 6)
(c_short * 7)()        RawArray(c_short, 7)          RawArray('h', 7)
(c_int * 3)(9, 2, 8)   RawArray(c_int, (9, 2, 8))    RawArray('i', (9, 2, 8))

Below is an example where a number of ctypes objects are modified by a child process:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

The results printed are:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Managers

Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

multiprocessing.Manager()

Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.

Manager processes will be shut down as soon as they are garbage collected or their parent process exits. The manager classes are defined in the multiprocessing.managers module:

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Create a BaseManager object.

Once created one should call start() or get_server().serve_forever() to ensure that the manager object refers to a started manager process.

address is the address on which the manager process listens for new connections. If address is None then an arbitrary one is chosen.

authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is None then current_process().authkey is used. Otherwise authkey is used and it must be a byte string.

serializer must be 'pickle' (use pickle serialization) or 'xmlrpclib' (use xmlrpc.client serialization).

ctx is a context object, or None (use the current context). See the get_context() function.

shutdown_timeout is a timeout in seconds used to wait until the process used by the manager completes in the shutdown() method. If the shutdown times out, the process is terminated. If terminating the process also times out, the process is killed.

Changed in version 3.11: Added the shutdown_timeout parameter.

start([initializer[, initargs]])

Start a subprocess to start the manager. If initializer is not None then the subprocess will call initializer(*initargs) when it starts.

get_server()

Returns a Server object which represents the actual server under the control of the Manager. The Server object supports the serve_forever() method:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server additionally has an address attribute.

connect()

Connect a local manager object to a remote manager process:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()

shutdown()

Stop the process used by the manager. This is only available if start() has been used to start the server process.

This can be called multiple times.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

A classmethod which can be used for registering a type or callable with the manager class.

typeid is a “type identifier” which is used to identify a particular type of shared object. This must be a string.

callable is a callable used for creating objects for this type identifier. If a manager instance will be connected to the server using the connect() method, or if the create_method argument is False then this can be left as None.

proxytype is a subclass of BaseProxy which is used to create proxies for shared objects with this typeid. If None then a proxy class is created automatically.

exposed is used to specify a sequence of method names which proxies for this typeid should be allowed to access using BaseProxy._callmethod(). (If exposed is None then proxytype._exposed_ is used instead if it exists.) In the case where no exposed list is specified, all “public methods” of the shared object will be accessible. (Here a “public method” means any attribute which has a __call__() method and whose name does not begin with '_'.)

method_to_typeid is a mapping used to specify the return type of those exposed methods which should return a proxy. It maps method names to typeid strings. (If method_to_typeid is None then proxytype._method_to_typeid_ is used instead if it exists.) If a method’s name is not a key of this mapping or if the mapping is None then the object returned by the method will be copied by value.

create_method determines whether a method should be created with name typeid which can be used to tell the server process to create a new shared object and return a proxy for it. By default it is True.
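The sketch below shows how register() attaches typeids to a manager subclass. Tally and TallyManager are hypothetical names used only for illustration, and no manager process is started.

```python
from multiprocessing.managers import BaseManager


class Tally:
    """Hypothetical shared type used only for this sketch."""

    def __init__(self):
        self._count = 0

    def bump(self):        # public: reachable through the default proxy
        self._count += 1
        return self._count

    def _reset(self):      # leading underscore: not exposed by default
        self._count = 0


class TallyManager(BaseManager):
    pass


# create_method=True (the default) adds a TallyManager.Tally() factory method.
TallyManager.register('Tally', Tally)
# create_method=False registers the typeid without adding a factory method.
TallyManager.register('HiddenTally', Tally, create_method=False)

print(hasattr(TallyManager, 'Tally'), hasattr(TallyManager, 'HiddenTally'))  # True False
print(hasattr(BaseManager, 'Tally'))  # False: registration affects only the subclass
```

Once such a manager is started (for example with TallyManager() as m:), m.Tally() should ask the server process to create a Tally and return a proxy. The proxy's bump() method can be called, but _reset() is not reachable through the default proxy.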

BaseManager instances also have one read-only property:

address

The address used by the manager.

Changed in version 3.3: Manager objects support the context management protocol (see Context Manager Types). __enter__() starts the server process (if it has not already started) and then returns the manager object. __exit__() calls shutdown().

In previous versions __enter__() did not start the manager’s server process if it was not already started.

class multiprocessing.managers.SyncManager

A subclass of BaseManager which can be used for the synchronization of processes. Objects of this type are returned by multiprocessing.Manager().

Its methods create and return Proxy Objects for a number of commonly used data types to be synchronized across processes. This notably includes shared lists and dictionaries.

Barrier(parties[, action[, timeout]])

Create a shared threading.Barrier object and return a proxy for it.

Added in version 3.3.

BoundedSemaphore([value])

Create a shared threading.BoundedSemaphore object and return a proxy for it.

Condition([lock])

Create a shared threading.Condition object and return a proxy for it.

If lock is supplied then it should be a proxy for a threading.Lock or threading.RLock object.

Changed in version 3.3: The wait_for() method was added.

Event()

Create a shared threading.Event object and return a proxy for it.

Lock()

Create a shared threading.Lock object and return a proxy for it.

Namespace()

Create a shared Namespace object and return a proxy for it.

Queue([maxsize])

Create a shared queue.Queue object and return a proxy for it.

RLock()

Create a shared threading.RLock object and return a proxy for it.

Semaphore([value])

Create a shared threading.Semaphore object and return a proxy for it.

Array(typecode, sequence)

Create an array and return a proxy for it.

Value(typecode, value)

Create an object with a writable value attribute and return a proxy for it.

dict()
dict(mapping)
dict(sequence)

Create a shared dict object and return a proxy for it.

list()
list(sequence)

Create a shared list object and return a proxy for it.

Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.

class multiprocessing.managers.Namespace

A type that can register with SyncManager.

A namespace object has no public methods, but does have writable attributes. Its representation shows the values of its attributes.

However, when using a proxy for a namespace object, an attribute beginning with '_' will be an attribute of the proxy and not an attribute of the referent:

>>> import multiprocessing
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Customized managers

To create one’s own manager, one creates a subclass of BaseManager and uses the register() classmethod to register new types or callables with the manager class. For example:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))    # prints 7
        print(maths.mul(7, 8))    # prints 56

Using a remote manager

It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it).

Running the following commands creates a server for a single shared queue which remote clients can access:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

One client can access the server as follows:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Another client can also use it:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Local processes can also access that queue, using the code from above on the client to access it remotely:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Proxy Objects

A proxy is an object which refers to a shared object which lives (presumably) in a different process. The shared object is said to be the referent of the proxy. Multiple proxy objects may have the same referent.

A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy). In this way, a proxy can be used just like its referent can:

>>> import multiprocessing
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Notice that applying str() to a proxy will return the representation of the referent, whereas applying repr() will return the representation of the proxy.

An important feature of proxy objects is that they are picklable so they can be passed between processes. As such, a referent can contain Proxy Objects. This permits nesting of these managed lists, dicts, and other Proxy Objects:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Similarly, dict and list proxies may be nested inside one another:

>>> l_outer = manager.list([manager.dict() for i in range(2)])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

If standard (non-proxy)listordictobjects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a__setitem__on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy:

# create a list proxy and append a mutable object (a dictionary)
lproxy=manager.list()
lproxy.append({})
# now mutate the dictionary
d=lproxy[0]
d['a']=1
d['b']=2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0]=d

This approach is perhaps less convenient than employing nested Proxy Objectsfor most use cases but also demonstrates a level of control over the synchronization.

Note

The proxy types in multiprocessing do nothing to support comparisons by value. So, for instance, we have:

>>> manager.list([1, 2, 3]) == [1, 2, 3]
False

One should just use a copy of the referent instead when making comparisons.

class multiprocessing.managers.BaseProxy

Proxy objects are instances of subclasses of BaseProxy.

_callmethod(methodname[, args[, kwds]])

Call and return the result of a method of the proxy’s referent.

If proxy is a proxy whose referent is obj then the expression

proxy._callmethod(methodname, args, kwds)

will evaluate the expression

getattr(obj, methodname)(*args, **kwds)

in the manager’s process.

The returned value will be a copy of the result of the call or a proxy to a new shared object; see the documentation for the method_to_typeid argument of BaseManager.register().

If an exception is raised by the call, then it is re-raised by _callmethod(). If some other exception is raised in the manager’s process then this is converted into a RemoteError exception and is raised by _callmethod().

Note in particular that an exception will be raised if methodname has not been exposed.

An example of the usage of _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),))  # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range

_getvalue()

Return a copy of the referent.

If the referent is unpicklable then this will raise an exception.

__repr__()

Return a representation of the proxy object.

__str__()

Return the representation of the referent.

Cleanup

A proxy object uses a weakref callback so that when it gets garbage collected it deregisters itself from the manager which owns its referent.

A shared object gets deleted from the manager process when there are no longer any proxies referring to it.

Process Pools

One can create a pool of processes which will carry out tasks submitted to it with the Pool class.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

processes is the number of worker processes to use. If processes is None then the number returned by os.process_cpu_count() is used.

If initializer is not None then each worker process will call initializer(*initargs) when it starts.

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.

context can be used to specify the context used for starting the worker processes. Usually a pool is created using the function multiprocessing.Pool() or the Pool() method of a context object. In both cases context is set appropriately.

Note that the methods of the pool object should only be called by the process which created the pool.

Warning

multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization.

Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).

Changed in version 3.2: Added the maxtasksperchild parameter.

Changed in version 3.4: Added the context parameter.

Changed in version 3.13: processes uses os.process_cpu_count() by default, instead of os.cpu_count().

Note

Worker processes within a Pool typically live for the complete duration of the Pool’s work queue. A frequent pattern found in other systems (such as Apache, mod_wsgi, etc.) to free resources held by workers is to allow a worker within a pool to complete only a set amount of work before exiting, being cleaned up and a new process spawned to replace the old one. The maxtasksperchild argument to the Pool exposes this ability to the end user.

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns an AsyncResult object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only one iterable argument though, for multiple iterables see starmap()). It blocks until the result is ready.

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with explicit chunksize option for better efficiency.
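The minimal sketch below contrasts map() and imap() when both are given an explicit chunksize. The helper names square and compare_map_and_imap are illustrative. ThreadPool, documented on this page as interface compatible with Pool, stands in for Pool so the sketch runs without worker processes or a __main__ guard. The chunk size of 100 is an arbitrary choice, not a recommendation.

```python
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

def compare_map_and_imap(n=1000, chunksize=100):
    # ThreadPool stands in for Pool here; both expose the same map()/imap() API.
    with ThreadPool(4) as pool:
        # map() builds and returns the complete result list.
        eager = pool.map(square, range(n), chunksize=chunksize)
        # imap() returns an iterator that hands results back one at a time,
        # in input order, so they can be consumed as they arrive.
        lazy_sum = sum(pool.imap(square, range(n), chunksize=chunksize))
    return eager, lazy_sum
```

With a real Pool the calls are the same, but square would need to be importable by the worker processes.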

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

A variant of the map() method which returns an AsyncResult object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
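The sketch below wires callback and error_callback into apply_async() and map_async(). It uses ThreadPool as a stand-in for Pool, and the helper names (reciprocal, run_with_callbacks) are hypothetical. Note that map_async() hands its callback the complete result list in a single call. The callbacks only append to lists, which keeps them short as advised above.

```python
import threading
from multiprocessing.pool import ThreadPool

def reciprocal(x):
    return 1 / x  # raises ZeroDivisionError when x == 0

def run_with_callbacks(values):
    singles, batches, failures = [], [], []
    lock = threading.Lock()

    def on_single(result):
        # Runs in the pool's result-handling thread, so keep it quick.
        with lock:
            singles.append(result)

    def on_batch(results):
        # map_async() passes the whole list of results in one call.
        with lock:
            batches.append(results)

    def on_error(exc):
        # Receives the exception instance raised by the target function.
        with lock:
            failures.append(type(exc).__name__)

    with ThreadPool(2) as pool:
        pending = [
            pool.apply_async(reciprocal, (v,), callback=on_single,
                             error_callback=on_error)
            for v in values
        ]
        batch = pool.map_async(reciprocal, [1, 2, 4], callback=on_batch)
        # In CPython, wait() returns only after the callback has run.
        for r in pending:
            r.wait()
        batch.wait()
    return sorted(singles), batches, failures
```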

imap(func, iterable[, chunksize])

A lazier version of map().

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional timeout parameter: next(timeout) will raise multiprocessing.TimeoutError if the result cannot be returned within timeout seconds.
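A minimal sketch of the timeout parameter on the iterator returned by imap(), using the default chunksize of 1. ThreadPool stands in for Pool. slow_identity and first_result_with_timeout are illustrative names, and the half-second delay only simulates a slow task.

```python
import time
import multiprocessing
from multiprocessing.pool import ThreadPool

def slow_identity(x):
    time.sleep(0.5)  # simulate a task that takes a while
    return x

def first_result_with_timeout(timeout):
    with ThreadPool(1) as pool:
        it = pool.imap(slow_identity, [42])  # chunksize defaults to 1
        try:
            return it.next(timeout=timeout)
        except multiprocessing.TimeoutError:
            return None  # the result was not ready within `timeout` seconds
```

A short timeout gives up before the task finishes, while a generous one returns the value.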

imap_unordered(func, iterable[, chunksize])

The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be “correct”.)
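Because imap_unordered() makes no ordering promise, a common approach is to return the input alongside the output so results can be matched up afterwards. The sketch assumes that approach. It uses ThreadPool in place of Pool, and the random delay exists only to make completion order vary. jittery_double and unordered_results are hypothetical names.

```python
import random
import time
from multiprocessing.pool import ThreadPool

def jittery_double(x):
    # A random delay makes completion order differ from input order.
    time.sleep(random.random() / 50)
    return x, 2 * x  # return the input too, so results can be matched later

def unordered_results(n=20):
    with ThreadPool(4) as pool:
        return list(pool.imap_unordered(jittery_double, range(n)))
```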

starmap(func, iterable[, chunksize])

Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.

Hence an iterable of [(1, 2), (3, 4)] results in [func(1, 2), func(3, 4)].

Added in version 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

A combination of starmap() and map_async() that iterates over iterable of iterables and calls func with the iterables unpacked. Returns a result object.

Added in version 3.3.
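A small sketch of starmap() and starmap_async() with argument tuples. ThreadPool stands in for Pool, and operator.mul and operator.add serve as ready-made two-argument functions. starmap_demo is an illustrative name.

```python
import operator
from multiprocessing.pool import ThreadPool

def starmap_demo():
    pairs = [(1, 2), (3, 4), (5, 6)]
    with ThreadPool(2) as pool:
        # starmap() unpacks each tuple: mul(1, 2), mul(3, 4), mul(5, 6).
        products = pool.starmap(operator.mul, pairs)
        # starmap_async() does the same but returns an AsyncResult at once.
        sums = pool.starmap_async(operator.add, pairs).get(timeout=5)
    return products, sums
```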

close()

Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

terminate()

Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.

join()

Wait for the worker processes to exit. One must call close() or terminate() before using join().

Changed in version 3.3: Pool objects now support the context management protocol (see Context Manager Types). __enter__() returns the pool object, and __exit__() calls terminate().
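A minimal sketch of the close()-then-join() ordering described above. ThreadPool, which shares Pool's interface, is used so no worker processes are needed. The sketch assumes a recent CPython, where calling join() on a running pool and submitting work to a closed pool both raise ValueError. close_then_join is a hypothetical name.

```python
from multiprocessing.pool import ThreadPool

def close_then_join():
    pool = ThreadPool(2)
    result = pool.map_async(abs, [-1, -2, -3])

    # join() is refused while the pool is still accepting work.
    try:
        pool.join()
    except ValueError:
        join_refused = True
    else:
        join_refused = False

    pool.close()  # stop accepting tasks; queued tasks still run to completion
    pool.join()   # wait for the workers to exit

    # Submitting after close() is refused as well.
    try:
        pool.apply_async(abs, (-4,))
    except ValueError:
        submit_refused = True
    else:
        submit_refused = False

    return result.get(timeout=5), join_refused, submit_refused
```

Using the pool as a context manager instead calls terminate() on exit, which discards outstanding work. Collect results inside the with block.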

class multiprocessing.pool.AsyncResult

The class of the result returned by Pool.apply_async() and Pool.map_async().

get([timeout])

Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

wait([timeout])

Wait until the result is available or until timeout seconds pass.

ready()

Return whether the call has completed.

successful()

Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.

Changed in version 3.7: If the result is not ready, ValueError is raised instead of AssertionError.
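The sketch below walks one AsyncResult through its states. A threading.Event holds the task back so that the "not ready" checks are deterministic. It uses ThreadPool in place of Pool, and inspect_async_result and blocked_task are hypothetical names.

```python
import threading
import multiprocessing
from multiprocessing.pool import ThreadPool

def inspect_async_result():
    release = threading.Event()
    seen = {}

    def blocked_task():
        release.wait()  # the task cannot finish until the event is set
        return 'done'

    with ThreadPool(1) as pool:
        result = pool.apply_async(blocked_task)
        seen['ready_before'] = result.ready()
        try:
            result.successful()  # not ready yet, so this raises ValueError
        except ValueError:
            seen['successful_raised'] = True
        try:
            result.get(timeout=0.05)
        except multiprocessing.TimeoutError:
            seen['get_timed_out'] = True
        release.set()  # let the task complete
        seen['value'] = result.get(timeout=5)
        seen['ready_after'] = result.ready()
        seen['successful_after'] = result.successful()
    return seen
```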

The following example demonstrates the use of a pool:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Listeners and Clients

Usually message passing between processes is done using queues or by using Connection objects returned by Pipe().

However, the multiprocessing.connection module allows some extra flexibility. It basically gives a high level message oriented API for dealing with sockets or Windows named pipes. It also has support for digest authentication using the hmac module, and for polling multiple connections at the same time.

multiprocessing.connection.deliver_challenge(connection, authkey)

Send a randomly generated message to the other end of the connection and wait for a reply.

If the reply matches the digest of the message using authkey as the key then a welcome message is sent to the other end of the connection. Otherwise AuthenticationError is raised.

multiprocessing.connection.answer_challenge(connection, authkey)

Receive a message, calculate the digest of the message using authkey as the key, and then send the digest back.

If a welcome message is not received, then AuthenticationError is raised.
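To see the two halves of the challenge interact, the sketch runs them on the two ends of a Pipe inside one process. A thread is needed because each call blocks waiting for the other end. run_handshake is a hypothetical helper. In normal use, Listener.accept() and Client() run this kind of exchange for you when an authkey is given.

```python
import threading
from multiprocessing import AuthenticationError, Pipe
from multiprocessing.connection import answer_challenge, deliver_challenge

def run_handshake(server_key, client_key):
    """Return (server_ok, client_ok) after one challenge/response exchange."""
    server_end, client_end = Pipe()  # a duplex pair of Connection objects
    outcome = {}

    def client():
        try:
            answer_challenge(client_end, client_key)
            outcome['client'] = True
        except AuthenticationError:
            outcome['client'] = False  # the digest we sent was rejected

    t = threading.Thread(target=client)
    t.start()
    try:
        deliver_challenge(server_end, server_key)
        server_ok = True
    except AuthenticationError:
        server_ok = False  # the digest we received was wrong
    t.join()
    server_end.close()
    client_end.close()
    return server_ok, outcome['client']
```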

multiprocessing.connection.Client(address[, family[, authkey]])

Attempt to set up a connection to the listener which is using address address, returning a Connection.

The type of the connection is determined by family argument, but this can generally be omitted since it can usually be inferred from the format of address. (See Address Formats)

If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Authentication keys.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

A wrapper for a bound socket or Windows named pipe which is ‘listening’ for connections.

address is the address to be used by the bound socket or named pipe of the listener object.

Note

If an address of ‘0.0.0.0’ is used, the address will not be a connectable end point on Windows. If you require a connectable end-point, you should use ‘127.0.0.1’.

family is the type of socket (or named pipe) to use. This can be one of the strings 'AF_INET' (for a TCP socket), 'AF_UNIX' (for a Unix domain socket) or 'AF_PIPE' (for a Windows named pipe). Of these only the first is guaranteed to be available. If family is None then the family is inferred from the format of address. If address is also None then a default is chosen. This default is the family which is assumed to be the fastest available. See Address Formats. Note that if family is 'AF_UNIX' and address is None then the socket will be created in a private temporary directory created using tempfile.mkdtemp().

If the listener object uses a socket then backlog (1 by default) is passed to the listen() method of the socket once it has been bound.

If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Authentication keys.

accept()

Accept a connection on the bound socket or named pipe of the listener object and return a Connection object. If authentication is attempted and fails, then AuthenticationError is raised.

close()

Close the bound socket or named pipe of the listener object. This is called automatically when the listener is garbage collected. However it is advisable to call it explicitly.

Listener objects have the following read-only properties:

address

The address which is being used by the Listener object.

last_accepted

The address from which the last accepted connection came. If this is unavailable then it is None.

Changed in version 3.3: Listener objects now support the context management protocol (see Context Manager Types). __enter__() returns the listener object, and __exit__() calls close().
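A self-contained sketch of an authenticated Listener/Client round trip within one process, with the client in a thread. It assumes a loopback interface is available. Binding port 0 asks the OS for any free port, and listener.address then reports the port actually chosen. echo_once is an illustrative name.

```python
import threading
from multiprocessing.connection import Client, Listener

def echo_once(authkey=b'secret password'):
    """Accept one authenticated connection and reply with an upper-cased greeting."""
    replies = []
    # ('127.0.0.1', 0): family is inferred as 'AF_INET'; port 0 = any free port.
    with Listener(('127.0.0.1', 0), authkey=authkey) as listener:
        def client():
            with Client(listener.address, authkey=authkey) as conn:
                conn.send({'greeting': 'hello'})
                replies.append(conn.recv())

        t = threading.Thread(target=client)
        t.start()
        # With an authkey, accept() runs the HMAC challenge in both directions.
        with listener.accept() as conn:
            peer_host = listener.last_accepted[0]
            msg = conn.recv()
            conn.send(msg['greeting'].upper())
        t.join()
    return replies[0], peer_host
```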

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.

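For both POSIX and Windows, an object can appear in object_list if it is

  • a readable Connection object;

  • a connected and readable socket.socket object; or

  • the sentinel attribute of a Process object.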

A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.

POSIX: wait(object_list, timeout) is almost equivalent to select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.

Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)

Added in version 3.3.
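The following sketch exercises wait() within a single process, using two one-way pipes so the readiness rules above are visible without starting child processes. It shows that a connection becomes ready both when data arrives and when the other end is closed. readiness_demo and the 'r1'/'r2' labels are illustrative.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait

def readiness_demo():
    r1, w1 = Pipe(duplex=False)  # Pipe(duplex=False) returns (reader, writer)
    r2, w2 = Pipe(duplex=False)
    labels = {r1: 'r1', r2: 'r2'}

    def ready(timeout):
        return [labels[c] for c in wait([r1, r2], timeout=timeout)]

    nothing_ready = ready(0)   # no data written yet
    w2.send('ping')
    after_send = ready(1)      # only r2 has data
    message = r2.recv()
    w1.close()                 # closing the writer makes r1 ready (EOF)
    after_close = ready(1)
    try:
        r1.recv()
        eof_seen = False
    except EOFError:
        eof_seen = True
    for c in (r1, r2, w2):
        c.close()
    return nothing_ready, after_send, message, after_close, eof_seen
```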

Examples

The following server code creates a listener which uses 'secret password' as an authentication key. It then waits for a connection and sends some data to the client:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

The following code connects to the server and receives some data from the server:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', <class 'float'>]

    print(conn.recv_bytes())            # => b'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

The following code uses wait() to wait for messages from multiple processes at once:

from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Address Formats

  • An 'AF_INET' address is a tuple of the form (hostname, port) where hostname is a string and port is an integer.

  • An 'AF_UNIX' address is a string representing a filename on the filesystem.

  • An 'AF_PIPE' address is a string of the form r'\\.\pipe\PipeName'. To use Client() to connect to a named pipe on a remote computer called ServerName one should use an address of the form r'\\ServerName\pipe\PipeName' instead.

Note that any string beginning with two backslashes is assumed by default to be an 'AF_PIPE' address rather than an 'AF_UNIX' address.

Authentication keys

When one uses Connection.recv, the data received is automatically unpickled. Unfortunately unpickling data from an untrusted source is a security risk. Therefore Listener and Client() use the hmac module to provide digest authentication.

An authentication key is a byte string which can be thought of as a password: once a connection is established both ends will demand proof that the other knows the authentication key. (Demonstrating that both ends are using the same key does not involve sending the key over the connection.)

If authentication is requested but no authentication key is specified then the return value of current_process().authkey is used (see Process). This value will be automatically inherited by any Process object that the current process creates. This means that (by default) all processes of a multi-process program will share a single authentication key which can be used when setting up connections between themselves.

Suitable authentication keys can also be generated by using os.urandom().

Logging

Some support for logging is available. Note, however, that the logging package does not use process shared locks so it is possible (depending on the handler type) for messages from different processes to get mixed up.

multiprocessing.get_logger()

Returns the logger used by multiprocessing. If necessary, a new one will be created.

When first created the logger has level logging.NOTSET and no default handler. Messages sent to this logger will not by default propagate to the root logger.

Note that on Windows child processes will only inherit the level of the parent process’s logger; any other customization of the logger will not be inherited.

multiprocessing.log_to_stderr(level=None)

This function performs a call to get_logger() but in addition to returning the logger created by get_logger, it adds a handler which sends output to sys.stderr using format '[%(levelname)s/%(processName)s] %(message)s'. You can set the level of the logger by passing a level argument.

Below is an example session with logging turned on:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

For a full table of logging levels, see the logging module.
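Since get_logger() returns an ordinary logging.Logger, standard handlers can be attached to it instead of using log_to_stderr(). The sketch assumes you want to capture the multiprocessing log in memory. capture_mp_log is a hypothetical helper. The format string matches the one log_to_stderr() is described as using, and the helper restores the logger afterwards.

```python
import io
import logging
import multiprocessing

def capture_mp_log(message):
    logger = multiprocessing.get_logger()
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter('[%(levelname)s/%(processName)s] %(message)s'))
    old_level = logger.level
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    try:
        logger.info(message)
    finally:
        # Undo our changes so the shared logger is left as we found it.
        logger.removeHandler(handler)
        logger.setLevel(old_level)
    return stream.getvalue()
```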

The multiprocessing.dummy module

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

In particular, the Pool function provided by multiprocessing.dummy returns an instance of ThreadPool, which is a subclass of Pool that supports all the same method calls but uses a pool of worker threads rather than worker processes.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

A thread pool object which controls a pool of worker threads to which jobs can be submitted. ThreadPool instances are fully interface compatible with Pool instances, and their resources must also be properly managed, either by using the pool as a context manager or by calling close() and terminate() manually.

processes is the number of worker threads to use. If processes is None then the number returned by os.process_cpu_count() is used.

If initializer is not None then each worker thread will call initializer(*initargs) when it starts.

Unlike Pool, maxtasksperchild and context cannot be provided.
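A minimal sketch of multiprocessing.dummy.Pool with an initializer. The helper names (init_worker, tag, run_dummy_pool) are hypothetical. The initializer stores per-thread state in a threading.local, which suits ThreadPool because each worker is a thread rather than a process. The pool is managed with close()/join() so every worker has finished before the initializer calls are counted.

```python
import threading
from multiprocessing.dummy import Pool as DummyPool
from multiprocessing.pool import ThreadPool

_local = threading.local()  # per-thread storage filled in by the initializer
_init_calls = []
_init_lock = threading.Lock()

def init_worker(prefix):
    # Called once in each worker thread when it starts.
    _local.prefix = prefix
    with _init_lock:
        _init_calls.append(threading.current_thread().name)

def tag(x):
    return f'{_local.prefix}{x}'

def run_dummy_pool(workers=3):
    _init_calls.clear()
    pool = DummyPool(workers, init_worker, ('item-',))
    try:
        is_thread_pool = isinstance(pool, ThreadPool)
        tagged = pool.map(tag, range(5))
    finally:
        pool.close()
        pool.join()  # all workers have started and exited after this
    return tagged, len(_init_calls), is_thread_pool
```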

Note

A ThreadPool shares the same interface as Pool, which is designed around a pool of processes and predates the introduction of the concurrent.futures module. As such, it inherits some operations that don’t make sense for a pool backed by threads, and it has its own type for representing the status of asynchronous jobs, AsyncResult, that is not understood by any other libraries.

Users should generally prefer to use concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start, and which returns concurrent.futures.Future instances that are compatible with many other libraries, including asyncio.
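For comparison, here is a minimal sketch of the same kind of work done with concurrent.futures.ThreadPoolExecutor. square, executor_map and executor_with_asyncio are illustrative names. The asyncio integration is shown through loop.run_in_executor(), which is one of several ways to await work running in an executor.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

def executor_map():
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Executor.map() yields results in input order, like ThreadPool.imap().
        squares = list(executor.map(square, range(5)))
        # submit() returns a concurrent.futures.Future for a single call.
        future = executor.submit(square, 12)
        return squares, future.result(timeout=5)

async def executor_with_asyncio():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # run_in_executor() wraps the executor's Future so it can be awaited.
        return await loop.run_in_executor(executor, square, 7)
```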

Programming guidelines

There are certain guidelines and idioms which should be adhered to when using multiprocessing.

All start methods

The following applies to all start methods.

Avoid shared state

As far as possible one should try to avoid shifting large amounts of data between processes.

It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives.

Picklability

Ensure that the arguments to the methods of proxies are picklable.

Thread safety of proxies

Do not use a proxy object from more than one thread unless you protect it with a lock.

(There is never a problem with different processes using the same proxy.)

Joining zombie processes

On POSIX when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’s Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

Avoid terminating processes

Using the Process.terminate method to stop a process is liable to cause any shared resources (such as locks, semaphores, pipes and queues) currently being used by the process to become broken or unavailable to other processes.

Therefore it is probably best to only consider using Process.terminate on processes which never use any shared resources.

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

An example which will deadlock is the following:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

A fix here would be to swap the last two lines (or simply remove the p.join() line).

Explicitly pass resources to child processes

On POSIX using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Apart from making the code (potentially) compatible with Windows and the other start methods this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.

So for instance

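from multiprocessing import Process, Lock

def f():
    # Relies on the global "lock" created below; this only works with fork.
    with lock:
        print('doing something using the global lock')

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()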

should be rewritten as

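from multiprocessing import Process, Lock

def f(l):
    # The lock is passed in explicitly, so this works with every start method.
    with l:
        print('doing something using the lock passed in as "l"')

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()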

Beware of replacing sys.stdin with a “file like object”

multiprocessing originally unconditionally called:

os.close(sys.stdin.fileno())

in the multiprocessing.Process._bootstrap() method; this resulted in issues with processes-in-processes. This has been changed to:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

This solves the fundamental issue of processes colliding with each other resulting in a bad file descriptor error, but introduces a potential danger to applications which replace sys.stdin with a “file-like object” with output buffering. This danger is that if multiple processes call close() on this file-like object, it could result in the same data being flushed to the object multiple times, resulting in corruption.

If you write a file-like object and implement your own caching, you can make it fork-safe by storing the pid whenever you append to the cache, and discarding the cache when the pid changes. For example:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

For more information, see bpo-5155, bpo-5313 and bpo-5331.
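Expanding the cache property above into a complete class makes the idea easier to test. PidAwareBuffer and its write()/flush()/close() methods are hypothetical, not part of multiprocessing. The sketch only assumes that buffered data belongs to the process whose pid was recorded when it was buffered.

```python
import io
import os

class PidAwareBuffer:
    """Hypothetical buffered file-like object that tolerates being inherited by a child."""

    def __init__(self, sink=None):
        # Where flushed data ends up; defaults to an in-memory text stream.
        self._sink = sink if sink is not None else io.StringIO()
        self._pid = os.getpid()
        self._cache = []

    @property
    def cache(self):
        # A pid change means we are running in a different process: drop the
        # data buffered by the original process instead of flushing it twice.
        pid = os.getpid()
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache

    def write(self, text):
        self.cache.append(text)
        return len(text)

    def flush(self):
        data = ''.join(self.cache)
        self.cache.clear()
        if data:
            self._sink.write(data)

    def close(self):
        self.flush()
```

In a real forked child, os.getpid() changes by itself. Overwriting _pid by hand, as a quick check might do, only simulates that.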

The spawn and forkserver start methods

There are a few extra restrictions which don’t apply to the fork start method.

More picklability

Ensure that all arguments to Process.__init__() are picklable. Also, if you subclass Process then make sure that instances will be picklable when the Process.start method is called.

Global variables

Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start was called.

However, global variables which are just module level constants cause no problems.

Safe importing of main module

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

For example, using the spawn or forkserver start method running the following module would fail with a RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Instead one should protect the “entry point” of the program by using if __name__ == '__main__': as follows:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(The freeze_support() line can be omitted if the program will be run normally instead of frozen.)

This allows the newly spawned Python interpreter to safely import the module and then run the module’s foo() function.

Similar restrictions apply if a pool or manager is created in the main module.

Examples

Demonstration of how to create and use customized managers and proxies:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Using Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

An example showing how to use queues to feed tasks to a collection of worker processes and collect the results:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()