I am currently working on a flow of flows, with millions of tasks and thousands of subflows.
It is a step-by-step iterative scraping workflow whose scope grows at each step (starting from 100 resources to scrape and growing to millions at the last step).
I follow the Prefect precept of atomic tasks, so each scraping task corresponds to a single request (one resource to get).
I will skip the details, except that I implemented the subflows as batches of 1000 requests to be executed, and those are executed as a mapping of subflow runs. The task immediately succeeding such a mapped scraping subflow run has a trigger set to all_finished so that a single batch failure should not stop the whole parent flow.
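For illustration, the batching and the "a failed batch must not stop the parent" behaviour described above might look roughly like this in plain Python. This is a minimal sketch, not the actual flow: `chunked`, `run_batch`, `run_all_batches`, and `scrape_one` are hypothetical names, and the `try`/`except` only mimics what the all_finished trigger achieves.

```python
from __future__ import annotations

from typing import Callable, Iterator

BATCH_SIZE = 1000  # batch size mentioned in the post


def chunked(items: list[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of `items` holding at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def run_batch(urls: list[str], scrape_one: Callable[[str], str]) -> list[str]:
    """Stand-in for one subflow run: scrape every URL in the batch."""
    return [scrape_one(url) for url in urls]


def run_all_batches(
    urls: list[str],
    scrape_one: Callable[[str], str],
    size: int = BATCH_SIZE,
) -> tuple[dict[int, list[str]], list[int]]:
    """Run every batch and keep going when one of them fails."""
    results: dict[int, list[str]] = {}
    failed: list[int] = []
    for index, batch in enumerate(chunked(urls, size)):
        try:
            results[index] = run_batch(batch, scrape_one)
        except Exception:
            # Remember the failed batch so it can be retried later,
            # instead of aborting the remaining batches.
            failed.append(index)
    return results, failed
```

The returned list of failed batch indices is what a later restart would need to work from.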
I am persisting the task results at the parent flow level, in order to be able to restart the flow for failed tasks.
This works well; however, the flow fails when it runs out of storage for new task results (40 GB are filled).
My questions are the following:
Is there a way to mark a task as automatically restartable when its result is needed by a restarted downstream task? (This would save most of the storage used by task result files, as most of my tasks are inexpensive request-preparation tasks.)
Is there a way to link a task run with its upstream and downstream task runs? (my idea would be to delete the persisted results once all downstream tasks have succeeded)
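To make the cleanup idea concrete, here is a minimal sketch of it in plain Python, independent of Prefect. It assumes the upstream/downstream links are known when a result is persisted, which is exactly the information the question asks about; `ResultStore`, `persist`, and `mark_succeeded` are hypothetical names, not Prefect APIs.

```python
from __future__ import annotations

from pathlib import Path


class ResultStore:
    """Deletes a persisted result once every downstream consumer has succeeded."""

    def __init__(self, root: Path) -> None:
        self.root = root
        # Upstream task id -> downstream task ids that have not succeeded yet.
        self.pending: dict[str, set[str]] = {}

    def persist(self, task_id: str, payload: bytes, downstream: set[str]) -> Path:
        """Write the result to disk and remember who still needs it."""
        path = self.root / f"{task_id}.bin"
        path.write_bytes(payload)
        self.pending[task_id] = set(downstream)
        return path

    def mark_succeeded(self, downstream_id: str) -> list[str]:
        """Record a downstream success; return upstream ids whose files were deleted."""
        deleted: list[str] = []
        for task_id, waiting in list(self.pending.items()):
            if downstream_id not in waiting:
                continue
            waiting.remove(downstream_id)
            if not waiting:
                # No remaining consumer: the file can be removed safely.
                (self.root / f"{task_id}.bin").unlink()
                del self.pending[task_id]
                deleted.append(task_id)
        return deleted
```

A result is kept as long as at least one consumer has not succeeded, so a restarted downstream task would still find its input.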
I would start with a question: would you rather design this solution in Prefect 2.0?
I’m asking since 2.0 is the LTS version and 1.0 is in maintenance mode and we encourage users to migrate. If you are currently in a design phase of such a big project, it would be more beneficial to design it in the LTS product i.e. Prefect 2.0 - docs.prefect.io
I would recommend designing this workflow using a functional programming pattern in pure Python. Then add flow and task decorators where needed to turn it into a Prefect flow. Given how easy it is to build composable subflows in 2.0, you can easily add retries, e.g. even to subflows.
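As a rough sketch of that "pure Python first" approach: the functions below are plain Python and run without Prefect installed, and the comments mark where the `@task` and `@flow` decorators from `prefect` would go. All function names and the URL are placeholders, and the `retries` argument mentioned in the comments should be checked against the Prefect 2.0 docs before relying on it.

```python
from __future__ import annotations


# Candidate for @task (possibly with a retries argument); left undecorated
# here so that the sketch runs without Prefect.
def build_request(resource_id: int) -> str:
    return f"https://example.com/resource/{resource_id}"  # placeholder URL


# Candidate for @task as well: one request per task, as in the original design.
def fetch(url: str) -> dict:
    return {"url": url, "status": "ok"}  # stand-in for a real HTTP request


# Candidate for @flow: when called from the parent flow it would run as a subflow.
def scrape_batch(resource_ids: list[int]) -> list[dict]:
    return [fetch(build_request(rid)) for rid in resource_ids]


# Candidate for the parent @flow.
def scrape_all(resource_ids: list[int], batch_size: int = 1000) -> list[dict]:
    pages: list[dict] = []
    for start in range(0, len(resource_ids), batch_size):
        pages.extend(scrape_batch(resource_ids[start:start + batch_size]))
    return pages
```

Because the functions stay ordinary Python, they can be unit-tested before any orchestration is added.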
Designing subflows indeed seems way easier with Prefect 2.0.
However, it seems the ability to manually restart a flow after failure (or simply restart a flow) has been lost, alongside the persistence of task results, hasn't it?
Is it temporary, or will it be brought back in a potentially different form?
As I am dealing with heavy and long flows, with a bunch of unavoidable and sometimes unforeseeable errors, I found the ability to manually restart a flow from its point of failure (or even from any point) to be very practical.
I do not believe that we will introduce restarts from an arbitrary point, since you can just rerun your script, even from a local machine, for the relevant subflows when needed, and all the execution metadata will be tracked in the backend.
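One way to read "just rerun your script for the relevant subflows" is a script that records which batches have finished and skips them on the next run. The sketch below is an assumption about how that could be done by hand, not a Prefect feature; `run_with_checkpoint`, the `run_batch` callable, and the JSON checkpoint file are all hypothetical.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Callable


def run_with_checkpoint(
    batch_ids: list[int],
    run_batch: Callable[[int], None],
    checkpoint: Path,
) -> list[int]:
    """Run the batches not yet recorded as done; return the ids that failed this time."""
    done = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()
    failed: list[int] = []
    for batch_id in batch_ids:
        if batch_id in done:
            continue  # finished during an earlier run of the script
        try:
            run_batch(batch_id)
        except Exception:
            failed.append(batch_id)
            continue
        done.add(batch_id)
        # Save progress after every success so a crash loses at most one batch.
        checkpoint.write_text(json.dumps(sorted(done)))
    return failed
```

Rerunning the same script with the same checkpoint file then only executes the batches that failed or never ran.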
Are all task results systematically persisted in Prefect 2.0?
I already had a problem in Prefect 1.0 when persisting all task results, because it filled the storage very quickly and caused the flow to fail.
Is there a way to disable the persistence of task results in 2.0, or to remove persisted results systematically when they are no longer needed?
Hi, I am evaluating Prefect 2.0 as a replacement for our existing Airflow deployment and have hit on a similar problem as well. One of the things that Airflow allowed us to do was to (re)trigger a previous dag or task using parameters unique to that run. In my case - the scheduled run time.
I have a run that is scheduled on 2022-09-14 08:00:00. This scheduled time (obtained from the RunContext) is used as input for the tasks. However that particular run failed and it is now 2022-09-15 16:00:00. We have fixed the issue, but now need to restart that run so it still uses 2022-09-14 08:00:00 as the input.
We work mainly with time series data, and the scheduled time lets us easily request historical data without having to start a new run with a custom parameter, i.e. the run scheduled for 2022-09-14 is expected to return values correct ~as of~ 2022-09-14.
We also have SFTP pipelines that download files that are backdated e.g. the 2022-09-14 run needs to download files dated for the previous day (2022-09-13). Knowing the scheduled time allows my tasks to calculate what file date I am looking for.
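The file-date calculation described above could be sketched as follows. The one-day lag comes from the example in the post; the `export_YYYYMMDD.csv` naming scheme and both function names are made up for illustration.

```python
from datetime import date, datetime, timedelta


def file_date_for(scheduled_time: datetime, lag_days: int = 1) -> date:
    """Return the date of the file a run scheduled at `scheduled_time` should fetch."""
    return scheduled_time.date() - timedelta(days=lag_days)


def remote_filename(scheduled_time: datetime) -> str:
    """Build the file name to download (hypothetical naming scheme)."""
    return f"export_{file_date_for(scheduled_time):%Y%m%d}.csv"
```

Whether the scheduled time comes from the run context or from a parameter, the function only needs the datetime itself.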
The second reason is for reporting purposes. We want to record when a data point is scheduled to come in versus when it actually came in, so we can track inefficiencies in our processes and find ways to improve them. Starting an adhoc run means we potentially lose that piece of information.
All of this is done in Airflow by re-running the scheduled run for that day, which is much simpler to execute than setting up an ad hoc run. Not the best implementation, as you've pointed out, but it is a straightforward recovery process that my non-technical users (and sadly even myself) have gotten used to.
Regardless of whether you leverage an implicit (and extremely error-prone) scheduled start time from Airflow or an explicit (and way more reliable) parameter value, as I recommend here, in the end you need to leverage that value in your dataflow or queries, so the outcome is the same.
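A minimal sketch of the explicit-parameter approach, in plain Python: the run takes an optional `as_of` value, falls back to the current time when none is given, and keeps both timestamps so the "scheduled versus actual" reporting mentioned earlier is still possible. `resolve_as_of` and its return keys are hypothetical names, and the sketch assumes `as_of` is a naive ISO timestamp expressed in UTC.

```python
from __future__ import annotations

from datetime import datetime, timezone


def resolve_as_of(as_of: str | None = None, now: datetime | None = None) -> dict:
    """Return the logical time of the run, the actual run time, and the gap between them."""
    actual = now or datetime.now(timezone.utc)
    if as_of:
        # Assumption: the parameter is a naive ISO timestamp expressed in UTC.
        logical = datetime.fromisoformat(as_of).replace(tzinfo=timezone.utc)
    else:
        logical = actual  # regular run: logical time and actual time coincide
    return {"as_of": logical, "ran_at": actual, "delay": actual - logical}
```

Re-running the failed 2022-09-14 run on the following day would then mean passing `as_of="2022-09-14 08:00:00"`, with the recorded delay showing how late the data arrived.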