Trouble executing subflows concurrently with DaskTaskRunner

I am trying to run several subflows concurrently with the DaskTaskRunner, using ephemeral Dask clusters backed by SLURMCluster from dask_jobqueue. The goal is to have a subflow that can be imported elsewhere and awaited asynchronously, with each invocation running on its own Slurm-backed cluster.
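
For context, get_slurm_task_runner() lives in my executors module and is, in simplified form, something like the sketch below (I am using the DaskTaskRunner from the prefect-dask collection; the actual SLURMCluster options are trimmed here, so treat the keyword arguments as placeholders rather than the real configuration):

from dask_jobqueue import SLURMCluster
from prefect_dask import DaskTaskRunner


def get_slurm_cluster(**kwargs) -> SLURMCluster:
    # Forward keyword arguments (memory, cores, ...) to
    # dask_jobqueue.SLURMCluster; the real version sets more options.
    return SLURMCluster(**kwargs)


def get_slurm_task_runner(**cluster_kwargs) -> DaskTaskRunner:
    # Each call returns a fresh DaskTaskRunner, which spins up an ephemeral
    # Dask cluster via get_slurm_cluster when its flow run starts.
    return DaskTaskRunner(
        cluster_class=get_slurm_cluster,
        cluster_kwargs=cluster_kwargs,
    )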

To validate this approach, I have implemented the following code snippet:

import asyncio

from prefect import flow, task, get_run_logger
from executors import get_slurm_task_runner


@task(name="square", description="Returns the square of input number")
def get_square(num: int) -> int:
    squared = num**2

    return squared


@task(name="plus1", description="Add 1 to the input number")
def increment_num(num: int) -> int:
    result = num + 1

    return result


def build_subflow():
    @flow(description="Take square of a given number and add 1", task_runner=get_slurm_task_runner(memory="1 GB"))
    async def square_plus1(num: int) -> int:
        logger = get_run_logger()
        logger.info(f"Input number: {num}")
        squared = get_square.submit(num).result()
        logger.info(f"Squared: {squared}")
        incremented = increment_num.submit(squared).result()
        logger.info(f"Incremented: {incremented}")

        return incremented
    return square_plus1


@flow(description="Take square of a given array and add 1")
async def square_plus1_list(num_list: list[int]) -> list[int]:
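    # build_subflow() returns a fresh flow object on each call, so the same
    # subflow definition can be submitted concurrently with different parameters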
    results = await asyncio.gather(*[build_subflow()(num) for num in num_list])
    return results


if __name__ == "__main__":
    new_nums = asyncio.run(square_plus1_list([1, 2, 3]))
    print(new_nums)

The get_slurm_task_runner() function is a thin wrapper that returns a properly configured DaskTaskRunner. The synchronous version of this code runs successfully, so the wrapper itself should be fine. To run the same subflow with varying parameters, I wrap the flow definition in a build_subflow() factory, as suggested in this comment.
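
For reference, the working synchronous variant has the same shape with the async pieces removed, roughly (names here are illustrative):

def build_subflow_sync():
    @flow(description="Take square of a given number and add 1",
          task_runner=get_slurm_task_runner(memory="1 GB"))
    def square_plus1_sync(num: int) -> int:
        squared = get_square.submit(num).result()
        return increment_num.submit(squared).result()

    return square_plus1_sync


@flow(description="Take square of a given array and add 1")
def square_plus1_list_sync(num_list: list[int]) -> list[int]:
    # Subflows run one after another, so only one ephemeral Dask cluster
    # exists at a time.
    return [build_subflow_sync()(num) for num in num_list]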

However, when I run the code, execution hangs indefinitely. Here is the full log up to the point where I interrupted it with Ctrl-C:

10:00:14.313 | INFO    | prefect.engine - Created flow run 'majestic-chihuahua' for flow 'square-plus1-list'
~/conda/envs/py310/lib/python3.10/site-packages/prefect/flows.py:214: UserWarning: A flow named 'square-plus1' and defined at '~/testbed/prefect_executors/forloop.py:53' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
~/conda/envs/py310/lib/python3.10/site-packages/prefect/flows.py:214: UserWarning: A flow named 'square-plus1' and defined at '~/testbed/prefect_executors/forloop.py:53' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
~/conda/envs/py310/lib/python3.10/site-packages/prefect/flows.py:214: UserWarning: A flow named 'square-plus1' and defined at '~/testbed/prefect_executors/forloop.py:53' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
~/conda/envs/py310/lib/python3.10/site-packages/prefect/tasks.py:270: UserWarning: A task named 'square-plus1' and defined at '~/testbed/prefect_executors/forloop.py:53' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
10:00:15.152 | INFO    | Flow run 'majestic-chihuahua' - Created subflow run 'giga-oriole' for flow 'square-plus1'
10:00:15.160 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `executors.get_slurm_cluster`
10:00:15.190 | INFO    | distributed.http.proxy - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
10:00:15.197 | INFO    | distributed.scheduler - State start
10:00:15.205 | INFO    | distributed.scheduler -   Scheduler at:  tcp://xxx.xxx.xxx.xxx:34497
10:00:15.208 | INFO    | distributed.scheduler -   dashboard at:                     :8787
10:00:15.229 | INFO    | Flow run 'majestic-chihuahua' - Created subflow run 'chirpy-pillbug' for flow 'square-plus1'
10:00:15.232 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `executors.get_slurm_cluster`
~/conda/envs/py310/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 35247 instead
  warnings.warn(
10:00:15.237 | INFO    | distributed.scheduler - State start
10:00:15.241 | INFO    | distributed.scheduler -   Scheduler at:  tcp://xxx.xxx.xxx.xxx:36965
10:00:15.241 | INFO    | distributed.scheduler -   dashboard at:                    :35247
10:00:15.999 | INFO    | distributed.scheduler - Receive client connection: Client-18783984-b63a-11ed-8fc5-b496915d3afc
10:00:16.000 | INFO    | distributed.core - Starting established connection to tcp://xxx.xxx.xxx.xxx:52872
10:00:16.006 | INFO    | distributed.scheduler - Receive client connection: Client-187861d0-b63a-11ed-8fc5-b496915d3afc
10:00:16.006 | INFO    | distributed.core - Starting established connection to tcp://xxx.xxx.xxx.xxx:42012
10:00:16.008 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://xxx.xxx.xxx.xxx:8787/status
10:00:16.011 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://xxx.xxx.xxx.xxx:35247/status
10:00:16.098 | INFO    | Flow run 'majestic-chihuahua' - Created subflow run 'meaty-squid' for flow 'square-plus1'
10:00:16.103 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `executors.get_slurm_cluster`
~/conda/envs/py310/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 44257 instead
  warnings.warn(
10:00:16.109 | INFO    | distributed.scheduler - State start
10:00:16.112 | INFO    | distributed.scheduler -   Scheduler at:  tcp://xxx.xxx.xxx.xxx:32783
10:00:16.112 | INFO    | distributed.scheduler -   dashboard at:                    :44257
10:00:16.409 | INFO    | distributed.scheduler - Receive client connection: Client-191d0de7-b63a-11ed-8fc5-b496915d3afc
10:00:16.410 | INFO    | distributed.core - Starting established connection to tcp://xxx.xxx.xxx.xxx:32980
10:00:16.411 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://xxx.xxx.xxx.xxx:44257/status
10:00:16.431 | INFO    | Flow run 'chirpy-pillbug' - Input number: 3
~/conda/envs/py310/lib/python3.10/site-packages/prefect/utilities/asyncutils.py:258: UserWarning: `sync` called from an asynchronous context; you should `await` the async function directly instead.
  warnings.warn(
^C10:02:22.878 | INFO    | distributed.core - Event loop was unresponsive in Scheduler for 126.46s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
10:02:22.899 | INFO    | distributed.core - Event loop was unresponsive in Scheduler for 126.48s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
10:02:22.919 | INFO    | distributed.core - Event loop was unresponsive in Scheduler for 126.49s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

It looks like the problem is related to creating multiple DaskTaskRunner instances at once. I also do not understand why logger.info(f"Squared: {squared}") never appears for any subflow, which suggests that the get_square.submit(num) task (or the .result() call on its future) never completes.
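
One detail that may be relevant: the asyncutils.py warning above says that `sync` was called from an asynchronous context and the async function should be awaited directly. Inside an async flow, both .submit() and .result() return awaitables, so the awaited form of the subflow body would look like the sketch below. I have not confirmed whether this is connected to the hang, so any pointers are appreciated:

@flow(description="Take square of a given number and add 1", task_runner=get_slurm_task_runner(memory="1 GB"))
async def square_plus1(num: int) -> int:
    logger = get_run_logger()
    logger.info(f"Input number: {num}")
    # In an async flow, submit() returns an awaitable resolving to a
    # PrefectFuture, and result() on that future must also be awaited.
    squared = await (await get_square.submit(num)).result()
    logger.info(f"Squared: {squared}")
    incremented = await (await increment_num.submit(squared)).result()
    logger.info(f"Incremented: {incremented}")

    return incremented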