Hi, unfortunately not.
I had a great discussion with Nate, summary of the problem on my end:
Hi,
I have 2k shell commands to run. I divide them into batches of 8 commands and launch a flow (similar to process_pokemon_batch in this orchestrator recipe) for each batch -> 250 flows, each running 8 commands.
Now I want each flow to run its 8 commands in parallel, each one using its own CPU.
It works great when I test it locally with concurrent.futures.ProcessPoolExecutor or multiprocessing.Pool, but it gets stuck on the ECS agent. You could think of it as if get_pokemon_info were a CPU-bound command that takes 1 minute to run, so it makes sense to run these in parallel (unlike the current get_pokemon_info, which is I/O bound and a pretty light API call, so it makes sense to run it concurrently using async). So my question is: how can I enable my own parallelism? (There's a great comment about concurrency versus parallelism in the Prefect docs.)
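To make that concrete, here's a rough sketch of what I mean; cpu_bound_pokemon_info and chunk_commands are just hypothetical names I'm using for illustration, they're not part of the recipe:
import hashlib
from typing import List


def cpu_bound_pokemon_info(name: str) -> str:
    # Hypothetical stand-in for get_pokemon_info: pure CPU work instead of a light API call.
    digest = name.encode()
    for _ in range(2_000_000):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


def chunk_commands(shell_commands: List[List[str]], batch_size: int = 8) -> List[List[List[str]]]:
    # ~2k commands -> ~250 batches of 8; each batch is handed to one flow run.
    return [shell_commands[i:i + batch_size] for i in range(0, len(shell_commands), batch_size)]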
I’ve shared the following minimal example from my end:
import functools
import multiprocessing
import subprocess
from typing import List

from prefect import flow, get_run_logger


@flow(name="run_commands", flow_run_name="Run commands", validate_parameters=False)
async def flow_run_commands(shell_commands: List[List[str]], local_path: str) -> None:
    logger = get_run_logger()
    num_cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=num_cores)
    logger.info(f"Running commands with {num_cores} cores")
    for cmd in shell_commands:
        # Each command is dispatched to its own worker process.
        partial = functools.partial(run_command, cmd, local_path)
        pool.apply_async(partial)
    pool.close()
    pool.join()


def run_command(cmd: List[str], local_path: str) -> None:
    print(f"Running {cmd} at {local_path}")
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
    stdout_b, stderr_b = process.communicate()
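For reference, here's roughly the concurrent.futures.ProcessPoolExecutor variant I also tested; this is just a sketch, assuming run_command is the same function as above:
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List


def run_commands_with_executor(shell_commands: List[List[str]], local_path: str) -> None:
    # Each command gets its own worker process, so CPU-bound work runs on separate cores.
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(run_command, cmd, local_path) for cmd in shell_commands]
        for future in as_completed(futures):
            future.result()  # re-raises any exception from the worker process
Both variants behave as expected on my machine; the problem only shows up once the flow runs on the ECS agent.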
Nate suggested using Dask and gave the following example:
import subprocess
from typing import List

from dask.distributed import Client, as_completed
from prefect import flow


@flow(name="run_commands", flow_run_name="Run commands")
async def flow_run_commands(shell_commands: List[List[str]], local_path: str) -> None:
    client = Client()  # Start a local Dask client
    num_cores = len(client.ncores())  # Get number of cores
    print(f"Running commands with {num_cores} cores")
    # Create a list of tasks
    tasks = [client.submit(run_command, cmd, local_path) for cmd in shell_commands]
    # Get the results
    for future in as_completed(tasks):
        result = future.result()
        print(result)


def run_command(cmd: List[str], local_path: str) -> dict:
    print(f"Running {cmd} at {local_path}")
    process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    return {"stdout": stdout, "stderr": stderr}


if __name__ == "__main__":
    import asyncio

    asyncio.run(
        flow_run_commands(
            shell_commands=[
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
                ["echo", "Hello world!"],
            ],
            local_path="/tmp",
        )
    )
Which outputs:
09:01:56.311 | INFO | prefect.engine - Created flow run 'capable-marmot' for flow 'run_commands'
09:01:57.000 | INFO | distributed.http.proxy - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
09:01:57.003 | INFO | distributed.scheduler - State start
09:01:57.006 | INFO | distributed.scheduler - Scheduler at: tcp://127.0.0.1:64581
09:01:57.007 | INFO | distributed.scheduler - dashboard at: 127.0.0.1:8787
Running commands with 4 cores
Running ['echo', 'Hello world!'] at /tmp
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
{'stdout': 'Hello world!\n', 'stderr': ''}
09:01:58.912 | INFO | Flow run 'capable-marmot' - Finished in state Completed()
I’ve tried incorporating Nate’s example with Dask into my code and hit TONS of errors, among them:
distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:44113'. Shutting down.
distributed.core - Event loop was unresponsive in Worker for 37.67s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability
ERROR | distributed.scheduler - Couldn't gather keys {'async_run_command-e861b9bc963ae0ddb86bbe123dfc1ad5': ['tcp://127.0.0.1:34995']} state: ['memory'] workers: ['tcp://127.0.0.1:34995']
distributed.scheduler - ERROR - Shut down workers that don't have promised key: ['tcp://127.0.0.1:34995']
WARNING | distributed.nanny - Worker process still alive after 3.1999989318847657 seconds, killing
So this is where I’m stuck now, still looking for the best approach.
Ideally, I wouldn't have to incorporate Dask, given its overhead and the errors it's currently producing. I don't see why Python's native multiprocessing can't be supported out of the box.