How to use filters in the Prefect 2 API

Here is an example of how to use filters when calling the Prefect 2 REST API from the Python client. This specific example calls read_flow_runs for the current flow’s flow_id in order to log the flow run number.

import asyncio

from prefect import flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.orion.schemas.filters import FlowFilter, FlowFilterId


@flow
async def get_count_flow_runs():
    logger = get_run_logger()
    # ID of the flow that the current flow run belongs to
    flow_id = get_run_context().flow_run.flow_id

    # Use the client as an async context manager so its HTTP connection is closed
    async with get_client() as client:
        # Only return flow runs whose parent flow has this ID
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(id=FlowFilterId(any_=[flow_id]))
        )

    logger.info(f"This is run number {len(flow_runs)} of flow {flow_id}")


if __name__ == "__main__":
    asyncio.run(get_count_flow_runs())

Quick update for anyone finding this in the future: the import path is slightly different now (as of writing, Prefect 2.10.18 is the latest version):

from prefect.client.schemas.filters import FlowFilter, FlowFilterId

@EmilRex, I’m still a bit confused about how filters work. From what I understood in the documentation, the following snippet should recursively collect the IDs of every flow run that has finished (completed, failed, crashed, or cancelled).
However, it still returns flow runs that are in the RUNNING state. It seems like the filter is ignored for some reason.

Do you know what the root cause is?

from prefect.client.schemas.filters import FlowRunFilterState
from prefect.client.schemas.objects import StateType

async def paginate_flow_runs(prefect_client, page_size, page_offset=0):
    # https://docs.prefect.io/api-ref/prefect/client/orchestration/#prefect.client.orchestration.PrefectClient.read_flow_runs
    state_filter = FlowRunFilterState(
        any_=[StateType.COMPLETED, StateType.FAILED, StateType.CRASHED, StateType.CANCELLED]
    )
    flows = await prefect_client.read_flow_runs(
        limit=page_size, offset=page_offset, flow_run_filter=state_filter
    )

    flow_run_ids = [flow_run.id for flow_run in flows]

    print(f"{page_offset=}")

    if len(flow_run_ids) < page_size:
        return flow_run_ids
    else:
        return flow_run_ids + await paginate_flow_runs(prefect_client, page_size, page_offset=page_size + page_offset)

@trahloff I think you need an additional level of filtering with the FlowRunFilterStateType. So the state filter becomes:

from prefect.client.schemas.filters import FlowRunFilterState, FlowRunFilterStateType

...

    state_filter = FlowRunFilterState(
        type=FlowRunFilterStateType(
            any_=[StateType.COMPLETED, StateType.FAILED, StateType.CRASHED, StateType.CANCELLED]
        )
    )

Here’s a similar example that filters on the state name instead of the state type: How to bulk delete flow runs in a certain state with the API

Ahhh, I completely missed the different hierarchical levels of filters. Thanks!