I’d like to set the task name for task_a from the variable its result is assigned to, in this example my_task.
I ask because we have quite long variable names. Right now we have to assign a task's result to a variable and then give that same variable name again as the name of the task, which makes for unwieldy code. For example:
We want the task name in the logs to be some_super_long_variable_name_that_is_really_long, whereas if we don’t set the name param when we call the task, the logs show the name of the function defining the task (i.e., task_a). We could of course just shorten our variable names, but there’s an advantage to having very descriptive ones.
It would be great to be able to dynamically set the task name to the name of the variable the task output is assigned to. I don’t know if this is possible. I think in Prefect 1 the variable assigned to the task execution was used as the name of the task, but maybe not?
The thread below is related, but no follow-up issue was opened AFAICT:
There’s task_run_name to format based on the args you pass.
You can distinguish runs of this task by providing a task_run_name; this setting accepts a string that can optionally contain templated references to the keyword arguments of your task. The name will be formatted using Python’s standard string formatting syntax as can be seen here:
```python
import datetime

from prefect import flow, task


@task(name="My Example Task",
      description="An example task for a tutorial.",
      task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
    pass


@flow
def my_flow():
    # creates a run with a name like "hello-marvin-on-Thursday"
    my_task(name="marvin", date=datetime.datetime.utcnow())
```
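Since the template is ordinary `str.format` syntax, the `{date:%A}` part can be tried out without Prefect. This is a minimal sketch, assuming the task's arguments are handed to `format` as keyword arguments; the fixed date is an arbitrary one chosen so the result is reproducible.

```python
import datetime

template = "hello-{name}-on-{date:%A}"

# 5 January 2023 was a Thursday; %A formats the full weekday name.
run_name = template.format(name="marvin", date=datetime.datetime(2023, 1, 5))
print(run_name)  # hello-marvin-on-Thursday
```

Any format spec that `datetime` accepts (for example `%Y-%m-%d`) should work the same way inside the braces.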
I am not sure if it’s possible to extract the variable name to use as task name without significant hacking.
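One low-tech way to avoid typing the long name twice, without inspecting the call stack, is to make the name a string key rather than a variable. The sketch below is only an illustration: `task_a` is a plain-Python stand-in for the real task, and `run_named` and `results` are hypothetical helpers. With Prefect, the place to pass the name would presumably be something like `task_a.with_options(name=name)`, but that is an assumption here and not shown.

```python
from typing import Any, Callable, Dict


def task_a(x: int, *, name: str) -> str:
    """Stand-in for a real task; it just reports the name it was given."""
    return f"[{name}] result={x * 2}"


# Results are stored under the same string that names the task run.
results: Dict[str, Any] = {}


def run_named(name: str, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call fn with `name` and keep its result under that same key."""
    results[name] = fn(*args, name=name, **kwargs)
    return results[name]


run_named("some_super_long_variable_name_that_is_really_long", task_a, 21)
print(results["some_super_long_variable_name_that_is_really_long"])
```

The trade-off is that results are looked up by key instead of being plain local variables.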
```python
from functools import wraps
from prefect import task as _task
from typing import Callable, List, Union, Any, Dict


def task(
    tags: List[Union[str, None]] = [],
    retries: int = 0,
    retry_delay_seconds: int = 0,
    *args,
    **kwargs,
):
    """Decorator factory that replaces prefect's own "task" decorator by accepting the
    name of the customer and using it to name prefect's tasks dynamically.

    With this strategy, we override the static behavior of Python decorators.

    Attention
    ---------
    The keyword arguments must contain the argument called "customer".
    Also, even if not used, retries and retry_delay_seconds must be informed.

    Parameters
    ----------
    tags : List[str]
        List of tags to be used, by default []
    retries : int
        Number of retries for the task, by default 0
    retry_delay_seconds : int
        Time of seconds between each retry, by default 0

    Returns
    -------
    Callable
        Function that can be used to decorate other functions, transforming them into
        prefect tasks.
    """

    def decorator(function):
        def inner(*args, **kwargs):
            if tags:
                full_tags = [kwargs["customer"], *tags] if kwargs["customer"] else tags
            else:
                full_tags = (
                    [
                        kwargs["customer"],
                    ]
                    if kwargs["customer"]
                    else []
                )

            @_task(
                tags=full_tags,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
            )
            @wraps(function)
            def _inner(*args, **kwargs):
                return function(*args, **kwargs)

            return _inner(*args, **kwargs)

        return inner

    return decorator
```
Then, I just decorate my tasks using @task(tags=, retry_delay_seconds=X, retries=Y) and they are going to be named by taking into account the “customer” argument passed to the decorated function.
Hi @scott
Apologies, the code I pasted missed a chunk. Here it is:
```python
from functools import wraps
from typing import Callable, List, Union

from prefect import task as _task


def task(
    tags: List[Union[str, None]] = [],
    retries: int = 0,
    retry_delay_seconds: int = 0,
    *args,
    **kwargs,
):
    """Decorator factory that replaces prefect's own "task" decorator by accepting the
    name of the customer and using it to name prefect's tasks dynamically.

    With this strategy, we override the static behavior of Python decorators.

    Attention
    ---------
    The customer is read from the keyword argument called "customer", so it must be
    passed by keyword to the decorated function.
    Also, even if not used, retries and retry_delay_seconds must be informed.

    Parameters
    ----------
    tags : List[str]
        List of tags to be used, by default []
    retries : int
        Number of retries for the task, by default 0
    retry_delay_seconds : int
        Time of seconds between each retry, by default 0

    Returns
    -------
    Callable
        Function that can be used to decorate other functions, transforming them into
        prefect tasks.
    """

    def decorator(function):
        # `name` was not defined in the posted snippet. Assumption: it is an
        # optional `name=` keyword given to this factory, falling back to the
        # decorated function's own name.
        name = kwargs.get("name", function.__name__)

        def inner(*args, **kwargs):
            # .get() avoids a KeyError when "customer" or "filename" is absent.
            customer = kwargs.get("customer")
            filename = kwargs.get("filename")

            task_name = f"{name}"
            if customer:
                task_name += f" - customer: {customer}"
            if filename:
                task_name += f" - filename: {filename}"

            full_tags = [customer, *tags] if customer else list(tags)

            # The prefect task is created at call time so its name can use kwargs.
            @_task(
                name=task_name,
                tags=full_tags,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
            )
            @wraps(function)
            def _inner(*args, **kwargs):
                return function(*args, **kwargs)

            return _inner(*args, **kwargs)

        return inner

    return decorator
```
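The key idea is that decoration is deferred until the function is called, so the name can depend on the call's keyword arguments. Here is a minimal sketch of just that mechanism, runnable without Prefect: `fake_task`, `dynamic_task`, `created_names` and `load` are hypothetical names, and `fake_task` is a stand-in that only records the name it receives.

```python
from functools import wraps
from typing import Any, Callable, List

created_names: List[str] = []  # every name the stand-in decorator receives


def fake_task(name: str) -> Callable:
    """Hypothetical stand-in for prefect's task decorator; it only logs the name."""
    def decorator(function: Callable) -> Callable:
        created_names.append(name)
        return function
    return decorator


def dynamic_task(function: Callable) -> Callable:
    """Wrap `function` so that a freshly named task is built on each call."""
    @wraps(function)
    def inner(*args: Any, **kwargs: Any) -> Any:
        task_name = function.__name__
        customer = kwargs.get("customer")
        if customer:
            task_name += f" - customer: {customer}"

        # Decoration happens here, at call time, so the name can use kwargs.
        @fake_task(name=task_name)
        @wraps(function)
        def _inner(*a: Any, **kw: Any) -> Any:
            return function(*a, **kw)

        return _inner(*args, **kwargs)
    return inner


@dynamic_task
def load(customer: str) -> str:
    return f"loaded {customer}"


print(load(customer="acme"))    # loaded acme
print(load(customer="globex"))  # loaded globex
print(created_names)  # ['load - customer: acme', 'load - customer: globex']
```

Note that the value must be passed by keyword (`customer="acme"`); a positional argument would not show up in `kwargs`, so the name would fall back to the bare function name.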