How to accomplish a never-ending flow or skip a new scheduled run if the previous flow run hasn't finished yet?

View in #prefect-community on Slack

Ievgenii_Martynenko @Ievgenii_Martynenko: Hi, I have a task that should be executed each 30 minutes; sometimes task is running longer than 30 minutes, and at the same time another instance of this Task is started in parallel which is causing logical and performance issues. Is there any option not to start the same Task if previous execution is not finished yet for non-cloud? (seems like for cloud it’s possible using labels and number of parallel executions allows).

Never-ending flow

@Anna_Geller: @Ievgenii_Martynenko correct, for cloud you could use concurrency limits.

In general I understand that you want this flow to basically run all the time? i.e. when one flow run finishes execution, starting the next one? If so, you can use the following state handler:

from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"


def never_ending_state_handler(obj, old_state, new_state):
    if new_state.is_successful():
        create_flow_run.run(flow_name="never-ending-flow", project_name="community")
    return new_state


with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()

Ievgenii_Martynenko @Ievgenii_Martynenko: Thanks for the answer. Not really, though never ending flow could be a partial solution. I meant the following: if task (Flow) is executed in less than 30 minutes, says 26 minutes - it’s usual behavior and some other abstract tasks that are running against the same database have some more resources. If tasks (Flow) is executed in more that 30 minutes, say 34 minutes - no new execution is started until current is finished. And here we have to options: either start new Task instance again immediately at 34 minute, or wait another 26 minutes and then start it.
Just for reference: such behavior is possible in Autosys (CA Workload Automations) or SQL Server Agent.

@Anna_Geller: Can you explain the problem you try to solve? Perhaps you can use the above code and if there is some condition in your data indicating that what your task is doing is already done, then raising ENDRUN signal to finish this run?

from prefect import task
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Success


@task(log_stdout=True)
def check_if_condition_met():
    if "some state of the world":
        raise ENDRUN(Success(message="Data is already in the DB"))

> Just for reference: such behavior is possible in Autosys (CA Workload Automations) or SQL Server Agent.
All that those solutions are doing is basically: Skip if running. If you want this, you could have a state handler that queries the API if there is currently any flow run in a Running state for this flow, and if so, ending the current flow run immediately without doing anything.

Ievgenii_Martynenko @Ievgenii_Martynenko: Yes, state handler that queries API sounds like a plan. In this case we will have 1 state handler per each Flow, each Flow checking it’s own state. Do you have an reference/implementation example somewhere?

Kevin_Kho @Kevin_Kho: Do you mean Flow every 30 mins or task every 30 mins??

Ievgenii_Martynenko @Ievgenii_Martynenko: Flow (sorry, there is name confusion between different systems)

Kevin_Kho @Kevin_Kho: So this will be a first class feature in Orion (Prefect 2.0). But for now, there are a few ways to do it. First would be the concurrency limits in Cloud like Anna suggested that let you have 1 concurrent flow run at a time. This will wait for the previous one to execute before moving on. The second one is the state handler which lets you query for current executions and then mark the Flow run as success if there is an existing run.

There is an example of the second one here .

GitHub: Code Snippet to Limit Flow Run Concurrency · Issue #307 · PrefectHQ/server

@Anna_Geller: @Ievgenii_Martynenko this state handler should do exactly what you wanted: if there are any active flow runs for this flow, it skips the new flow run:

Skip if running

import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)


def skip_if_running_handler(obj, old_state, new_state):
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                {state: {_eq: "Running"}}]}
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query, variables=dict(flow_id=prefect.context.flow_id)
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()

Ievgenii_Martynenko @Ievgenii_Martynenko: Kevin, Anna thank you for answers, and also for pointing the idea about never-ending flow. We’ll consider state handler, but overall it’s a nice motivation to migrate from Prefect to Orion once released. Hope it goes smoothly.