I’m new to Prefect. I’ve created a concurrency limit and I’m testing my runs by simply running my Python code with a main guard, so it doesn’t have to do with a setup that resembles production at this point.
It works well, but after the flow is run, I get some active tasks holding slots on my concurrency limit and the flow has finished hours ago:
thanks for sharing - those async content managers look a bit complex - can you try running this sync and perhaps without context manager to figure out more incrementally what is the root cause of this? hard to say, I’m afk until 2023 so can’t reproduce for now
You got me in the right track! I’ve been able to solve it by making all my code sync (or, at least, avoiding Python’s asyncio stuff). What was preventing me before was the write_path method from my own GzippedAzure, but then I found out about the decorator @sync_compatible and now all is good.
My final code for GzippedAzure looks like this:
from prefect.utilities.asyncutils import sync_compatible
@sync_compatible
class GzippedAzure(Azure):
"""Automatically compress the content with gzip"""
async def write_path(self, path: str, content: bytes) -> str: # type: ignore
az = await AzureBlobStorageCredentials.load(self.bucket_path)
async with az.get_container_client(self.bucket_path) as container:
async with container.get_blob_client(path) as blob:
await blob.upload_blob(
gzip.compress(content),
content_type='application/json',
overwrite=True,
content_settings=ContentSettings(
content_encoding='GZIP'
)
)