Very useful, thanks. I have some questions about logging in Prefect 2:
Can I use the standard logging.getLogger() instead of get_run_logger()? This seems to work via the root log configuration. The only difference seems to be that it does not carry task/flow metadata, but is it likely to cause other issues?
Can I do log = logging.getLogger() at the top of the file to define a logger for all tasks? This won't work with get_run_logger(), as the module may not be inside a task/flow when it is imported. It seems to work so far, but I am not sure whether it will sometimes be a problem with multiprocessing.
Yes, you can! But this is considered an extra logger, so you would need to add it to your Prefect configuration, e.g. as an environment variable. Check this usage example:
Yes, exactly as shown in the example. This is true only for the extra logger, though. Defining the Prefect logger globally wouldn't work - see this topic:
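As a rough illustration of the top-of-file logger pattern discussed in this exchange: a standard-library logger created at import time picks up whatever handlers are attached to it later, because handlers are looked up when a record is emitted, not when getLogger() is called. The sketch below uses only the standard library. The `configure` and `some_task` names and the format string are hypothetical, and the handler merely stands in for whatever configuration Prefect applies to an extra logger.

```python
import io
import logging

# Created at import time, before any logging configuration exists.
log = logging.getLogger("test1")


def configure(stream) -> logging.Handler:
    """Attach a formatted handler after the module-level logger already exists."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s - %(message)s"))
    log.addHandler(handler)
    log.setLevel(logging.DEBUG)
    log.propagate = False  # keep the demo independent of any root handlers
    return handler


def some_task() -> None:
    # Uses the module-level logger; no run context is needed to obtain it.
    log.warning("warning extra log")


buffer = io.StringIO()
handler = configure(buffer)
some_task()
log.removeHandler(handler)
print(buffer.getvalue(), end="")
```

This only shows why a plain logging.getLogger() call is safe at module level. It says nothing about get_run_logger(), which needs an active flow or task run.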
This all works fine for SequentialTaskRunner. However, it does not seem to format the output for Dask or Ray. The output I get and my test code are below. What am I missing?
[I know there is a known issue that prevents the logs from showing in Orion, but here I am talking about console output.]
sequential
11:15:29.331 | WARNING | Task run 'testtask-3dde1f43-0' - warning hi
11:15:29.333 | WARNING | root - warning std root log
11:15:29.335 | WARNING | test1 - warning extra log
dask
warning hi
warning std root log
warning extra log
ray
(begin_task_run pid=6347) warning hi
(begin_task_run pid=6347) warning std root log
(begin_task_run pid=6347) warning extra log
import os

os.environ.update(
    PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT="60.0",
    PREFECT_LOGGING_EXTRA_LOGGERS="test1",
    PREFECT_API_URL="http://127.0.0.1:4200/api",
)

from prefect.flows import flow
from prefect.tasks import task
from prefect_dask.task_runners import DaskTaskRunner as runner
# from prefect.task_runners import SequentialTaskRunner as runner
# from prefect_ray.task_runners import RayTaskRunner as runner
from prefect import get_run_logger
import logging


@flow(task_runner=runner())
def testflow():
    testtask()


@task
def testtask():
    log = get_run_logger()
    log.warning("warning hi")
    log.debug("debug hi")

    log = logging.getLogger()
    log.warning("warning std root log")
    log.debug("debug std root log")

    log = logging.getLogger("test1")
    log.warning("warning extra log")
    log.debug("debug extra log")


if __name__ == "__main__":
    testflow()
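One reading of the bare Dask and Ray lines above, not confirmed anywhere in this thread, is that the worker processes log through Python's default "last resort" handler because no handler has been configured there. That handler writes only the message text, and only for WARNING and above, which would match both the missing formatting and the missing debug lines. A minimal stdlib-only sketch of that behaviour follows. The logger names `demo.unconfigured` and `demo.configured` and the format string are made up for illustration, and nothing here uses Prefect, Dask, or Ray.

```python
import contextlib
import io
import logging


def emit_unconfigured(name: str = "demo.unconfigured") -> str:
    """Log through a logger with no reachable handlers; return what hits stderr."""
    logger = logging.getLogger(name)
    logger.handlers.clear()
    logger.propagate = False  # isolate from any handlers on the root logger
    logger.setLevel(logging.DEBUG)

    buffer = io.StringIO()
    with contextlib.redirect_stderr(buffer):
        logger.warning("warning hi")  # handled by logging.lastResort
        logger.debug("debug hi")      # dropped: lastResort only passes WARNING+
    return buffer.getvalue()


def emit_configured(name: str = "demo.configured") -> str:
    """Same calls, but with an explicit handler and formatter attached."""
    logger = logging.getLogger(name)
    logger.handlers.clear()
    logger.propagate = False
    logger.setLevel(logging.DEBUG)

    buffer = io.StringIO()
    handler = logging.StreamHandler(buffer)
    handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s - %(message)s"))
    logger.addHandler(handler)
    logger.warning("warning hi")
    logger.debug("debug hi")
    logger.removeHandler(handler)
    return buffer.getvalue()


print(repr(emit_unconfigured()))  # 'warning hi\n'
print(emit_configured(), end="")
```

If this is what happens on the workers, the fix would have to configure logging inside the worker process itself, since configuration done in the submitting process does not necessarily carry over.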
At the moment, it seems that there's a bug(?) with Dask: the workers' log configuration can be set only through a ~/.config/dask/distributed.yaml file.
import dask
from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner


@task
def lazy_exponent(args):
    logger = get_run_logger()
    x, y = args
    result = x**y
    # the logging call to keep tabs on the computation
    logger.warning(f"Computed exponent {x}^{y} = {result}")
    return result


@flow(task_runner=DaskTaskRunner())
def test_flow():
    inputs = [[1, 2], [3, 4]]
    results = []
    for i in inputs:
        results.append(lazy_exponent(i))
    return results


test_flow().result()
Should output:
17:18:53.170 | INFO | prefect.engine - Created flow run 'encouraging-frog' for flow 'test-flow'
17:18:53.171 | INFO | Flow run 'encouraging-frog' - Using task runner 'DaskTaskRunner'
17:18:53.173 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
17:18:53.653 | WARNING | bokeh.server.util - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
17:18:55.548 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
17:18:55.946 | INFO | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-0' for task 'lazy_exponent'
17:18:56.272 | INFO | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-1' for task 'lazy_exponent'
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 1^2 = 1
WARNING:prefect.task_runs:Computed exponent 1^2 = 1
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 3^4 = 81
WARNING:prefect.task_runs:Computed exponent 3^4 = 81
DASK ADMIN 17:18:58 __ INFO __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
DASK ADMIN 17:18:58 __ INFO __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
17:19:00.407 | INFO | Flow run 'encouraging-frog' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=1, task_run_id=897ebb3b-9c28-4ff5-9eff-10e1d61c2af9),
Completed(message=None, type=COMPLETED, result=81, task_run_id=42ead4a3-d1e2-4c5c-b822-1e31cc35ad4a)]
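The `DASK ADMIN ...` lines in the output presumably come from a custom formatter defined in the distributed.yaml, which is not shown here. As a rough sketch, assuming that file carries a standard `logging.config.dictConfig`-style mapping, a format along these lines would produce lines of the same shape. The format string is reverse-engineered from the output above, and the formatter and handler names (`dask_admin`, `console`) are hypothetical. This is plain Python logging, not Dask's or Prefect's actual configuration.

```python
import contextlib
import io
import logging
import logging.config

# dictConfig-style mapping; the same structure could be written as YAML.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "dask_admin": {
            # Guessed from the "DASK ADMIN 17:18:58 __ WARNING __ ..." lines.
            "format": "DASK ADMIN %(asctime)s __ %(levelname)s __ %(name)s __ %(message)s",
            "datefmt": "%H:%M:%S",
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "dask_admin",
            "stream": "ext://sys.stderr",
        }
    },
    "loggers": {
        # Logger name copied from the output; no Prefect import is involved.
        "prefect.task_runs": {
            "handlers": ["console"],
            "level": "INFO",
            "propagate": False,
        }
    },
}


def format_like_thread(message: str) -> str:
    """Apply the config and return one formatted WARNING line."""
    buffer = io.StringIO()
    with contextlib.redirect_stderr(buffer):
        # "ext://sys.stderr" is resolved when the config is applied,
        # so the handler writes into the redirected buffer.
        logging.config.dictConfig(LOGGING)
        logging.getLogger("prefect.task_runs").warning(message)
    return buffer.getvalue()


print(format_like_thread("Computed exponent 1^2 = 1"), end="")
```

The duplicate `WARNING:prefect.task_runs:...` lines in the output have the default `logging.basicConfig` shape, which suggests a second handler is also active on the workers, though the thread does not say where it comes from.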