Prefect 2
Here is how you can create a custom function that will enable loguru logs:
from prefect import get_run_logger


def enable_loguru_support() -> None:
    """Redirect loguru logging messages to the prefect run logger.

    This function should be called from within a Prefect task or flow before calling any module that uses loguru.
    This function can be safely called multiple times.

    Example Usage:

    from prefect import flow
    from loguru import logger
    from prefect_utils import enable_loguru_support # import this function in your flow from your module

    @flow()
    def myflow():
        logger.info("This is hidden from the Prefect UI")
        enable_loguru_support()
        logger.info("This shows up in the Prefect UI")
    """
    # import here for distributed execution because loguru cannot be pickled.
    from loguru import logger  # pylint: disable=import-outside-toplevel

    run_logger = get_run_logger()
    logger.remove()
    log_format = "{name}:{function}:{line} - {message}"
    logger.add(
        run_logger.debug,
        filter=lambda record: record["level"].name == "DEBUG",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.warning,
        filter=lambda record: record["level"].name == "WARNING",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.error,
        filter=lambda record: record["level"].name == "ERROR",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.critical,
        filter=lambda record: record["level"].name == "CRITICAL",
        level="TRACE",
        format=log_format,
    )
    logger.add(
        run_logger.info,
        filter=lambda record: record["level"].name
        not in ["DEBUG", "WARNING", "ERROR", "CRITICAL"],
        level="TRACE",
        format=log_format,
    )
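The five `logger.add` calls above differ only in which run-logger method they target. As a rough sketch of the same routing with a single sink, a lookup table can pick the method from the loguru level name. The names `LEVEL_TO_METHOD`, `method_for_level` and `make_sink` are hypothetical and not part of Prefect or loguru, and this variant has not been tested against a Prefect deployment:

```python
# Hypothetical lookup table mirroring the filters above:
# loguru level name -> name of the logger method to call.
LEVEL_TO_METHOD = {
    "DEBUG": "debug",
    "WARNING": "warning",
    "ERROR": "error",
    "CRITICAL": "critical",
}


def method_for_level(level_name: str) -> str:
    """Return the logger method name for a loguru level.

    Levels without an explicit entry (TRACE, INFO, SUCCESS, custom levels)
    fall back to "info", like the last logger.add call above.
    """
    return LEVEL_TO_METHOD.get(level_name, "info")


def make_sink(run_logger):
    """Build one loguru sink that forwards each message to the matching method."""

    def sink(message) -> None:
        # loguru hands a callable sink a str subclass whose `.record` dict
        # carries the level; the formatted text ends with a newline.
        level_name = message.record["level"].name
        log_method = getattr(run_logger, method_for_level(level_name))
        log_method(str(message).rstrip("\n"))

    return sink
```

Inside `enable_loguru_support`, the five calls could then presumably be replaced by `logger.add(make_sink(run_logger), level="TRACE", format=log_format)`, assuming the run logger exposes the usual `debug`/`info`/`warning`/`error`/`critical` methods.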
Usage in a flow, task and with custom functions
Gist with the same code as below:
from prefect import flow, task
from loguru import logger
from prefect_utils.loguru import enable_loguru_support


def mycustom_function():
    logger.info("I'm a custom function")


@task
def mytask():
    enable_loguru_support()
    logger.info("This task log shows up in the Prefect UI")


@flow
def myflow():
    enable_loguru_support()
    logger.info("This flow log shows up in the Prefect UI")
    mytask()
    mycustom_function()
    mytask.submit()


if __name__ == "__main__":
    myflow()
The UI should show a similar output when running this locally:
Run from deployment:
Big thanks to Justin from LiveEO for contributing his solution to this.
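In the usage example, every task and flow starts by calling `enable_loguru_support()`. If that repetition becomes tedious, one possible approach is a small decorator that runs a setup callable before the wrapped function. This is only a sketch: `call_first` is a hypothetical helper, and stacking it under `@task` or `@flow` is an assumption that has not been verified here.

```python
import functools
from typing import Any, Callable


def call_first(setup: Callable[[], None]) -> Callable:
    """Decorator factory: run `setup()` before every call of the decorated function."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)  # keep the original name and docstring
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            setup()
            return func(*args, **kwargs)

        return wrapper

    return decorator
```

The intended usage would be to place `@call_first(enable_loguru_support)` directly below `@task` (or `@flow`), so that the setup runs inside the task or flow run, where `get_run_logger()` has a run context available.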
Prefect 1
At the time of writing, there is no built-in integration to incorporate loguru logs in your Prefect flow.
Best practices: We recommend using the Prefect logger as much as possible. This way, your logs are automatically captured in the Prefect backend. This topic explains it in more detail:
However, if you wish to use loguru in your custom modules and functions, you may add Prefect’s log handlers as loguru sinks. Here is the syntax for Prefect <= 1.0 that you may try:
import prefect
from loguru import logger
from prefect import task, Flow
from prefect.utilities.logging import get_logger


def custom_non_task_function():
    prefect_logger = get_logger()
    stream_handler, cloud_handler = prefect_logger.handlers
    logger.add(sink=stream_handler)
    logger.add(sink=cloud_handler)
    logger.info("Logging from loguru!")


@task(log_stdout=True)
def task_printing_stdout():
    print("Logging by printing to stdout!")


@task
def logging_task():
    prefect_logger = prefect.context.get("logger")
    prefect_logger.info("Logging from Prefect root logger retrieved from context")
    custom_non_task_function()


with Flow("loguru_flow") as flow:
    hw = task_printing_stdout()
    logging_task(upstream_tasks=[hw])
This results in the following output in the Prefect Cloud UI:
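One caveat about `custom_non_task_function` in the example above: each call to `logger.add` registers a new sink, so calling the function more than once in the same process would emit every loguru message several times. The unpacking `stream_handler, cloud_handler = ...` also assumes exactly two handlers. A minimal sketch of a guard that registers each handler only once, for any number of handlers, could look like this (`add_handlers_once` is a hypothetical helper, not part of Prefect or loguru):

```python
import logging
from typing import Callable, Iterable

# Handlers already passed to `add_sink` in this process.
# logging.Handler hashes by identity, so a plain set works.
_registered = set()


def add_handlers_once(
    handlers: Iterable[logging.Handler],
    add_sink: Callable[[logging.Handler], object],
) -> int:
    """Call `add_sink(handler)` for each handler not seen before.

    Returns the number of handlers newly registered by this call.
    """
    added = 0
    for handler in handlers:
        if handler not in _registered:
            add_sink(handler)
            _registered.add(handler)
            added += 1
    return added
```

In the Prefect 1 example this would presumably be called as `add_handlers_once(get_logger().handlers, logger.add)`. The bookkeeping is per process, so with a distributed executor each worker process would still register its own handlers once.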
Using loguru with DaskExecutor
import prefect
from loguru import logger
from prefect import task, Flow
from prefect.utilities.logging import get_logger
from prefect.executors import DaskExecutor


def custom_non_task_function():
    prefect_logger = get_logger()
    stream_handler, cloud_handler = prefect_logger.handlers
    logger.add(sink=stream_handler)
    logger.add(sink=cloud_handler)
    logger.info("Logging from loguru!")


@task(log_stdout=True)
def task_printing_stdout():
    print("Logging by printing to stdout!")


@task
def logging_task():
    prefect_logger = prefect.context.get("logger")
    prefect_logger.info("Logging from Prefect root logger retrieved from context")
    custom_non_task_function()


with Flow("loguru_flow", executor=DaskExecutor()) as flow:
    hw = task_printing_stdout()
    logging_task(upstream_tasks=[hw])
Again, the output from Prefect Cloud UI:
This behavior was tested using Prefect 0.15.13.
Note that the logs may not be available in the UI if you are using a remote distributed Dask cluster. For more about that, see: