Utility functions commonly used by AICS projects, along with documentation related to Dask, Distributed, and related packages.
- Distributed handler to manage various debugging or cluster configurations
- Documentation on example cluster deployments
Before we jump into the quick starts, there are some basic definitions to understand.

A task is a single static function to be processed. Simple enough. However, relevant to
AICS is that when using `aicsimageio` (and/or `dask.array.Array`), your image (or
`dask.array.Array`) is split up into *many* tasks. How many depends on the image reader
and the size of the file you are reading, but in general it is safe to assume that each
image you read is split into many thousands of tasks. If you want to see how many tasks your
image is split into, you can compute either of the following:
- Pseudo-code: `sum(2 * size(channel) for channel if channel not in ["Y", "X"])`
- Dask graph length: `len(AICSImage.dask_data.__dask_graph__())`
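As a minimal sketch of the second option (the file path here is hypothetical; any local microscopy file readable by `aicsimageio` would do):

```python
from aicsimageio import AICSImage

# Construct the image object; the pixel data is exposed lazily as a dask array
img = AICSImage("my_image.czi")  # hypothetical path

# Each entry in the dask graph is one task needed to materialize the array
print(len(img.dask_data.__dask_graph__()))
```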
A map applies a given function to the provided iterables, with each item used as the
parameters to the function. Given `lambda x: x + 1` and `[1, 2, 3]`, the result of
`map(func, *iterables)` in this case would be `[2, 3, 4]`. Usually, you get back an
iterable of `future` objects from a `map` operation. The results of the map operation are
not guaranteed to be in the order of the iterable that went in, as operations are started
as resources become available, and item-to-item variance may result in different output
ordering.
A future is an object that will become available but is currently not defined. There is
no guarantee that the object will be a valid result rather than an error, and you should
handle errors once the future's state has resolved (usually, this means after a `gather`
operation).
A gather blocks the process from moving forward until all futures are resolved. In terms of control flow, you could potentially generate thousands of futures and keep moving on locally while those futures slowly resolve; but if you ever want a hard stop to wait for some set of futures to complete, you need to gather them.
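To make these three terms concrete, here is a minimal sketch using only the standard library `concurrent.futures` module (which Dask's interface mirrors, as described below); the `increment` function and its deliberate failure are purely illustrative:

```python
import concurrent.futures

def increment(x):
    # Deliberately fail for one input to show error handling after resolution
    if x == 2:
        raise ValueError("example failure")
    return x + 1

with concurrent.futures.ThreadPoolExecutor() as executor:
    # "map": schedule one task per item; each submit returns a future immediately
    futures = [executor.submit(increment, x) for x in [1, 2, 3]]

    # "gather": block until every future has resolved
    concurrent.futures.wait(futures)

    # Only after resolution do we check whether each future holds a result or an error
    for future in futures:
        if future.exception() is not None:
            print("task failed:", future.exception())
        else:
            print("task result:", future.result())
```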
Dask tries to mirror the standard library `concurrent.futures` wherever possible, which
is what allows this library to provide simple wrappers around Dask for easy debugging:
we are simply swapping out `distributed.Client.map` for
`concurrent.futures.ThreadPoolExecutor.map`, for example. If at any point in your code
you don't want to use `dask` for some reason or another, it is equally valid to use
`concurrent.futures.ThreadPoolExecutor` or `concurrent.futures.ProcessPoolExecutor`.
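A rough sketch of that interchangeability, assuming nothing beyond the standard library for the executor variant (the Dask variant is commented out so the snippet runs without a scheduler):

```python
import concurrent.futures

def increment(x):
    return x + 1

inputs = [1, 2, 3]

# Standard library variant: map returns results directly
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(increment, inputs))

# Dask variant of the same work: map returns futures that must be gathered
# from distributed import Client
# client = Client()  # or Client("tcp://scheduler-address:8786")
# futures = client.map(increment, inputs)
# results = client.gather(futures)
```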
If you have an iterable (or iterables) that would result in less than hundreds of
thousands of tasks, you can simply use the normal `map` provided by the
`DistributedHandler.client`.

**Important Note:** Notice, "... iterable that would *result* in less than hundreds
of thousands of tasks...". This is important because of what happens when you try to `map`
over a thousand image paths, each of which spawns an `AICSImage` object: each one adds
thousands more tasks to the scheduler to complete. This will break, and you should look
to *Large Iterable Batching* instead.
```python
from aics_dask_utils import DistributedHandler

# `None` address provided means use local machine threads
with DistributedHandler(None) as handler:
    futures = handler.client.map(
        lambda x: x + 1,
        [1, 2, 3]
    )

    results = handler.gather(futures)
```
```python
from distributed import LocalCluster

cluster = LocalCluster()

# Actual address provided means use the dask scheduler
with DistributedHandler(cluster.scheduler_address) as handler:
    futures = handler.client.map(
        lambda x: x + 1,
        [1, 2, 3]
    )

    results = handler.gather(futures)
```
If you have an iterable (or iterables) that would result in more than hundreds of
thousands of tasks, you should use `handler.batched_map` to reduce the load on the
client. This will batch your requests rather than send them all at once.
```python
from aics_dask_utils import DistributedHandler

# `None` address provided means use local machine threads
with DistributedHandler(None) as handler:
    results = handler.batched_map(
        lambda x: x + 1,
        range(int(1e9))  # 1 billion; `range` requires an int, not the float `1e9`
    )
```
```python
from distributed import LocalCluster

cluster = LocalCluster()

# Actual address provided means use the dask scheduler
with DistributedHandler(cluster.scheduler_address) as handler:
    results = handler.batched_map(
        lambda x: x + 1,
        range(int(1e9))  # 1 billion; `range` requires an int, not the float `1e9`
    )
```
**Note:** Notice that there is no `handler.gather` call after `batched_map`. This is
because `batched_map` gathers results at the end of each batch rather than simply
returning their futures.
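Conceptually, the batching pattern looks something like the sketch below; this is an illustration of the idea, not the library's actual implementation, and the function name and default batch size are hypothetical:

```python
import itertools

def batched_map_sketch(client, func, iterable, batch_size=100):
    """Illustrative only: submit a small batch, gather it, then move on,
    so the scheduler never holds tasks for the whole iterable at once."""
    iterator = iter(iterable)
    results = []
    while True:
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            break
        futures = client.map(func, batch)        # only `batch_size` tasks in flight
        results.extend(client.gather(futures))   # resolve before submitting more
    return results
```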
- Stable Release: `pip install aics_dask_utils`
- Development Head: `pip install git+https://github.com/AllenCellModeling/aics_dask_utils.git`
For full package documentation please visit AllenCellModeling.github.io/aics_dask_utils.
See `CONTRIBUTING.md` for information related to developing the code.
This README, the provided tooling, and the documentation are not meant to be all-encompassing
of the various operations you can do with `dask` and other similar computing systems.
For further reading, go to dask.org.
Free software: Allen Institute Software License