At the moment, there seems to be a bug(?) in Dask: the workers' logging can only be configured through a ~/.config/dask/distributed.yaml file.
So you can populate that file like this:
logging:
  version: 1
  admin:
    log-format: '%(name)s - %(levelname)s - %(message)s'
  formatters:
    custom:
      format: "DASK ADMIN %(asctime)s __ %(levelname)-7s __ %(name)s __ %(message)s"
      datefmt: "%H:%M:%S"
  handlers:
    console:
      formatter: custom
      class: logging.StreamHandler
      level: INFO
  loggers:
    prefect.task_runs:
      level: INFO
      handlers:
        - console
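If you want to check the formatter without starting a cluster, here is a minimal sketch that feeds the same settings to the standard library's `logging.config.dictConfig`. As far as I understand, that is what distributed does with a `logging:` section that contains a `version` key, but treat that as an assumption. The `preview` helper is hypothetical and exists only for this illustration, and the `admin` key is left out because `dictConfig` does not use it.

```python
import io
import logging
import logging.config

# Same content as the `logging:` section of the YAML file, written as a dict.
LOGGING_CONFIG = {
    "version": 1,
    "formatters": {
        "custom": {
            "format": "DASK ADMIN %(asctime)s __ %(levelname)-7s __ %(name)s __ %(message)s",
            "datefmt": "%H:%M:%S",
        }
    },
    "handlers": {
        "console": {
            "formatter": "custom",
            "class": "logging.StreamHandler",
            "level": "INFO",
        }
    },
    "loggers": {
        "prefect.task_runs": {"level": "INFO", "handlers": ["console"]},
    },
}


def preview(message: str, level: int = logging.WARNING) -> str:
    """Apply the config and return the line the console handler would emit."""
    logging.config.dictConfig(LOGGING_CONFIG)
    logger = logging.getLogger("prefect.task_runs")
    buffer = io.StringIO()
    # Redirect the handler from stderr to a buffer so the line can be inspected.
    logger.handlers[0].setStream(buffer)
    logger.log(level, message)
    return buffer.getvalue().rstrip("\n")


if __name__ == "__main__":
    print(preview("Computed exponent 1^2 = 1"))
```

The printed line should have the same shape as the "DASK ADMIN" lines that the workers emit, with the current time in place of the timestamp.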
And this flow:
import dask
from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner


@task
def lazy_exponent(args):
    logger = get_run_logger()
    x, y = args
    result = x**y
    # the logging call to keep tabs on the computation
    logger.warning(f"Computed exponent {x}^{y} = {result}")
    return result


@flow(task_runner=DaskTaskRunner())
def test_flow():
    inputs = [[1, 2], [3, 4]]
    results = []
    for i in inputs:
        results.append(lazy_exponent(i))
    return results


test_flow().result()
Should output:
17:18:53.170 | INFO | prefect.engine - Created flow run 'encouraging-frog' for flow 'test-flow'
17:18:53.171 | INFO | Flow run 'encouraging-frog' - Using task runner 'DaskTaskRunner'
17:18:53.173 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
17:18:53.653 | WARNING | bokeh.server.util - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
17:18:55.548 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
17:18:55.946 | INFO | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-0' for task 'lazy_exponent'
17:18:56.272 | INFO | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-1' for task 'lazy_exponent'
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 1^2 = 1
WARNING:prefect.task_runs:Computed exponent 1^2 = 1
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 3^4 = 81
WARNING:prefect.task_runs:Computed exponent 3^4 = 81
DASK ADMIN 17:18:58 __ INFO __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
DASK ADMIN 17:18:58 __ INFO __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
17:19:00.407 | INFO | Flow run 'encouraging-frog' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=1, task_run_id=897ebb3b-9c28-4ff5-9eff-10e1d61c2af9),
Completed(message=None, type=COMPLETED, result=81, task_run_id=42ead4a3-d1e2-4c5c-b822-1e31cc35ad4a)]
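Each task log line shows up twice in that output: once in the custom "DASK ADMIN" format and once as `WARNING:prefect.task_runs:...`. The second form matches the standard library's default `logging.BASIC_FORMAT`, so my guess (an assumption, not something I verified on the workers) is that the record also propagates to a handler on the root logger. The sketch below reproduces the effect with plain `logging` only. The `emitted_lines` helper is hypothetical, and the timestamp is dropped from the custom format to keep the output stable.

```python
import io
import logging


def emitted_lines(propagate: bool) -> list[str]:
    """Log one warning and return every line written by the two handlers."""
    buffer = io.StringIO()

    # Stand-in for a root handler using the default basicConfig format.
    root = logging.getLogger()
    root_handler = logging.StreamHandler(buffer)
    root_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
    root.addHandler(root_handler)

    # Stand-in for the custom console handler from the YAML file.
    logger = logging.getLogger("prefect.task_runs")
    custom = logging.StreamHandler(buffer)
    custom.setFormatter(
        logging.Formatter("DASK ADMIN __ %(levelname)-7s __ %(name)s __ %(message)s")
    )
    logger.addHandler(custom)
    logger.setLevel(logging.INFO)

    previous = logger.propagate
    logger.propagate = propagate
    try:
        logger.warning("Computed exponent 1^2 = 1")
    finally:
        # Restore global logging state so repeated calls do not stack handlers.
        root.removeHandler(root_handler)
        logger.removeHandler(custom)
        logger.propagate = previous
    return buffer.getvalue().splitlines()


if __name__ == "__main__":
    print(emitted_lines(propagate=True))   # custom line plus the root-format line
    print(emitted_lines(propagate=False))  # custom line only
```

If that guess is right, adding `propagate: False` under `prefect.task_runs` in the YAML file (a key that `dictConfig` supports for loggers) might remove the duplicate lines, though I have not tried it on a Dask cluster.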