Prefect 1.0 uses the concept of state_handlers that can be attached to tasks. In Prefect 2.0,
PrefectFuture was introduced instead.
I have defined a function that sends a message to Discord on task failure. This function checks whether
my_task failed. Both functions are defined as follows:

    async def on_failure_send_discord_msg(task_future: PrefectFuture):
        task_future.wait()  # block until completion
        if task_future.get_state().is_failed():
            ....
            ....
            ...

    @task(name="my_task")
    async def my_task():
        ...
        ...
        ...

    @flow(name="flow", task_runner=SequentialTaskRunner())
    async def flow(params):
        ....
        ....
        future_obj = await my_task.submit()
        await on_failure_send_discord_msg(future_obj)
        return future_obj.result()

I deployed the flow function to Prefect Cloud. Every time I start a run of the flow function,
on_failure_send_discord_msg fails at the is_failed() check, and the run ends with:

    Finished in state Failed("Flow run encountered an exception. AttributeError: 'coroutine' object has no attribute 'is_failed'\n")

Could anyone help me with this? Why do I get a coroutine object even though I'm awaiting the
on_failure_send_discord_msg call in the flow?
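
To narrow this down, here is a minimal sketch of what I suspect is happening, using only asyncio and no Prefect at all. FakeFuture and FakeState are hypothetical stand-ins I made up for illustration; the assumption (not confirmed here) is that get_state() behaves like a coroutine function when called from an async context, so awaiting the outer function is not enough and the inner call needs its own await.

```python
import asyncio


class FakeState:
    """Hypothetical stand-in for a state object (not Prefect's class)."""

    def __init__(self, failed: bool):
        self._failed = failed

    def is_failed(self) -> bool:
        return self._failed


class FakeFuture:
    """Hypothetical stand-in for a future whose get_state() is async."""

    async def get_state(self) -> FakeState:
        return FakeState(failed=True)


async def check_without_await(future: FakeFuture) -> str:
    # Calling an async method without await yields a coroutine object.
    coro = future.get_state()
    try:
        coro.is_failed()  # the coroutine has no such attribute
    except AttributeError as exc:
        return str(exc)
    finally:
        coro.close()  # suppress the "never awaited" warning
    return "no error"


async def check_with_await(future: FakeFuture) -> bool:
    # Awaiting the inner call gives the state object itself.
    state = await future.get_state()
    return state.is_failed()


error_message = asyncio.run(check_without_await(FakeFuture()))
failed = asyncio.run(check_with_await(FakeFuture()))
print(error_message)
print(failed)
```

If the assumption holds for PrefectFuture, the same pattern would apply to task_future.wait() and task_future.get_state() inside on_failure_send_discord_msg: each would need its own await.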