How to execute a cleanup task even if previous tasks fail?

The easiest way to accomplish that is to leverage the allow_failure utility.

Here is a simple flow example that demonstrates how you can use it:

from prefect import task, flow, get_run_logger, allow_failure
import random


@task
def extract_data():
    return 42


@task
def extract_data_2():
    return 2


@task
def transform_data(x: int, y: int = 2) -> int:
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")
    else:
        return (x + 42) * y


@task
def clean_up_task():
    logger = get_run_logger()
    logger.info("Cleaning up 🧹")


@flow
def allow_flaky_transformation_to_pass():
    data = extract_data.submit()
    data_2 = extract_data_2.submit()
    result = transform_data.submit(data, data_2)
    clean_up_task.submit(wait_for=[allow_failure(result)])


if __name__ == "__main__":
    allow_flaky_transformation_to_pass()

For a more detailed description, check the release blog post announcing that feature: