I have a task which I would like to have the status as Failed but also return data, from what I have read in the docs, I’m unsure how to do this.
An example:
from prefect import flow, get_run_logger, task
from prefect.server.schemas.states import Completed, Failed
@flow(name="My Flow")
def my_flow():
data = data_task.submit()
state = failed_state_task.submit()
data_and_failed_state = data_and_failed_state_task.submit()
logger = get_run_logger()
logger.info("data_task: " + data.result())
logger.info("failed_state: " + str(state.wait()))
logger.info("data_and_failed_state: " + str(data_and_failed_state.wait()))
logger.info("data_and_failed_state: " + str(data_and_failed_state.result()))
@task
def data_task():
return "Hello World"
@task
def failed_state_task():
return Failed(message="Ohh no!")
@task
def data_and_failed_state_task():
return Failed(message="Not again!", result=123)
This produces the following output:
data_task: Hello World
failed_state: Failed('Ohh no!')
data_and_failed_state: Failed('Not again!')
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
result = await flow_call.aresult()
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/tmp/tmp8see8jloprefect/admin/state_and_data.py", line 14, in my_flow
logger.info("data_and_failed_state: " + str(data_and_failed_state.result()))
File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 228, in result
return from_sync.call_soon_in_loop_thread(result).result()
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 178, in result
return self.__get_result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
prefect.exceptions.FailedRun: Not again!
So before I have a chance to get my result, Prefect is taken my Failed
task state and raising an Exception…
So is it possible to pass a result back from a Failed task? And how do you do it?