Imagine that you have a flow scheduled to run hourly. Sometimes a run takes longer than an hour, and you want to skip the next scheduled flow run if the previous one is still in progress.
To do that, you can attach the following flow-level state handler:
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(360)


def skip_if_running_handler(obj, old_state, new_state):
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
              flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                               {state: {_eq: "Running"}}]}
                limit: 1
                offset: 1
              ) {
                name
                state
                start_time
              }
            }
        """
        response = client.graphql(
            query=query, variables=dict(flow_id=prefect.context.flow_id)
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state


with Flow("skip_if_running", state_handlers=[skip_if_running_handler]) as flow:
    hello_task = hello_world()
The above solution will work for all Prefect < 2.0 flows, regardless of whether you use Prefect Cloud or Prefect Server. It will mark the flow run as Skipped if there is at least one flow run of that flow already in progress.
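The skip decision in the handler depends only on the shape of the GraphQL response, so it can be pulled out into a pure function and unit-tested without a Prefect backend. A minimal sketch, assuming the response has the `{"data": {"flow_run": [...]}}` shape the handler reads; `should_skip` is a hypothetical helper name, not part of Prefect, and the sample responses are made up:

```python
from typing import Any


def should_skip(response: dict[str, Any]) -> bool:
    """Return True if the GraphQL response lists at least one active flow run.

    Mirrors the check in skip_if_running_handler: any row returned by the
    `flow_run` query means the run should be marked as Skipped.
    """
    # Unlike the handler, a missing "data" or "flow_run" key is treated as "no active runs"
    active_flow_runs = response.get("data", {}).get("flow_run") or []
    return len(active_flow_runs) > 0


# Hypothetical responses with the same shape the handler reads
busy = {"data": {"flow_run": [{"name": "brave-otter", "state": "Running", "start_time": "2022-05-01T10:00:00+00:00"}]}}
idle = {"data": {"flow_run": []}}

print(should_skip(busy))  # True
print(should_skip(idle))  # False
```

Inside the handler, `if should_skip(response): return Skipped(message)` would then replace the inline check.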
I read the Medium post attentively, but I could not find the answer to this question: how do you prevent multiple instances of the same flow from running at the same time, i.e., allow only one instance of the flow to run at a time?
An alternative option would be to create a deployment with a work queue concurrency limit of 1 and schedule it to run, e.g., every 5 seconds, which has effectively the same result.
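To make the effect of a concurrency limit of 1 concrete, here is a toy model in plain `asyncio`. It is an analogy, not how Prefect implements work queues: each fake "flow run" must acquire a single-slot semaphore before it starts, so a run that becomes due while another is in progress waits instead of executing in parallel. Note that in this model extra runs are delayed rather than skipped; `simulate_runs` and the timings are made up for illustration.

```python
import asyncio


async def simulate_runs(num_runs: int, limit: int, run_seconds: float) -> int:
    """Start `num_runs` fake flow runs at once and return the peak number running concurrently."""
    slots = asyncio.Semaphore(limit)  # stands in for the work queue concurrency limit
    running = 0
    peak = 0

    async def fake_flow_run() -> None:
        nonlocal running, peak
        async with slots:  # a run only starts once a slot is free
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(run_seconds)  # the "work" of the flow run
            running -= 1

    await asyncio.gather(*(fake_flow_run() for _ in range(num_runs)))
    return peak


print(asyncio.run(simulate_runs(num_runs=5, limit=1, run_seconds=0.01)))  # 1
print(asyncio.run(simulate_runs(num_runs=5, limit=5, run_seconds=0.01)))  # 5
```

With `limit=1` the peak never exceeds one run, which is the "only one instance at a time" behavior the question asks for.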
@anna_geller Is it possible for you to send that article over? I do not have a Medium account. I'm also trying to come up with a way to skip a scheduled flow run if the previous run is still running. Thank you!
@Sahil_Thapar I found this on the Prefect Slack channels; you should be able to adjust it to your needs. A function obtains the current flow run context and calls the Prefect API to check whether a different instance of the same flow is already running, and whether its parameters are the same. If so, the current flow run cancels itself.
from enum import Enum
from typing import Any

import httpx
from prefect.runtime import flow_run
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL


async def check_if_current_flow_is_running_in_prefect() -> bool:
    """Asynchronously determines if there is another instance of the current Prefect flow run with the same parameters.

    This function checks the current flow run against other running flow runs of the flow with the same name.
    It returns True if another flow run with the same parameters is found, otherwise False.

    Returns:
        bool: True if another instance with the same parameters is running, False otherwise.
    """
    # Runtime context of the flow run that is executing this code
    flow_name = flow_run.flow_name
    flow_run_id = flow_run.id
    flow_params = flow_run.parameters
    print(f"Checking if {flow_name} is running")
    flow_runs_list = await get_prefect_flow_runs_running_by_flow_name(
        flow_name=flow_name
    )
    for flow_run_item in flow_runs_list:
        print(flow_run_item)
        # Ignore the current run itself; only another run with matching parameters counts
        if flow_run_item["id"] != flow_run_id and flow_params_match(
            flow_run_item["parameters"], flow_params
        ):
            return True
    return False


async def get_prefect_flow_runs_running_by_flow_name(flow_name: str) -> list[dict]:
    """This function returns a list of all running flow runs for a given flow name in Prefect Cloud.

    Args:
        flow_name (str): The name of the flow to get running flow runs for.

    Returns:
        list[dict]: A list of dictionaries representing the running flow runs for the given flow name.
    """
    # API URL and key are read from the active Prefect profile/environment
    url = f"{PREFECT_API_URL.value()}/flow_runs/filter"
    headers = {
        "Authorization": f"Bearer {PREFECT_API_KEY.value()}",
        "Content-Type": "application/json",
    }
    # Only flow runs in the RUNNING state that belong to a flow with this name
    payload = {
        "flow_runs": {
            "operator": "and_",
            "state": {
                "operator": "and_",
                "type": {"any_": ["RUNNING"]},
            },
        },
        "flows": {
            "name": {"any_": [flow_name]},
        },
    }
    async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
        response = await client.post(url, headers=headers, json=payload)
        # Fail loudly instead of treating an error body as a list of flow runs
        response.raise_for_status()
        return response.json()


def flow_params_match(
    flow_run_item_params: dict[str, Any], flow_params: dict[str, Any]
) -> bool:
    """Checks if the parameters of the flow run item match the given flow parameters.

    Args:
        flow_run_item_params (dict[str, Any]): Parameters of a flow run item.
        flow_params (dict[str, Any]): Parameters of the current flow.

    Returns:
        bool: True if parameters match, False otherwise.
    """
    for key, value in flow_params.items():
        if key in flow_run_item_params:
            # Handling for lists containing Enums or other special cases
            if isinstance(value, list):
                new_list = [
                    item.value if isinstance(item, Enum) else item for item in value
                ]
                value = new_list  # noqa PLW2901
            # Handling for individual Enum values
            if isinstance(value, Enum):
                value = value.value  # noqa PLW2901
            if flow_run_item_params[key] != value:
                return False
    return True
And then this runs at the beginning of the (async) flow:
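# Inside an async @flow function (needs `from prefect import get_run_logger` and `from prefect.states import Cancelled`)
logger = get_run_logger()
flow_check: bool = await check_if_current_flow_is_running_in_prefect()
if flow_check:
    logger.info("The flow is cancelled")
    return Cancelled(message="The flow is cancelled")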
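The Enum handling in `flow_params_match` matters because parameters stored by the API come back as JSON, so an Enum member passed to the current run shows up in other runs' `parameters` as its plain value. A small self-contained sketch of that effect, using a hypothetical `Region` enum and simulating the API round-trip with `json.dumps`/`json.loads` (an assumption about how the values end up serialized, not Prefect code); `normalize` is a hypothetical helper that applies the same Enum-to-value rule:

```python
import json
from enum import Enum
from typing import Any


class Region(Enum):  # hypothetical parameter type, for illustration only
    EU = "eu"
    US = "us"


def normalize(value: Any) -> Any:
    """Replace Enum members (also inside lists) with their underlying values."""
    if isinstance(value, list):
        return [item.value if isinstance(item, Enum) else item for item in value]
    return value.value if isinstance(value, Enum) else value


local_params = {"region": Region.EU, "targets": [Region.US, "other"]}

# Simulate what another run's parameters look like after a JSON round-trip
stored_params = json.loads(json.dumps(local_params, default=lambda obj: obj.value))

naive_match = all(stored_params[k] == v for k, v in local_params.items())
normalized_match = all(stored_params[k] == normalize(v) for k, v in local_params.items())

print(naive_match)       # False
print(normalized_match)  # True
```

Without the normalization, a run with identical Enum parameters would never be recognized as a duplicate.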