I have a task that receives an object whose type is a class decorated with @dataclass.
When I submit this task and pass an instance of the class, the task crashes, saying that the __init__ method is missing arguments.
The example below reproduces the issue. The task crashes with the message
Exception: 'TypeError("File.__init__() missing 2 required positional arguments: 'folder' and 'name'")'
I found that:
- The Concurrent and Sequential task runners do not cause the problem.
- Calling the task directly (without submitting it) does not cause the problem.
How can I pass a dataclass object to tasks run by DaskTaskRunner through the submit method?
import os
from glob import glob
from dataclasses import dataclass

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@dataclass(frozen=True)
class File:
    folder: str
    name: str


@task
def cat_file(f: File):
    fpath = os.path.join(f.folder, f.name)
    with open(fpath) as fp:
        print(fp.read())


@task
def list_files(d: str) -> list[File]:
    return [File(d, os.path.basename(f)) for f in glob(f"{d}/*") if os.path.isfile(f)]


@flow(task_runner=DaskTaskRunner())
def myflow():
    ret = list_files(".")
    for f in ret:
        cat_file.submit(f)


if __name__ == "__main__":
    myflow()
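
One possible workaround, sketched here as an assumption rather than a confirmed fix, is to hand the task a plain dict and rebuild the dataclass inside it. The helpers `to_payload` and `from_payload` are hypothetical names introduced only for illustration; they use `dataclasses.asdict` from the standard library. Whether this avoids the error under DaskTaskRunner has not been verified.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class File:
    folder: str
    name: str


def to_payload(f: File) -> dict:
    """Flatten a File into a plain dict (hypothetical helper)."""
    return asdict(f)


def from_payload(payload: dict) -> File:
    """Rebuild a File from the dict produced by to_payload (hypothetical helper)."""
    return File(**payload)


# Round trip: the rebuilt instance compares equal to the original.
original = File(folder=".", name="example.txt")
payload = to_payload(original)
restored = from_payload(payload)
print(payload)
print(restored == original)
```

In the flow, this would mean calling `cat_file.submit(to_payload(f))` and having `cat_file` accept a `dict` and call `from_payload` as its first step, assuming plain dicts of strings reach the task unchanged.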