Prefect 2.7.12 is out with custom, templatable flow and task run names

Hi everyone!

Yesterday, we shipped an off-cycle bonus release, 2.7.12. In addition to squashing a few bugs, the release includes a highly requested feature: custom flow and task run names!

New keyword arguments, flow_run_name for flows and task_run_name for tasks, have been added. They both accept a string that will be used to create a run name for each run of the function. For example:

from datetime import datetime
from prefect import flow, task


@task(task_run_name="custom-static-name")
def my_task(name):
    print(f"hello {name}")


@flow(flow_run_name="custom-but-fixed-name", name="custom-named-flow", log_prints=True)
def my_flow(name: str, date: datetime):
    print(date)
    return my_task(name)


if __name__ == "__main__":
    my_flow(name="world", date=datetime.now())

To make these names dynamic, you can template them with the parameter names of the task or flow function. All of the basic rules of Python string formatting apply:

from datetime import datetime
from prefect import flow, task


@task(task_run_name="my-dataflow-{name}")
def my_task(name):
    print(f"hello {name} 🎉")


@flow(flow_run_name="{name}-class-{date:%A}-flow-run", name="named-flow", log_prints=True)
def my_flow(name: str, date: datetime):
    print(date)
    return my_task(name)


if __name__ == "__main__":
    my_flow(name="world", date=datetime.now())
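To see what names those templates would produce, here is a minimal sketch in plain Python. It assumes the run name is rendered the same way `str.format` would render it with the function's parameters, which is what "basic rules of Python string formatting" suggests; the `params` dictionary and the fixed date are made up for illustration and are not part of Prefect.

```python
from datetime import datetime

# Hypothetical parameter values standing in for a call to my_flow.
params = {"name": "world", "date": datetime(2023, 2, 17)}

# Same templates as in the example above. {date:%A} is a format spec that
# datetime interprets strftime-style (%A is the full weekday name).
flow_template = "{name}-class-{date:%A}-flow-run"
task_template = "my-dataflow-{name}"

# Assumption: rendering behaves like str.format with the parameters as fields.
flow_run_name = flow_template.format(**params)
task_run_name = task_template.format(name=params["name"])

print(flow_run_name)
print(task_run_name)
```

With the default (English) locale, 17 February 2023 renders as `Friday`, so the flow run name would read `world-class-Friday-flow-run`.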

As always, see the Prefect release notes for more information.

That’s really great!
I had implemented my own workaround for this by wrapping Prefect's flow and task decorators like this:

from functools import wraps
from typing import Callable

from prefect import flow as _flow


def flow(name: str) -> Callable:
    def decorator(function):
        def inner(*args, **kwargs):
            # Build the flow name from whichever keyword arguments were passed.
            # kwargs.get() avoids a KeyError, so a missing "customer" no longer
            # skips the "filename" suffix.
            flow_name = name
            if kwargs.get("customer"):
                flow_name += f" - customer: {kwargs['customer']}"
            if kwargs.get("filename"):
                flow_name += f" - filename: {kwargs['filename']}"

            # Re-wrap the function as a Prefect flow on every call so the
            # name can depend on the call's arguments.
            @_flow(name=flow_name)
            @wraps(function)
            def _inner(*args, **kwargs):
                return function(*args, **kwargs)

            return _inner(*args, **kwargs)

        return inner

    return decorator

I can now definitely throw away my work-around script!

Thanks for implementing this feature!

How will Prefect handle flows/tasks with duplicate run names (if we forget to make them dynamic)? I assume that this will be OK?

What do you mean by duplicated names? They are not forbidden.

You have full control over naming here. Perhaps try a few examples and report back if you run into trouble.

Thank you for this feature!
I’m building a generic ETL pipeline for several projects, therefore it is very useful to be able to pass the project name as parameter and see it in the flow run overview.
I’m wondering, though: if the pipeline is scheduled to run, say, once per day, could I somehow include the date in the flow run name? I tried passing date=pd.Timestamp.now and then using flow_run_name=f"{project}_{date():%Y%m%d}", but alas, JSON does not support serializing function objects…
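One possible direction, sketched with plain Python string formatting rather than Prefect itself: keep the template a regular string (not an f-string, which is evaluated when the decorator is written) and pass the date as a value instead of a function, as in the announcement's `{date:%A}` example. The `render_run_name` helper and the `"sales"` project name are hypothetical. How a scheduled deployment would supply a fresh date on each run is not covered by this sketch.

```python
from datetime import datetime

# A plain string, not an f-string: the {...} fields stay unresolved
# until format() is called with actual values.
template = "{project}_{date:%Y%m%d}"


def render_run_name(project: str, date: datetime) -> str:
    """Hypothetical helper mimicking how a run name template could be filled."""
    return template.format(project=project, date=date)


# The date is passed as a value (a datetime), not as a callable.
run_name = render_run_name("sales", datetime(2023, 2, 17))
print(run_name)
```

Here the template would render as `sales_20230217`, because the format spec after the colon is applied to the datetime value itself.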