Best strategy for long-running consumer jobs?

We are trying to implement a pattern where consumers scan S3 buckets for new files , process these files and then push normalized results onto a topic / store in db / etc.

Curently this is implemented as a Flow that dispatches files to download & process to tasks; we set concurrency limits on tags to manage how many files we process at the same time.

We’ve tried a few approaches here, but all seem to have compromises and I suspect we’re missing a more obvious pattern.

Our basic requirements:

  1. We want to ensure that a single flow is always running or running often enough that we are picking up new objects with minimal latency.
  2. As we are operating at the edge of capacity, we want this to be as efficient as possible. I.e. we don’t want the workers sitting idle if there’s work to do.
  3. We want only a single copy of the flow running – we will control how that gets scheduled into tasks (we’re using DaskTaskRunner).

What we have tried:

  • We initially tried running a single flow that would effectively loop forever looking for files, but the problem we encountered is that if the flow exits for some sort of unexpected error, nothing would ensure that it got restarted. Question: is there a foolproof way to catch all errors that a could happen within the context of a flow? I feel like we still had issues generic try/except clauses but maybe we should revisit this. (I think one issue we also encountered was restarts/redepoys would not cause the job to start again)
  • We have tried running things on a periodic schedule – e.g. “hourly” – and then trying to ensure that we finish up the work within the allotted time. This kinda works, but then the code has to be aware/told of the deployment schedule so it can keep running until the end-time is approach; this feels kinda wrong.
  • We have also tried running on a schedule but then just re-executing the flow (calling it via API at the end of the flow run) on repeat if there’s more work to do. This is still subject to errors possibly interrupting it and can lead to cases where we have two flow runs running at the same time until the first exits, which can complicate things.

None of the approaches we’ve taken seem bullet-proof. Writing them out, it seems like the first one (just run the flow forever) should be revisited as there must be a way to handle this. It would be nice if there was a watchdog functionality that could just ensure that certain flows are always in a running state (and start them, if not). It also feels like we might just be using Prefect for something outside of its intent.