This blog post includes:
- logging in tasks and flows
- filtering logs in the UI
- setting extra loggers using the environment variable PREFECT_LOGGING_EXTRA_LOGGERS
- formatting logs
- custom log configuration via a YAML file
Very useful, thanks! I have some questions about logging in Prefect 2:
Yes, you can! But this is considered an extra logger, so you need to add it to your Prefect configuration, e.g., as an environment variable. Check this usage example:
Yes, exactly as shown in the example. This is true only for the extra logger, though. Defining the Prefect logger globally wouldn't work; see this topic:
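A stdlib-only sketch of why a globally defined *standard* logger is fine: `logging.getLogger(name)` always returns the same object, so configuration applied later (such as the handlers Prefect sets up for names listed in PREFECT_LOGGING_EXTRA_LOGGERS) still reaches a logger created at import time. The `dictConfig` below is a hypothetical stand-in for Prefect's own configuration, not its actual settings. `get_run_logger()` is different because it needs an active flow or task run.

```python
import io
import logging
import logging.config

# Created once at module level, before any configuration is applied.
# "test1" matches the extra logger name used later in this thread.
extra_logger = logging.getLogger("test1")


def configure(stream: io.StringIO) -> None:
    # Hypothetical stand-in for configuration applied at startup;
    # the format and handler are illustrative only.
    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {"plain": {"format": "%(levelname)s | %(name)s - %(message)s"}},
        "handlers": {
            "buf": {"class": "logging.StreamHandler", "formatter": "plain", "level": "INFO"}
        },
        "loggers": {"test1": {"level": "INFO", "handlers": ["buf"], "propagate": False}},
    })
    # dictConfig reconfigured the *same* object we created above;
    # redirect its handler to an in-memory stream so the output can be inspected.
    extra_logger.handlers[0].setStream(stream)


def some_task() -> None:
    # Uses the module-level logger; no run context is needed for a stdlib logger.
    extra_logger.info("info from extra log")
```

Calling `configure(buf)` and then `some_task()` writes `INFO | test1 - info from extra log` to `buf`, even though `extra_logger` was created before any configuration existed.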
This all works fine with SequentialTaskRunner. However, it does not seem to format the output for Dask or Ray. The output I get and my test code are below. What am I missing?
[I know there is a known issue that prevents the logs from showing in Orion, but here I am talking about console output.]
```
11:15:29.331 | WARNING | Task run 'testtask-3dde1f43-0' - warning hi
11:15:29.333 | WARNING | root - warning std root log
11:15:29.335 | WARNING | test1 - warning extra log

warning hi
warning std root log
warning extra log

(begin_task_run pid=6347) warning hi
(begin_task_run pid=6347) warning std root log
(begin_task_run pid=6347) warning extra log
```
```python
import os

# Set the environment variables before prefect is imported
os.environ.update(
    PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT="60.0",
    PREFECT_LOGGING_EXTRA_LOGGERS="test1",
    PREFECT_API_URL="http://127.0.0.1:4200/api",
)

from prefect.flows import flow
from prefect.tasks import task
from prefect_dask.task_runners import DaskTaskRunner as runner
# from prefect.task_runners import SequentialTaskRunner as runner
# from prefect_ray.task_runners import RayTaskRunner as runner
from prefect import get_run_logger
import logging


@flow(task_runner=runner())
def testflow():
    testtask()


@task
def testtask():
    # Prefect run logger
    log = get_run_logger()
    log.warning("warning hi")
    log.debug("debug hi")
    # Standard library root logger
    log = logging.getLogger()
    log.warning("warning std root log")
    log.debug("debug std root log")
    # Extra logger registered via PREFECT_LOGGING_EXTRA_LOGGERS
    log = logging.getLogger("test1")
    log.warning("warning extra log")
    log.debug("debug extra log")


if __name__ == "__main__":
    testflow()
```
Thanks for sharing the example! I created an issue here:
At the moment, there seems to be a bug(?) in Dask that allows the workers' logging to be configured only through a ~/.config/dask/distributed.yaml file.
So you can populate that file like this:
```yaml
logging:
  version: 1
  admin:
    log-format: '%(name)s - %(levelname)s - %(message)s'
  formatters:
    custom:
      format: "DASK ADMIN %(asctime)s __ %(levelname)-7s __ %(name)s __ %(message)s"
      datefmt: "%H:%M:%S"
  handlers:
    console:
      formatter: custom
      class: logging.StreamHandler
      level: INFO
  loggers:
    prefect.task_runs:
      level: INFO
      handlers:
        - console
```
Then run this:
```python
import dask
from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner


@task
def lazy_exponent(args):
    logger = get_run_logger()
    x, y = args
    result = x**y
    # the logging call to keep tabs on the computation
    logger.warning(f"Computed exponent {x}^{y} = {result}")
    return result


@flow(task_runner=DaskTaskRunner())
def test_flow():
    inputs = [[1, 2], [3, 4]]
    results = []
    for i in inputs:
        results.append(lazy_exponent(i))
    return results


test_flow().result()
```
This should output:
```
17:18:53.170 | INFO | prefect.engine - Created flow run 'encouraging-frog' for flow 'test-flow'
17:18:53.171 | INFO | Flow run 'encouraging-frog' - Using task runner 'DaskTaskRunner'
17:18:53.173 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
17:18:53.653 | WARNING | bokeh.server.util - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
17:18:55.548 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
17:18:55.946 | INFO | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-0' for task 'lazy_exponent'
17:18:56.272 | INFO | Flow run 'encouraging-frog' - Created task run 'lazy_exponent-fddbc240-1' for task 'lazy_exponent'
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 1^2 = 1
WARNING:prefect.task_runs:Computed exponent 1^2 = 1
DASK ADMIN 17:18:58 __ WARNING __ prefect.task_runs __ Computed exponent 3^4 = 81
WARNING:prefect.task_runs:Computed exponent 3^4 = 81
DASK ADMIN 17:18:58 __ INFO __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
DASK ADMIN 17:18:58 __ INFO __ prefect.task_runs __ Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
17:19:00.407 | INFO | Flow run 'encouraging-frog' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=1, task_run_id=897ebb3b-9c28-4ff5-9eff-10e1d61c2af9),
 Completed(message=None, type=COMPLETED, result=81, task_run_id=42ead4a3-d1e2-4c5c-b822-1e31cc35ad4a)]
```
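The duplicated `WARNING:prefect.task_runs:...` lines match the default format of `logging.basicConfig` (`%(levelname)s:%(name)s:%(message)s`), which suggests each record also propagates to a handler on an ancestor logger in the worker. The sketch below applies the same `logging` section in-process with `logging.config.dictConfig` so the format can be previewed without starting a cluster. It assumes Dask hands a `logging` section containing `version: 1` to `dictConfig`; the `admin` entry is left out, and `"propagate": False` is a hypothetical addition not present in the YAML above (it would silence the duplicates, but ancestor handlers would then no longer see these records).

```python
import io
import logging
import logging.config

# Python equivalent of the "logging" section of the distributed.yaml above.
# ASSUMPTION: Dask passes this section to logging.config.dictConfig.
WORKER_LOGGING = {
    "version": 1,
    "formatters": {
        "custom": {
            "format": "DASK ADMIN %(asctime)s __ %(levelname)-7s __ %(name)s __ %(message)s",
            "datefmt": "%H:%M:%S",
        }
    },
    "handlers": {
        "console": {
            "formatter": "custom",
            "class": "logging.StreamHandler",
            "level": "INFO",
        }
    },
    "loggers": {
        "prefect.task_runs": {
            "level": "INFO",
            "handlers": ["console"],
            # Not in the YAML above: stop records from also reaching ancestor
            # handlers, one possible source of the duplicate lines.
            "propagate": False,
        }
    },
}


def preview(message: str, level: int = logging.WARNING) -> str:
    """Apply the config in this process and return how one record is formatted."""
    logging.config.dictConfig(WORKER_LOGGING)
    logger = logging.getLogger("prefect.task_runs")
    buf = io.StringIO()
    logger.handlers[0].setStream(buf)  # capture instead of writing to stderr
    logger.log(level, message)
    return buf.getvalue()
```

For example, `preview("Computed exponent 1^2 = 1")` returns a single line ending in `__ WARNING __ prefect.task_runs __ Computed exponent 1^2 = 1`, while a DEBUG record returns an empty string because the logger level is INFO.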
This partly works. I read in the logging.yml file, nested it under a “logging” key, and saved it in the Dask config folder. It formats the logs correctly. However, the “extra” loggers are at level WARNING even though I set their level to INFO, which works with the sequential runner.
I tried `DaskTaskRunner(cluster_kwargs=dict(env=dict(PREFECT_LOGGING_EXTRA_LOGGERS="mylog")))`.
I believe you need to configure your extra logger's level on the extra logger itself. AFAIK, the log level set with PREFECT_LOGGING_LEVEL applies only to the Prefect logger.
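A stdlib-only sketch of setting the level on the extra logger itself: a logger left at `NOTSET` inherits its effective level from its ancestors (usually the root logger at WARNING), which would explain INFO records being dropped. The helper name `attach_extra_logger` and the format string are hypothetical.

```python
import io
import logging


def attach_extra_logger(name: str, stream: io.StringIO, level: int = logging.INFO) -> logging.Logger:
    """Hypothetical helper: set the level on the extra logger itself and give it a handler."""
    logger = logging.getLogger(name)
    # The logger's own threshold; without this it inherits from its ancestors.
    logger.setLevel(level)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s - %(message)s"))
    logger.addHandler(handler)
    # Keep this example's output isolated from any root handlers.
    logger.propagate = False
    return logger
```

Because logger objects live per process, a level set in the flow's process would not carry over to separate Dask worker processes. Running the equivalent `setLevel` call inside the worker (for example at the top of the task body) is one untested possibility.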
Yes, I set the extra logger to INFO. INFO messages show up with the sequential runner, but not with Dask.
Ah, that probably has to be configured on the Dask side somehow. I don't know how, though. Keep us posted if you find out more!
OK, I have now done that using the code below. Logs are now handled as expected, both in the console and in Orion. However, I think Prefect 2 should handle this itself. Something similar will also be needed for Ray, which does not handle extra loggers either.
```python
import copy
import os
import re
import shutil

import yaml  # PyYAML


def setup_dask_logging():
    # Read Prefect's log settings as raw text
    with open(os.environ["PREFECT_LOGGING_SETTINGS_PATH"]) as f:
        logset = f.read()
    # Replace each ${VAR} placeholder with the env value (default: INFO)
    for x in set(re.findall(r"\$\{[^}]*\}", logset)):
        logset = logset.replace(x, os.environ.get(x[2:-1], "INFO"))
    logset = yaml.safe_load(logset)

    # Give every extra logger the same settings as "prefect.extra"
    extra_settings = logset["loggers"]["prefect.extra"]
    extras = [
        x.strip()
        for x in os.environ.get("PREFECT_LOGGING_EXTRA_LOGGERS", "").split(",")
        if x.strip()  # an empty name would otherwise configure the root logger
    ]
    for extra in extras:
        logset["loggers"][extra] = copy.deepcopy(extra_settings)
    # TODO update rather than overwrite?

    # Save under a "logging" key and copy to the Dask config location
    HOME = os.path.expanduser("~")
    PREFECTX = os.path.abspath(os.path.dirname(__file__))
    DASK_SRC = f"{PREFECTX}/logging_daskdistributed.yaml"
    DASK_TGT = f"{HOME}/.config/dask/distributed.yaml"
    with open(DASK_SRC, "w") as f:
        f.write(yaml.dump(dict(logging=logset)))
    os.makedirs(os.path.dirname(DASK_TGT), exist_ok=True)
    shutil.copy(DASK_SRC, DASK_TGT)
```
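The two transformations inside `setup_dask_logging` (placeholder substitution and extra-logger expansion) can also be written as pure functions over text and dicts, which makes them checkable without touching ~/.config. A sketch: the function names are hypothetical, and `"prefect.extra"` is used as the template only because the code in this thread does so.

```python
import copy
import re
from typing import Mapping

# Matches one ${NAME} placeholder at a time; [^}]+ stops at the first "}".
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def substitute_placeholders(text: str, env: Mapping[str, str], default: str = "INFO") -> str:
    """Replace each ${NAME} with env[NAME], falling back to `default`."""
    return PLACEHOLDER.sub(lambda m: env.get(m.group(1), default), text)


def add_extra_loggers(config: dict, extra_names: str, template: str = "prefect.extra") -> dict:
    """Return a copy of a dictConfig-style dict with one logger entry per extra name."""
    result = copy.deepcopy(config)
    settings = result["loggers"][template]
    for name in (n.strip() for n in extra_names.split(",")):
        if name:  # skip empty entries such as a trailing comma
            result["loggers"][name] = copy.deepcopy(settings)
    return result
```

With PyYAML, the pieces would combine roughly as `add_extra_loggers(yaml.safe_load(substitute_placeholders(text, os.environ)), os.environ.get("PREFECT_LOGGING_EXTRA_LOGGERS", ""))`, leaving only the file writing and copying to the caller.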
Wow, this looks complicated, and it is definitely too much boilerplate if you had to do that in every flow. Let me open an issue.