Wait for successful execution of previous flow on Prefect

I’ve posted this on SO and even set a bounty there, that will be open until June 30th 2023.

I’m trying to use Prefect 2 to configure the execution of a few tasks on Windows (everything is on my local machine).

I often have problems with some tasks, that I might need to fix manually and re-run it. After this, I want other tasks to wait until a successful execution of the first task and only then start, without me having to execute failed schedules manually. As I’ve made on the code below, task_that_should_wait waits for task_that_fails to finish, regardless of whether it is completed successfully or failed. They are schedules by one flow for each, the reason for using separate flows is because they are scheduled to execute nominally at different times.

from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
    

@task
def task_that_fails():
    raise(Exception("Failed here")) # Imagine this is a bug, that I'll fix manually
    return True

    
@flow
def flow_that_fails():
    task_that_fails()

deploy_that_fails = Deployment.build_from_flow(
    name="task_that_fails",
    flow=flow_that_fails,
    schedule=CronSchedule(cron="36 21 * * MON,TUE,WED,THU,FRI", timezone='US/Central'),
)

deploy_that_fails.apply()
    
@task
def task_that_should_wait():
    print("hey!")
    return True

    
@flow
def flow_that_waits():
    task_that_should_wait(wait_for=['task_that_fails'])

deploy_that_waits = Deployment.build_from_flow(
    name="deploy_that_waits",
    flow=flow_that_waits,
    schedule=CronSchedule(cron="35 21 * * MON,TUE,WED,THU,FRI", timezone='US/Central'),
)

deploy_that_waits.apply()

How can I configure task_that_should_wait to actually wait for the successful completion of task_that_fails? These are props for processes that take very long and have multiple task dependencies, so think that task_that_waits is not necessary run after task_that_fails, so I also need to avoid making them a single task or flow.

I was looking for the same thing and found the .wait_for=().