Best approach to handle a parallel problem

Hi all,

I need to download 32 pairs of files (the actual, huge data file plus a checksum file) from an SFTP server, process them (they are text files that I have to convert to Parquet), and then upload the results to HDFS. Currently I do all of that with a shell script full of xargs -P4 -n1 calls, which Prefect executes. I'd like to move away from shell and have everything in Prefect.

To that end, I came up with the solution below, but I'm not happy with it and am looking for ways to improve it. It's only a rough skeleton. What it doesn't show is how I limit concurrency: I've set a task concurrency limit of 8 on the tag eod, so only 8 tasks run in parallel.
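
For reference, a tag-based limit like that is created once up front, either with the CLI (prefect concurrency-limit create eod 8) or through the client API, roughly like this:

from prefect import get_client

async def set_eod_limit():
    # create (or overwrite) the concurrency limit for the "eod" tag:
    # at most 8 task runs with this tag execute at the same time
    async with get_client() as client:
        await client.create_concurrency_limit(tag="eod", concurrency_limit=8)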

What I don’t like is:

  • individual subflows are not cancellable
  • when one fails, the whole flow is marked as failed, yet some individual subflows stay blue as if they were still running
  • it's not possible to see which tasks are actually running

What approaches do others use to tackle these kinds of problems?

import asyncio
import os
import shutil
import subprocess
from pathlib import Path
from typing import Optional

import pendulum
from prefect import flow, get_run_logger, tags, task
from prefect.task_runners import ConcurrentTaskRunner

from flows.common.hdfs import cluster

prefix = "/mnt/data"
PRIMARY1 = "xxx"

def out_folder(date_id: int) -> str:
    return f"{prefix}/raw/{date_id}"


def base_name(date_id: int, stream_id: int, kind: str) -> str:
    return f"XXX_{kind.title()}_{date_id}_{stream_id:02d}"

def download_file(date_id: int, stream_id: int, kind: str):
    pass
    # process = subprocess.check_output(...)

def checksum_file(date_id: int, stream_id: int, kind: str):
    pass
    # process = subprocess.check_output(...)


@task(retries=6, retry_delay_seconds=1800, task_run_name="fetch-{kind}-{stream_id}-{date_id}")
def fetch_from_sftp(date_id: int, stream_id: int, kind: str):
    download_file(date_id, stream_id, kind)
    checksum_file(date_id, stream_id, kind)


@task(task_run_name="transform-{kind}-{stream_id}-{date_id}")
def transform_data(date_id: int, stream_id: int, kind: str):
    pass
    # calls process = subprocess.Popen(...) to convert the text files to parquet,
    # monitoring the stdout pipe to show progress
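
# For context, that Popen call follows the usual "stream stdout for progress" pattern,
# roughly like this (simplified sketch; `cmd` stands in for the real converter command,
# which is left out here):
def run_and_log(cmd, logger):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    for line in proc.stdout:
        logger.info(line.rstrip())
    if proc.wait() != 0:
        raise RuntimeError(f"{cmd[0]} exited with code {proc.returncode}")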

def upload_dir(fs, logger, local_path, remote_path):
    """Upload dir to remote, as pyarrow can't do in one go."""
    pass
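
# For context, upload_dir walks the local directory and copies file by file, along
# these lines (simplified sketch; assumes fs is a pyarrow.fs.HadoopFileSystem):
def upload_dir_sketch(fs, logger, local_path: Path, remote_path: Path):
    for local_file in sorted(p for p in local_path.rglob("*") if p.is_file()):
        remote_file = remote_path / local_file.relative_to(local_path)
        fs.create_dir(str(remote_file.parent), recursive=True)
        logger.info("uploading %s -> %s", local_file, remote_file)
        with local_file.open("rb") as src, fs.open_output_stream(str(remote_file)) as dst:
            shutil.copyfileobj(src, dst)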


@task(retries=3, retry_delay_seconds=1200, task_run_name="upload-{kind}-{stream_id}-{date_id}")
def upload_to_hdfs(date_id: int, stream_id: int, kind: str):
    logger = get_run_logger()
    # processed_local_folder / parquet_out_folder are built from date_id, stream_id
    # and kind; those details are left out of this skeleton
    upload_dir(cluster.hdfs, logger, Path(processed_local_folder), Path(parquet_out_folder))
    shutil.rmtree(processed_local_folder)
    return True


@flow(task_runner=ConcurrentTaskRunner())
async def process_single(date_id: int, stream_id: int, kind: str):
    with tags("eod"):
        fetcher = fetch_from_sftp.submit(date_id, stream_id, kind)
        transformer = transform_data.submit(date_id, stream_id, kind, wait_for=[fetcher])
        upload_to_hdfs.submit(date_id, stream_id, kind, wait_for=[transformer])


@flow(task_runner=ConcurrentTaskRunner())
async def process_day(date_id: Optional[int] = None, t: int = 0):
    date_id = gen_date_id(date_id, t)  # gen_date_id (definition not shown) resolves the effective date_id

    futures = []
    for kind in (
        "requests",
        "confirmations",
    ):
        for stream_id in range(1, 17):
            futures.append(
                process_single.with_options(name=f"{kind}_stream_id_{stream_id}")(date_id, stream_id, kind)
            )

    await asyncio.gather(*futures)


if __name__ == "__main__":
    main_flow_state = asyncio.run(process_day())