Prefect flow hangs when using pool.join()

Hi!

I’m running the following code:

@flow(name="run_commands", flow_run_name="Commands", validate_parameters=False)
def flow_run_commands(commands: List[List[str]], arg1: str, arg2: str) -> None:
    num_cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=num_cores)
    logger.info(f"Running commands with {num_cores} cores")
    for cmd in commands:
        partial = functools.partial(run_command, cmd, arg1, arg2)
        pool.apply_async(partial)

    logger.info("Waiting for commands to finish")
    pool.close()
    pool.join()
    logger.info("Done running commands")

When I run it locally, or run just the flow inside a unit test, it works perfectly, but when I run it remotely it just prints “Waiting for commands to finish” and gets stuck.

Any idea what I’m doing wrong?

what is your remote infrastructure?

Hi Nate,
I’m using the TF ECS recipe with tiny changes: prefect-recipes/devops/infrastructure-as-code/aws/tf-prefect2-ecs-agent at main · PrefectHQ/prefect-recipes · GitHub
(It’s pretty awesome btw 🙂)

I launch the agent with “prefect agent start --limit 1”, 8 CPU, 16GB mem.
I want it to run 8 commands in parallel under the same task.

Hi @ori-scala,
did you manage to fix your issue? I’m afraid I’m stuck in a similar situation, where I have an external lib that uses Python’s ProcessPoolExecutor and the processes never join.

Hi, unfortunately not.

I had a great discussion with Nate. Here’s a summary of the problem on my end:

Hi,
I have 2k shell commands to run. I divide them into batches of 8 commands and launch a flow (similar to process_pokemon_batch in this orchestrator recipe) for each -> 250 flows, each running 8 commands.
Now I want each flow to run its 8 commands in parallel, each one using its own CPU.
It works great when I test it locally with concurrent.futures.ProcessPoolExecutor or multiprocessing.Pool, but it gets stuck on the ECS agent. You could think of it as if get_pokemon_info were a CPU-bound command that takes 1 minute to run, which is why it makes sense to run these in parallel (unlike the current get_pokemon_info, which is I/O-bound and a pretty light API call, so it makes sense to run it concurrently using async). So my question is: how can I enable my own parallelism? (There’s a great note on concurrency versus parallelism in the Prefect docs.)
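To make the shape of this concrete, the batching roughly looks like the sketch below (simplified and untested; orchestrate_commands is a made-up name, and in practice each batch is launched as its own flow run rather than called inline). The per-batch flow is the flow_run_commands from the minimal example further down:

from typing import List

from prefect import flow

BATCH_SIZE = 8  # one command per CPU on the 8-core agent

@flow(name="orchestrate_commands")
async def orchestrate_commands(all_commands: List[List[str]], local_path: str) -> None:
    # ~2000 commands -> ~250 batches of 8
    for start in range(0, len(all_commands), BATCH_SIZE):
        batch = all_commands[start:start + BATCH_SIZE]
        # the per-batch flow shown in the minimal example below
        await flow_run_commands(batch, local_path)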

I’ve shared the following minimal example from my end:

import functools
import multiprocessing
import subprocess
from typing import List

from prefect import flow, get_run_logger


@flow(name="run_commands", flow_run_name="Run commands", validate_parameters=False)
async def flow_run_commands(shell_commands: List[List[str]], local_path: str) -> None:
    logger = get_run_logger()
    num_cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=num_cores)
    logger.info(f"Running commands with {num_cores} cores")

    for cmd in shell_commands:
        partial = functools.partial(run_command, cmd, local_path)
        pool.apply_async(partial)

    pool.close()
    pool.join()


def run_command(cmd: List[str], local_path: str) -> None:
    print(f"Running {cmd} at {local_path}")
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
    stdout_b, stderr_b = process.communicate()

Nate suggested using Dask and gave the following example:

from typing import List
from dask.distributed import Client, as_completed
import subprocess

from prefect import flow

@flow(name="run_commands", flow_run_name="Run commands")
async def flow_run_commands(shell_commands: List[List[str]], local_path: str) -> None:
    client = Client()  # Start a local Dask client
    num_cores = len(client.ncores())  # Get number of cores
    print(f"Running commands with {num_cores} cores")

    # Create a list of tasks
    tasks = [client.submit(run_command, cmd, local_path) for cmd in shell_commands]

    # Get the results
    for future in as_completed(tasks):
        result = future.result()
        print(result)

def run_command(cmd: List[str], local_path: str) -> dict:
    print(f"Running {cmd} at {local_path}")
    process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    return {"stdout": stdout, "stderr": stderr}

if __name__ == "__main__":
    import asyncio
    
    asyncio.run(
        flow_run_commands(
            shell_commands=[
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
            ],
            local_path="/tmp",
        )
    )

Which outputs:

09:01:56.311 | INFO | prefect.engine - Created flow run 'capable-marmot' for flow 'run_commands'
09:01:57.000 | INFO | distributed.http.proxy - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
09:01:57.003 | INFO | distributed.scheduler - State start
09:01:57.006 | INFO | distributed.scheduler - Scheduler at: tcp://127.0.0.1:64581
09:01:57.007 | INFO | distributed.scheduler - dashboard at: 127.0.0.1:8787
Running commands with 4 cores
Running ['echo', 'Hello world!'] at /tmp
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
09:01:58.912 | INFO | Flow run 'capable-marmot' - Finished in state Completed()

I’ve tried incorporating Nate’s example with Dask into my code and hit TONS of errors, among them:

  1. distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:44113'. Shutting down.
  2. distributed.core - Event loop was unresponsive in Worker for 37.67s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
  3. ERROR | distributed.scheduler - Couldn't gather keys {'async_run_command-e861b9bc963ae0ddb86bbe123dfc1ad5': ['tcp://127.0.0.1:34995']} state: ['memory'] workers: ['tcp://127.0.0.1:34995']
  4. distributed.scheduler - ERROR - Shut down workers that don't have promised key: ['tcp://127.0.0.1:34995']
  5. WARNING | distributed.nanny - Worker process still alive after 3.1999989318847657 seconds, killing
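One knob I haven’t tried on ECS that is sometimes suggested for the “Event loop was unresponsive” warning (error 2 above) is giving each Dask worker process a single task thread, following Dask’s general advice to prefer processes over threads for GIL-holding or blocking work. Just a sketch, not verified here:

from dask.distributed import Client

# One worker process per core and a single task thread per process,
# Dask's usual recommendation for GIL-holding / CPU-bound work.
client = Client(processes=True, n_workers=8, threads_per_worker=1)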

So this is where I’m stuck now, still looking for the best approach.
Ideally, I’d rather not bring in Dask, given the overhead and the errors it currently produces. I don’t see why Python’s native multiprocessing can’t be supported out of the box.
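One thing I haven’t verified on ECS yet: forking from a process that already has running threads and an event loop (which the Prefect 2 engine does, as far as I can tell) is a known source of deadlocks in CPython, and the usual workaround is to force the “spawn” start method instead of the default “fork” on Linux. A rough sketch of that swap inside the flow above:

import functools
import multiprocessing

# Same pattern as before, but with a "spawn" context: child processes start
# from a fresh interpreter instead of inheriting the parent's threads and locks.
# Note that with "spawn", run_command must be importable at module level.
ctx = multiprocessing.get_context("spawn")
pool = ctx.Pool(processes=ctx.cpu_count())
for cmd in shell_commands:
    pool.apply_async(functools.partial(run_command, cmd, local_path))
pool.close()
pool.join()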

Adding @grigolet’s issue as well: Prefect 2 hangs when using ProcessPoolExecutor
and his great minimal example: GitHub - grigolet/prefect-demo: Demo for troubleshooting prefect issue
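For anyone skimming, the ProcessPoolExecutor flavour of the same pattern looks roughly like this (heavily simplified, see grigolet’s repo above for the actual repro; cpu_bound_work is just a stand-in for the real workload):

from concurrent.futures import ProcessPoolExecutor

from prefect import flow

def cpu_bound_work(n: int) -> int:
    # stand-in for a CPU-bound function that takes a while
    return sum(i * i for i in range(n))

@flow(name="executor_demo")
def flow_with_executor() -> None:
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(cpu_bound_work, 10_000_000) for _ in range(8)]
        for future in futures:
            # works locally; on the remote infrastructure this reportedly never returns
            print(future.result())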

Thanks a lot for the detailed report on your side!
It is true that it’s a bit weird that Python’s native multiprocessing doesn’t seem to behave as we would like. However, I think this has to do with Prefect 2, because with v1 I don’t have this issue.

I guess it’s good to know that this was working on Prefect 1 and looks like a regression, but we only started with Prefect a couple of months ago, directly on Prefect 2, and we’re definitely not going to downgrade just to address this issue.
I truly hope someone from Prefect’s Engineering/Product team will pick this up.

thank you @grigolet and @ori-scala for the detailed discussion!

when possible, I will take a look at the examples linked above and try to understand if there’s something we can do here.

Thanks Nate! Crossing my fingers here 🙂

Hi @nate, how’s it going?
Any update on this one? It’s becoming a bigger obstacle for us with every day that passes and I’d be thrilled if we could get it solved.
Thanks,
Ori.

Also seeing this issue… trying to use multiprocessing.Pool.starmap, and our flow just hangs when it hits this.

The interesting thing is that it runs fine locally and only hangs on our ECS infra.

I had something similar. I don’t know if this is the right fix, but I’ll share it anyway…

I took out pool.join() and increased the memory to the next setting for my process, and it worked for me.

Without pool.join(), the main thread won’t wait for the rest of the processes to complete, so I’m not sure how it would work for our use case…
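A middle ground that should still block until everything finishes (and also surface worker exceptions, which join() never does) would be to keep the AsyncResult handles from the flow above and wait on them explicitly. Just a sketch, untested on our ECS setup:

results = [
    pool.apply_async(functools.partial(run_command, cmd, local_path))
    for cmd in shell_commands
]
pool.close()
for result in results:
    # blocks until that command finishes, re-raises any worker exception,
    # and raises multiprocessing.TimeoutError instead of hanging forever
    result.get(timeout=600)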

ahh good point, sorry, my suggestion is way off haha

sorry for the latency here @ori-scala

is this issue close enough to what you’re describing? or would you like to open a separate issue for this

this may require some research

Hi Nate, I think these are pretty similar, thanks for connecting the two. I’d be happy to test it as soon as the GitHub issue is closed, and I’ll report back here.