13:24:16.962 | Beginning flow run 'friendly-scorpion' for flow 'main-flow'...
13:24:16.962 | Starting task runner `SequentialTaskRunner`...
13:24:17.121 | Beginning subflow run 'apricot-weasel' for flow 'subflow-1'...
13:24:17.121 | Starting task runner `SequentialTaskRunner`...
13:24:17.151 | Beginning subflow run 'spotted-coot' for flow 'subflow-4'...
13:24:17.151 | Starting task runner `SequentialTaskRunner`...
Subflow 1 started!
Subflow 4 started!
13:24:17.355 | Beginning subflow run 'masked-jackrabbit' for flow 'subflow-3'...
13:24:17.355 | Starting task runner `SequentialTaskRunner`...
Subflow 3 started!
13:24:17.560 | Beginning subflow run 'ivory-kiwi' for flow 'subflow-2'...
13:24:17.560 | Starting task runner `SequentialTaskRunner`...
Subflow 2 started!
13:24:18.190 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.277 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.314 | Subflow run 'apricot-weasel' finished in state Completed(message=None, type=COMPLETED)
13:24:18.337 | Subflow run 'spotted-coot' finished in state Completed(message=None, type=COMPLETED)
13:24:18.394 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.451 | Subflow run 'masked-jackrabbit' finished in state Completed(message=None, type=COMPLETED)
13:24:18.599 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.653 | Subflow run 'ivory-kiwi' finished in state Completed(message=None, type=COMPLETED)
13:24:18.655 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.677 | Flow run 'friendly-scorpion' finished in state Completed(message='All states completed.', type=COMPLETED)
We'll likely add an interface for running subflows in parallel without using async in the future.
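For reference, the log above shows four subflows starting within the same second, which is what you get when async subflows are awaited together. Here is a minimal sketch of that pattern using plain `asyncio` stand-ins. The assumption is that in Prefect 2.0 each coroutine would be decorated with `@flow`; the names `subflow` and `main_flow` are made up for illustration and are not taken from the code that produced the log.

```python
import asyncio


async def subflow(n: int) -> int:
    # Stand-in for an async function decorated with Prefect's @flow
    # (assumption: the real subflows are async flows).
    print(f"Subflow {n} started!")
    await asyncio.sleep(0.1)  # simulated work
    return n


async def main_flow() -> list:
    # gather() schedules all four coroutines before awaiting any of them,
    # so the subflows overlap instead of running one after another.
    results = await asyncio.gather(*(subflow(n) for n in range(1, 5)))
    return list(results)
```

Calling `asyncio.run(main_flow())` runs the parent. `gather` returns results in the order the coroutines were passed in, even though the start order you see in real flow run logs can differ.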
Prefect 1.0
To create multiple parallel subflow runs in Prefect 1.0, you would need to leverage mapping. Here is an example mapping over three subflows from the same project:
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
Hi! I would like to ask whether there are any new best practices for running subflows in parallel in Prefect 2.0. I think a good implementation would be to support .submit() of flows to a task runner such as Dask.
import time

import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment


@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    logger = get_run_logger()
    api_key = "XX"
    api_url = "xx"
    # Poll the API until the flow run reaches a terminal state.
    while True:
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")
        state = flow_run["state_type"]
        if state in ["COMPLETED", "CANCELLED"]:
            break
        if state == "FAILED":
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")
        time.sleep(log_polling_interval)


def request_prefect_api(path: str, api_key: str, api_url: str):
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    return requests.get(url, headers=headers).json()


# now we can do
run_ids = []
for _ in range(1, 100):
    # timeout=0 returns immediately instead of waiting for the run to finish
    run_ids.append(run_deployment(name="flow/deployment", timeout=0).id)
# Wait for each started run in turn.
[wait_for_deployment(r) for r in run_ids]
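The snippet above waits for the runs one at a time and raises on the first failure. As a possible variation, here is a minimal sketch that polls all runs in a single loop and collects each final state. It is a standard-library-only illustration, not a Prefect API: `wait_for_all`, `fetch_state`, and the `"TIMED_OUT"` label are hypothetical names, and `fetch_state` is assumed to return a state type string such as `"RUNNING"` or `"COMPLETED"`.

```python
import time
from typing import Callable, Dict, Iterable

TERMINAL_OK = {"COMPLETED", "CANCELLED"}  # same "done" states as the snippet above
TERMINAL_BAD = {"FAILED", "CRASHED"}


def wait_for_all(
    run_ids: Iterable[str],
    fetch_state: Callable[[str], str],  # hypothetical: maps a run id to its state type
    poll_interval: float = 25,
    max_wait: float = 1000,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Poll every run in one loop and return {run_id: final state}."""
    pending = set(run_ids)
    final: Dict[str, str] = {}
    deadline = time.monotonic() + max_wait
    while pending:
        # sorted() makes a copy, so removing from `pending` here is safe.
        for run_id in sorted(pending):
            state = fetch_state(run_id)
            if state in TERMINAL_OK or state in TERMINAL_BAD:
                final[run_id] = state
                pending.discard(run_id)
        if not pending:
            break
        if time.monotonic() > deadline:
            # "TIMED_OUT" is a label made up for this sketch, not a Prefect state.
            for run_id in pending:
                final[run_id] = "TIMED_OUT"
            break
        sleep(poll_interval)
    return final
```

With the helper from the snippet above, `fetch_state` could be something like `lambda rid: request_prefect_api(f"/flow_runs/{rid}", api_key, api_url)["state_type"]`. The caller can then decide what to do with any run whose final state is not `"COMPLETED"`.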
I am following the Prefect 1.0 solution and got the following errors:
"Specify a flow name to run"
"Found multiple flows at 'batch_feature/create_batch_flow.py'"
The code loops over a set of config files in the same folder to create multiple flows.
Any clue why it doesn't work?