Using Dask client persistence within a Prefect Flow

Hi! I’m fairly new to Prefect and struggling with using Dask persistence within a Prefect flow. I’m not sure if this is an issue with my code or a limitation within Prefect.

I made the following trivial example of where I’m running into issues. My real use case is far more complex, but I’m hoping this example concisely illustrates the core of the problem: I want to persist a Dask DataFrame to the Dask client provided by the Prefect DaskTaskRunner, and then pass that persisted dataframe to multiple downstream tasks.

"""
Example of persisting a Dask DataFrame with Prefect. DOES NOT work.
"""

import dask.dataframe as dd
import pandas as pd
from prefect import task, flow
from prefect_dask import get_dask_client, DaskTaskRunner
from dask.distributed import wait

@task
def load_ddf_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and attempts to persist it to the Dask client. This does not work.
    """

    with get_dask_client():
        data = [
            [1,2,3], [4,5,6], [7,8,9]
            ,[11,12,13], [14,15,16], [17,18,19]
            ,[21,22,23], [24,25,26], [27,28,29]
            ,[31,32,33], [34,35,36], [37,38,39]
            ,[41,42,43], [44,45,46], [47,48,49]
            ,[51,52,53], [54,55,56], [57,58,59]
            ,[61,62,63], [64,65,66], [67,68,69]
            ,[71,72,23], [74,75,76], [77,78,79]
            ,[81,82,33], [84,85,86], [87,88,89]
            ,[91,92,93], [94,95,96], [97,98,99]
        ]

        cols = ['A', 'B', 'C']

        df = pd.DataFrame(data=data, columns=cols)
        ddf = dd.from_pandas(df, npartitions=1)
        ddf = ddf.persist()
        wait(ddf)

    return ddf

@task
def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_ddf * 2

@task
def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_ddf * 3

@flow(task_runner=DaskTaskRunner())
def example_flow():
    """
    Example Prefect Flow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    ddf_A = load_ddf_A.submit()
    ddf_B = calc_ddf_B.submit(ddf_A)
    ddf_C = calc_ddf_C.submit(ddf_A)

    print('ddf_B is: \n')
    print(ddf_B.result().head(10))
    print('\nddf_C is: \n')
    print(ddf_C.result().head(10))

if __name__ == '__main__':
    example_flow()

This approach fails when trying to persist the dataframe. I receive the following error message:

14:22:22.878 | ERROR   | Task run 'load_ddf_A-0' - Crash detected! Execution was cancelled by the runtime environment.
2023-08-28 14:22:22,922 - distributed.worker - WARNING - Compute Failed
Key:       load_ddf_A-0-72b3fab73f2c45c8a3bdc15afffee986-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x000001AFC5275310>, 'task_run': TaskRun(id=UUID('72b3fab7-3f2c-45c8-a3bd-c15afffee986'), name='load_ddf_A-0', flow_run_id=UUID('613a493a-d2cc-4ae1-ad5a-a339f8588ffd'), task_key='__main__.load_ddf_A', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('38834eaa-1282-4121-8e8a-8a4b736d767f'), task_inputs={}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 8, 28, 21, 22, 17, 768737, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=13275), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {}, 'wait_for': None, 
Exception: 'CancelledError()'
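
For what it’s worth, my understanding is that get_dask_client() yields the distributed Client backing the DaskTaskRunner, so I’d expect persisting through that client explicitly to be interchangeable with the ddf.persist() call in the task above. The sketch below is just that assumption spelled out (the load_ddf_A_via_client name and the client.persist spelling are mine, not something taken from the Prefect docs):

import dask.dataframe as dd
import pandas as pd
from prefect import task
from prefect_dask import get_dask_client
from dask.distributed import wait

@task
def load_ddf_A_via_client() -> dd.DataFrame:
    """
    Same loading logic, but persisting through the client yielded by get_dask_client().
    """
    with get_dask_client() as client:
        df = pd.DataFrame({'A': [1, 4, 7], 'B': [2, 5, 8], 'C': [3, 6, 9]})
        ddf = dd.from_pandas(df, npartitions=1)
        # assumption on my part: client.persist(ddf) should be equivalent to
        # ddf.persist() while this client is the active one
        ddf = client.persist(ddf)
        wait(ddf)
    return ddf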

The following code implements the same logic, but with pure Dask. It works as expected.


"""
Example of persisting a Dask DataFrame without Prefect. DOES work.
"""

import dask.dataframe as dd
import pandas as pd
from dask.distributed import wait, LocalCluster, Client

def load_ddf_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and persists it to the Dask client. This does work.
    """

    data = [
        [1,2,3], [4,5,6], [7,8,9]
        ,[11,12,13], [14,15,16], [17,18,19]
        ,[21,22,23], [24,25,26], [27,28,29]
        ,[31,32,33], [34,35,36], [37,38,39]
        ,[41,42,43], [44,45,46], [47,48,49]
        ,[51,52,53], [54,55,56], [57,58,59]
        ,[61,62,63], [64,65,66], [67,68,69]
        ,[71,72,23], [74,75,76], [77,78,79]
        ,[81,82,33], [84,85,86], [87,88,89]
        ,[91,92,93], [94,95,96], [97,98,99]
    ]

    cols = ['A', 'B', 'C']

    df = pd.DataFrame(data=data, columns=cols)
    ddf = dd.from_pandas(df, npartitions=1)
    ddf = ddf.persist()
    wait(ddf)

    return ddf

def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_ddf * 2

def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_ddf * 3

def example_flow():
    """
    Example Dask workflow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    ddf_A = load_ddf_A()
    ddf_B = calc_ddf_B(ddf_A)
    ddf_C = calc_ddf_C(ddf_A)

    print('ddf_B is: \n')
    print(ddf_B.head(10))
    print('\nddf_C is: \n')
    print(ddf_C.head(10))

if __name__ == '__main__':
    cluster = LocalCluster()
    client = Client(cluster)
    example_flow()

Any suggestions on where I’m going wrong with Prefect would be much appreciated!

Update

I tried running this with the SequentialTaskRunner. The goal was to remove any complexity from using Dask for both the Prefect task runner and the dataframes themselves. This code still does not work, but it does give a different error message.

"""
Example of persisting a Dask DataFrame with Prefect. DOES NOT work.
"""

import dask.dataframe as dd
import pandas as pd
from prefect import task, flow
from dask.distributed import Client
from prefect.task_runners import SequentialTaskRunner

@task
def load_ddf_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and attempts to persist it to the Dask client. This does not work.
    """

    data = [
        [1,2,3], [4,5,6], [7,8,9]
        ,[11,12,13], [14,15,16], [17,18,19]
        ,[21,22,23], [24,25,26], [27,28,29]
        ,[31,32,33], [34,35,36], [37,38,39]
        ,[41,42,43], [44,45,46], [47,48,49]
        ,[51,52,53], [54,55,56], [57,58,59]
        ,[61,62,63], [64,65,66], [67,68,69]
        ,[71,72,23], [74,75,76], [77,78,79]
        ,[81,82,33], [84,85,86], [87,88,89]
        ,[91,92,93], [94,95,96], [97,98,99]
    ]

    cols = ['A', 'B', 'C']

    df = pd.DataFrame(data=data, columns=cols)
    ddf = dd.from_pandas(df, npartitions=2)
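    # dask_client here is the module-level Client created in __main__ below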
    ddf = dask_client.persist(ddf)

    return ddf

@task
def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_ddf * 2

@task
def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_ddf * 3

@flow(task_runner=SequentialTaskRunner())
def example_flow():
    """
    Example Prefect flow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    ddf_A = load_ddf_A.submit()
    ddf_B = calc_ddf_B.submit(ddf_A)
    ddf_C = calc_ddf_C.submit(ddf_A)

    print('ddf_B is: \n')
    print(ddf_B.result().head(10))
    print('\nddf_C is: \n')
    print(ddf_C.result().head(10))

if __name__ == '__main__':
    dask_client = Client()
    example_flow()

This gives a different error message:

17:24:18.706 | INFO    | Flow run 'arboreal-ibis' - Executing 'load_ddf_A-0' immediately...
17:24:28.674 | ERROR   | Task run 'load_ddf_A-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\prefect\engine.py", line 1729, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 345, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\tornado\gen.py", line 780, in run
    yielded = self.gen.throw(exc)
              ^^^^^^^^^^^^^^^^^^^
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\dask\base.py", line 329, in f
    yield wait(self)
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\tornado\gen.py", line 767, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\tornado\gen.py", line 810, in handle_yield
    self.future = convert_yielded(yielded)
                  ^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\matt.lessig\AppData\Local\Programs\Python\Python311\Lib\functools.py", line 909, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Users\matt.lessig\pyenv\PrefectETL\Lib\site-packages\tornado\gen.py", line 880, in convert_yielded
    raise BadYieldError("yielded unknown object %r" % (yielded,))
tornado.gen.BadYieldError: yielded unknown object DoneAndNotDoneFutures(done={<Future: finished, type: pandas.core.frame.DataFrame, key: ('from_pandas-2b03722be3f7cc3abfd2dc51ee122283', 1)>, <Future: finished, type: pandas.core.frame.DataFrame, key: ('from_pandas-2b03722be3f7cc3abfd2dc51ee122283', 0)>}, not_done=set())
17:24:28.731 | ERROR   | Task run 'load_ddf_A-0' - Finished in state Failed("Task run encountered an exception BadYieldError: yielded unknown object DoneAndNotDoneFutures(done={<Future: finished, type: pandas.core.frame.DataFrame, key: ('from_pandas-2b03722be3f7cc3abfd2dc51ee122283', 1)>, <Future: finished, type: pandas.core.frame.DataFrame, key: ('from_pandas-2b03722be3f7cc3abfd2dc51ee122283', 0)>}, not_done=set())")

Update 2

I figured out a hack that partially meets the requirement. Basically, I call compute() on the lazy Dask dataframe, which forces it to materialize as a pandas DataFrame, and then I read that computed pandas DataFrame back into a Dask dataframe. I’d still really like to figure out how to make persist() work, as this approach is hacky and each downstream dataframe will still unnecessarily rebuild from dd.from_pandas() rather than referring to a single persisted Dask dataframe.

"""
Example of persisting a Dask DataFrame with Prefect. This somewhat works, but relies on a hack. It's also not as good as persist(), since each downstream dataframe will still call from_pandas().
"""

import dask.dataframe as dd
import pandas as pd
from prefect import task, flow
from dask.distributed import Client
from prefect_dask import DaskTaskRunner, get_dask_client

@task()
def load_df_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and attempts to persist it to the Dask client. This somewhat works but relies on a hack. It's also not as good as persist(), since each downstream dataframe will still call from_pandas().
    """
    with get_dask_client():
        #this would be read_csv in the real use case
        data = [
            [1,2,3], [4,5,6], [7,8,9]
            ,[11,12,13], [14,15,16], [17,18,19]
            ,[21,22,23], [24,25,26], [27,28,29]
            ,[31,32,33], [34,35,36], [37,38,39]
            ,[41,42,43], [44,45,46], [47,48,49]
            ,[51,52,53], [54,55,56], [57,58,59]
            ,[61,62,63], [64,65,66], [67,68,69]
            ,[71,72,23], [74,75,76], [77,78,79]
            ,[81,82,33], [84,85,86], [87,88,89]
            ,[91,92,93], [94,95,96], [97,98,99]
        ]
        cols = ['A', 'B', 'C']
        mock_from_csv_df = pd.DataFrame(data=data, columns=cols)
        mock_from_csv_ddf = dd.from_pandas(mock_from_csv_df, npartitions=2)

        #hack to force the Dask DataFrame to calculate by converting it to a Pandas DataFrame
        temp_computed_df = mock_from_csv_ddf.compute()
        semi_persisted_ddf = dd.from_pandas(temp_computed_df, npartitions=2)

        return semi_persisted_ddf

@task()
def calc_df_B(source_df: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_df * 2

@task()
def calc_df_C(source_df: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_df * 3

@flow(task_runner=DaskTaskRunner())
def example_flow():
    """
    Example Prefect flow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    df_A = load_df_A.submit()
    df_B = calc_df_B.submit(df_A).result()
    df_C = calc_df_C.submit(df_A).result()

    print('df_B is: \n')
    print(df_B.head(10))
    print('\ndf_C is: \n')
    print(df_C.head(10))

if __name__ == '__main__':
    example_flow()
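
In case it helps clarify what I mean by "persisted": the way I’d expect to check whether the dataframe's partitions actually live on the cluster is something like the sketch below. The futures_of/has_what calls are just my guess at the right way to inspect this with the standard distributed API, outside of Prefect:

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, futures_of

if __name__ == '__main__':
    client = Client()

    df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]})
    ddf = dd.from_pandas(df, npartitions=2).persist()

    # futures_of lists the concrete futures backing the persisted collection,
    # and has_what shows which workers currently hold which keys; a persisted
    # dataframe should show up in both
    print(futures_of(ddf))
    print(client.has_what())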