Dynamically set Flow Configuration via Flow Run Parameters

Hi,

We are operating with extremely costly pipeline runs and would love to attribute the cloud cost we generate to specific customers/contracts.

To do that, we use AWS Cost Allocation Tags.
This normally works great, but if we have a Prefect Flow Deployment that can be used for many different customers, it would be great to set the tag via the TaskRunner configuration dynamically.

For example, the following code fails:

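from prefect import flow
from prefect_ray import RayTaskRunner


@flow(
    task_runner=RayTaskRunner(init_kwargs={
        # pseudo code: customer_name does not exist yet when the decorator runs
        "tag_customer": customer_name
    })
)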
def main(*, customer_name: str):
    # [...]


if __name__ == "__main__":
    main(customer_name="abc")

We looked into how we could get at the parameter value.
In theory, a decorator's wrapper can access the kwargs at call time:

import functools


def decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(kwargs)  # prints: "{'customer_name': 'abc'}"
        return func(*args, **kwargs)
    return wrapper

We tried to build a helper function that generates the tag_customer value out of the kwargs, but no luck so far.
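Part of the difficulty seems to be timing: decorator arguments are evaluated once, when the function is defined, while the kwargs only exist per call. Below is a minimal, Prefect-free sketch of deferring configuration until call time. `with_lazy_config`, `build_config`, and `last_config` are hypothetical names used for illustration only; none of them is Prefect API.

```python
import functools


def with_lazy_config(build_config):
    """Decorator factory: call build_config(kwargs) on every invocation."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # kwargs are only known here, at call time, not at decoration time.
            wrapper.last_config = build_config(kwargs)
            return func(*args, **kwargs)

        wrapper.last_config = None  # nothing to build until the first call
        return wrapper

    return decorator


@with_lazy_config(lambda kwargs: {"tag_customer": kwargs["customer_name"]})
def main(*, customer_name: str) -> str:
    return f"processing {customer_name}"
```

Calling `main(customer_name="abc")` stores `{"tag_customer": "abc"}` in `main.last_config`. This only illustrates the timing; it does not by itself reconfigure a task runner, because `@flow(...)` has already consumed its arguments by the time a wrapper like this runs.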

Inside a running flow, grabbing the parameters via prefect.context works fine:

from prefect import flow
from prefect.context import get_run_context

@flow
def main(*, customer_name: str):
    print(get_run_context().flow_run.parameters)  # prints: "{'customer_name': 'abc'}"
    # [...]


if __name__ == "__main__":
    main(customer_name="abc")

But using prefect.context in a helper function to set the Flow configuration does not work, because the helper is called when the decorator is evaluated, before any flow run exists:

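from prefect import flow
from prefect.context import get_run_context


def helper_function():
    run_context = get_run_context()  # raises here: no run context at import time
    print(run_context.flow_run.parameters["customer_name"])  # parameters is a dict
    return "Test"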


@flow(name=helper_function())
def main(*, customer_name: str):
    # [...]


if __name__ == "__main__":
    main(customer_name="ABC")

# ERROR: prefect.exceptions.MissingContextError: No run context available. You are not in a flow or task run context.

Are there any Python gymnastics we could do right now to use flow run parameters in the configuration of the flow object?

Beyond the cost allocation tag use case I mentioned at the beginning, this would enable many other interesting scenarios.
For example, let's say two different HPC clusters are available for a flow: one that is memory-optimized and one that is CPU-optimized. With the ability to set flow configuration based on the flow run parameters, you could choose which cluster the flow run should run on whenever you trigger a run via the API or UI.
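To make the cluster scenario concrete, here is a rough sketch of the selection logic on its own, independent of Prefect. The parameter name `cluster_profile`, the `CLUSTERS` mapping, and both addresses are made up for illustration.

```python
# Hypothetical cluster addresses; replace with real endpoints.
CLUSTERS = {
    "memory": "ray://memory-optimized.example.internal:10001",
    "cpu": "ray://cpu-optimized.example.internal:10001",
}


def pick_cluster(parameters: dict, default: str = "cpu") -> str:
    """Return the cluster address for a run's 'cluster_profile' parameter."""
    profile = parameters.get("cluster_profile", default)
    try:
        return CLUSTERS[profile]
    except KeyError:
        # Fail loudly rather than silently running on the wrong cluster.
        raise ValueError(f"unknown cluster profile: {profile!r}") from None
```

The open question in this thread is only where such a function could be called so that its result reaches the TaskRunner configuration.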

Best,
Toby

With the release of 2.7.12, it is now possible to set some flow/task configuration (flow_run_name/task_run_name) from flow parameters.

Unfortunately, only the name can be modified in this way.

Is there any way to do the same with TaskRunner configuration like in the challenge outlined above?

(pinging @justabill / @cicdw based off the GH PR and announcement of 2.7.12)

Hey @trahloff - thank you for describing your use case so clearly! I think it makes a lot of sense, and we have some ideas for how to tackle this in a first-class way. I've linked your issue internally, and realistically I should be able to follow up in the next week or two. Our proposed solution looks a lot like your helper function, with some graceful handling when run outside of a run context.


Hi @cicdw, thanks a lot for taking a look! Did you have the time to look into a potential solution?

Hey @trahloff sincere apologies for the huge delay here, I had to step away from product work for a stretch but I’m back at it. I have internally scoped this and am going to try and get it in our next release on Thursday March 9th. I’ll link any relevant PRs in this thread when they’re up!

Hi @cicdw, no worries at all! Thanks for coming back to this.

We had a design session with our friends from Anyscale at the end of last week, and we came up with an additional idea that might make the implementation easier and more “Prefecty”.
Would it be more in line with how Prefect works if this behavior were realized by making the Prefect Context available in the helper function and other flow-related code blocks like inside the Infrastructure Block code?

Either way, wish you a great week :raised_hands: Feel free to ping me if you have additional questions


Hey @trahloff revisiting this with a suggestion I should have thought of a long time ago; if you can run async code in the body of your script, you could do something like:

async def helper_function():
    from prefect.client.orchestration import get_client
    import os

    client = get_client()
    flow_run = await client.read_flow_run(os.environ.get("PREFECT__FLOW_RUN_ID"))
    return flow_run.parameters["customer_name"]  # parameters is a plain dict

If you can’t use async, then in theory you could craft your own GET request to the API (this is always worth keeping in your back pocket for the future). It just might not be worth your time, given we should have a supported solution for you next week.
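For reference, a synchronous GET request along those lines might look roughly like the sketch below, using only the standard library. It assumes that `PREFECT_API_URL` is set as an environment variable and already ends in `/api`, that a single flow run can be read at `/flow_runs/{id}`, and that `PREFECT_API_KEY` (if set) is sent as a bearer token. The function names are hypothetical, so please verify the details against your own setup.

```python
import json
import os
import urllib.request


def flow_run_url(api_url: str, flow_run_id: str) -> str:
    """Build the URL for reading a single flow run."""
    return f"{api_url.rstrip('/')}/flow_runs/{flow_run_id}"


def read_flow_run_parameters(timeout: float = 10.0) -> dict:
    """Fetch the current flow run and return its parameters (or {} if unknown)."""
    flow_run_id = os.environ.get("PREFECT__FLOW_RUN_ID")
    api_url = os.environ.get("PREFECT_API_URL")  # assumption: set in the environment
    if not flow_run_id or not api_url:
        return {}  # not inside an orchestrated run; the caller picks a default

    request = urllib.request.Request(flow_run_url(api_url, flow_run_id))
    api_key = os.environ.get("PREFECT_API_KEY")
    if api_key:
        request.add_header("Authorization", f"Bearer {api_key}")

    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response).get("parameters", {})
```

Something like `read_flow_run_parameters().get("customer_name", "test")` could then stand in for the async helper, with the default covering local runs where no flow run ID is available.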

I’ve got the design doc shared internally for a more forward-looking supported interface, and just to give you a preview and see if you have any opinions, it looks like:

from prefect import runtime # namespace for accessing dynamic runtime info as-needed

customer_name = runtime.deployment.parameters.get("customer_name", "test")

Hi @cicdw, yep, this workaround works, and this is how we are doing it at the moment with a little twist: We run it in a custom Infrastructure block that wraps the “normal” Infrastructure block to inject the configuration.

It works now, but having this natively in Prefect out-of-the-box would be great.

The runtime namespace you sketched out looks extremely convenient and fitting for this use case and many other scenarios. Looking forward to giving it a go!

For anyone finding this in the future and wondering whether this is now possible:

The discussed feature was implemented by the amazing @cicdw and released in 2.8.6 :tada:

Hi @cicdw, quick update: maybe we are doing something wrong, but it is not working for us.

from prefect import flow
from prefect_ray import RayTaskRunner
from prefect.runtime import flow_run


@flow(task_runner=RayTaskRunner(address=f"anyscale://{flow_run.id}"))
def main():
    print("done")


if __name__ == "__main__":
    main()

The flow_run.id is None when we execute it.

Ah yes, this is expected; I need to add some documentation here to clarify. In your case, where you are running the Python code yourself (as opposed to through a deployment), there is no flow run that Prefect knows about until the flow function is called. The runtime namespace is only useful when the code runs in the context of a deployment.

If you’re running it yourself you can manually sync up all the parameters in your script, e.g.,

import uuid

from prefect import flow
from prefect_ray import RayTaskRunner
from prefect.runtime import flow_run

manual_id = uuid.uuid4().hex

@flow(
    task_runner=RayTaskRunner(address=f"anyscale://{flow_run.id or manual_id}"),
    flow_run_name=manual_id,
)
def main():
    print("done")


if __name__ == "__main__":
    main()