How to create a flow run from deployment (orchestrator pattern)?

Currently, there is no special task for that. You would need to

  • retrieve the deployment ID corresponding to the the flow you try to trigger, e.g. using prefect deployment ls

  • use Orion client as follows:

import asyncio
from prefect.client import get_client


async def main():
    async with get_client() as client:
        depl_id = "074db2e5-229a-460e-85ad-fca31b379fd2"
        response = await client.create_flow_run_from_deployment(depl_id)
        print(response)


if __name__ == "__main__":
    asyncio.run(main())

Incorporating it into your flow

You can use the logic from above within a task and call this task from your flow:

import asyncio
from prefect.client import get_client
from prefect import flow, task, get_run_logger


@task
async def child_flow():
    async with get_client() as client:
        depl_id = "71334806-93ff-4bdc-ba98-fde1f97d5622"  # marvin flow
        response = await client.create_flow_run_from_deployment(depl_id)
        logger = get_run_logger()
        logger.info(response)


@flow
async def parent_flow():
    await child_flow()


if __name__ == "__main__":
    asyncio.run(parent_flow())

You should then see the parent and child flow run in the UI:

Thanks for sharing this info. How to force the REST API to use the deployment_spec in the .py file?

Can you explain the problem you are trying to solve?

You can trigger a flow run by:

  • assigning a schedule on the DeploymentSpec,
  • via an API call trigger from the Orion client as shown here in this post,
  • or via a POST request directly as described here:

Prefect REST API - Prefect 2.0. API Name is “Create Deployment” and the payload is

{
“name”: “Flex_Report_UI_Params2_Deployment_2022_04_27”,
“flow_id”: “cdd66dcf-a83c-40a0-a50a-351217749689”,
“flow_data”: {
“blob”: “{“data”: “\”//localpart0/aop-shared/WAVES/workflows/Flex_Report_UI_Params2_v1-2022-04-27–15-23-41.py\”", “block_id”: null}",
“encoding”: “blockstorage”
}
}

Prefect 2.0 REST API, in this case, Create Deployment API. The payload is
{
“name”: “Flex_Report_UI_Params2_Deployment_2022_04_27”,
“flow_id”: “cdd66dcf-a83c-40a0-a50a-351217749689”,
“flow_data”: {
“blob”: “{“data”: “\”//localpart0/aop-shared/WAVES/workflows/Flex_Report_UI_Params2_v1-2022-04-27–15-23-41.py\”", “block_id”: null}",
“encoding”: “blockstorage”
}
}

how do you plan to execute this API call - from Python?

We use our Web Application Server to call the API which is running in the same server where Prefect is running . It is a Java code. For testing we use Postman.

if it’s triggered from the same server, perhaps it’s then easier to trigger it via a subprocess call? the command to do it:

prefect deployment run flow_name/deployment_name

doing it via REST can be tricky to pass the flow_data blob correctly

You can check the source code how it’s done there to read the deployment info before making the API call to create a flow run:

@deployment_app.command()
async def run(name: str):
    """
    Create a flow run for the given flow and deployment.

    The flow run will be scheduled for now and an agent must execute it.

    The flow run will not execute until an agent starts.
    """
    async with get_client() as client:
        try:
            deployment = await client.read_deployment_by_name(name)
        except ObjectNotFound:
            exit_with_error(f"Deployment {name!r} not found!")
        flow_run = await client.create_flow_run_from_deployment(deployment.id)

    console.print(f"Created flow run {flow_run.name!r} ({flow_run.id})")