This Discourse topic collects best practices and guidance around testing your dataflow.
https://discourse.prefect.io/t/how-to-disable-prefect-logger-for-unit-tests/1968
This Discourse topic collects best practices and guidance around testing your dataflow.
https://discourse.prefect.io/t/how-to-disable-prefect-logger-for-unit-tests/1968
I really like these prompts; thanks for setting them up!
To answer “Unit testing tasks,” this is how I typically do it:
module.py
from prefect import task
@task
def divide_task(x, y):
return x / y
tests/test_module.py
import pytest
from module import divide_task
def test_divide_task():
assert divide_task.fn(1, 2) == 0.5 # fn calls underlying function
def test_divide_task_error():
with pytest.raises(ZeroDivisionError):
divide_task.fn(1, 0)
Then run in the command line.
pytest .
There’s another way of writing it, but I think this might be too verbose.
import pytest
from prefect import flow
from module import divide_task
def test_divide_task():
@flow
def test_flow():
return divide_task(1, 2)
task_state = test_flow().result()
actual = task_state.result()
assert actual == 0.5
Also, there’s docs here Testing - Prefect 2.0
You are amazing, I didn’t even plan to share it until Tuesday (until then I plan to refine this content plan even more) and you are already contributing
Prefect 2.0 makes it easier than ever to test your flows, tasks, and subflows!
Coming soon.
Testing your Prefect flows, subflows, and tasks helps you identify and remove any errors from your code. It is paramount to test prior to merging or deploying your code to a higher environment.
In Prefect 1.0, we needed to define our flow with a context manager. A flow required .run()
to be callable.
"""Test a flow in Prefect 1.0"""
from prefect import Flow, task
@task
def my_task():
return 42
with Flow('my flow') as my_flow:
first_task = my_task()
# test a flow
def test_my_flow():
# check the state of the flow for success
state = my_flow.run()
assert state.is_successful()
if __name__ == "__main__":
test_my_flow()
In Prefect 2.0, we replace the context manager with a flow decorator: @flow
. Flows are directly callable so we don’t need .run()
.
We also have the option to use a context manager, prefect_test_harness
, to run flows and tasks against a local SQLite database. If you want to use prefect_test_harness
in multiple tests, you can use pytest.fixture
and scope it to session
to ensure efficient testing.
Learn more about pytest.fixture
with prefect_test_harness
.
In Prefect 1.0, in order to test a “flow of flows” locally, we would need to register the subflow and then use create_flow_run
, specifying name
, id
and/or project
. This makes testing more intricate.
In Prefect 2.0, testing a subflow is as easy as calling it.
"""Test a subflow in Prefect 2.0"""
from prefect import flow, task
@task
def subflow_task(nbr):
return nbr * 2
@flow
def subflow(nbr):
subflow_task(nbr)
@flow
def outer_flow():
subflow()
# test a subflow
def test_subflow(nbr):
subflow(nbr)
# test a subflow task
def test_subflow_task():
assert subflow_task.fn(25) == 50
if __name__ == "__main__":
test_subflow(2)
test_subflow_task()
We can even use .fn()
on tasks to test individual tasks.
In Prefect 1.0, you would use .run()
to call the task. Alternatively, you might have used TaskRunner to track the state and result.
"""Test a flow in Prefect 1.0"""
from prefect import Flow, task
@task
def my_task():
return 42
with Flow('my flow') as my_flow:
first_task = my_task()
# test a flow
def test_my_flow():
# check the state of the flow for success
state = my_flow.run()
assert state.is_successful()
if __name__ == "__main__":
test_my_flow()
In Prefect 2.0, a task is callable so you don’t need .run()
or a flow to test a task.
"""Test individual tasks with Prefect 2.0. Can also use task.fn()"""
from prefect import flow, task
from pytest import raises
@task
def my_task():
return 42
def test_my_task():
assert my_task.fn() == 42
def test_my_task_fails():
with raises(AssertionError):
assert my_task.fn() == 45
if __name__ == "__main__":
test_my_task()
test_my_task_fails()
With the huge improvements in 2.0, it’s easier than ever to create rigorous testing while reaping the vast benefits Prefect offers. Happy engineering!
@anna_geller
We could either:
What about when our flows/tasks use prefect’s own logger by get_run_logger
and we still want to only test the specific function without having to create a flow run for it? The .fn() seems to not work in that case as we get error:
RuntimeError: There is no active flow or task run context.
I asked the team how best to approach it. You’re right that if your tasks or flows use a Prefect logger, running only the relevant function without flow run or task run context will fail.
The answer I got so far:
logging.getLogger("prefect").enabled = False
or PREFECT_LOGGING_LEVEL=ERROR
prefect.logging.setup_logging
is called.There’s now a contextmanager to disable logging and bypass RuntimeError: There is no active flow or task run context.
.
from prefect.logging import disable_run_logger
with disable_run_logger():
a_task.fn()