I’ve tried multiple things to get a cleanup task to run after mapped tasks when one of them fails. Here is a minimal example that runs to completion if there are no errors, but never runs the cleanup code if there is an error:
from prefect import allow_failure, flow, get_run_logger, task

@task
def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x

@task
def identity(y):
    return y

@task
def cleanup_task():
    get_run_logger().info("**** Cleaning up ****")

@flow
def map_with_cleanup_task():
    f = fails_on_two.map([1, 2, 3])
    c = identity.map(f)
    state = cleanup_task.submit(wait_for=[c], return_state=True)  # doesn't Complete
    # state = cleanup_task.submit(wait_for=[allow_failure(c)], return_state=True)  # doesn't Complete
    assert state.is_pending()
    # cleanup_task.submit(wait_for=[c])  # doesn't Complete
    # cleanup_task(c)  # UnfinishedRun exception thrown (PENDING state)
    # cleanup_task.submit(c)  # doesn't Complete
    # cleanup_task.submit(wait_for=identity)  # doesn't wait
    # cleanup_task.map(c)  # Runs for each non-failure, which is no good either
The commented-out lines are all things I’ve tried, to no avail. Our real case is, of course, more complicated: cleanup_task() is replaced with a task that sends many file-based outputs to an external API and must run once after all mapped tasks have finished. For this reason, mapping cleanup_task wouldn’t work.
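To make the goal concrete, here is the behaviour I’m after, sketched in plain Python with the standard library’s concurrent.futures instead of Prefect (so none of this is Prefect API). run_mapped_with_cleanup is a hypothetical helper: it maps both stages, skips the second stage for any input whose first stage raised, and calls the cleanup exactly once in a finally block, with None marking failed or skipped items.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, List, Optional


def fails_on_two(x: int) -> int:
    if x == 2:
        raise ValueError("Failed task")
    return x


def identity(y: int) -> int:
    return y


def run_mapped_with_cleanup(
    inputs: List[int],
    cleanup: Callable[[List[Optional[int]]], None],
) -> List[Optional[int]]:
    """Hypothetical helper (not Prefect): map two stages, then clean up once."""
    results: List[Optional[int]] = []
    with ThreadPoolExecutor() as pool:
        try:
            # Stage 1: one future per input, like fails_on_two.map(...)
            first = [pool.submit(fails_on_two, x) for x in inputs]
            wait(first)  # block until every first-stage future has settled

            # Stage 2: only run identity where stage 1 succeeded;
            # None stands in for a skipped (upstream-failed) item.
            second: List[Optional[Future]] = [
                pool.submit(identity, f.result()) if f.exception() is None else None
                for f in first
            ]
            wait([s for s in second if s is not None])

            results = [
                s.result() if s is not None and s.exception() is None else None
                for s in second
            ]
        finally:
            # Runs exactly once, whether or not any mapped item failed.
            cleanup(results)
    return results
```

For example, run_mapped_with_cleanup([1, 2, 3], print) prints [1, None, 3] once. The key point is that the cleanup waits for every mapped item to reach a final outcome (success, failure or skipped) rather than for every item to succeed.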
Possibly related to:
This thread is similar, but doesn’t use Task.map, which is required in our case:
Any help / suggestions would be greatly appreciated.