Using Dask DataFrames in Prefect 2 tasks

I have a flow that first downloads several large files, some of them CSVs, then processes these, and finally joins the tables into one large table.
I have divided that into two flows: The first uses a DaskTaskRunner, because downloading and some processing can be done in parallel.

The second flow is trickier: I process the large tables using dask.DataFrame. What I have not been able to figure out is this: if I use a DaskTaskRunner, how can I access its Dask cluster from inside a Prefect task, so that the DataFrame processing runs in parallel while the tasks stay cached and resumable?

I ended up using a SequentialTaskRunner for the second flow. The tasks that use dask.DataFrame take the scheduler_address of the Dask cluster as input, create a client, and thus use the same cluster one after the other. They write their temporary results to parquet files, whose names are the task results and are easily cacheable.

I am sure there is a better way to do this. Perhaps I can use a DaskTaskRunner for the complete flow, somehow access the associated cluster from inside a task, and use it for the dask.dataframe processing? But then each task should be able to use all cluster workers for DataFrame processing, not just one.

Any hints are appreciated!

Dummy code that captures how I do it currently:

from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
from prefect_dask import DaskTaskRunner
from dask.distributed import LocalCluster, Client
import dask.dataframe as dd

@task
def download(url):
    ...

@task
def process_file(filename):
    ...

@task
def process_table1(table_file, output_parquet, scheduler_address):
    # Connect to the already running Dask cluster
    client = Client(scheduler_address)
    df = dd.read_csv(table_file)
    # process df
    # Write the intermediate result; the file name is the (cacheable) task result
    df.to_parquet(output_parquet)
    return output_parquet

@task
def process_table2(table_file, output_parquet, scheduler_address):
    ...

@task
def join_tables(table1_parquet, table2_parquet, result_parquet, scheduler_address):
    # Connect to the same Dask cluster as the previous tasks
    client = Client(scheduler_address)
    table1 = dd.read_parquet(table1_parquet)
    table2 = dd.read_parquet(table2_parquet)
    final_table = table1.join(table2)
    final_table.to_parquet(result_parquet)
    return result_parquet

@flow(task_runner=DaskTaskRunner())
def parallel_tasks_flow():
    downloaded_file1 = download.submit("url1")
    downloaded_file2 = download.submit("url2")
    processed_file = process_file.submit(downloaded_file1)
    # ...

@flow(task_runner=SequentialTaskRunner())
def dask_dataframes_flow(scheduler_address):
    table1_parquet = process_table1.submit("table1.csv", "table1.parquet", scheduler_address)
    table2_parquet = process_table2.submit("table2.csv", "table2.parquet", scheduler_address)
    final_table = join_tables(table1_parquet, table2_parquet, "final.parquet", scheduler_address)
    return final_table

@flow
def main_flow(scheduler_address):
    final_table = dask_dataframes_flow(scheduler_address)
    return final_table

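if __name__ == "__main__":
    # The port has to be the same on every run for the results to be cached properly:
    # the scheduler address is a task input, and the cache only hits if the inputs do not change
    dask_cluster_port = 12345
    dask_n_workers = 50
    # Assumed arguments: pin the scheduler port and set the number of workers
    cluster = LocalCluster(scheduler_port=dask_cluster_port, n_workers=dask_n_workers)
    final_table = main_flow(cluster.scheduler_address)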
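
The comment about the fixed port can be illustrated with a small, library-independent sketch. It assumes the cache key is derived from the task's inputs. The function `input_cache_key` below is a hypothetical stand-in written for this illustration, not Prefect's actual key function, which may hash more than the inputs. The second address with port 40213 is likewise made up to represent a randomly assigned port.

```python
import hashlib
import json


def input_cache_key(task_name: str, **inputs) -> str:
    """Hypothetical input-based cache key: hash the task name and its inputs."""
    payload = json.dumps({"task": task_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


common = {"table_file": "table1.csv", "output_parquet": "table1.parquet"}

# Same scheduler address on both runs: the keys match, so a cached result can be reused.
key_run1 = input_cache_key(
    "process_table1", scheduler_address="tcp://127.0.0.1:12345", **common
)
key_run2 = input_cache_key(
    "process_table1", scheduler_address="tcp://127.0.0.1:12345", **common
)

# A different port changes one input, which changes the key and causes a cache miss.
key_random_port = input_cache_key(
    "process_table1", scheduler_address="tcp://127.0.0.1:40213", **common
)

print(key_run1 == key_run2)         # keys are equal
print(key_run1 == key_random_port)  # keys differ
```

Under this assumption, any input that varies between runs, including the scheduler address, has to be pinned for a rerun to pick up earlier results.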

Using a DaskTaskRunner for the download flow is fine. Since downloading is I/O-bound, you could even use a ConcurrentTaskRunner there instead of Dask.
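
As a rough, library-independent sketch of why threads are enough for I/O-bound work: the example below uses the standard library's ThreadPoolExecutor rather than Prefect, and `fake_download` is a hypothetical stand-in that sleeps instead of fetching anything. The example.com URLs are placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(url: str) -> str:
    """Hypothetical download: sleep to mimic waiting on the network."""
    time.sleep(0.2)
    return f"{url.rsplit('/', 1)[-1]}.downloaded"


def download_all(urls):
    """Run all downloads concurrently in threads; results keep the input order."""
    with ThreadPoolExecutor(max_workers=len(urls)) as pool:
        return list(pool.map(fake_download, urls))


urls = [f"https://example.com/file{i}" for i in range(4)]

start = time.perf_counter()
files = download_all(urls)
elapsed = time.perf_counter() - start

# Four 0.2 s waits overlap, so the total is far below the 0.8 s a sequential loop would need.
print(files, f"{elapsed:.2f}s")
```

While one thread waits on the network, the others keep going, so no separate worker processes or cluster are needed for this part of the pipeline.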

Actually, you don't need to set a SequentialTaskRunner explicitly. If you call the tasks directly instead of using .submit(), everything runs sequentially in the main flow run process:

def dask_dataframes_flow(scheduler_address):
    table1_parquet = process_table1("table1.csv", "table1.parquet", scheduler_address)
    table2_parquet = process_table2("table2.csv", "table2.parquet", scheduler_address)
    final_table = join_tables(table1_parquet, table2_parquet, "final.parquet", scheduler_address)
    return final_table

Even if accessing the task runner's cluster from inside a task were possible, it would be implicit and less clean than what you have now.

Your flow looks fine. Nice work and thanks for sharing this!
