Migration: Using a local agent and runtime in Prefect 1.0 and 2.0

Summary

Using Prefect’s local agent can be a good fit for new users, or for those who want to run their workflows on vertically scaled machines.

This guide will demonstrate how to deploy both 1.0 and 2.0 flows with Prefect Cloud as the backend, using a local agent, storage and runtime.

Video Link

Coming soon!

How did we do this in Prefect 1.0?

pip install prefect==1.2.4

Using local storage and runtime

Let’s start with imports:

from prefect import Flow
from prefect.run_configs import LocalRun
from prefect.storage import Local

which we can use in the definition of our Flow object like:

with Flow(
    'Control Flow.. Flow',
    run_config=LocalRun(),
    storage=Local()
) as flow:
    ...

Assuming we’ve got some task definitions in our Flow context (see the bottom of this post for the code), we’re ready to register our flow!

Register a flow

To register our flow with our Prefect Cloud backend, we can run:

prefect register -p control_flow_1.py --project test

Starting the local agent process

In a cloud-authenticated environment:

prefect agent local start

… which will start an agent listening for flow runs labeled with the machine's hostname, for example my-machine-name.local.
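
To see which label to expect on a given machine, here is a minimal sketch. It assumes the default label comes from the machine's hostname, which is how the Prefect 1.0 local agent and Local storage behave with their default settings; the variable name is only illustrative.

```python
import socket

# Assumption: with default settings, the Prefect 1.0 local agent and
# Local storage both label themselves with this machine's hostname.
hostname_label = socket.gethostname()

print(f"Default agent label on this machine: {hostname_label}")
```

If the printed value does not match the label shown on the flow in the UI, the agent and the flow were most likely registered from different machines.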

Run our newly registered flow

To run our flow from the CLI:

prefect run -n 'Control Flow.. Flow'

To run our flow from the UI, we can just click on Quick Run on the flow page.

Sanity Check

We can now check the agent logs and/or the flow run logs in the UI to see that our agent did in fact pick up our flow and run it as expected.

How do we do this in Prefect 2.0?

pip install "prefect>=2.0,<3.0"

Using local storage and runtime

By default, flows will use the local Process runtime infrastructure and LocalFileSystem storage.

Once we’ve converted our Prefect 1.0 code to Prefect 2.0 code (how to), we’re ready to create a deployment and run it!

Build and apply a deployment

To build a deployment from our python file where the flow lives:

prefect deployment build ./control_flow_2.py:my_flow -n my-first-deployment

… which will create a .yaml file specifying our deployment, which we can then apply (i.e. deploy):

prefect deployment apply my_flow-deployment.yaml

Creating a work-queue

From the CLI:

prefect work-queue create <my work queue name>

or you can navigate to the work-queue section in the UI.

Starting the local agent process

In a cloud-authenticated environment:

prefect agent start <work-queue UUID>

… which will start an agent listening for flow runs submitted to that work-queue.

Run our new deployment

To run our deployment from the CLI, refer to it as <flow name>/<deployment name>:

prefect deployment run 'Control Flow.. Flow/my-first-deployment'
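
Because the flow name contains spaces and dots, the argument has to be quoted for the shell. The sketch below uses Python's standard shlex module to build a safely quoted command, assuming the flow-name/deployment-name form that prefect deployment run expects. The two names are the ones used in this guide; the variable names are only illustrative.

```python
import shlex

flow_name = "Control Flow.. Flow"       # name passed to @flow(name=...)
deployment_name = "my-first-deployment"  # name passed to `-n` at build time

# shlex.join quotes any argument that contains spaces or shell metacharacters.
command = shlex.join(
    ["prefect", "deployment", "run", f"{flow_name}/{deployment_name}"]
)

print(command)
```

The printed string can be pasted into a terminal as-is.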

To run our flow from the UI, we can just click on Quick Run on the deployment's page.

Sanity Check

We can now check the agent logs and/or the flow run logs in the UI to see that our agent did in fact pick up our flow and run it as expected.

The code I used

Prefect 1.0

from prefect import case, task, Flow
from prefect.tasks.control_flow import FilterTask
from prefect.run_configs import LocalRun
from prefect.storage import Local
from random import randint
from typing import List

@task
def random_numbers(n: int = 10) -> List[int]:
    return [randint(0, 100) for _ in range(n)]

@task
def length_is_odd(something: object) -> bool:
    return len(something) % 2 == 1

@task(log_stdout=True)
def conditional_task_if_true():
    print('Hmm 🤔 that length was ~ODD~!')
    
@task(log_stdout=True)
def conditional_task_if_false():
    print("We're ~EVEN~!")

even_filter = FilterTask(filter_func=lambda x: x % 2 == 0)


with Flow(
    'Control Flow.. Flow',
    run_config=LocalRun(),
    storage=Local()
) as flow:
    my_random_numbers = random_numbers()
    my_even_numbers = even_filter(my_random_numbers)
    
    isOdd = length_is_odd(my_even_numbers)
    
    with case(isOdd, True):
        conditional_task_if_true()
        
    with case(isOdd, False):
        conditional_task_if_false()

if __name__ == "__main__":
    flow.run(run_on_schedule=False)

Prefect 2.0

from prefect import task, flow
from random import randint
from typing import List

@task
def random_numbers(n: int = 10) -> List[int]:
    return [randint(0, 100) for _ in range(n)]

@task
def length_is_odd(something: object) -> bool:
    return len(something) % 2 == 1

@task
def conditional_task_if_true():
    print('Hmm 🤔 that length was ~ODD~!')
    
@task
def conditional_task_if_false():
    print("We're ~EVEN~!")

@flow(name='Control Flow.. Flow')
def my_flow():
    my_random_numbers = random_numbers()
    my_even_numbers = list(filter(lambda x: x % 2 == 0, my_random_numbers))
    
    isOdd = length_is_odd(my_even_numbers)
    
    if isOdd:
        conditional_task_if_true()
    else:
        conditional_task_if_false()

if __name__ == "__main__":
    my_flow()
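
The main difference between the two versions is the control flow: the 1.0 flow needs FilterTask and case blocks, while the 2.0 flow uses an ordinary filter and if/else. As a rough sketch of the logic both flows implement, here it is with Prefect removed entirely. The helper names keep_even and branch_message are made up for illustration and are not part of either Prefect API.

```python
from random import Random
from typing import List


def keep_even(numbers: List[int]) -> List[int]:
    """Stand-in for the 1.0 FilterTask and the 2.0 filter() call."""
    return [n for n in numbers if n % 2 == 0]


def branch_message(numbers: List[int]) -> str:
    """Return the message of the branch the flow would take."""
    if len(numbers) % 2 == 1:
        return "Hmm 🤔 that length was ~ODD~!"
    return "We're ~EVEN~!"


# Seeded only so that repeated runs draw the same numbers.
rng = Random(0)
sample = [rng.randint(0, 100) for _ in range(10)]

print(branch_message(keep_even(sample)))
```

Running this locally is a quick way to check the branching before wrapping the functions in tasks and a flow.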
