```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client
from dask.distributed import Client, LocalCluster


@task
def my_decorated_task(num: int):
    return True


@flow(name='my_flow', persist_result=True)
def my_decorated_flow():
    # get_dask_client gives the flow access to the same Dask client
    # the DaskTaskRunner is connected to.
    with get_dask_client() as client:
        numbers = list(range(20000))
        return my_decorated_task.map(numbers)


if __name__ == '__main__':
    # Point the task runner at an existing local cluster.
    with LocalCluster() as cluster:
        with Client(cluster) as client:
            result = my_decorated_flow.with_options(
                task_runner=DaskTaskRunner(address=client.scheduler.address)
            )()
```
Here is a minimal reproduction. Using a local cluster, it takes about 10 minutes to submit a few thousand tasks; all 20k tasks did not appear in the Prefect DB until roughly 18 minutes in.
I ran it from an EC2 machine to rule out networking and local resources, and I drastically increased the size of both the database server and the API server to rule out those pieces of hardware.
The only reason task submission isn't the bottleneck in this case is that it's a local cluster with only 16 threads available for execution.
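To make that concrete: standard Dask `LocalCluster` kwargs can raise the number of execution slots well past what the submission rate can keep busy (hypothetical sizing, just to illustrate the point):

```python
from dask.distributed import Client, LocalCluster

# Hypothetical sizing: 8 workers x 8 threads = 64 execution slots.
# With this many slots, submission throughput, not execution,
# becomes the limit on concurrency.
with LocalCluster(n_workers=8, threads_per_worker=8) as cluster:
    with Client(cluster) as client:
        ...
```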
In an even more minimal example (below), task creation and submission seem to happen at the same rate even without the Dask task runner. At 3 minutes in, only around 3k tasks have made it to the DB.
```python
from prefect import flow, task


@task
def my_decorated_task(num: int):
    return True


@flow(name='my_flow', persist_result=True)
def my_decorated_flow():
    numbers = list(range(20000))
    return my_decorated_task.map(numbers)


if __name__ == '__main__':
    result = my_decorated_flow()
```
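For anyone who wants to reproduce the DB counts quoted above, here is a rough sketch that polls the API for the number of task runs attached to a flow run. This assumes a recent Prefect 2.x client (the filter import path varies across 2.x releases), and `count_task_runs` is just an illustrative helper, not an established API:

```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter  # path varies across 2.x releases


async def count_task_runs(flow_run_id) -> int:
    """Page through the API and count the task runs recorded for one flow run."""
    total, offset, page_size = 0, 0, 200
    async with get_client() as client:
        while True:
            page = await client.read_task_runs(
                flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}),
                limit=page_size,
                offset=offset,
            )
            total += len(page)
            if len(page) < page_size:
                return total
            offset += page_size


# Poll this in a loop while the flow runs to watch task runs trickle into the DB:
# print(asyncio.run(count_task_runs("<flow-run-id>")))
```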
In the second use case, I can see how this would be fine and would rarely be a bottleneck, but in the first use case (distributed computing) it very quickly becomes the limit on how many tasks can actually run concurrently. I know tasks are currently being created asynchronously, but it seems to me that they will need to be created in a distributed manner to keep task creation from being a bottleneck in a use case like this.
It seems like my question at this point has morphed into: "Is there any established way to submit tasks to a flow run from another process, outside of the flow context?"
By the way, thanks so much for the support on this.