Pausing flows/tasks for a specific amount of time (non-blocking)

Hi, we are migrating from Prefect v1 to Orion. One of our core use cases is the ability to pause a task for a specific amount of time while waiting for an external condition to be met, e.g. waiting for a Jira ticket to be resolved or for a PR to be merged. In Prefect v1 it is as simple as raising a signal:

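    from datetime import datetime, timedelta
    from prefect.engine import signals

    # Inside a Prefect v1 task: pause it and resume automatically in 10 minutes
    start_time = datetime.utcnow() + timedelta(minutes=10)
    raise signals.PAUSE(start_time=start_time)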

For performance and reliability reasons, this should be a non-blocking pause.

With the recent changes in Orion that added pausing and manually resuming flows (#7738 and #7863), we wonder whether similar behaviour to Prefect v1 can already be achieved, or whether it is expected to be implemented in the near future.

The problem we see is that in Orion we can only pause the ‘main flow’ with the reschedule option, but we need the ability to pause particular tasks or subflows and synchronize them with the ‘main’ flow.

Both blocking and non-blocking pauses should work with the latest version.

Pause/resume works at the flow run level, so you could call it from a task if you want to, and you could implement multiple pauses in the same flow.

I encourage you to give it a try: everything you described is already possible. You can call the pause_flow_run utility from anywhere in your code that needs pause/resume.

Thanks for your quick response. We have tried implementing the behaviour in code and also looked carefully at the Prefect codebase, and we believe that the use case we describe, i.e. the ability to pause and reschedule flows based on external conditions (something that was very easy to achieve in Prefect v1), is not possible in Orion. We see two problems:

  1. You can only use a non-blocking pause in the main flow. When you try to execute:

    pause_flow_run(timeout=600, reschedule=True)

in a subflow or a task, it fails with:

    RuntimeError: Flow run cannot be paused: Cannot pause subflows with the reschedule option.

So there is no way to pause a particular task or subflow and notify the main flow that it should also be paused and rescheduled.

  2. The second problem is that the current pausing mechanism is very simple. It can only pause/resume a flow at a particular place or at the beginning of the next task to be orchestrated (NotReady status). There is no way to conditionally pause/reschedule the current task so that it reschedules itself until an external condition is met. In Prefect v1 it was done like this:

    from datetime import datetime, timedelta
    from prefect import task
    from prefect.engine import signals

    @task
    def wait_for_pr(repo, number):  # hypothetical task name; repo is a PyGithub Repository
        pull = repo.get_pull(number)
        if pull.state != "closed":
            # PR still open: pause this task and retry it in 10 minutes
            start_time = datetime.utcnow() + timedelta(minutes=10)
            raise signals.PAUSE(start_time=start_time)
        return "CLOSED"

Thanks for flagging, I didn’t know that. I believe we could add that if you open a feature request; could you do that by opening a GitHub issue? Until then, you could design your flow so that the pause is triggered when the task or subflow finishes.

Why not? Have you tried implementing that using if-else statements? This is very doable.

By the way, the use case you describe follows the event-driven trigger-action paradigm, which we now support via Automations in Cloud 2, currently released in beta. Soon you’ll be able to react to that external event in a first-class way using Automations.

An alternative would be triggering the flow via AWS Lambda, for which we have a recipe here: annageller.medium.com
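The linked recipe is not reproduced here; the following is only a minimal sketch of the Lambda idea, assuming Prefect 2's REST endpoint `POST /deployments/{id}/create_flow_run` and that `PREFECT_API_URL` and `PREFECT_API_KEY` are set in the Lambda environment. The `DEPLOYMENT_ID` variable, the helper name `build_create_flow_run_request`, and the assumption that the incoming event carries a `parameters` dict are all hypothetical; check the endpoint against the REST API reference for your Prefect version.

```python
import json
import os
import urllib.request


def build_create_flow_run_request(
    api_url: str, api_key: str, deployment_id: str, parameters: dict
) -> urllib.request.Request:
    """Build a POST to the (assumed) Prefect 2 create_flow_run endpoint."""
    body = json.dumps({"parameters": parameters}).encode("utf-8")
    return urllib.request.Request(
        url=f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run",
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def lambda_handler(event, context):
    """AWS Lambda entry point: start a flow run when the external event arrives."""
    request = build_create_flow_run_request(
        api_url=os.environ["PREFECT_API_URL"],
        api_key=os.environ["PREFECT_API_KEY"],
        deployment_id=os.environ["DEPLOYMENT_ID"],  # hypothetical variable name
        parameters=event.get("parameters", {}),  # hypothetical event shape
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        flow_run = json.loads(response.read())
    return {"flow_run_id": flow_run["id"]}
```

The flow only starts when the external system (e.g. a Jira or GitHub webhook) invokes the Lambda, so nothing has to poll or stay paused in the meantime.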

I should be more precise. What we are trying to achieve is rescheduling a flow/task every X minutes to check for an external condition: as long as the condition is not met, reschedule again; otherwise continue (mark the flow/task as succeeded), e.g. wait for the Jira ticket to be resolved. In Prefect v1 it was a matter of raising a signal to reschedule a task. In Orion, using the pause_flow_run utility, we could do something like this in a main flow:

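    from prefect import flow
    from prefect.engine import pause_flow_run

    @flow
    def main_flow():  # hypothetical flow name
        while True:
            ready = isReady()  # isReady is a @task that checks the external condition
            if not ready:
                pause_flow_run(reschedule=True)
            else:
                break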

where isReady is a task invocation. The problem with this approach is that it would create a new task run on each iteration, which does not seem right, either in terms of performance or from an architectural point of view.
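Stripped of any orchestrator, the reschedule-until-ready logic is a single non-blocking decision: check the condition once, then either finish or return the time of the next check. The sketch below is plain Python with hypothetical names (`CheckOutcome`, `check_once`); whatever does the scheduling (a v1 `PAUSE` signal, a rescheduled flow run, or a cron job) would act on the returned `next_check_at`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


@dataclass(frozen=True)
class CheckOutcome:
    """Result of one non-blocking check (hypothetical helper type)."""
    done: bool
    next_check_at: Optional[datetime] = None


def check_once(
    is_ready: Callable[[], bool],
    interval: timedelta = timedelta(minutes=10),
    now: Optional[Callable[[], datetime]] = None,
) -> CheckOutcome:
    """Check the external condition once; never sleeps or loops.

    If the condition holds, report completion. Otherwise return the time at
    which the caller should schedule the next check.
    """
    clock = now or (lambda: datetime.now(timezone.utc))
    if is_ready():
        return CheckOutcome(done=True)
    return CheckOutcome(done=False, next_check_at=clock() + interval)


# Example: the condition is not met, so the next check is 10 minutes later.
fixed_now = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
outcome = check_once(lambda: False, now=lambda: fixed_now)
print(outcome.next_check_at)  # 2023-01-01 12:10:00+00:00
```

Keeping the check free of orchestration calls means the same function can back a v1 `PAUSE`, a rescheduled flow run, or a scheduled script.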


I’d encourage you not to think so much in terms of orchestration features and more in terms of writing plain Python. I’d guess that if you only had Python at your disposal, you would schedule a cron script to run every minute, and if the condition is not met you would simply call sys.exit(). That’s also totally feasible with Prefect: you can write it in pure Python, schedule it every minute, and get a simple solution that is not tied too closely to any orchestrator (essentially just Python, using Prefect only to schedule, operationalize and observe it).
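A minimal sketch of that plain-Python approach: one invocation of a script that a scheduler (cron, or a Prefect deployment schedule) runs every minute. It exits early when the condition is not met and only does the follow-up work once it is. `ticket_is_resolved` and `continue_pipeline` are hypothetical placeholders for the real Jira or GitHub check and the downstream work.

```python
import sys
from typing import Callable


def ticket_is_resolved() -> bool:
    """Hypothetical placeholder: query Jira/GitHub and return True when done."""
    return False


def continue_pipeline() -> None:
    """Hypothetical placeholder for the work that waits on the condition."""
    print("condition met, continuing")


def main(
    check: Callable[[], bool] = ticket_is_resolved,
    work: Callable[[], None] = continue_pipeline,
) -> None:
    """One scheduled invocation: exit early unless the condition is met."""
    if not check():
        # Not ready yet: exit cleanly (status 0); the next scheduled run retries.
        sys.exit()
    work()
```

In the actual script file, end with `if __name__ == "__main__": main()` so each scheduled run performs exactly one check; no process stays alive between checks.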