opened 03:03PM - 02 Nov 21 UTC
closed 03:03PM - 02 Nov 21 UTC
enhancement
Seen from the community, this is a code snippet that will exit Flows if there are already copies of the same flow running. It will cancel extra flow runs.
```python
import datetime
from datetime import timedelta, datetime as dt
import json
import os
import gc
from typing import Optional
import pytz
import requests
import pandas as pd
import prefect
from prefect import task, Flow
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier
from prefect.engine.signals import SKIP
from prefect.engine.state import Cancelled, State
from prefect.client import Client
def _parse_timestamp(value: str) -> dt:
    """Parse an ISO-8601 timestamp string into a datetime.

    ``datetime.fromisoformat`` on Python < 3.11 only accepts fractional seconds
    with exactly 3 or 6 digits and does not accept a trailing ``Z``. The
    fraction is therefore right-padded to 6 digits and ``Z`` is rewritten as
    ``+00:00`` before parsing.
    NOTE(review): the API presumably may trim trailing zeros from the
    fractional part -- confirm against real responses.
    """
    import re

    normalized = re.sub(
        r"\.(\d{1,6})(?=[+-]|Z|$)",
        lambda match: "." + match.group(1).ljust(6, "0"),
        value,
    )
    if normalized.endswith("Z"):
        normalized = normalized[:-1] + "+00:00"
    return dt.fromisoformat(normalized)


def concurrent_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    """State handler that cancels redundant runs of the same flow.

    Acts only on the Pending -> Running transition. It queries the backend for
    runs of the non-archived flow named ``flow.name`` that are scheduled to
    start no later than (roughly) now, then:

    * returns ``Cancelled`` if more than one of those runs is already
      Running/Retrying;
    * returns ``Cancelled`` if several runs are still waiting and this run is
      not the one with the latest ``scheduled_start_time`` (or is not among
      the waiting runs at all);
    * otherwise returns ``new_state`` unchanged.

    Args:
        flow: The flow whose state is changing; only ``flow.name`` is used.
        old_state: The state being left.
        new_state: The state being entered.

    Returns:
        ``new_state`` to let the transition proceed, or a ``Cancelled`` state.
    """
    # Every other transition passes straight through.
    if not (old_state.is_pending() and new_state.is_running()):
        return new_state

    client = Client()
    # Microseconds are dropped because the GraphQL API can't always handle that
    # many decimals; the extra second presumably compensates for the truncation.
    now = dt.now(tz=pytz.UTC).replace(microsecond=0) + timedelta(seconds=1)

    # %-formatting is used because the literal braces of the GraphQL document
    # make str.format / f-strings painful. The flow name goes through
    # json.dumps so that quotes and backslashes in it are escaped (a JSON
    # string literal is also a valid GraphQL string literal).
    query = """{
        flow(where: {
            archived: {_eq: false},
            name: {_eq: %s}
        }) {
            name
            archived
            flow_runs (where: {
                state: {_in: ["Submitted", "Queued", "Scheduled", "Retrying", "Running"]},
                scheduled_start_time: {_lte: "%s"}
            }) {
                scheduled_start_time
                start_time
                name
                state
                id
            }
        }
    }""" % (json.dumps(flow.name), now.isoformat())
    result = client.graphql(query)

    logger = prefect.context.get("logger")

    flows = result["data"]["flow"]
    if not flows:
        # Nothing matched, so there is nothing to compare against; let the run
        # proceed instead of failing with an IndexError.
        logger.warning(f"No unarchived flow named {flow.name!r} found; skipping concurrency check")
        return new_state

    # Everything that was scheduled to start in the past and may have piled up.
    flow_runs = flows[0]["flow_runs"]

    # Up to two simultaneous runs are tolerated here (API timeouts can leave a
    # run hanging); for other use cases cancelling when *any* run is active is
    # generally the better choice.
    num_running = sum(run["state"] in ("Running", "Retrying") for run in flow_runs)
    if num_running > 1:
        msg = "Existing tasks are already running"
        logger.info(msg)
        return Cancelled(msg)

    # If several runs are waiting, only the most recently scheduled one may run.
    scheduled = [
        run
        for run in flow_runs
        if run["state"] in ("Pending", "Scheduled", "Queued", "Submitted")
    ]
    if len(scheduled) <= 1:
        return new_state

    last_scheduled_time = max(
        _parse_timestamp(run["scheduled_start_time"]) for run in scheduled
    )
    this_flow_run_id = prefect.context.get("flow_run_id")
    this_run = next((run for run in scheduled if run["id"] == this_flow_run_id), None)
    if this_run is None:
        logger.info(f"Current id is {this_flow_run_id}")
        logger.info(f"Flow runs are: {scheduled}")
        return Cancelled("Nope")

    if _parse_timestamp(this_run["scheduled_start_time"]) != last_scheduled_time:
        msg = "Multiple scheduled tasks, this is not the last one"
        logger.info(msg)
        return Cancelled(msg)

    return new_state
```