How can I set task-level timeouts in Prefect 2?

View in #prefect-community on Slack

Michael_Smith @Michael_Smith: Hello, my Prefect 2.0 tests are going well. I see that Flow has a timeout_seconds parameter; is there any way to set task-level timeouts as well?

Kevin_Kho @Kevin_Kho: Orion doesn’t have timeouts yet so I don’t think there is a simple way like in Prefect 1

Michael_Adkins @Michael_Adkins: We don’t support cancellation of tasks at this time, but you can perform logic based on a wait/timeout

future = my_task(arg, arg)
state = future.wait(5)
if not state:
    # Do something based on this timeout
    ...

Here’s the example from the docstring

        Wait N seconds for the task to complete

        >>> @flow
        >>> def my_flow():
        >>>     future = my_task()
        >>>     final_state = future.wait(0.1)
        >>>     if final_state:
        >>>         ... # Task done
        >>>     else:
        >>>         ... # Task not done yet

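For the latest versions of 2.0, you can approximate a task-level timeout by calling .submit().wait(timeout) on the task. Note that this only stops waiting for the task; it does not cancel the task run, which still finishes as Completed in the output below.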

from prefect import flow, task, get_run_logger
from time import sleep


@task
def my_task():
    sleep(2)
    return 1


@flow
def my_flow():
    final_state = my_task.submit().wait(0.1)
    logger = get_run_logger()
    if final_state:
        logger.info("The task is done")
    else:
        # wait() timed out; the task run itself keeps going (see the output)
        logger.info("The task is canceled because it takes too long to run")


if __name__ == "__main__":
    my_flow()

Output:


14:11:11.406 | INFO    | prefect.engine - Created flow run 'glorious-condor' for flow 'my-flow'
14:11:13.032 | INFO    | prefect.flow_runs - Created task run 'my_task-20c6ece6-0' for task 'my_task'
14:11:13.033 | INFO    | prefect.flow_runs - Submitted task run 'my_task-20c6ece6-0' for execution.
14:11:13.135 | INFO    | prefect.flow_runs - The task is canceled because it takes too long to run
14:11:15.701 | INFO    | prefect.task_runs - Finished in state Completed()
14:11:15.827 | INFO    | prefect.flow_runs - Finished in state Completed('All states completed.')

Hi @Khuyen_Tran, thanks for the update!

But that would mean that the tasks are only executed sequentially, correct?

Let’s say I want to spawn several compute-intensive Prefect tasks in parallel. I know that some of them might get stuck, and I want them to fail if they run longer than 10 minutes.

With the .wait() approach, every task would be executed in a sequential fashion and not concurrently, correct?

from prefect import flow, task
from prefect_ray import RayTaskRunner


@task
def my_task(param):
    do_heavy_compute(param)
    return True


@flow(task_runner=RayTaskRunner(address="my-beefy-autoscaled-cluster"))
def my_flow():
    params = [
        "various computational inputs",
        "eg machine learning parameters",
    ]
    
    for param in params:
        my_task.submit(param).wait(60 * 10)


if __name__ == "__main__":
    my_flow()
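
On the question above: calling .wait() inside the loop does make the runs effectively sequential, because each iteration blocks until that task finishes or the timeout expires before the next submit happens. One way around it is to submit everything first and only then wait. Below is a minimal sketch of that idea. `wait_all` is a hypothetical helper (not part of Prefect), and it assumes each future's `wait(timeout)` returns a final state, or `None` on timeout, as in the docstring example earlier in the thread. Like the snippets above, it only stops waiting; it does not cancel anything.

```python
import time
from typing import Any, Iterable, List, Tuple


def wait_all(futures: Iterable[Any], timeout: float) -> Tuple[List[Any], List[Any]]:
    """Wait on already-submitted futures against one shared deadline.

    Hypothetical helper, not a Prefect API. Assumes each future exposes
    wait(timeout) returning a final state, or None if the task has not
    finished within the timeout.
    """
    deadline = time.monotonic() + timeout
    finished, unfinished = [], []
    for future in futures:
        # Only wait for whatever is left of the shared time budget.
        remaining = max(0.0, deadline - time.monotonic())
        state = future.wait(remaining)
        if state is None:
            unfinished.append(future)  # still running; NOT cancelled
        else:
            finished.append(state)
    return finished, unfinished


# Intended use inside a flow (assumption: my_task and params as in the post above):
#
#     futures = [my_task.submit(param) for param in params]  # submit everything first
#     finished, unfinished = wait_all(futures, 60 * 10)      # then wait once
#     if unfinished:
#         raise RuntimeError(f"{len(unfinished)} task(s) exceeded the timeout")
```

The single shared deadline keeps the total wait bounded by the timeout instead of growing with the number of tasks. Raising in the flow marks the flow run as failed, but the stuck task runs themselves keep going until real task-level timeouts are available.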

Toby, we are on it. There is a branch named add-task-timeout; it should be released soon.
