Apparently there’s an open issue on prefect-dbt that refers to a Prefect PR titled “Fixes sync calling async subflows”; not sure how I missed it.
I rewrote the method like so (after trying various combinations with/without `return` and with/without `await`):
# Flow that triggers the dbt Cloud job identified by
# DBT_UPDATE_PROD_INCREMENTAL_JOB_ID and waits for it to complete.
#
# The API key and account id are read from the "dbt-api-key" and
# "dbt-account-id" Secret blocks; the account id is converted to int before
# being handed to DbtCloudCredentials. The awaited result of
# trigger_dbt_cloud_job_run_and_wait_for_completion is returned unchanged.
#
# NOTE: documented with comments rather than a docstring so that nothing the
# @flow decorator may read from the function object is altered.
@flow(name='DBT Incremental build')
async def trigger_dbt_build():
    api_key_block = await Secret.load("dbt-api-key")
    api_key = api_key_block.get()

    account_id_block = await Secret.load("dbt-account-id")
    account_id = int(account_id_block.get())

    credentials = DbtCloudCredentials(
        api_key=api_key,
        account_id=account_id,
    )

    # max_wait_seconds=1200: presumably a 20-minute cap on waiting for the
    # run to finish -- confirm against the prefect-dbt documentation.
    run_result = await trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=credentials,
        job_id=DBT_UPDATE_PROD_INCREMENTAL_JOB_ID,
        max_wait_seconds=1200,
    )
    return run_result
and now the dbt job is still created and runs normally, but the flow fails right afterwards with the following error:
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 589, in orchestrate_flow_run
result = await flow_call()
File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cloud/jobs.py", line 289, in trigger_dbt_cloud_job_run_and_wait_for_completion
final_run_status, run_data = await wait_for_dbt_cloud_job_run(
File "/usr/local/lib/python3.10/site-packages/prefect/client/orion.py", line 80, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 429, in create_and_begin_subflow_run
parameters = await resolve_inputs(parameters)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1407, in resolve_inputs
return await run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 307, in visit_collection
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 307, in <listcomp>
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 273, in visit_nested
return visit_collection(
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 281, in visit_collection
result = visit_fn(expr)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1400, in resolve_input
raise UpstreamTaskError(
prefect.exceptions.UpstreamTaskError: Upstream task run 'None' did not reach a 'COMPLETED' state.
01:07:28 PM
Crash detected! Execution was interrupted by an unexpected exception.