I have a flow that first downloads several large files, some of them CSVs, then processes them, and finally joins the resulting tables into one large table.
I have divided that into two flows: The first uses a DaskTaskRunner, because downloading and some processing can be done in parallel.
The second flow is trickier: I process the large tables using dask.DataFrame. What I have not been able to figure out is this: if I use a DaskTaskRunner, how can I access its Dask cluster from inside a Prefect task, so that the DataFrame processing runs in parallel, while still having cached, resumable tasks?
I ended up using a SequentialTaskRunner for the second flow. The tasks that use dask.DataFrame receive the scheduler_address of the Dask cluster as input, create a Client, and thus use the same cluster one after the other, writing their intermediate results to Parquet files; the file names are the task results and are easy to cache (see the caching sketch at the end of this post).
I am sure there is a better way to do this. Perhaps I can use a DaskTaskRunner for the complete flow, somehow access the associated cluster from inside a task, and use it for the dask.dataframe processing (rough sketch of what I imagine right below)? But then the tasks should be able to use all cluster workers for the DataFrame processing, not just one.
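To make that idea concrete, here is roughly what I imagine (untested; I am assuming that a task run by the DaskTaskRunner executes on a Dask worker, so worker_client() can hand back a client for the same cluster; the task name is made up for illustration):

from dask.distributed import worker_client
from prefect import task
import dask.dataframe as dd

@task
def process_table_on_shared_cluster(table_file, output_parquet):
    # Assumption: this task is executed by the DaskTaskRunner and therefore runs on a
    # Dask worker; worker_client() secedes from the worker's thread pool and returns a
    # client connected to the same scheduler
    with worker_client() as client:
        df = dd.read_csv(table_file)
        # process df
        # compute explicitly on that client so the work is spread over all cluster workers
        client.compute(df.to_parquet(output_parquet, compute=False)).result()
    return output_parquet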
Any hints are appreciated!
Dummy code that captures how I do it currently:
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
from prefect_dask import DaskTaskRunner
from dask.distributed import LocalCluster, Client
import dask.dataframe as dd
@task
def download(url):
    # ... download the file and return the local filename
    ...

@task
def process_file(filename):
    # ... preprocessing that can run in parallel with other downloads
    ...

@task
def process_table1(table_file, output_parquet, scheduler_address):
    # Creating the client registers the cluster as the default scheduler
    # for dask.dataframe, so to_parquet runs on it
    client = Client(scheduler_address)
    df = dd.read_csv(table_file)
    # process df
    df.to_parquet(output_parquet)
    return output_parquet

@task
def process_table2(table_file, output_parquet, scheduler_address):
    # ... same pattern as process_table1
    ...

@task
def join_tables(table1_parquet, table2_parquet, result_parquet, scheduler_address):
    client = Client(scheduler_address)
    table1 = dd.read_parquet(table1_parquet)
    table2 = dd.read_parquet(table2_parquet)
    final_table = table1.join(table2)
    final_table.to_parquet(result_parquet)
    return result_parquet
@flow
def parallel_tasks_flow():
    downloaded_file1 = download.submit("url1")
    downloaded_file2 = download.submit("url2")
    processed_file = process_file.submit(downloaded_file1)
    # ...
@flow(task_runner=SequentialTaskRunner())
def dask_dataframes_flow(scheduler_address):
    table1_parquet = process_table1.submit("table1.csv", "table1.parquet", scheduler_address)
    table2_parquet = process_table2.submit("table2.csv", "table2.parquet", scheduler_address)
    final_table = join_tables(table1_parquet, table2_parquet, "final.parquet", scheduler_address)
    return final_table
@flow
def main_flow(scheduler_address):
    # Run the download/preprocessing flow with a DaskTaskRunner attached to the existing cluster
    parallel_tasks_flow.with_options(task_runner=DaskTaskRunner(address=scheduler_address))()
    final_table = dask_dataframes_flow(scheduler_address)
    return final_table
if __name__ == "__main__":
    # The scheduler port has to stay the same across runs: scheduler_address is a task
    # input, so it must not change for the cached results to be reused
    dask_cluster_port = 12345
    dask_n_workers = 50
    cluster = LocalCluster(
        n_workers=dask_n_workers,
        scheduler_port=dask_cluster_port,
    )
    final_table = main_flow(cluster.scheduler_address)
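
To make the caching part concrete: with Prefect 2 the tasks can be cached on their inputs via task_input_hash, roughly like this (sketch only, the expiration value is arbitrary). Since scheduler_address is a task input, it becomes part of the cache key, which is why the cluster port is pinned above:

from datetime import timedelta
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=7))
def process_table1(table_file, output_parquet, scheduler_address):
    # same body as above; the cache key is derived from the inputs, including
    # scheduler_address, hence the pinned scheduler port
    ...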