I have been working hard on a migration recently. I really love the new perks in 2.0, but there are still a few things I have found hard to migrate.
Previously, in 1.0, we could:
- Define flow starter tasks like any other task, so that they are orchestrated and run in parallel where necessary.
- Let downstream tasks/subflows depend on subflow runs.
I understand from the examples that in 2.0 we can use async to run subflows in parallel, but how do we ensure parallel subflow execution when the subflows sit inside a flow with rather complex upstream/downstream task dependencies?
In other words, is it possible to make flows return PrefectFutures and adhere to the same submit(wait_for=[xxx]) signature?
What I want to achieve:
def master_flow():
    # Desired API: subflow.submit(...) is what I would like to have in 2.0
    task1 = some_task.submit()
    task2 = some_task.submit()
    subflow_1_result = subflow_1.submit(wait_for=[task1])
    subflow_2_result = subflow_2.submit(wait_for=[task2])
    task_final = final_task.submit(wait_for=[subflow_1_result, task2])
In this case, subflow 1 and subflow 2 should run in parallel if task1 and task2 finish at around the same time, and task_final should run once both subflow_1 and task2 have finished.
I also stumbled upon these GitHub requests that seem related:
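To make the dependency shape concrete, here is a minimal sketch in plain asyncio. It does not use Prefect at all: `some_task`, `subflow`, `final_task` and the `events` list are hypothetical stand-ins, and awaiting an upstream handle only imitates what `wait_for=[...]` would do. Whether the same pattern carries over unchanged to async Prefect flows is an assumption I have not verified.

```python
import asyncio

# Hypothetical stand-ins: plain coroutines instead of Prefect tasks/subflows,
# so that the sketch runs on its own.
events = []


async def some_task(name):
    await asyncio.sleep(0.01)
    events.append(f"{name} done")
    return name


async def subflow(name, upstream):
    await upstream  # plays the role of wait_for=[upstream]
    events.append(f"{name} start")
    await asyncio.sleep(0.05)
    events.append(f"{name} done")
    return name


async def final_task(*upstreams):
    # Runs only once every upstream handle has finished.
    results = await asyncio.gather(*upstreams)
    events.append("final done")
    return results


async def master_flow():
    # create_task schedules work immediately and returns an awaitable handle.
    task1 = asyncio.create_task(some_task("task1"))
    task2 = asyncio.create_task(some_task("task2"))
    subflow_1 = asyncio.create_task(subflow("subflow_1", task1))
    subflow_2 = asyncio.create_task(subflow("subflow_2", task2))
    final = await final_task(subflow_1, task2)
    await subflow_2  # do not leave the second subflow dangling
    return final


result = asyncio.run(master_flow())
```

Both subflows start before either of them finishes, and the final step only runs after subflow_1 and task2 are done, which is the behavior I am after.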
opened 02:41AM - 26 Oct 22 UTC
enhancement
status:roadmap
from:sales
v1-parity
### First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar request and didn't find it.
- [X] I searched the Prefect documentation for this feature.
### Prefect Version
2.x
### Describe the current behavior
This issue is extracting an enhancement request buried in the discussion of issue #5853 which was about resolving `RuntimeError: The task runner is already started!` when running concurrent subflows. In the discussion for #5853 [this example](https://github.com/PrefectHQ/prefect/issues/5853#issuecomment-1221536726) was presented which I have updated with the workaround from #7319.
```python
import asyncio

from prefect import task, flow, get_run_logger
from prefect.context import FlowRunContext


@task
async def print_x(x):
    logger = get_run_logger()
    logger.info(x)
    await asyncio.sleep(2)


@flow
async def subflow(x):
    await print_x(x)


@flow
async def parent_flow():
    # Workaround from #7319: give each subflow its own task runner instance
    task_runner_type = type(FlowRunContext.get().task_runner)
    futures = [subflow.with_options(task_runner=task_runner_type())(x) for x in ["x1", "x2", "x3"]]
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
The feature request here is two parts:
1. For [subflow tasks to not have to be async](https://github.com/PrefectHQ/prefect/issues/5853#issuecomment-1147655272)
2. For a cleaner interface (e.g. `flow.map`) to invoke the parallel subflows.
### Describe the proposed behavior
Desired implementation is to be able to achieve concurrent subflow execution with synchronous subflow tasks by doing something like this.
```python
import time

from prefect import task, flow, get_run_logger


@task
def print_x(x):
    logger = get_run_logger()
    logger.info(x)
    time.sleep(2)


@flow
def subflow(x):
    print_x(x)


@flow
def parent_flow():
    # Proposed interface: flow.map does not exist yet
    subflow.map(["x1", "x2", "x3"])


if __name__ == "__main__":
    parent_flow()
```
### Example Use
_No response_
### Additional context
```
Version: 2.6.4
API version: 0.8.2
Python version: 3.9.13
Git commit: 51e92dda
Built: Thu, Oct 20, 2022 3:11 PM
OS/Arch: linux/x86_64
Profile: default
Server type: hosted
```
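For what it is worth, the fan-out behavior that the proposed `subflow.map` describes can be imitated for ordinary synchronous functions with a thread pool from the standard library. This is only a sketch: `map_concurrently` is a hypothetical helper, `subflow` here is a plain function rather than a Prefect flow, and I have not checked whether calling real Prefect flows from worker threads is supported.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def subflow(x):
    # Hypothetical stand-in for a synchronous subflow body.
    time.sleep(0.2)
    return f"done {x}"


def map_concurrently(fn, items):
    """Run fn over items in worker threads; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        return list(pool.map(fn, items))


start = time.perf_counter()
results = map_concurrently(subflow, ["x1", "x2", "x3"])
elapsed = time.perf_counter() - start
```

With three items that each sleep for 0.2 seconds, the whole call should take roughly as long as one item rather than the sum of all three.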
opened 03:33PM - 02 Sep 22 UTC
enhancement
needs:design
status:roadmap
priority:medium
from:sales
### First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar request and didn't find it.
- [X] I searched the Prefect documentation for this feature.
### Prefect Version
2.x
### Describe the current behavior
Currently, subflows can only be run in the main process and only tasks can be submitted to external infrastructure and run in the background.
### Describe the proposed behavior
Flows should be submittable to external infrastructure without a deployment. Calling `Flow.submit` should return a future that can be used to wait for the flow run's completion.
All of the features available for flow calls should be available for submission. We may limit this in the first iteration to get the feature into users' hands faster.
### Example Use
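```python
from prefect import flow
from prefect.infrastructure import KubernetesJob


# Proposed API: neither the infrastructure argument nor Flow.submit exists yet
@flow(infrastructure=KubernetesJob(...))
def bar():
    return 1


@flow
def foo():
    future = bar.submit()
    x = future.result()
```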
### Additional context
An extension of https://github.com/PrefectHQ/prefect/issues/6688
This feature will require expert changes to Prefect internals:
- Futures need to be updated to support flows
- Infrastructure declaration needs to be added at the flow decorator level
- Waiting for flow run completion from a future needs to be performant
- Flow result retrieval from a future will require design
  - We may need to require result handling to be configured to use this feature
  - We may be able to pass results back via cloudpickle and stdout or a mounted file
- For container based infrastructure, we will need to determine what image the flow should run in
  - Since the user is calling it from the current flow we may be able to use the parent's image in some cases
- We need to determine how to get the flow to the infrastructure, i.e. serialization
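
The "future plus `wait_for`" part of this request can be sketched locally with `concurrent.futures`. This is not Prefect code and it does not touch external infrastructure, result handling or serialization: `submit`, `_pool`, `bar`, `foo_step` and `log` are hypothetical names, and the work runs in threads of the current process.

```python
from concurrent.futures import ThreadPoolExecutor, wait

# Hypothetical local pool; the real feature targets external infrastructure.
_pool = ThreadPoolExecutor(max_workers=8)


def submit(fn, *args, wait_for=()):
    """Run fn(*args) in the background once every future in wait_for is done."""

    def runner():
        upstreams = list(wait_for)
        wait(upstreams)  # block this worker until all upstream futures finish
        for upstream in upstreams:
            upstream.result()  # re-raise an upstream failure instead of running
        return fn(*args)

    return _pool.submit(runner)


log = []


def bar():
    log.append("bar")
    return 1


def foo_step():
    log.append("foo_step")
    return 2


future = submit(bar)
downstream = submit(foo_step, wait_for=[future])
x = future.result()
y = downstream.result()
_pool.shutdown()
```

Blocking a worker thread while it waits keeps the sketch short, but it ties up one pool slot per waiting call, so upstream work should always be submitted before anything that depends on it.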
Is there an ETA on this feature? Are there any possible workarounds?
I fully understand the problem and I'm with you on that, we should add that. No ETA, but I'd recommend following up on this GitHub issue. I will link this topic to it.