There are a couple of ways you could approach this problem.
1. State-based execution with signals
If you look more closely at what sensors do, they poll for some state (e.g. whether a file has arrived in S3/SFTP); if the condition is not met, the sensor sleeps and tries again a couple of seconds or minutes later.
If you want to implement something similar in Prefect, you can leverage the RETRY signal to implement such polling. Here is an example checking whether the file “example_file.csv” has arrived in S3:
import awswrangler as wr
import pendulum
from prefect import task
from prefect.engine.signals import RETRY


def check_if_file_arrived_in_s3() -> bool:
    # returns True if the object already exists in S3
    return wr.s3.does_object_exist("s3://bucket/example_file.csv")


@task
def s3_sensor(**kwargs):
    bool_s3_object_arrived = check_if_file_arrived_in_s3()
    if not bool_s3_object_arrived:
        # RETRY reschedules this task run 20 seconds from now
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
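To make the sensor gate downstream work, you can wire it into a flow. Here is a minimal sketch reusing the s3_sensor task from above; the process_file task is a hypothetical placeholder for whatever should run once the file exists:

from prefect import Flow, task


@task
def process_file():
    # hypothetical downstream task that processes the file once it has arrived
    ...


with Flow("s3_sensor_flow") as flow:
    file_arrived = s3_sensor()
    # only runs once the sensor task has succeeded
    process_file(upstream_tasks=[file_arrived])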
2. Leveraging waiters’ abstractions
Many libraries, such as AWS’s boto3, include waiters for similar use cases; these automatically implement the polling under the hood.
import boto3

s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket("your_bucket_name")

# example file; it doesn't need to be a CSV
s3_object = "your_file_name.csv"
data_to_arrive = bucket.Object(s3_object)

# blocks until the object exists (boto3 polls S3 under the hood)
data_to_arrive.wait_until_exists()
Prefect provides a generic AWSClientWait task that lets you do this with any AWS waiter: see the AWSClientWait API Reference. It’s usually used with long-running AWS jobs such as AWS Batch jobs, for which Prefect has the BatchSubmit task: see the BatchSubmit API Reference.
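For example, the same S3 check could be expressed with AWSClientWait and boto3’s built-in object_exists waiter. This is only a sketch: the bucket and key are placeholders, and the parameter names (client, waiter_name, waiter_kwargs) follow the AWSClientWait API Reference linked above.

from prefect import Flow
from prefect.tasks.aws.client_waiter import AWSClientWait

# generic waiter task configured to use boto3's built-in S3 "object_exists" waiter
wait_for_file = AWSClientWait(client="s3", waiter_name="object_exists")

with Flow("wait_for_s3_file") as flow:
    # waiter_kwargs are passed through to the underlying boto3 waiter's wait() call
    wait_for_file(
        waiter_kwargs={"Bucket": "your_bucket_name", "Key": "your_file_name.csv"}
    )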
3. Event-based execution with AWS Lambda
Once the event you are looking for occurs, you trigger a specific flow run. If you are on AWS, this event could invoke a Lambda function that creates a flow run. The event could be a new file arriving in S3, a new row streamed to DynamoDB/AWS Aurora, etc. For a concrete implementation with AWS Lambda, check out this blog post.
import json
import os
import urllib.parse
import urllib.request

print("Loading function")


def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    ## prep the data
    create_mutation = """
    mutation($input: createFlowRunInput!){
        createFlowRun(input: $input){
            flow_run{
                id
            }
        }
    }
    """
    inputs = dict(flowId=os.getenv("PREFECT__FLOW_ID"))
    variables = dict(input=inputs)
    data = json.dumps(
        dict(query=create_mutation, variables=json.dumps(variables))
    ).encode("utf-8")

    ## prep the request
    req = urllib.request.Request(os.getenv("PREFECT__CLOUD__API"), data=data)
    req.add_header("Content-Type", "application/json")
    req.add_header(
        "Authorization", "Bearer {}".format(os.getenv("PREFECT__CLOUD__AUTH_TOKEN"))
    )

    ## send the request and return the response
    resp = urllib.request.urlopen(req)
    return json.loads(resp.read().decode())
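To sanity-check the handler outside of AWS, you can invoke it locally with a fake S3 event. The environment variable values below are placeholders; on AWS they would be configured on the Lambda function itself:

import os

# placeholder values; in AWS these are set as Lambda environment variables
os.environ["PREFECT__FLOW_ID"] = "your-flow-id"
os.environ["PREFECT__CLOUD__API"] = "https://api.prefect.io"
os.environ["PREFECT__CLOUD__AUTH_TOKEN"] = "your-api-key-or-token"

# minimal fake S3 "ObjectCreated" event payload
fake_event = {
    "Records": [
        {"s3": {"bucket": {"name": "bucket"}, "object": {"key": "example_file.csv"}}}
    ]
}
print(lambda_handler(fake_event, context=None))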
Also, check out this blog post showing how to do the same using the serverless framework.
4. API-based
You can always trigger a flow run in response to your event using the create_flow_run GraphQL mutation. You don’t even need to have Prefect installed to trigger a flow run; here is an example using Python’s requests library.
import requests

query = """
mutation {
  create_flow_run(input: { version_group_id: "fb919cc4-7f74-4ed7-9f3c-71bdd9be69e8" }) {
    id
  }
}
"""
url = "https://api.prefect.io"
response = requests.post(
    url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)
Note that we use version_group_id, as it remains the same even when new versions of the flow are registered later.
To do the same with Parameters:
import requests

create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""
inputs = dict(
    versionGroupId="339c86be-5c1c-48f0-b8d3-fe57654afe22", parameters=dict(x=6)
)

API_KEY = "XXX"  # your Prefect Cloud API key

response = requests.post(
    url="https://api.prefect.io",
    json=dict(query=create_mutation, variables=dict(input=inputs)),
    headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
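If the request succeeds, you can read the new flow run ID out of the GraphQL response; this small sketch follows the shape of the createFlowRun mutation above:

# extract the newly created flow run ID from the GraphQL response body
payload = response.json()
flow_run_id = payload["data"]["createFlowRun"]["flow_run"]["id"]
print(f"Created flow run: {flow_run_id}")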
Use cases for event-driven flow run creation
There are many scenarios that would fit into this pattern, for instance:
- as soon as a database table gets updated by some custom application, that update can trigger a flow to take action on it,
- when an external job completes (e.g. a Spark job on Databricks), you trigger another flow run via an API call to do some post-processing or send you a notification about it.