RayTaskRunner freezes when writing data to a local directory

I have run a flow successfully with task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=8)). I then replaced this with task_runner=RayTaskRunner(init_kwargs=dict(num_cpus=8)). Should this work, or is some other configuration required?

The log shows 68 tasks submitted and 14 tasks completed, then stops reporting. The Orion server reports 10 tasks completed. The flow still shows as running, but no tasks are running and no further progress is made.

I would check the Ray dashboard but cannot see it, probably because of a separate IP address issue.

Thanks for checking that. RayTaskRunner is finished and ready for use. If you experience any bugs, it’s best to create a bug report as a GitHub issue, but you can also post it here, and I can create an issue, too.

So far, I’ve tested this with a simple flow from the docs, and everything seems to be working fine:

from prefect import flow, task, get_run_logger
from prefect.task_runners import RayTaskRunner


@task
def say_hello(name):
    logger = get_run_logger()
    logger.info("hello %s", name)


@task
def say_goodbye(name):
    logger = get_run_logger()
    logger.info("goodbye %s", name)


@flow(task_runner=RayTaskRunner())
def greetings(names):
    for name in names:
        say_hello(name)
        say_goodbye(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

I can see Ray dashboard: http://127.0.0.1:8265/
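If the dashboard is not reachable from your browser (for example, when the flow runs on a remote machine or inside a container), one thing to try is passing dashboard options to ray.init() through init_kwargs. This is only a sketch under that assumption, not a confirmed fix for the issue above: include_dashboard, dashboard_host, and dashboard_port are ray.init() arguments, while RAY_INIT_KWARGS and make_task_runner are hypothetical names used for illustration.

```python
# Keyword arguments that RayTaskRunner passes on to ray.init().
RAY_INIT_KWARGS = dict(
    num_cpus=8,                # same CPU limit as in the original question
    include_dashboard=True,    # start the dashboard together with the local Ray instance
    dashboard_host="0.0.0.0",  # listen on all interfaces, not only 127.0.0.1
    dashboard_port=8265,       # Ray's default dashboard port
)


def make_task_runner():
    """Hypothetical helper: build a RayTaskRunner with the options above."""
    # Imported lazily so the settings can be inspected without Prefect installed.
    # From Prefect 2.0b8 onwards the import is `from prefect_ray import RayTaskRunner`.
    from prefect.task_runners import RayTaskRunner

    return RayTaskRunner(init_kwargs=RAY_INIT_KWARGS)
```

The flow would then be declared with @flow(task_runner=make_task_runner()). Binding to 0.0.0.0 exposes the dashboard to the whole network, so this is only sensible on a trusted network.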

Extending the same flow to over 700 tasks also works fine:

from prefect import flow, task, get_run_logger
from prefect.task_runners import RayTaskRunner


@task
def say_hello(name):
    logger = get_run_logger()
    logger.info("hello %s", name)


@task
def say_goodbye(name):
    logger = get_run_logger()
    logger.info("goodbye %s", name)


@flow(task_runner=RayTaskRunner())
def greetings(names):
    for name in names:
        say_hello(name)
        say_goodbye(name)


if __name__ == "__main__":
    text = """Am if number no up period regard sudden better. Decisively surrounded all admiration and not you. Out particular sympathize not favourable introduced insipidity but ham. Rather number can and set praise. Distrusts an it contented perceived attending oh. Thoroughly estimating introduced stimulated why but motionless.
    
    Why end might ask civil again spoil. She dinner she our horses depend. Remember at children by reserved to vicinity. In affronting unreserved delightful simplicity ye. Law own advantage furniture continual sweetness bed agreeable perpetual. Oh song well four only head busy it. Afford son she had lively living. Tastes lovers myself too formal season our valley boy. Lived it their their walls might to by young.
    
    Possession her thoroughly remarkably terminated man continuing. Removed greater to do ability. You shy shall while but wrote marry. Call why sake has sing pure. Gay six set polite nature worthy. So matter be me we wisdom should basket moment merely. Me burst ample wrong which would mr he could. Visit arise my point timed drawn no. Can friendly laughter goodness man him appetite carriage. Any widen see gay forth alone fruit bed.
    
    Eat imagine you chiefly few end ferrars compass. Be visitor females am ferrars inquiry. Latter law remark two lively thrown. Spot set they know rest its. Raptures law diverted believed jennings consider children the see. Had invited beloved carried the colonel. Occasional principles discretion it as he unpleasing boisterous. She bed sing dear now son half.
    
    Am finished rejoiced drawings so he elegance. Set lose dear upon had two its what seen. Held she sir how know what such whom. Esteem put uneasy set piqued son depend her others. Two dear held mrs feet view her old fine. Bore can led than how has rank. Discovery any extensive has commanded direction. Short at front which blind as. Ye as procuring unwilling principle by.
    
    Sportsman delighted improving dashwoods gay instantly happiness six. Ham now amounted absolute not mistaken way pleasant whatever. At an these still no dried folly stood thing. Rapid it on hours hills it seven years. If polite he active county in spirit an. Mrs ham intention promotion engrossed assurance defective. Confined so graceful building opinions whatever trifling in. Insisted out differed ham man endeavor expenses. At on he total their he songs. Related compact effects is on settled do.
    
    Needed feebly dining oh talked wisdom oppose at. Applauded use attempted strangers now are middleton concluded had. It is tried no added purse shall no on truth. Pleased anxious or as in by viewing forbade minutes prevent. Too leave had those get being led weeks blind. Had men rose from down lady able. Its son him ferrars proceed six parlors. Her say projection age announcing decisively men. Few gay sir those green men timed downs widow chief. Prevailed remainder may propriety can and.
    
    No depending be convinced in unfeeling he. Excellence she unaffected and too sentiments her. Rooms he doors there ye aware in by shall. Education remainder in so cordially. His remainder and own dejection daughters sportsmen. Is easy took he shed to kind.
    
    On projection apartments unsatiable so if he entreaties appearance. Rose you wife how set lady half wish. Hard sing an in true felt. Welcomed stronger if steepest ecstatic an suitable finished of oh. Entered at excited at forming between so produce. Chicken unknown besides attacks gay compact out you. Continuing no simplicity no favourable on reasonably melancholy estimating. Own hence views two ask right whole ten seems. What near kept met call old west dine. Our announcing sufficient why pianoforte.
    
    Do am he horrible distance marriage so although. Afraid assure square so happen mr an before. His many same been well can high that. Forfeited did law eagerness allowance improving assurance bed. Had saw put seven joy short first. Pronounce so enjoyment my resembled in forfeited sportsman. Which vexed did began son abode short may. Interested astonished he at cultivated or me. Nor brought one invited she produce her.
    """
    random_strings = text.replace(".", "").split(" ")
    greetings(random_strings)

In the end, I’m getting logs:

(begin_task_run pid=66112) 13:54:10.605 | INFO    | Task run 'say_hello-d71d0552-726' - Finished in state Completed(None)
(begin_task_run pid=66109) 13:54:10.815 | INFO    | Task run 'say_goodbye-18d6ba96-726' - Finished in state Completed(None)
13:54:25.311 | INFO    | Flow run 'whispering-chital' - Finished in state Completed('All states completed.')

The same goes for this flow:

from prefect import flow, task
from prefect import get_run_logger
from prefect.task_runners import RayTaskRunner
import time


@task
def compute_something(x):
    logger = get_run_logger()
    logger.info("Computing: %d x 2 = %d", x, x * 2)
    time.sleep(2)


@flow(task_runner=RayTaskRunner())
def ray_flow():
    for i in range(100):
        compute_something(i)


if __name__ == "__main__":
    ray_flow()

Logs:

14:17:34.522 | INFO    | prefect.engine - Created flow run 'real-wildebeest' for flow 'ray-flow'
14:17:34.523 | INFO    | Flow run 'real-wildebeest' - Using task runner 'RayTaskRunner'
14:17:34.525 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2022-04-06 14:17:37,671	INFO services.py:1412 -- View the Ray dashboard at http://127.0.0.1:8265
14:17:40.655 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
14:17:40.655 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
14:17:41.048 | INFO    | Flow run 'real-wildebeest' - Created task run 'compute_something-86d821e8-0' for task 'compute_something'
14:17:41.282 | INFO    | Flow run 'real-wildebeest' - Created task run 'compute_something-86d821e8-1' for task 'compute_something'
14:17:41.460 | INFO    | Flow run 'real-wildebeest' - Created task run 'compute_something-86d821e8-2' for task 'compute_something'
14:17:41.644 | INFO    | Flow run 'real-wildebeest' - Created task run 'compute_something-86d821e8-3' for task 'compute_something'
...
(begin_task_run pid=69201) 14:18:26.912 | INFO    | Task run 'compute_something-86d821e8-16' - Computing: 16 x 2 = 32
(begin_task_run pid=69203) 14:18:26.881 | INFO    | Task run 'compute_something-86d821e8-40' - Computing: 40 x 2 = 80
(begin_task_run pid=69205) 14:18:26.941 | INFO    | Task run 'compute_something-86d821e8-47' - Computing: 47 x 2 = 94
(begin_task_run pid=69202) 14:18:26.953 | INFO    | Task run 'compute_something-86d821e8-78' - Computing: 78 x 2 = 156
(begin_task_run pid=69201) 14:18:29.846 | INFO    | Task run 'compute_something-86d821e8-16' - Finished in state Completed(None)
(begin_task_run pid=69203) 14:18:29.846 | INFO    | Task run 'compute_something-86d821e8-40' - Finished in state Completed(None)
(begin_task_run pid=69205) 14:18:29.903 | INFO    | Task run 'compute_something-86d821e8-47' - Finished in state Completed(None)
(begin_task_run pid=69202) 14:18:29.897 | INFO    | Task run 'compute_something-86d821e8-78' - Finished in state Completed(None)
14:18:37.244 | INFO    | Flow run 'real-wildebeest' - Finished in state Completed('All states completed.')

Can you create a GitHub issue if you see a bug here, or share the code that causes the problem so that I can try to reproduce it?

Thanks. I tried running your examples and they worked! However, after I adapted them, the flow seems to freeze when there are a lot of tasks with dependencies.
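For readers following along, a flow with many dependent tasks might look something like the sketch below. This is not the code from the report: the task names, N_CHAINS, CHAIN_LENGTH, and build_dependent_flow are all made up for illustration, and whether this particular shape freezes is untested. It only shows how passing one task's result into the next creates a dependency between them.

```python
N_CHAINS = 20      # independent chains of tasks (hypothetical size)
CHAIN_LENGTH = 5   # dependent steps per chain (hypothetical size)


def build_dependent_flow():
    """Return a flow whose tasks depend on the results of earlier tasks."""
    # Imported lazily; running the flow needs a Prefect 2.0 beta with Ray installed.
    from prefect import flow, task
    from prefect.task_runners import RayTaskRunner

    @task
    def start(i):
        return i

    @task
    def add_one(x):
        return x + 1

    @flow(task_runner=RayTaskRunner())
    def dependent_flow():
        for i in range(N_CHAINS):
            result = start(i)
            # Passing the previous result makes each step wait for the one before it.
            for _ in range(CHAIN_LENGTH - 1):
                result = add_one(result)

    return dependent_flow
```

Calling build_dependent_flow()() submits N_CHAINS * CHAIN_LENGTH task runs in total; raising either number increases the number of dependent tasks.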

Thanks for the issue! I’ll respond there.

👉 Note to anyone using DaskTaskRunner or RayTaskRunner:

From Prefect version 2.0b8 onwards, those task runners were moved to their respective Prefect Collections for better dependency management: the core library no longer requires dask or ray as dependencies, and they can be installed separately when needed.

The correct imports are now:

from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner
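
If the same script has to run on both sides of that change, a fallback import is one option. The sketch below assumes the collection was installed with pip install prefect-ray (prefect-dask is the Dask equivalent); setting RayTaskRunner to None when neither import works is just a convention chosen for this example.

```python
try:
    # Prefect 2.0b8 and later: the task runner lives in the prefect-ray collection.
    from prefect_ray import RayTaskRunner
except ImportError:
    try:
        # Earlier 2.0 betas: the task runner shipped with the core library.
        from prefect.task_runners import RayTaskRunner
    except ImportError:
        # Neither location is importable (Prefect or the collection is missing).
        RayTaskRunner = None
```

The same pattern applies to DaskTaskRunner with prefect_dask.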