How can I restart a flow in Prefect 2.0?

Hello there, first of many questions as I’ve been evaluating Prefect 2.0 for our use. The first is quite basic.

The documentation on the UI states:

If you’re having issues with a flow run, Prefect provides multiple tools to help you identify issues, re-run flows, and even delete a flow or flow run.

However, I don’t see any option to re-run a flow via the UI or the CLI. The CLI commands for flow-run are limited to ls, inspect, and delete. Am I missing something? :thinking:

I am evaluating Prefect 2.0.2.


What do you mean when you say rerun? Prefect 2.0 supports fully dynamic workflows, so restarting a flow from, say, the third task in a flow wouldn’t work, but restarting an entire flow or only a specific subflow does work, and you can do it even by running the flow as any normal Python script from your IDE. The UI and CLI work too, as long as your flow is deployed and you have a running agent.

Hi Anna, thanks for the follow-up (BTW, also subscribed to the Slack channel so I’ll take a look for ideas there as well). I fully admit that perhaps the way I’m looking at it is wrong.

I’ve been able to get a deployment and a running agent working with no problem and submit flow runs via the deployment.

What we are trying to do is understand how we can restart a failed flow run. Ideally, we’d like to use the UI to inspect failed flow runs and restart them. However, I’m unable to find a way to reference a prior flow run and instruct Prefect to restart it.

It seems like there was a "restart" option in the UI for Prefect 1.0, but I don’t see the equivalent in Prefect 2.0.

Did you know that Prefect 2.0 has flow-level retries? You can configure it the same way as with tasks.
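For example, a rough sketch (assuming the @flow decorator takes the same retries and retry_delay_seconds arguments as @task):

from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def fragile_task():
    print("Doing some work that might fail")

# Assumption: flow-level retries are configured with the same arguments as task-level retries
@flow(retries=2, retry_delay_seconds=60)
def my_flow():
    fragile_task()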

Rerunning a flow works the same way as you would trigger any flow:
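For example, a minimal sketch (hello_flow here is just a placeholder flow):

from prefect import flow

@flow
def hello_flow(name_input):
    print(f"hello {name_input}")

# Rerun the flow by calling it like any normal Python script;
# for a deployed flow you can also trigger a new run from the UI or CLI.
if __name__ == "__main__":
    hello_flow("world")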

Restart functionality doesn’t exist as a feature in 2.0 due to its highly dynamic nature, but restarting or rerunning an entire flow works.

Yes we are aware, thanks! I think for now we’ll consider building some sort of resubmission approach until that feature is available.

To be transparent, I’m not sure any Restart functionality will ever be available in 2.0 - we could explore that at some point, but I’d encourage you to rethink your design and build your flows in a way that doesn’t need that functionality.

Restarting a workflow from some arbitrary point is a relic of DAG-based workflows, which, instead of supporting dynamic flows that can share data and business logic, opt for a constraining approach where every part of your workflow is a large job in itself, chained together in a DAG construct known at build time. Prefect 2.0 rejects that and encourages DAG-free workflows with small, dynamic tasks.

Anna, ok got it. I’ll ponder our model a bit. :thinking:

I guess where I’m struggling is that in our current model the flow makes incremental changes to data in each step. If that flow then fails in, say, step 5 out of 8 (after a few retries of step 5), that piece of data is left in the state produced by step 4. We may need to evaluate the issue and, once it’s resolved, pick up from step 5 again for that piece of data.

One approach could be to ensure that steps 1-4 (as well as the other steps) are smart enough to check their own state and not re-process the data if it has already been done. This way we could just rerun the entire flow with the parameters as needed, and steps 1-4 should breeze through if the work’s already been done.

Alternatively, I could pass data from one step to another in the flow, but I’m wary of that approach since (a) the data is quite large and (b) the processing times for steps 1-4 can be long, so I’d rather not replicate work that’s already been done.

I’ll try to read up on some more orchestration approaches to see if I can come up with any ideas. Seems like my first approach would be the best though.


:100: yes - it’s also a data engineering best practice to keep your components as self-contained and dependency-free as possible. Such a check can be as simple as querying the MAX(LAST_UPDATED) timestamp field in your database, if you have one.
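A rough sketch of what such a check could look like (the orders table, the last_updated column, and the sqlite connection are placeholder assumptions):

from datetime import datetime
import sqlite3

from prefect import task

@task
def incremental_step(db_path, window_start):
    conn = sqlite3.connect(db_path)
    (last_updated,) = conn.execute(
        "SELECT MAX(last_updated) FROM orders"
    ).fetchone()

    # Skip re-processing if this step already ran for the current window
    if last_updated is not None and datetime.fromisoformat(last_updated) >= window_start:
        print("Already processed, skipping")
        return

    # ... otherwise do the incremental work for this step here ...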

Another alternative is to leverage caching - assuming you run this every hour, adding caching with a 1-hour expiration will have exactly the effect you are looking for: successful tasks won’t be rerun, but those that failed and weren’t cached will run again in the subsequent run.

from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

# Cache on the task's inputs; a rerun within the hour reuses the cached result
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def hello_task(name_input):
    # Doing some work
    print("Saying hello")
    return "hello " + name_input

@flow
def hello_flow(name_input):
    hello_task(name_input)

if __name__ == "__main__":
    hello_flow("world")