How can I restart a flow in Prefect 2.0?

Hello there, first of many questions as I’ve been evaluating Prefect 2.0 for our use. The first is quite basic.

The documentation on the UI states:

If you’re having issues with a flow run, Prefect provides multiple tools to help you identify issues, re-run flows, and even delete a flow or flow run.

However, I don’t see any option to re-run a flow via the UI or the CLI. The CLI commands for flow-run are limited to ls, inspect, and delete. Am I missing something? :thinking:

I am evaluating Prefect 2.0.2.

1 Like

What do you mean when you say rerun? Prefect 2.0 supports fully dynamic workflows so that restarting a flow e.g. from the third task in a flow wouldn’t work, but restarting an entire flow or only a specific subflow would work and you can do it even by running the flow as any normal Python script from your IDE. UI and CLI works too as long as your flow is deployed and you have a running agent

Hi Anna, thanks for the follow-up (BTW, also subscribed to the Slack channel so I’ll take a look for ideas there as well). I fully admit that perhaps the way I’m looking at it is wrong.

I’ve been able to get a deployment and a running agent working with no problem and submit flow runs via the deployment.

What we are trying to do is understand how we can restart a failed flow run. Ideally, we’d like to use the UI to inspect failed workflows and restart those that have failed. However, I’m unable to find a way to reference a prior flow run and instruct prefect to restart that flow.

It seems like there was a "restart"option in the UI for Prefect 1.0 but I don’t see the equivalent in Prefect 2.0.

Did you know that Prefect 2.0 has flow-level retries? You can configure it the same way as with tasks.

Rerunning a flow works the same way as you would trigger any flow:

Restarts functionality doesn’t exist as a feature in 2.0 due to the highly dynamic nature of 2.0, but restarting or rerunning a flow works

Yes we are aware, thanks! I think for now we’ll consider building some sort of resubmission approach until that feature is available.

To be transparent, I’m not sure if any Restart functionality will ever be available in 2.0 - we could explore that at some point, but I’d encourage you to rethink your design and build your flows in a way that doesn’t need that functionality

Restarting a workflow from some arbitrary point is a relic of the DAG-based workflows, which instead of supporting dynamic flows that can share data and business logic, opt for a constraining approach where every part of your workflow is a large job in itself and is chained together in some DAG construct known at build-time. Prefect 2.0 rejects that and encourages DAG-free workflows with small dynamic tasks

Anna, ok got it. I’ll ponder our model a bit. :thinking:

I guess where I’m struggling is that in our current model our desired flow makes incremental changes to data in each step. If that flow then fails in step 5 out of 8 for example (after a few retries of step 5), that piece of data is left in a state of result after step 4. We may need to evaluate the issue and when resolved, then pick up from step 5 again for that piece of data.

One approach could be to ensure that steps 1-4 (as well as other steps) are smart enough to check for the state of the step and not re-process the data if it has already been done. This way we could just rerun the entire flow with the parameters as needed and steps 1-4 should breeze through if the works already been done.

Alternatively I could pass data from one step to another in the flow, but in this case I’m weary about that approach since (a) the data is quite large and (b) the processing times for steps 1-4 can be long so I’d rather not replicate work that’s already been done.

I’ll try to read up on some more orchestration approaches to see if I can come up with any ideas. Seems like my first approach would be the best though.

1 Like

:100: yes - this is also best practice in data engineering to ensure your components are as self-contained and dependency-free as possible. Doing such check can even be something as simple as querying theMAX(LAST_UPDATED) timestamp field in your database, if you have that

Another alternative is to leverage caching - assuming you run this every hour, if you would add caching with 1 h expiration, this will have exactly the effect you are looking for: successful tasks won’t be rerun, but those that failed and haven’t been cached, will run again in the subsequent run.

from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def hello_task(name_input):
    # Doing some work
    print("Saying hello")
    return "hello " + name_input

@flow
def hello_flow(name_input):
    hello_task(name_input)

Hello,

We have the same problematic in my company.
We are looking in a new orchestrator, and prefect2 seems like a very good candidate but the lack of a restart command is really annoying.

When you run a flow, you have entry parameters but also sub results that you reuse in other tasks.
I you recreate a full run from the begining these sub results can change (so also the behaviour and the result of the flow).

For example, with this sequential flow (on a microservice architecture) :

  • API_logistic_send_package() => return $pkg_ref
  • API_notif_send_mail_pkg_sent($pkg_ref)
  • API_logistic_wait_delivery($pkg_ref)
  • API_notif_send_mail_pkg_delivered($pkg_ref)

When you have an error on the task API_logistic_wait_delivery, your flow run is killed. But obviously we need to play the missing steps.
If we run a new flow from the beginning, we will resend a new package to the client (and the associated mail) which is not possible.
How are you debuging your complex flow runs when you have an error ?

We can try to hack the code to save somewhere the value of the pkg_ref with the flow run ref to allow to run a new flow with params set in entry (and the task skipped if pkg_ref is already set) but it’s really complicated and impratical (and I don’t imagine if you have 20 steps to manage…).

The caching method seems also tricky and it’s only working for crons.

A solution could be to allow to directly set the result of some tasks when you instantiate a new flow run. The restart would be just getting the result of the success tasks on a flow run in error and instanciate a new flow run with these values.

It would also be amazing to easily test/mock the deployments with bypassing some steps with a specific result !

Perhaps it’s already possible ?

Best regards,
Max

1 Like

Caching works for all workflows not only for scheduled ones. You can do as much as commenting out the things that were successful and run only the rest

We have an open PR for it though so restarts are coming

Hi Anna! Thanks for your help here. Has there been an update with the PR for restarts by any chance?

1 Like

yup Modular Data Stack — Build a Data Platform with Prefect, dbt and Snowflake (Part 5) | by Anna Geller | The Prefect Blog | Medium