How can I set task-level timeouts in Prefect 2?

For the latest versions of 2.0, you can set task-level timeouts by adding .submit().wait()

from prefect import flow, task, get_run_logger
from time import sleep


@task
def my_task():
    sleep(2)
    return 1


@flow
def my_flow():
    final_state = my_task.submit().wait(0.1)
    logger = get_run_logger()
    if final_state:
        logger.info("The task is done")

    else:
        logger.info("The task is canceled because it takes too long to run")


if __name__ == "__main__":
    my_flow()

Output:


14:11:11.406 | INFO    | prefect.engine - Created flow run 'glorious-condor' for
flow 'my-flow'                                                                  
14:11:13.032 | INFO    | prefect.flow_runs - Created task run                   
'my_task-20c6ece6-0' for task 'my_task'                                         
14:11:13.033 | INFO    | prefect.flow_runs - Submitted task run                 
'my_task-20c6ece6-0' for execution.                                             
14:11:13.135 | INFO    | prefect.flow_runs - The task is canceled because it   
takes too long to run                                                           
14:11:15.701 | INFO    | prefect.task_runs - Finished in state Completed()      
14:11:15.827 | INFO    | prefect.flow_runs - Finished in state Completed('All   
states completed.')           
1 Like