How can I set task-level timeouts in Prefect 2?

View in #prefect-community on Slack

Michael_Smith @Michael_Smith: Hello, my prefect 2.0 tests are going well…I see that Flow has a timeout_seconds parameter, is there any way to set Task level timeouts as well?

Kevin_Kho @Kevin_Kho: Orion doesn’t have timeouts yet so I don’t think there is a simple way like in Prefect 1

Michael_Adkins @Michael_Adkins: We don’t support cancellation of tasks at this time, but you can perform logic based on a wait/timeout

future = my_task(arg, arg)
state = future.wait(5)
if not state:
   # Do something based on this timeout
   ...

Here’s the example from the docstring

        Wait N sconds for the task to complete

        >>> @flow
        >>> def my_flow():
        >>>     future = my_task()
        >>>     final_state = future.wait(0.1)
        >>>     if final_state:
        >>>         ... # Task done
        >>>     else:
        >>>         ... # Task not done yet

For the latest versions of 2.0, you can set task-level timeouts by adding .submit().wait()

from prefect import flow, task, get_run_logger
from time import sleep


@task
def my_task():
    sleep(2)
    return 1


@flow
def my_flow():
    final_state = my_task.submit().wait(0.1)
    logger = get_run_logger()
    if final_state:
        logger.info("The task is done")

    else:
        logger.info("The task is canceled because it takes too long to run")


if __name__ == "__main__":
    my_flow()

Output:


