Hi,
Sharing my use case of using a message broker to launch flows:
I needed an external tool to handle Pub/Sub-style interactions and trigger different automations in my architecture, including Prefect flows. I chose to use a message broker, Memphis, to handle all of the messaging for non-Prefect flows.
Then, I developed a small server to transform Memphis messages into Prefect REST API calls. It uses the message’s payload to retrieve the deployment id and the parameters to pass to Prefect. Code below, followed by a Kubernetes deployment sample.
Memphis GitHub issue to follow any progress on an integration: Feature request: Connector to Prefect for triggering flows · Issue #1070 · memphisdev/memphis · GitHub
- prefect_connector.py
from __future__ import annotations
import asyncio
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError
import os
import httpx
import ast
# Prefect connection settings, read from the environment at import time.
api_url = os.getenv('PREFECT_API_URL')
_api_key = os.getenv('PREFECT_API_KEY')
# Only send an Authorization header when a key is configured; otherwise the
# literal string "Bearer None" would be sent with every request.
headers = {"Authorization": f"Bearer {_api_key}"} if _api_key else {}
async def handle_message(msgs, error, context):
    """Turn each received Memphis message into a Prefect ``create_flow_run`` call.

    Each message body is decoded as UTF-8 and parsed into a mapping that must
    contain a ``deployment_id`` key and may contain a ``parameters`` key
    (defaults to ``{}``). A message is acknowledged only after Prefect
    answered with a success status; a message that fails is printed and left
    unacknowledged, and the remaining messages of the batch are still
    processed.

    Args:
        msgs: Batch of messages passed in by the consumer; may be empty.
        error: Error reported by the consumer for this batch, if any.
        context: Consumer context; not used here.
    """
    # Local import so this function does not rely on the file's import block.
    import json

    # Report a consumer-side error first so it is never lost behind a
    # failure while processing the batch.
    if error:
        print(error)
    if not msgs:
        return

    try:
        # One HTTP client is shared by the whole batch.
        async with httpx.AsyncClient() as client:
            for msg in msgs:
                try:
                    raw = msg.get_data()
                    print("message: ", raw)
                    text = raw.decode("UTF-8")
                    try:
                        # JSON is tried first so payloads using true/false/null parse.
                        data = json.loads(text)
                    except ValueError:
                        # Fall back to Python-literal payloads (e.g. single-quoted
                        # dicts), which is what the previous parser accepted.
                        data = ast.literal_eval(text)

                    payload = {"parameters": data.get("parameters", {})}
                    deployment_id = data["deployment_id"]
                    url = f"{api_url}/deployments/{deployment_id}/create_flow_run"
                    print(f"sending payload {payload} to url {url}")
                    response = await client.post(url, headers=headers, json=payload)
                    response.raise_for_status()
                    # Acknowledge only once Prefect accepted the request.
                    await msg.ack()
                except Exception as e:
                    # Isolate the failure: this message stays unacknowledged,
                    # the rest of the batch is still handled.
                    print(e)
    except Exception as e:
        # Failure creating or closing the HTTP client itself.
        print(e)
async def main():
    """Connect to Memphis, consume the ``prefect`` station, and wait forever.

    Connection settings come from the ``MEMPHIS_HOST``, ``MEMPHIS_USERNAME``,
    ``MEMPHIS_PASSWORD``, ``MEMPHIS_ACCOUNT`` and ``MEMPHIS_CONSUMER``
    environment variables. Memphis errors are printed; the client is closed
    on the way out if it was created.
    """
    # Bound before the try so the finally clause can tell whether the client
    # exists; otherwise a failure in Memphis() would surface as a NameError.
    memphis = None
    try:
        memphis = Memphis()
        await memphis.connect(
            host=os.getenv("MEMPHIS_HOST"),
            username=os.getenv("MEMPHIS_USERNAME"),
            password=os.getenv("MEMPHIS_PASSWORD"),
            account_id=os.getenv("MEMPHIS_ACCOUNT"),
        )
        consumer = await memphis.consumer(
            station_name="prefect",
            consumer_name=os.getenv("MEMPHIS_CONSUMER"),
        )
        consumer.set_context({"target": "prefect"})
        consumer.consume(handle_message)
        # Keep the coroutine alive so the consumer keeps receiving data.
        await asyncio.Event().wait()
    except (MemphisError, MemphisConnectError) as e:
        print(e)
    finally:
        if memphis is not None:
            await memphis.close()
if __name__ == "__main__":
    # Script entry point: drive the connector's main coroutine to completion.
    connector = main()
    asyncio.run(connector)
- Dockerfile
FROM python:3.11-slim
# Disable stdout buffering so the connector's print() output reaches the
# container logs immediately.
ENV PYTHONUNBUFFERED=1
# Install both dependencies in one layer and skip the pip cache to keep the
# image small.
RUN pip install --no-cache-dir memphis-py httpx
COPY prefect_connector.py prefect_connector.py
# NOTE: values set with ENV are stored in the image and readable by anyone who
# can pull it. The placeholders below are kept so the sample runs as-is; for
# real credentials, prefer injecting them at runtime (e.g. a Kubernetes Secret).
ENV PREFECT_API_URL=https://api.prefect.cloud/api/accounts/###/workspaces/###
ENV PREFECT_API_KEY=###
ENV MEMPHIS_HOST=###
ENV MEMPHIS_USERNAME=###
ENV MEMPHIS_PASSWORD=###
ENV MEMPHIS_ACCOUNT=###
ENV MEMPHIS_CONSUMER=###
ENTRYPOINT ["python"]
CMD ["prefect_connector.py"]
- deployment.yaml
# Single-replica Deployment running the Memphis -> Prefect connector image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: connector
spec:
  replicas: 1
  selector:
    matchLabels:
      app: connector
  template:
    metadata:
      labels:
        app: connector
    spec:
      containers:
        - name: connector
          # Replace the ### placeholder with your registry before applying;
          # left as-is, YAML treats the text after " #" as a comment.
          image: ###/memphis-connector:1.0