Is there a good way to design flows for restart after failure?

I am currently working on a flow of flows, with millions of tasks and thousands of subflows.
It is a step-by-step iterative scraping workflow whose scope grows at each step (starting from 100 resources to scrape and growing to millions by the last step).
I follow the Prefect precept of atomic tasks, so each scraping task corresponds to a single request (one resource to fetch).
I will skip the details, except that I implemented the subflows as batches of 1,000 requests, executed as a mapping of subflow runs. The task immediately downstream of such a mapped scraping subflow run has its trigger set to all_finished, so that a single batch failure does not stop the whole parent flow.
I am persisting the task results at the parent flow level in order to be able to restart the flow for failed tasks.
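
For illustration, here is a minimal Prefect 1.0 sketch of that trigger setup. The task names and bodies are placeholders, and the mapped task stands in for the mapped subflow runs (which in the real flow go through create_flow_run):

```python
from prefect import Flow, task
from prefect.triggers import all_finished

@task
def make_batches(resources):
    # split the full resource list into batches of 1,000 requests
    return [resources[i:i + 1000] for i in range(0, len(resources), 1000)]

@task
def scrape_batch(batch):
    # stand-in for one mapped subflow run of 1,000 requests
    ...

@task(trigger=all_finished)
def summarize(results):
    # all_finished lets this run even when some mapped batches
    # failed, so one bad batch does not stop the parent flow
    ...

with Flow("scraping-parent") as flow:
    batches = make_batches(["resource-1", "resource-2"])  # placeholder inputs
    results = scrape_batch.map(batches)
    summarize(results)
```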

This works well; however, the flow fails when it runs out of storage for new task results (40 GB filled).

My questions are the following:

  1. Is there a way to mark a task as automatically restartable when its result is needed by a restarted downstream task? (This would save most of the storage used by persisted task result files, as most of my tasks are inexpensive request-preparation tasks.)
  2. Is there a way to link a task run with its upstream and downstream task runs? (My idea would be to delete the persisted results once all downstream tasks have succeeded.)
  3. Is there simply a better approach to my problem?

I would start with a question: would you rather design this solution in Prefect 2.0?

I’m asking since 2.0 is the LTS version and 1.0 is in maintenance mode, and we encourage users to migrate. If you are currently in the design phase of such a big project, it would be more beneficial to design it in the LTS product, i.e. Prefect 2.0 - docs.prefect.io

Also, this would be easier to accomplish in 2.0.

The migration resources are linked in:


I intended to perform the migration, but it was a lower priority than this problem.
But if there is a way to fix the problem “easily” in Prefect 2.0, I will swap priorities.

So my questions remain, but for Prefect 2.0.


I would recommend designing this workflow using a functional programming pattern in pure Python. Then add flow and task decorators where needed to turn it into a Prefect flow; given how easy it is to build composable subflows in 2.0, you can easily add retries, even to subflows.
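
For example, here is a minimal sketch of that pattern, assuming a 2.x release with flow-level retries (the function names, batch size, and retry values are just illustrative):

```python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def scrape_one(url: str):
    # one atomic request per task, retried on transient failures
    ...

@flow(retries=2)
def scrape_batch(urls: list[str]):
    # one subflow per batch; the whole batch can be retried as a unit
    return scrape_one.map(urls)

@flow
def parent(all_urls: list[str]):
    # plain Python drives the orchestration: batch, then call subflows
    batches = [all_urls[i:i + 1000] for i in range(0, len(all_urls), 1000)]
    for batch in batches:
        scrape_batch(batch)
```

Because the orchestration is plain Python, a failed batch can also simply be rerun by calling scrape_batch again with the same inputs.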

Designing subflows indeed seems much easier with Prefect 2.0.

However, it seems the ability to manually restart a flow after failure (or simply to restart a flow) has been lost, along with the persistence of task results, hasn’t it?
Is this temporary, or will it be brought back in a potentially different form?

As I am dealing with heavy and long flows, with a bunch of unavoidable and sometimes unforeseeable errors, I found the ability to manually restart a flow from its point of failure (or even from any point) to be very practical.


Restart and retry are very different features.

I do not believe that we will introduce restarts from an arbitrary point, since you can just rerun your script, even from a local machine, for the relevant subflows when needed, and all the execution metadata will be tracked in the backend.

Are all task results systematically persisted in Prefect 2.0?
I already had a problem in Prefect 1.0 when persisting all task results, because it filled the storage very quickly and caused the flow to fail.

Is there a way to disable the persistence of task results in 2.0, or to remove persisted results systematically when they are no longer needed?

Not yet, but Michael is starting to work on it this week.

Regarding restart, this conversation might be relevant for you too:

In the meantime, how could I best delete task results after a flow is completed? Or perhaps the release of the feature is so imminent that waiting for it would make more sense?

I was thinking about running a clean-up task at the end of the flow that looks up all task run IDs via the flow run context and deletes their result files from storage.
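
Something like this rough sketch, assuming results are persisted to a local directory keyed by flow run ID (the RESULTS_ROOT path and directory layout are my assumptions, not Prefect’s actual defaults):

```python
import shutil
from pathlib import Path

from prefect import flow, task
from prefect.context import get_run_context

# assumed layout: result files written under <RESULTS_ROOT>/<flow_run_id>/
RESULTS_ROOT = Path("/data/prefect-results")

@task
def clean_up_results():
    # look up the current flow run ID from the task run context and
    # delete every result file persisted for this flow run
    flow_run_id = get_run_context().task_run.flow_run_id
    shutil.rmtree(RESULTS_ROOT / str(flow_run_id), ignore_errors=True)

@flow
def my_flow():
    # ... the real work happens here ...
    clean_up_results()
```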

We have started working on an improved design for it this week - you should see a more configurable setup in the next releases.

Hi, I am evaluating Prefect 2.0 as a replacement for our existing Airflow deployment and have hit a similar problem. One of the things that Airflow allowed us to do was to (re)trigger a previous DAG or task using parameters unique to that run - in my case, the scheduled run time.

Example:
I have a run scheduled for 2022-09-14 08:00:00. This scheduled time (obtained from the RunContext) is used as input for the tasks. However, that particular run failed, and it is now 2022-09-15 16:00:00. We have fixed the issue but now need to restart that run so that it still uses 2022-09-14 08:00:00 as the input.

Is this achievable in Prefect 2.0?

Could you explain your use case and the process of how you load the data? Why would you need this exact timestamp to drive your data ingestion/transformation process?

It absolutely is, but rather than relying on any implicit and error-prone backfilling, we recommend using parametrized runs and explicitly specifying the period of data you want to process.
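
For instance, a minimal sketch of such a parametrized flow (the flow and parameter names are illustrative):

```python
from datetime import date, timedelta
from typing import Optional

from prefect import flow

@flow
def ingest(process_date: Optional[date] = None):
    # a scheduled run defaults to yesterday; a backfill passes the
    # exact date explicitly instead of deriving it from the clock
    process_date = process_date or date.today() - timedelta(days=1)
    ...
```

Re-running the 2022-09-14 run is then a matter of calling the flow (or triggering its deployment) with process_date=2022-09-14, and the value is recorded with the run in the backend.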

We work mainly with time series data, and the scheduled time lets us easily request historical data without having to start a new run with a custom parameter, i.e. the run scheduled for 2022-09-14 is expected to return values correct *as of* 2022-09-14.

We also have SFTP pipelines that download files that are backdated e.g. the 2022-09-14 run needs to download files dated for the previous day (2022-09-13). Knowing the scheduled time allows my tasks to calculate what file date I am looking for.

The second reason is for reporting purposes. We want to record when a data point is scheduled to come in versus when it actually came in, so we can track inefficiencies in our processes and find ways to improve them. Starting an ad hoc run means we potentially lose that piece of information.

All of this is done in Airflow by re-running the scheduled run for that day, which is much simpler to execute than setting up an ad hoc run. Not the best implementation, as you’ve pointed out, but it is a straightforward recovery process that my non-technical users (and sadly even myself) have gotten used to.

Regardless of whether you leverage an implicit (and extremely error-prone) scheduled start time from Airflow or an explicit (and far more reliable) parameter value as I recommend here, in the end you need to use that value in your dataflow or queries, so the outcome is the same.

It’s a matter of how you design it.

Hey Anna, I specifically made an account to reply to this.

I’m currently going through the discovery phase of adopting a workflow orchestration tool for my current company. Having worked at two companies that require SOX audit compliance, I’m somewhat confident that each would not consider “rerunning your script from a local machine” to be in compliance in a production environment.

So, we would actually have to pre-define and pre-deploy runnable tasks as individual flows in order to fit the use case of being able to re-run individual tasks, or start a flow from an arbitrary task.


I am not sure I agree with this statement.
The execution time (timestamp, timezone, format, etc.) is much safer when maintained by the system than when I manually (and explicitly) enter it as parameters.

If I want to process yesterday’s data again, I only have to restart yesterday’s run in Airflow. In Prefect, I’d have to figure out all the parameters that constitute “yesterday’s run” and enter them manually while starting a new run.

I am bound to eventually make mistakes doing this, hence I would argue it is more error-prone.