I’ve tried several approaches to get a cleanup task to run after mapped tasks when one of them fails. Here is a minimal example that runs to completion when there are no errors, but never runs the cleanup code when there is one:
    from prefect import allow_failure, flow, get_run_logger, task


    @task
    def fails_on_two(x):
        if x == 2:
            raise ValueError("Failed task")
        return x


    @task
    def identity(y):
        return y


    @task
    def cleanup_task():
        get_run_logger().info("**** Cleaning up ****")


    @flow
    def map_with_cleanup_task():
        f = fails_on_two.map([1, 2, 3])
        c = identity.map(f)
        state = cleanup_task.submit(wait_for=[c], return_state=True)  # doesn't Complete
        # state = cleanup_task.submit(wait_for=[allow_failure(c)], return_state=True)  # doesn't Complete
        assert state.is_pending()
        # cleanup_task.submit(wait_for=[c])  # doesn't Complete
        # cleanup_task(c)  # UnfinishedRun exception thrown (PENDING state)
        # cleanup_task.submit(c)  # doesn't Complete
        # cleanup_task.submit(wait_for=identity)  # doesn't wait
        # cleanup_task.map(c)  # Runs for each non-failure, which is no good either
The commented-out lines are all things I’ve tried, to no avail. Our real case is more complicated, of course, with cleanup_task() replaced by a task that sends many file-based outputs to an external API. For this reason, mapping cleanup_task wouldn’t work.
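To make the behaviour I'm after concrete, here is a rough sketch in plain `concurrent.futures` rather than Prefect. It is not a Prefect solution, only an illustration of the semantics: every mapped item is attempted, failures are collected instead of aborting the run, and the cleanup step runs exactly once afterwards. The names `run_with_cleanup` and `cleanup`, and the tuple that is returned, are made up for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x


def identity(y):
    return y


def cleanup(results, log):
    # Stand-in for the real cleanup (hypothetical): record that it ran
    # and which successful results it saw.
    log.append(list(results))


def run_with_cleanup(values):
    cleanup_log = []
    with ThreadPoolExecutor() as pool:
        # "Map" both steps over the inputs; one future per input value.
        futures = [
            pool.submit(lambda v=v: identity(fails_on_two(v))) for v in values
        ]
        # wait() returns once every future has finished, failed or not.
        wait(futures)

    # Split finished futures into successes and failures, keeping input order.
    results = [f.result() for f in futures if f.exception() is None]
    errors = [f.exception() for f in futures if f.exception() is not None]

    # Cleanup runs once, regardless of how many items failed.
    cleanup(results, cleanup_log)
    return results, errors, cleanup_log
```

With the inputs from the example above, `run_with_cleanup([1, 2, 3])` yields the successful results `[1, 3]`, a single `ValueError`, and one cleanup call. That single post-map cleanup call is what I can't get Prefect to do once a mapped task has failed.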
Possibly related to:
This thread is similar, but doesn’t use Task.map, which is required in our case:
Any help / suggestions would be greatly appreciated.