Edge-case error: "Prefect Task run TASK_RUN_ID already finished"

Hi,

While using Prefect to transcode satellite images from TIFF to COG format, we ran into some very peculiar behavior.

As a side note: we are using a Ray cluster on Anyscale to handle the expected load (~1 TB of images, an estimated 250 CPUs).

For the first ~25 minutes, everything works as expected: task runs are scheduled and executed, and new tasks change their status from “pending” to “running”.

Then, without any apparent reason or change to the setup, more and more tasks show up that stay in the “running” state forever, have a run time of 0s, and display the info message “Prefect Task run TASK_RUN_ID already finished” (see screenshot below).

At the same time, no new task runs are executed; they stay “pending” until the flow run is canceled manually.

Unfortunately, I could not reproduce the error with a “simple” flow; maybe it only happens when the orchestrator is pushed to its limits? You can find our full flow further down in this post.
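For context, here is a sketch of what I mean by a “simple” flow: a placeholder task body submitted many times through the Ray task runner (names and parameters are made up for illustration, this is not our production code). Something along these lines did not trigger the error for me:

# Hypothetical minimal load test (illustrative names, not the real flow): submit
# many trivial tasks through the Ray task runner and watch for stuck "running" states.
from prefect import flow, task
from prefect_ray import RayTaskRunner


@task(retries=3, retry_delay_seconds=15)
def busy_work(i: int) -> int:
    # Placeholder workload; the real flow runs a CPU/IO-heavy COG translation here.
    return i * i


@flow(name="ray-load-test", task_runner=RayTaskRunner())
def load_test(n_tasks: int = 500):
    futures = [busy_work.submit(i) for i in range(n_tasks)]
    return [f.result() for f in futures]


if __name__ == "__main__":
    load_test()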

I was also unable to find anything about the “Prefect Task run TASK_RUN_ID already finished” error/behavior online.
Does anybody know what the context for this is?


Flow code:

import time

import boto3
import rasterio

from anyio import to_thread

from prefect import flow, task, get_run_logger
from prefect.utilities.asyncutils import sync_compatible
from prefect_ray import RayTaskRunner

from rasterio.session import AWSSession
from rio_cogeo import cog_profiles
from rio_cogeo.cogeo import cog_translate

# https://docs.anyscale.com/reference/anyscale-client-api#specifying-cluster-properties
RayAnyscaleTaskRunner = RayTaskRunner(
    address="anyscale://toby-spot-v3",
    init_kwargs={
        "autosuspend": 45,  # minutes
        "cluster_env": "toby-ray-exp:4",
        "cluster_compute": "toby-spot-3"
    },
)


# ======================================================================================================================
# ============================== Cloud-Optimized-GeoTiff
# ======================================================================================================================
# @ray.remote
def translate_to_cog(src_path, dst_s3_bucket, dst_s3_key, cog_config_profile, cog_config_predictor,
                     profile_options=None, **options):
    profile_options = profile_options or {}  # avoid a mutable default argument

    # https://gdal.org/drivers/raster/gtiff.html#creation-options
    output_profile = cog_profiles.get(cog_config_profile)
    output_profile.update(dict(
        BIGTIFF="IF_SAFER",
        PREDICTOR=cog_config_predictor,
    ))
    output_profile.update(profile_options)

    config = dict(
        GDAL_NUM_THREADS="ALL_CPUS",
        GDAL_TIFF_INTERNAL_MASK=True,
    )

    with rasterio.io.MemoryFile() as mem_dst:
        cog_translate(
            src_path,
            mem_dst.name,
            output_profile,
            config=config,
            in_memory=False,  # should also work in-memory, but for some reason it crashes my laptop & Ray cluster
            quiet=False,
            **options,
        )

        client = boto3.client("s3")
        client.upload_fileobj(mem_dst, dst_s3_bucket, dst_s3_key)

    # Just to validate whether everything worked
    with rasterio.open(f"s3://{dst_s3_bucket}/{dst_s3_key}") as src:
        print(src.profile)
        return src.profile


@sync_compatible
async def increase_limit(n):
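    # Raise AnyIO's default worker-thread token count (defaults to 40), which is
    # the limit Prefect runs into here; see the workaround note in main() below.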
    to_thread.current_default_thread_limiter().total_tokens = n


# ======================================================================================================================
# ============================== Prefect
# ======================================================================================================================
@task(retries=3, retry_delay_seconds=15)
def tiff_to_cog(src_path, dst_s3_bucket, dst_s3_key, cog_config_profile, cog_config_predictor):
    logger = get_run_logger()

    aws_session = AWSSession(requester_pays=True)

    with rasterio.Env(aws_session):
        with rasterio.open(src_path) as src:
            logger.info(src.profile)
            translate_to_cog(src, dst_s3_bucket, dst_s3_key, cog_config_profile, cog_config_predictor)

    return True


@task
def get_src_files(src_bucket: str, src_prefix: str):
    logger = get_run_logger()

    logger.info(f"{src_bucket=}; {src_prefix=}")
    src_paths = []

    client = boto3.client('s3')
    resp = client.list_objects_v2(
        Bucket=src_bucket,
        Prefix=src_prefix,
        RequestPayer='requester',
    )

    input_images = filter(lambda obj: obj['Key'].endswith(('.jp2', '.tiff', '.tif')), resp['Contents'])

    for image in input_images:
        src_paths.append(dict(
            bucket=src_bucket,
            key=image['Key'],
            size=image['Size']
        ))

    return src_paths


@task
def get_sts_identity():
    sts_identity = boto3.client('sts').get_caller_identity()
    return sts_identity


@flow(name="cog-translate", task_runner=RayAnyscaleTaskRunner)
# @flow(name="cog-translate")
def main(
        src_bucket: str,
        dst_bucket: str,
        src_prefix: str = "",
        dry_run: bool = True,
        cog_config_profile: str = "DEFLATE",
        cog_config_predictor: str = "2",
):
    logger = get_run_logger()

    # Work around Prefect AnyIO thread limit of 40
    # https://prefect-community.slack.com/archives/CL09KU1K7/p1666888248841549?thread_ts=1666017309.670609&cid=CL09KU1K7
    # TODO: Revisit after Michael Adkins had a look
    increase_limit(5000)

    sts_identity = get_sts_identity.submit().result()
    logger.info(f"{sts_identity['UserId']=} on {sts_identity['Account']=}")

    src_objs = get_src_files.submit(src_bucket, src_prefix).result()

    src_size_gb = round((sum(image['size'] for image in src_objs)) / 1024 / 1024 / 1024, 4)
    src_obj_num = len(src_objs)
    logger.info(
        f"The supplied flow parameters are queuing {src_obj_num} input images, with a total size of {src_size_gb}GB "
        f"for conversion to COG"
    )

    if dry_run:
        logger.info("DRYRUN. Exiting Flow.")
        return True

    for src_obj in src_objs:
        src_path = f"s3://{src_obj['bucket']}/{src_obj['key']}"
        logger.info(f"{src_path=}")
        time.sleep(0.2)  # to not overload the Prefect Cloud API
        tiff_to_cog.submit(
            src_path=src_path,
            dst_s3_bucket=dst_bucket,
            dst_s3_key=f"{src_obj['key']}.cog.tif",
            cog_config_profile=cog_config_profile,
            cog_config_predictor=cog_config_predictor
        )

    return True


# ======================================================================================================================
# ============================== Local Exec Config
# ======================================================================================================================
if __name__ == "__main__":
    # roughly 500 task runs (images) with a total size of ~890GB
    # src_bucket is public, dst_bucket needs to be accessible by IAM user/role that is executing the flow
    main(src_bucket="spacenet-dataset", dst_bucket="ray-cluster-test-bucket", dry_run=False)

Thanks for flagging this, Toby. I’ll share this with the team.


Quick update: After going through the logs again, I found this peculiar error message that could be connected:

08:54:34.554 | ERROR   | Flow run 'olivine-hound' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 582, in orchestrate_flow_run
    waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 1254, in wait_for_task_runs_and_report_crashes
    if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'
08:54:34.564 | ERROR   | Flow run 'olivine-hound' - Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'

Traceback (most recent call last):
  File "/home/tobias/projects/dataops/ray-toby-tests/liveeo/prefect_flow_template_flows/example_flow/philipp_flow.py", line 174, in <module>
    main(src_bucket="spacenet-dataset", dst_bucket="ray-cluster-test-bucket", dry_run=False)
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/flows.py", line 439, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 150, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/tobias/anaconda3/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/tobias/anaconda3/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 222, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 353, in begin_flow_run
    terminal_state = await orchestrate_flow_run(
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 627, in orchestrate_flow_run
    await wait_for_task_runs_and_report_crashes(
  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 1254, in wait_for_task_runs_and_report_crashes
    if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'
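
If I read this correctly, wait_for_task_runs_and_report_crashes is handed a coroutine instead of an awaited State object, so the state.type access fails. A stripped-down illustration of that failure mode (hypothetical names, not the actual Prefect or prefect-ray code):

# Illustration only: hypothetical names, not the real Prefect engine internals.
import asyncio


class State:
    type = "COMPLETED"


async def get_state():
    return State()


async def check_for_crash(state):
    # Only works if the caller awaited the coroutine and passed a State object.
    return state.type == "CRASHED"


async def main():
    print(await check_for_crash(await get_state()))  # False, as expected

    try:
        # Passing the un-awaited coroutine reproduces the error from the logs.
        await check_for_crash(get_state())
    except AttributeError as exc:
        print(exc)  # 'coroutine' object has no attribute 'type'


asyncio.run(main())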

Additional information about dependencies:

❯ prefect version
Version:             2.6.5
API version:         0.8.3
Python version:      3.9.7
Git commit:          9fc2658f
Built:               Thu, Oct 27, 2022 2:24 PM
OS/Arch:             linux/x86_64
Profile:             dev
Server type:         cloud

# pyproject.toml
[tool.poetry]
name = "cog-translate"
version = "0.0.1"
description = ""

[tool.poetry.dependencies]
python = "^3.8"
prefect = "^2.6.3"
s3fs = "^2022.5.0"

[tool.poetry.group.flows.dependencies]
prefect-ray = "^0.2.1"
ray = "^2.0.0"
anyscale = "^0.5.47"
asyncio = "^3.4.3"
anyio = "^3.6.1"
lz4 = "^4.0.2"
rio-cogeo = "^3.4.1"
rio-viz = "^0.9.6"

[tool.poetry.group.dev.dependencies]
# dev dependencies
pre-commit = "^2.20.0"

Thanks so much, keep adding to this thread if you find any new info. We have ongoing work dedicated to addressing this kind of issue this month.


We’re seeing a very similar issue as well. We’re launching ~50 Ray tasks, and for about 30 minutes everything seems to work fine. Then tasks start failing or, more commonly, get stuck as “running” with “task run already finished”. We’re running our flow in Azure Container Instances, and shortly after this starts happening, the flow’s container is promptly deleted for no apparent reason. Very confused at the moment.

A notable difference from your case, @trahloff, is that for us every FAILED task actually has a non-zero duration. Many are, however, stuck as “running” with a 0s duration. I’ll try fiddling with the autoscaler to see if I can get around these issues; I’ll keep you posted if I manage to get it working.


Thanks for reporting this, keep us posted on what you find.

@vholmer, any chance you could put together a minimal reproducible example of when this starts to happen and describe it in a GitHub issue? It would make it easier to discuss with other engineers what’s causing the problem.

Hi Anna, sure, I’ll try. I’ll post an issue if I manage to create such an example.


Here’s the GitHub issue for my minimal example: Flow crashes after multiple hanged tasks with "Prefect Task run TASK_RUN_ID already finished" · Issue #58 · PrefectHQ/prefect-ray · GitHub


@trahloff, we just merged a fix for this: Fix `RayTaskRunner` exception handling in Prefect >= 2.6.0 by rpeden · Pull Request #60 · PrefectHQ/prefect-ray · GitHub


Great, thanks for the update @anna_geller!
