Unit testing best practices for Prefect flows, subflows and tasks

This Discourse topic collects best practices and guidance around testing your dataflow.

I really like these prompts; thanks for setting them up!

To answer “Unit testing tasks,” this is how I typically do it:

module.py

from prefect import task

@task
def divide_task(x, y):
    return x / y

tests/test_module.py

import pytest
from module import divide_task

def test_divide_task():
    assert divide_task.fn(1, 2) == 0.5  # fn calls underlying function


def test_divide_task_error():
    with pytest.raises(ZeroDivisionError):
        divide_task.fn(1, 0)

Then run in the command line.
pytest .

There’s another way of writing it, but I think this might be too verbose.

import pytest
from prefect import flow
from module import divide_task

def test_divide_task():
    @flow
    def test_flow():
        return divide_task(1, 2)
    task_state = test_flow().result()
    actual = task_state.result()
    assert actual == 0.5 

Also, there are docs here: Testing - Prefect 2.0


You are amazing, I didn’t even plan to share it until Tuesday (until then I plan to refine this content plan even more) and you are already contributing :pray:


Summary

Prefect 2.0 makes it easier than ever to test your flows, tasks, and subflows!

Video link

Coming soon.

Audience

Testing your Prefect flows, subflows, and tasks helps you identify and remove any errors from your code. It is paramount to test prior to merging or deploying your code to a higher environment.

What is Changing

  • Testing is easier: subflows, for example, no longer require registration prior to testing.
  • The flow decorator (@flow) lets your flow code read like ordinary Python code, making
    it simpler than ever to create more robust test cases.

How to Convert from 1.0 to 2.0

Flows in 1.0

In Prefect 1.0, we needed to define our flow with a context manager, and running a flow required calling .run().

"""Test a flow in Prefect 1.0"""
from prefect import Flow, task

@task
def my_task():
    return 42

with Flow('my flow') as my_flow:
    first_task = my_task()

# test a flow
def test_my_flow():
    # check the state of the flow for success
    state = my_flow.run()
    assert state.is_successful()

if __name__ == "__main__":
    test_my_flow()

Flows in 2.0

In Prefect 2.0, we replace the context manager with a flow decorator: @flow. Flows are directly callable so we don’t need .run().

We also have the option to use a context manager, prefect_test_harness, to run flows and tasks against a local SQLite database. If you want to use prefect_test_harness in multiple tests, you can use pytest.fixture and scope it to session to ensure efficient testing.

Learn more about pytest.fixture with prefect_test_harness.

Subflows in 1.0

In Prefect 1.0, in order to test a “flow of flows” locally, we would need to register the subflow and then use create_flow_run, specifying name, id and/or project. This makes testing more intricate.

Subflows in 2.0

In Prefect 2.0, testing a subflow is as easy as calling it.

"""Test a subflow in Prefect 2.0"""
from prefect import flow, task


@task
def subflow_task(nbr):
    return nbr * 2

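@flow
def subflow(nbr):
    # return the task result so callers can inspect it
    return subflow_task(nbr)

@flow
def outer_flow(nbr):
    # calling a flow from inside a flow creates a subflow run
    return subflow(nbr)

# test a subflow (no arguments, so pytest does not look for a fixture)
def test_subflow():
    subflow(2)

# test a subflow task
def test_subflow_task():
    assert subflow_task.fn(25) == 50

if __name__ == "__main__":
    test_subflow()
    test_subflow_task()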

We can even use .fn() on tasks to test individual tasks.

Tasks in 1.0

In Prefect 1.0, you would use .run() to call the task. Alternatively, you might have used TaskRunner to track the state and result.

"""Test a task in Prefect 1.0"""
from prefect import task

@task
def my_task():
    return 42

# test a task
def test_my_task():
    # .run() calls the task's underlying function
    assert my_task.run() == 42

if __name__ == "__main__":
    test_my_task()

Tasks in 2.0

In Prefect 2.0, you can call a task’s underlying function directly with .fn(), so you don’t need .run() or a flow to test a task.

"""Test individual tasks with Prefect 2.0 using task.fn()"""
from prefect import flow, task
from pytest import raises

@task
def my_task():
    return 42

def test_my_task():
    assert my_task.fn() == 42

def test_my_task_fails():
    with raises(AssertionError):
        assert my_task.fn() == 45

if __name__ == "__main__":
    test_my_task()
    test_my_task_fails()

With the huge improvements in 2.0, it’s easier than ever to create rigorous testing while reaping the vast benefits Prefect offers. Happy engineering!


@anna_geller
We could either:

  1. Remove the flow import, as this is just testing an individual task to show that it can be done. This shows improvements to the developer experience during testing.
  2. Add flow decorators to the tests if we wanted visibility into them. This shows improvements in visibility of tests.

What about when our flows/tasks use Prefect’s own logger via get_run_logger and we still want to test only the specific function without having to create a flow run for it? .fn() doesn’t seem to work in that case, as we get this error:

RuntimeError: There is no active flow or task run context.

I asked the team how best to approach it. You’re right that if your tasks or flows use a Prefect logger, running only the relevant function without flow run or task run context will fail.

The answer I got so far:

  • You can disable the Python logger e.g. logging.getLogger("prefect").enabled = False or PREFECT_LOGGING_LEVEL=ERROR
  • If you want the logger temporarily disabled for some tests, you can write a fixture that sets and unsets the enabled attribute. It must be set after prefect.logging.setup_logging is called.
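
To make the set-and-unset idea concrete, here is a minimal sketch using only the standard library. The helper name prefect_logger_disabled is made up for this example. Note that the attribute Python's logging.Logger actually exposes is disabled (set to True to silence the logger), so the sketch toggles that one. It only silences log records; whether that alone avoids the RuntimeError in a given Prefect version is an assumption you should verify.

```python
import logging
from contextlib import contextmanager

@contextmanager
def prefect_logger_disabled(name="prefect"):
    """Temporarily disable the named logger, then restore its previous state."""
    logger = logging.getLogger(name)
    previous = logger.disabled
    logger.disabled = True  # stdlib attribute; True silences the logger
    try:
        yield logger
    finally:
        # restore whatever the setting was before the test
        logger.disabled = previous
```

In a pytest suite, the same body could sit inside a fixture that yields from within the with block, so that each test using the fixture runs with the logger off.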

There’s now a context manager to disable logging and bypass RuntimeError: There is no active flow or task run context.

from prefect.logging import disable_run_logger

with disable_run_logger():
    a_task.fn()