How do I include both data and state dependencies in a Prefect flow?

Question asked via StackOverflow

This seems like it should be simple, but I can’t figure out how to include both state and data dependencies in a single flow. Here is what I attempted (simplified):

    with Flow("load_data") as flow:
        test_results = prepare_file1()
        load_file1(test_results)

        participants = prepare_file2()
        load_file2(participants)

    email = flow.add_task(EmailTask(name='email', subject='Flow succeeded!', msg='flow succeeded', email_to='xxx', email_from='xxx', smtp_server='xxx',smtp_port=25, smtp_type='INSECURE',))
    flow.set_dependencies(task=email, upstream_tasks=[load_file1,load_file2])

    flow.visualize()

I get the following graph: [graph image not available]

This means that load_file1 and load_file2 run twice. How can I just set up an additional dependency so that email runs when the two load tasks finish?

Thanks

Answer

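The issue is how the tasks are added to your Flow. Calling a task inside the with Flow(...) block does not add that task object itself: Prefect creates a copy of the task, binds the copy to the flow and returns it. So load_file1(test_results) returns a new task, and when you later pass the original load_file1 and load_file2 to flow.set_dependencies, they are added as two extra, separate tasks. That is why each load task shows up twice in the graph. Use the tasks returned by those calls as the upstream dependencies instead. For tasks from the Prefect task library, it’s also best to first initialize them and then call them in your Flow as follows: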

    send_email = EmailTask(
        name='email',
        subject='Flow succeeded!',
        msg='flow succeeded',
        email_to='xxx',
        email_from='xxx',
        smtp_server='xxx',
        smtp_port=25,
        smtp_type='INSECURE',
    )


    with Flow("load_data") as flow:
        send_email()

Alternatively, do it in one step with two pairs of parentheses: EmailTask(init_kwargs)(run_kwargs). The first pair initializes the task; the second calls it inside the flow context, which adds a copy of the task to the flow with the given run arguments. The task’s .run() method itself is only executed later, when the flow runs.

    with Flow("load_data") as flow:
        EmailTask(
            name='email',
            subject='Flow succeeded!',
            msg='flow succeeded',
            email_to='xxx',
            email_from='xxx',
            smtp_server='xxx',
            smtp_port=25,
            smtp_type='INSECURE',
        )()
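The duplication in the question can be reproduced with a small toy model in plain Python. This is not Prefect’s implementation: the hypothetical ToyFlow and ToyTask classes only imitate Prefect 1.x behaviour, where the first pair of parentheses configures a task, calling it inside a with block adds a copy of the task to the flow, and positional arguments and upstream_tasks become data and state dependencies respectively.

```python
import copy
from typing import List, Optional, Sequence, Tuple


class ToyFlow:
    """Hypothetical stand-in for a flow: tasks are nodes, compared by identity."""

    current: Optional["ToyFlow"] = None  # flow whose `with` block is active

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["ToyTask"] = []
        # Each edge is (upstream, downstream, kind), kind being "data" or "state".
        self.edges: List[Tuple["ToyTask", "ToyTask", str]] = []

    def __enter__(self) -> "ToyFlow":
        ToyFlow.current = self
        return self

    def __exit__(self, *exc_info) -> None:
        ToyFlow.current = None

    def add_task(self, task: "ToyTask") -> "ToyTask":
        # A copy and its original are different objects, so both become nodes.
        if not any(t is task for t in self.tasks):
            self.tasks.append(task)
        return task

    def set_dependencies(
        self, task: "ToyTask", upstream_tasks: Sequence["ToyTask"], kind: str = "state"
    ) -> None:
        self.add_task(task)
        for up in upstream_tasks:
            self.add_task(up)
            self.edges.append((up, task, kind))


class ToyTask:
    def __init__(self, name: str) -> None:
        # First pair of parentheses: configure the task; nothing runs yet.
        self.name = name

    def __call__(
        self, *data_inputs: "ToyTask", upstream_tasks: Sequence["ToyTask"] = ()
    ) -> "ToyTask":
        # Second pair: copy the task and bind the copy into the active flow.
        # A real run() would only happen later, when the whole flow runs.
        flow = ToyFlow.current
        if flow is None:
            raise RuntimeError("call tasks inside a `with ToyFlow(...)` block")
        new = copy.copy(self)
        flow.set_dependencies(new, data_inputs, kind="data")
        flow.set_dependencies(new, upstream_tasks, kind="state")
        return new


def node_names(flow: ToyFlow) -> List[str]:
    return [t.name for t in flow.tasks]


prepare_file1 = ToyTask("prepare_file1")
load_file1 = ToyTask("load_file1")
email = ToyTask("email")

# Wiring from the question: the original load_file1 object is the upstream.
with ToyFlow("question") as question_flow:
    test_results = prepare_file1()
    load_file1(test_results)
question_flow.set_dependencies(email, [load_file1])
print(node_names(question_flow))  # ['prepare_file1', 'load_file1', 'email', 'load_file1']

# Wiring from the answer: the task returned by the call is the upstream.
with ToyFlow("answer") as answer_flow:
    test_results = prepare_file1()
    load1_task = load_file1(test_results)
    email(upstream_tasks=[load1_task])
print(node_names(answer_flow))  # ['prepare_file1', 'load_file1', 'email']
```

Because the model tracks tasks by object identity, only the objects returned by the calls inside the flow point at the nodes that already carry the data edges; passing the original task object creates a second, unconnected node.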

The full flow example could look as follows:

    from prefect import task, Flow
    from prefect.tasks.notifications import EmailTask
    from prefect.triggers import always_run


    @task(log_stdout=True)
    def prepare_file1():
        print("File1 prepared!")
        return "file1"


    @task(log_stdout=True)
    def prepare_file2():
        print("File2 prepared!")
        return "file2"


    @task(log_stdout=True)
    def load_file1(file: str):
        print(f"{file} loaded!")


    @task(log_stdout=True)
    def load_file2(file: str):
        print(f"{file} loaded!")


    send_email = EmailTask(
        name="email",
        subject="Flow succeeded!",
        msg="flow succeeded",
        email_to="xxx",
        email_from="xxx",
        smtp_server="xxx",
        smtp_port=25,
        smtp_type="INSECURE",
        # always_run sends the email even if an upstream load task fails;
        # omit it to fall back to the default all_successful trigger.
        trigger=always_run,
    )


    with Flow("load_data") as flow:
        # Data dependencies: each task's result is passed as an argument.
        test_results = prepare_file1()
        load1_task = load_file1(test_results)

        participants = prepare_file2()
        load2_task = load_file2(participants)

        # State dependency: wait for the tasks returned by the calls above,
        # not for the original load_file1/load_file2 objects.
        send_email(upstream_tasks=[load1_task, load2_task])


    if __name__ == "__main__":
        flow.visualize()
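The trigger argument decides whether send_email runs, based on the final states of its upstream tasks. In Prefect 1.x the default trigger is all_successful, while always_run lets a task run once its upstream tasks have finished, whatever their outcome. The sketch below is a simplified plain-Python model of those two rules, not the prefect.triggers implementation (the real triggers take upstream State objects and raise a signal instead of returning False); the state labels and the dict of task names are assumptions made for illustration.

```python
from typing import Dict

# Simplified model: each upstream task name maps to a state label.
SUCCESS, FAILED = "Success", "Failed"


def all_successful(upstream_states: Dict[str, str]) -> bool:
    """Allow the run only if every upstream task succeeded."""
    return all(state == SUCCESS for state in upstream_states.values())


def always_run(upstream_states: Dict[str, str]) -> bool:
    """Assumed to be evaluated only after all upstream tasks have finished,
    so it allows the run regardless of their outcome."""
    return True


ok = {"load_file1": SUCCESS, "load_file2": SUCCESS}
one_failed = {"load_file1": SUCCESS, "load_file2": FAILED}

print(all_successful(ok), all_successful(one_failed))  # True False
print(always_run(ok), always_run(one_failed))          # True True
```

Under this model, the "Flow succeeded!" email goes out after a failed load only when the task uses always_run.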
