Implementing waiting for approval in Prefect 2

In certain scenarios, you may want to continue processing your flow only once you’ve checked the status of previous processing.

Concrete scenarios

These are typically semi-manual processes that combine automation with a human step, e.g.:

  1. Run automated data extraction, transformation, and processing; then automatically inform some stakeholders about the status and let them know this data/process is ready for a manual quality check; once this person manually gives their approval, the flow run can continue.

  2. Run automated data preprocessing and feature extraction and send an alert to a data scientist that this process has been completed; once this person approves, the flow run can continue with downstream tasks that may, e.g., train an ML model with that preprocessed and manually validated data and extracted features.


We plan to support pausing and resuming workflows, as well as tasks that require manual approval as a first-class feature. You will be able to specify that as part of your workflow definition.

:bulb: If you’re curious how this will (likely) work - Prefect will check an orchestration rule indicating whether the flow or task run is allowed to enter a Running state - 1) if not, it will wait for a certain amount of time and set the state to e.g. Paused/AwaitingApproval (just an example), 2) if yes, it will start running.

Temporary workaround solution

:warning: Note that the example below is just a temporary workaround until we have a first-class way to handle this type of use case.

First, create a JSON block.

Create a Block to store information for each process requiring approval

You can create a block either from the UI or from code, e.g., using code as shown here:

from prefect.blocks.system import JSON

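# Assumed initial value: one flag per process, keyed like the flow's default process name
block = JSON(value={"process_1": False})
block.save("approval", overwrite=True)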
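
Once such a block exists, the flag can also be flipped from code rather than from the UI. The snippet below is only a sketch: `with_approval` and `approve` are hypothetical helper names, and it assumes the block value is a dict with one boolean per process, as in the example above.

```python
from typing import Dict


def with_approval(value: Dict[str, bool], process: str, approved: bool) -> Dict[str, bool]:
    """Return a copy of the block value with the flag for one process changed."""
    updated = dict(value)
    updated[process] = approved
    return updated


def approve(process: str, block_name: str = "approval") -> None:
    """Load the JSON block, set the flag for `process` to True, and save it back."""
    # Imported lazily so that with_approval can be used without Prefect installed
    from prefect.blocks.system import JSON

    current = JSON.load(block_name).value
    JSON(value=with_approval(current, process, True)).save(block_name, overwrite=True)
```

Calling `approve("process_1")` from a script or a small internal tool would then have the same effect as editing the block by hand.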

Write your flow with business logic that requires manual approval

Here, we show a very basic flow just to demonstrate the pattern - adjust it to your needs.

The flow below does the following:

  1. Starts some initial processing
  2. Sends a Slack alert notifying about its completion with a link to approve the status by editing the approval flag on the Block
  3. Runs a while loop that polls for the current state of the approval flag set on the Block. The loop ends as soon as the approval flag changes to True.
  4. Once approved, continues processing of some critical part of the process that could not run without prior approval.
  5. Finally, the flow sets the approval flag back to False so that subsequent runs can start from a clean slate.

from prefect import task, flow
from prefect import get_run_logger
from prefect.blocks.system import JSON
from prefect.blocks.notifications import SlackWebhook
import time


@task
def send_alert(message: str):
    slack_webhook_block = SlackWebhook.load("hq")
    slack_webhook_block.notify(message)


@task
def process_approved(process: str, block_name: str):
    # Re-load the block on every call to read the current flag
    block_value = JSON.load(block_name).value
    return block_value[process]


@task
def set_approval_flag_back_to_false(process: str, block_name: str):
    block_value = JSON.load(block_name).value
    block_value[process] = False
    block = JSON(value=block_value)
    block.save(block_name, overwrite=True)


@task
def run_initial_processing():
    logger = get_run_logger()
    logger.info("Processing something important 🤖")
    logger.info("Calculating the answer to life, the universe, and everything...")


@task
def run_something_critical():
    logger = get_run_logger()
    logger.info("Got approval to reveal the answer to life, the universe, and everything!")
    logger.info("The answer is... 42!")


@flow
def semi_manual_process(
    process: str = "process_1",
    block_name: str = "approval",
    poll_for_approval_every_sec: int = 5,
) -> None:
    logger = get_run_logger()
    run_initial_processing()
    url = ""  # link to the approval block in the UI
    send_alert(f"{process} finished. Please approve to continue processing: {url}")
    while True:
        if process_approved(process, block_name):
            logger.info("Process got approved! 🎉 Moving on to the next task")
            break
        logger.info("Waiting for approval...")
        time.sleep(poll_for_approval_every_sec)
    run_something_critical()  # post-processing, ML training process, reporting on KPIs
    set_approval_flag_back_to_false(process, block_name)


if __name__ == "__main__":
    semi_manual_process()
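
The loop in this flow polls forever at a fixed interval. A possible variation, sketched below under stated assumptions, gives up after a deadline and polls less often the longer it waits. `wait_for_flag` and all of its parameters are hypothetical names, not part of Prefect; the `is_approved` callable could wrap the block lookup from the flow.

```python
import time
from typing import Callable


def wait_for_flag(
    is_approved: Callable[[], bool],
    initial_delay: float = 5.0,
    max_delay: float = 300.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll is_approved() until it returns True or `timeout` seconds have been slept.

    The delay between polls doubles after each attempt, capped at max_delay.
    Returns True on approval and False when the timeout is reached.
    """
    waited = 0.0
    delay = initial_delay
    while True:
        if is_approved():
            return True
        if waited >= timeout:
            return False
        # Never sleep past the deadline
        step = min(delay, timeout - waited)
        sleep(step)
        waited += step
        delay = min(delay * 2, max_delay)
```

This still keeps the flow run process alive while it waits, so it reduces the number of API calls but not the compute time spent on the pause.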

Got it. I will wait until it is released as a feature and I am happy to hear that it will be supported similar to 1.0. While I see how this could work (we did something similar with KV store in 1.0 before), it is ultimately pretty inefficient for when you have long periods of pausing. You would be paying for compute for the duration of the pause until the block was updated. In 1.0 running on ECS Fargate the ECS task shuts down and deprovisions and then a new ECS task runs when unpaused… this means you aren’t using any compute during long periods of pause. In our use case we have several instances where things pause for several hours.


In that case, it’s probably better not to wait but instead leverage the orchestrator pattern in combination with e.g. blocks (similarly to how this is shown here but using separate flow run processes rather than tasks):
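
A rough sketch of that idea, with several assumptions: the work before and after approval is split into two deployments, the first flow run simply finishes after sending the alert, and whatever records the approval (a script, an API endpoint) triggers the second deployment. `resume_if_approved`, `trigger_with_prefect`, and the deployment name used in the comment are hypothetical; only `run_deployment` comes from Prefect, and it requires a Prefect 2 version that ships it.

```python
from typing import Callable, Dict


def resume_if_approved(
    approvals: Dict[str, bool],
    process: str,
    trigger: Callable[[str], None],
    deployment_name: str,
) -> bool:
    """Trigger the downstream deployment only if `process` has been approved."""
    if approvals.get(process, False):
        trigger(deployment_name)
        return True
    return False


def trigger_with_prefect(deployment_name: str) -> None:
    """Start a flow run of the given deployment without waiting for it to finish."""
    # Imported lazily so the decision logic above can be used without Prefect installed
    from prefect.deployments import run_deployment

    # timeout=0 returns as soon as the flow run has been created
    run_deployment(name=deployment_name, timeout=0)


# Example call (hypothetical deployment name):
# resume_if_approved(block_value, "process_1", trigger_with_prefect, "post-approval/prod")
```

Because no process is alive between the two flow runs, nothing is billed for compute during the pause; the trade-off is that one pipeline shows up as two flow runs.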


I’m not sure I completely follow the orchestrator pattern concept. Are there more docs or examples I can reference?

I am not sure this is going to be a solution for us. While I appreciate that it might work from a technical level I think from a business use case it will create too much noise. We use this pause/resume flow structure for hundreds of flows and it provides the ability for us to encapsulate those complete pipeline processes that involve multiple outside services to a single view within Prefect. When we break down into smaller pieces that would be multiple flows and runs we lose that ability to have a complete view of a pipeline in a single place. Given the number of flows involved this sounds like it could be increasingly messy and hard to manage.

I get you, we need more examples on that for sure, let’s see how we can prioritize that

for now, definitely the pattern above seems to be something worth considering (or using the same pattern but with the pause/resume info being stored in something like Redis or some database)
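
To make the database variant a bit more concrete, here is a minimal sketch using SQLite from the Python standard library as a stand-in for whatever store you already run. The table name, column names, and function names are all assumptions made for illustration.

```python
import sqlite3


def init_store(conn: sqlite3.Connection) -> None:
    """Create the approvals table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS approvals ("
        "process TEXT PRIMARY KEY, approved INTEGER NOT NULL DEFAULT 0)"
    )
    conn.commit()


def set_approval(conn: sqlite3.Connection, process: str, approved: bool) -> None:
    """Insert or overwrite the approval flag for one process."""
    conn.execute(
        "INSERT OR REPLACE INTO approvals (process, approved) VALUES (?, ?)",
        (process, int(approved)),
    )
    conn.commit()


def is_approved(conn: sqlite3.Connection, process: str) -> bool:
    """Return the stored flag; unknown processes count as not approved."""
    row = conn.execute(
        "SELECT approved FROM approvals WHERE process = ?", (process,)
    ).fetchone()
    return bool(row[0]) if row else False


# In-memory database for demonstration only; a real setup would use a shared store
conn = sqlite3.connect(":memory:")
init_store(conn)
set_approval(conn, "process_1", False)
```

The polling loop would then call `is_approved(conn, process)` instead of loading the block, and the approval step would call `set_approval(conn, process, True)`.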


Just as feedback, I’ve implemented this workaround for pausing a workflow that is being triggered from a REST API endpoint where spatial data files are being uploaded. It is working nicely so far! I don’t have a public repository yet but these are the most relevant parts:

# Prefect flow deployed

# main prefect flow
@flow(name="Data loading workflow")
def load_data_workflow_from_minio(path_to_wf: str):
    logger = get_run_logger()
    # workflow_id is defined in code not shown in this excerpt
    wf_state = get_workflow_state(workflow_id)  # flow to read workflow status from an external table
    logger.info(f"Workflow {workflow_id} is {wf_state}")
    if wf_state == "RUNNING":
        # update external workflow table status
        state = update_workflow_state(
            workflow_id, "WAITING APPROVAL"
        )
    else:
        raise Exception("Workflow state is not RUNNING")
    wait_for_approval(workflow_id, 10)
    # one json block for each workflow being processed
    flag = JSON.load(f"approval-{workflow_id}").value["flag"]
    logger.info(f"Workflow approval is {flag}")
    # other subflows/tasks in the flow

# waiting for approval flow
@flow(name="Waiting for approval")
def wait_for_approval(
    wf_id: str,
    poll_for_approval_every_sec: int = 5
) -> None:
    logger = get_run_logger()

    block_name = f"approval-{wf_id}"
    while True:
        if workflow_approved(block_name):
            logger.info("Process got approved! 🎉 Moving on to the next steps")
            break
        logger.info("Waiting for approval...")
        time.sleep(poll_for_approval_every_sec)

# task to create the json block with the flag for pausing
@task
def create_approval_step(wf_id) -> None:
    block = JSON(value=dict(flag=False))
    block.save(f"approval-{wf_id}", overwrite=True)

# REST API section
# run the workflow deployment
from prefect.deployments import run_deployment
            # last line of code before returning an HTTP 200 response in the upload endpoint
            await run_deployment(
                name='Data loading workflow/Data loading workflow',
                # remaining arguments are not shown in this excerpt
            )

# change the flag in the json block from a PATCH endpoint on the external workflow table
from prefect.blocks.system import JSON

                if workflow.approved:
                    block_name = f"approval-{str(workflow.uuid)}"
                    block = await JSON.load(block_name)
                    block_value = block.value
                    block_value["flag"] = workflow.approved
                    block = JSON(value=block_value)
                    await block.save(block_name, overwrite=True)
                return ServiceResult(workflow)

Hopefully, I can share the whole code in a public repo very soon

Awesome! Thank you so much for sharing, can’t wait to see the full repo :rocket:

there is an open issue about that here