Prefect 1.0 -> Prefect 2.0: GraphQL API to REST API

Summary

Prefect exposes an API that lets users retrieve information about Prefect objects (flows, tasks, and so on) and, in some cases, manipulate them.

In Prefect 1, this was supported through a GraphQL API. While flexible for the end user, it could often be confusing, which made it not particularly user friendly. It was also fairly heavy, which is why the Prefect Server container is so large.

In Prefect 2, this is supported by a REST API. There were many reasons for this change. First and foremost, many users are more familiar with the REST API format, making it more accessible. Second, the REST API is far more lightweight, so less work is required to interact with it. Our CTO Chris White explains in a bit more detail in this video from a previous livestream.

Audience

If you’re looking for guidance on getting started with Prefect 2’s REST API against a locally hosted Prefect server, then this article is for you.

What is Staying the Same

Many of the same or similar mutations are still supported by the REST API through various endpoints. Users can still use the Prefect client to interact with the API in their code.

What is Different

Users are no longer required to construct a GraphQL query to interact with the API. Instead, they can make calls to specific endpoints, each with its own supported parameters, filters, and sorting behavior. The Prefect 2 Python client is asynchronous, so calls made through it must be awaited.
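
As a rough illustration of calling an endpoint directly, the sketch below builds a POST request for the flow_runs/filter endpoint using only the Python standard library. The base URL http://127.0.0.1:4200/api assumes a locally hosted server on the default port, and the helper names build_flow_run_filter_request and fetch_flow_runs are made up for this example, so adjust both to your setup.

```python
import json
import urllib.request

# Assumption: a local Prefect 2 server listening on the default address.
API_URL = "http://127.0.0.1:4200/api"


def build_flow_run_filter_request(limit=5, sort="END_TIME_DESC", api_url=API_URL):
    """Build (but do not send) a POST request for the flow_runs/filter endpoint."""
    # Filters, sorting, and limits travel in the JSON body instead of a GraphQL query.
    body = json.dumps({"limit": limit, "sort": sort}).encode("utf-8")
    return urllib.request.Request(
        url=f"{api_url}/flow_runs/filter",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_flow_runs(request):
    """Send the request and decode the JSON list of flow runs (needs a running server)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

With a server running, fetch_flow_runs(build_flow_run_filter_request()) should return a list of dictionaries, one per flow run. Building the request separately from sending it makes the payload easy to inspect before anything goes over the network.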

How to Convert from 1 to 2

Below are two examples of submitting calls to the GraphQL API and the REST API, respectively, to get the last 5 flow runs sorted by time:

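# Prefect 1: query the GraphQL API for the 5 most recently created flow runs
import os

from prefect import Client

# Read the API key from the environment
api_key = os.getenv("api_key")

client = Client(api_key=api_key)

# The query is a nested dict; the innermost set lists the fields to return
test = client.graphql(
    {
        "query": {
            "flow_run(limit: 5, order_by: { created: desc })": {
                "id",
                "created",
            }
        }
    }
)

print(test)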
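
# Prefect 2: read the 5 most recently ended flow runs through the async client
import asyncio

from prefect.client import get_client
from prefect.server.schemas.sorting import FlowRunSort


async def get_flow_runs():
    # Client methods are coroutines, so each call must be awaited
    async with get_client() as client:
        return await client.read_flow_runs(limit=5, sort=FlowRunSort.END_TIME_DESC)


if __name__ == "__main__":
    # asyncio.run drives the coroutine from synchronous code
    flow_runs = asyncio.run(get_flow_runs())
    for flow_run in flow_runs:
        print(flow_run.name, flow_run.flow_id, flow_run.created)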

Note that in Prefect 2 the API call returns a list of flow run schema objects, one per flow run, which makes it easy to reference the values stored in each one. The current API context is inferred from the currently selected profile in your environment.

Links

Troubleshooting

Since the Prefect 2 client is asynchronous, any call made with the client must be awaited. For more information on async functions, see the "Coroutines and Tasks" page of the Python 3.11.3 documentation.
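
The difference between calling and awaiting a coroutine can be seen without Prefect at all. In the minimal sketch below, fetch_value is a hypothetical stand-in for an awaited client call, not a Prefect function. Calling it without await only creates a coroutine object, which is the situation that leads to Python's "coroutine ... was never awaited" warning.

```python
import asyncio


async def fetch_value():
    """Stand-in for an awaited client call (hypothetical, not a Prefect function)."""
    await asyncio.sleep(0)
    return 42


async def main():
    # Awaiting the coroutine runs it and yields its return value.
    return await fetch_value()


# Calling a coroutine function only creates a coroutine object; nothing runs yet.
pending = fetch_value()
is_coroutine = asyncio.iscoroutine(pending)
pending.close()  # close it explicitly so it is not reported as never awaited

# asyncio.run drives the coroutine to completion from synchronous code.
result = asyncio.run(main())
```

In a script, the usual pattern is to keep the awaited calls inside an async function and start it once with asyncio.run.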

Hi @Mason_Menges,
I copy-pasted your code and got the following error:

sys:1: RuntimeWarning: coroutine 'get_flow_runs' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Did you not try it or is it solely an error on my end?
Thanks for your help!

I edited and added something, can you try again?

Works flawlessly! Just getting into Prefect (after too many hurdles with Airflow) and I’m really enjoying it so far, not least thanks to how impressively active and helpful the community is! Thanks for your help!

Updated to use server rather than orion