What's the right way to have a Prefect task that always runs after every other task (at the end of the flow)?

I’m new to Prefect, working on porting over an Airflow DAG as a proof-of-concept. In our DAG we have some bookkeeping code (a task) that always needs to run at the end of the flow.

In Airflow this is a little janky (like most things), but it’s handled roughly like:

close_out_search = PythonOperator(
    task_id="close_out_search",
    python_callable=close_out_search,
    trigger_rule=TriggerRule.ALL_DONE,
)

[other_task1_res, other_task2_res, other_task3_res] >> close_out_search

Where other_task1_res etc. are just the sentinel values that set up the task dependencies so that this always happens after those other tasks have completed (successfully or not, including the case where they didn’t run at all).

I tried to do something like this in Prefect:

    close_out_search.submit(
        search,
        wait_for=[
            allow_failure(other_task1_res),
            allow_failure(other_task2_res),
            allow_failure(other_task3_res),
        ],
    )

However, it seems that if the flow errors out before some of those futures are even created, the cleanup task does not get run. I’m sure I’m missing something obvious, but searching hasn’t turned up an answer for me yet; I’m probably just searching with Airflow terminology that doesn’t map to Prefect. Appreciate any pointers!

can you try:

    close_out_search.submit(
        allow_failure(search),
        wait_for=[
            allow_failure(other_task1_res),
            allow_failure(other_task2_res),
            allow_failure(other_task3_res),
        ],
    )

otherwise, can you share a minimal (full) reproducible example?

Thanks for the reply! Yes, I’ll try this out and report back with a full example if that doesn’t do it.

I wasn’t sure if there was an established pattern for this sort of “clean-up” task. If this is it, then I’ll keep poking at this to get to the bottom of why it wasn’t working. (I was getting errors related to “Missing state”, but I was working through many issues in my flow, so I can’t say for sure that this was the actual cause. I’ll keep working on it.)

here is another example:

from prefect import task, flow, allow_failure
import random


@task
def task_1():
    print("task 1 succeeded")


@task
def task_2():
    print("task 2 succeeded")


@task
def task_3():
    print("task 3 succeeded")


@task
def task_4():
    print("This task often fails")
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")
    else:
        print("task 4 succeeded")


@task
def task_5():
    print("task 5 succeeded")


@task
def clean_up_task():
    print("Cleaning up 🧹")


@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    # 1, 2, 3 can run concurrently
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three])
    five = task_5.submit()
    clean_up_task.submit(wait_for=[allow_failure(four), allow_failure(five)])


if __name__ == "__main__":
    main()

Ok, I’ve spent some more time understanding Results and failures, and I’m starting to get a sense of what I’ll need to do here, but here’s a simple example of the problem I’m facing, modifying this last example:



from prefect import task, flow, allow_failure
import random


@task
def task_1():
    print("task 1 succeeded")


@task
def task_2():
    print("task 2 succeeded")


@task
def task_3():
    print("task 3 succeeded")


@task
def task_4():
    print("This task often fails")
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")
    else:
        print("task 4 succeeded")
        return True


@task
def task_5(t4_result):
    print(f"task 5 got {t4_result} and succeeded")


@task
def clean_up_task():
    print("Cleaning up 🧹")


@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    # 1, 2, 3 can run concurrently
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three])
    five = task_5.submit(four)
    clean_up_task.submit(wait_for=[allow_failure(four), allow_failure(five)])


if __name__ == "__main__":
    main()

The problem (as I understand it) is that if task_4 fails non-deterministically, resolving its future as an input to task_5 raises an exception, and no five future is returned or set at all (?). In any event, in this modified form, the cleanup task will not run when there’s an error.

Is the right thing to do here to wrap the tasks in a try / finally block? Or should I suppress the exception from being raised? Or maybe I’m still thinking about this wrong and the five future does exist, but in that case I’m confused about why clean_up_task does not run, since it allows failure.
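
(For concreteness, the try / finally idea I have in mind is roughly the following; just a sketch, reusing the task definitions from the example above, and assuming it’s acceptable to block on the futures inside the try block.)

@flow(log_prints=True, name="Cleanup via try/finally")
def main():
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    try:
        four = task_4.submit(wait_for=[one, two, three])
        five = task_5.submit(four)
        five.result()  # raises here if task_4 or task_5 failed
    finally:
        # submitted (and waited on) whether or not the block above raised
        clean_up_task.submit().wait()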

I read about the concept of state handlers in Prefect v1; does that concept exist in v2? That seems like another approach to having unconditional logic run at the end of a flow (?). Or do subflows allow for accumulating tasks and performing cleanup actions regardless of success/failure?

Thanks again for helping me work through this.

Edit: peering at the logs, I do see that the cleanup task was submitted even when the error was raised, but the UI shows it as not having run:

08:15:08.864 | INFO    | Flow run 'russet-beetle' - ValueError: Non-deterministic error has occured.
08:15:08.857 | ERROR   | Task run 'task_4-2f032c67-0' - Encountered exception during execution:
08:15:08.876 | INFO    | Flow run 'russet-beetle' - Created task run 'clean_up_task-ed05fc63-0' for task 'clean_up_task'
08:15:08.876 | INFO    | Flow run 'russet-beetle' - Submitted task run 'clean_up_task-ed05fc63-0' for execution.
08:15:08.885 | ERROR   | Task run 'task_4-2f032c67-0' - Finished in state Failed('Task run encountered an exception: ValueError: Non-deterministic error has occured.\n')
08:15:08.930 | ERROR   | Flow run 'russet-beetle' - Finished in state Failed('1/6 states failed.')
Traceback (most recent call last):
<SNIP>

Thinking about this more, it seems like the wait_for list should be built conditionally based on possible upstream failures. I.e., in this particular example, I shouldn’t include five in the wait_for list, since it was never resolved to either success or failure (?)

no, wrap it in allow_failure() instead - you need to do that for all tasks for which you want to allow failure:

@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    # 1, 2, 3 can run concurrently
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three])
    five = task_5.submit(allow_failure(four))
    clean_up_task.submit(wait_for=[allow_failure(four), allow_failure(five)])


if __name__ == "__main__":
    main()

in v2, there are Automations instead: https://docs.prefect.io/ui/automations/

Ah, I see – ok. In this case, I wouldn’t want task_5 to run, though, as it really only makes sense if task_4 succeeded. However, I do still want clean_up_task to run even if task_5 never runs (and never resolves the five future). Is there a way to encode this in Prefect 2.x?

For now, my workaround (which I think is working, though I also fixed some of the things that were causing failures, so I may not have adequately tested this) is to wrap the meat of the logic in a subflow and do the bookkeeping logic (including the cleanup task) outside of the call to the subflow. I need to play more with the effects of errors in that setup, though I understand that subflows return a state that summarizes the states of their tasks. (I guess if the subflow raises an exception, I’ll be back to the drawing board making sure I handle that and still perform the cleanup action.)

For reference my flow looks like:

@flow(log_prints=True)
def collect_and_analyze_results(...):
  # sub-flow which calls the tasks
  ...

@flow(log_prints=True)
def run_geo_search(location: Polygon, ds_names: list[str]):
    search = upload_geo_search.submit(location=location, datasources=ds_names)

    analyze_state = collect_and_analyze_results(datasources=ds_names, location=location, search=search)

    close_out_search.submit(search)
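
(If the subflow itself raises, I’m thinking I can capture its state instead of letting the exception propagate; a sketch only, assuming return_state=True is accepted on subflow calls the same way it is on task calls:)

@flow(log_prints=True)
def run_geo_search(location: Polygon, ds_names: list[str]):
    search = upload_geo_search.submit(location=location, datasources=ds_names)

    # return_state=True should hand back a Failed state instead of raising,
    # so the close-out task below is still reached
    analyze_state = collect_and_analyze_results(
        datasources=ds_names, location=location, search=search, return_state=True
    )

    # allow_failure so the close-out still runs even if the upload task failed
    close_out_search.submit(allow_failure(search))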

Thank you for the pointer on Automations; perhaps that provides some mechanism I can use for bookkeeping. I will check those out.

if this doesn’t do what you want, it looks like a bug:

@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    # 1, 2, 3 can run concurrently
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three])
    five = task_5.submit(four)
    clean_up_task.submit(wait_for=[allow_failure(four), five])

you can also do something like this to check states:

from prefect import task, flow
from prefect.blocks.notifications import SlackWebhook


def send_slack_alert(message: str):
    slack_webhook_block = SlackWebhook.load("default")  # adjust to match your Block name
    slack_webhook_block.notify(message)


@task
def always_succeeds_task():
    return "I'm fail safe! ✅"


@flow
def flow_reacting_to_states():
    state = always_succeeds_task(return_state=True)
    if state.name == "Completed":
        send_slack_alert("Important task completed! 🎉")


if __name__ == "__main__":
    flow_reacting_to_states()
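
applying the same state check to the earlier example could look something like this (an untested sketch, reusing the task definitions and imports from the examples above; note that four.wait() blocks until task_4 finishes):

@flow(log_prints=True, name="Cleanup task always runs")
def main():
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three])

    # block on task_4 and check its final state instead of handing the raw
    # future straight to task_5
    upstream = [allow_failure(four)]
    if four.wait().is_completed():
        five = task_5.submit(four)
        upstream.append(allow_failure(five))

    # every entry in upstream is a real, resolvable future at this point,
    # so the cleanup task is always submitted
    clean_up_task.submit(wait_for=upstream)


if __name__ == "__main__":
    main()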