I have a problem with prefect-snowflake and/or `ConcurrentTaskRunner` and/or asyncio.
I'm trying to run ~10 Snowflake queries simultaneously, but only 4 to 6 of them start at once;
the remaining queries start only after an earlier query finishes.
It looks like the threads do not “go to sleep” as intended while they wait for Snowflake results, and because of that the other tasks are not picked up.
Here is a simple reproduction:

```python
query = "select count(*) from (select seq4() from TABLE(GENERATOR(ROWCOUNT => 1000000000000)))"
for _ in range(10):
    snowflake_multiquery.submit([query], snowflake_credentials, as_transaction=transaction)
```
This is the code in `prefect_snowflake/database.py` that seems to cause the problem:

```python
with connection.cursor(cursor_type) as cursor:
    results = []
    for query in queries:
        # Submit the query without waiting for it to finish
        response = cursor.execute_async(query, params=params)
        query_id = response["queryId"]
        # Poll the query status until Snowflake reports it is no longer running
        while connection.is_still_running(
            connection.get_query_status_throw_if_error(query_id)
        ):
            await asyncio.sleep(0.05)
        # Attach the cursor to the finished query and fetch its rows
        cursor.get_results_from_sfqid(query_id)
        result = cursor.fetchall()
        results.append(result)
```
There is an `asyncio.sleep(0.05)` call in the polling loop, and I believe that when the processor is a bit busy, those “mini sleeps” (0.05 s) don't really happen, so the other threads cannot start…
When I change 0.05 to, for example, 1 s, all 10 queries run concurrently.
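A toy simulation (plain asyncio, no Prefect or Snowflake) can illustrate one possible explanation, assuming `get_query_status_throw_if_error` is a synchronous network call that blocks the event loop while it waits. `STATUS_LATENCY`, `QUERY_DURATION` and `LAUNCH_GAP` are made-up values, and `fake_task` / `submission_spread` are hypothetical helpers. With a 0.05 s sleep, every running task's blocking status check lands between the launches of new tasks, so queries get submitted one slow step at a time; with a 1 s sleep, the running tasks mostly stay out of the way while the rest are launched.

```python
import asyncio
import time

# All values below are assumptions chosen for illustration, not measurements.
STATUS_LATENCY = 0.1  # seconds one blocking "get query status" round trip takes
QUERY_DURATION = 1.0  # seconds each query runs on the (simulated) server
LAUNCH_GAP = 0.01     # seconds of async orchestration work per submitted task


async def fake_task(submitted: list[float], poll_interval: float) -> None:
    """Submit a fake query, then poll its status like the prefect-snowflake loop."""
    started = time.monotonic()
    submitted.append(started)

    def still_running() -> bool:
        # Stand-in for connection.get_query_status_throw_if_error():
        # a synchronous call, so it blocks the whole event loop while it waits.
        time.sleep(STATUS_LATENCY)
        return time.monotonic() - started < QUERY_DURATION

    while still_running():
        await asyncio.sleep(poll_interval)


async def submission_spread(poll_interval: float, n: int = 10) -> float:
    """Return the seconds between the first and the last query being submitted."""
    submitted: list[float] = []
    tasks = []
    for _ in range(n):
        # Stand-in for the async work done per .submit() before a task starts.
        await asyncio.sleep(LAUNCH_GAP)
        tasks.append(asyncio.create_task(fake_task(submitted, poll_interval)))
    await asyncio.gather(*tasks)
    return max(submitted) - min(submitted)


if __name__ == "__main__":
    for interval in (0.05, 1.0):
        spread = asyncio.run(submission_spread(interval))
        print(f"poll interval {interval} s -> last query submitted after {spread:.2f} s")
```

In this model the longer sleep does not make the status check any cheaper; it only means the blocking calls happen less often, which matches the behaviour described above but would not scale to many more concurrent queries.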
I don’t know whether the issue is in the Snowflake connector, in asyncio, or in `ConcurrentTaskRunner`…
And the 1 s workaround probably isn’t a proper fix.
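If blocking connector calls are indeed the cause, one alternative to a longer sleep might be to run those calls in worker threads. Below is a hypothetical sketch: the function name `run_queries_offloaded` and its parameters are illustrative, not part of prefect-snowflake. It uses the same connector calls as the loop in `database.py`, but wraps each potentially blocking one in `asyncio.to_thread` (Python 3.9+). It is untested against a real Snowflake account, and whether the connector is safe to call from worker threads this way is an assumption.

```python
import asyncio


async def run_queries_offloaded(connection, cursor_type, queries, params=None, poll_interval=0.05):
    """Hypothetical variant of the prefect-snowflake polling loop.

    Each connector call that may block on the network runs in a worker thread
    via asyncio.to_thread, so the event loop stays free for other tasks.
    """
    results = []
    with connection.cursor(cursor_type) as cursor:
        for query in queries:
            # Submit the query in a worker thread
            response = await asyncio.to_thread(cursor.execute_async, query, params=params)
            query_id = response["queryId"]
            while True:
                # The status request is the call repeated on every poll
                status = await asyncio.to_thread(
                    connection.get_query_status_throw_if_error, query_id
                )
                if not connection.is_still_running(status):
                    break
                await asyncio.sleep(poll_interval)
            # Fetch the finished query's rows, also off the event loop
            await asyncio.to_thread(cursor.get_results_from_sfqid, query_id)
            results.append(await asyncio.to_thread(cursor.fetchall))
    return results
```

Keeping the sleep short then only affects how quickly a finished query is noticed, rather than how much time the event loop spends blocked.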