I recently ran into a problem with my flows: the agent stopped working, and a long queue of scheduled runs piled up and was marked as Late. As soon as the agent came back online, all of these late runs started at once and saturated my environment.
Is it possible to mark these Late runs as obsolete, or cancel them, after a period of time so that they never run?
The way I’ve solved this is to give the main flow an argument that sets the maximum allowed difference between its scheduled start time and the current time. If that difference is exceeded, the flow cancels itself.
```python
import pendulum
from prefect import context, flow
from prefect.states import Cancelled


@flow
def my_flow(lag_threshold: int = 5):
    # Calculate how far behind its scheduled start time this run is
    flow_behind_schedule_in_minutes = (
        pendulum.now("UTC")
        .diff(context.get_run_context().flow_run.expected_start_time)
        .in_minutes()
    )

    # If we are further behind than the allowed threshold, cancel the flow
    if flow_behind_schedule_in_minutes > lag_threshold:
        return Cancelled(
            message=f"The run is too far behind schedule "
            f"({flow_behind_schedule_in_minutes} minutes). "
            f"Allowed lag is {lag_threshold} minutes. Skipping all tasks."
        )

    # flow tasks
```
In the example above, `lag_threshold` is in minutes. To use a different unit, replace `.in_minutes()` with the matching pendulum method, such as `.in_seconds()` or `.in_hours()`.
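If you would rather not tie the threshold to one unit, the check can be pulled out into a small helper that takes a `timedelta`. The sketch below uses only the standard library; the function name `lag_exceeds` is hypothetical and not part of Prefect. One deliberate difference from the flow above: pendulum's `diff()` returns an absolute difference by default, while this helper uses a signed lag, so a run that starts slightly early is never treated as late.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def lag_exceeds(
    expected_start: datetime,
    threshold: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the run is more than `threshold` behind `expected_start`.

    Hypothetical helper. Both datetimes should be timezone-aware so the
    subtraction is unambiguous.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    # Signed lag: negative if the run starts before its scheduled time
    lag = now - expected_start
    return lag > threshold


# A run scheduled for 12:00 UTC that only starts at 12:07 UTC
scheduled = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
started = datetime(2023, 1, 1, 12, 7, tzinfo=timezone.utc)
print(lag_exceeds(scheduled, timedelta(minutes=5), now=started))  # True
print(lag_exceeds(scheduled, timedelta(hours=1), now=started))    # False
```

Inside the flow you would pass `context.get_run_context().flow_run.expected_start_time` as `expected_start` and, for example, `timedelta(minutes=lag_threshold)` as the threshold. I am assuming here that the pendulum `DateTime` Prefect returns subtracts cleanly from a standard `datetime` (it is a `datetime` subclass), but I have not verified that against every Prefect version.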