Unit testing best practices for Prefect flows, subflows and tasks

Summary

In Prefect 2.0, flows, subflows, and tasks are defined as decorated Python functions, which makes them easier to test than in 1.0.

Video link

Coming soon.

Audience

Testing your Prefect flows, subflows, and tasks helps you catch errors in your code early. Test before merging or deploying your code to a higher environment.

What is Changing

  • Testing is easier: subflows, for example, no longer require registration prior to testing.
  • With the flow decorator (@flow), a flow is an ordinary Python function, so you
    can test it the same way you test any other Python code.

How to Convert from 1.0 to 2.0

Flows in 1.0

In Prefect 1.0, we needed to define our flow with a context manager. To execute the flow, we had to call .run() on it.

"""Test a flow in Prefect 1.0"""
from prefect import Flow, task

@task
def my_task():
    return 42

with Flow('my flow') as my_flow:
    first_task = my_task()

# test a flow
def test_my_flow():
    # check the state of the flow for success
    state = my_flow.run()
    assert state.is_successful()

if __name__ == "__main__":
    test_my_flow()

Flows in 2.0

In Prefect 2.0, we replace the context manager with a flow decorator: @flow. Flows are directly callable so we don’t need .run().

We also have the option to use a context manager, prefect_test_harness, to run flows and tasks against a local SQLite database. If you want to use prefect_test_harness in multiple tests, you can use pytest.fixture and scope it to session to ensure efficient testing.

Learn more about pytest.fixture with prefect_test_harness.

Subflows in 1.0

In Prefect 1.0, to test a “flow of flows” locally, we needed to register the subflow and then use create_flow_run, specifying its name, id, and/or project. This made testing more cumbersome.

Subflows in 2.0

In Prefect 2.0, testing a subflow is as easy as calling it.

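"""Test a subflow in Prefect 2.0"""
from prefect import flow, task


@task
def subflow_task(nbr):
    return nbr * 2

@flow
def subflow(nbr):
    # return the task result so tests can assert on it
    return subflow_task(nbr)

@flow
def outer_flow(nbr):
    # calling a flow from inside another flow runs it as a subflow
    return subflow(nbr)

# test a subflow on its own
def test_subflow():
    assert subflow(2) == 4

# test a subflow called from its parent flow
def test_outer_flow():
    assert outer_flow(2) == 4

# test a subflow task
def test_subflow_task():
    assert subflow_task.fn(25) == 50

if __name__ == "__main__":
    test_subflow()
    test_outer_flow()
    test_subflow_task()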

We can even test an individual task in isolation: .fn() calls the task's underlying Python function directly, without a flow.

Tasks in 1.0

In Prefect 1.0, you would use .run() to call the task. Alternatively, you might have used TaskRunner to track the state and result.

"""Test a task in Prefect 1.0"""
from prefect import task

@task
def my_task():
    return 42

# test a task
def test_my_task():
    # .run() calls the task's underlying function directly
    assert my_task.run() == 42

if __name__ == "__main__":
    test_my_task()

Tasks in 2.0

In Prefect 2.0, you don’t need .run() or a flow to test a task: call the task's underlying function with .fn().

"""Test individual tasks in Prefect 2.0 with task.fn()"""
from prefect import task
from pytest import raises

@task
def my_task():
    return 42

def test_my_task():
    assert my_task.fn() == 42

def test_my_task_fails():
    with raises(AssertionError):
        assert my_task.fn() == 45

if __name__ == "__main__":
    test_my_task()
    test_my_task_fails()

With the improvements in 2.0, it’s easier to write rigorous tests for your flows, subflows, and tasks. Happy engineering!
