Once tasks pass data to one another, you no longer have to declare upstream and downstream dependencies between them explicitly: Prefect infers the ordering from the data dependencies.
There are two ways to define such dependencies:
- Initialize the tasks, then call them inside a with Flow(...) block, passing the output of one task as the input of another
- Skip the with block and pass the data dependencies through the keyword_tasks argument of flow.set_dependencies
Example #1:
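from prefect import Task, Flow


class GetData(Task):
    def run(self):
        return 1


class PlusOneTask(Task):
    def run(self, x):
        return x + 1


data = GetData()
plus_one = PlusOneTask()

with Flow("imperative_api_example") as flow:
    # Calling a task inside the Flow context registers it with the flow.
    x = data()
    # Passing x creates the data dependency: data runs before plus_one.
    plus_one(x)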
Example #2:
from prefect import Task, Flow


class GetData(Task):
    def run(self):
        return 1


class PlusOneTask(Task):
    def run(self, x):
        return x + 1


data = GetData()
plus_one = PlusOneTask()

flow = Flow("imp")
# The key in keyword_tasks must match the parameter name of
# PlusOneTask.run, which is x.
flow.set_dependencies(task=plus_one, keyword_tasks=dict(x=data))
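
The idea behind both examples, that a data dependency alone is enough to fix the execution order, can be illustrated without Prefect. The following is only a toy sketch using the standard library (Python 3.9+ for graphlib). ToyFlow, add_task and the task names are hypothetical and are not part of Prefect, and this is not how Prefect is implemented.

```python
from graphlib import TopologicalSorter


class ToyFlow:
    """Hypothetical stand-in for a flow; NOT Prefect's implementation."""

    def __init__(self):
        self.tasks = {}       # task name -> callable
        self.data_edges = {}  # task name -> {argument name: upstream task name}

    def add_task(self, name, fn, **keyword_tasks):
        # Each keyword argument records a data edge. No separate
        # upstream/downstream declaration is stored anywhere.
        self.tasks[name] = fn
        self.data_edges[name] = dict(keyword_tasks)

    def run(self):
        # Derive {task: set of upstream tasks} purely from the data edges.
        graph = {name: set(edges.values()) for name, edges in self.data_edges.items()}
        results = {}
        for name in TopologicalSorter(graph).static_order():
            # Feed each upstream result in under the matching argument name.
            kwargs = {arg: results[up] for arg, up in self.data_edges[name].items()}
            results[name] = self.tasks[name](**kwargs)
        return results


toy = ToyFlow()
# Registered out of order on purpose: the data edge still puts get_data first.
toy.add_task("plus_one", lambda x: x + 1, x="get_data")
toy.add_task("get_data", lambda: 1)
results = toy.run()
print(results)
```

Because the ordering is derived from the argument wiring, the order in which the tasks are registered does not matter, which mirrors why no explicit upstream or downstream declaration is needed in the examples above.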