Hanging Active Tasks in my concurrency limit

I’m new to Prefect. I’ve created a concurrency limit, and I’m testing my runs by simply executing my Python code behind a main guard, so nothing about this setup resembles production at this point.

It works well, but after the flow runs, some active tasks keep holding slots on my concurrency limit even though the flow finished hours ago.
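For what it’s worth, I can see the stuck slots through the client API with something like this (a minimal sketch; 'advise' is the tag from my task below, and the limit itself was created earlier with `prefect concurrency-limit create advise 5` - the 5 is just an example value):

import asyncio
from prefect import get_client

async def inspect_limit():
    async with get_client() as client:
        # the returned object carries the configured limit plus the task
        # run IDs currently counted as holding slots
        limit = await client.read_concurrency_limit_by_tag(tag="advise")
        print(limit.concurrency_limit, limit.active_slots)

asyncio.run(inspect_limit())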

Am I doing something wrong?

Could you share a minimal reproducible example? It might be that your tasks got stuck even though the flow run finished.

I’m not sure I can, since it involves some HTTP requests that would need me to expose tokens and secrets.

However, I can try to collect more debugging information. The flow is really simple: it uses a single task, and everything is async.

The code is small and looks like this:

import gzip

import aiohttp
from azure.storage.blob import ContentSettings
from prefect import flow, get_run_logger, task
from prefect.blocks.system import Secret
from prefect.filesystems import Azure
from prefect_azure import AzureBlobStorageCredentials


class GzippedAzure(Azure):
    """Automatically compress the content with gzip"""
    async def write_path(self, path: str, content: bytes) -> str:  # type: ignore
        az = await AzureBlobStorageCredentials.load(self.bucket_path)
        async with az.get_container_client(self.bucket_path) as container:
            async with container.get_blob_client(path) as blob:
                await blob.upload_blob(
                    gzip.compress(content),
                    overwrite=True,
                    # content_type belongs inside ContentSettings;
                    # upload_blob has no content_type keyword of its own
                    content_settings=ContentSettings(
                        content_type='application/json',
                        content_encoding='gzip',
                    ),
                )
        return path

@task(name="download-single-page-foo", tags=['advise'])
async def download_page(page=1):
    logger = get_run_logger()
    logger.info('task start: download_page: page %s', page)
    secret = await Secret.load("api-key")
    items = None
    content = None
    async with aiohttp.ClientSession() as session:
        async with session.get(
            'https://example.com',
            headers={
                'Authorization': f'Bearer {secret.value.get_secret_value()}'
            },
            params={
                'Foo': 'Bar',
            }
        ) as res:
            # res.text is a coroutine method in aiohttp, so it must be
            # called and awaited (it also caches the body for res.read())
            logger.debug('response: %s: %s', res.status, await res.text())
            res.raise_for_status()
            content = await res.read()
            items = (await res.json())['itens']  # the API's response key really is 'itens'
            num_items = len(items)
            if num_items == 0:
                # return an empty list, not None, so the flow's len()
                # check below doesn't blow up on the last batch
                return []
    az_block = await GzippedAzure.load("lake")
    blob_name = f'foo/bar/p{str(page).zfill(5)}.json'
    await az_block.write_path(blob_name, content)
    return items


@flow(name="download")
def download():
    logger = get_run_logger()
    page = 1
    step = 5
    while True:
        start = page
        end = page + step
        # range(start, end) excludes end, so this batch covers pages start..end-1
        logger.debug('downloading pages %s..%s', start, end - 1)
        num_items = 0
        reached_end = False
        futures = []
        for i in range(start, end):
            futures.append(download_page.submit(i))
        for future in futures:
            items = future.result()
            if len(items) == 0:
                # reached a page without items - time to stop
                reached_end = True
            num_items += len(items)
        logger.info('pages %s..%s: downloaded %s items', start, end - 1, num_items)
        if reached_end:
            break
        page += step


if __name__ == "__main__":
    download()

Thanks for sharing - those async context managers look a bit complex. Can you try running this sync, and perhaps without the context managers, to narrow down the root cause incrementally? Hard to say otherwise, and I’m AFK until 2023 so I can’t reproduce for now.
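Roughly something like this, as a sketch (untested: requests swapped in for aiohttp, and the final write_path call assumes write_path can be invoked synchronously):

import requests

@task(name="download-single-page-foo", tags=['advise'])
def download_page(page=1):
    logger = get_run_logger()
    logger.info('task start: download_page: page %s', page)
    # Block.load is itself sync-compatible, so no await is needed here
    secret = Secret.load("api-key")
    res = requests.get(
        'https://example.com',
        headers={'Authorization': f'Bearer {secret.value.get_secret_value()}'},
        params={'Foo': 'Bar'},
    )
    logger.debug('response: %s: %s', res.status_code, res.text)
    res.raise_for_status()
    items = res.json()['itens']
    if len(items) == 0:
        return []
    az_block = GzippedAzure.load("lake")
    blob_name = f'foo/bar/p{str(page).zfill(5)}.json'
    # calling write_path synchronously assumes it is made sync-callable
    az_block.write_path(blob_name, res.content)
    return items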

You got me on the right track! I’ve been able to solve it by making all my code sync (or, at least, avoiding Python’s asyncio machinery). What was blocking me before was the async write_path method on my own GzippedAzure, but then I found out about the @sync_compatible decorator and now all is good.

My final code for GzippedAzure looks like this:

from prefect.utilities.asyncutils import sync_compatible


class GzippedAzure(Azure):
    """Automatically compress the content with gzip"""

    # sync_compatible decorates the async method (not the class), so
    # write_path can be called from sync code and awaited from async code
    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:  # type: ignore
        az = await AzureBlobStorageCredentials.load(self.bucket_path)
        async with az.get_container_client(self.bucket_path) as container:
            async with container.get_blob_client(path) as blob:
                await blob.upload_blob(
                    gzip.compress(content),
                    overwrite=True,
                    content_settings=ContentSettings(
                        content_type='application/json',
                        content_encoding='gzip',
                    ),
                )
        return path
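With the decorator on the method, the same block now works from both sides - a quick sketch of what I mean (same names as above):

content = b'{"itens": []}'

# from sync code (like my now-sync flow), no await needed:
az_block = GzippedAzure.load("lake")
az_block.write_path('foo/bar/p00001.json', content)

# from async code the same method can still be awaited:
#     await az_block.write_path('foo/bar/p00001.json', content)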

I don’t know what was causing the issue before. Might be a Prefect bug, or a Python bug, but I can’t reproduce it anymore.
