Pausing Flow Group Schedule with GraphQL

I’ve created a CLI that queries the Prefect GraphQL API to identify the tags available in our environment and then, given an individual tag, disables or enables all flows that use it. The primary motivation is to let our DBAs target flows whose tasks touch a specific environment when extended downtime is planned. I’m currently using set_schedule_active/inactive to do this.

Additionally, because I don’t want to accidentally enable Flow Groups that were intentionally disabled for other reasons, I add and remove a label named “MAINTENANCE” (added when disabling; filtered on when enabling).
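The label bookkeeping described above can be sketched in plain Python. This is only an illustration of the add/remove/filter logic, not the actual CLI code: the function names and the dict-of-labels shape are assumptions, and the GraphQL calls that would read and write the labels are left out.

```python
from typing import Dict, List

# Marker label from the post; everything else here is hypothetical.
MAINTENANCE_LABEL = "MAINTENANCE"


def labels_after_disable(labels: List[str]) -> List[str]:
    """Return the labels with the marker added, avoiding duplicates."""
    if MAINTENANCE_LABEL in labels:
        return list(labels)
    return list(labels) + [MAINTENANCE_LABEL]


def labels_after_enable(labels: List[str]) -> List[str]:
    """Return the labels with the marker removed."""
    return [label for label in labels if label != MAINTENANCE_LABEL]


def groups_to_enable(flow_groups: Dict[str, List[str]]) -> List[str]:
    """Select only flow groups that carry the marker.

    Groups that were disabled for other reasons lack the marker,
    so they are never re-enabled by accident.
    """
    return [
        group_id
        for group_id, labels in flow_groups.items()
        if MAINTENANCE_LABEL in labels
    ]
```

Filtering on the marker when enabling is what keeps intentionally disabled Flow Groups untouched.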

This works well, and as expected. The only drawback is that it removes any and all scheduled processes for a Flow Group until it’s re-enabled. Much of the time this is ok, or even preferred.

However, I’d also like to be able to simply pause individual flows, so that if a scheduled run time passes while a flow is paused, it will catch up on the missed runs when the pause is removed.

I found action_config_input and pause_schedule_action_config, but I can’t figure out how to execute a mutation that uses them.

Specifically, what I’m looking to do through GraphQL is use a flow group UUID to pause an individual flow group, and then later unpause that group (I can already identify the UUID I want through existing code). Is this possible?


An interesting use case! If I understood the problem correctly: you want to pause the schedule for some flows when your DBAs are performing database maintenance, correct?

I think you’re spot on by using set_schedule_active/inactive mutations for that.

I would approach it with a separate flow that takes the IDs of the flows to be paused as a Parameter, and then you could use a mapped task to pause all of those flows.

In a similar fashion, you can have either a separate flow for activating the flow again or even better, if you know in advance how long the maintenance will take, you may even schedule the schedule’s reactivation in the same flow.

An example will explain it better, here is what I mean:

import pendulum
from prefect import Client, unmapped
from prefect import task, Flow, Parameter
from prefect.tasks.prefect import create_flow_run


@task
def pause_schedule(flow_id: str):
    client = Client()
    flow_id_input = {"flow_id": flow_id}
    response = client.graphql(
        """
        mutation($flow_id: UUID!) {
            set_schedule_inactive(input: {flow_id: $flow_id}) {
                success
                error
            }
        }""",
        variables=flow_id_input,
        raise_on_error=True,
    )
    print(response)


@task
def set_schedule_active(flow_id: str):
    client = Client()
    flow_id_input = {"flow_id": flow_id}
    response = client.graphql(
        """
        mutation($flow_id: UUID!) {
            set_schedule_active(input: {flow_id: $flow_id}) {
                success
                error
            }
        }""",
        variables=flow_id_input,
        raise_on_error=True,
    )
    print(response)


with Flow("unpause_after_maintenance") as unpause_flow:
    flows_to_unpause = Parameter(
        "flows_to_unpause", default=["0ee1bc8d-b2fb-4cab-b0f4-60677f16075f"]
    )
    unpaused_flows = set_schedule_active.map(flows_to_unpause)


with Flow("pause_for_maintenance") as pause_flow:
    flows_to_pause = Parameter(
        "flows_to_pause", default=["0ee1bc8d-b2fb-4cab-b0f4-60677f16075f"]
    )
    paused_flows = pause_schedule.map(flows_to_pause)
    # reactivating a schedule
    create_flow_run(
        flow_name="unpause_after_maintenance",
        # set to the end of maintenance window
        scheduled_start_time=unmapped(
            pendulum.DateTime(year=2022, month=2, day=22, hour=22, minute=0)
        ),
        parameters=dict(flows_to_unpause=flows_to_pause),
        upstream_tasks=[paused_flows],
    )

if __name__ == "__main__":
    pause_flow.register("xyz")
    unpause_flow.register("xyz")

Thanks so much for the detailed reply. It’s always clear that you take time to respond well to folks (both here and Slack).

This is an interesting idea. I hadn’t thought of building this kind of Prefect maintenance into another flow itself. I’m definitely going to explore that kind of thing. Lots of ideas have come to mind since I read your response a little while back.

I had built a CLI that does the above behavior. It allows a DBA to list out all tags used in Prefect (the majority are referring to system connections), view only flows with that tag, disable flows with that tag, enable flows with that tag.

Example:
python wccprefect.py list >> lists all flows
python wccprefect.py env >> lists all tags in use
python wccprefect.py list server-prod >> lists flows (and status) that use “server-prod” tag
python wccprefect.py update server-prod disable >> disable relevant flows
python wccprefect.py update server-prod enable >> enable relevant flows
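
For readers who want to picture the command layout, here is a minimal argparse sketch. It is not the real wccprefect.py, which isn't shown in this thread; the parser structure, help texts, and the `build_parser` name are assumptions, and the handlers that would call the GraphQL API are omitted.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser mirroring the list / env / update commands above."""
    parser = argparse.ArgumentParser(prog="wccprefect.py")
    sub = parser.add_subparsers(dest="command", required=True)

    # "list" optionally takes a tag to narrow the output.
    list_cmd = sub.add_parser("list", help="list flows, optionally by tag")
    list_cmd.add_argument("tag", nargs="?", default=None)

    # "env" takes no arguments.
    sub.add_parser("env", help="list all tags in use")

    # "update" requires a tag and one of the two actions.
    update_cmd = sub.add_parser("update", help="enable or disable flows by tag")
    update_cmd.add_argument("tag")
    update_cmd.add_argument("action", choices=["disable", "enable"])
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args)
```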

What I’m hoping for is this: is it possible to use GraphQL to pause flows without removing all of their scheduled runs? That is, a flow is paused, maintenance is performed on the relevant target server, and when the flow is unpaused it immediately “catches up” and runs any scheduled runs that it missed while paused.

It looks like set_schedule_active/inactive turns off the schedule for the flow altogether, so it removes all scheduled runs. If what I’m trying to accomplish isn’t possible, it’s not the end of the world (the general need is still met), but I figure that since I’ve made it this far I might as well see if the 100% ideal scenario is possible.

As always, thanks for your time!

Also, related to your suggestion of putting this functionality in a flow to be run from prefect itself…

Does the prefect.engine.signals.PAUSE signal allow data to be input somewhere in the Prefect interface before the flow is manually set to continue? i.e., one task could show all of the tags available, then a DBA could input a string for that tag before the flow continues.

(I’ve used signals before (fail, success, and endrun mostly), but I haven’t actually interacted with PAUSE up to this point. I will tinker with it, but I figured that if you happen by to answer the previous comment, this might be an easy yes/no answer on your part.)

Gotcha, thanks for explaining that in more detail. You asked basically two questions, one about the “catchup” and another one about the PAUSE signal - I tried to structure the response, hope it doesn’t seem overwhelming this way :smile:

1. Catchup issue

The problem with “catchup”

The issue with “catchup” is that you would need to manually create the “missed” flow runs, because Prefect never creates flow runs whose scheduled start time is in the past. There are two reasons for this:

  • this pattern is generally used by legacy workflow orchestration solutions that are entirely schedule-based, while Prefect decouples scheduling from flow run execution, allowing flow runs to be triggered in other ways, e.g. by events
  • for most use cases we’ve seen, this leads to confusion and hard-to-debug issues.

This scheduler documentation explains it this way:

“If you pause a schedule, any future auto-scheduled runs that have not started will be deleted. Reactivating the schedule will cause them to be recreated, as long as they are scheduled to start in the future. The scheduler will never create runs that were scheduled to start in the past.”
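
If you do decide to recreate missed runs yourself, the first step is working out which start times fell inside the pause window. Here is a minimal sketch for a fixed-interval schedule using only the standard library. The function name and arguments are hypothetical, it assumes a simple interval schedule (not cron or filtered clocks), and it treats the window as starting at the pause time (inclusive) and ending at the resume time (exclusive).

```python
from datetime import datetime, timedelta
from typing import List


def missed_run_times(
    first_run: datetime,
    interval: timedelta,
    paused_at: datetime,
    resumed_at: datetime,
) -> List[datetime]:
    """Return interval-schedule start times in [paused_at, resumed_at)."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")

    if paused_at <= first_run:
        current = first_run
    else:
        # Ceiling division: number of intervals needed to reach paused_at.
        steps = -((first_run - paused_at) // interval)
        current = first_run + steps * interval

    missed = []
    while current < resumed_at:
        missed.append(current)
        current += interval
    return missed
```

Each returned time could then be used as the scheduled start time of a manually created flow run, if your setup allows that; whether those runs should actually execute after maintenance is a judgment call for each flow.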

Backfilling

But why would you need to create flow runs for the past? If you need it for backfilling, you may solve that by manually creating flow runs using this approach:

System-wide pause

If you use Prefect Cloud and are open to temporarily pausing all work during your maintenance window, the easy solution would be to hit the system-wide pause button. However, this wouldn’t let you pause only specific flows, and it leaves the catchup/backfill problem unaddressed.


Orion!

The good news for you is that in Orion your use case is much easier! You can simply create a separate work queue for the respective flow tags (exactly what you do now!) and then pause and resume that work queue when needed.

This documentation page explains it best:

2. PAUSE signal

I think you’re right that the PAUSE-resume pattern may be useful in your use case because it would allow your DBAs to manually resume once they are done with maintenance.

This page explains that more:

Regarding passing some string that can be retrieved in downstream tasks after the approval, you could leverage the KV Store if you are on Prefect Cloud or some other stateful mechanism if you are on Prefect Server (say S3 or Redis?):

Thanks for the quick response. All of the above makes sense.

Regarding scheduled runs being deleted when pausing: that works for me. For edge cases where waiting for the next scheduled run doesn’t work, we can just run the flows manually after maintenance (or come up with some other clever mechanism for those cases).

Pause signal – Thanks for the feedback. Now I know that it’s worth my time to dig into it more deeply. (and yes, we are on Prefect Cloud).

Thanks!
