Contents
Summary
Using Prefect’s local agent can be a good fit for new users or those who want to run their workflows on vertically scaled machines
This guide will demonstrate how to deploy both 1.0 and 2.0 flows with Prefect Cloud as the backend, using a local agent, storage and runtime.
Video Link
coming soon!
How did we do this in Prefect 1.0?
pip install prefect==1.2.4
Using local storage and runtime
Let’s start with imports:
from prefect.run_configs import LocalRun
from prefect.storage import Local
which we can use in the definition of our Flow
object like:
with Flow(
'Control Flow.. Flow',
run_config=LocalRun(),
storage=Local()
) as flow:
...
… assuming we’ve got some task
definitions in our Flow context (see bottom of post for the code), we’re ready to register our flows!
Register a flow
To register our flow with our Prefect Cloud backend, we can run:
prefect register -p control_flow_1.py --project test
Starting the local agent process
In a cloud-authenticated environment:
prefect agent local start
… which will start an agent listening for flows labeled like my-machine-name.local
Run our newly registered flow
To run our flow from the CLI:
prefect run -n 'Control Flow.. Flow'
To run our flow from the UI we can just click on Quick Run on the flow page
Sanity Check
We can now check the agent logs and/or the flow run logs in the UI to see that our agent did in fact pick up our flow and run it as expected.
How do we do this in Prefect 2.0?
pip install prefect
Using local storage and runtime
By default, flows will use the local Process runtime infrastructure and LocalFileSystem
storage.
Once we’ve converted our Prefect 1.0 code to Prefect 2.0 code (how to), we’re ready to create a deployment and run it!
build
and apply
a deployment
To build a deployment from our python file where the flow lives:
prefect deployment build ./control_flow_2.py:my_flow -n my-first-deployment
… which will create a .yaml
file specifying our deployment, which we can then apply (i.e. deploy):
prefect deployment apply my_flow-deployment.yaml
Creating a work-queue
From the CLI:
prefect work-queue create <my work queue name>
or you can navigate to the work-queue section in the UI.
Starting the local agent process
In a cloud-authenticated environment:
prefect agent start <work-queue UUID>
… which will start an agent listening for flow runs submitted to that work-queue.
Run our new deployment
To run our flow from the CLI:
prefect deployment run 'Control Flow.. Flow'
To run our flow from the UI we can just click on Quick Run on the flow page
Sanity Check
We can now check the agent logs and/or the flow run logs in the UI to see that our agent did in fact pick up our flow and run it as expected.
the code I used
Prefect 1.0
from prefect import case, task, Flow
from prefect.tasks.control_flow import FilterTask
from prefect.run_configs import LocalRun
from prefect.storage import Local
from random import randint
from typing import List
@task
def random_numbers(n: int = 10) -> List[int]:
return [randint(0, 100) for _ in range(n)]
@task
def length_is_odd(something: object) -> bool:
return len(something) % 2 == 1
@task(log_stdout=True)
def conditional_task_if_true():
print('Hmm 🤔 that length was ~ODD~!')
@task(log_stdout=True)
def conditional_task_if_false():
print("We're ~EVEN~!")
even_filter = FilterTask(filter_func=lambda x: x % 2 == 0)
with Flow(
'Control Flow.. Flow',
run_config=LocalRun(),
storage=Local()
) as flow:
my_random_numbers = random_numbers()
my_even_numbers = even_filter(my_random_numbers)
isOdd = length_is_odd(my_even_numbers)
with case(isOdd, True):
conditional_task_if_true()
with case(isOdd, False):
conditional_task_if_false()
if __name__ =="__main__":
flow.run(run_on_schedule=False)
Prefect 2.0
from prefect import task, flow
from random import randint
from typing import List
@task
def random_numbers(n: int = 10) -> List[int]:
return [randint(0, 100) for _ in range(n)]
@task
def length_is_odd(something: object) -> bool:
return len(something) % 2 == 1
@task
def conditional_task_if_true():
print('Hmm 🤔 that length was ~ODD~!')
@task
def conditional_task_if_false():
print("We're ~EVEN~!")
@flow(name='Control Flow.. Flow')
def my_flow():
my_random_numbers = random_numbers()
my_even_numbers = list(filter(lambda x: x % 2 == 0, my_random_numbers))
isOdd = length_is_odd(my_even_numbers)
if isOdd:
conditional_task_if_true()
else:
conditional_task_if_false()
if __name__ =="__main__":
my_flow()