Prefect 2.0
You need to add the wait_for
argument within your @task
decorator:
from prefect import task, flow
@task
def task_1():
pass
@task
def task_2():
pass
@task
def task_3():
pass
@flow(name="flow_with_dependencies")
def main_flow():
t1 = task_1()
t2 = task_2(wait_for=[t1])
t3 = task_3(wait_for=[t2])
Prefect 1.0
To do the same in Prefect 1.0, you can use the upstream_tasks
argument:
from prefect import task, Flow
@task
def task_1():
pass
@task
def task_2():
pass
@task
def task_3():
pass
with Flow("flow_with_dependencies") as flow:
t1 = task_1()
t2 = task_2(upstream_tasks=[t1])
t3 = task_3(upstream_tasks=[t2])