Hello
Prefect 1.0 uses the concept of state_handlers that can be attached to tasks. In Prefect 2.0 PrefectFuture
were introduced.
I have defined function that sends message to the discord on task failure. This function checks if my_task
failed. Both function are async
async def on_failure_send_discord_msg(task_future: PrefectFuture):
task_future.wait() # block until completion
if task_future.get_state().is_failed():
....
....
...
@task(name="my_task")
async def my_task():
...
...
...
@flow(name="flow",
task_runner=SequentialTaskRunner())
async def flow(params):
....
....
future_obj = await my_task.submit()
await on_failure_send_discord_msg(future_obj)
return future_obj.result()
I deploy flow
function to prefect cloud. Every time when I start a run of the flow function on_failure_send_discord_msg
fails at task_future.get_state().is_failed()
Finished in state Failed("Flow run encountered an exception. AttributeError: 'coroutine' object has no attribute 'is_failed'\n")
. Could any help me with this? Why get_state()
return coroutine
object even though I’m awaiting?