`PrefectFuture` method `get_state()` returns `coroutine` object

Prefect 1.0 uses the concept of state_handlers that can be attached to tasks. In Prefect 2.0 PrefectFuture were introduced.
I have defined function that sends message to the discord on task failure. This function checks if my_task failed. Both function are async

async def on_failure_send_discord_msg(task_future: PrefectFuture):
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():

async def my_task():
async def flow(params):
    future_obj = await my_task.submit()
    await on_failure_send_discord_msg(future_obj)
    return  future_obj.result()

I deploy flow function to prefect cloud. Every time when I start a run of the flow function on_failure_send_discord_msg fails at task_future.get_state().is_failed()

Finished in state Failed("Flow run encountered an exception. AttributeError: 'coroutine' object has no attribute 'is_failed'\n"). Could any help me with this? Why get_state() return coroutine object even though I’m awaiting?

I modified a little on your code, i think the issue here is that you need to have await on PrefectFuture object

from prefect import flow, task
from prefect.futures import PrefectFuture
import asyncio

async def on_failure_send_discord_msg(task_future: PrefectFuture, WaitTime):
    return(await task_future.wait(WaitTime))  # Notice the await here!!!

    # Or if you want something similar to is_failed()
    # if str(await task_future.wait(WaitTime)) != 'Completed()':
    #     return('Not Completed')
    # else:
    #     return('Completed')

async def my_task():
    await asyncio.sleep(1)
    return('task run success')

async def flow(WaitTime):
    future_obj = await my_task.submit()
    status = await on_failure_send_discord_msg(future_obj, WaitTime)
asyncio.run(flow(0.5)) #Returns None
asyncio.run(flow(1.5)) #Returns Completed()
1 Like