Could not submit dataclass object to DaskTaskRunner

I have a task that takes an argument whose type is a class decorated with @dataclass.
When I submit this task and pass an instance of the class, the task crashes, saying that the __init__ method is missing arguments.

The example below reproduces the issue. The task crashes with the message
Exception: 'TypeError("File.__init__() missing 2 required positional arguments: 'folder' and 'name'")'.

I found that

  • The Concurrent and Sequential task runners do not cause the problem.
  • Calling the task directly (without submit) does not cause the problem.

How can I pass a dataclass object to tasks run by DaskTaskRunner through the submit method?

import os
from glob import glob
from dataclasses import dataclass

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@dataclass(frozen=True)
class File:
    folder: str
    name: str


@task
def cat_file(f: File):
    fpath = os.path.join(f.folder, f.name)
    with open(fpath) as fp:
        print(fp.read())


@task
def list_files(d: str) -> list[File]:
    return [File(d, os.path.basename(f)) for f in glob(f"{d}/*") if os.path.isfile(f)]


@flow(task_runner=DaskTaskRunner())
def myflow():
    ret = list_files(".")
    for f in ret:
        cat_file.submit(f)


if __name__ == "__main__":
    myflow()

I found that if I move the definition of the dataclass to another module and import it, the exception is no longer raised.
This solves my problem, but I don’t understand the underlying cause of the error.
Just out of curiosity, I’d be happy if someone could explain what is going on behind the scenes. I am new to Prefect and Dask and am starting to love them.
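
For anyone trying the same workaround, here is a minimal sketch of the difference it makes, using only the standard library (no Prefect or Dask). It does not reproduce the failure. It only shows that a dataclass living in an importable module is serialized by reference and comes back with its fields intact. That this by-reference behavior is why the workaround helps with DaskTaskRunner is an assumption on my part, not something confirmed in this thread. The module name file_models is hypothetical.

```python
import importlib
import pickle
import sys
import tempfile
from dataclasses import fields
from pathlib import Path

# Source of the separate module. "file_models" is a hypothetical name;
# any importable module outside __main__ would do.
MODULE_SOURCE = '''
from dataclasses import dataclass


@dataclass(frozen=True)
class File:
    folder: str
    name: str
'''


def load_file_class():
    """Write the dataclass to its own module and import it from there."""
    module_dir = tempfile.mkdtemp()
    Path(module_dir, "file_models.py").write_text(MODULE_SOURCE)
    sys.path.insert(0, module_dir)
    importlib.invalidate_caches()
    return importlib.import_module("file_models").File


File = load_file_class()
original = File(".", "example.txt")

# pickle stores only a reference to file_models.File, not the class body,
# so the receiving side re-imports the real class definition.
restored = pickle.loads(pickle.dumps(original))

# The dataclass machinery still sees both fields after the round trip.
field_names = [f.name for f in fields(restored)]
print(restored, field_names)
```

In a real project this would just be a models.py next to the flow file, with `from models import File` at the top of the flow.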

I don’t immediately see why this would result in different behavior. Did you try using a pydantic model instead of a dataclass?

When in doubt, you can open a bug report issue on GitHub in the prefect repo.