How to use Dask without mapping in Prefect 1.0: using Dask worker_client to call client.submit() inside a Prefect task

To use the Dask client inside a task, you can use the worker_client:

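from dask.distributed import worker_client
import dask.dataframe as dd

def calling_compute_in_a_task(filepath):
    # worker_client() returns the client of the cluster this task is running on
    with worker_client():
        df = dd.read_csv(filepath)
        # compute() runs on that cluster and returns a pandas DataFrame
        return df.describe().compute()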

This retrieves the existing client from the Dask executor and uses it to run the computation. Similarly, you can call client.submit() directly:

def fib(value: int) -> int:
    if value < 2:
        return value
    with worker_client() as client:
        a_future = client.submit(fib, value - 1)
        b_future = client.submit(fib, value - 2)
        a, b = client.gather([a_future, b_future])
        return a + b

The DaskExecutor is mainly used if you need mapping or parallelization of tasks. If your Tasks are already doing some form of parallelism, consider creating the Dask cluster and client yourself for more control of Dask submission. Otherwise, it’s easy to fall into a scenario with competing resources.
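As a rough illustration of "creating the Dask cluster and client yourself", here is a minimal sketch. The function names (square, sum_of_squares) and the choice of a thread-based LocalCluster are my own assumptions for the example, not something from the Prefect docs; in a real flow the body of sum_of_squares would live inside a Prefect task.

```python
from dask.distributed import Client, LocalCluster


def square(x):
    return x * x


def sum_of_squares(values, n_workers=2):
    # Hypothetical helper: start a short-lived cluster just for this piece of work.
    # processes=False keeps the workers as threads in the current process,
    # and dashboard_address=None disables the diagnostics dashboard.
    with LocalCluster(
        n_workers=n_workers,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,
    ) as cluster:
        with Client(cluster) as client:
            futures = client.map(square, values)
            return sum(client.gather(futures))


if __name__ == "__main__":
    print(sum_of_squares([1, 2, 3]))
```

Because the cluster is created and closed inside the function, its size is under your control and it does not compete with an executor-level cluster for the same workers.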

If you need a Dask cluster for only a few steps within a Flow run, consider using a resource manager to spin up a temporary Dask cluster.

For example,

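from prefect import Flow, Parameter

# DaskCluster is assumed to be a resource manager defined elsewhere
# (a class decorated with @resource_manager); load_data and summarize are tasks.
with Flow("dask-example") as flow:
    n_workers = Parameter("n_workers", default=None)
    out_path = Parameter("out_path", default="summary.csv")

    with DaskCluster(n_workers=n_workers) as client:
        # These tasks rely on a dask cluster to run, so we create them inside
        # the `DaskCluster` resource manager
        df = load_data()
        summary = summarize(df, client)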


This is a great summary, thank you. I’m just wondering if it’s possible to combine the features of Prefect with dask tasks. For example, I want to use a dask DataFrame but to cache the results from each task (which relates to a chunk of the DataFrame) so that it’s fault tolerant. Can we wrap the df.X.compute() in a Prefect task somehow?

compute() creates a pandas DataFrame. If you return that DataFrame from the task, you can use the PandasSerializer alongside a Prefect Result, and the Result interface of Prefect will persist it.

Right, but compute() will only return when the entire DataFrame is processed, meaning we only serialize the full DataFrame. I’m wondering if we can serialize the child tasks, namely the tasks that each relate to one partition/subset of the DataFrame. What if the df.apply() fails halfway through, when 5 out of 10 partitions have been completed? We will have to re-run all 10 partitions because we never serialized any of the child tasks.

It ultimately depends on how you design your flow. If you rely on Prefect mapping and Dask executor, and you additionally configure results with a PandasSerializer as Kevin described, then Prefect can use that to checkpoint the task run results.

It’s up to you (and your workflow design) to determine what your task returns. Prefect can then persist the returned results (in whichever serialized form you choose based on your Serializer configuration) to make it easier to recover from failures, e.g. using the Restart functionality from the UI.

Yes, I understand that if I manually design the flow using .map I can get each subtask to be checkpointed, but this isn’t what I’m asking. To make it explicitly clear, I want to checkpoint the implicit tasks created by Dask DataFrame methods.

For example, in the below task I would like to checkpoint each of the implicit .assign tasks being computed for each partition of the data and not just the entire task:

def add_stuff(filepath):
    with worker_client():
        df = dd.read_csv(filepath)
        return df.assign(c = lambda df: df.a + df.b).compute()

For that you would need to persist the results yourself - there is no such feature in Prefect to automatically infer what you want to persist :smile: unless you return it and checkpoint the result.

Do you happen to use AWS S3? If so, you could persist this data in a single command using awswrangler - I love this library!

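Alternatively, you could manually call the .write() method on a result within your task:

from prefect import Flow, task
from prefect.engine.results import S3Result

@task
def my_task(data):
    # The location is a placeholder object key; pick one that fits your bucket layout
    s3_result = S3Result(bucket='bucket_of_models', location='my_task_output')
    # write() serializes the value and uploads it to the bucket
    s3_result.write(data)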

Right, but as I understand, a Prefect task running under a DaskExecutor is kind of a wrapper around a Dask task. So I was wondering if I could add a hook that captures each Dask task as it is submitted by the dask DataFrame, and then define the Prefect parameters for it including the target. But I’m guessing this wouldn’t be easy.

I appreciate the suggestions, but neither method of task persistence (S3 or local) will work, because I want to serialize the implicit tasks created by dask.DataFrame methods, and not the entire “parent” task. I don’t have access to these implicit tasks directly, so I can’t just write them somewhere.

you’re right :slight_smile:

I want to serialize the implicit tasks created by dask.DataFrame methods

Maybe it’s easier to ask in the Dask Discourse then?

So I was wondering if I could add a hook that captures each Dask task as it is submitted by the dask DataFrame, and then define the Prefect parameters for it including the target.

On this one, you might be able to do this if you implement a DaskSerializer on the Prefect side. You can find more in the Serializer docs. Then on the Prefect task level, you can persist the Dask DataFrame.

But in the example,

return df.assign(c = lambda df: df.a + df.b).compute()

It returns a pandas DataFrame after compute(), right? I think you are saying you want persistence at the assign operation, before compute(). I don’t think any native Dask operation can save your work if it succeeds for only 5 out of 10 partitions. I may be wrong; I just don’t know of any such operation at the moment. If you want persistence at the partition level, you likely need to write your own logic with map_partitions and handle it independently of Prefect.

But then it becomes a choice between using map_partitions for your logic and using native Dask operations. For example, suppose you wanted to do a groupby-max. With map_partitions, you are on your own in implementing the logic to partition the data and then get the max of each partition. If you use the native Dask groupby-max, there is an optimization that avoids shuffling partitions (it is performed MapReduce style).

If you use the native Dask operations, I don’t see how we can inject intermediate persistence. The hooks would need to be available at the Dask level, and I’m not aware of anything like this (but I could be wrong again). So unless you explicitly call to_csv or to_parquet at specific points in your code, I don’t think it can be done implicitly.

Thanks for your helpful reply. The issue is that df.assign() returns a lazy data frame, and so if I persist that directly then I haven’t actually stored any results, just a list of operations. If I persist the result of df.assign().compute() then I’m persisting the final result which isn’t what I want either.

If you want persistence at the partition level, you likely need to write your own logic with map_partitions and handle it independently of Prefect.

You are probably right about this. I would lose a lot of convenience by having to implement serialization and also certain operations myself. I’m also not aware of a hook to capture tasks as they are submitted and completed, but this would be ideal. Because then I could persist results and also load them from the cache if they’re already finished.

Oddly, it seems that you can submit tasks to the distributed cluster even without worker_client(). I think the dask.dataframe methods are able to detect when a cluster is available and use it automatically.

Thanks for the update. I think you know more about this than I do, but LMK if you have any remaining questions on this topic

By the way, something I think worth mentioning is that it’s very easy to cause a deadlock if you do this improperly (as I have been doing). So you need to read through this: Launch Tasks from Tasks — Dask.distributed 2022.5.0 documentation.

Basically, if you submit Dask tasks from your “parent” Prefect task, and you then wait on the child tasks from your parent, you will cause a deadlock where the child can’t run until the parent has finished and vice versa. You can identify this pattern because 99% of the subtasks will complete, but one will not finish, and it will be the one on the same worker as the Prefect task. In the Worker panel of the dashboard it looks like this:
[Screenshot from 2022-05-12 15-46-35: Worker panel of the Dask dashboard]

What this is showing is a parent Prefect task (bottom) and a single Dask child task (top) in deadlock.

So if this happens, you need to make sure to call dask.distributed.secede() before waiting on the subtasks (using dask.distributed.wait() or otherwise).
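Here is a minimal sketch of that pattern, following the approach in the Dask "Launch Tasks from Tasks" page. The parent and child functions are placeholders, and the single-threaded in-process cluster is only there to show the case where the parent would otherwise hold the one available slot.

```python
from dask.distributed import Client, get_client, rejoin, secede


def child(x):
    return x * 2


def parent(n):
    # Stands in for the body of the "parent" Prefect task.
    client = get_client()
    futures = client.map(child, range(n))
    secede()  # give up this worker thread so the children can use the slot
    results = client.gather(futures)
    rejoin()  # wait for a free slot and take it back before continuing
    return sum(results)


if __name__ == "__main__":
    # One worker with one thread: the parent occupies the only slot until it secedes.
    with Client(
        processes=False, n_workers=1, threads_per_worker=1, dashboard_address=None
    ) as client:
        print(client.submit(parent, 5).result())
```

The important part is the order: submit the children, secede(), then wait on them.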