13:24:16.962 | Beginning flow run 'friendly-scorpion' for flow 'main-flow'...
13:24:16.962 | Starting task runner `SequentialTaskRunner`...
13:24:17.121 | Beginning subflow run 'apricot-weasel' for flow 'subflow-1'...
13:24:17.121 | Starting task runner `SequentialTaskRunner`...
13:24:17.151 | Beginning subflow run 'spotted-coot' for flow 'subflow-4'...
13:24:17.151 | Starting task runner `SequentialTaskRunner`...
Subflow 1 started!
Subflow 4 started!
13:24:17.355 | Beginning subflow run 'masked-jackrabbit' for flow 'subflow-3'...
13:24:17.355 | Starting task runner `SequentialTaskRunner`...
Subflow 3 started!
13:24:17.560 | Beginning subflow run 'ivory-kiwi' for flow 'subflow-2'...
13:24:17.560 | Starting task runner `SequentialTaskRunner`...
Subflow 2 started!
13:24:18.190 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.277 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.314 | Subflow run 'apricot-weasel' finished in state Completed(message=None, type=COMPLETED)
13:24:18.337 | Subflow run 'spotted-coot' finished in state Completed(message=None, type=COMPLETED)
13:24:18.394 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.451 | Subflow run 'masked-jackrabbit' finished in state Completed(message=None, type=COMPLETED)
13:24:18.599 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.653 | Subflow run 'ivory-kiwi' finished in state Completed(message=None, type=COMPLETED)
13:24:18.655 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.677 | Flow run 'friendly-scorpion' finished in state Completed(message='All states completed.', type=COMPLETED)
We’ll likely add an interface for running subflows in parallel without using async in the future.
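The interleaved "Subflow N started!" lines in the log above come from awaiting several async subflows at once, so each one starts before the previous one finishes. The same scheduling can be sketched with plain asyncio. This is a hedged illustration, not Prefect code: the hypothetical subflow coroutine stands in for an @flow-decorated async function, and asyncio.sleep stands in for real work.

```python
import asyncio
import time


async def subflow(n: int, work_seconds: float) -> str:
    # Hypothetical stand-in for an async subflow
    # (in Prefect 2 this would be an @flow-decorated async def)
    print(f"Subflow {n} started!")
    await asyncio.sleep(work_seconds)  # non-blocking, so the other subflows keep running
    return f"subflow-{n} done"


async def main_flow() -> list[str]:
    # Start all four subflows and wait for them together;
    # gather returns results in the order the coroutines were passed
    return await asyncio.gather(
        subflow(1, 0.2), subflow(2, 0.2), subflow(3, 0.2), subflow(4, 0.2)
    )


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(main_flow())
    elapsed = time.perf_counter() - start
    # Total time is close to one subflow's duration, not the sum of all four
    print(results, f"{elapsed:.2f}s")
```

Because every subflow yields control while it waits, total wall time tracks the slowest subflow rather than the sum, which is why the four runs in the log overlap.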
Prefect 1.0
To create multiple parallel subflow runs in Prefect 1.0, you would need to leverage mapping. Here is an example mapping over three subflows from the same project:
```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
```
Hi! I would like to ask whether there are any new best practices for running subflows in parallel in Prefect 2.0. I think a good implementation would be to support .submit() of flows to a task runner such as Dask.
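As a rough illustration of the synchronous .submit()-style interface requested above, the idea can be approximated outside Prefect with concurrent.futures: each call is submitted to a pool, returns a future immediately, and results are collected afterwards. This is only a sketch of the pattern, not a Prefect API; run_subflow and run_subflows_in_parallel are hypothetical names, and time.sleep stands in for a blocking subflow.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_subflow(name: str, seconds: float) -> str:
    # Hypothetical blocking subflow body; time.sleep stands in for real work
    time.sleep(seconds)
    return f"{name} completed"


def run_subflows_in_parallel(names: list[str], seconds: float = 0.2) -> dict[str, str]:
    results: dict[str, str] = {}
    # max(1, ...) avoids ThreadPoolExecutor rejecting max_workers=0 for an empty list
    with ThreadPoolExecutor(max_workers=max(1, len(names))) as pool:
        # submit() returns a Future immediately, so all subflows start together
        futures = {pool.submit(run_subflow, name, seconds): name for name in names}
        for future in as_completed(futures):
            # result() re-raises any exception raised inside the subflow
            results[futures[future]] = future.result()
    return results
```

Threads suit I/O-bound work such as waiting on remote runs; CPU-bound subflows would need processes or a distributed executor such as Dask.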
```python
import time

import requests
from prefect import flow, get_run_logger, task
from prefect.deployments import run_deployment


@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    """Poll the Prefect API until the flow run reaches a terminal state."""
    logger = get_run_logger()
    api_key = "XX"  # your Prefect API key
    api_url = "xx"  # your Prefect API URL
    while True:
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")
        state = flow_run["state_type"]
        if state in ["COMPLETED", "CANCELLED"]:
            break
        # CRASHED runs never finish, so treat them as failures instead of polling until max_run_time
        if state in ["FAILED", "CRASHED"]:
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")
        time.sleep(log_polling_interval)


def request_prefect_api(path: str, api_key: str, api_url: str):
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # surface HTTP errors instead of parsing an error body
    return response.json()


# now we can do this inside a parent flow (the flow name here is only an example)
@flow
def parent_flow():
    run_ids = []
    for _ in range(1, 100):
        # timeout=0 returns immediately instead of waiting for the run to finish
        run_ids.append(run_deployment(name="flow/deployment", timeout=0).id)
    # all runs are already started, so waiting on them one by one is fine
    for run_id in run_ids:
        wait_for_deployment(run_id)
```
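The polling loop in wait_for_deployment can be separated from the HTTP call, which makes it easy to test without a Prefect server. In this hedged sketch, wait_for_terminal_state is a hypothetical helper and fetch_state is an assumed callable returning a state-type string such as "RUNNING"; the timeout is measured on the caller's clock with time.monotonic() instead of the run's total_run_time.

```python
import time
from typing import Callable

TERMINAL_OK = {"COMPLETED", "CANCELLED"}
TERMINAL_FAILED = {"FAILED", "CRASHED"}


def wait_for_terminal_state(
    fetch_state: Callable[[], str],
    poll_interval: float = 25.0,
    max_wait: float = 1000.0,
) -> str:
    """Poll fetch_state() until it reports a terminal state or max_wait seconds elapse."""
    deadline = time.monotonic() + max_wait
    while True:
        state = fetch_state()
        if state in TERMINAL_OK:
            return state
        if state in TERMINAL_FAILED:
            raise RuntimeError(f"Flow run ended in state {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Flow run not finished after {max_wait} seconds")
        time.sleep(poll_interval)
```

With the recipe's helper, fetch_state could be something like lambda: request_prefect_api(f"/flow_runs/{run_id}", api_key, api_url)["state_type"]. Measuring the deadline locally also catches runs stuck in SCHEDULED or PENDING, whose total_run_time does not grow.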
I am following the Prefect 1.0 solution and got the following errors:
“Specify a flow name to run”
“Found multiple flows at ‘batch_feature/create_batch_flow.py’”
The code loops over a bunch of config files in the same folder to create multiple flows.
Any clue why it doesn’t work?
Yes! That recipe uses run_deployment to run a pre-existing deployment as a subflow of the calling flow.
This is effectively the same as calling the flow object itself in a parent flow with something like asyncio.gather.
The big difference is that with run_deployment, the subflows run on the infrastructure associated with that deployment, whereas when you call the subflow() object itself, it necessarily runs as a process on the calling (parent) flow’s infrastructure.
Has anyone run into an issue where a subflow fails with the following error?
Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at (some_address)>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.
We started getting these errors after upgrading to 2.10.20 from 2.10.3.
Note: we’re calling the subflow function directly instead of using run_deployment.
Here is the code we’re using (slightly stripped down). It also references a library package that we maintain, but those functions are just API calls to our own internal services.
We’re using a Kubernetes infrastructure block to spawn Jobs in our cluster.
I have only been able to replicate this issue once locally.
```python
import asyncio

import paramiko
from prefect import flow, task
from prefect.context import FlowRunContext

from commonLibrary import jobqueue_api_call, get_host_config, update_job, send_email

###
### Child Flow
###


@task
def get_parameters(job):
    expected_checksum = job["parameters"]["expected_checksum"]
    file_to_checksum = job["parameters"]["file_to_checksum"]
    return expected_checksum, file_to_checksum


@task
def get_tool_server():
    # returns a remote server to perform CRC commands on
    host = get_host_config("CRC Server")
    return host


@task
def run_md5sum_command(host, file):
    command = f"md5sum {file}"
    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(host, username="<username>", password="<password>")
        _, stdout, stderr = client.exec_command(command)
        err = stderr.readlines()
        output = stdout.readlines()
    finally:
        client.close()  # always release the SSH connection, even on errors
    if err:
        raise Exception(" ".join(err))
    # md5sum prints "<digest>  <path>"; return only the digest so it can be
    # compared with the expected checksum string
    return output[0].split()[0]


@task
def compare_checksums(expected, actual):
    if expected != actual:
        raise ValueError(f"Expected checksum {expected=} does not match {actual=}")


@flow
async def crc_file(job):
    try:
        expected_checksum, file = get_parameters(job)
        host = get_tool_server()
        file_checksum = run_md5sum_command(host, file)
        compare_checksums(expected_checksum, file_checksum)
        job["status"] = "SUCCESS"
    except Exception as err:
        job["status"] = "READY"
        job["retries"] -= 1
        if job["retries"] <= 0:
            send_email(err)
    finally:
        update_job(job)


###
### Parent Flow
###


@task
def checkout_jobs(workflow_id, limit):
    payload = [{"workflowId": workflow_id, "maxCountToReturn": limit}]
    jobs = jobqueue_api_call("jobs/checkout", "POST", payload)
    return jobs


@flow
async def crc_job_watcher(workflow_id, limit=20):
    # workflow_id is a placeholder parameter: checkout_jobs needs both an ID
    # and a limit, but the stripped-down call passed only the limit
    jobs = checkout_jobs(workflow_id, limit)
    if jobs:
        task_runner_type = type(FlowRunContext.get().task_runner)
        # calling an async flow returns a coroutine; gather runs them concurrently
        subflow_runs = [
            crc_file.with_options(
                flow_run_name=job["name"], task_runner=task_runner_type()
            )(job)
            for job in jobs
        ]
        await asyncio.gather(*subflow_runs)
```
```
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 825, in orchestrate_flow_run
    from_async.call_soon_in_waiting_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 125, in call_soon_in_waiting_thread
    waiter.submit(call)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 110, in submit
    raise RuntimeError(f"The call {self._call} is already done.")
RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.
Finished in state Failed('Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.')
```
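Separately from the RuntimeError, md5sum prints the digest followed by the file path, so if run_md5sum_command returns the raw stdout.readlines() list, compare_checksums will always report a mismatch against a bare checksum string. A minimal sketch of extracting and comparing just the digest; parse_md5sum_output and checksums_match are hypothetical helper names, and the leading-backslash handling assumes GNU md5sum's escaping of unusual file names.

```python
def parse_md5sum_output(lines: list[str]) -> str:
    """Return the hex digest from md5sum output such as ['<digest>  <path>\\n']."""
    if not lines:
        raise ValueError("md5sum produced no output")
    # md5sum prints the digest, whitespace, then the path ("*path" in binary mode);
    # GNU md5sum prefixes the line with a backslash when the file name needs escaping
    return lines[0].split()[0].lstrip("\\").lower()


def checksums_match(expected: str, md5sum_lines: list[str]) -> bool:
    # Compare case-insensitively and ignore stray whitespace in the expected value
    return expected.strip().lower() == parse_md5sum_output(md5sum_lines)
```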