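In recent versions of Prefect 2.0, you can put a task-level timeout on a task by submitting it and calling .wait() with a timeout in seconds, e.g. my_task.submit().wait(0.1). The call returns the task's final state, or None if the timeout elapses first. Note that the timeout only bounds the wait: as the output below shows, the task run itself still finishes afterwards.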
from prefect import flow, task, get_run_logger
from time import sleep


@task
def my_task():
    sleep(2)  # simulate two seconds of slow work
    return 1


@flow
def my_flow():
    # wait() returns the final state, or None if 0.1 s elapse first
    final_state = my_task.submit().wait(0.1)
    logger = get_run_logger()
    if final_state:
        logger.info("The task is done")
    else:
        # Only the wait timed out; the task run itself keeps going
        logger.info("The task is canceled because it takes too long to run")


if __name__ == "__main__":
    my_flow()
Output:
14:11:11.406 | INFO | prefect.engine - Created flow run 'glorious-condor' for flow 'my-flow'
14:11:13.032 | INFO | prefect.flow_runs - Created task run 'my_task-20c6ece6-0' for task 'my_task'
14:11:13.033 | INFO | prefect.flow_runs - Submitted task run 'my_task-20c6ece6-0' for execution.
14:11:13.135 | INFO | prefect.flow_runs - The task is canceled because it takes too long to run
14:11:15.701 | INFO | prefect.task_runs - Finished in state Completed()
14:11:15.827 | INFO | prefect.flow_runs - Finished in state Completed('All states completed.')
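
The last two log lines show the task run still reaching Completed after the wait had already timed out. The same "the wait gives up, the work continues" behavior can be reproduced with the standard library alone. The following is only an analogy using concurrent.futures, not Prefect's implementation; slow_job and wait_with_timeout are hypothetical names made up for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep


def slow_job():
    # Stand-in for the task body: sleeps, then returns a value.
    sleep(0.5)
    return 1


def wait_with_timeout(timeout):
    """Return (timed_out, final_result) for a bounded wait on slow_job."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_job)
        try:
            future.result(timeout=timeout)
            timed_out = False
        except TimeoutError:
            # Only the wait gave up; the job itself keeps running.
            timed_out = True
        # Blocking again without a timeout shows the job still completes.
        return timed_out, future.result()


if __name__ == "__main__":
    print(wait_with_timeout(0.1))
```

With a 0.1 second timeout the wait expires first, yet the job still returns 1 once it is allowed to finish. If the work must actually stop at the deadline, a bounded wait like this is not enough on its own.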