View in #prefect-community on Slack
@Marwan_Sarieddine: Hi folks - a question about prefect cloud task version locking
Please see a brief of the issue below
We have a flow which runs on a daily basis and has been hitting the following issue day to day:
The flow runs a task that checks if data is available in some s3 locations - if it is not available it will generate the data and then generate some artifacts (s3 artifacts not prefect artifacts) related to the data.
The task run successfully generates the data for the missing s3 locations but runs out of memory when attempting to generate the artifacts related to the data.
Dask then attempts to restart the worker, which results in restarting the task run. Given the data is now available in the s3 locations, the task run will skip generating artifacts and as such the task and flow run will complete successfully.
The consequence of this is we are missing these artifacts and given this was a silent failure we only found out about this two weeks later when we tried to fetch these artifacts.
My question is:
The flow has version locking enabled. Why did the version locking not throw an error to ensure the task runs only once? Are there any other settings we can use to ensure this is raised as an error? Is the current recommendation to configure Dask to avoid restarts?
(Happy to share links to the flow/flow run or task run via DM if need be)
@Anna_Geller: Just to understand: you’re missing the artifacts because the task run ran out of memory, correct? And when this happens, those artifacts are not generated properly and you would like to restart in such case? or not?
I’m wondering whether instead of version locking, something like this would be helpful:
• before you generate the artifacts, you have a task that checks if those artifacts already exist. If so, you raise ENDRUN or FAIL depending on whether you want to retry that task on failure or not
• raising the FAIL signal respects retries. But if you don’t want the task to retry, you can use ENDRUN
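A minimal sketch of the kind of check described above, written in plain Python so it runs without Prefect. The `ArtifactsMissing` exception is a hypothetical stand-in for a failure signal, the `store` set stands in for an S3 listing, and the key names are made up for illustration. The idea is that "data exists but artifacts do not" is treated as evidence of an interrupted earlier attempt and raised loudly instead of being skipped.

```python
class ArtifactsMissing(Exception):
    """Hypothetical stand-in for a failure signal raised inside a task."""


def check_outputs(store, data_key, artifact_key):
    """Return 'generate' or 'skip', or raise if only the data exists."""
    has_data = data_key in store
    has_artifacts = artifact_key in store
    if has_data and not has_artifacts:
        # Data without artifacts suggests an earlier attempt died halfway.
        raise ArtifactsMissing(f"{artifact_key} missing although {data_key} exists")
    return "skip" if has_data else "generate"


store = set()  # stand-in for the S3 locations
first = check_outputs(store, "data/day1", "artifacts/day1")

store.add("data/day1")  # simulate: data written, then the worker is killed
try:
    check_outputs(store, "data/day1", "artifacts/day1")
    second = "no error"
except ArtifactsMissing:
    second = "failed loudly"

store.add("artifacts/day1")  # simulate a fully completed run
third = check_outputs(store, "data/day1", "artifacts/day1")
```

In a real flow the raise would be replaced by whichever signal gives the desired retry behaviour, as discussed in the bullets above.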
@Kevin_Kho: Is the generation of data and generation of the artifact all 1 task or split into 2 tasks?
To your questions though, flow version locking is to stop a completed task from running twice. In the case you are describing, I think the first pass is not finishing due to the OOM. The worker is dying so the final state is not reported to Prefect. This is why version locking doesn’t stop another run from triggering.
To prevent the restarts, you need to look at configuring the nanny service, but I’m not finding specific docs about that.
@Marwan_Sarieddine: Hi @Anna_Geller and @Kevin_Kho thank you for your quick responses
> Just to understand: you’re missing the artifacts because the task run ran out of memory, correct? And when this happens, those artifacts are not generated
> And you would like to restart in such case? or not?
I would like the task run to fail
> Is the generation of data and generation of the artifact all 1 task or split into 2 tasks?
it is in 1 task run
> The worker is dying so the final state is not reported to Prefect.
The strange thing however is that on restart the worker will report to Prefect that it is running the task - it seems from what you are describing the locking doesn’t take that into account
It is true that perhaps splitting this into separate tasks would help for this specific case but I am trying to better understand how task version locking is working
@Alex_Papanicolaou: @Marwan_Sarieddine didn’t we configure Dask to not use the nanny?
@Kevin_Kho: When the task is run again by Dask, you are right that it will tell Prefect it’s in the running state. From Prefect’s point of view though, it won’t prevent that task run from running because from what it knows, the task did not run successfully so it needs to be re-run.
Maybe it will be clearer if I illustrate a scenario. Let’s say there is a task graph like A -> B -> C. These get submitted to the Dask worker in one piece. And then let’s say A succeeded, B succeeded, but C had an OOM issue. When the Dask worker restarts, it will run the whole task graph again A -> B -> C. Prefect’s version locking will know A succeeded already, so that won’t be re-run. B also succeeded already, so that won’t be re-run. But C had no record of completing because the worker died, so Prefect will let that continue to run. There was no final state (Failed or Success) reported because of the OOM. Does that make things clearer?
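The A -> B -> C scenario can be mimicked with a toy model. This is only a sketch of the behaviour described in the message above, not Prefect's actual implementation: `run_graph`, `recorded`, and `crash_on` are hypothetical names, and the "lock" is reduced to a dictionary lookup of recorded final states.

```python
FINAL_STATES = {"Success", "Failed"}


def run_graph(tasks, recorded, crash_on=None):
    """Run tasks in order, skipping any whose final state is already recorded."""
    executed = []
    for name in tasks:
        if recorded.get(name) in FINAL_STATES:
            continue  # the lock: a finished task is not run again
        recorded[name] = "Running"
        executed.append(name)
        if name == crash_on:
            return executed  # worker dies here; no final state is reported
        recorded[name] = "Success"
    return executed


recorded = {}
# First pass: C is killed mid-run, so its state stays "Running".
first_pass = run_graph(["A", "B", "C"], recorded, crash_on="C")
state_after_crash = dict(recorded)
# Second pass after the worker restarts: only C is allowed to run again.
second_pass = run_graph(["A", "B", "C"], recorded)
```

In this model the second pass executes only C, because A and B already have a final state while C was left in "Running".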
@Marwan_Sarieddine: Hi Kevin - yes I suppose that clarifies things - so task version locking is there to prevent a task from running to completion more than once - and not from running more than once
@Kevin_Kho: Yes exactly
@Alex_Papanicolaou: @Kevin_Kho how do we get the OOM to trigger a task failure so it alerts us?
@Kevin_Kho: You can’t because that is literally the Python process dying so any code that follows will not run (with or without Prefect)
On the Prefect side, you can maybe set up an Automation that sends a message when the Flow is taking too long?
Maybe you can use a Nanny Plugin? Docs here. And then you can put your message there? I have not dug into these myself.
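The point that a dying Python process cannot run any code that follows can be seen in a small stdlib-only simulation. This sketch assumes `os._exit` is an acceptable stand-in for an abrupt kill (a real OOM kill is delivered by the operating system, typically as SIGKILL); the printed messages are illustrative only. Neither the `except` nor the `finally` block in the child runs, so only an outside observer (here the parent process) can notice the death.

```python
import subprocess
import sys

# Child script: exits abruptly in the middle of its "work".
CHILD = """
import os
try:
    print("generating artifacts", flush=True)
    os._exit(137)  # simulated abrupt kill; skips all Python cleanup
except BaseException:
    print("reporting Failed state", flush=True)
finally:
    print("cleanup ran", flush=True)
"""

result = subprocess.run(
    [sys.executable, "-c", CHILD],
    capture_output=True,
    text=True,
)

# Only the parent can observe that the child died.
child_died = result.returncode != 0
print("child exit code:", result.returncode)
print("child output:", result.stdout.strip())
```

This is why any alerting has to live outside the killed process, for example in something that supervises the worker or in a monitor on the flow run itself.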