14:11:11.406 | INFO    | prefect.engine - Created flow run 'glorious-condor' for
flow 'my-flow'                                                                  
14:11:13.032 | INFO    | prefect.flow_runs - Created task run                   
'my_task-20c6ece6-0' for task 'my_task'                                         
14:11:13.033 | INFO    | prefect.flow_runs - Submitted task run                 
'my_task-20c6ece6-0' for execution.                                             
14:11:13.135 | INFO    | prefect.flow_runs - The task is canceled because it   
takes too long to run                                                           
14:11:15.701 | INFO    | prefect.task_runs - Finished in state Completed()      
14:11:15.827 | INFO    | prefect.flow_runs - Finished in state Completed('All   
states completed.')           
1 Like

Hi @Khuyen_Tran, thanks for the update!

But that would mean that the tasks are only executed sequentially, correct?

Let’s say I want to spawn several compute-intensive Prefect tasks in parallel. I know that some of them might get stuck and I want them to fail if they run longer then 10 minutes.

With the .wait() approach, every task would be executed in a sequential fashion and not concurrently, correct?

from prefect import flow, task
from prefect_ray import RayTaskRunner


@task
def my_task(param):
    do_heavy_compute(param)
    return True


@flow(task_runner=RayTaskRunner(address="my-beefy-autoscaled-cluster"))
def my_flow():
    params = [
        "various computational inputs",
        "eg machine learning parameters",
    ]
    
    for param in params:
        my_task.submit(param).wait(60 * 10)


if __name__ == "__main__":
    my_flow()
1 Like

Toby, we are on it. There is a branch named add-task-timeout - should be released soon

2 Likes

The task decorator has a timeout_seconds parameter now but frozen tasks do not get cancelled.

1 Like

Exactly. Any fix to it?

Timed out tasks are treated like any other failed tasks! Docs here

That’s what I expected when I ran that snippet in docs, before having to google the problem and end up here. Please take a look at the code and output. What’s wrong with it? Why does it complete?

CODE:

from prefect import task, get_run_logger, flow
import time


@task(timeout_seconds=1)
def show_timeouts():
    logger = get_run_logger()
    logger.info("I will execute")
    time.sleep(5)
    logger.info("I will not execute")


@flow
def test_timeout():
    show_timeouts()


test_timeout()

OUTPUT:

13:11:07.642 | INFO    | prefect.engine - Created flow run 'organic-toucan' for flow 'test-timeout'
13:11:07.646 | INFO    | Flow run 'organic-toucan' - View at http://127.0.0.1:4200/flow-runs/flow-run/4d09c304-6fa2-4b20-8502-c3b0c09c964b
13:11:07.850 | INFO    | Flow run 'organic-toucan' - Created task run 'show_timeouts-0' for task 'show_timeouts'
13:11:07.851 | INFO    | Flow run 'organic-toucan' - Executing 'show_timeouts-0' immediately...
13:11:07.944 | INFO    | Task run 'show_timeouts-0' - I will execute
13:11:12.957 | INFO    | Task run 'show_timeouts-0' - I will not execute
13:11:13.005 | INFO    | Task run 'show_timeouts-0' - Finished in state Completed()
13:11:13.049 | INFO    | Flow run 'organic-toucan' - Finished in state Completed('All states completed.')

Process finished with exit code 0

I just ran the exact same code and encountered the expected TimeoutError, stack trace below. Can you double check that your Prefect version is up to date?

In [1]: from prefect import task, get_run_logger, flow
   ...: import time
   ...:
   ...:
   ...: @task(timeout_seconds=1)
   ...: def show_timeouts():
   ...:     logger = get_run_logger()
   ...:     logger.info("I will execute")
   ...:     time.sleep(5)
   ...:     logger.info("I will not execute")
   ...:
   ...:
   ...: @flow
   ...: def test_timeout():
   ...:     show_timeouts()
   ...:
   ...:
   ...: test_timeout()
18:00:11.334 | INFO    | prefect.engine - Created flow run 'fervent-degu' for flow 'test-timeout'
18:00:11.336 | INFO    | Flow run 'fervent-degu' - View at https://app.prefect.cloud/account/0f4498-d380-4d7b-b2da03823f/workspace/969-4ab8-93b7-2dfa784903e6/flow-runs/flow-run/107f3ba9-93fa-4036-b84d-27ae8f8352e1
18:00:36.000 | INFO    | Flow run 'fervent-degu' - Created task run 'show_timeouts-0' for task 'show_timeouts'
18:00:36.001 | INFO    | Flow run 'fervent-degu' - Executing 'show_timeouts-0' immediately...
18:00:37.055 | INFO    | Task run 'show_timeouts-0' - I will execute
18:00:42.063 | ERROR   | Task run 'show_timeouts-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 294, in aresult
    raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
18:00:42.264 | ERROR   | Task run 'show_timeouts-0' - Finished in state TimedOut('Task run exceeded timeout of 1.0 seconds TimeoutError: ', type=FAILED)
18:00:42.275 | ERROR   | Flow run 'fervent-degu' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 294, in aresult
    raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "<ipython-input-1-33b94ab4af9e>", line 15, in test_timeout
    show_timeouts()
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/tasks.py", line 505, in __call__
    return enter_task_run_engine(
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
    return await future._result()
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
TimeoutError
18:00:42.527 | ERROR   | Flow run 'fervent-degu' - Finished in state Failed('Flow run encountered an exception. TimeoutError: ')
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:292, in Call.aresult(self)
    291 try:
--> 292     return await asyncio.wrap_future(self.future)
    293 except asyncio.CancelledError as exc:

CancelledError:

The above exception was the direct cause of the following exception:

CancelledError                            Traceback (most recent call last)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:1719, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, log_prints, interruptible, client)
   1716     call = from_async.call_soon_in_new_thread(
   1717         create_call(task.fn, *args, **kwargs), timeout=task.timeout_seconds
   1718     )
-> 1719     result = await call.aresult()
   1721 except (CancelledError, asyncio.CancelledError) as exc:

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:294, in Call.aresult(self)
    293 except asyncio.CancelledError as exc:
--> 294     raise CancelledError() from exc

CancelledError:

The above exception was the direct cause of the following exception:

TimeoutError                              Traceback (most recent call last)
Cell In[1], line 18
     13 @flow
     14 def test_timeout():
     15     show_timeouts()
---> 18 test_timeout()

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/flows.py:511, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
    507 parameters = get_call_parameters(self.fn, args, kwargs)
    509 return_type = "state" if return_state else "result"
--> 511 return enter_flow_run_engine_from_flow_call(
    512     self,
    513     parameters,
    514     wait_for=wait_for,
    515     return_type=return_type,
    516 )

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:272, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    265     retval = from_async.wait_for_call_in_loop_thread(
    266         begin_run,
    267         done_callbacks=done_callbacks,
    268         contexts=contexts,
    269     )
    271 else:
--> 272     retval = from_sync.wait_for_call_in_loop_thread(
    273         begin_run,
    274         done_callbacks=done_callbacks,
    275         contexts=contexts,
    276     )
    278 return retval

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
    241     stack.enter_context(context)
    242 waiter.wait()
--> 243 return call.result()

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:283, in Call.result(self, timeout)
    277 def result(self, timeout: Optional[float] = None) -> T:
    278     """
    279     Wait for the result of the call.
    280
    281     Not safe for use from asynchronous contexts.
    282     """
--> 283     return self.future.result(timeout=timeout)

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:169, in Future.result(self, timeout)
    167     raise CancelledError()
    168 elif self._state == FINISHED:
--> 169     return self.__get_result()
    171 self._condition.wait(timeout)
    173 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    174     # Raise Prefect cancelled error instead of
    175     # `concurrent.futures._base.CancelledError`

File ~/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:346, in Call._run_async(***failed resolving arguments***)
    344     with set_current_call(self):
    345         with self.future.enforce_async_deadline() as cancel_scope:
--> 346             result = await coro
    347 except CancelledError:
    348     # Report cancellation
    349     if cancel_scope.timedout():

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/client/utilities.py:51, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     49 async with client_context as new_client:
     50     kwargs.setdefault("client", new_client or client)
---> 51     return await fn(*args, **kwargs)

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:375, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client, user_thread)
    373     return state
    374 elif return_type == "result":
