Can I use loguru logs in Prefect flows?

Prefect 2

Here is how you can create a custom function that will enable loguru logs:

from prefect import get_run_logger


def enable_loguru_support() -> None:
    """Redirect loguru logging messages to the prefect run logger.
    This function should be called from within a Prefect task or flow before calling any module that uses loguru.
    This function can be safely called multiple times.
    Example Usage:
    from prefect import flow
    from loguru import logger
    from prefect_utils import enable_loguru_support # import this function in your flow from your module
    @flow()
    def myflow():
        logger.info("This is hidden from the Prefect UI")
        enable_loguru_support()
        logger.info("This shows up in the Prefect UI")
    """
    # import here for distributed execution because loguru cannot be pickled.
    from loguru import logger  # pylint: disable=import-outside-toplevel

    run_logger = get_run_logger()
    logger.remove()
    log_format = "{name}:{function}:{line} - {message}"
    logger.add(
        run_logger.debug,
        filter=lambda record: record["level"].name == "DEBUG",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.warning,
        filter=lambda record: record["level"].name == "WARNING",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.error,
        filter=lambda record: record["level"].name == "ERROR",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.critical,
        filter=lambda record: record["level"].name == "CRITICAL",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.info,
        filter=lambda record: record["level"].name
        not in ["DEBUG", "WARNING", "ERROR", "CRITICAL"],
        level="TRACE",
        format=log_format,
    )

Usage in a flow, task and with custom functions

Gist with the same code as below:

from prefect import flow, task
from loguru import logger
from prefect_utils.loguru import enable_loguru_support


def mycustom_function():
    logger.info("I'm a custom function")


@task
def mytask():
    enable_loguru_support()
    logger.info("This task log shows up in the Prefect UI")


@flow
def myflow():
    enable_loguru_support()
    logger.info("This flow log shows up in the Prefect UI")
    mytask()
    mycustom_function()
    mytask.submit()


if __name__ == "__main__":
    myflow()

The UI should show a similar output when running this locally:

Run from deployment:

Big thanks to Justin from LiveEO for contributing his solution to this. :raised_hands:


Prefect 1

At the time of writing, there is no built-in integration to incorporate loguru logs in your Prefect flow.

:owl: Best practices: We recommend using the Prefect logger as much as it’s possible - this way, your logs will be automatically captured in the Prefect’s backend. This topic explains it in more detail:

However, if you wish to use loguru in your custom modules and functions, you may add Prefect’s log handlers as loguru sink. Here is the syntax for Prefect <= 1.0 that you may try:

import prefect
from loguru import logger
from prefect import task, Flow
from prefect.utilities.logging import get_logger


def custom_non_task_function():
    prefect_logger = get_logger()
    stream_handler, cloud_handler = prefect_logger.handlers
    logger.add(sink=stream_handler)
    logger.add(sink=cloud_handler)
    logger.info("Logging from loguru!")


@task(log_stdout=True)
def task_printing_stdout():
    print("Logging by printing to stdout!")


@task
def logging_task():
    prefect_logger = prefect.context.get("logger")
    prefect_logger.info("Logging from Prefect root logger retrieved from context")
    custom_non_task_function()


with Flow("loguru_flow") as flow:
    hw = task_printing_stdout()
    logging_task(upstream_tasks=[hw])

This results in the following output in the Prefect Cloud UI:

Using loguru with DaskExecutor

import prefect
from loguru import logger
from prefect import task, Flow
from prefect.utilities.logging import get_logger
from prefect.executors import DaskExecutor


def custom_non_task_function():
    prefect_logger = get_logger()
    stream_handler, cloud_handler = prefect_logger.handlers
    logger.add(sink=stream_handler)
    logger.add(sink=cloud_handler)
    logger.info("Logging from loguru!")


@task(log_stdout=True)
def task_printing_stdout():
    print("Logging by printing to stdout!")


@task
def logging_task():
    prefect_logger = prefect.context.get("logger")
    prefect_logger.info("Logging from Prefect root logger retrieved from context")
    custom_non_task_function()


with Flow("loguru_flow", executor=DaskExecutor()) as flow:
    hw = task_printing_stdout()
    logging_task(upstream_tasks=[hw])

Again, the output from Prefect Cloud UI:

This behavior was tested using Prefect 0.15.13.

:point_right: Note that: the logs may not be available in the UI if you are using a remote distributed Dask cluster. For more about that, see:

1 Like

From Slack discussion

Confirmation that this approach works when using a KubeCluster