How to return a State as well as data

I have a task that I would like to mark as Failed while still returning data. From what I have read in the docs, I’m unsure how to do this.

An example:

from prefect import flow, get_run_logger, task
from prefect.server.schemas.states import Completed, Failed


@flow(name="My Flow")
def my_flow():
    data = data_task.submit()
    state = failed_state_task.submit()
    data_and_failed_state = data_and_failed_state_task.submit()
    logger = get_run_logger()
    logger.info("data_task: " + data.result())
    logger.info("failed_state: " + str(state.wait()))
    logger.info("data_and_failed_state: " + str(data_and_failed_state.wait()))
    logger.info("data_and_failed_state: " + str(data_and_failed_state.result()))

@task
def data_task():
    return "Hello World"

@task
def failed_state_task():
    return Failed(message="Ohh no!")

@task
def data_and_failed_state_task():
    return Failed(message="Not again!", result=123)

This produces the following output:

data_task: Hello World
failed_state: Failed('Ohh no!')
data_and_failed_state: Failed('Not again!')
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp8see8jloprefect/admin/state_and_data.py", line 14, in my_flow
    logger.info("data_and_failed_state: " + str(data_and_failed_state.result()))
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 228, in result
    return from_sync.call_soon_in_loop_thread(result).result()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 178, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
prefect.exceptions.FailedRun: Not again!

So before I have a chance to get my result, Prefect takes my Failed task state and raises an exception.

So is it possible to pass a result back from a Failed task? And how do you do it?


I’ve been experimenting with this particular problem, and it looks like this could work:

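from prefect import flow, task
from prefect.states import Failed


@task(name="my_task")
def fail_task():
    # Attach the payload with `data=` (the question used `result=`).
    return Failed(message="oh no!", data=123)


@flow(name="my_flow")
def my_flow():
    # return_state=True hands back the State instead of raising on failure.
    state = fail_task(return_state=True)
    # state.data holds the payload as a result object; .get() retrieves the value.
    return_data = state.data.get()
    print(return_data)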
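
To make the difference between the two approaches easier to see, here is a rough sketch in plain Python. It does not use Prefect at all: `ToyState`, `toy_task`, and this `FailedRun` class are hypothetical stand-ins, loosely modelled on what the traceback in the question shows (a `result()` call that takes `raise_on_failure` and raises when the state is failed). It is only meant to illustrate why asking for the result raises before the data is reachable, while holding on to the state object lets you read the data yourself.

```python
from dataclasses import dataclass
from typing import Any, Optional


class FailedRun(Exception):
    """Toy stand-in for the exception raised for a failed run."""


@dataclass
class ToyState:
    """Hypothetical, simplified state: a type, a message, and optional data."""

    type: str
    message: str = ""
    data: Optional[Any] = None

    def is_failed(self) -> bool:
        return self.type == "FAILED"

    def result(self, raise_on_failure: bool = True) -> Any:
        # Assumption based on the traceback: a failed state raises
        # before the caller ever sees the attached data.
        if raise_on_failure and self.is_failed():
            raise FailedRun(self.message)
        return self.data


def toy_task() -> ToyState:
    # Analogous to returning Failed(message=..., data=123) from a task.
    return ToyState(type="FAILED", message="Not again!", data=123)


state = toy_task()

# Default path: the failure is raised and the data is never returned.
raised = False
error_message = None
try:
    state.result()
except FailedRun as exc:
    raised = True
    error_message = str(exc)

# Opt-out path: keep the state and read the data without raising.
recovered = state.result(raise_on_failure=False)

print(raised, error_message, recovered)
```

In the toy model the state is failed and still carries `123`. The traceback suggests Prefect's own `result()` accepts `raise_on_failure` as well, so passing `raise_on_failure=False` there might be another way to get at the data, but that is an assumption and not something confirmed in this thread.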