In Orion, you can raise a custom exception directly in the flow, which will result in a
Failed task run:
from prefect import task @task def signal_task(message): if message == 'stop_immediately!': raise RuntimeError(message='Got a signal to end the task run!')
Alternatively, your flow may return a specific state:
from prefect import task from prefect.orion.schemas.states import Failed @task def signal_task(message): if message == 'stop_immediately!': return Failed(message='Stopping the task run immediately!')
Full flow example
from prefect import task, flow import random @task def random_number(): nr = random.random() if nr > 0.5: raise ValueError("Big number!") print("Continue the execution...") @flow def main_flow(): random_number() if __name__ == "__main__": main_flow()
Prefect 1.0 uses
signals to raise specific task run states. Signals are ways of telling the Prefect engine that a task should be moved immediately into a specific state or that the task should immediately end:
from prefect.engine.signals import FAIL, ENDRUN, SUCCESS, SKIP, PAUSE, RETRY from prefect import task @task def signal_task(message): if message == 'stop_immediately!': raise ENDRUN(message='Got a signal to end the task run!')
You can also end the task run in a given state (e.g.
Failed as in the example below) with no subsequent retries. If you raise a
FAILED signal instead of
ENDRUN, this will respect retries.
If this task has some downstream dependencies and you leverage the default
all_successful trigger, then this will end your flow run as well since the
TriggerFailed state will by default propagate to your downstream tasks.
from prefect import task from prefect.engine.signals import ENDRUN from prefect.engine.state import Failed @task def check_if_condition_met(): if "some state of the world": raise ENDRUN(Failed(message="Data is already in the database"))
You can find more about signals on the Signals page: