Here is an example of how to use filters when calling the Prefect 2 REST API from the Python client. This specific example calls read_flow_runs for the current flow’s flow_id in order to log the flow run number.
import asyncio

from prefect import task, flow, get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context
from prefect.orion.schemas.filters import FlowFilter, FlowFilterId


@flow
async def get_count_flow_runs():
    logger = get_run_logger()
    # Look up the ID of the flow that this flow run belongs to
    context = get_run_context().flow_run.dict()
    flow_id = context['flow_id']
    client = get_client()
    # Only return flow runs whose parent flow matches flow_id
    flow_runs = await client.read_flow_runs(
        flow_filter=FlowFilter(id=FlowFilterId(any_=[flow_id]))
    )
    logger.info(f"This is run number {len(flow_runs)} of flow {flow_id}")


if __name__ == "__main__":
    asyncio.run(get_count_flow_runs())
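For anyone wondering what that nested filter turns into on the wire, here is a rough sketch using plain dicts only (no Prefect import). It assumes the client sends the flow filter under a `flows` key in the JSON body of the flow run filter request; that key name and the helper `build_flow_runs_filter_body` are my own illustration, not part of the Prefect API, so check the REST API reference for your version.

```python
import json
from typing import Optional
from uuid import UUID, uuid4


def build_flow_runs_filter_body(flow_id: UUID, limit: Optional[int] = None) -> dict:
    """Hypothetical helper: mirror FlowFilter(id=FlowFilterId(any_=[flow_id])) as a dict."""
    # Assumption: the flow filter is nested under "flows" in the request body.
    # "any_" means "match flows whose id is any of the listed values".
    body = {"flows": {"id": {"any_": [str(flow_id)]}}}
    if limit is not None:
        body["limit"] = limit
    return body


if __name__ == "__main__":
    # A random UUID stands in for a real flow ID here.
    print(json.dumps(build_flow_runs_filter_body(uuid4(), limit=10), indent=2))
```

The point is just that each filter class maps to one level of nesting: the outer filter picks the object type, the inner one picks the field and the operator.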
Quick update for anyone finding this in the future: the import path is slightly different now (as of writing, Prefect 2.10.18 is the latest version):
from prefect.client.schemas.filters import FlowFilter, FlowFilterId
@EmilRex, I’m still a bit confused about how filters work. From what I understood in the documentation, the following code snippet should recursively read the IDs of every Flow Run that is completed.
However, it still returns Flow Runs that are in the RUNNING state. It seems like the filter is being ignored for some reason.