I’m having an issue with properly cleaning up acquired resources when a task times out.
I have a Process deployment running the following flow, whose task connects to a Dask.distributed cluster like this:
from distributed import Client
from prefect import task, flow

@task(timeout_seconds=5)
def my_task():
    client = Client("127.0.0.1:8787")
    # some long computational operation
    client.close()

@flow
def my_flow():
    my_task()
Unfortunately, when the timeout fires, an exception is raised as expected, but the client does not seem to be released, and the computation keeps running on the Dask cluster.
I was hoping the Dask client would go out of scope, its connection would be forcefully closed, and the computation would be cancelled on the cluster, but apparently it is more complicated than that.
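For reference, this is the kind of cleanup I have been experimenting with, assuming the timeout actually raises an exception inside the task body (some_long_computation is just a placeholder for my real workload):

from distributed import Client
from prefect import task

@task(timeout_seconds=5)
def my_task():
    # Using the client as a context manager should close it even if an
    # exception (including the timeout) propagates out of the block.
    with Client("127.0.0.1:8787") as client:
        future = client.submit(some_long_computation)  # placeholder function
        try:
            return future.result()
        finally:
            # Explicitly cancel the submitted work so the scheduler stops
            # executing it, instead of just dropping the connection.
            client.cancel(future)

Even with this, the cancellation does not seem to reach the cluster when the Prefect timeout triggers.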
Any suggestions on how to make sure resources are properly released? Thanks!