What are best practices to build unit tests and integration tests for Prefect flows and for data pipelines in general?


  • unit tests make sense for testing tasks
  • unit tests make less sense for testing the DAG - DAG is a sort of integration test for a data pipeline already and Prefect may be used as a testing framework
  • testing a flow by feeding small test data in a sandbox environment seems like the best approach for an end-to-end flow test

View in #best-practices-orchestration on Slack

Edmondo_Porcu @Edmondo_Porcu: Quick question about Prefect. I have run a

with Flow(''') as flow:

in the body of the script. However, since there are parameters that set from env variable in the main like so, that was causing an exception

if __name__ == '__main__':
    flow  = build_flow()
    flow.executor = LocalDaskExecutor()
    project_name = os.environ['PROJECT_NAME']
    spark_version = os.environ['SPARK_VERSION']
    github_repo = os.environ['GITHUB_REPO']
    git_ref = os.environ['GIT_REF']

and now I have wrapped my flow definition in a function. Is that a reasonable thing to do?

Kevin_Kho @Kevin_Kho: Yes it is an a lot of people use this set-up to deploy register one flow multiple times with different parameters/configurations. You can have a flow factory that creates variations on a Flow

Edmondo_Porcu @Edmondo_Porcu: unfortunately, surprising, that code fails
Ok I am a bad person. I removed a @task and it broked it all

Kevin_Kho @Kevin_Kho: Lol no worries. You good now?

Edmondo_Porcu @Edmondo_Porcu: Yes. Need to check if the flow is successful, it should be in the docs
Is there any easy way? It seems flow.run returns a collection of states for each task

Kevin_Kho @Kevin_Kho: Like this ? Check testing state

Testing Prefect flows and tasks | Prefect Docs

Edmondo_Porcu @Edmondo_Porcu: Great.
• Is there a way to mock the result of a task to test the flow logic?
• If I use a task object within a function annotated with task. should I invoke the run method ?

Kevin_Kho @Kevin_Kho: You could use the run method to test yeah. Would you still need to mock the result if you use that?

Edmondo_Porcu @Edmondo_Porcu: I have updated my question

Kevin_Kho @Kevin_Kho: Yes to the second, but it’s not a task anymore. It’s just normal Python

Edmondo_Porcu @Edmondo_Porcu: This is still confusing, how to deal with tasks. Is it changing with Orion ? Is the run automatically invoked for all tasks created within a flow context manager ?
For the first question I am sure you understand there is value in testing complex workflows logic without executing the real tasks

Kevin_Kho @Kevin_Kho: Yes the run is invoked for all tasks in the Flow context manager
You could see the tasks library tests for some examples of how to mock the output when calling task.run()

GitHub: prefect/test_snowflake.py at master · PrefectHQ/prefect

Actually the aws tests might be nicer

GitHub: prefect/test_s3.py at master · PrefectHQ/prefect

In Orion, there is no task.run() yet. It might be task.fn()

Edmondo_Porcu @Edmondo_Porcu: This is a test of the tasks, what about the test of the flow?

Kevin_Kho @Kevin_Kho: The flow doesn’t have a return? What kind of logic are you trying to test?

Edmondo_Porcu @Edmondo_Porcu: Take these two flows:
• A task returns a number, a second task is executed N times where N = return value of first task
• Branching logic

Kevin_Kho @Kevin_Kho: Ok will ask the team for how to test stuff like that
May be Monday before I get a response

Anna_Geller @Anna_Geller: @Edmondo_Porcu what behavior exactly would you want to test? ignoring Prefect for a moment, how would you test that just using Python?

Edmondo_Porcu @Edmondo_Porcu:

def func1()
    perform complex sql query on google big table

def func2(input):

def func3(input):

def func4(input):

def flow():
    res1 = func1()
    res2 = [ func2(res) for res in res1]
    if res2 > 10:

Anna_Geller @Anna_Geller: so this is a flow graph representing the business logic, what would you want to test? perhaps testing data with a data validation test would be more helpful than a unit test given the big table query?

Edmondo_Porcu @Edmondo_Porcu: How do you test this? I can think of two ways:
• Dependency injection (making func1,func2,func3,func4) input arguments of the flow function so you can pass mock implementation
• using MagicMock to override local module members

Anna_Geller @Anna_Geller: I want to find out what would you want to test here before we try to figure out how to test it

Edmondo_Porcu @Edmondo_Porcu: The workflow logic: loops and conditionals. Probably also the DAG itself

Anna_Geller @Anna_Geller: I see, I think testing data pipelines is tricky in general because it requires testing both:
• code (business logic)
• data
so it may be actually more helpful to test tasks themselves to test the logic e.g. when you do some data transformations, test it with some tiny data to ensure that your transformation functions/tasks do what you intended them to do

and then testing the data itself with something like Pandera, GE or sth similar
for the workflow logic, unit testing seems a bit less helpful? because building a unit test for each flow is a kind of negative engineering itself that Prefect tries to eliminate - you use Prefect so that you don’t need to test every single DAG, because Prefect will ensure that state transition happens as you intended without you having to worry about it and write unit tests to ensure that a workflow transitions between states the way you designed it
for Prefect building blocks such as tasks or conditional logic, Prefect has already extensive suite of unit tests so that you can rely on those, or use those as examples if you build custom components extending those abstractions - you can check the tests in the prefect repo, as Kevin recommended, but it’s definitely worth looking at what would be your end goal with those tests
for testing conditional logic, if you choose to do it, you could do sth as in the tests Kevin referenced:

    @pytest.mark.parametrize("branch", ["a", "b", "c"])
    def test_case_execution(self, branch):
        with Flow("test") as flow:
            cond = identity(branch)
            with case(cond, "a"):
                a = identity(1)
                b = inc(a)

            with case(cond, "b"):
                c = identity(3)
                d = inc(c)

            e = merge(b, d)

        state = flow.run()

        if branch == "a":
            assert state.result[a].result == 1
            assert state.result[b].result == 2
            assert state.result[c].is_skipped()
            assert state.result[d].is_skipped()
            assert state.result[e].result == 2
        elif branch == "b":
            assert state.result[a].is_skipped()
            assert state.result[b].is_skipped()
            assert state.result[c].result == 3
            assert state.result[d].result == 4
            assert state.result[e].result == 4
        elif branch == "c":
            for t in [a, b, c, d, e]:
                assert state.result[t].is_skipped()


GitHub: prefect/test_control_flow.py at master · PrefectHQ/prefect

Edmondo_Porcu @Edmondo_Porcu: I think the idea that you need to run a DAG that can takes 4 hours to check if you are combining tasks correctly scares me

Anna_Geller @Anna_Geller: to check if you are combining tasks correctly, you could do:


I think this is not a problem related to Prefect but to data engineering workflows in general - it’s hard because it’s more than just testing whether your code works, you need to test whether the business logic is correct and whether your data is correct, and the data volume these days doesn’t make it any easier

Edmondo_Porcu @Edmondo_Porcu: that’s why being able to mock the tasks is critical

Anna_Geller @Anna_Geller: I 100% agree about testing the tasks - but testing the DAG is negative engineering IMO

Edmondo_Porcu @Edmondo_Porcu: Why? Testing something that can be wrong is negative engineering?
If you are using the DAG to combine tasks that helps you predict diseases and will trigger billion of dollars of investments, you’d rather want to be reassured that the DAG to have the right logic :smiley:

Anna_Geller @Anna_Geller: I definitely understand your sentiment and know what you mean - the only thing is that you are using Prefect so that you don’t have to think about testing your DAG. As long as you:
• test your business logic/your tasks
• define the order of execution through state and data dependencies
Prefect will ensure that the tasks in your flow will run in the right order and that if something goes wrong in between, your flow will fail. And you can define what to do when any task in the flow fails, and when to consider the flow run as successful.

So if you use Prefect, you only need to test your tasks and define the orchestration logic in your flow and Prefect will ensure that it works the way you designed it.

Of course, nobody will probably complain about having more tests (?) so you can do that, but you need to decide whether you want to do it. I wouldn’t - I would much rather spend this time building data validation tests, this would give me more confidence in my data than testing DAGs and branching logic

Zach_Schumacher @Zach_Schumacher: Testing a flow is an integration test, not a unit test. If you’re trying to test that composing a flow works as you expect, you are just reimplementing tests the framework (prefect) you’re using already tests

Anna_Geller @Anna_Geller: thanks Zach, I guess I had the same intuition as you but you put it really well in words :smile: it’s indeed an integration test!

Zach_Schumacher @Zach_Schumacher: when editing/adding flows, we usually ask the engineer to put the results of flow.visualize in the PR

Anna_Geller @Anna_Geller: that’s a good idea to confirm that the data flow is built the right way :thumbsup:

Edmondo_Porcu @Edmondo_Porcu: @Zach_Schumacher isn’t a human reviewing the result of flow.visualize is a manual human test of the flow logic?
I am just saying that the review should be expressed in code:
• are task dependencies the one we expect?
• are conditional leading to the right code path?
• are loops exercised correctly?
This can all be unit tested, or if you are very strict about the meaning of unit, this can tested in isolation, if you mock the tasks. If you have real tasks, it’s an integration test.
The argument that the flow logic should not be tested because it’s tested by the unit tests of the framework it’s equivalent to say that one cannot use a correct framework in an incorrect manner. It’s like saying that you cannot write a flow that’s wrong with respect to requirements, because the framework is right. I disagree:
• The flow can have the wrong flow logic, and it is desirable to verify it in isolation. Because it’s fast and quick.
• This does not replace integration testing, but it provides a “fast and cheap” partial verification of correctness, which is valuable because it’s fast and cheap.
As we adopted infrastructure as code, the need to test infrastructure as code emerged. https://docs.microsoft.com/en-us/azure/developer/terraform/best-practices-testing-overview

Testing Terraform code

@Chris_White what do you think?

Zach_Schumacher @Zach_Schumacher: Definitely not equivalent and pointing to terraform is not a good example at all.

A flow is just glue.

Edmondo_Porcu @Edmondo_Porcu: which cannot be wrongly glueing tasks?
anything that’s non trivial can be wrong

Alvaro_Durán_Tovar @Alvaro_Durán_Tovar: sure, it depends on the use case / context
you speaking about integration tests because that’s what the flows does basically
I’m not managing billions so I’m happy with not bothering with using a staging bigquery database to ensure my flow is writing the correct content to the correct table etc

Although I can test intermediate tasks (unit test) to make me feel safe

Edmondo_Porcu @Edmondo_Porcu: Why do you need to run the “real task” to verify loops, conditional and task dependencies?

Alvaro_Durán_Tovar @Alvaro_Durán_Tovar: probably there are ways to run the whole flow with mocks, but then you aren’t testing the final flow, testing the flow (or any other integration test) isn’t always an option or the complexity is just too high

Edmondo_Porcu @Edmondo_Porcu: @Alvaro_Durán_Tovar you are right, they are two complementary techniques.

Anna_Geller @Anna_Geller: @Edmondo_Porcu why did you tag Prefect’s CTO? Chris is not responsible for answering questions in the community. If you have some emergency situation, you can tag me

I’m not sure if you’ve seen our Code of Conduct, but we have a rule that you should not tag other users - we always respond even if it takes some time. If you need SLA-based support, you may contact our Professional Services

Prefect Community: FAQ - Prefect Community

Prefect Community: How to get in touch with Prefect Professional Services

Edmondo_Porcu @Edmondo_Porcu: Thank you @Anna_Geller, I didn’t know that. Apologies

Anna_Geller @Anna_Geller: no worries, just for the next time

Edmondo_Porcu @Edmondo_Porcu: @Alvaro_Durán_Tovar we have tasks that can run for hours or days, so running the real task is not an option, if you see what I mean. You basically have code that builds a DAG, and then the DAG is going to be executed. You want to test separately:
• does the DAG looks like what is supposed to ? unit test
• does the DAG execution produce the right result? integration test
The second one is much more powerful, expensive, and complicated. You might have a single integration environment, or executing certain tasks cannot always be possible.

def build_flow(p1,p2,p3): Flow

def test_flow_1():
flow = build_flow(2,3,4)
assert flow.n_tasks = 72
By the way, our average task cost 15k$ as it is a multi-thousands of hours big data job. It takes anywhere from 2 days to a week of computing even on large scale HPC. I cannot spend 15k$ every time I change a line of code in the flow, I hope you understand that :smile:

Alvaro_Durán_Tovar @Alvaro_Durán_Tovar: I don’t know what’s the deal here, I just like random flame wars lol

can’t you set an env var, say ENVIRONMENT=test then modify the flow execution to mock certain parts?

Cole_Murray @Cole_Murray: @Edmondo_Porcu You can provide test data and validate your flow is running correctly. In our case, we have very long-running jobs which we provide a test set of data to process as part of our integration tests.

Through our CI/CD pipeline, any new flows get automatically tested against test data, assuming pass then promoted into production

Edmondo_Porcu @Edmondo_Porcu: @Alvaro_Durán_Tovar that’s right! That’s exactly what I think should be done.
@Cole_Murray do you use unittest.MagicMock to replace real tasks with mocked tasks?\

Alvaro_Durán_Tovar @Alvaro_Durán_Tovar: what you should do you mean? @Edmondo_Porcu

Cole_Murray @Cole_Murray: No i mean:

def myDataDownloadTask(data_url):

and passing in parameterized data url that is different when running tests

Alvaro_Durán_Tovar @Alvaro_Durán_Tovar: Here you have a random example with mocks, testing tasks, etc, pretty sure prefect has many more better than this on the github repo

Nothing stops you from calling flow.run()
to run the whole flow
But then the intermediate code probably will get pretty complex

Edmondo_Porcu @Edmondo_Porcu: @Alvaro_Durán_Tovar that was what I was looking for. Mocking the tasks to run the flow
Using monkey patch. That’s what I was expecting as well! Somehow the discussion here led to “why would you ever do something like this?”

Cole_Murray @Cole_Murray: @Edmondo_Porcu, given that the cost of failure is $15k per run, I personally would not stake correctness on monkey patching & mocked tasks.

For me, running the real flow in a staging environment with a sample data will provide you near 1-1 (aside from scaling constraints on memory/cpu) of your actual task flow.

As an example of risk of monkey patching / mocking:

def my_missing_dependency(a):
  import pandas as pd
  df = pd.read_csv(a)

Your flow linkage may be correct, your unit tests on the tasks may be correct (depending on your test env, this may pass)
but this will still fail at runtime and take the $15k with it

Edmondo_Porcu @Edmondo_Porcu: @Cole_Murray I don’t see unit testing and integration testing as alternatives, I think they are both needed

Alvaro_Durán_Tovar @Alvaro_Durán_Tovar: When possible I like to cover most of the code with few integration tests, then the more complex things with unit tests

Anyway, we are doing something like this you can try

def flow_name(name: Text) -> Text:
    return f"{name}-{get_environment()}"

with Flow(flow_name(FLOW)) as flow:
    flow code...

That is flow name is changed to reflect it is configured for a specific environment (prod, qa, etc).

Then configure flow appropriately (qa DB, qa buckets, etc) having a less data to crunch when executing the flow as mentioned by @Cole_Murray

Then trigger the flow and make assertions after finishes