How to create a flow run from deployment (orchestrator pattern)?

Currently, there is no special task for that. You would need to

  • retrieve the deployment ID corresponding to the flow you want to trigger, e.g. using prefect deployment ls

  • use the Orion client as follows:

import asyncio
from prefect.client import get_client


async def main():
    async with get_client() as client:
        # Replace with the deployment ID of the flow you want to trigger
        depl_id = "074db2e5-229a-460e-85ad-fca31b379fd2"
        response = await client.create_flow_run_from_deployment(depl_id)
        print(response)


if __name__ == "__main__":
    asyncio.run(main())

Incorporating it into your flow

You can use the logic from above within a task and call this task from your flow:

import asyncio
from prefect.client import get_client
from prefect import flow, task, get_run_logger


@task
async def child_flow():
    async with get_client() as client:
        depl_id = "71334806-93ff-4bdc-ba98-fde1f97d5622"  # marvin flow
        response = await client.create_flow_run_from_deployment(depl_id)
        logger = get_run_logger()
        logger.info(response)


@flow
async def parent_flow():
    await child_flow()


if __name__ == "__main__":
    asyncio.run(parent_flow())

You should then see the parent and child flow runs in the UI.


Thanks for sharing this info. How do I force the REST API to use the deployment_spec in the .py file?

Can you explain the problem you are trying to solve?

You can trigger a flow run by:

  • assigning a schedule on the DeploymentSpec (see the sketch after this list),
  • via an API call from the Orion client, as shown earlier in this post,
  • or via a POST request directly to the Prefect REST API (Prefect 2.0).
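
For the first option, a minimal sketch of a scheduled DeploymentSpec, assuming the early Prefect 2 (beta) deployment API; the file my_flow.py and the deployment name are placeholders:

from datetime import timedelta

from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule

# Registers a deployment that runs the flow in my_flow.py every hour
DeploymentSpec(
    flow_location="./my_flow.py",
    name="hourly",
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
)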

Prefect 2.0 REST API - in this case, the "Create Deployment" API. The payload is:

{
  "name": "Flex_Report_UI_Params2_Deployment_2022_04_27",
  "flow_id": "cdd66dcf-a83c-40a0-a50a-351217749689",
  "flow_data": {
    "blob": "{\"data\": \"\\\"//localpart0/aop-shared/WAVES/workflows/Flex_Report_UI_Params2_v1-2022-04-27--15-23-41.py\\\"\", \"block_id\": null}",
    "encoding": "blockstorage"
  }
}

How do you plan to execute this API call - from Python?

We use our Web Application Server to call the API; it runs on the same server where Prefect is running. It is Java code. For testing we use Postman.

If it’s triggered from the same server, perhaps it’s easier to trigger it via a subprocess call? The command to do it:

prefect deployment run flow_name/deployment_name
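
For example, from Python (a minimal sketch, assuming the prefect executable is on the PATH and that flow_name/deployment_name is replaced with your actual names):

import subprocess

# Triggers a run of an existing deployment via the Prefect CLI
subprocess.run(
    ["prefect", "deployment", "run", "flow_name/deployment_name"],
    check=True,  # raise CalledProcessError if the command fails
)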

Doing it via REST can be tricky because you need to pass the flow_data blob correctly.
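
That said, creating a flow run from an existing deployment does not require the flow_data blob at all; a minimal sketch using the create_flow_run endpoint (the API URL and deployment ID below are placeholders):

import requests

API_URL = "http://127.0.0.1:4200/api"  # placeholder: your Orion API URL
DEPLOYMENT_ID = "cdd66dcf-a83c-40a0-a50a-351217749689"  # placeholder

# POST /deployments/{id}/create_flow_run schedules a run of the deployment;
# an empty JSON body uses the deployment's default parameters.
response = requests.post(
    f"{API_URL}/deployments/{DEPLOYMENT_ID}/create_flow_run",
    json={},
)
response.raise_for_status()
print(response.json()["id"])  # ID of the newly created flow run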

You can check the source code to see how the deployment info is read there before the API call to create a flow run is made:

@deployment_app.command()
async def run(name: str):
    """
    Create a flow run for the given flow and deployment.

    The flow run will be scheduled for now and an agent must execute it.

    The flow run will not execute until an agent starts.
    """
    async with get_client() as client:
        try:
            deployment = await client.read_deployment_by_name(name)
        except ObjectNotFound:
            exit_with_error(f"Deployment {name!r} not found!")
        flow_run = await client.create_flow_run_from_deployment(deployment.id)

    console.print(f"Created flow run {flow_run.name!r} ({flow_run.id})")

So far, I’ve tried to keep our scientific/data code independent from infrastructure.

Using the child_flow task (with create_flow_run_from_deployment) from above requires a running Prefect server. It is not possible to run that same parent flow and child flow locally when needed. We would prefer to keep the ability to run from deployment, but to keep that decision outside of the flows.

We could switch between create_flow_run_from_deployment and directly executing e.g. marvin_flow(), depending on whether we can detect a server with a matching deployment available. But there is also the constraint that flows cannot be run from within tasks; a flow can only be called from another flow.

Our current work-around is a helper function that either calls the flow directly or submits a task that runs the corresponding deployment. This leads to a fair amount of wrapper code around Prefect functionality.

Complete code:
import asyncio

from prefect import Flow, State, context, get_client, get_run_logger, task
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.exceptions import ObjectNotFound


def is_run_from_deployment() -> bool:
    """
    Returns True if the current flow/task is run from a deployment on Prefect server, False if local.
    """
    run_context = context.get_run_context()
    if isinstance(run_context, FlowRunContext):
        flow_run = run_context.flow_run
    else:  # TaskRunContext
        flow_run = asyncio.run(
            run_context.client.read_flow_run(run_context.task_run.flow_run_id)
        )
    return flow_run.deployment_id is not None


@task
async def run_subflow_from_deployment(
    flow_name: str, parameters: dict, deployment_name: str
) -> State:
    flow_run = await run_deployment(
        name=f"{flow_name}/{deployment_name}", parameters=parameters
    )
    return flow_run.state


async def run_subflow(subflow: Flow, parameters: dict, deployment_name: str) -> State:
    logger = get_run_logger()
    if is_run_from_deployment():
        try:
            async with get_client() as client:
                existing_deployment = await client.read_deployment_by_name(
                    name=f"{subflow.name}/{deployment_name}"
                )
            if existing_deployment is not None:
                logger.info("run_subflow from deployment")
                future = await run_subflow_from_deployment.submit(
                    flow_name=subflow.name,
                    parameters=parameters,
                    deployment_name=deployment_name,
                )
                result = await future.result()
                return result
        except ObjectNotFound:
            pass
    # Otherwise, run the subflow locally.
    logger.info("run_subflow locally")
    if subflow.isasync:
        return await subflow(**parameters, return_state=True)
    else:
        return subflow(**parameters, return_state=True)
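
A hypothetical usage, where marvin_flow stands in for one of our subflows and "default" names an existing deployment of it:

from prefect import flow


@flow
async def marvin_flow(x: int) -> int:
    return x * 2


@flow
async def parent_flow():
    # Runs marvin_flow from its deployment when one is available,
    # otherwise calls it locally as a subflow.
    state = await run_subflow(marvin_flow, {"x": 21}, "default")
    print(state)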

Is there a better way?

Have you tried the run_deployment() function?

Yes, in fact my code uses that API (which internally calls create_flow_run_from_deployment); I just had not updated the text.
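
For reference, a minimal sketch of that call (marvin-flow/default and the parameters are placeholders):

from prefect.deployments import run_deployment

# run_deployment is sync-compatible, so it can be called outside a flow;
# timeout=0 returns immediately instead of waiting for the run to finish.
flow_run = run_deployment(
    name="marvin-flow/default",
    parameters={"x": 1},
    timeout=0,
)
print(flow_run.id)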