Is it possible to skip a new scheduled flow run if a previous flow run of the same flow is still in progress?

Imagine that you have a workflow scheduled to run hourly. But sometimes this flow may take longer than an hour and you want to skip the next scheduled flow run if the previous flow run is still in progress.

In order to do that, you can attach the following flow-level state handler:

```python
import time

import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)  # simulate a long-running task (6 minutes)


def skip_if_running_handler(obj, old_state, new_state):
    # Only check when this flow run is about to start running
    if new_state.is_running():
        client = Client()
        # Look for any *other* run of the same flow that is currently Running.
        # Excluding the current flow run by ID is more reliable than an offset,
        # because the query has no ordering and this run may not be Running yet.
        query = """
            query($flow_id: uuid, $flow_run_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                {id: {_neq: $flow_run_id}},
                {state: {_eq: "Running"}}]}
                limit: 1
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query,
            variables=dict(
                flow_id=prefect.context.flow_id,
                flow_run_id=prefect.context.flow_run_id,
            ),
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()
```

The above solution will work for all Prefect < 2.0 flows, regardless of whether you use Prefect Cloud or Prefect Server. It will mark the flow run as Skipped if there is at least one flow run of that flow already in progress.
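To make the resulting behavior concrete, here is a small simulation of the skip-if-running policy on an hourly schedule. It is plain Python with no Prefect calls; `simulate_skip_policy` and the durations are made up for illustration, and it assumes a run counts as in progress from its scheduled start until start plus duration.

```python
def simulate_skip_policy(durations_min: list[int], interval_min: int = 60) -> list[str]:
    """Return "Ran" or "Skipped" for each scheduled run (hypothetical helper).

    Run i is scheduled at i * interval_min. It is skipped if a previously
    started run is still in progress at that time, mirroring the state handler.
    """
    busy_until = 0  # end time (in minutes) of the last run that actually started
    outcomes = []
    for i, duration in enumerate(durations_min):
        start = i * interval_min
        if busy_until > start:
            outcomes.append("Skipped")  # a previous run is still in progress
        else:
            busy_until = start + duration
            outcomes.append("Ran")
    return outcomes


# The first run takes 90 minutes, the third one takes 130 minutes
print(simulate_skip_policy([90, 30, 130, 10, 10]))
# → ['Ran', 'Skipped', 'Ran', 'Skipped', 'Skipped']
```

The durations listed for skipped runs never matter, because those runs never start.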

Thank you!
How would this solution translate to Prefect >= 2.0?

In Prefect 2.0, it’s much easier; you can run the flow as a continuous service, as shown in this post:

Thank you, Anna!

I read the Medium post carefully, but somehow I could not find the answer to my question: how do I prevent multiple instances of the same flow from running at the same time, i.e. allow only one instance of the flow to run at a time?

With the approach in this post, you are running exactly one flow run of this flow at all times.
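A minimal sketch of why a continuous service never overlaps with itself, assuming the post's approach boils down to calling the flow in a loop (the linked post isn't reproduced here, so this is an approximation). `run_as_service` is a hypothetical helper; in Prefect 2 you would pass a call to your `@flow`-decorated function as `job`.

```python
import time
from typing import Callable, Optional


def run_as_service(
    job: Callable[[], None], pause_s: float, max_iterations: Optional[int] = None
) -> int:
    """Call `job` repeatedly, pausing `pause_s` seconds after each call.

    The next call starts only after the previous one has returned, so two
    runs can never be in progress at the same time. `max_iterations=None`
    means loop forever, as a long-lived service would.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        job()  # blocks until this run has finished
        iterations += 1
        time.sleep(pause_s)
    return iterations


if __name__ == "__main__":
    # Hypothetical job standing in for a flow run
    run_as_service(lambda: print("flow run finished"), pause_s=0.1, max_iterations=3)
```

Unlike a schedule, the pause here is measured from the end of one run to the start of the next, so the effective cadence stretches whenever a run is slow.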

An alternative option would be to create a deployment with a work queue concurrency limit of 1 and schedule it to run, e.g., every 5 seconds, which has effectively the same result.
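The work queue option relies on Prefect not starting another run while the queue's single slot is taken. As a rough analogy only (this is not how Prefect implements concurrency limits), an `asyncio.Semaphore(1)` shows the effect: runs are "scheduled" faster than they finish, yet only one executes at a time. `fake_flow_run`, `simulate` and the timings are hypothetical.

```python
import asyncio


async def fake_flow_run(
    slot: asyncio.Semaphore, run_id: int, duration_s: float, events: list
) -> None:
    # Wait until the single slot is free, then hold it for the whole run
    async with slot:
        events.append(("start", run_id))
        await asyncio.sleep(duration_s)  # simulated flow work
        events.append(("end", run_id))


async def simulate(n_runs: int = 3, interval_s: float = 0.01, duration_s: float = 0.03) -> list:
    slot = asyncio.Semaphore(1)  # stands in for a work queue concurrency limit of 1
    events: list = []
    runs = []
    for run_id in range(n_runs):
        # A new run is "scheduled" every interval_s seconds, faster than runs finish
        runs.append(asyncio.create_task(fake_flow_run(slot, run_id, duration_s, events)))
        await asyncio.sleep(interval_s)
    await asyncio.gather(*runs)
    return events


events = asyncio.run(simulate())
print(events)
```

Note that in this sketch the extra runs wait for the slot instead of being skipped.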

I prefer the first option, but it’s up to you.

@anna_geller Could you share that article here? I don’t have a Medium account. I’m also trying to come up with a way to skip a scheduled flow run if the previous run is still running. Thank you!

Did you ever figure this out?
I don’t want a forever-running flow; I simply want to skip the scheduled run if another run is already in progress.

I did figure it out, but it was a long time ago and I have since moved away from Prefect :man_shrugging:

@Sahil_Thapar I found this in the Prefect Slack channels; you should be able to adjust it to your needs. The function obtains the current flow run context and queries the Prefect API to find out whether a different run of the same flow is already running. It also checks whether that run has the same parameters. If so, the current flow run cancels itself.

```python
from enum import Enum
from typing import Any

import httpx
from prefect import get_run_logger
from prefect.runtime import flow_run
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL


async def check_if_current_flow_is_running_in_prefect() -> bool:
    """Asynchronously determines if there is another instance of the current Prefect flow run with the same parameters.

    This function checks the current flow run against other running flow runs of the same flow.
    It returns True if another flow run with the same parameters is found, otherwise False.
    It must be called from inside a running flow, since it reads `prefect.runtime.flow_run`.

    Returns:
        bool: True if another instance with the same parameters is running, False otherwise.

    """
    logger = get_run_logger()
    flow_name = flow_run.flow_name
    flow_run_id = flow_run.id
    flow_params = flow_run.parameters
    logger.info(f"Checking if {flow_name} is running")
    flow_runs_list = await get_prefect_flow_runs_running_by_flow_name(
        flow_name=flow_name
    )

    for flow_run_item in flow_runs_list:
        # Ignore the current flow run itself; compare parameters of all other running runs
        if flow_run_item["id"] != flow_run_id and flow_params_match(
            flow_run_item["parameters"], flow_params
        ):
            return True

    return False


async def get_prefect_flow_runs_running_by_flow_name(flow_name: str) -> list[dict]:
    """This function returns a list of all running flow runs for a given flow name from the Prefect API.

    Works with Prefect Cloud or a self-hosted Prefect server; the API URL and key
    are read from the active Prefect settings (profile or environment variables).

    Args:
        flow_name (str): The name of the flow to get running flow runs for.

    Returns:
        list[dict]: A list of dictionaries representing the running flow runs for the given flow name.
    """
    url = f"{PREFECT_API_URL.value()}/flow_runs/filter"

    headers = {"Content-Type": "application/json"}
    api_key = PREFECT_API_KEY.value()
    if api_key:  # required for Prefect Cloud; a local server usually has no key
        headers["Authorization"] = f"Bearer {api_key}"
    payload = {
        "flow_runs": {
            "operator": "and_",
            "state": {
                "operator": "and_",
                "type": {"any_": ["RUNNING"]},
            },
        },
        "flows": {
            "name": {"any_": [flow_name]},
        },
    }
    async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
        response = await client.post(url, headers=headers, json=payload)
        response.raise_for_status()  # surface auth/URL errors instead of parsing an error body
        return response.json()


def flow_params_match(
    flow_run_item_params: dict[str, Any], flow_params: dict[str, Any]
) -> bool:
    """Checks if the parameters of the flow run item match the given flow parameters.

    Args:
        flow_run_item_params (dict[str, Any]): Parameters of a flow run item.
        flow_params (dict[str, Any]): Parameters of the current flow.

    Returns:
        bool: True if parameters match, False otherwise.
    """
    for key, value in flow_params.items():
        if key in flow_run_item_params:
            # Handling for lists containing Enums or other special cases
            if isinstance(value, list):
                new_list = [
                    item.value if isinstance(item, Enum) else item for item in value
                ]
                value = new_list  # noqa PLW2901

            # Handling for individual Enum values
            if isinstance(value, Enum):
                value = value.value  # noqa PLW2901

            if flow_run_item_params[key] != value:
                return False
    return True
```

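And then this check runs at the beginning of the flow (`my_flow` is just a placeholder name):

```python
from prefect import flow, get_run_logger
from prefect.states import Cancelled


@flow
async def my_flow():
    logger = get_run_logger()
    flow_check: bool = await check_if_current_flow_is_running_in_prefect()
    if flow_check:
        logger.info("The flow is cancelled")
        return Cancelled(message="The flow is cancelled")
    ...  # the rest of the flow's work goes here
```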
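The Enum handling in `flow_params_match` is there because, presumably, the parameters returned by the API are plain JSON, so an Enum member passed to the flow comes back as its `.value`. A standalone illustration with a hypothetical `Color` enum and a trimmed-down copy of that normalization step:

```python
from enum import Enum
from typing import Any


class Color(Enum):  # hypothetical flow parameter type
    RED = "red"
    BLUE = "blue"


def normalize(value: Any) -> Any:
    """Unwrap Enum members (and lists of them) to their plain values."""
    if isinstance(value, list):
        return [item.value if isinstance(item, Enum) else item for item in value]
    if isinstance(value, Enum):
        return value.value
    return value


# What the API would presumably return for a run started with color=Color.RED
api_params = {"color": "red", "palette": ["red", "blue"]}
local_params = {"color": Color.RED, "palette": [Color.RED, Color.BLUE]}

print(local_params["color"] == api_params["color"])                # False: Enum vs str
print(normalize(local_params["color"]) == api_params["color"])     # True
print(normalize(local_params["palette"]) == api_params["palette"])  # True
```

A `str`-based enum (`class Color(str, Enum)`) would already compare equal to its string value without unwrapping.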