Running on Prefect Server 2.10.8 (not Prefect Cloud) with Python 3.10.6:

```
Version:             2.10.8
API version:         0.8.4
Python version:      3.10.6
Git commit:          79093235
Built:               Mon, May 8, 2023 12:23 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server
```
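Because the post hinges on "server and agent run the same version", it can help to fingerprint each interpreter involved and look for a second copy of the package on `sys.path`. This is a minimal sketch using only the standard library; `environment_report` is a hypothetical helper, not part of Prefect.

```python
import importlib.metadata
import importlib.util
import sys


def environment_report(name: str = "prefect") -> dict:
    """Describe the interpreter and every installed copy of package `name`."""
    # Which file `import name` would load in *this* interpreter (None if absent).
    spec = importlib.util.find_spec(name)
    # Every distribution called `name` visible on sys.path (user site,
    # system site-packages, virtualenvs, ...). More than one entry is a red flag.
    copies = sorted(
        {
            (dist.version, str(dist.locate_file("")))
            for dist in importlib.metadata.distributions(name=name)
        }
    )
    return {
        "executable": sys.executable,
        "python": sys.version.split()[0],
        "imported_from": spec.origin if spec else None,
        "installed_copies": copies,
    }


if __name__ == "__main__":
    for key, value in environment_report().items():
        print(f"{key}: {value}")
```

Run it with each interpreter that might be involved, for example the one named in the shebang line of `/home/prefect/.local/bin/prefect` and whatever `python` resolves to for the `prefect` user. More than one entry in `installed_copies`, or an `imported_from` path outside `/home/prefect/.local/...`, would point towards mixed installations.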
I have a subflow that runs as part of a larger flow, structured like this:

- Parent flow
  - Child flow 1
  - Child flow 2
Child flow 1 and the parent flow (up to the point where it awaits child flow 2) run successfully.
When child flow 2 starts, its flow run crashes almost immediately with the following error in the agent logs:
```
/usr/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
Engine execution of flow run 'd043a79a-4855-4170-9d8d-fb7137be9fb2' exited with unexpected exception
Traceback (most recent call last):
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/engine.py", line 2233, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/engine.py", line 202, in enter_flow_run_engine_from_subprocess
    return from_sync.wait_for_call_in_loop_thread(
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 215, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/engine.py", line 301, in retrieve_flow_then_begin_flow_run
    if flow.should_validate_parameters:
AttributeError: 'Flow' object has no attribute 'should_validate_parameters'
Process 'tremendous-mackerel' exited with status code: 1
```
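The traceback ends in `retrieve_flow_then_begin_flow_run`, i.e. while the engine is retrieving the flow object and before the flow body runs. One way to see what object the deployment's entrypoint actually yields, and which file its class is defined in, is to import it by hand. This is a minimal sketch, assuming the entrypoint has the usual `path/to/file.py:function_name` form; `inspect_entrypoint` is a hypothetical helper and a simplified stand-in for Prefect's own loader, not a copy of it.

```python
import importlib.util
import sys
from pathlib import Path


def inspect_entrypoint(entrypoint: str, attr: str = "should_validate_parameters") -> dict:
    """Import "path/to/file.py:object_name" and describe the object it names.

    Importing the file runs its module-level code (but not its
    `if __name__ == "__main__":` block, since the module is not __main__).
    """
    path_str, _, obj_name = entrypoint.rpartition(":")
    path = Path(path_str).resolve()
    # Let the file import its sibling modules, as it could when run directly.
    sys.path.insert(0, str(path.parent))
    try:
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    finally:
        sys.path.remove(str(path.parent))

    obj = getattr(module, obj_name)
    cls = type(obj)
    # Find the module object that defines the class, to report its file.
    if cls.__module__ == module.__name__:
        cls_module = module
    else:
        cls_module = sys.modules.get(cls.__module__)
    return {
        "type": f"{cls.__module__}.{cls.__qualname__}",
        "class_file": getattr(cls_module, "__file__", None),
        "has_attr": hasattr(obj, attr),
    }
```

Called with the deployment's entrypoint string (run from the directory that entrypoint is relative to) and the same interpreter the flow-run subprocess uses, a healthy result would report `prefect.flows.Flow`, a `class_file` under the same `site-packages` as in the traceback, and `has_attr: True`. Anything else narrows down where the unexpected `Flow` object comes from.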
I've verified that this attribute is set in `flows.py:202`:

```python
self.should_validate_parameters = validate_parameters
```
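Finding that assignment shows that `Flow.__init__` sets the attribute, but assuming that is the only place it is set, it is an instance attribute: `hasattr(Flow, "should_validate_parameters")` is `False` even on a healthy install, and an instance that never ran this `__init__` will not have it either. The toy class below is a hypothetical stand-in, not Prefect's `Flow`, and I don't know whether either path applies here; it only illustrates what to check on a real instance.

```python
class Flow:
    """Hypothetical toy stand-in for prefect.flows.Flow, not the real class."""

    def __init__(self, fn, validate_parameters: bool = True):
        self.fn = fn
        # Instance attribute: it exists only after __init__ has run.
        self.should_validate_parameters = validate_parameters


def my_flow():
    return None


healthy = Flow(my_flow)
# object.__new__ builds an instance without running __init__. pickle does the
# same and then restores the saved __dict__, so an instance pickled without
# the attribute comes back without it.
no_init = Flow.__new__(Flow)

print(hasattr(Flow, "should_validate_parameters"))     # False: not on the class
print(hasattr(healthy, "should_validate_parameters"))  # True
print(hasattr(no_init, "should_validate_parameters"))  # False
```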
Both my server and my agent run on the same host, and I've verified that both run the same Prefect version (2.10.8). I checked the server version via the `/docs` endpoint (the server does not print its version on startup) and the agent version on the CLI:

```
prefect@prefect:~$ /home/prefect/.local/bin/prefect agent start --pool default-agent-pool --work-queue default
Starting v2.10.8 agent connected to https://<...>/api...
```
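A more direct check than reading `/docs` is to ask the running server for its version. This sketch assumes the Prefect 2.x server exposes a `GET /api/admin/version` route that answers with a JSON-encoded version string; `read_server_version` is a hypothetical helper built only on the standard library.

```python
import json
import urllib.request


def read_server_version(api_url: str, timeout: float = 5.0) -> str:
    """Ask a Prefect 2.x server for its version via GET <api_url>/admin/version.

    `api_url` is the value you would put in PREFECT_API_URL (ending in /api).
    Assumption: the server answers with a JSON-encoded string such as "2.10.8".
    """
    url = api_url.rstrip("/") + "/admin/version"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `read_server_version("https://<...>/api")` with the same API URL the agent connects to, or the equivalent `curl https://<...>/api/admin/version` from the shell.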
I've looked inside my `site-packages` and can't find anything wrong. The only hint that something odd may be going on is the first line of the error above:

```
/usr/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
```

However, according to this post, this warning can be safely ignored at this time.
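The warning comes from `runpy`: it is raised when `python -m pkg.mod` finds that importing `pkg` already imported `pkg.mod`, so the module's top-level code runs twice, once as `pkg.mod` and once as `__main__`. The `<module>` frame at `engine.py` line 2233 suggests the flow-run subprocess was started along the lines of `python -m prefect.engine`. The sketch below reproduces the same warning with a throwaway package (`pkg` and `engine.py` are hypothetical names) so its effect can be seen in isolation.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path


def run_module_imported_by_its_package() -> subprocess.CompletedProcess:
    """Run `python -m pkg.engine` where pkg/__init__.py already imports pkg.engine."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg = Path(tmp, "pkg")
        pkg.mkdir()
        # The package imports its own submodule, like prefect/__init__.py does.
        (pkg / "__init__.py").write_text("from . import engine\n")
        # The submodule just reports the name it was executed under.
        (pkg / "engine.py").write_text("print(__name__)\n")
        env = dict(os.environ, PYTHONPATH=tmp)
        env.pop("PYTHONWARNINGS", None)  # keep the default warning filters
        return subprocess.run(
            [sys.executable, "-m", "pkg.engine"],
            env=env, capture_output=True, text=True, check=True,
        )


result = run_module_imported_by_its_package()
print(result.stdout)  # "pkg.engine" then "__main__": the body ran twice
print(result.stderr)  # RuntimeWarning: 'pkg.engine' found in sys.modules ...
```

Both executions share the same module cache, so both copies of the engine would import the same `prefect.flows` module. As far as I can tell, the duplication alone would therefore not hand the engine a `Flow` object of a different class, which is consistent with the advice in the post mentioned above.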
The definition of child flow 2 doesn't do anything special:
```python
import boto3
import pendulum
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

# Date, AthenaClient, AthenaTable, get_athena_credentials and update_table
# come from elsewhere in the project and are not shown here.


@flow(task_runner=ConcurrentTaskRunner())
def update_athena_tables(
    date: Date = pendulum.yesterday(),  # evaluated once, when the module is imported
    process_full_month: bool = False,
) -> None:
    access_key, secret_key = get_athena_credentials()
    client: AthenaClient = boto3.client(
        "athena",
        region_name="eu-central-1",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key.get_secret_value(),
    )

    update_invoices_only_outliers = update_table.submit(
        client,
        table=AthenaTable.intermediate_invoices_only_outliers,
        date=date,
        process_full_month=process_full_month,
    )
    update_invoices_without_outliers = update_table.submit(
        client,
        table=AthenaTable.intermediate_invoices_without_outliers,
        date=date,
        process_full_month=process_full_month,
    )
    # Task.submit expects `wait_for` (snake_case); an unknown keyword such as
    # `waitFor` would be treated as a parameter of the task itself.
    update_primary_invoices = update_table.submit(
        client,
        table=AthenaTable.primary_invoices,
        date=date,
        process_full_month=process_full_month,
        wait_for=[update_invoices_without_outliers],
    )


if __name__ == "__main__":
    update_athena_tables()
```
Since the flow run crashes immediately (the traceback ends in `retrieve_flow_then_begin_flow_run`, before the flow body runs), I don't think the actual implementation of `update_table` plays any role here.
Does anybody have an idea as to how I could debug this further?