Flow run crashes with AttributeError: 'Flow' object has no attribute 'should_validate_parameters'

Running on Prefect Server 2.10.8 (non Cloud) with Python 3.10.6:

Version:             2.10.8
API version:         0.8.4
Python version:      3.10.6
Git commit:          79093235
Built:               Mon, May 8, 2023 12:23 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

I have a subflow that gets executed as part of of a larger flow that goes like this:

  1. Parent flow
  2. Child flow 1
  3. Child flow 2

Child flow 1 as well as parent flow (up to the point of awaiting child flow 2) run successfully.

Upon running child flow 2, the flow run almost immediately crashes with the following error message in the Agent logs:

/usr/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
Engine execution of flow run 'd043a79a-4855-4170-9d8d-fb7137be9fb2' exited with unexpected exception
Traceback (most recent call last):
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/engine.py", line 2233, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/engine.py", line 202, in enter_flow_run_engine_from_subprocess
    return from_sync.wait_for_call_in_loop_thread(
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 215, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/prefect/.local/lib/python3.10/site-packages/prefect/engine.py", line 301, in retrieve_flow_then_begin_flow_run
    if flow.should_validate_parameters:
AttributeError: 'Flow' object has no attribute 'should_validate_parameters'
Process 'tremendous-mackerel' exited with status code: 1

I’ve verified that this attribute is present in flows.py:202:

        self.should_validate_parameters = validate_parameters

Both my server and agent are running on the same host, and I’ve verified that both are running the same prefect version (2.10.8). I’ve verified the server version using the /docs endpoint (since it does not list it upon server start) as well as the agent version on the CLI:

prefect@prefect:~$ /home/prefect/.local/bin/prefect agent start --pool default-agent-pool --work-queue default
Starting v2.10.8 agent connected to https://<...>/api...

I’ve looked inside my site-packages and can’t seem to find anything wrong. The only weird indication that something odd may be happening is the first line in the error above:

/usr/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour

but according to this post, this warning can be safely ignored at this time.

My Child flow 2 definition doesn’t do anything special:

@flow(task_runner=ConcurrentTaskRunner())
def update_athena_tables(
    date: Date = pendulum.yesterday(),
    process_full_month: bool = False,
) -> None:
    access_key, secret_key = get_athena_credentials()
    client: AthenaClient = boto3.client(
        "athena",
        region_name="eu-central-1",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key.get_secret_value(),
    )

    update_invoices_only_outliers = update_table.submit(
        client,
        table=AthenaTable.intermediate_invoices_only_outliers,
        date=date,
        process_full_month=process_full_month,
    )
    update_invoices_without_outliers = update_table.submit(
        client,
        table=AthenaTable.intermediate_invoices_without_outliers,
        date=date,
        process_full_month=process_full_month,
    )
    update_primary_invoices = update_table.submit(
        client,
        table=AthenaTable.primary_invoices,
        date=date,
        process_full_month=process_full_month,
        waitFor=[update_invoices_without_outliers],
    )


if __name__ == "__main__":
    update_athena_tables()

Since the flow run crashes immediately, I do not think that the actual implementation of update_table should play any role here…

Does anybody have an idea as to how I could debug this further?

After lots of debugging I have found the problem to be the schema validation of the pendulum.Date type:

(.venv) prefect@prefect:~/prefect-flows$ python flow_definitions/script.py 
Traceback (most recent call last):
  File "/home/prefect/prefect-flows/flow_definitions/script.py", line 270, in <module>
    def update_athena_tables(
  File "/home/prefect/prefect-flows/.venv/lib/python3.10/site-packages/prefect/flows.py", line 712, in flow
    Flow(
  File "/home/prefect/prefect-flows/.venv/lib/python3.10/site-packages/prefect/context.py", line 178, in __register_init__
    __init__(__self__, *args, **kwargs)
  File "/home/prefect/prefect-flows/.venv/lib/python3.10/site-packages/prefect/flows.py", line 201, in __init__
    self.parameters = parameter_schema(self.fn)
  File "/home/prefect/prefect-flows/.venv/lib/python3.10/site-packages/prefect/utilities/callables.py", line 296, in parameter_schema
    pydantic.create_model(
  File "pydantic/main.py", line 664, in pydantic.main.BaseModel.schema
  File "pydantic/schema.py", line 187, in pydantic.schema.model_schema
  File "pydantic/schema.py", line 581, in pydantic.schema.model_process_schema
  File "pydantic/schema.py", line 622, in pydantic.schema.model_type_schema
  File "pydantic/schema.py", line 255, in pydantic.schema.field_schema
  File "pydantic/schema.py", line 527, in pydantic.schema.field_type_schema
  File "pydantic/schema.py", line 924, in pydantic.schema.field_singleton_schema
  File "/usr/lib/python3.10/abc.py", line 123, in __subclasscheck__
    return _abc_subclasscheck(cls, subclass)
TypeError: issubclass() arg 1 must be a class

Exactly how and why this leads to the above error is unfortunately still a mystery to me. I’ll try to create a MRE example and create an issue for it.

The root of the issue seems to be `from __future__ import annotations` causes flow initialization to fail with `issubclass arg must be a class` · Issue #7502 · PrefectHQ/prefect · GitHub.

Solution: remove from __future__ import annotations. After that, the above error disappeared. Still unsure as to how these things interact exactly. ¯_(ツ)_/¯