@Michael_Smith: Hello, my Prefect 2.0 tests are going well. I see that `Flow` has a `timeout_seconds` parameter; is there any way to set task-level timeouts as well?
@Kevin_Kho: Orion (Prefect 2.0) doesn’t have task timeouts yet, so I don’t think there is a simple way to do it like in Prefect 1.
@Michael_Adkins: We don’t support cancellation of tasks at this time, but you can branch on whether the task finishes within a timeout:
```
future = my_task(arg, arg)
# wait() returns the final state, or None if the task is not done within 5 seconds
state = future.wait(5)
if not state:
    # Do something based on this timeout
    ...
```
Here’s the example from the `wait` docstring:
```
Wait N seconds for the task to complete

>>> @flow
>>> def my_flow():
>>>     future = my_task()
>>>     final_state = future.wait(0.1)
>>>     if final_state:
>>>         ... # Task done
>>>     else:
>>>         ... # Task not done yet
```
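In later 2.0 releases, a task has to be submitted with `.submit()` to get a future, and you can then bound how long the flow waits for it with `.wait(timeout)`. Note that this only stops the flow from waiting: as the output below shows, the task keeps running and still finishes in `Completed`.
```
from prefect import flow, task, get_run_logger
from time import sleep


@task
def my_task():
    sleep(2)
    return 1


@flow
def my_flow():
    # Wait at most 0.1 s; wait() returns None if the task has not finished by then
    final_state = my_task.submit().wait(0.1)
    logger = get_run_logger()
    if final_state:
        logger.info("The task is done")
    else:
        # The task is not actually cancelled: it keeps running in the background
        logger.info("The task is canceled because it takes too long to run")


if __name__ == "__main__":
    my_flow()
```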
Output:
```
14:11:11.406 | INFO | prefect.engine - Created flow run 'glorious-condor' for flow 'my-flow'
14:11:13.032 | INFO | prefect.flow_runs - Created task run 'my_task-20c6ece6-0' for task 'my_task'
14:11:13.033 | INFO | prefect.flow_runs - Submitted task run 'my_task-20c6ece6-0' for execution.
14:11:13.135 | INFO | prefect.flow_runs - The task is canceled because it takes too long to run
14:11:15.701 | INFO | prefect.task_runs - Finished in state Completed()
14:11:15.827 | INFO | prefect.flow_runs - Finished in state Completed('All states completed.')
```
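The same behaviour can be reproduced without Prefect. Below is a minimal sketch that uses the standard library's `concurrent.futures` as a stand-in for Prefect futures; `wait_or_none` and `slow_job` are hypothetical names, not Prefect APIs. Giving up on the wait returns `None`, but the job is neither cancelled nor stopped, which matches the `Finished in state Completed()` line in the output above.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def wait_or_none(future, timeout):
    """Hypothetical helper: the future's result, or None if not done within `timeout` s."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        return None


def slow_job():
    time.sleep(0.5)  # stands in for my_task's sleep(2)
    return 1


with ThreadPoolExecutor() as pool:
    future = pool.submit(slow_job)
    first = wait_or_none(future, 0.1)  # stop waiting after 0.1 s
    cancelled = future.cancel()        # False: a running job cannot be cancelled
    final = future.result()            # waiting again still yields the result

print(first, cancelled, final)  # None False 1
```

Because `None` doubles as "not finished" here, this helper only suits jobs that never return `None` themselves; Prefect's `wait` avoids that ambiguity by returning a state object.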
But that would mean that the tasks are only executed sequentially, correct?
Let’s say I want to spawn several compute-intensive Prefect tasks in parallel. I know that some of them might get stuck, and I want them to fail if they run longer than 10 minutes.
With the `.wait()` approach, every task would run sequentially rather than concurrently, correct?
```
from prefect import flow, task
from prefect_ray import RayTaskRunner


@task
def my_task(param):
    do_heavy_compute(param)  # placeholder for the expensive work
    return True


@flow(task_runner=RayTaskRunner(address="my-beefy-autoscaled-cluster"))
def my_flow():
    params = [
        "various computational inputs",
        "eg machine learning parameters",
    ]
    for param in params:
        # Blocks for up to 10 minutes before the next task is even submitted
        my_task.submit(param).wait(60 * 10)


if __name__ == "__main__":
    my_flow()
```
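Submitting every job first and only then waiting keeps the jobs concurrent. The sketch below illustrates that pattern with `concurrent.futures.wait` and one shared deadline instead of a per-task `.wait()` inside the loop; it is a standard-library stand-in, not Prefect or Ray code, and `heavy_compute` and `run_with_deadline` are hypothetical names. As with `.wait()`, jobs that miss the deadline are only reported, not stopped.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def heavy_compute(seconds):
    """Hypothetical stand-in for do_heavy_compute: sleeps, then returns its input."""
    time.sleep(seconds)
    return seconds


def run_with_deadline(durations, timeout):
    """Start every job at once, then wait for all of them with one shared deadline."""
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        futures = [pool.submit(heavy_compute, d) for d in durations]
        done, not_done = wait(futures, timeout=timeout)
        finished = sorted(f.result() for f in done)
        # Jobs in not_done keep running; leaving the `with` block still waits for them.
    return finished, len(not_done)


finished, timed_out = run_with_deadline([0.3, 0.3, 0.3, 1.0], timeout=0.5)
print(finished, timed_out)  # [0.3, 0.3, 0.3] 1
```

The three 0.3 s jobs all finish inside the 0.5 s deadline, which would be impossible if they ran one after another.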
That’s what I expected when I ran the snippet from the docs, before I had to Google the problem and ended up here. Please take a look at the code and output below. What’s wrong with it? Why does the task complete instead of timing out?
CODE:
```
from prefect import task, get_run_logger, flow
import time


@task(timeout_seconds=1)
def show_timeouts():
    logger = get_run_logger()
    logger.info("I will execute")
    time.sleep(5)
    logger.info("I will not execute")


@flow
def test_timeout():
    show_timeouts()


test_timeout()
```
OUTPUT:
```
13:11:07.642 | INFO | prefect.engine - Created flow run 'organic-toucan' for flow 'test-timeout'
13:11:07.646 | INFO | Flow run 'organic-toucan' - View at http://127.0.0.1:4200/flow-runs/flow-run/4d09c304-6fa2-4b20-8502-c3b0c09c964b
13:11:07.850 | INFO | Flow run 'organic-toucan' - Created task run 'show_timeouts-0' for task 'show_timeouts'
13:11:07.851 | INFO | Flow run 'organic-toucan' - Executing 'show_timeouts-0' immediately...
13:11:07.944 | INFO | Task run 'show_timeouts-0' - I will execute
13:11:12.957 | INFO | Task run 'show_timeouts-0' - I will not execute
13:11:13.005 | INFO | Task run 'show_timeouts-0' - Finished in state Completed()
13:11:13.049 | INFO | Flow run 'organic-toucan' - Finished in state Completed('All states completed.')
Process finished with exit code 0
```
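For context on what `timeout_seconds` should do: the traceback in the reply below shows that, in that Prefect version, the engine runs the task function in a new thread with `timeout=task.timeout_seconds`. The sketch below illustrates that general idea with a plain thread pool; it is not Prefect's implementation, and `with_timeout` and `slow_add` are hypothetical names. Unlike a real cancellation, a plain Python thread cannot be forcibly stopped, so a timed-out call here keeps running in the background after the `TimeoutError` is raised.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from functools import wraps


def with_timeout(seconds):
    """Hypothetical decorator: raise TimeoutError if a call runs longer than `seconds`."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Run the call in its own worker thread so the caller can stop waiting.
            pool = ThreadPoolExecutor(max_workers=1)
            future = pool.submit(fn, *args, **kwargs)
            try:
                return future.result(timeout=seconds)
            except FutureTimeout:
                raise TimeoutError(
                    f"call exceeded timeout of {seconds} seconds"
                ) from None
            finally:
                # Return immediately; a timed-out call finishes in the background.
                pool.shutdown(wait=False)
        return wrapper
    return decorator


@with_timeout(0.2)
def slow_add(a, b, delay):
    time.sleep(delay)
    return a + b


print(slow_add(1, 2, delay=0))  # 3
try:
    slow_add(1, 2, delay=1.0)
except TimeoutError as exc:
    print(exc)  # call exceeded timeout of 0.2 seconds
```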
I just ran the exact same code and got the expected TimeoutError; the stack trace is below. Can you double-check that your Prefect version is up to date?
```
In [1]: from prefect import task, get_run_logger, flow
   ...: import time
   ...:
   ...:
   ...: @task(timeout_seconds=1)
   ...: def show_timeouts():
   ...:     logger = get_run_logger()
   ...:     logger.info("I will execute")
   ...:     time.sleep(5)
   ...:     logger.info("I will not execute")
   ...:
   ...:
   ...: @flow
   ...: def test_timeout():
   ...:     show_timeouts()
   ...:
   ...:
   ...: test_timeout()
18:00:11.334 | INFO | prefect.engine - Created flow run 'fervent-degu' for flow 'test-timeout'
18:00:11.336 | INFO | Flow run 'fervent-degu' - View at https://app.prefect.cloud/account/0f4498-d380-4d7b-b2da03823f/workspace/969-4ab8-93b7-2dfa784903e6/flow-runs/flow-run/107f3ba9-93fa-4036-b84d-27ae8f8352e1
18:00:36.000 | INFO | Flow run 'fervent-degu' - Created task run 'show_timeouts-0' for task 'show_timeouts'
18:00:36.001 | INFO | Flow run 'fervent-degu' - Executing 'show_timeouts-0' immediately...
18:00:37.055 | INFO | Task run 'show_timeouts-0' - I will execute
18:00:42.063 | ERROR | Task run 'show_timeouts-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
result = await call.aresult()
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 294, in aresult
raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
18:00:42.264 | ERROR | Task run 'show_timeouts-0' - Finished in state TimedOut('Task run exceeded timeout of 1.0 seconds TimeoutError: ', type=FAILED)
18:00:42.275 | ERROR | Flow run 'fervent-degu' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
result = await call.aresult()
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 294, in aresult
raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
result = await flow_call.aresult()
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "<ipython-input-1-33b94ab4af9e>", line 15, in test_timeout
show_timeouts()
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/tasks.py", line 505, in __call__
return enter_task_run_engine(
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
return call.result()
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
return self.__get_result()
File "/Users/georgecoyne/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
return await future._result()
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/Users/georgecoyne/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
TimeoutError
18:00:42.527 | ERROR | Flow run 'fervent-degu' - Finished in state Failed('Flow run encountered an exception. TimeoutError: ')
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:292, in Call.aresult(self)
291 try:
--> 292 return await asyncio.wrap_future(self.future)
293 except asyncio.CancelledError as exc:
CancelledError:
The above exception was the direct cause of the following exception:
CancelledError Traceback (most recent call last)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:1719, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, log_prints, interruptible, client)
1716 call = from_async.call_soon_in_new_thread(
1717 create_call(task.fn, *args, **kwargs), timeout=task.timeout_seconds
1718 )
-> 1719 result = await call.aresult()
1721 except (CancelledError, asyncio.CancelledError) as exc:
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:294, in Call.aresult(self)
293 except asyncio.CancelledError as exc:
--> 294 raise CancelledError() from exc
CancelledError:
The above exception was the direct cause of the following exception:
TimeoutError Traceback (most recent call last)
Cell In[1], line 18
13 @flow
14 def test_timeout():
15 show_timeouts()
---> 18 test_timeout()
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/flows.py:511, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
507 parameters = get_call_parameters(self.fn, args, kwargs)
509 return_type = "state" if return_state else "result"
--> 511 return enter_flow_run_engine_from_flow_call(
512 self,
513 parameters,
514 wait_for=wait_for,
515 return_type=return_type,
516 )
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:272, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
265 retval = from_async.wait_for_call_in_loop_thread(
266 begin_run,
267 done_callbacks=done_callbacks,
268 contexts=contexts,
269 )
271 else:
--> 272 retval = from_sync.wait_for_call_in_loop_thread(
273 begin_run,
274 done_callbacks=done_callbacks,
275 contexts=contexts,
276 )
278 return retval
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
241 stack.enter_context(context)
242 waiter.wait()
--> 243 return call.result()
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:283, in Call.result(self, timeout)
277 def result(self, timeout: Optional[float] = None) -> T:
278 """
279 Wait for the result of the call.
280
281 Not safe for use from asynchronous contexts.
282 """
--> 283 return self.future.result(timeout=timeout)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:169, in Future.result(self, timeout)
167 raise CancelledError()
168 elif self._state == FINISHED:
--> 169 return self.__get_result()
171 self._condition.wait(timeout)
173 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
174 # Raise Prefect cancelled error instead of
175 # `concurrent.futures._base.CancelledError`
File ~/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
401 if self._exception:
402 try:
--> 403 raise self._exception
404 finally:
405 # Break a reference cycle with the exception in self._exception
406 self = None
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:346, in Call._run_async(***failed resolving arguments***)
344 with set_current_call(self):
345 with self.future.enforce_async_deadline() as cancel_scope:
--> 346 result = await coro
347 except CancelledError:
348 # Report cancellation
349 if cancel_scope.timedout():
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/client/utilities.py:51, in inject_client.<locals>.with_injected_client(*args, **kwargs)
49 async with client_context as new_client:
50 kwargs.setdefault("client", new_client or client)
---> 51 return await fn(*args, **kwargs)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:375, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client, user_thread)
373 return state
374 elif return_type == "result":
--> 375 return await state.result(fetch=True)
376 else:
377 raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
84 raise UnfinishedRun(
85 f"Run is in {state.type.name} state, its result is not available."
86 )
88 if raise_on_failure and (
89 state.is_crashed() or state.is_failed() or state.is_cancelled()
90 ):
---> 91 raise await get_state_exception(state)
93 if isinstance(state.data, DataDocument):
94 result = result_from_state_with_data_document(
95 state, raise_on_failure=raise_on_failure
96 )
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:833, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context, user_thread)
828 else:
829 from_async.call_soon_in_new_thread(
830 flow_call, timeout=flow.timeout_seconds
831 )
--> 833 result = await flow_call.aresult()
835 waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
836 flow_run_context.task_run_futures, client=client
837 )
838 except PausedRun:
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:292, in Call.aresult(self)
286 """
287 Wait for the result of the call.
288
289 For use from asynchronous contexts.
290 """
291 try:
--> 292 return await asyncio.wrap_future(self.future)
293 except asyncio.CancelledError as exc:
294 raise CancelledError() from exc
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:316, in Call._run_sync(***failed resolving arguments***)
314 with set_current_call(self):
315 with self.future.enforce_sync_deadline() as cancel_scope:
--> 316 result = self.fn(*self.args, **self.kwargs)
318 # Return the coroutine for async execution
319 if inspect.isawaitable(result):
Cell In[1], line 15, in test_timeout()
13 @flow
14 def test_timeout():
---> 15 show_timeouts()
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/tasks.py:505, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
501 parameters = get_call_parameters(self.fn, args, kwargs)
503 return_type = "state" if return_state else "result"
--> 505 return enter_task_run_engine(
506 self,
507 parameters=parameters,
508 wait_for=wait_for,
509 task_runner=SequentialTaskRunner(),
510 return_type=return_type,
511 mapped=False,
512 )
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:1137, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
1135 return from_async.wait_for_call_in_loop_thread(begin_run)
1136 else:
-> 1137 return from_sync.wait_for_call_in_loop_thread(begin_run)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
241 stack.enter_context(context)
242 waiter.wait()
--> 243 return call.result()
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:283, in Call.result(self, timeout)
277 def result(self, timeout: Optional[float] = None) -> T:
278 """
279 Wait for the result of the call.
280
281 Not safe for use from asynchronous contexts.
282 """
--> 283 return self.future.result(timeout=timeout)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:169, in Future.result(self, timeout)
167 raise CancelledError()
168 elif self._state == FINISHED:
--> 169 return self.__get_result()
171 self._condition.wait(timeout)
173 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
174 # Raise Prefect cancelled error instead of
175 # `concurrent.futures._base.CancelledError`
File ~/.pyenv/versions/3.10.8/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
401 if self._exception:
402 try:
--> 403 raise self._exception
404 finally:
405 # Break a reference cycle with the exception in self._exception
406 self = None
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:346, in Call._run_async(***failed resolving arguments***)
344 with set_current_call(self):
345 with self.future.enforce_async_deadline() as cancel_scope:
--> 346 result = await coro
347 except CancelledError:
348 # Report cancellation
349 if cancel_scope.timedout():
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/engine.py:1302, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs)
1300 return await future._wait()
1301 elif return_type == "result":
-> 1302 return await future._result()
1303 else:
1304 raise ValueError(f"Invalid return type for task engine {return_type!r}.")
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/futures.py:237, in PrefectFuture._result(self, timeout, raise_on_failure)
235 if not final_state:
236 raise TimeoutError("Call timed out before task finished.")
--> 237 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File ~/.pyenv/versions/3.10.8/envs/marvin/lib/python3.10/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
84 raise UnfinishedRun(
85 f"Run is in {state.type.name} state, its result is not available."
86 )
88 if raise_on_failure and (
89 state.is_crashed() or state.is_failed() or state.is_cancelled()
90 ):
---> 91 raise await get_state_exception(state)
93 if isinstance(state.data, DataDocument):
94 result = result_from_state_with_data_document(
95 state, raise_on_failure=raise_on_failure
96 )
TimeoutError:
```