Persist context of LOOP tasks for flow restarts

Hi,

I have a very specific problem.
I need to run a dynamic number of subflows (the number is decided by parent tasks) in sequence.
Specifically, I need those subflows to run one at a time, so as not to overload the dask scheduler with too many tasks at once.
I created a LOOP task that creates a subflow run on the prefect server, runs it and waits for its completion, then moves on to the next subflow.
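
To illustrate, here is a minimal sketch of the kind of LOOP task I mean, using the prefect.tasks.prefect tasks to create and await each subflow run (the flow and project names are placeholders):

import prefect
from prefect import task
from prefect.engine.signals import LOOP
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def run_subflows_sequentially(subflow_params):
    # task_loop_result carries state between loop iterations
    loop_payload = prefect.context.get("task_loop_result", {"index": 0})
    index = loop_payload["index"]
    if index >= len(subflow_params):
        return  # every subflow has completed
    # create the subflow run on the server and block until it finishes
    flow_run_id = create_flow_run.run(
        flow_name="subflow",  # placeholder
        project_name="my-project",  # placeholder
        parameters=subflow_params[index],
    )
    wait_for_flow_run.run(flow_run_id, raise_final_state=True)
    raise LOOP(message=f"subflow {index} finished", result={"index": index + 1})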

It works well as long as no subflow fails, but if one subflow fails and I manually restart the overarching flow, it reruns the subflow-execution LOOP task from scratch, even though I added persistence to the whole flow.
Every other task is persisted correctly, but it looks like the LOOP task context is not persisted, only the results.

Is there a way to persist the context of a LOOP task (especially the task_loop_result field)?

This sounds like a use case for mapping rather than looping. If you opted for LOOP instead of mapping because you need sequential execution, the solution would be to run this flow with a LocalExecutor rather than a DaskExecutor. Could you give it a try? This would be much easier than looping.
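
For example, a minimal sketch of the mapped version (run_flow here is a placeholder for whatever task triggers one subflow):

from prefect import Flow, task
from prefect.executors import LocalExecutor


@task
def run_flow(name):
    # placeholder for the logic that triggers one subflow
    print(f"running subflow {name}")


with Flow("flow-of-flows") as flow:
    run_flow.map(name=["a", "b", "c"])

# LocalExecutor runs the mapped children sequentially, one at a time
flow.executor = LocalExecutor()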

Also, not sure where you are in your Prefect journey, but switching to Prefect 2.0 may help you address this use case in a much easier way: orion-docs.prefect.io - I understand if you can’t switch to 2.0 yet.

Re:

The task_loop_result is persisted by default in prefect.context:

prefect.context.task_loop_result

You are right.
I completely forgot I could use a local agent to ensure sequential execution of subflows.

However, I now have a problem as I’m trying to simplify my flow of flows.
I am registering the following flow on a prefect server and executing it on a prefect local agent (using git storage):

import random

from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult


@task
def risky_task(r=0.25):
    # fail randomly with probability r
    if random.random() < r:
        raise Exception("something bad happened")
    print("something good happened")


@task(task_run_name="run_flow-{name}")
def run_flow(parameters=None, name=""):
    parameters = {} if parameters is None else parameters
    # build the subflow inline and run it in-process
    with Flow(name="subflow") as flow:
        r = Parameter("r", default=0.25)
        t1 = risky_task(r)
        t2 = risky_task(r, upstream_tasks=[t1])
        t3 = risky_task(r, upstream_tasks=[t2])
        risky_task(r, upstream_tasks=[t3])
    state = flow.run(parameters=parameters)
    if state.is_failed():
        raise Exception(f"subflow {name} failed")
    return


with Flow("flow-flows", result=LocalResult("/mnt/prefect/results")) as flow:
    names = [*range(10)]
    run_flow.map(name=names)

Whenever I try to execute this flow, I get a weird error when running the first run_flow task:

Unexpected error while running flow: KeyError('Task slug Constant[list]-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
Traceback (most recent call last):
  File "/home/cloud/.local/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 398, in initialize_run
    task = tasks[task_run.task_slug]
KeyError: 'Constant[list]-1'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/cloud/.local/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 264, in run
    state, task_states, run_context, task_contexts = self.initialize_run(
  File "/home/cloud/.local/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 400, in initialize_run
    raise KeyError(
KeyError: 'Task slug Constant[list]-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'

And afterwards, there is a cascade of errors, one for each of the other mapped subflow runs:

Failed to set task state with error: ClientError([{'message': 'State update failed for task run ID ebd17a82-6853-4e9a-b3e3-4da372910735: provided a running state but associated flow run e72e5c67-4ee7-4b55-9866-6e897be73843 is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID ebd17a82-6853-4e9a-b3e3-4da372910735: provided a running state but associated flow run e72e5c67-4ee7-4b55-9866-6e897be73843 is not in a running state.'}}}])
Traceback (most recent call last):
  File "/home/cloud/.local/lib/python3.9/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/home/cloud/.local/lib/python3.9/site-packages/prefect/client/client.py", line 1604, in set_task_run_state
    result = self.graphql(
  File "/home/cloud/.local/lib/python3.9/site-packages/prefect/client/client.py", line 464, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'State update failed for task run ID ebd17a82-6853-4e9a-b3e3-4da372910735: provided a running state but associated flow run e72e5c67-4ee7-4b55-9866-6e897be73843 is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID ebd17a82-6853-4e9a-b3e3-4da372910735: provided a running state but associated flow run e72e5c67-4ee7-4b55-9866-6e897be73843 is not in a running state.'}}}]

What am I doing wrong?

The flow is working fine in a local environment without server registration.

You can’t call Flows from tasks - it doesn’t work that way. Check out the flow-of-flows resources in the Prefect 1.0 docs to understand this pattern better.
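
The supported pattern is an orchestrator flow that creates and waits for runs of separately registered subflows, roughly like this (flow and project names are placeholders):

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-flow") as flow:
    # both subflows must already be registered with the backend
    first_run = create_flow_run(flow_name="subflow-a", project_name="my-project")
    first_done = wait_for_flow_run(first_run, raise_final_state=True)
    second_run = create_flow_run(
        flow_name="subflow-b", project_name="my-project", upstream_tasks=[first_done]
    )
    wait_for_flow_run(second_run, raise_final_state=True)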

That’s what I thought initially, but I found posts on other sites where people said it was possible. Maybe they meant in local mode only.

I already implemented a working task to run flows as subflows, and I also tested the prefect.tasks.prefect tasks successfully. I wanted to see whether an easier way was possible.
Notably, I wanted to fix the following issues:

  • link the parent flow’s source code with the subflow code so they are always compatible (there is a risk of version mismatch if they are registered independently)
  • avoid flooding the prefect server dashboard with hundreds of subflow runs, as they are of no interest outside their parent flow’s context
  • when a subflow execution fails and the task creating it has a retry policy, restart the failed subflow directly (rerunning only its failed tasks) instead of creating a new subflow that starts from scratch

Is there a way to implement a flow of flows in prefect 1.x that avoids these issues?

Would upgrading to prefect 2.x solve some or all of them?

There isn’t - I strongly encourage you to use flow-of-flows as described in the docs.

Subflows in Prefect 2.0 can likely help, but they are not a panacea either; they are more of a bookkeeping feature.
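
For illustration, a subflow in 2.0 is just a flow-decorated function called from another flow; a minimal sketch, with hypothetical task names:

from prefect import flow, task


@task
def scrape(resource_id):
    ...  # hypothetical scraping task


@flow
def scrape_batch(resource_ids):
    for resource_id in resource_ids:
        scrape(resource_id)


@flow
def parent(batches):
    # each call creates a subflow run tracked under the parent run
    for batch in batches:
        scrape_batch(batch)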

It would be easier if you described the problem you’re trying to solve purely from a business perspective, without looking at any Prefect concepts yet.

I am building a scraping workflow.
This workflow has multiple steps, and the resource IDs whose pages are to be scraped at each step depend on what was found at the previous step.
Following prefect guidelines, I implemented small tasks for this purpose (e.g. one task per scraped resource ID).

Though this is very efficient when a failure occurs, since I can restart from the failure point rather than from scratch, it creates a problem of scale: tens of thousands and up to a million tasks would be created, and if they are all created at once (i.e. in one flow), the dask scheduler simply breaks down (its machine runs out of memory and crashes).

This is why I thought about a flow of flows to split up the tasks: at each step of the scraping workflow, run one subflow at a time over a smaller batch of resources to scrape (fewer than 10,000 tasks each); a rough sketch of this batching is shown below.
This works well, except that it creates the issues I mentioned above.
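
For concreteness, the batching amounts to no more than this (the batch size is an assumption):

def batches(resource_ids, size=10_000):
    # split the resource IDs found at the previous step into chunks
    # small enough for a single subflow / the dask scheduler to handle
    for i in range(0, len(resource_ids), size):
        yield resource_ids[i:i + size]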

Some details about the prefect environment:

  • prefect server
  • prefect local agent picking up flows from the server
  • dask cluster of 3 workers (though this number could change in the future)

@smartan007 sorry for the late reply, I was on vacation.

So it looks like the problem you are facing is concurrency limiting? This is available out of the box in Prefect Cloud, and there is a way to do it in Server as well.

I am now using a flow of flows; executing subflows via task mapping solves the concurrency issue.
However, it still raises the issues listed above.

But as that is a different problem that deserves an issue ticket of its own, I’ll mark the original problem as solved.