How can I add logs to my flow?

Prefect 2.0

Orion provides the get_run_logger function that returns a logging.Logger object. Retrieve the logger within the flow or task function and use it there.

Here is how you can use it in your tasks, flows, and subflows:

from prefect import task, get_run_logger

@task
def marvins_favorite_task():
    logger = get_run_logger()
    logger.info("Hello from Marvin's favorite task!")

An example flow with various log levels:

from prefect import flow, task, get_run_logger

@task
def foo():
    logger = get_run_logger()
    logger.info("Hello from the 'foo' task")

@task
def bar():
    logger = get_run_logger()
    logger.warning("Oh no, this is a bad task!")
    raise ValueError("Test")

@flow
def child_flow():
    bar()

@flow
def flow_with_logging():
    logger = get_run_logger()
    logger.info("Hello from the parent flow!")

    foo_future = foo()
    child_flow()

    logger.warning("Goodbye from the parent flow!")
    return foo_future

if __name__ == "__main__":
    flow_with_logging()

Prefect 1.0

To add custom logs within your tasks, you need to access the logger object from the Prefect context. Logging outside of tasks is generally not supported. Here is how it works:

import prefect
from prefect import task

@task
def my_task():
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")

Hello! I was curious to understand how the get_run_logger abstraction works when trying to unit test tasks. I’m trying to test a task using the instructions here, but I keep getting the following error when running my unit test:

E           RuntimeError: There is no active flow or task run context.

What would be the best way to work around this issue when unit testing tasks? Judging by certain tests in the prefect-aws repo, one potential workaround is to create a temporary flow just for testing and run that flow instead of running the individual tasks.

Minimal reproducible example:

from prefect import task, get_run_logger

@task
def example_task():
    logger = get_run_logger()
    logger.info("This will break the test")
    return 42


def test_example_task():
    assert example_task.fn() == 42

Try adding the @flow decorator to the test function, so that example_task.fn() runs inside an active flow run context.

Thanks Anna! Marking the test function as a flow did work for the basic example task, but it requires the test fixtures (which become flow arguments) to be serializable using Pydantic.

For instance, I have other tests that look like this:

def test_retrieve_user(example_user: User):
    ...

where User is a model class not serializable using Pydantic. In this case, marking the test function as a flow causes Pydantic serialization issues and does not work.

In that case, I would recommend removing the logger from the functions that you want to test. You can then test them like any other functions, without Pydantic serialization getting in the way.

There are tradeoffs, and you need to figure out how best to test the logic that you need to test without Prefect, e.g. by using plain Python functions, and then either call those from your flows and tasks or turn those functions themselves into flows or tasks.
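A minimal sketch of that separation, assuming the logic under test can live in a plain function. The names compute_answer and offset are hypothetical and only serve as illustration; the function logs through the standard logging module, so the test needs no flow or task run context.

```python
import logging

# Standard-library logger: usable anywhere, no Prefect run context required.
logger = logging.getLogger(__name__)


def compute_answer(offset: int = 0) -> int:
    """Plain Python logic (hypothetical example) with no Prefect dependency."""
    logger.info("Computing the answer with offset %d", offset)
    return 42 + offset


def test_compute_answer():
    # Runs under pytest like any other function; arguments can be any objects.
    assert compute_answer() == 42
    assert compute_answer(offset=1) == 43


# In the flow module (not needed by the test), a thin task could wrap the logic:
#
#     from prefect import task, get_run_logger
#
#     @task
#     def example_task():
#         get_run_logger().info("Running example_task")
#         return compute_answer()
```

The tradeoff is that messages logged inside compute_answer go through standard logging rather than the run logger, so only what the wrapping task logs with get_run_logger is tied to the run.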

How is logger = get_run_logger() different from logger = logging.getLogger(__name__)? Does the logger produced by get_run_logger do something special with log messages?

get_run_logger() can only run from a Prefect context, i.e. from an active flow or task run

It’s special in the sense that it sends logs to the Prefect backend.
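Given that difference, one possible pattern (a sketch, not an official Prefect helper) is to fall back to a standard logger when there is no run context. The helper name get_logger_with_fallback is hypothetical. It relies on the behavior shown earlier in this thread, where get_run_logger() raises a RuntimeError outside a flow or task run. The import is guarded only so the helper also loads in an environment where Prefect is not installed.

```python
import logging

try:
    from prefect import get_run_logger
except ImportError:  # Prefect not installed, e.g. a minimal test environment
    get_run_logger = None


def get_logger_with_fallback(name: str = __name__):
    """Return the Prefect run logger if a run context is active,
    otherwise a plain standard-library logger (hypothetical helper)."""
    if get_run_logger is not None:
        try:
            return get_run_logger()
        except RuntimeError:
            # No active flow or task run context, e.g. when calling task.fn()
            # directly from a unit test.
            pass
    return logging.getLogger(name)
```

A task that calls get_logger_with_fallback() instead of get_run_logger() can then be exercised through example_task.fn() in a unit test. In that case the messages go through standard logging only and are not sent to the Prefect backend.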
