How to add retries when processing files and know which files failed to get processed?

View in #prefect-community on Slack

Discussion

Jason_Thomas @Jason_Thomas: Hi all, version 2.0b8

I’m playing with flow retries and got a result I didn’t expect. I’ll post code in the thread, but here’s what’s happening:

  • flow has one retry
  • I call flow, passing in a list of 4 unprocessed ‘files’
  • retry 0:
    • flow filters the list to remove any processed files, leaving 4 files to process
    • flow iterates over the filtered list, calling task on each unprocessed file
    • task processes each file
    • on one file, task raises an error
    • flow continues to run, processing 3 out of 4 files
    • flow finishes in state AwaitingRetry
  • retry 1:
    • flow filters the list, leaving 1 file to process
    • flow iterates over the filtered list, but does not call task
    • flow finishes in state Completed

I’m guessing the task is not called on the retry because its result has been cached and there is no reason to try again. That’s fine. However, I expected the final state of the flow run to be Failed. Is this behaving as intended?
Here’s the code:

from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
from app import log

class FileClass:

    def __init__(self, name):
        self.name: str = name
        self.status: str = 'Found'

    def is_processed(self):
        return self.status == 'Processed'

    def __repr__(self):
        return f"FileClass(name: {self.name}, status: {self.status})"

@task 
def t1(file):
    try:
        log(f"trying: {file.name}")
        if '4' in file.name:
            raise Exception(f"Invalid character: {4}")
    except Exception as e:
        file.status = 'Failed'
        log(f'error on {file.name}: {e}')
        raise e
    else:
        file.status = 'Processed'
        log(f"success: {file.name}")
    finally:
        log(f"new status for {file.name}: {file.status}")


@flow(task_runner=SequentialTaskRunner, 
      retries=3, retry_delay_seconds=2) 
def f1(files):
    files_to_process = [file for file in files if file.status != 'Processed']
    log(f"Processing: {files_to_process}")
    for file in files_to_process:
        t1(file)
    log(f"Processed: {files_to_process}")


if __name__ == '__main__':
    names = [
        'file_0',
        'file_2',
        'file_4',
        'file_6',
    ]
    files = [FileClass(name) for name in names]
    f1(files)

Anna_Geller @Anna_Geller: what is this line? from app import log

could you try using Prefect logger instead?

Jason_Thomas @Jason_Thomas: It’s just a wrapper for get_run_logger. Don’t think it’s involved, but I’ll replace it and rerun.
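(The app module isn’t shown in the thread; presumably the wrapper looks roughly like this:)

from prefect import get_run_logger

def log(message: str) -> None:
    # Assumed shape of the wrapper; only valid inside a flow or task run,
    # where a run logger exists.
    get_run_logger().info(message)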
No change. Here’s the code:

from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner

class FileClass:

    def __init__(self, name):
        self.name: str = name
        self.status: str = 'Found'

    def is_processed(self):
        return self.status == 'Processed'

    def __repr__(self):
        return f"FileClass(name: {self.name}, status: {self.status})"

@task 
def t1(file):
    try:
        get_run_logger().info(f"trying: {file.name}")
        if '4' in file.name:
            raise Exception(f"Invalid character: {4}")
    except Exception as e:
        file.status = 'Failed'
        get_run_logger().info(f'error on {file.name}: {e}')
        raise e
    else:
        file.status = 'Processed'
        get_run_logger().info(f"success: {file.name}")
    finally:
        get_run_logger().info(f"new status for {file.name}: {file.status}")


@flow(task_runner=SequentialTaskRunner, 
      retries=3, retry_delay_seconds=2) 
def f1(files):
    files_to_process = [file for file in files if file.status != 'Processed']
    get_run_logger().info(f"Processing: {files_to_process}")
    for file in files_to_process:
        t1(file)
    get_run_logger().info(f"Processed: {files_to_process}")

    # if not all(f.is_processed() for f in files_to_process):
    #     raise Exception(f"Not all files are complete.")


if __name__ == '__main__':
    names = [
        'file_0',
        'file_2',
        'file_4',
        'file_6',
    ]
    files = [FileClass(name) for name in names]
    f1(files)

And the logs:

06:17:01.151 | INFO    | prefect.flow_runs - Using task runner 'SequentialTaskRunner'
06:17:01.201 | WARNING | prefect.flow_runs - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
06:17:01.319 | INFO    | prefect.flow_runs - Processing: [FileClass(name: file_0, status: Found), FileClass(name: file_2, status: Found), FileClass(name: file_4, status: Found), FileClass(name: file_6, status: Found)]
06:17:01.416 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-0' for task 't1'
06:17:01.503 | INFO    | prefect.task_runs - trying: file_0
06:17:01.504 | INFO    | prefect.task_runs - success: file_0
06:17:01.505 | INFO    | prefect.task_runs - new status for file_0: Processed
06:17:01.584 | INFO    | prefect.task_runs - Finished in state Completed()
06:17:01.652 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-1' for task 't1'
06:17:01.723 | INFO    | prefect.task_runs - trying: file_2
06:17:01.724 | INFO    | prefect.task_runs - success: file_2
06:17:01.725 | INFO    | prefect.task_runs - new status for file_2: Processed
06:17:01.796 | INFO    | prefect.task_runs - Finished in state Completed()
06:17:01.863 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-2' for task 't1'
06:17:01.934 | INFO    | prefect.task_runs - trying: file_4
06:17:01.935 | INFO    | prefect.task_runs - error on file_4: Invalid character: 4
06:17:01.936 | INFO    | prefect.task_runs - new status for file_4: Failed
06:17:01.938 | ERROR   | prefect.task_runs - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/engine.py", line 890, in orchestrate_task_run
    result = await run_sync_in_interruptible_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/Users/jason.thomas/dev/sftp_sweeper/_flow_retry_1.py", line 28, in t1
    raise e
  File "/Users/jason.thomas/dev/sftp_sweeper/_flow_retry_1.py", line 24, in t1
    raise Exception(f"Invalid character: {4}")
Exception: Invalid character: 4
06:17:02.044 | ERROR   | prefect.task_runs - Finished in state Failed('Task run encountered an exception.')
06:17:02.112 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-3' for task 't1'
06:17:02.186 | INFO    | prefect.task_runs - trying: file_6
06:17:02.187 | INFO    | prefect.task_runs - success: file_6
06:17:02.188 | INFO    | prefect.task_runs - new status for file_6: Processed
06:17:02.262 | INFO    | prefect.task_runs - Finished in state Completed()
06:17:02.263 | INFO    | prefect.flow_runs - Processed: [FileClass(name: file_0, status: Processed), FileClass(name: file_2, status: Processed), FileClass(name: file_4, status: Failed), FileClass(name: file_6, status: Processed)]
06:17:02.363 | INFO    | prefect.flow_runs - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
06:17:03.463 | INFO    | prefect.flow_runs - Processing: [FileClass(name: file_4, status: Failed)]
06:17:03.499 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-0' for task 't1'
06:17:03.573 | INFO    | prefect.flow_runs - Processed: [FileClass(name: file_4, status: Failed)]
06:17:03.630 | INFO    | prefect.flow_runs - Finished in state Completed('All states completed.')

I’m guessing the unexpected outcome is related to the caching behavior, because when I change it to eliminate the caching I get a different result.

edit: Nevermind, the change I was referring to still did not result in the flow failing.

Anna_Geller @Anna_Geller: I believe there are several things to consider here:
• to determine the final state of a flow run, you would need to either return a state directly or return the task run result you care about - the one that should determine whether the flow run was successful or not
• you use try/except/finally, which traps exceptions and makes it hard to test the actual Prefect exception handling
• the class FileClass (being a class!) is stateful and may cause side effects - perhaps returning the object from a function makes it easier to test out the behavior you want
• you need to differentiate between flow- and task-level retries - the flow retry is for the final state of a flow run, and I believe the entire file processing could be better handled on the task level, depending on what you’re trying to accomplish in this example
This docs page may help here https://orion-docs.prefect.io/concepts/flows/#final-state-determination
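For the first point, a minimal sketch (per the final-state-determination docs linked above; the flow and task names are made up): a flow’s final state can come from a state it returns directly, or from the states of futures it returns.

from prefect import flow, task
from prefect.orion.schemas.states import Failed

@task
def t1(x):
    return x

@flow
def f1(items):
    futures = [t1(i) for i in items]
    # The returned futures determine the flow run's final state:
    # all of them must be Completed for the flow run to be Completed.
    return futures

@flow
def f2():
    # Or return a state directly to force a particular final state.
    return Failed(message="Not all files are complete.")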


debug logs may be helpful too:

prefect config set PREFECT_LOGGING_LEVEL='DEBUG'

Jason_Thomas @Jason_Thomas: Thanks Anna, I’ll look at the docs and consider your points.

I have the debug-level logs but didn’t notice anything helpful there. I can post them if you like.

Regarding the structure (try/except/finally, flow-level retries, loop in flow and call task on each element), it’s the simplest way I’ve found to accomplish the behavior I want. If there’s a better way please let me know.

Problem

Here’s what I’m trying to do:

• Pass in a list containing details of files I want to process.
• For each element in the list:
  ◦ check an attribute (is_processed) and skip if it has already been processed
  ◦ insert or update a db table row indicating the current status of the file
  ◦ try to process the file
  ◦ on success:
    ▪ update the attribute to indicate processed
  ◦ on error:
    ▪ update the attribute to indicate failed
    ▪ mark the flow as failed (to trigger a retry)
    ▪ but continue looping
  ◦ on either success or error:
    ▪ update the db table with the new status
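A rough sketch of that control flow in the beta API used in this thread - update_db_status and do_work are hypothetical stand-ins for the real DB upsert and processing steps:

from prefect import flow, task

def update_db_status(file, status):
    # Hypothetical stand-in for the insert/update against the db table.
    ...

def do_work(file):
    # Hypothetical stand-in for the actual file processing.
    ...

@task
def process_one(file):
    update_db_status(file, 'Processing')
    try:
        do_work(file)
    except Exception:
        file.status = 'Failed'
        raise
    else:
        file.status = 'Processed'
    finally:
        update_db_status(file, file.status)

@flow(retries=3, retry_delay_seconds=2)
def process_files(files):
    # `files` are FileClass instances as defined earlier in the thread.
    futures = []
    for file in files:
        if file.is_processed():
            continue  # skip files that succeeded on an earlier attempt
        futures.append(process_one(file))  # a failure does not stop the loop
    # Returning the futures lets the task run states determine the flow run's
    # final state, so one Failed task run fails (and retries) the flow run.
    return futures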

Anna_Geller @Anna_Geller: > • For each element in the list:
> ◦ check an attribute (is_processed) and skip if it’s already been processed
I’d suggest looking it up in some other way, e.g. even in a JSON block, since modifying class state this way from a flow may lead to undesired behavior if you leverage e.g. the ConcurrentTaskRunner or move to Dask/Ray or any form of parallelism/concurrency

the ideal state is when all tasks and flows are stateless and the only state is something that tasks pass between each other as a data dependency, or when the state is stored external to the flow run, e.g. as a key-value pair using a JSON block or some other store (DB, Redis, etc.)
in some way, your is_processed is like a Completed state in Prefect - perhaps instead of using it you can simply check whether the task run’s state was successful and continue downstream based on that Prefect state rather than this class’s state
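Assuming a Prefect 2 release where blocks are available, a rough sketch of that key-value approach with the built-in JSON block (the block name processed-files is made up for this example):

from prefect.blocks.system import JSON

def load_processed_names() -> set:
    # Assumes a JSON block named "processed-files" was created beforehand,
    # e.g. with JSON(value=[]).save("processed-files").
    return set(JSON.load("processed-files").value)

def save_processed_names(names: set) -> None:
    # Persist the updated set so the next run (or retry) can skip these files.
    JSON(value=sorted(names)).save("processed-files", overwrite=True)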

Jason_Thomas @Jason_Thomas: Thanks Anna, I will take this all into consideration. But I’m afraid I’ve glossed over the main point here.

My main concern is this:
• The outcome of the flow’s first attempt ended in a failed state. This triggered a retry
• The outcome of the flow’s retry was the same as the first attempt, but it ended in a different state

Anna_Geller @Anna_Geller: 🤔
so IMO with this step

> update the db table with the new status

you kind of are doing orchestration within an orchestrator 🙂

Solution

I believe what you want to accomplish is task-level retries, plus some way of knowing which files failed to get processed so that you can e.g. investigate why and process them manually later on (perhaps you would check manually and discover a file failed due to some weird delimiter or a single bad row)

Here’s how I would do it - you can replace Slack with any other action based on what you want to do with failed files: Retries when processing files in Prefect 2.0 · GitHub

import os
import random
import requests
from prefect import task, flow
from prefect.orion.schemas.states import Completed, Failed
from prefect.futures import PrefectFuture
from typing import List


@task(retries=3, retry_delay_seconds=2)
def process_file(file_name: str):
    # Simulate flaky processing by randomly returning a final state.
    return random.choice(
        [
            Completed(message=f"File {file_name} processed successfully ✅"),
            Failed(message=f"File {file_name} failed ❌"),
        ]
    )


def send_slack_alert_on_failure(task_future: PrefectFuture, file_name: str):
    task_future.wait()  # block until the task run reaches a final state
    if task_future.get_state().is_failed():
        name_ = task_future.task_run.name
        id_ = task_future.task_run.flow_run_id
        requests.post(
            os.environ["SLACK_WEBHOOK_URL"],
            json={
                "text": f"File {file_name} failed ❌ in task `{name_}` in a flow run `{id_}`."
            },
        )


@flow
def process_files(files_to_process: List[str]):
    for file in files_to_process:
        file_future = process_file(file)  # returns a PrefectFuture
        send_slack_alert_on_failure(file_future, file)


if __name__ == "__main__":
    files_param = [f"file_{i}" for i in range(1, 20)]
    process_files(files_param)

as a result, after all 3 retries per file, eventually only one out of 19 files failed, and I got a Slack message telling me which one failed

and the final flow run state is Failed even though only one file failed:

15:30:37.565 | ERROR   | Flow run 'airborne-rooster' - Finished in state Failed('1/19 states failed.')
