Hi,
We are operating with extremely costly pipeline runs and would love to attribute the cloud cost we generate to specific customers/contracts.
To do that, we use AWS Cost Allocation Tags.
This normally works great, but when a single Prefect flow deployment serves many different customers, we would like to set the tag dynamically via the task runner configuration.
For example, the following code fails:
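from prefect import flow
from prefect_ray import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(init_kwargs={
        "tag_customer": customer_name  # pseudo code: customer_name is not defined at this point
    })
)
def main(*, customer_name: str):
    # [...]

if __name__ == "__main__":
    main(customer_name="abc")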
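The failure above seems to come from Python's evaluation order rather than from Prefect itself: the arguments of a decorator are evaluated once, when the function is defined, so no call arguments exist yet. A minimal stdlib-only sketch makes the order visible (the `configured` decorator, `make_config`, and the `events` list are hypothetical stand-ins, not Prefect APIs):

```python
events = []  # records the order in which things happen


def make_config():
    # Runs while the decorator expression is evaluated, i.e. at definition time.
    events.append("config built")
    return {"tag_customer": "unknown"}


def configured(config):
    # Hypothetical stand-in for a configuring decorator such as @flow(...).
    def decorator(func):
        func.config = config
        return func
    return decorator


@configured(make_config())
def main(*, customer_name: str):
    events.append(f"main called for {customer_name}")
    return main.config


main(customer_name="abc")
print(events)  # ['config built', 'main called for abc']
```

The configuration is built before `main` is ever called, which is why it cannot depend on `customer_name`.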
We tried to figure out how we could get the parameter value.
So in theory, the decorator code could access the kwargs:
import functools

def decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(kwargs)  # prints: "{'customer_name': 'abc'}"
        return func(*args, **kwargs)
    return wrapper
We tried to build a helper function that derives the tag_customer value from the kwargs, but no luck so far.
Normally, grabbing the parameters via prefect.context works fine inside the flow:
from prefect import flow
from prefect.context import get_run_context

@flow
def main(*, customer_name: str):
    print(get_run_context().flow_run.parameters)  # prints: "{'customer_name': 'abc'}"
    # [...]

if __name__ == "__main__":
    main(customer_name="abc")
But building a helper function that uses prefect.context to set the flow configuration does not work:
from prefect import flow
from prefect.context import get_run_context

def helper_function():
    run_context = get_run_context()  # raises here: called at definition time, outside any run
    print(run_context.flow_run.parameters["customer_name"])  # parameters is a dict
    return "Test"

@flow(name=helper_function())
def main(*, customer_name: str):
    # [...]

if __name__ == "__main__":
    main(customer_name="ABC")
# ERROR: prefect.exceptions.MissingContextError: No run context available. You are not in a flow or task run context.
Is there any Python gymnastics we could do right now to use flow run parameters in the configuration of the flow object?
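One possible workaround, sketched under assumptions: Prefect 2 flows expose `with_options(...)`, which returns a copy of the flow with overridden settings (including `task_runner`), so a thin entry point could build the options from the parameters at call time and then invoke the copy. The helper `run_with_dynamic_options` and its `options_from_params` callback are hypothetical names, and `_FakeFlow` is only a stand-in so the sketch runs without Prefect installed:

```python
from typing import Any, Callable, Dict


def run_with_dynamic_options(
    flow_obj: Any,
    options_from_params: Callable[[Dict[str, Any]], Dict[str, Any]],
    **params: Any,
) -> Any:
    """Derive flow options from the call parameters, then run the flow.

    Assumes flow_obj.with_options(**options) returns a callable copy,
    as Prefect 2 flows do.
    """
    options = options_from_params(params)
    return flow_obj.with_options(**options)(**params)


class _FakeFlow:
    """Stand-in for a Prefect flow, used only to make this sketch runnable."""

    def __init__(self, fn: Callable[..., Any], **options: Any) -> None:
        self.fn = fn
        self.options = options

    def with_options(self, **options: Any) -> "_FakeFlow":
        # Return a copy with the overrides merged in; the original is untouched.
        return _FakeFlow(self.fn, **{**self.options, **options})

    def __call__(self, **params: Any) -> Any:
        return self.fn(self.options, **params)


def _body(options: Dict[str, Any], *, customer_name: str) -> str:
    return f"{customer_name} ran with {options}"


main = _FakeFlow(_body)
result = run_with_dynamic_options(
    main,
    lambda p: {"name": f"main-{p['customer_name']}"},
    customer_name="abc",
)
print(result)  # abc ran with {'name': 'main-abc'}
```

With a real flow, the callback could return something like `{"task_runner": RayTaskRunner(init_kwargs={...})}`. Whether `init_kwargs` is the right place for a cost allocation tag is an assumption carried over from the question and is untested here. For runs triggered from a deployment, the entry point would presumably have to be a flow itself that calls the reconfigured flow as a subflow.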
Beyond the cost allocation tag use case I mentioned at the beginning, this would enable many other interesting scenarios.
For example, let’s say two different HPC clusters are available for a flow: one that is memory-optimized and one that is CPU-optimized. With the ability to set flow configuration based on the flow run parameters, you could choose which cluster the flow run should run on whenever you trigger a run via the API or UI.
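If flow configuration could depend on parameters, the cluster choice might reduce to a lookup from a parameter value to task runner settings. A small sketch, assuming a hypothetical `cluster` parameter and placeholder Ray addresses (neither is part of our actual setup):

```python
CLUSTER_ADDRESSES = {
    # Placeholder addresses; replace with the real Ray head nodes.
    "memory-optimized": "ray://memory-cluster.example:10001",
    "cpu-optimized": "ray://cpu-cluster.example:10001",
}


def task_runner_kwargs(cluster: str) -> dict:
    """Return keyword arguments for a task runner targeting the chosen cluster."""
    try:
        return {"address": CLUSTER_ADDRESSES[cluster]}
    except KeyError:
        known = ", ".join(sorted(CLUSTER_ADDRESSES))
        raise ValueError(
            f"unknown cluster {cluster!r}; expected one of: {known}"
        ) from None


print(task_runner_kwargs("cpu-optimized"))
```

The returned dict could then be unpacked into the task runner, for example `RayTaskRunner(**task_runner_kwargs(cluster))`, once the parameter is available at configuration time.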
Best,
Toby