`PrefectFuture` method `get_state()` returns `coroutine` object

Hello
Prefect 1.0 has the concept of state_handlers that can be attached to tasks. Prefect 2.0 introduced PrefectFuture.
I have defined a function that sends a message to Discord on task failure. It checks whether my_task failed. Both functions are async.

async def on_failure_send_discord_msg(task_future: PrefectFuture):
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():
    ....
    ....
    ...

@task(name="my_task")
async def my_task():
    ...
    ...
    ...

@flow(name="flow",
      task_runner=SequentialTaskRunner())
async def flow(params):
    ....
    ....
    future_obj = await my_task.submit()
    await on_failure_send_discord_msg(future_obj)
    return future_obj.result()

I deploy the flow function to Prefect Cloud. Every time I start a run of the flow, on_failure_send_discord_msg fails at task_future.get_state().is_failed():

Finished in state Failed("Flow run encountered an exception. AttributeError: 'coroutine' object has no attribute 'is_failed'\n").

Could anyone help me with this? Why does get_state() return a coroutine object even though I'm awaiting?

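I modified your code a little. I think the issue is that you need to await the calls on the PrefectFuture object: in an async flow, methods such as wait() and get_state() return coroutines, so each call needs its own await before you can use the result.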

import asyncio

from prefect import flow, task
from prefect.futures import PrefectFuture


async def on_failure_send_discord_msg(task_future: PrefectFuture, WaitTime):
    # Notice the await here! wait() returns the final state,
    # or None if WaitTime (in seconds) elapses first.
    return await task_future.wait(WaitTime)

    # Or if you want something similar to is_failed():
    # if str(await task_future.wait(WaitTime)) != 'Completed()':
    #     return 'Not Completed'
    # else:
    #     return 'Completed'


@task(name="my_task")
async def my_task():
    await asyncio.sleep(1)
    return 'task run success'


@flow(name="flow")
async def flow(WaitTime):
    future_obj = await my_task.submit()
    status = await on_failure_send_discord_msg(future_obj, WaitTime)
    print(status)


asyncio.run(flow(0.5))  # prints None (timed out before the task finished)
asyncio.run(flow(1.5))  # prints Completed()
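
To show where the AttributeError in the question comes from, here is a rough sketch that needs only the standard library. FakeFuture and FakeState are hypothetical stand-ins I made up for illustration, not Prefect classes; they only mimic the behaviour that the future's methods hand back coroutines inside async code. Calling is_failed() on the un-awaited result fails the same way as in the question, while awaiting get_state() first gives a state object you can query.

```python
import asyncio


class FakeState:
    """Hypothetical stand-in for a state object (not a Prefect class)."""

    def __init__(self, failed: bool):
        self._failed = failed

    def is_failed(self) -> bool:
        return self._failed


class FakeFuture:
    """Hypothetical stand-in for a future whose methods are coroutines."""

    def __init__(self, failed: bool):
        self._failed = failed

    async def wait(self) -> FakeState:
        await asyncio.sleep(0)  # pretend to wait for completion
        return FakeState(self._failed)

    async def get_state(self) -> FakeState:
        await asyncio.sleep(0)  # pretend to fetch the state
        return FakeState(self._failed)


async def check_without_await(future: FakeFuture) -> str:
    coro = future.get_state()  # no await: this is a coroutine object
    try:
        return str(coro.is_failed())  # raises AttributeError
    except AttributeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning


async def check_with_await(future: FakeFuture) -> bool:
    await future.wait()               # await the wait call
    state = await future.get_state()  # await again to get the state itself
    return state.is_failed()


missing_await_error = asyncio.run(check_without_await(FakeFuture(failed=True)))
failed_flag = asyncio.run(check_with_await(FakeFuture(failed=True)))
ok_flag = asyncio.run(check_with_await(FakeFuture(failed=False)))

print(missing_await_error)
print(failed_flag, ok_flag)
```

If the same pattern carries over to your function, the check would become something like state = await task_future.get_state() followed by state.is_failed(), but I have only tried the wait() version above.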