Is there an equivalent to sensors in Prefect? How do I trigger event-driven workflows?

There are a couple of ways to approach this problem.

1. State-based execution with signals

If you look more closely at what sensors do, they poll for some state (e.g., whether a file has arrived in S3/SFTP); if the condition is not met, the sensor sleeps and tries again a couple of seconds or minutes later.
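Conceptually, a sensor is nothing more than a poll-sleep loop. Here is a minimal, Prefect-free sketch of that idea; the `poll` helper and the stand-in condition are illustrative, not part of any library:

```python
import time


def poll(condition, interval_seconds=20, max_attempts=30):
    """Call `condition` until it returns True, sleeping between attempts."""
    for _ in range(max_attempts):
        if condition():
            return True
        time.sleep(interval_seconds)
    return False


# Stand-in condition that "succeeds" on the third attempt
state = {"calls": 0}


def file_arrived():
    state["calls"] += 1
    return state["calls"] >= 3


print(poll(file_arrived, interval_seconds=0))  # True
```

The Prefect version below replaces the `time.sleep` with a RETRY signal, so the scheduler handles the waiting instead of a blocking loop.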

If you want to implement something similar in Prefect, you can leverage the RETRY signal to implement such polling. Here is an example that checks whether the file “example_file.csv” has arrived in S3:

import pendulum
import awswrangler as wr
from prefect import task
from prefect.engine.signals import RETRY


def check_if_file_arrived_in_s3():
    return wr.s3.does_object_exist("s3://bucket/example_file.csv")


@task
def s3_sensor(**kwargs):
    file_arrived = check_if_file_arrived_in_s3()
    if not file_arrived:
        # Put the task into a Retrying state and schedule the next attempt
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )

2. Leveraging waiter abstractions

Many libraries, such as AWS’s boto3, include waiters for similar use cases; these implement the polling automatically under the hood.

import boto3

s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket("your_bucket_name")

# example file, doesn't need to be CSV
s3_object = "your_file_name.csv"  
data_to_arrive = bucket.Object(s3_object)
data_to_arrive.wait_until_exists()

Prefect provides a generic AWSClientWait task that allows you to do that with any AWS waiter: see the AWSClientWait API Reference.

It’s usually used with long-running AWS jobs such as AWS Batch jobs for which Prefect has the BatchSubmit task: see the BatchSubmit API Reference.

3. Event-based execution with AWS Lambda

Once the event that you are looking for occurs, trigger a specific flow to run. If you are on AWS, this event could trigger a Lambda function that triggers a flow run. The event could be a new file that arrived in S3 or a new row streamed to DynamoDB/AWS Aurora etc. For a concrete implementation with AWS Lambda, check out this blog post.

import json
import os
import urllib.parse
import urllib.request

print("Loading function")

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    ## prep the data
    create_mutation = """
    mutation($input: createFlowRunInput!){
        createFlowRun(input: $input){
            flow_run{
                id
            }
        }
    }
    """
    inputs = dict(flowId=os.getenv("PREFECT__FLOW_ID"))

    variables = dict(input=inputs)
    data = json.dumps(
        dict(query=create_mutation, variables=json.dumps(variables))
    ).encode("utf-8")

    ## prep the request
    req = urllib.request.Request(os.getenv("PREFECT__CLOUD__API"), data=data)
    req.add_header("Content-Type", "application/json")
    req.add_header(
        "Authorization", "Bearer {}".format(os.getenv("PREFECT__CLOUD__AUTH_TOKEN"))
    )

    ## send the request and return the response
    resp = urllib.request.urlopen(req)
    return json.loads(resp.read().decode())
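If you want to sanity-check the request body this handler builds without deploying the Lambda, you can construct the same payload locally. The flow ID below is a made-up placeholder; note that the handler sends `variables` as a JSON-encoded string nested inside the outer JSON body:

```python
import json

create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""

# Placeholder flow ID for illustration only
variables = dict(input=dict(flowId="00000000-0000-0000-0000-000000000000"))
data = json.dumps(
    dict(query=create_mutation, variables=json.dumps(variables))
).encode("utf-8")

# Round-trip to confirm the body is valid JSON with the expected keys
body = json.loads(data)
print(sorted(body.keys()))  # ['query', 'variables']
```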

Also, check out this blog post, which shows how to do the same using the Serverless Framework.

4. API-based

You can always trigger a flow run in response to an event using the create_flow_run GraphQL mutation. You don’t even need to have Prefect installed to trigger a flow run; here is an example using Python’s requests library:

import requests

query = """
 mutation {
  create_flow_run(input: { version_group_id: "fb919cc4-7f74-4ed7-9f3c-71bdd9be69e8" }) {
    id
  }
}
"""

url = "https://api.prefect.io"
response = requests.post(
    url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)

Note that we use version_group_id because it remains the same even as new versions of the flow are registered later.
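The mutation returns the new flow run’s id under data.create_flow_run, so you can pull it out of the response JSON. A small sketch, using an illustrative sample payload and a hypothetical helper that also surfaces GraphQL errors:

```python
def extract_flow_run_id(payload):
    """Return the created flow run id, or raise if the API reported errors."""
    if payload.get("errors"):
        raise RuntimeError(payload["errors"])
    return payload["data"]["create_flow_run"]["id"]


# Illustrative response shape for the create_flow_run mutation above
sample_response = {"data": {"create_flow_run": {"id": "example-flow-run-id"}}}
print(extract_flow_run_id(sample_response))  # example-flow-run-id
```

In practice you would call it as `extract_flow_run_id(response.json())`.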

To do the same with Parameters:

import requests

API_KEY = "XXX"  # replace with your Prefect Cloud API key

create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""

inputs = dict(
    versionGroupId="339c86be-5c1c-48f0-b8d3-fe57654afe22", parameters=dict(x=6)
)
response = requests.post(
    url="https://api.prefect.io",
    json=dict(query=create_mutation, variables=dict(input=inputs)),
    headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
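Because the variables here are a plain nested object (unlike the Lambda handler above, which JSON-encodes them as a string), you can verify the exact body requests will serialize before calling the API. The version group id and parameters are the example values from above:

```python
import json

create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""

inputs = dict(
    versionGroupId="339c86be-5c1c-48f0-b8d3-fe57654afe22", parameters=dict(x=6)
)
body = json.dumps(dict(query=create_mutation, variables=dict(input=inputs)))

# Round-trip to confirm the structure the API will receive
decoded = json.loads(body)
print(decoded["variables"]["input"]["parameters"])  # {'x': 6}
```

Passing `json=...` to requests.post performs this same serialization and sets the Content-Type header for you.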

Use cases for event-driven flow run creation

There are many scenarios that would fit into this pattern, for instance:

  • when a database table is updated by some custom application, trigger a flow to take action on that update,
  • when your external job completes (e.g. a Spark job on Databricks), you trigger another flow run via an API call to do some post-processing or send you a notification about it.