We use our web application server (Java code) to call the API, which runs on the same server as Prefect. For testing we use Postman.
You can check the Prefect CLI source code to see how the deployment info is read before the API call that creates a flow run:
```python
@deployment_app.command()
async def run(name: str):
    """
    Create a flow run for the given flow and deployment.

    The flow run will be scheduled for now and an agent must execute it.
    The flow run will not execute until an agent starts.
    """
    async with get_client() as client:
        try:
            deployment = await client.read_deployment_by_name(name)
        except ObjectNotFound:
            exit_with_error(f"Deployment {name!r} not found!")

        flow_run = await client.create_flow_run_from_deployment(deployment.id)

    console.print(f"Created flow run {flow_run.name!r} ({flow_run.id})")
```
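The same two steps (look up the deployment, then create a flow run from it) can also be issued as plain HTTP requests, which is what our Java code effectively does. A minimal Python sketch follows; the endpoint paths and default server URL are assumptions based on Prefect 2's REST API and should be verified against your server:

```python
# Hypothetical sketch: endpoint paths and API_BASE are assumptions based on
# Prefect 2's REST API, not taken from the code above.
API_BASE = "http://127.0.0.1:4200/api"  # assumed default Prefect server URL


def deployment_lookup_url(flow_name: str, deployment_name: str) -> str:
    """URL to read a deployment by its '<flow>/<deployment>' name."""
    return f"{API_BASE}/deployments/name/{flow_name}/{deployment_name}"


def create_flow_run_url(deployment_id: str) -> str:
    """URL to create a flow run from a known deployment id."""
    return f"{API_BASE}/deployments/{deployment_id}/create_flow_run"


def create_flow_run(flow_name: str, deployment_name: str, parameters: dict) -> dict:
    """Mirror the CLI: read the deployment, then POST a flow run for it."""
    import httpx  # third-party; any HTTP client works the same way

    deployment = httpx.get(deployment_lookup_url(flow_name, deployment_name)).json()
    response = httpx.post(
        create_flow_run_url(deployment["id"]),
        json={"parameters": parameters},
    )
    return response.json()
```

These are the same requests you can replay from Postman while testing.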
So far, I’ve tried to keep our scientific/data code independent of the infrastructure.
Using the child_flow task from above (with create_flow_run_from_deployment) requires a running Prefect server, so it is not possible to run the same parent flow and child flow locally when needed. We would prefer to keep the ability to run from a deployment, but to make that decision outside of the flows.
We could switch between create_flow_run_from_deployment and directly executing e.g. marvin_flow(), depending on whether we can detect a server with a matching deployment. But there is also the restriction that flows cannot be run from within tasks; a flow can only be called from another flow.
Our current work-around is a helper function that either calls the flow directly or submits a task that runs the corresponding deployment. This leads to quite a lot of wrapper code around Prefect functionality.
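For comparison, the "decision outside of the flows" could also come from plain configuration rather than from detecting the server. A minimal, dependency-free sketch (the environment variable name is made up, not part of our code):

```python
import os


def use_deployment() -> bool:
    """Read the execution mode from configuration instead of detecting it.

    RUN_SUBFLOWS_VIA_DEPLOYMENT is a hypothetical variable name; any external
    configuration source (env var, config file, CLI flag) serves equally well.
    """
    return os.environ.get("RUN_SUBFLOWS_VIA_DEPLOYMENT", "0") == "1"
```

A helper like run_subflow below could consult such a switch instead of (or in addition to) checking for a matching deployment on the server.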
Complete code:
```python
import asyncio

from prefect import Flow, State, context, get_client, get_run_logger, task
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.exceptions import ObjectNotFound


def is_run_from_deployment() -> bool:
    """
    Return True if the current flow/task is run from a deployment on the
    Prefect server, False if local.
    """
    run_context = context.get_run_context()
    if isinstance(run_context, FlowRunContext):
        flow_run = run_context.flow_run
    else:  # TaskRunContext
        flow_run = asyncio.run(
            run_context.client.read_flow_run(run_context.task_run.flow_run_id)
        )
    return flow_run.deployment_id is not None


@task
async def run_subflow_from_deployment(
    flow_name: str, parameters: dict, deployment_name: str
) -> State:
    flow_run = await run_deployment(
        name=f"{flow_name}/{deployment_name}", parameters=parameters
    )
    return flow_run.state


async def run_subflow(subflow: Flow, parameters: dict, deployment_name: str) -> State:
    logger = get_run_logger()
    if is_run_from_deployment():
        try:
            async with get_client() as client:
                existing_deployment = await client.read_deployment_by_name(
                    name=f"{subflow.name}/{deployment_name}"
                )
                if existing_deployment is not None:
                    logger.info("run_subflow from deployment")
                    future = await run_subflow_from_deployment.submit(
                        flow_name=subflow.name,
                        parameters=parameters,
                        deployment_name=deployment_name,
                    )
                    result = await future.result()
                    return result
        except ObjectNotFound:
            pass
    # Otherwise, run the subflow locally.
    logger.info("run_subflow locally")
    if subflow.isasync:
        return await subflow(**parameters, return_state=True)
    else:
        return subflow(**parameters, return_state=True)
```