Is it possible to skip a new scheduled flow run if a previous flow run of the same flow is still in progress?

Imagine that you have a workflow scheduled to run hourly. Sometimes a run takes longer than an hour, and you want to skip the next scheduled flow run if the previous one is still in progress.

In order to do that, you can attach the following flow-level state handler:

import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)


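def skip_if_running_handler(obj, old_state, new_state):
    # Only check the backend when this flow run is about to enter a Running state.
    if new_state.is_running():
        client = Client()
        # Ask the backend for flow runs of this flow that are in a Running state.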
        query = """
            query($flow_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                {state: {_eq: "Running"}}]}
                limit: 1
                offset: 1
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query, variables=dict(flow_id=prefect.context.flow_id)
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()

The above solution will work for all Prefect < 2.0 flows, regardless of whether you use Prefect Cloud or Prefect Server. It will mark the flow run as Skipped if there is at least one flow run of that flow already in progress.
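If you want to check the handler's decision without a running backend, one option is to pull the query and the "is anything running?" check into a small function and feed it a fake client. This is only a sketch: `other_runs_in_progress` and `FakeClient` are hypothetical names introduced here, and the fake simply mimics the response shape used in the handler above (`response["data"]["flow_run"]`).

```python
# Same GraphQL query as in the state handler above.
QUERY = """
query($flow_id: uuid) {
  flow_run(
    where: {_and: [{flow_id: {_eq: $flow_id}}, {state: {_eq: "Running"}}]}
    limit: 1
    offset: 1
  ) {
    name
    state
    start_time
  }
}
"""


def other_runs_in_progress(client, flow_id):
    """Return True if the query returns at least one flow run (hypothetical helper)."""
    response = client.graphql(query=QUERY, variables=dict(flow_id=flow_id))
    return bool(response["data"]["flow_run"])


class FakeClient:
    """Hypothetical stand-in for prefect.client.Client that returns canned rows."""

    def __init__(self, rows):
        self.rows = rows
        self.calls = []  # records the variables passed to each graphql() call

    def graphql(self, query, variables):
        self.calls.append(variables)
        return {"data": {"flow_run": self.rows}}
```

With a real `Client()` in place of `FakeClient`, the handler could call `other_runs_in_progress(client, prefect.context.flow_id)` and return `Skipped(message)` when it is true.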

Thank you!
How can this solution be translated to Prefect >= 2.0?

In Prefect 2.0, it’s much easier - you can run it as a continuous service, as shown in this post:

Thank you Anna!

I read the Medium post carefully, but I could not find the answer to my question: how do I prevent multiple instances of the same flow from running at the same time, i.e., allow only one flow run of the flow at a time?

With the approach in this post, only one flow run of this flow is running at any given time.
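To show why a continuous service cannot overlap with itself, here is a minimal sketch of the loop idea in plain Python. It is not the code from the linked post: `run_back_to_back` and its parameters are hypothetical, and in Prefect 2 the callable passed as `run_once` would presumably be a function decorated with `@flow`.

```python
import time


def run_back_to_back(run_once, pause_seconds=0.0, max_runs=None, sleep=time.sleep):
    """Call run_once repeatedly; each call starts only after the previous one returns.

    max_runs=None loops forever (the "service" case); a number stops after that many runs.
    """
    completed = 0
    while max_runs is None or completed < max_runs:
        run_once()            # blocks until this run has finished
        completed += 1
        sleep(pause_seconds)  # optional pause before the next run
    return completed
```

Because the next call is only made after the previous one returns, a slow run just delays the next one instead of overlapping with it.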

An alternative would be to create a deployment with a work queue concurrency limit of 1 and schedule it to run, e.g., every 5 seconds, which has effectively the same result.
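To see why a concurrency limit of 1 plus a frequent schedule behaves like the continuous loop, here is a rough simulation. It is a simplified model and not Prefect's scheduler: it assumes that runs over the limit wait until a slot frees up, it ignores agent polling delays, and `simulate_start_times` is a hypothetical helper.

```python
import heapq


def simulate_start_times(scheduled_times, duration, limit=1):
    """Start each run at its scheduled time or when a slot frees up, whichever is later."""
    running = []  # min-heap of end times for runs currently holding a slot
    starts = []
    for scheduled in sorted(scheduled_times):
        # Release slots of runs that finished by the scheduled time.
        while running and running[0] <= scheduled:
            heapq.heappop(running)
        if len(running) < limit:
            start = scheduled
        else:
            # All slots are busy: wait for the earliest run to finish.
            start = heapq.heappop(running)
        heapq.heappush(running, start + duration)
        starts.append(start)
    return starts
```

For example, with runs scheduled every 5 seconds that each take 12 seconds, `simulate_start_times([0, 5, 10, 15], duration=12)` gives start times `[0, 12, 24, 36]` under this model: each run begins as soon as the previous one ends, and none overlap.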

I like the first option more, but it's up to you.