How can I test flows and tasks in Prefect 2.0?

View in #prefect-community on Slack

Jai_P @Jai_P: are there any best practices around testing flows/tasks in prefect 2.0? I see this page but something we’re noticing that testing can be particularly slow on flows, (sometimes taking ~1s to start up a each test) and it appears we always need to wrap tasks inside of a flow to test them

Michael_Adkins @Michael_Adkins: We don’t have recommendations yet, we’re hoping to design a nice testing UX for both tasks and flows. In particular, we’re planning to create a way to test tasks outside of flows. cc @alex

Can you share an example where your test takes a second to start the flow? We’re running thousands of flows in our internal tests and I haven’t seen that.

Jai_P @Jai_P: here’s a trivial example, and the associated output from running pytest

import pytest
from prefect import flow, task
from prefect.utilities.testing import prefect_test_harness
def double_value(value):
    return value * 2
def half_value(value):
    return value / 2
def sum_values(values):
    return sum(values)
def double_plus_half_flow(value: int):
    doubled = double_value(value)
    half = half_value(value, wait_for=[doubled])
    answer = sum_values([doubled, half])
    return answer
def prefect_test_fixture():
    with prefect_test_harness():
@pytest.mark.parametrize("blah", [(i) for i in range(10)])
def test_double_plus_half_flow(blah):
    double_plus_half_flow(100).result().result() == 250


==================================================================================== test session starts =====================================================================================
platform darwin -- Python 3.10.2, pytest-7.1.1, pluggy-1.0.0
rootdir: /path/to/dir
plugins: anyio-3.5.0
collected 10 items

tests/ ..........                                                                                                                                                          [100%]
====== 10 passed in 11.09s =========

Michael_Adkins @Michael_Adkins: Hm interesting this seems to be related to the test harness utility
We run our internal tests with a higher performance lower level reset of the database
If I switch your example to that, it runs in about 3.5 seconds
The test harness we provide creates a temporary directory and new database for each test. You’ll find it much more performant to use that at the session scope then use a separate fixture to delete all the data between tests. We can probably expose this in the near future.

Jai_P @Jai_P: ah yeah, switching it to session scope cut the time in half! i guess there’s a risk with conflicts between tests if i do that? or should i be generally safe because flows shouldn’t really interact between tests
and i guess when you say

> expose this in the near future
you’re talking about the higher performance lower level reset? the session scope is just a pytest change right?

Michael_Adkins @Michael_Adkins: If it’s session scoped, yeah your tests can collide if you’re making assertions about state that requires a clean database. You should be fine since you’re just testing your flows and not asserting things like one call of a flow function results in one flow run in the backend like we are
And yeah we can expose a lower-level faster reset in the future, you can definitely just change the scope of the fixture yourself immediately.

Jai_P @Jai_P: gotcha. i think we may have cases where we want to assert subflows are kicked off but i think if things are a little slower when it comes to that stuff, its ok. we can always just use this as a local workaround and let our CI be a little bit slower until the lower level faster reset is available.

is there anywhere i may be able to track the progress/availability of that? also thanks so much for responding so quickly!

Michael_Adkins @Michael_Adkins: You can still make assertions about the subflows by returning their states and querying for the associated flow run ids. That’s exactly the kind of thing we want to make a great UX for, like my_flow.test(...) returns a TestResult object that gives you full introspection of all of the task and flow runs that it created, their states, number of retries, return values, etc. so you can make the assertions you want.

Jai_P @Jai_P: ohhh that type of UX would be epic, definitely looking forward to that being rolled out! Also thanks for the issue link! i’ll be sure to record it on our side so we can keep an eye on it. thanks so much and have a good one!

Existing tests that can be used as examples:

You can also check out the tests used in our collections, such as prefect-slack:

Similarly, to call tasks out of context, you can use the .fn() method on tasks like so:

import pytest

from my_prefect_collection.tasks import my_task

async def test_successful_task(mock_successful_calls):
    my_task_result = await my_task.fn()

    assert type(trigger_sync_result) is whatever