--> 375     return await state.result(fetch=True)
    376 else:
    377     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     84     raise UnfinishedRun(
     85         f"Run is in {state.type.name} state, its result is not available."
     86     )
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:833, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context, user_thread)
    828         else:
    829             from_async.call_soon_in_new_thread(
    830                 flow_call, timeout=flow.timeout_seconds
    831             )
--> 833         result = await flow_call.aresult()
    835         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    836             flow_run_context.task_run_futures, client=client
    837         )
    838 except PausedRun:

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:292, in Call.aresult(self)
    286 """
    287 Wait for the result of the call.
    288
    289 For use from asynchronous contexts.
    290 """
    291 try:
--> 292     return await asyncio.wrap_future(self.future)
    293 except asyncio.CancelledError as exc:
    294     raise CancelledError() from exc

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:316, in Call._run_sync(***failed resolving arguments***)
    314 with set_current_call(self):
    315     with self.future.enforce_sync_deadline() as cancel_scope:
--> 316         result = self.fn(*self.args, **self.kwargs)
    318 # Return the coroutine for async execution
    319 if inspect.isawaitable(result):

Cell In[1], line 15, in test_timeout()
     13 @flow
     14 def test_timeout():
---> 15     show_timeouts()

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/tasks.py:505, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
    501 parameters = get_call_parameters(self.fn, args, kwargs)
    503 return_type = "state" if return_state else "result"
--> 505 return enter_task_run_engine(
    506     self,
    507     parameters=parameters,
    508     wait_for=wait_for,
    509     task_runner=SequentialTaskRunner(),
    510     return_type=return_type,
    511     mapped=False,
    512 )

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:1137, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
   1135     return from_async.wait_for_call_in_loop_thread(begin_run)
   1136 else:
-> 1137     return from_sync.wait_for_call_in_loop_thread(begin_run)

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
    241     stack.enter_context(context)
    242 waiter.wait()
--> 243 return call.result()

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:283, in Call.result(self, timeout)
    277 def result(self, timeout: Optional[float] = None) -> T:
    278     """
    279     Wait for the result of the call.
    280
    281     Not safe for use from asynchronous contexts.
    282     """
--> 283     return self.future.result(timeout=timeout)

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:169, in Future.result(self, timeout)
    167     raise CancelledError()
    168 elif self._state == FINISHED:
--> 169     return self.__get_result()
    171 self._condition.wait(timeout)
    173 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    174     # Raise Prefect cancelled error instead of
    175     # `concurrent.futures._base.CancelledError`

File ~/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:346, in Call._run_async(***failed resolving arguments***)
    344     with set_current_call(self):
    345         with self.future.enforce_async_deadline() as cancel_scope:
--> 346             result = await coro
    347 except CancelledError:
    348     # Report cancellation
    349     if cancel_scope.timedout():

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:1302, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs)
   1300     return await future._wait()
   1301 elif return_type == "result":
-> 1302     return await future._result()
   1303 else:
   1304     raise ValueError(f"Invalid return type for task engine {return_type!r}.")

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/futures.py:237, in PrefectFuture._result(self, timeout, raise_on_failure)
    235 if not final_state:
    236     raise TimeoutError("Call timed out before task finished.")
--> 237 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)

File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     84     raise UnfinishedRun(
     85         f"Run is in {state.type.name} state, its result is not available."
     86     )
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

TimeoutError:```

That’s odd… if I start the server and go to the API page - Settings, I get:

image

Furthermore, if I close the server, and type prefect version in the terminal inside my pipenv, I get:

Version: 2.10.16 API version: 0.8.4 Python version: 3.10.7 Git commit: 6cd7c3ee Built: Tue, Jun 20, 2023 2:59 PM OS/Arch: win32/AMD64 Profile: brScrape Server type: server

As far as I know, that’s the latest version.