Adding flow run parameters/context variables to all flows

Our firm uses custom logic that makes a task run return a “Skipped” state to support partial flow runs while preserving task-level dependencies.

To customize the set of tasks to include, we utilize the “parameter” of flow run deployments. We define our flows like:

from prefect import flow

@flow
def some_flow(partial_run_tasks=None):  # read by the tasks, never by the flow body
    some_task()

This works great as now people can simply put a list of tasks they want to run in the UI’s Custom run tab, but it is also a bit weird to implement, as

  1. the argument partial_run_tasks needs to be added to all our flows
  2. it is not used anywhere in the flow function itself (tasks determine if they are in this list by accessing FlowRunContext.flow_run.parameters)
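
As a rough illustration of the task-side check described in point 2, here is a minimal sketch. The helper name `is_selected` and the rule that `None` means "run everything" are assumptions made for this example, not our exact implementation; only the parameter name `partial_run_tasks` comes from the flow above.

```python
from typing import Any, Mapping, Optional

# Parameter name taken from the flow signature in the post.
PARAM_NAME = "partial_run_tasks"


def is_selected(task_name: str, parameters: Optional[Mapping[str, Any]]) -> bool:
    """Return True when the named task should run for this flow run.

    Hypothetical helper: `parameters` is the flow run's parameter mapping.
    """
    selected = (parameters or {}).get(PARAM_NAME)
    # Assumption: None (the default) means no partial run was requested,
    # so every task runs.
    if selected is None:
        return True
    return task_name in selected
```

Inside a task, the mapping would presumably come from something like `FlowRunContext.get().flow_run.parameters`, assuming the flow run context is available where the task executes. A task that is not selected would then return the "Skipped" state mentioned at the top.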

We feel this parameter functions more like a context variable: it is set at the flow run level, shared with all task runs, and only used by the task runs. In 1.0 I remember we were able to add custom run contexts to a flow run from the UI. Would a similar feature be possible in 2.0? Are there any smarter ways to make all our flows accept this context variable other than adding it to every flow function's signature?

+1 here. Our experience was also that in 1.0 it came in handy to add variables into the flow context and have all tasks be able to access them from the flow context.

That seems intentionally not possible in 2.0. The doc here states “Note that we do not send the flow run context to distributed task workers because the context is costly to serialize and deserialize.”

I can understand the serialization/deserialization performance concern for some use cases. But for us it’d be nice to be able to have 2.0 behave the way 1.0 did. Adding some number of parameters to each and every task in the flow is an alternative, but I agree with the OP that this is a lot less desirable for our use case.

So maybe take this as a feature request to make the flow context optionally sent to the tasks? Or provide the ability to do that with some but not all of the flow context?

hi @PPPSDavid and @mattklein

I believe you can accomplish what you want using one or more of these options.

Let us know if these fall short

Thanks @nate. We were actually inspired by the options you listed when we came up with the parameter-based partial flow run design. I think the pain point here is not whether we can share a set of arguments with the task runs: we could define our own context variable in the flow function, or just let the task run grab the flow run context (in a concurrent task runner) and read the flow run arguments from there.

The key issue is that none of the above options feel elegant: with a maintained flow/task run context already constructed by Prefect, it feels suboptimal that we had to resort to these alternatives when all we want is something as simple as a list of strings/booleans baked into the flow run context and shared among all tasks.

For example, if we are to use the custom context variable approach, we would need to:

  1. Still define this dummy flow run argument so that people can run the flow with custom arguments in the UI
  2. Create a new context variable in every flow we define, let it grab the flow run argument, and then access it in all the task runs. Suppose we have a task that is shared by 10 flows: which context variable would I refer to when defining the task function logic?
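
To make the custom context variable approach concrete, here is a minimal sketch using only the standard library `contextvars` module. Every name in it (`partial_run_tasks_var`, `partial_run`, `shared_task_logic`) is hypothetical, and the flow and task are plain functions standing in for the decorated Prefect ones. Declaring a single variable in one shared module is one possible way around the "which variable do I refer to" question, but each flow still has to accept the dummy argument and set the variable.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, List, Optional, Sequence

# Hypothetical shared variable, declared once in a common module so that
# a task reused by many flows always reads the same variable.
partial_run_tasks_var: ContextVar[Optional[Sequence[str]]] = ContextVar(
    "partial_run_tasks", default=None
)


@contextmanager
def partial_run(tasks: Optional[Sequence[str]]) -> Iterator[None]:
    """Set the shared variable for the duration of one flow run."""
    token = partial_run_tasks_var.set(tasks)
    try:
        yield
    finally:
        # Restore the previous value so runs do not leak into each other.
        partial_run_tasks_var.reset(token)


def shared_task_logic(name: str) -> str:
    """Stand-in for a task body that decides whether it should run."""
    selected = partial_run_tasks_var.get()
    if selected is not None and name not in selected:
        return "skipped"
    return "ran"


def some_flow(partial_run_tasks: Optional[Sequence[str]] = None) -> List[str]:
    # The dummy argument is still needed so the value can be supplied per run.
    with partial_run(partial_run_tasks):
        return [shared_task_logic("extract"), shared_task_logic("load")]
```

Note that a plain `ContextVar` is not carried into other processes, and not automatically into new threads either, so whether the tasks actually see the value depends on the task runner in use.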

Again, we appreciate the brainstorming you shared, and it inspired us to come up with a functional implementation. It is just that having a customizable flow run context would make all of these much smoother :slight_smile:
