AttributeError: 'NoneType' object has no attribute 'call' on sync_portal

Hello,
I’m at a loss to explain why this flow fails and succeeds at the same time… It only consists in triggering a dbt build, which succeeds, but somehow the flow itself fails.

Here’s the error:

The error message:

    return flow_run_context.sync_portal.call(begin_run)
AttributeError: 'NoneType' object has no attribute 'call'

And the code:

@flow(name='DBT Incremental build')
def trigger_dbt_build():
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key=Secret.load("dbt-api-key").get(),
            account_id=int(Secret.load("dbt-account-id").get())
        ),
        job_id=DBT_UPDATE_PROD_INCREMENTAL_JOB_ID,
        max_wait_seconds=1200
    )
    return run_result

@flow
def run_pipeline():
    run_fivetran()
    trigger_dbt_build()
    sync_metabase()

Thanks for your help

1 Like

I think the issue might be with returning something

Can you try this?

@flow(name='DBT Incremental build')
def trigger_dbt_build():
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key=Secret.load("dbt-api-key").get(),
            account_id=int(Secret.load("dbt-account-id").get())
        ),
        job_id=DBT_UPDATE_PROD_INCREMENTAL_JOB_ID,
        max_wait_seconds=1200
    )

@flow
def run_pipeline():
    run_fivetran()
    trigger_dbt_build()
    sync_metabase()

btw for DbtCloudCredentials, you should be able to set this up once in the UI e.g. with a block name “prod”, and then only load it in your flow like so:

from prefect_dbt.cloud.credentials import DbtCloudCredentials


@flow(name='DBT Incremental build')
def trigger_dbt_build():
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials.load("prod"),
        job_id=DBT_UPDATE_PROD_INCREMENTAL_JOB_ID,
        max_wait_seconds=1200
    )
1 Like

Thanks for your reply! I actually had also tried to include the trigger_dbt_cloud_job_run_and_wait_for_completion in run_pipeline(), to no avail. Just tried again to make sure, I’ll keep you posted!
As a side note, would you be able to explain how returning something might be an issue?

1 Like

So I inlined trigger_dbt_cloud_job_run_and_wait_for_completion in the main method as well as hardcoded the credentials, but it still fails with the same error…

1 Like

Can you share your prefect version output?

Also what prefect-dbt version are you currently using? Can yu try upgrading both packages in a new Conda environment?

Both versions are the latest, running in a Docker container on a remote server

# docker exec -it root-agent-1 prefect --version
2.4.0
# docker exec -it root-agent-1 pip list | grep dbt
prefect-dbt        0.2.2

Here’s the whole stacktrace from the main flow

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 595, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/tmp/tmpovddxriwprefect/flows/yespark_pipeline.py", line 31, in run_pipeline
    trigger_dbt_cloud_job_run_and_wait_for_completion(
  File "/usr/local/lib/python3.10/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 162, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 521, in create_and_begin_subflow_run
    return terminal_state.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 143, in result
    raise data
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 588, in orchestrate_flow_run
    result = await flow_call()
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cloud/jobs.py", line 287, in trigger_dbt_cloud_job_run_and_wait_for_completion
    run_id_future = get_run_id.submit(triggered_run_data_future)
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 492, in submit
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 740, in enter_task_run_engine
    return flow_run_context.sync_portal.call(begin_run)
AttributeError: 'NoneType' object has no attribute 'call'

Could you share the full flow code? it looks like the part that you shared works, but only the one using Fivetran doesn’t.

Actually I might not have been clear enough, but launching the dbt flow independently, without a return as you suggested earlier, fails in the same way :confused:

Can you try the same using the free version of Cloud: https://app.prefect.cloud/? It would be interesting to see if the issue is in the orchestration layer or execution layer

Good afternoon Anna,
I tried running everything locally (as opposed to Prefect Cloud UI <-> self-hosted remote agent), still fails with the same error :confused:

1 Like

Apparently there’s an open issue on prefect-dbt referring a Prefect PR Fixes sync calling async subflows, not sure how I missed it :slightly_frowning_face:

I rewrote the method like so (after trying various combinations with/without return with/without await:

@flow(name='DBT Incremental build')
async def trigger_dbt_build():
    dbt_api_key_secret = await Secret.load("dbt-api-key")
    dbt_api_key = dbt_api_key_secret.get()
    dbt_account_id_secret = await Secret.load("dbt-account-id")
    dbt_account_id = int(dbt_account_id_secret.get())

    return await trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key=dbt_api_key,
            account_id=dbt_account_id
        ),
        job_id=DBT_UPDATE_PROD_INCREMENTAL_JOB_ID,
        max_wait_seconds=1200
    )

and now the dbt job is still created and runs normally, but the flow fails right afterwards with the following error:

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 589, in orchestrate_flow_run
    result = await flow_call()
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cloud/jobs.py", line 289, in trigger_dbt_cloud_job_run_and_wait_for_completion
    final_run_status, run_data = await wait_for_dbt_cloud_job_run(
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orion.py", line 80, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 429, in create_and_begin_subflow_run
    parameters = await resolve_inputs(parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1407, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 307, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 307, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 273, in visit_nested
    return visit_collection(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py", line 281, in visit_collection
    result = visit_fn(expr)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1400, in resolve_input
    raise UpstreamTaskError(
prefect.exceptions.UpstreamTaskError: Upstream task run 'None' did not reach a 'COMPLETED' state.
01:07:28 PM
Crash detected! Execution was interrupted by an unexpected exception.
1 Like

Interesting, thanks so much for digging deeper. Just curious, if you don’t poll for the job status and don’t wait for its completion, does it work then?

@anna_geller: just triggering the job works completely fine.

2 Likes

@bogdan.cioata Thanks for chiming in! So to date you’re still having this issue? Just upgraded to 2.4.5 and it’s still occurring on every run…

I’d encourage you to chime in on the GitHub issue on the prefect-dbt repo dbt flow fails to sync back · Issue #67 · PrefectHQ/prefect-dbt · GitHub - this will make it easier to follow once this is fixed

FIXED!

1 Like

thanks for reporting back :heart:

1 Like