Wait for Mapped Tasks

Hello!

I’m using the flow below:

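```python
from prefect import flow, get_run_logger
from prefect_dask import DaskTaskRunner

# read_database_task, save_record, save_pdf_task, classify_pdfs_task and
# save_in_the_database_task are @task functions defined elsewhere.

@flow(task_runner=DaskTaskRunner())
def texts_flow():
    logger = get_run_logger()
    futureIds = read_database_task.submit()
    ids = futureIds.result()
    arrayOfSaved = save_record.map(ids)
    arrayOfPdfsSaved = save_pdf_task.map(arrayOfSaved)
    arrayClassified = classify_pdfs_task.map(arrayOfPdfsSaved)

    save_in_the_database_task.submit(arrayClassified)
```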

It reads a database to get a list of ids. There are more than 6000 of them, which is why I’m using the DaskTaskRunner. I map each id through a chain of tasks, and once those tasks are done, the results are saved to the database. The problem is that the last task must only run once the whole list is complete; otherwise I get a “database is locked” error.

How do I wait for all the tasks before the last one to complete, so that I can pass a complete list to the last task? With the flow as written, Prefect triggers the last task before all the other tasks are complete, then triggers it again after a while, so the database save runs many times and causes “database is locked” errors.
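The general pattern is a barrier followed by a single writer: block until every per-id future has finished, collect the results into one plain list, and only then write to the database once. Below is a minimal sketch of that pattern using only the standard library, so it is a stand-in rather than Prefect code: `concurrent.futures` plays the role of the task runner, the `*_stub` functions are hypothetical placeholders for the tasks in the question, and an in-memory SQLite database is assumed because “database is locked” is SQLite’s error message (the question doesn’t name the database). In Prefect 2 the analogous step is, I believe, calling `.result()` on each future returned by `.map()` (for example `[f.result() for f in arrayClassified]`) before submitting the save task with that plain list; treat that as an untested assumption for your Prefect version.

```python
import sqlite3
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


# Hypothetical stand-ins for save_record, save_pdf_task and classify_pdfs_task.
def save_record_stub(record_id: int) -> dict:
    return {"id": record_id}


def save_pdf_stub(record: dict) -> dict:
    return {**record, "pdf": f"{record['id']}.pdf"}


def classify_pdf_stub(record: dict) -> tuple:
    # Placeholder classification: label each record by the parity of its id.
    return (record["id"], "even" if record["id"] % 2 == 0 else "odd")


def process_id(record_id: int) -> tuple:
    """Run the whole per-id chain (save -> pdf -> classify) as one unit of work."""
    return classify_pdf_stub(save_pdf_stub(save_record_stub(record_id)))


def save_all(conn: sqlite3.Connection, rows: list) -> None:
    """Single writer: insert every row in one transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.executemany("INSERT INTO classified (id, label) VALUES (?, ?)", rows)


def run(ids: list, conn: sqlite3.Connection, workers: int = 8) -> list:
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(process_id, i) for i in ids]
        # Barrier: block until every per-id future has finished (or failed).
        wait(futures, return_when=ALL_COMPLETED)
    # .result() re-raises the first failure it meets; order follows `ids`.
    rows = [f.result() for f in futures]
    # Only now, with the complete list in hand, touch the database once.
    save_all(conn, rows)
    return rows


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE classified (id INTEGER PRIMARY KEY, label TEXT)")
    run(list(range(6000)), conn)
    print(conn.execute("SELECT COUNT(*) FROM classified").fetchone()[0])  # prints 6000
```

Because the insert happens exactly once, after the barrier, there is a single writer instead of one per mapped task, which is what avoids competing for SQLite’s write lock. Whether this carries over unchanged to Dask workers in your setup is not tested here.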


Did you ever find a solution for this? I’m trying to solve the same problem.
