Using a message broker to trigger flow runs

Hi,
Sharing my use case of using a message broker to launch flows:

I needed an external tool to handle Pub/Sub-style interactions and trigger different automations in my architecture, including Prefect flows. I chose to use a message broker, Memphis, to handle all of the messaging for non-Prefect flows.

Then, I developed a small server to transform Memphis messages into Prefect REST API calls. It uses the message’s payload to retrieve the deployment id and parameters to pass to Prefect. Code below, followed by a Kubernetes deployment sample.

Memphis GitHub issue to follow any progress on an integration: Feature request: Connector to Prefect for triggering flows · Issue #1070 · memphisdev/memphis · GitHub

  • prefect_connector.py
from __future__ import annotations

import ast
import asyncio
import json
import os

import httpx
from memphis import Memphis, MemphisConnectError, MemphisError, MemphisHeaderError


# Prefect REST API settings, read from the environment once at import time.
# api_url is None when PREFECT_API_URL is not set; the bearer token is taken
# from PREFECT_API_KEY and formatted as-is (an unset key renders as "None").
api_url = os.environ.get("PREFECT_API_URL")
headers = dict(
    Authorization="Bearer " + str(os.environ.get("PREFECT_API_KEY")),
)



def _parse_payload(raw):
    """Decode a raw message body (UTF-8 bytes) into a Python object.

    JSON is tried first so that payloads using ``true``/``false``/``null``
    are accepted; anything that is not valid JSON falls back to
    ``ast.literal_eval`` so Python-literal payloads (e.g. single-quoted
    dicts) keep working.

    Raises:
        UnicodeDecodeError: if the body is not valid UTF-8.
        ValueError, SyntaxError: if the text is neither JSON nor a Python literal.
    """
    text = raw.decode("UTF-8")
    try:
        return json.loads(text)
    except ValueError:
        # json.JSONDecodeError is a ValueError: not JSON, try a Python literal.
        return ast.literal_eval(text)


async def handle_message(msgs, error, context):
    """Turn each consumed message into a Prefect ``create_flow_run`` call.

    Every message body must decode to a mapping with a ``deployment_id`` key
    and an optional ``parameters`` mapping (defaults to ``{}``). A message is
    acknowledged only after Prefect answered with a non-error status, so a
    message that could not be processed stays un-acked.

    Args:
        msgs: batch of messages handed over by the consumer; may be empty.
        error: error reported by the consumer for this batch, or a falsy value.
        context: consumer context set via ``set_context``; not used here.

    Returns:
        None. Failures are printed, never raised to the caller.
    """
    # Report the consumer error up front: checking it inside the loop would
    # drop it whenever the batch is empty and repeat it once per message.
    if error:
        print(error)
    if not msgs:
        return

    try:
        # One HTTP client (and connection pool) for the whole batch.
        async with httpx.AsyncClient() as client:
            for msg in msgs:
                try:
                    raw = msg.get_data()
                    print("message: ", raw)
                    data = _parse_payload(raw)

                    payload = {"parameters": data.get("parameters", {})}
                    deployment_id = data["deployment_id"]
                    url = f"{api_url}/deployments/{deployment_id}/create_flow_run"
                    print(f"sending payload {payload} to url {url}")
                    response = await client.post(url, headers=headers, json=payload)
                    response.raise_for_status()
                except Exception as e:
                    # A bad payload or a failed Prefect call only affects this
                    # message: leave it un-acked and move on to the next one.
                    print(e)
                    continue

                await msg.ack()
    except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
        # Broker-side failure (e.g. while acking): stop processing this batch.
        print(e)
    except Exception as e:
        print(e)



async def main():
    """Connect to Memphis, attach ``handle_message`` to the "prefect" station
    and wait forever.

    Connection settings come from the MEMPHIS_HOST, MEMPHIS_USERNAME,
    MEMPHIS_PASSWORD and MEMPHIS_ACCOUNT environment variables; the consumer
    name comes from MEMPHIS_CONSUMER. Memphis errors are printed, and the
    client is closed on the way out in every case.
    """
    try:
        memphis = Memphis()
        connection_settings = {
            "host": os.getenv("MEMPHIS_HOST"),
            "username": os.getenv("MEMPHIS_USERNAME"),
            "password": os.getenv("MEMPHIS_PASSWORD"),
            "account_id": os.getenv("MEMPHIS_ACCOUNT"),
        }
        await memphis.connect(**connection_settings)

        consumer = await memphis.consumer(
            station_name="prefect",
            consumer_name=os.getenv("MEMPHIS_CONSUMER"),
        )
        consumer.set_context({"target": "prefect"})
        consumer.consume(handle_message)

        # Wait on an event that is never set, so the coroutine (and with it
        # the consumer) stays alive until the process is stopped.
        never_set = asyncio.Event()
        await never_set.wait()
    except (MemphisError, MemphisConnectError) as e:
        print(e)
    finally:
        await memphis.close()
        
# Script entry point: run the connector's main() coroutine on a fresh event
# loop, only when this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())
  • Dockerfile
# Image for the Memphis -> Prefect connector (prefect_connector.py).
FROM python:3.11-slim

# The connector reports everything through print(); without a TTY Python
# block-buffers stdout, so disable buffering to get the output in
# `docker logs` / `kubectl logs` as soon as it is written.
ENV PYTHONUNBUFFERED=1

# Install both dependencies in a single layer and keep pip's download cache
# out of the image.
RUN pip install --no-cache-dir memphis-py httpx

COPY prefect_connector.py prefect_connector.py

# Settings read by prefect_connector.py via os.getenv(); replace the ###
# placeholders with real values.
# NOTE(review): values set with ENV are stored in the image and visible to
# anyone who can pull it. Prefer supplying PREFECT_API_KEY and
# MEMPHIS_PASSWORD at runtime (e.g. `docker run -e ...` or a Kubernetes
# Secret) instead of baking them in here.
ENV PREFECT_API_URL=https://api.prefect.cloud/api/accounts/###/workspaces/###
ENV PREFECT_API_KEY=###

ENV MEMPHIS_HOST=###
ENV MEMPHIS_USERNAME=###
ENV MEMPHIS_PASSWORD=###
ENV MEMPHIS_ACCOUNT=###
ENV MEMPHIS_CONSUMER=###

ENTRYPOINT ["python"]
CMD ["prefect_connector.py"]
  • deployment.yaml
# Kubernetes Deployment that runs a single replica of the connector image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: connector
spec:
  replicas: 1
  selector:
    # Must match the pod template labels below.
    matchLabels:
      app: connector
  template:
    metadata:
      labels:
        app: connector
    spec:
     containers:
      - name: connector
        # Replace ### with the registry/repository the connector image was pushed to.
        # NOTE(review): no env/secret settings are visible in this manifest, so the
        # container presumably relies on the values baked into the image - confirm.
        image: ###/memphis-connector:1.0