
Causal logging for distributed scientific computing with Eliot and Dask


If you’re using the Dask distributed computing framework, you can automatically use Eliot to trace computations across multiple processes or even machines. This is mostly useful for Dask’s Bag and Delayed support, but can also be used with arrays and dataframes.

In order to do this you will need to:

1. Ensure all worker processes write the Eliot logs to disk (if you’re using the multiprocessing or distributed backends).
2. If you’re using multiple worker machines, aggregate all log files into a single place, so you can more easily analyze them with e.g. eliot-tree.
3. Replace dask.compute() with eliot.dask.compute_with_trace().
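As a rough illustration of the aggregation step, the sketch below concatenates per-process log files into one file. It assumes the logs have already been copied from each worker machine into a single directory tree, that each file holds one JSON message per line (which is what to_file writes), and that file names end in .log. The function name merge_eliot_logs and the directory layout are hypothetical, not part of Eliot or Dask.

```python
import json
from pathlib import Path


def merge_eliot_logs(log_dir, merged_path):
    """Concatenate every *.log file under log_dir into merged_path.

    Hypothetical helper: returns the number of messages written.
    Lines that are not valid JSON (for example a line cut short when
    a worker was killed) are skipped rather than copied.
    """
    merged_path = Path(merged_path)
    count = 0
    with open(merged_path, "w") as merged:
        for log_file in sorted(Path(log_dir).rglob("*.log")):
            # Don't read the output file if it lives inside log_dir.
            if log_file.resolve() == merged_path.resolve():
                continue
            with open(log_file) as source:
                for line in source:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        json.loads(line)
                    except ValueError:
                        continue  # skip truncated or corrupt lines
                    merged.write(line + "\n")
                    count += 1
    return count
```

For instance, merge_eliot_logs("collected_logs", "all.log") could be followed by running eliot-tree on all.log. The messages are copied in file order and are not sorted by timestamp.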

In the following example, you can see how this works for a Dask run using distributed, the recommended Dask scheduler. We’ll use multiple worker processes, but only a single machine:

    from os import getpid

    from dask.bag import from_sequence
    import dask.config
    from dask.distributed import Client
    from eliot import log_call, to_file
    from eliot.dask import compute_with_trace


    @log_call
    def multiply(x, y=7):
        return x * y


    @log_call
    def add(x, y):
        return x + y


    @log_call
    def main_computation():
        bag = from_sequence([1, 2, 3])
        bag = bag.map(multiply).fold(add)
        return compute_with_trace(bag)[0]  # instead of dask.compute(bag)


    def _start_logging():
        # Name log file based on PID, so different processes don't stomp on
        # each others' logfiles:
        to_file(open("{}.log".format(getpid()), "a"))


    def main():
        # Setup logging on the main process:
        _start_logging()

        # Start three worker processes on the local machine:
        client = Client(n_workers=3, threads_per_worker=1)

        # Setup Eliot logging on each worker process:
        client.run(_start_logging)

        # Run the Dask computation in the worker processes:
        result = main_computation()
        print("Result:", result)


    if __name__ == '__main__':
        import dask_eliot
        dask_eliot.main()

In the output you can see how the various Dask tasks depend on each other, and the full trace of the computation:

    $ python examples/dask_eliot.py
    Result: 42
    $ ls *.log
    7254.log  7269.log  7271.log  7273.log
    $ eliot-tree *.log
    ca126b8a-c611-447e-aaa7-f61701e2a371
    └── main_computation/1 started 2019-01-01 17:27:13 0.047s
        ├── dask:compute/2/1 started 2019-01-01 17:27:13 0.029s
        │   ├── eliot:remote_task/2/8/1 started 2019-01-01 17:27:13 0.001s
        │   │   ├── dask:task/2/8/2 2019-01-01 17:27:13
        │   │   │   ├── dependencies:
        │   │   │   │   └── 0: map-multiply-75feec3a197bf253863e330f3483d3ac-0
        │   │   │   └── key: reduce-part-71950de8264334e8cea3cc79d1c2e639-0
        │   │   ├── multiply/2/8/3/1 started 2019-01-01 17:27:13 0.000s
        │   │   │   ├── x: 1
        │   │   │   ├── y: 7
        │   │   │   └── multiply/2/8/3/2 succeeded 2019-01-01 17:27:13
        │   │   │       └── result: 7
        │   │   └── eliot:remote_task/2/8/4 succeeded 2019-01-01 17:27:13
        │   ├── eliot:remote_task/2/9/1 started 2019-01-01 17:27:13 0.001s
        │   │   ├── dask:task/2/9/2 2019-01-01 17:27:13
        │   │   │   ├── dependencies:
        │   │   │   │   └── 0: map-multiply-75feec3a197bf253863e330f3483d3ac-1
        │   │   │   └── key: reduce-part-71950de8264334e8cea3cc79d1c2e639-1
        │   │   ├── multiply/2/9/3/1 started 2019-01-01 17:27:13 0.000s
        │   │   │   ├── x: 2
        │   │   │   ├── y: 7
        │   │   │   └── multiply/2/9/3/2 succeeded 2019-01-01 17:27:13
        │   │   │       └── result: 14
        │   │   └── eliot:remote_task/2/9/4 succeeded 2019-01-01 17:27:13
        │   ├── eliot:remote_task/2/10/1 started 2019-01-01 17:27:13 0.001s
        │   │   ├── dask:task/2/10/2 2019-01-01 17:27:13
        │   │   │   ├── dependencies:
        │   │   │   │   └── 0: map-multiply-75feec3a197bf253863e330f3483d3ac-2
        │   │   │   └── key: reduce-part-71950de8264334e8cea3cc79d1c2e639-2
        │   │   ├── multiply/2/10/3/1 started 2019-01-01 17:27:13 0.000s
        │   │   │   ├── x: 3
        │   │   │   ├── y: 7
        │   │   │   └── multiply/2/10/3/2 succeeded 2019-01-01 17:27:13
        │   │   │       └── result: 21
        │   │   └── eliot:remote_task/2/10/4 succeeded 2019-01-01 17:27:13
        │   ├── eliot:remote_task/2/11/1 started 2019-01-01 17:27:13 0.001s
        │   │   ├── dask:task/2/11/2 2019-01-01 17:27:13
        │   │   │   ├── dependencies:
        │   │   │   │   ├── 0: reduce-part-71950de8264334e8cea3cc79d1c2e639-0
        │   │   │   │   ├── 1: reduce-part-71950de8264334e8cea3cc79d1c2e639-1
        │   │   │   │   └── 2: reduce-part-71950de8264334e8cea3cc79d1c2e639-2
        │   │   │   └── key: reduce-aggregate-71950de8264334e8cea3cc79d1c2e639
        │   │   ├── add/2/11/3/1 started 2019-01-01 17:27:13 0.000s
        │   │   │   ├── x: 7
        │   │   │   ├── y: 14
        │   │   │   └── add/2/11/3/2 succeeded 2019-01-01 17:27:13
        │   │   │       └── result: 21
        │   │   ├── add/2/11/4/1 started 2019-01-01 17:27:13 0.000s
        │   │   │   ├── x: 21
        │   │   │   ├── y: 21
        │   │   │   └── add/2/11/4/2 succeeded 2019-01-01 17:27:13
        │   │   │       └── result: 42
        │   │   └── eliot:remote_task/2/11/5 succeeded 2019-01-01 17:27:13
        │   └── dask:compute/2/12 succeeded 2019-01-01 17:27:13
        └── main_computation/3 succeeded 2019-01-01 17:27:13
            └── result: 42
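The path after each name in the tree (for example multiply/2/8/3/1) reflects the task_level field of the underlying log message, and the identifier on the first line is the shared task_uuid. The sketch below is not how eliot-tree is implemented. It is only a minimal illustration, using the standard Eliot message fields task_uuid, task_level, action_type and action_status, of how messages written by different processes can be regrouped into one ordered trace. The helper names group_by_task and describe are hypothetical.

```python
import json
from collections import defaultdict


def group_by_task(lines):
    """Group JSON log lines by task_uuid and order each group by task_level.

    task_level is a list of integers, so plain list comparison yields
    the same top-to-bottom order that the rendered tree shows.
    """
    tasks = defaultdict(list)
    for line in lines:
        message = json.loads(line)
        tasks[message["task_uuid"]].append(message)
    for messages in tasks.values():
        messages.sort(key=lambda m: m["task_level"])
    return dict(tasks)


def describe(message):
    """Render one message roughly like a line of the tree output."""
    level = "/".join(str(n) for n in message["task_level"])
    name = message.get("action_type") or message.get("message_type", "")
    status = message.get("action_status", "")
    return "{}/{} {}".format(name, level, status).strip()
```

Feeding it the lines of all the per-process log files and printing describe(m) for each message in a group gives a flat, ordered view of the same trace, which can help when checking that messages from every worker ended up under the expected task.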

Warning

Retries within Dask will result in confusing log messages; this will eventually be fixed in a future release.
