Failure in mapped task prevents final cleanup

I’ve tried multiple things to get a cleanup task to run after mapped Tasks if there is a failure. Here is a minimal example that runs to completion if there are no errors, but fails to run the cleanup code if there is an error:

def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x

def identity(y):
    return y

def cleanup_task():
    get_run_logger().info("**** Cleaning up ****")

def map_with_cleanup_task():
    f =[1,2,3])
    c =

    state = cleanup_task.submit(wait_for=[c], return_state=True)  # doesn't Complete
    # state = cleanup_task.submit(wait_for=[allow_failure(c)], return_state=True)  # doesn't Complete
    assert state.is_pending()

#    cleanup_task.submit(wait_for=[c])  # doesn't Complete
#    cleanup_task(c)   # UnfinishedRun exception thrown (PENDING state)
#    cleanup_task.submit(c)  # doesn't Complete
#    cleanup_task.submit(wait_for=identity)  # doesn't wait
#   # Runs for each non-failure, which is no good either

The commented-out lines are all things I’ve tried to no avail. Our real case is more complicated, of course, with the cleanup_task() replaced with a task sending many, file-based outputs to an external API. For this reason, mapping the cleanup_task wouldn’t work.

Possibly related to:

This thread is similar, but doesn’t use, which is required in our case:

Any help / suggestions would be greatly appreciated.