How would I migrate the following example where flow generated RunConfigs are being used in StartFlowRun in Prefect 2.0?

We built an imager task that uses Google’s Kaniko to build and push images to ECR at the time of flow execution on Fargate or Kubernetes infrastructure. This allows developers to test new model enhancements without having to worry about building docker images.

In the Flow below, the imager task returns an image_name and image_tag which is then passed to ECSRun within StartFlowRunner.

Is there a way to achieve similar functionality in Prefect 2? There is Flow.with_options and in Prefect 2 the deployment image is defined with KubernetesJob. Any advice on how to approach this would be amazing! Thank you!

# Prefect
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# pathos
from pathos.prefect.tasks import imager

@task
def ecs(
    image_name: str, image_tag: str, 
) -> Callable:
    """A Prefect Run Config that gets populated from an image_name and image_tag
    Args:
        image_name (str): A string that represents the image name
        image_tag (str): A string that represents the image tag
    Returns:
       ECSRun
    """
        return ECSRun(
            image_name=f"{image_name}:{image_tag}"
        )

# The below parameter contains two GitHub repositories, model and model_additions and the branches to run the flow with.
repos = Parameter(
    "repos",
    default={
        "model": "new_feature_1",
        "model_additions": "new_feature_2",
    },
) 

start_flow = StartFlowRun(project_name="Dev", wait=True)

with Flow(name="Run Scoring Job") as flow:
    image_name, image_tag = imager(
        image_tag=flow.name,
        repos=repos, 
        wandb_entity=wandb_entity,
    ) # Kaniko Imager Task to Push to ECR


    score_models_flow = start_flow(
        flow_name="Score Models (Mapped)",
        run_name=image_tag,
        run_config=ecs(image_name=image_name, image_tag=image_tag), # RunConfig 
        parameters=dict(
            model_config=model_config,
            wandb_project=wandb_project,
            wandb_entity=wandb_entity,
        ),
    )
1 Like

You would need to use the orchestrator pattern:

Basically you create deployments with infra/storage block configs as needed, and then you can call those from some parent flow using the pattern shown above

1 Like

Wow, this is awesome thank you very much!

1 Like

Would the same pattern be used if we want subflows to use different infrastructure? For example, a data processing job running on CPU and a modeling job running on GPU?

Yes, 100%! You’re spot on here

1 Like

Thank you for pointing me in the right direction here! I was able to implement what I needed to do using the orchestrator pattern! Is there a way to see the final state of the flow run and logs after the deployment has been created?

1 Like

We have an open issue to add a utility function for that: