I have a flow that first downloads several large files, some of them CSVs, then processes these, and finally joins the tables into one large table.
I have split this into two flows. The first uses a DaskTaskRunner, because the downloads and some of the processing can run in parallel.
The second flow is trickier: I process the large tables using dask.DataFrame. What I have not been able to figure out is this: if I use a DaskTaskRunner, how can I access its Dask cluster from inside a Prefect task, so that the DataFrame processing runs in parallel on that cluster? And how can I keep the tasks cached and resumable at the same time?
I ended up using a SequentialTaskRunner for the second flow. The tasks that use dask.DataFrame get the scheduler_address of the Dask cluster as input and create a client from it, so they use the same cluster one after the other. Each task writes its intermediate result to a parquet file and returns the file name as its task result, which is easy to cache.
I am sure there is a better way to do this. Perhaps I can use a DaskTaskRunner for the complete flow, somehow access the associated cluster from inside a task, and use it for the dask.dataframe processing? But then each task should be able to use all cluster workers for the DataFrame processing, not just the one it is running on.
Any hints are appreciated!
Dummy code that captures how I do it currently:
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from prefect_dask import DaskTaskRunner
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd


@task
def download(url):
    ...  # omitted


@task
def process_file(filename):
    ...  # omitted


@task
def process_table1(table_file, output_parquet, scheduler_address):
    # Connect to the already running cluster instead of starting a new one
    client = Client(address=scheduler_address)
    df = dd.read_csv(table_file)
    # process df
    df.to_parquet(output_parquet)
    # The file name is the task result, which makes it easy to cache
    return output_parquet


@task
def process_table2(table_file, output_parquet, scheduler_address):
    ...  # omitted


@task
def join_tables(table1_parquet, table2_parquet, result_parquet, scheduler_address):
    client = Client(address=scheduler_address)
    table1 = dd.read_parquet(table1_parquet)
    table2 = dd.read_parquet(table2_parquet)
    final_table = table1.join(table2)
    final_table.to_parquet(result_parquet)
    return result_parquet


@flow
def parallel_tasks_flow():
    downloaded_file1 = download.submit("url1")
    downloaded_file2 = download.submit("url2")
    processed_file = process_file.submit(downloaded_file1)
    # ...


@flow(task_runner=SequentialTaskRunner())
def dask_dataframes_flow(scheduler_address):
    table1_parquet = process_table1.submit("table1.csv", "table1.parquet", scheduler_address)
    table2_parquet = process_table2.submit("table2.csv", "table2.parquet", scheduler_address)
    final_table = join_tables(table1_parquet, table2_parquet, "final.parquet", scheduler_address)
    return final_table


@flow
def main_flow(scheduler_address):
    # with_options returns a new flow object, which still has to be called
    parallel_tasks_flow.with_options(task_runner=DaskTaskRunner(address=scheduler_address))()
    final_table = dask_dataframes_flow(scheduler_address)
    return final_table


if __name__ == "__main__":
    # The port is fixed so that the scheduler address, which is a task input,
    # stays the same between runs and the cached results remain valid
    dask_cluster_port = 12345
    dask_n_workers = 50
    cluster = LocalCluster(
        n_workers=dask_n_workers,
        scheduler_port=dask_cluster_port,
    )
    final_table = main_flow(cluster.scheduler_address)
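To illustrate why I pin the port: as far as I understand, the cache key is derived from the task inputs, so a different scheduler address would look like a new input and the task would run again. Below is a minimal stdlib-only sketch of that idea. The `cache_key` helper is hypothetical and only mimics an input-hash style key; it is not Prefect's actual implementation, and the addresses are made-up examples.

```python
import hashlib
import json


def cache_key(task_name, **inputs):
    """Hypothetical helper: derive a deterministic key from a task's name and inputs."""
    payload = json.dumps({"task": task_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Two runs with identical inputs, including the fixed scheduler address
key_run1 = cache_key(
    "process_table1",
    table_file="table1.csv",
    output_parquet="table1.parquet",
    scheduler_address="tcp://127.0.0.1:12345",
)
key_run2 = cache_key(
    "process_table1",
    table_file="table1.csv",
    output_parquet="table1.parquet",
    scheduler_address="tcp://127.0.0.1:12345",
)

# Same task and files, but the cluster came up on a different port
key_other_port = cache_key(
    "process_table1",
    table_file="table1.csv",
    output_parquet="table1.parquet",
    scheduler_address="tcp://127.0.0.1:54321",
)

print(key_run1 == key_run2)        # True: the cached result can be reused
print(key_run1 == key_other_port)  # False: the task would be recomputed
```

If that understanding is right, any input that varies between runs would invalidate the cache in the same way, which is why the scheduler address has to stay constant in my setup.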