Asynchronous parallel tasks

I would like to build a flow which would trigger a subflow each time a message is read from a RabbitMQ queue. I would like to have the subflows be executed in parallel while the main flow listens to the queue.

To achieve this, I need Dask for the parallelism and an asynchronous part that triggers the subflows from incoming messages.

I technically have achieved each part individually, but I can’t seem to merge them.

Parallelism:

from logging import Logger
from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner

import time


@task
def shout(number: int) -> None:
    logger: Logger = get_run_logger()
    time.sleep(0.5)
    logger.warning(f"#{number}")


@flow(task_runner=DaskTaskRunner)
def count_to(highest: int = 10) -> None:
    for number in range(highest):
        shout.submit(number=number)


if __name__ == "__main__":
    count_to(highest=10)

Asynchronous part:

from logging import Logger

import aiormq
from prefect import flow, task, get_run_logger

import asyncio
import aio_pika

from time import sleep
import sys


@task(name="duration_waiter")
def duration_waiter(msg: str, logger: Logger) -> int:
    logger.info(f"Beginning wait of {msg} seconds")
    seconds: int = int(msg)   # The order may change on remote systems
    sleep(seconds)
    return seconds


@task(name="value_printer")
def value_printer(seconds: int, logger: Logger) -> None:
    logger.info(f"Waited {seconds} seconds successfully")


@flow(name="message_handling")
async def secondary_flow(msg: str):
    logger: Logger = get_run_logger()
    duration: int = duration_waiter(msg=msg, logger=logger)
    value_printer(seconds=duration, logger=logger)


@flow(name="message_dispatcher")
async def main_flow() -> None:
    async def callback(msg: aio_pika.abc.AbstractIncomingMessage):
        logger.info("Message received")
        async with msg.process():
            await secondary_flow(msg=str(msg.body.decode(encoding="utf-8")))

    logger: Logger = get_run_logger()
    connection = await aio_pika.connect_robust(host="<rabbit_mq_address>")
    logger.info("Connected to RabbitMQ")
    queue_name: str = "Test_channel"
    channel = await connection.channel()
    logger.info("Connected to channel")
    await channel.set_qos(prefetch_count=100)
    try:
        await channel.queue_delete(queue_name=queue_name)
    except aiormq.exceptions.ChannelNotFoundEntity:
        pass
    queue = await channel.declare_queue(queue_name, auto_delete=True)
    logger.info("Connected to queue")

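    # Register the consumer once; consume() returns as soon as it is registered.
    await queue.consume(callback)
    # Keep the flow alive so that the callback keeps receiving messages.
    await asyncio.Future()


if __name__ == "__main__":
    try:
        # An async flow returns a coroutine, so it has to be run in an event loop.
        asyncio.run(main_flow())
    except KeyboardInterrupt:
        print("Interrupted")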

this might help:

Hi! Thank you for your answer, this is already one of my inspirations. Though it is useful for the parallel part, I don’t know how (or even whether it is possible) to add the asynchronous part, where the subflows are triggered through a RabbitMQ queue.
The objective is to have the primary flow handle the messages and trigger subflows with parameters taken from those messages. This also prevents me from starting “n” subflows at the same time, as was done in the example: I need each one to be started when its message is received.

maybe try this pattern then?

from prefect.deployments import run_deployment


for _ in range(1, 100):
    run_deployment(name="flow/deployment", timeout=0)

Thanks for this! This pattern works, though I needed to add an await in front of the run_deployment command for it to have an effect.
Dispatcher:

import asyncio
import json
from logging import Logger

import aio_pika
import aiormq
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment


@flow()
async def dispatcher() -> None:
    # Set when an "exit" message arrives, so that the flow can shut down.
    finished = asyncio.Event()

    async def callback(msg: aio_pika.abc.AbstractIncomingMessage) -> None:
        logger.info("Message received")
        async with msg.process():
            if msg.body == b"exit":
                finished.set()
                return  # "exit" is not JSON, so do not try to parse it
            parameters = json.loads(msg.body.decode(encoding="utf-8"))
            logger.info(json.dumps(parameters))
            # timeout=0 returns once the run is created instead of waiting for it.
            await run_deployment(
                name="handler/handler",
                timeout=0,
                parameters=parameters,
            )

    logger: Logger = get_run_logger()
    connection = await aio_pika.connect_robust(host="<rabbit_mq_address>")
    logger.info("Connected to RabbitMQ")
    queue_name: str = "Test_channel"
    channel = await connection.channel()
    logger.info("Connected to channel")
    await channel.set_qos(prefetch_count=100)
    try:
        await channel.queue_delete(queue_name=queue_name)
    except aiormq.exceptions.ChannelNotFoundEntity:
        pass
    queue = await channel.declare_queue(queue_name, auto_delete=True)
    logger.info("Connected to queue")

    # Register the consumer once, then wait until an "exit" message arrives.
    await queue.consume(callback)
    await finished.wait()
    await connection.close()


if __name__ == "__main__":
    # An async flow returns a coroutine, so it has to be run in an event loop.
    asyncio.run(dispatcher())

Handler:

from datetime import datetime
from logging import Logger
from prefect import flow, task, get_run_logger
import time
from typing import Dict, Optional

@task
def waiter(seconds: int) -> None:
    time.sleep(seconds)

@flow()
def handler(params: Optional[Dict] = None) -> None:
    if params is None:
        params = {}
    logger: Logger = get_run_logger()
    for k, v in params.items():
        logger.info(f"Pair {k}: {v}")
    waiter(params.get("time", 0))

if __name__ == "__main__":
    handler()

This code lets me send messages such as {"params": {"time": 10}} to make the handler wait 10 seconds. The handler subflows start as soon as a message is received and do not block the main flow from listening to the queue, which makes them both asynchronous and parallel.
It does yield some warnings about task names being the same for the task handler/handler, which is a bit odd considering it is a flow. It isn’t a blocking error, though, so although I’d rather see it disappear, it is not that bad.
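
For anyone who wants to try the dispatcher, here is a rough sketch of how a test message in that format could be published. The helper names build_message and publish are made up for this example, and it assumes the same "Test_channel" queue and <rabbit_mq_address> placeholder as above; treat it as a starting point rather than a tested publisher.

```python
import json


def build_message(seconds: int) -> bytes:
    """Encode the handler parameters in the shape the dispatcher forwards."""
    return json.dumps({"params": {"time": seconds}}).encode("utf-8")


async def publish(body: bytes, host: str, queue_name: str = "Test_channel") -> None:
    # Imported here so that build_message can be tried without aio_pika installed.
    import aio_pika

    connection = await aio_pika.connect_robust(host=host)
    async with connection:
        channel = await connection.channel()
        # The default exchange routes a message to the queue named by routing_key.
        await channel.default_exchange.publish(
            aio_pika.Message(body=body),
            routing_key=queue_name,
        )
```

With a reachable broker and the dispatcher already running, asyncio.run(publish(build_message(10), host="<rabbit_mq_address>")) should send one such message. The top-level key has to be "params" because the whole JSON object is passed as the deployment parameters, and the handler flow takes a single params argument.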

nice work!

maybe try this to avoid run name duplication:

waiter.with_options(name="some_new_name")(params.get("time", 0))

The issue does not arise from the waiter task, but from the handler task (which is in fact my secondary flow). This is started through the run_deployment method, in which I am de facto obligated to use the name handler/handler as it is the name of my handler deployment. Am I missing something?

I do :sweat_smile: run_deployment is not a task, dunno why this happens

you can try providing extra parameter to each run_deployment call and set it as flow_run_name on run_deployment
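
A minimal sketch of what that suggestion could look like, assuming a random suffix is an acceptable way to make each run name unique. The helpers unique_run_name and trigger_handler are hypothetical names for this example; flow_run_name, timeout and parameters are the run_deployment arguments already used in this thread. Whether this has any effect on the warning is not something the sketch can promise.

```python
from typing import Any, Dict
from uuid import uuid4


def unique_run_name(prefix: str = "handler") -> str:
    """Build a flow run name that differs on every call."""
    return f"{prefix}-{uuid4().hex[:8]}"


async def trigger_handler(parameters: Dict[str, Any]) -> None:
    # Imported here so that unique_run_name can be tried without Prefect installed.
    from prefect.deployments import run_deployment

    await run_deployment(
        name="handler/handler",
        flow_run_name=unique_run_name(),
        timeout=0,
        parameters=parameters,
    )
```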

The warning still appears:

/~/anaconda3/envs/copernicus_tasking/lib/python3.9/site-packages/prefect/tasks.py:201: 
UserWarning: A task named 'handler/handler' and defined at '/~/anaconda3/envs/copernicus_tasking/lib/python3.9/site-packages/prefect/deployments.py:100' conflicts with another task. 
Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(

What baffles me though is the fact that it only appears the second time the secondary flow is started (thus the first time there could be conflict), but not the subsequent times…

given that it’s “just a warning” you can ignore it for now. I opened an issue to suppress that

Thank you for your help!

When I try to call run_deployment, it returns a coroutine object instead of a flow run.


params = {
  "mailing_list": "nishant.singh@shadowfax.in",
  "hub_id": 365,
  "city_id": 2}


deployment_name = 'default'
flow_run = run_deployment(
   name = "base_data_fetching_flow/" + deployment_name,
   parameters = params,
   flow_run_name = "my_run",
   timeout=0
)
print(flow_run.id)

error: AttributeError: 'coroutine' object has no attribute 'id'
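
This looks like the same behaviour mentioned earlier in the thread, where run_deployment had to be awaited inside an async flow. As a guess, assuming the snippet runs somewhere an event loop is already active (an async function or a notebook), the call hands back a coroutine that has to be awaited before .id exists. The sketch below shows the effect with a stand-in: fake_run_deployment and FakeFlowRun are hypothetical and not part of Prefect.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeFlowRun:
    id: str


async def fake_run_deployment(name: str) -> FakeFlowRun:
    """Hypothetical stand-in for an awaitable call; not part of Prefect."""
    return FakeFlowRun(id=f"run-of-{name}")


async def main() -> str:
    pending = fake_run_deployment("base_data_fetching_flow/default")
    # Without await, only a coroutine object exists; it has no .id attribute.
    assert asyncio.iscoroutine(pending)
    flow_run = await pending  # awaiting yields the actual result
    return flow_run.id


run_id = asyncio.run(main())
print(run_id)
```

If that assumption holds, writing flow_run = await run_deployment(...) in the original snippet would be the equivalent change.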