Can I use loguru logs in Prefect flows?

At the time of writing, there is no built-in integration to incorporate loguru logs in your Prefect flow.

:owl: Best practices: We recommend using the Prefect logger as much as possible; that way, your logs are automatically captured in the Prefect backend. This topic explains it in more detail:

However, if you wish to use loguru in your custom modules and functions, you can add Prefect’s log handlers as loguru sinks. Here is the syntax for Prefect <= 1.0 that you may try:

import prefect
from loguru import logger
from prefect import task, Flow
from prefect.utilities.logging import get_logger


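def custom_non_task_function():
    prefect_logger = get_logger()
    # This unpacking assumes the Prefect logger has exactly two handlers
    # (stream and cloud), in that order.
    stream_handler, cloud_handler = prefect_logger.handlers
    # loguru accepts standard logging handlers as sinks.
    logger.add(sink=stream_handler)
    logger.add(sink=cloud_handler)
    logger.info("Logging from loguru!")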


@task(log_stdout=True)
def task_printing_stdout():
    print("Logging by printing to stdout!")


@task
def logging_task():
    prefect_logger = prefect.context.get("logger")
    prefect_logger.info("Logging from Prefect root logger retrieved from context")
    custom_non_task_function()


with Flow("loguru_flow") as flow:
    hw = task_printing_stdout()
    logging_task(upstream_tasks=[hw])

This results in the following output in the Prefect Cloud UI:

Using loguru with DaskExecutor

import prefect
from loguru import logger
from prefect import task, Flow
from prefect.utilities.logging import get_logger
from prefect.executors import DaskExecutor


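def custom_non_task_function():
    prefect_logger = get_logger()
    # This unpacking assumes the Prefect logger has exactly two handlers
    # (stream and cloud), in that order.
    stream_handler, cloud_handler = prefect_logger.handlers
    # loguru accepts standard logging handlers as sinks.
    logger.add(sink=stream_handler)
    logger.add(sink=cloud_handler)
    logger.info("Logging from loguru!")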


@task(log_stdout=True)
def task_printing_stdout():
    print("Logging by printing to stdout!")


@task
def logging_task():
    prefect_logger = prefect.context.get("logger")
    prefect_logger.info("Logging from Prefect root logger retrieved from context")
    custom_non_task_function()


with Flow("loguru_flow", executor=DaskExecutor()) as flow:
    hw = task_printing_stdout()
    logging_task(upstream_tasks=[hw])

Again, the output from the Prefect Cloud UI:

This behavior was tested using Prefect 0.15.13.

:point_right: Note that the logs may not be available in the UI if you are using a remote distributed Dask cluster. For more about that, see:

From Slack discussion

Confirmation that this approach works when using a KubeCluster