How to handle custom resource (CPU vs. GPU) and code dependencies across various tasks and flows?

There are several separate issues in your question, so I'll break them down.

1. Separating out tasks with custom dependencies into subflows

This problem is not specific to Prefect; it applies to Python in general. If your function_a() needs a different Pandas version than function_b(), how would you tackle this in Python without Prefect? You would likely split them into separate scripts and run each in an entirely different:

  • virtual environment, or
  • Docker container.
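As a minimal sketch of the virtual-environment option, using only the standard library: each piece of code runs under its own interpreter, so each environment could have its own package versions installed. The helper names create_env and run_in_env and the environment names env_a and env_b are hypothetical, and the environments are created without pip to keep the sketch fast; a real setup would create them with pip and install the pinned Pandas version into each.

```python
import subprocess
import sys
import tempfile
import venv
from pathlib import Path


def create_env(env_dir: Path) -> Path:
    """Create a virtual environment and return the path to its interpreter."""
    # with_pip=False keeps this sketch fast; use with_pip=True if you need
    # to install packages (e.g. a pinned Pandas version) into the environment.
    venv.create(env_dir, with_pip=False)
    if sys.platform == "win32":
        return env_dir / "Scripts" / "python.exe"
    return env_dir / "bin" / "python"


def run_in_env(python: Path, code: str) -> str:
    """Run a code snippet with the given interpreter and return its stdout."""
    result = subprocess.run(
        [str(python), "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # One isolated environment per function (names are illustrative).
        python_a = create_env(Path(tmp) / "env_a")
        python_b = create_env(Path(tmp) / "env_b")

        # Stand-ins for function_a() and function_b(): each reports the
        # environment it is running in.
        print(run_in_env(python_a, "import sys; print(sys.prefix)"))
        print(run_in_env(python_b, "import sys; print(sys.prefix)"))
```

The two calls never share an interpreter, which is the same isolation you get from running each script in its own Docker container.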

This is also how you can handle this in Prefect - you can call function_a() and function_b() within separate flows, and you can orchestrate such child flows from a parent flow (a flow-of-flows orchestrator pattern).

An added benefit of Prefect is that you can dynamically override the Docker image used for a specific flow or flow run through its run configuration. This topic shows how you can do that:

2. Tracking parent-child-flow relationship from the UI

In Prefect < 2.0, there is no inherent UI link to track the lineage in a flow-of-flows. This means that when you navigate to a specific child flow run, you can’t see which parent flow run triggered it.

However, if you navigate to the parent flow in the UI, you can see the child flow run URLs in the logs. This way, it’s possible to manually track such parent-child-flow relationships, which may be especially helpful in the event of failure (e.g. a child flow run fails and you want to find out why).

Note: in the image above, you can see the logs of the child flow run directly from the parent flow run. This is because the flow was configured with the wait_for_flow_run task and its argument stream_logs=True. You can find the full flow example in the Discourse topic linked above.

3. Leveraging Kubernetes for resource management

In the end, we need to acknowledge that Prefect is a workflow orchestrator rather than a resource orchestrator. There are plenty of tools that can orchestrate resources for your compute; you mentioned two of them, Dask and Ray.

Another (container) orchestration system that you may leverage together with Prefect is Kubernetes, and Kubernetes is the right tool for the job when it comes to resource management and scheduling pods across various compute nodes.

You could have:

  • one KubernetesAgent with the label cpu for CPU-intensive workflows,
  • and another KubernetesAgent with the label gpu for GPU-intensive workflows.

The first agent could be spun up on a “normal” Kubernetes cluster with general-purpose compute instances created as EKS managed nodegroups. You could spin that up on AWS using eksctl:

eksctl create cluster --name=prefect-eks-cpu --nodes=2

The second agent could be spun up on a GPU-specific Kubernetes cluster with EKS managed nodegroups that have embedded GPUs and a preinstalled NVIDIA Kubernetes device plugin. This can also be created with a single eksctl command:

eksctl create cluster --name=prefect-eks-gpu --node-type=p2.xlarge --nodes=2

Alternatively, you could have a single cluster and manage those different workflow types with heterogeneous resource requirements through different namespaces. But note that this requires more setup and is generally more time-consuming to configure.
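The routing between the cpu and gpu agents described above relies on labels. The sketch below is plain Python, not Prefect code: it only illustrates the matching rule as I understand it from Prefect 1.x, where an agent picks up a flow run when the agent's labels are a superset of the labels on the flow run. The Agent class, the eligible_agents function, and the agent names are hypothetical.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, List


@dataclass(frozen=True)
class Agent:
    """Hypothetical stand-in for a running agent and its labels."""
    name: str
    labels: FrozenSet[str]


def eligible_agents(flow_run_labels: Iterable[str], agents: Iterable[Agent]) -> List[str]:
    """Return the names of agents whose labels cover all flow run labels."""
    required = set(flow_run_labels)
    # Assumed rule: the agent's labels must be a superset of the flow run's labels.
    return [agent.name for agent in agents if required <= agent.labels]


agents = [
    Agent("k8s-cpu-agent", frozenset({"cpu"})),
    Agent("k8s-gpu-agent", frozenset({"gpu"})),
]

# A GPU-intensive flow run labeled "gpu" is only eligible for the GPU agent.
print(eligible_agents(["gpu"], agents))
# A CPU-intensive flow run labeled "cpu" is only eligible for the CPU agent.
print(eligible_agents(["cpu"], agents))
```

In Prefect 1.x itself, the flow side of this would typically be expressed by setting labels on the run configuration, e.g. KubernetesRun(labels=["gpu"]), with the matching label set on the agent when it is started.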

4. Prefect 2.0 way of solving that problem

Prefect 2.0 addresses many of the issues discussed above:

  • the Orion subflows provide the link between parent and child flow runs directly in the UI, making this orchestration pattern much easier,
  • the Radar view allows you to easily drill down into the relevant subflow flow run page,
  • the DaskTaskRunner allows you to set custom resource annotations:
import dask
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

# Create a `LocalCluster` with some resource annotations
# Annotations are abstract in dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={'GPU': 1}):
        future = show(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={'process': 1}):
        # These tasks each require 1 process on a worker; because we've 
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
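        future = show(1)
        future = show(2)
        future = show(3)

# Run the flow; the guard matters because the Dask LocalCluster may start
# worker processes that re-import this module.
if __name__ == "__main__":
    my_flow()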