Release resources on task timeout

I’m having an issue with properly cleaning up acquired resources when a task times out.
I have a Process deployment for the flow and task below, where the task connects to a Dask.distributed cluster like this:

from distributed import Client
from prefect import task, flow

@task(timeout_seconds=5)
def my_task():
    client = Client("127.0.0.1:8787")
    # some long computational operation
    client.close()

@flow
def my_flow():
    my_task()

Unfortunately, when the timeout fires, an exception is raised, but the client does not seem to be released, and the computation keeps running on the Dask cluster.
I was hoping the Dask client would go out of scope, its connection would be forcefully closed, and the computation would be cancelled on the cluster, but apparently it is more complicated than that.
Any suggestions on how to make sure resources are properly released? Thanks!
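
For completeness, this is roughly the pattern I had in mind: the client used as a context manager and the submitted work cancelled in a finally block. It is only a sketch (long_computation stands in for my real workload), and I’m not sure the timeout exception even reaches the finally clause:

from distributed import Client
from prefect import task

def long_computation(x):
    # placeholder for the real Dask-backed work
    return sum(i * x for i in range(10**8))

@task(timeout_seconds=5)
def my_task():
    # Using the client as a context manager closes the connection even if
    # an exception (e.g. the timeout) is raised inside the block, and
    # cancelling the future explicitly asks the scheduler to drop the work.
    with Client("127.0.0.1:8787") as client:
        fut = client.submit(long_computation, 2)
        try:
            return fut.result()
        finally:
            fut.cancel()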


We have a server-side timeout implementation in the works. In the meantime, can you try handling it at the future level? E.g. when you submit a task to a DaskTaskRunner, you can do:

from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(...))
def test_flow():
    future = test_task.submit()
    future.wait(10)

Here, 10 means the call waits up to 10 seconds for the task to finish before timing out.
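
If I remember correctly, wait() returns the task’s final state, or None if the task has not finished within the timeout, so you can react to a timeout explicitly (worth double-checking against your Prefect version). Continuing from the snippet above:

@flow(task_runner=DaskTaskRunner(...))
def test_flow():
    future = test_task.submit()
    state = future.wait(10)
    if state is None:
        # the task run did not reach a final state within 10 seconds
        ...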

I’m not sure I understand your suggestion. I don’t use a DaskTaskRunner but a SequentialTaskRunner, and I want the task to create the connection to the Dask.distributed cluster from within the task itself. I could go into the reasons why I chose to do it this way, but I’m not sure they are relevant.
I believe I would have the same issue with an SQL connection not being released on timeout, or in other similar situations. Does this make sense?

Yes, it does, I understand. As I said:
