OK, I’ve spent some more time understanding Results and failures, and I think I’m starting to see what I’ll need to do here. Here’s a simple example of the problem I’m facing, modifying the last example:
import random

from prefect import allow_failure, flow, task

@task
def task_1():
    print("task 1 succeeded")

@task
def task_2():
    print("task 2 succeeded")

@task
def task_3():
    print("task 3 succeeded")

@task
def task_4():
    print("This task often fails")
    # Fails roughly half the time to simulate a flaky step
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")
    else:
        print("task 4 succeeded")
    return True

@task
def task_5(t4_result):
    print(f"task 5 got {t4_result} and succeeded")

@task
def clean_up_task():
    print("Cleaning up 🧹")

@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    # 1, 2, 3 can run concurrently
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three])
    five = task_5.submit(four)
    clean_up_task.submit(wait_for=[allow_failure(four), allow_failure(five)])

if __name__ == "__main__":
    main()
The problem (as I understand it) is that if task_4 fails non-deterministically, resolving its future in task_5 raises an exception, and no five future is returned / set at all (?) In any event, in this modified form, the cleanup task will not run when there’s an error.
Is the right thing to do here to wrap the tasks in a try / finally block? Or should I suppress the exception? Or maybe I’m still thinking about this wrong and the five future does exist, but in that case I’m confused about why clean_up_task does not run, since it allows failure.
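To make the try / finally idea concrete, here is a minimal sketch in plain Python. It is only a stand-in built on the standard library's concurrent.futures, not Prefect's API, so it says nothing about how Prefect itself schedules clean_up_task. The names flaky_step, downstream_step, clean_up, and run_pipeline are hypothetical, and the failure is passed in as a flag instead of being random so the behavior is repeatable.

```python
from concurrent.futures import ThreadPoolExecutor

events = []  # records which steps ran, in order


def flaky_step(should_fail):
    # Hypothetical stand-in for task_4: fails on demand instead of randomly.
    if should_fail:
        raise ValueError("simulated failure")
    return True


def downstream_step(upstream_result):
    # Hypothetical stand-in for task_5: needs the upstream result.
    events.append(f"downstream got {upstream_result}")


def clean_up():
    # Hypothetical stand-in for clean_up_task.
    events.append("cleanup")


def run_pipeline(should_fail):
    with ThreadPoolExecutor() as pool:
        try:
            four = pool.submit(flaky_step, should_fail)
            # Resolving the future re-raises the upstream exception here,
            # so downstream_step is never called when flaky_step fails.
            downstream_step(four.result())
            return "ok"
        except ValueError:
            return "failed"
        finally:
            # Runs on both the success path and the failure path.
            clean_up()
```

Calling run_pipeline(True) skips the downstream step but still runs the cleanup, which is the unconditional behavior I’m after; whether the same shape is the idiomatic answer inside a Prefect flow is exactly what I’m unsure about.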
I read about the concept of state handlers in Prefect v1; does that concept exist in v2? That seems like another approach to having unconditional logic run at end of flow (?) Or do subflows allow for accumulating tasks and performing cleanup actions regardless of success/failure?
Thanks again for helping me work through this.
Edit: looking at the logs, I do see that the cleanup task was submitted even when the error was raised, but the UI shows it as not having run:
08:15:08.864 | INFO | Flow run 'russet-beetle' - ValueError: Non-deterministic error has occured.
08:15:08.857 | ERROR | Task run 'task_4-2f032c67-0' - Encountered exception during execution:
08:15:08.876 | INFO | Flow run 'russet-beetle' - Created task run 'clean_up_task-ed05fc63-0' for task 'clean_up_task'
08:15:08.876 | INFO | Flow run 'russet-beetle' - Submitted task run 'clean_up_task-ed05fc63-0' for execution.
08:15:08.885 | ERROR | Task run 'task_4-2f032c67-0' - Finished in state Failed('Task run encountered an exception: ValueError: Non-deterministic error has occured.\n')
08:15:08.930 | ERROR | Flow run 'russet-beetle' - Finished in state Failed('1/6 states failed.')
Traceback (most recent call last):
<SNIP>
Thinking about this more, it seems that the wait_for list should be built conditionally, based on possible upstream failures. I.e., in this particular example, I shouldn’t include five in the wait_for list, since it was never resolved to either success or failure (?)