Hi,
while using Prefect to transcode satellite images from TIFF to COG format, we observed some very peculiar behavior.
As a side note: We are using a Ray Cluster on Anyscale to handle the expected load (~1TB of images, roughly 250 CPUs estimated).
For the first ~25 minutes, everything worked as expected: task runs were scheduled and executed, and new tasks moved from “pending” to “running”.
Then, without any apparent reason or change to the setup, more and more tasks showed up that stayed in the “running” state forever, had 0s as run time, and displayed the info message “Prefect Task run TASK_RUN_ID already finished” (see screenshot below).
At the same time, no new task runs were executed; they stayed “pending” until we canceled the flow run manually.
Unfortunately, I could not reproduce the error with a “simple” flow (a rough sketch of what I tried is below); maybe it only happens when the orchestrator is pushed to its limits? You can find our actual flow at the end of this post.
I was also unable to find anything about the “Prefect Task run TASK_RUN_ID already finished” error/behavior online.
Does anybody know what the context for this is?
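For reference, the “simple” flow I tried was roughly of this shape (an illustrative sketch, not the exact code; the task body and counts are made up):

import time
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def noop(i: int) -> int:
    # trivial task body (illustrative)
    time.sleep(0.1)
    return i

@flow(name="repro-attempt", task_runner=RayTaskRunner(address="anyscale://toby-spot-v3"))
def simple_flow(n: int = 500):
    for i in range(n):
        noop.submit(i)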
Flow code:
import time

import boto3
import rasterio
from anyio import to_thread
from prefect import flow, task, get_run_logger
from prefect.utilities.asyncutils import sync_compatible
from prefect_ray import RayTaskRunner
from rasterio.session import AWSSession
from rio_cogeo import cog_profiles
from rio_cogeo.cogeo import cog_translate

# https://docs.anyscale.com/reference/anyscale-client-api#specifying-cluster-properties
RayAnyscaleTaskRunner = RayTaskRunner(
    address="anyscale://toby-spot-v3",
    init_kwargs={
        "autosuspend": 45,  # minutes
        "cluster_env": "toby-ray-exp:4",
        "cluster_compute": "toby-spot-3",
    },
)
# ======================================================================================================================
# ============================== Cloud-Optimized-GeoTiff
# ======================================================================================================================
# @ray.remote
def translate_to_cog(src_path, dst_s3_bucket, dst_s3_key, cog_config_profile, cog_config_predictor,
                     profile_options={}, **options):
    # https://gdal.org/drivers/raster/gtiff.html#creation-options
    output_profile = cog_profiles.get(cog_config_profile)
    output_profile.update(dict(
        BIGTIFF="IF_SAFER",
        PREDICTOR=cog_config_predictor,
    ))
    output_profile.update(profile_options)

    config = dict(
        GDAL_NUM_THREADS="ALL_CPUS",
        GDAL_TIFF_INTERNAL_MASK=True,
    )

    with rasterio.io.MemoryFile() as mem_dst:
        cog_translate(
            src_path,
            mem_dst.name,
            output_profile,
            config=config,
            in_memory=False,  # should also work in-memory, but crashes my laptop & Ray cluster for some reason
            quiet=False,
            **options,
        )
        client = boto3.client("s3")
        client.upload_fileobj(mem_dst, dst_s3_bucket, dst_s3_key)

    # Just to validate whether everything worked
    with rasterio.open(f"s3://{dst_s3_bucket}/{dst_s3_key}") as src:
        print(src.profile)
        return src.profile
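
# Prefect runs sync tasks on AnyIO worker threads, which are capped at 40 by default;
# this helper raises that cap so a large number of concurrent task runs can be handled
# (see the Slack thread linked in main() below).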
@sync_compatible
async def increase_limit(n):
    to_thread.current_default_thread_limiter().total_tokens = n
# ======================================================================================================================
# ============================== Prefect
# ======================================================================================================================
@task(retries=3, retry_delay_seconds=15)
def tiff_to_cog(src_path, dst_s3_bucket, dst_s3_key, cog_config_profile, cog_config_predictor):
    logger = get_run_logger()
    aws_session = AWSSession(requester_pays=True)
    with rasterio.Env(aws_session):
        with rasterio.open(src_path) as src:
            logger.info(src.profile)
            translate_to_cog(src, dst_s3_bucket, dst_s3_key, cog_config_profile, cog_config_predictor)
    return True
@task
def get_src_files(src_bucket: str, src_prefix: str):
    logger = get_run_logger()
    logger.info(f"{src_bucket=}; {src_prefix=}")
    src_paths = []
    client = boto3.client('s3')
    resp = client.list_objects_v2(
        Bucket=src_bucket,
        Prefix=src_prefix,
        RequestPayer='requester',
    )
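    # NOTE: list_objects_v2 returns at most 1000 keys per request; for larger
    # prefixes this would need a paginator, but that is plenty for our ~500 images.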
    input_images = filter(lambda obj: obj['Key'].endswith(('.jp2', '.tiff', '.tif')), resp['Contents'])
    for image in input_images:
        src_paths.append(dict(
            bucket=src_bucket,
            key=image['Key'],
            size=image['Size'],
        ))
    return src_paths
@task
def get_sts_identity():
    sts_identity = boto3.client('sts').get_caller_identity()
    return sts_identity
@flow(name="cog-translate", task_runner=RayAnyscaleTaskRunner)
# @flow(name="cog-translate")
def main(
src_bucket: str,
dst_bucket: str,
src_prefix: str = "",
dry_run: bool = True,
cog_config_profile: str = "DEFLATE",
cog_config_predictor: str = "2",
):
logger = get_run_logger()
# Work around Prefect AnyIO thread limit of 40
# https://prefect-community.slack.com/archives/CL09KU1K7/p1666888248841549?thread_ts=1666017309.670609&cid=CL09KU1K7
# TODO: Revisit after Michael Adkins had a look
increase_limit(5000)
sts_identity = get_sts_identity.submit().result()
logger.info(f"{sts_identity['UserId']=} on {sts_identity['Account']=}")
src_objs = get_src_files.submit(src_bucket, src_prefix).result()
src_size_gb = round((sum(image['size'] for image in src_objs)) / 1024 / 1024 / 1024, 4)
src_obj_num = len(src_objs)
logger.info(
f"The supplied flow parameters are queuing {src_obj_num} input images, with a total size of {src_size_gb}GB "
f"for conversion to COG"
)
if dry_run:
logger.info("DRYRUN. Exiting Flow.")
return True
for src_obj in src_objs:
src_path = f"s3://{src_obj['bucket']}/{src_obj['key']}"
logger.info(f"{src_path=}")
time.sleep(0.2) # to not overload the Prefect Cloud API
tiff_to_cog.submit(
src_path=src_path,
dst_s3_bucket=dst_bucket,
dst_s3_key=f"{src_obj['key']}.cog.tif",
cog_config_profile=cog_config_profile,
cog_config_predictor=cog_config_predictor
)
return True
# ======================================================================================================================
# ============================== Local Exec Config
# ======================================================================================================================
if __name__ == "__main__":
    # roughly 500 task runs (images) with a total size of ~890GB
    # src_bucket is public; dst_bucket needs to be accessible by the IAM user/role that is executing the flow
    main(src_bucket="spacenet-dataset", dst_bucket="ray-cluster-test-bucket", dry_run=False)
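
P.S. For anyone who wants to check how many of their task runs are stuck like this, something along these lines should work (a rough sketch against the Prefect 2 / Orion client API; untested, adjust to your setup):

import asyncio
from datetime import timedelta

from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter


async def list_stuck_task_runs(flow_run_id: str):
    # Fetch the task runs of one flow run and keep those that report
    # RUNNING with a total run time of 0s (the symptom described above).
    async with get_client() as client:
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}),
            limit=200,
        )
    return [
        tr for tr in task_runs
        if tr.state and tr.state.is_running() and tr.total_run_time == timedelta(0)
    ]


stuck = asyncio.run(list_stuck_task_runs("FLOW_RUN_ID"))
print(f"{len(stuck)} task runs look stuck")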