How to define a DAG-like flow in Prefect 2.0

Hello,
I'm switching from Prefect 1 to Prefect 2,
and I'm stuck on the DAG-free concept.

One part of my flow is a predefined, rigid DAG.
How can I achieve "DAG behavior" in Prefect 2?
Here is a toy example of my graph:

graph = {
    "feature_1": [],
    "feature_2": [],
    "feature_3": ["feature_1", "feature_2"],
    "feature_4": ["feature_3"],
    "feature_5": ["feature_4"],
}

In reality there are more than 50 features, and we don't want to declare every dependency in the flow by hand, because we already have the precomputed DAG.

So how can I "explain" to Prefect that feature_1 and feature_2 can run in parallel at the start, but not the others?
feature_3 can run only once feature_1 and feature_2 have executed successfully, and it takes the results of feature_1 and feature_2 as input parameters, and so on.

P.S. In production it should run with the DaskTaskRunner. Could there be any difficulty with that?

How would you build it in pure Python?

Can you share the code you used in Prefect 1?

Don't take the DAG-free concept too literally: you can still have a DAG; you just don't need any operators/tasks or the like to define it. If you are looking into parsing some JSON representation for this, you can check the example where I parsed a dbt config to construct a DAG in Prefect 2. But for your case, you could simply loop over those inputs/tasks one by one to define the dependency graph, along the lines of the sketch below.
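A minimal sketch of that looping idea, assuming the toy graph dict from the question and a hypothetical compute task standing in for the real per-feature work:

from prefect import flow, task

@task
def compute(name: str) -> str:
    # Placeholder for the real per-feature computation.
    return name

@flow
def feature_flow():
    futures = {}
    # Submit nodes in an order where dependencies come before dependents,
    # so each node's upstream futures already exist when we reference them.
    for name, deps in graph.items():
        futures[name] = compute.with_options(name=name).submit(
            name, wait_for=[futures[d] for d in deps]
        )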

First of all, thank you for the quick response!
OK, I tried to strip out all the business logic, so the code won't run as-is, but it keeps the Prefect logic and shows how we achieved the goal.
So we have a specified DAG, and each feature knows its dependencies. Today we have more than 50 features, so we don't want to declare every feature in the flow by hand.

from abc import abstractmethod
from enum import Enum
from typing import List, Set

from prefect import Flow, Task


def get_matching_task(flow: Flow, name: str, tags: Set[str]) -> Task:
    # Look up the already-registered task whose name matches and whose
    # tags are a non-empty subset of the requested tags.
    matching_tasks = [
        task
        for task in flow.tasks
        if task.name == name and len(task.tags) > 0 and task.tags.issubset(tags)
    ]
    return matching_tasks[0]

class FeatureType(str, Enum):
    Feature1 = "feature1"
    Feature2 = "feature2"
    Feature3 = "feature3"

class FeatureInterface:
    feature_type: FeatureType
    internal_dependencies: List[FeatureType] = []


class FeatureTask(Task):
    def __init__(
        self,
        **kwargs,
    ):
        super().__init__(**kwargs)

    # noinspection PyMethodOverriding
    @abstractmethod
    def run(
        self,
        **kwargs,
    ):
        pass

class Feature1(FeatureInterface, FeatureTask):
    internal_dependencies = []
    feature_type = FeatureType.Feature1

    def run(self):
        pass

class Feature2(FeatureInterface, FeatureTask):
    internal_dependencies = [
        FeatureType.Feature1,
    ]
    feature_type = FeatureType.Feature2

    def run(self, feature1):
        pass


class Feature3(FeatureInterface, FeatureTask):
    internal_dependencies = [
        FeatureType.Feature1,
        FeatureType.Feature2,
    ]
    feature_type = FeatureType.Feature3

    def run(self, feature1, feature2):
        pass

# Dependencies listed in internal_dependencies are always passed as input parameters to run().

dependency_ordered_features = [
    {Feature1: []},
    {Feature2: ["feature1"]},
    {Feature3: ["feature1", "feature2"]},
]

with Flow("features") as flow:
    # Iterate the precomputed, dependency-ordered list so that each
    # feature's upstream tasks are already registered in the flow.
    for feature in dependency_ordered_features:
        feature_class = next(iter(feature))
        feature_results = feature_class()(
            **{
                parameter: get_matching_task(
                    flow=flow, name=parameter, tags={"feature", "metadata"}
                )
                for parameter in feature_class.internal_dependencies
            }
        )

The problem with your example @anna_geller (if I understand it right) is that the executed nodes are not tasks… so I can't observe in Orion whether they ran fine or not.

In pure Python I'd use a pool of threads, so that only the tasks without dependencies execute first, and the others wait until their dependencies have finished.
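A minimal pure-Python sketch of that thread-pool idea (the run_graph helper and the fn callback are hypothetical; it assumes the dict lists dependencies before their dependents):

from concurrent.futures import ThreadPoolExecutor

def run_graph(graph, fn, max_workers=4):
    futures = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for name in graph:  # dependencies must appear before dependents
            def node(name=name):
                # Block until all upstream futures have a result,
                # then run this node with those results as inputs.
                upstream = [futures[d].result() for d in graph[name]]
                return fn(name, upstream)
            futures[name] = pool.submit(node)
    return {name: f.result() for name, f in futures.items()}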


In Slack I found a discussion about the same problem, with a snippet of code (I have modified it a little to pass the executed tasks along):

my_network_graph = {
  "task_1" : [],
  "task_2" : [],
  "task_3" : ["task_1", "task_2"],
  "task_4" : ["task_3"],
  "task_5" : ["task_4"],
}

from prefect import flow, Flow, task

@task
def foo(task_name, others):
    print(task_name)
    print(others)
    return f"{task_name}_{others}"



def build_flow(network) -> Flow:
    tasks = {}
    for name in network.keys():
        tasks[name] = foo.with_options(name=name)

    @flow(name="demo_dynamic")
    def my_flow():
        for task_name, task in tasks.items():
            upstream_task_names = network[task_name]
            # NOTE: these are Task objects, not the PrefectFutures returned
            # by submit(); that is exactly the problem described below.
            upstream_tasks = [tasks[u] for u in upstream_task_names]
            task.submit(task_name=task_name, others=upstream_tasks, wait_for=upstream_tasks)

    return my_flow


my_flow = build_flow(my_network_graph)

if __name__ == "__main__":
    my_flow()

The problems are:

  • It wasn't executed in the specified order.
  • wait_for needs executed objects (PrefectFuture), but what I actually have are prefect.tasks.Task objects (a sketch of a fix follows below).
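
For reference, a sketch of a fix for the second point, reusing foo and my_network_graph from the snippet above: store what .submit() returns (a PrefectFuture) and pass those futures to wait_for:

@flow(name="demo_dynamic_fixed")
def my_flow_fixed():
    futures = {}
    # submit() returns a PrefectFuture; keeping those (not the Task objects)
    # gives wait_for real upstream dependencies, so the order is respected.
    for task_name, upstream_names in my_network_graph.items():
        upstream = [futures[u] for u in upstream_names]
        futures[task_name] = foo.with_options(name=task_name).submit(
            task_name=task_name, others=upstream, wait_for=upstream
        )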

Maybe I'm trying to do something that Prefect 2.0 doesn't support?


I found another snippet of code in Slack:

from prefect import flow, task, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from typing import List
from prefect.orion.schemas.states import State
from prefect.futures import PrefectFuture
import time

@task()
def sql_task(task_name: str):
    print(f"RUNNING: {task_name}") # some execution done here
    time.sleep(5)
    if task_name == "task_2":
        raise Exception("failed task")

def my_tasks() -> List[PrefectFuture]:
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_4 = sql_task.with_options(name="task_4").submit(task_name="task_4", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1, task_4])
    task_3 = sql_task.with_options(name="task_3").submit(task_name="task_3", wait_for=[allow_failure(task_2)])
    #... task_3000+
    # Return the task futures
    return [task_future for task_name, task_future in locals().items() if task_name.startswith("task_")]

@flow(name="demo_7",task_runner=ConcurrentTaskRunner())
def demo_7():
    print("Started Flow (some start actions here)")
    tasks = my_tasks() # executing the actual tasks in the flow
    # Wait for all tasks to finish
    states: List[State] = [task.wait() for task in tasks]
    # Take action based on state
    if any([s.name == "Failed" for s in states]):
        print('Failed actions here')
    if any([s.name == "Cancelled" for s in states]):
        pass # etc...

if __name__ == "__main__":
    demo_7()

The function my_tasks does exactly what I need, except for one thing: how do I pass the results of a PrefectFuture between tasks? Imagine my task_2 needs the results of task_1 and task_4 as input parameters.


You would need to call .result() on it.
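
A minimal sketch of both options, using a hypothetical add_one task: resolve the future in the flow with .result(), or pass the future directly as a task argument and Prefect resolves it before the downstream task runs:

from prefect import flow, task

@task
def add_one(x: int) -> int:
    return x + 1

@flow
def passing_results():
    a = add_one.submit(1)
    b = add_one.submit(a)  # pass the future; Prefect resolves it to a's result
    print(b.result())      # or resolve explicitly in the flow: prints 3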

Nice work with all the research and finding the examples; it looks like you are on the right track. Let me know if you have any specific questions we can help with.

For anyone who gets stuck on the same problem, here is the solution I ended up with:

import random
import time
from typing import List

from prefect import State, flow, task
from prefect.futures import PrefectFuture
from prefect.task_runners import ConcurrentTaskRunner

from pydantic import BaseModel


class Result(BaseModel):
    name: str
    number: int
    fake_path: str



my_network_graph = {
    "task_1": [],
    "task_2": [],
    "task_3": ["task_1", "task_2"],
    "task_4": ["task_3"],
    "task_5": ["task_4"],
}


@task(name="write_to_storage")
def write_result(result: Result):
    print(f"{result.number} is upload to storage: {result.fake_path}")


@task(name="compute_result")
def compute_result(
    task_name: str,
    upstream_futures: List[Result],
) -> Result:

    print(f"Hello from: {task_name}")
    print(f"My upstream computed dependencies: {upstream_futures}")
    # time.sleep(random.randint(10, 50))
    result = random.randint(1, 100)
    print(f"computed value is {result}")
    return Result(name=task_name, number=result, fake_path=f"gs:fake_path:{result}")


def create_pool_futures_write_results(results: List[Result]) -> List[PrefectFuture]:
    tasks = []
    for result in results:
        tasks.append(
            write_result.with_options(name=f"write_to_storage_{result.name}").submit(
                result=result,
            )
        )
    return tasks


# This function simulates walking a predefined DAG.
def create_pool_futures_compute_results() -> List[PrefectFuture]:
    tasks = []
    appended_tasks = {}

    # Assumes my_network_graph lists dependencies before dependents,
    # so every upstream future already exists when a node is submitted.
    for key, upstream_names in my_network_graph.items():
        upstream_futures = [appended_tasks[name] for name in upstream_names]
        future = compute_result.with_options(name=f"compute_result_{key}").submit(
            task_name=key,
            upstream_futures=upstream_futures,
            wait_for=upstream_futures,
        )
        tasks.append(future)
        appended_tasks[key] = future
    return tasks


@flow(task_runner=ConcurrentTaskRunner())
def futures_execution_flow():

    tasks_computes = create_pool_futures_compute_results()  # submit the feature tasks in the flow

    states_features: List[State] = [
        tasks_compute.wait() for tasks_compute in tasks_computes
    ]  # Wait for all tasks to finish

    results = [state.result() for state in states_features]

    tasks_writing = create_pool_futures_write_results(results=results)
    states_writing_tasks: List[State] = [
        task_writing.wait() for task_writing in tasks_writing
    ]  # Wait for all tasks to finish


if __name__ == "__main__":
    futures_execution_flow()
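
Regarding the P.S. about the DaskTaskRunner: the same flow should work unchanged by swapping the task runner. A hedged sketch, assuming the prefect-dask collection is installed (pip install prefect-dask):

from prefect_dask import DaskTaskRunner

# Same flow as above, just re-bound to run its tasks on Dask;
# wait_for dependencies and future results are still honored.
dask_flow = futures_execution_flow.with_options(task_runner=DaskTaskRunner())

if __name__ == "__main__":
    dask_flow()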