How to use logging in Prefect - a tutorial by Andrew Brookins

This blog post includes:

  • logging in tasks and flows
  • filtering logs in the UI
  • setting extra loggers using the PREFECT_LOGGING_EXTRA_LOGGERS environment variable
  • formatting logs
  • custom log configuration via a YAML file

https://www.prefect.io/blog/logs-the-prefect-way/

1 Like

Very useful, thanks. I have some questions about logging in Prefect 2:

  • Can I use the standard logging.getLogger() instead of get_run_logger()? It seems to work via the root log configuration; the only difference seems to be that it lacks the task/flow metadata. Is it likely to cause other issues?
  • Can I do log = logging.getLogger() at the top of the file to define a logger for all tasks? This won't work with get_run_logger(), since the module may not be inside a task/flow run context when imported. It does seem to work so far, but I am not sure whether it will cause problems sometimes when multiprocessing.
1 Like

Yes, you can! But this is considered an extra logger, so you would need to add it to your Prefect configuration, e.g. as an environment variable. Check this usage example:
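For instance, something like this (a minimal sketch; the logger name "myproject" is just a placeholder, use your own):

import logging
import os

# register the logger name with Prefect before importing it, so records
# from this logger are captured alongside Prefect's own logs
os.environ["PREFECT_LOGGING_EXTRA_LOGGERS"] = "myproject"

from prefect import flow, task

log = logging.getLogger("myproject")  # safe at import time


@task
def mytask():
    log.info("logged via the standard library, captured by Prefect")


@flow
def myflow():
    mytask()


if __name__ == "__main__":
    myflow()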

Yes, exactly as shown in the example. This is true only for the extra logger, though. Defining the Prefect logger globally wouldn't work - see this topic:
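In short, something like this fails at import time (a minimal sketch of the failure mode):

from prefect import get_run_logger

# there is no active flow/task run context at import time, so this
# raises prefect.exceptions.MissingContextError
log = get_run_logger()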

2 Likes

This all works fine with the SequentialTaskRunner. However, the output does not seem to be formatted for Dask or Ray. The output I get and my test code are below. What am I missing?

[I know there is a known issue that prevents these logs from showing in Orion, but here I am talking about console output.]

sequential

11:15:29.331 | WARNING | Task run 'testtask-3dde1f43-0' - warning hi
11:15:29.333 | WARNING | root - warning std root log
11:15:29.335 | WARNING | test1 - warning extra log

dask

warning hi
warning std root log
warning extra log

ray

(begin_task_run pid=6347) warning hi
(begin_task_run pid=6347) warning std root log
(begin_task_run pid=6347) warning extra log
Test code:

import logging
import os

# set Prefect settings via environment variables before importing Prefect,
# so that they take effect
os.environ.update(
    PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT="60.0",
    PREFECT_LOGGING_EXTRA_LOGGERS="test1",
    PREFECT_API_URL="http://127.0.0.1:4200/api",
)

from prefect import flow, get_run_logger, task
from prefect_dask.task_runners import DaskTaskRunner as runner

# from prefect.task_runners import SequentialTaskRunner as runner
# from prefect_ray.task_runners import RayTaskRunner as runner


@flow(task_runner=runner())
def testflow():
    testtask()


@task
def testtask():
    log = get_run_logger()
    log.warning("warning hi")
    log.debug("debug hi")

    log = logging.getLogger()
    log.warning("warning std root log")
    log.debug("debug std root log")

    log = logging.getLogger("test1")
    log.warning("warning extra log")
    log.debug("debug extra log")


if __name__ == "__main__":
    testflow()

1 Like

Thanks for sharing the example! I created an issue here:

At the moment, it seems that there's a bug(?) in Dask that allows workers' log configuration to be set only through a ~/.config/dask/distributed.yaml file.

So you can populate that file like this:

logging:
  version: 1
  admin:
    log-format: '%(name)s - %(levelname)s - %(message)s'
  formatters:
    custom:
      format: "DASK ADMIN %(asctime)s __ %(levelname)-7s __ %(name)s __ %(message)s"
      datefmt: "%H:%M:%S"
  handlers:
    console:
      formatter: custom
      class: logging.StreamHandler
      level: INFO
  loggers:
    prefect.task_runs:
      level: INFO
      handlers:
        - console

And run this flow:

from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner


@task
def lazy_exponent(args):
    logger = get_run_logger()
    x, y = args
    result = x**y
    # the logging call to keep tabs on the computation
    logger.warning(f"Computed exponent {x}^{y} = {result}")
    return result


@flow(task_runner=DaskTaskRunner())
def test_flow():
    inputs = [[1, 2], [3, 4]]
    results = []
    for i in inputs:
        results.append(lazy_exponent(i))
    return results

if __name__ == "__main__":
    test_flow().result()

Should output:

17:18:53.170 | INFO    | prefect.engine - Created flow run 'encouraging-frog' for flow 'test-flow'
17:18:53.171 | INFO    | Flow run 'encouraging-frog' - Using task runner 'DaskTaskRunner'
17:18:53.173 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
17:18:53.653 | WARNING | bokeh.server.util - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
17:18:55.548 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
17:18:55.946 | INFO    | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-0' for task 'lazy_exponent'
17:18:56.272 | INFO    | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-1' for task 'lazy_exponent'
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 1^2 = 1
WARNING:prefect.task_runs:Computed exponent 1^2 = 1
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 3^4 = 81
WARNING:prefect.task_runs:Computed exponent 3^4 = 81
DASK ADMIN 17:18:58 __ INFO    __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
DASK ADMIN 17:18:58 __ INFO    __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
17:19:00.407 | INFO    | Flow run 'encouraging-frog' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=1, task_run_id=897ebb3b-9c28-4ff5-9eff-10e1d61c2af9),
 Completed(message=None, type=COMPLETED, result=81, task_run_id=42ead4a3-d1e2-4c5c-b822-1e31cc35ad4a)]
1 Like

This partly works. I read the logging.yml file in, put it under a "logging" key, and saved it in the Dask config folder. It formats the logs correctly. However, the "extra" loggers stay at level WARNING even though I set the level to INFO, which works with the sequential runner.
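Roughly, what I did was this (a sketch; the paths are assumptions, adjust to yours):

import os

import yaml

# read Prefect's logging config, nest it under a top-level "logging" key,
# and save it where Dask's distributed config lives; note that any ${...}
# placeholders in the file still need substituting (see the full version
# further down)
with open(os.path.expanduser("~/.prefect/logging.yml")) as f:
    logset = yaml.safe_load(f)

target = os.path.expanduser("~/.config/dask/distributed.yaml")
os.makedirs(os.path.dirname(target), exist_ok=True)
with open(target, "w") as f:
    yaml.dump({"logging": logset}, f)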

I also tried DaskTaskRunner(cluster_kwargs=dict(env=dict(PREFECT_LOGGING_EXTRA_LOGGERS="mylog"))).

I believe you would need to configure your extra logger level on the extra logger itself. As far as I know, the log level set with PREFECT_LOGGING_LEVEL applies only to the Prefect logger.
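For example, something like this in the flow module (a sketch, reusing the "mylog" name from above):

import logging

# set the level on the extra logger itself rather than relying on
# Prefect's own level settings
logging.getLogger("mylog").setLevel(logging.INFO)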

Yes, I set the extra logger to INFO. It shows as INFO with the sequential runner but not in Dask.

Ahh, that must somehow be configured on Dask, probably. I don't know how, though. Keep us posted if you find out more!

OK, I have now done that using the code below. It now handles logs as expected, both to the console and to Orion. However, I suggest this should be done by Prefect 2 itself. Something similar will also be needed for Ray, which likewise does not handle extra loggers.

import os
import re
import shutil

import yaml


def setup_dask_logging():
    # read the Prefect log settings and substitute any ${VAR} placeholders
    # from the environment
    with open(os.environ["PREFECT_LOGGING_SETTINGS_PATH"]) as f:
        logset = f.read()
    for x in set(re.findall(r"\$\{[^}]*\}", logset)):
        logset = logset.replace(x, os.environ.get(x[2:-1], "INFO"))
    logset = yaml.safe_load(logset)

    # set extra loggers
    extra_settings = logset["loggers"]["prefect.extra"].copy()
    extras = [
        x.strip()
        for x in os.environ.get("PREFECT_LOGGING_EXTRA_LOGGERS", "").split(",")
    ]
    for extra in extras:
        logset["loggers"][extra] = extra_settings

    # TODO update rather than overwrite?
    # save and copy to dask location
    HOME = os.path.expanduser("~")
    PREFECTX = os.path.abspath(os.path.dirname(__file__))
    DASK_SRC = f"{PREFECTX}/logging_daskdistributed.yaml"
    DASK_TGT = f"{HOME}/.config/dask/distributed.yaml"
    with open(DASK_SRC, "w") as f:
        f.write(yaml.dump(dict(logging=logset)))
    os.makedirs(os.path.dirname(DASK_TGT), exist_ok=True)
    shutil.copy(DASK_SRC, DASK_TGT)
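I call it before running the flow, so that the Dask workers pick up the file when the cluster is created, e.g.:

# hypothetical usage with the test flow from earlier in the thread
if __name__ == "__main__":
    setup_dask_logging()
    testflow()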
2 Likes

Wow, this looks complicated and definitely too much boilerplate if you have to do it in every flow. Let me open an issue.

1 Like