View in #prefect-community on Slack
@Ricardo_Gaspar: Hey there, maybe you can help me out here with some Prefect code and the rendered schematics.
Context: I'm developing a flow that creates and submits simple Spark apps to EMR. To my surprise, there are no built-in tasks for EMR (docs).
Nonetheless, I am trying to use the boto3 and awswrangler APIs (as I've seen other users asking for them on the community channel).
My current issue is that I don't fully understand how flows are rendered in the Schematic view. It's related to task dependencies and inter-task communication (passing values).
The first image shows the Airflow DAG I am trying to reproduce. That DAG uses Jinja templating to pass values.
To be honest, Airflow does this too if we use task outputs (task.output). If I use that feature, the DAG looks like this:
That said, this is what I am getting with Prefect:
My question: is there any way to improve the flow schematic? (I’m using Prefect Cloud)
Do you know if this is improved in Orion? (I've seen the Radar UI.)
@Anna_Geller: When you say to improve the flow schematic, do you mean some sort of drill-down functionality?
@Kevin_Kho: So I am not sure this can be improved, because you are using that cluster_id, which is a task, in downstream tasks, so it gets connected. I think the only way this schematic can be improved is if cluster_id is hardcoded, which loses a lot of flexibility.
I believe this can be done in Orion, because you can materialize tasks as Python objects by calling .wait().result(), so you can materialize the cluster_id task and then pass it downstream as a normal Python object.
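Roughly, a minimal sketch of what that could look like in an Orion flow (based on the beta API as I remember it, so the exact method names may differ by release; the task names here are just placeholders):
from prefect import flow, task  # Orion / Prefect 2.0 beta

@task
def create_cluster() -> str:
    # placeholder: this would call boto3/awswrangler to spin up the EMR cluster
    return "j-XXXXXXXX"

@task
def add_step(cluster_id: str) -> str:
    return f"step-submitted-to-{cluster_id}"

@flow
def emr_flow():
    # calling a task returns a future; .wait() blocks until the task finishes
    # and .result() materializes its return value as a plain Python object
    cluster_id = create_cluster().wait().result()
    add_step(cluster_id)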
@Anna_Geller: Another thing that could improve your flow, in general, would be to leverage a resource manager for the boto3 client that you are invoking within the flow. Instead of passing this HTTP client between tasks, you could pass it in a way that is safer and gives you even more visibility into it within your flow. This blog post has an example:
Managing temporary resources | Prefect Docs
Orchestrating ELT with Prefect, dbt Cloud, and Snowflake (Part 3)
@Ricardo_Gaspar I wanted to help more, so I included a resource manager for the EMR cluster creation, because a resource manager is such a good fit for that use case and it gives you a much better visualization in the UI. Here is the Gist:
emr_prefect_flow.py · GitHub
from typing import Tuple, Any
import time

import prefect
from botocore.client import BaseClient
from prefect import Task, task, Flow
import awswrangler as wr
import boto3
from prefect.run_configs import UniversalRun
from prefect.storage import S3
from prefect import resource_manager


@task
def add_emr_step(cluster_id: str, steps: list) -> str:
    """Submit the given steps to the cluster and return the first step ID."""
    emr_client = boto3.client("emr", region_name="us-west-2")
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
    return response["StepIds"][0]  # todo deal with array out of bounds


@task
def wait_emr_step(cluster_id: str, step_id: str):
    """Poll the step state until it completes, sleeping between calls."""
    while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
        time.sleep(30)
SPARK_STEPS_SUCCESS: list = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

SPARK_STEPS_ERROR: list = [
    {
        "Name": "calculate_pi_failure",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "x"],
        },
    }
]
@resource_manager
class EMRCluster:
    def __init__(self, region_name="us-west-2"):
        self.region_name = region_name

    def setup(self):
        cluster_id = self.create_emr_cluster(cluster_name="prefect-example_cluster")
        return cluster_id

    def create_emr_cluster(self, cluster_name: str):
        """
        :param cluster_name: name
        :return: ID of the created cluster

        See API https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
        """
        emr_client = boto3.client("emr", region_name=self.region_name)
        response = emr_client.run_job_flow(
            Name=cluster_name,
            ReleaseLabel="emr-6.0.0",
            LogUri="s3://carpe-spark-logs/",
            Applications=[
                {"Name": "Spark"},
                {"Name": "Hadoop"},
                {"Name": "Hive"},
                {"Name": "Ganglia"},
            ],
            Configurations=[
                {
                    "Classification": "spark",
                    "Properties": {
                        "maxRemoteBlockSizeFetchToMem": "2gb",
                        "maximizeResourceAllocation": "true",
                    },
                },
                {
                    "Classification": "spark-defaults",
                    "Properties": {
                        "spark.dynamicAllocation.enabled": "true",
                        "spark.local.dir": "/mnt",
                    },
                },
                {
                    "Classification": "spark-hive-site",
                    "Properties": {
                        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
                    },
                },
            ],
            Instances={
                "InstanceGroups": [
                    {
                        "Name": "Master nodes",
                        "Market": "ON_DEMAND",
                        "InstanceRole": "MASTER",
                        "InstanceType": "m5.xlarge",
                        "InstanceCount": 1,
                    },
                    {
                        "Name": "Slave nodes",
                        "Market": "SPOT",  # default max bid price is equal to ON_DEMAND
                        "InstanceRole": "CORE",
                        "InstanceType": "m5.xlarge",
                        "InstanceCount": 2,
                    },
                ],
                "Ec2KeyName": "KeyCarpeDataDOPS",
                "Ec2SubnetId": "subnet-c7fb0aa3",
                "EmrManagedMasterSecurityGroup": "sg-50770d21",
                "EmrManagedSlaveSecurityGroup": "sg-32710b43",
                "KeepJobFlowAliveWhenNoSteps": True,
                "TerminationProtected": False,
            },
            ScaleDownBehavior="TERMINATE_AT_TASK_COMPLETION",
            StepConcurrencyLevel=1,
            EbsRootVolumeSize=10,
            BootstrapActions=[],
            VisibleToAllUsers=True,
            JobFlowRole="EMR_EC2_DefaultRole",
            ServiceRole="EMR_DefaultRole",
            Tags=[
                {"Key": "Name", "Value": "devops_prefect"},
                {"Key": "project", "Value": "devops_prefect"},
                {"Key": "env_type", "Value": "non_prod"},
                {"Key": "environment", "Value": "dev"},
                {"Key": "dd-monitor", "Value": "ignore"},
                {"Key": "DataStore", "Value": ""},
            ],
        )
        # todo check if the creation was successful, deal with array out of bounds
        return response["JobFlowId"]

    def terminate_cluster(self, cluster_id: str):
        emr_client = boto3.client("emr", region_name=self.region_name)
        response = emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
        return response

    def cleanup(self, cluster_id):
        self.terminate_cluster(cluster_id=cluster_id)
with Flow(
    name="example_emr_jobs-success_error", run_config=UniversalRun(labels=["aws"])
) as flow:
    with EMRCluster() as emr_cluster_id:
        # success step
        add_step_one_id = add_emr_step(
            cluster_id=emr_cluster_id,
            steps=SPARK_STEPS_SUCCESS,
            task_args=dict(name="add_step_one"),
        )
        wait_step_one = wait_emr_step(
            cluster_id=emr_cluster_id,
            step_id=add_step_one_id,
            task_args=dict(name="wait_step_one"),
        )

        # failure step
        add_step_two = add_emr_step(
            cluster_id=emr_cluster_id,
            steps=SPARK_STEPS_ERROR,
            task_args=dict(name="add_step_two"),
        )
        wait_step_two = wait_emr_step(
            cluster_id=emr_cluster_id,
            step_id=add_step_two,
            task_args=dict(name="wait_step_two"),
        )

        # execution order: submit the failure step only after the first step has finished
        add_step_two.set_upstream(wait_step_one)

# flow.storage = S3(
#     bucket="carpe-prefect-rivdata-dev",
#     local_script_path="Example_emr_jobs-success_error.py",
#     stored_as_script=True,
# )

# to run visualize, make sure you have graphviz installed on your mac: brew install graphviz
flow.visualize()  # view the graph locally
# flow.storage.build()
# flow.register(project_name="carpe_poc")
and the flow diagram:
Btw, I'm curious to hear why you chose to run the cluster 24/7? With awswrangler, you could also create the cluster on demand and delete it once your job finishes.
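For reference, a rough sketch of that pattern with awswrangler (the function names come from the awswrangler.emr module, but treat the exact parameters, the placeholder subnet, and the example command as assumptions to check against the version you have installed):
import time
import awswrangler as wr

# create an ephemeral cluster, run one step, then tear the cluster down
cluster_id = wr.emr.create_cluster(
    subnet_id="subnet-xxxxxxxx",     # placeholder subnet
    cluster_name="ephemeral-spark",
    emr_release="emr-6.0.0",
)
step_id = wr.emr.submit_step(
    cluster_id=cluster_id,
    name="calculate_pi",
    command="/usr/lib/spark/bin/run-example SparkPi 10",
)
while wr.emr.get_step_state(cluster_id, step_id) not in ("COMPLETED", "FAILED", "CANCELLED"):
    time.sleep(30)  # poll the step state without hammering the EMR API
wr.emr.terminate_cluster(cluster_id=cluster_id)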
@Ricardo_Gaspar: thank you both @Kevin_Kho and @Anna_Geller!
- That .wait().result() would probably be very useful! Another thing I've noticed is that if I'm grabbing a value from a task result, let's say my task returns an array and I want to use it as arguments to multiple tasks (e.g. task2(arg1=task1['result1'])), the graph renders new boxes/nodes for those intermediate results (see the sketch after this list).
- Didn't know about that resource manager feature! I'll look into it; it seems very appropriate for my use case. Thanks for sharing.
- I don't want to use a 24/7 cluster; I delegate the creation and destruction to the orchestration tool: a create-cluster, run-steps, terminate approach. I don't want to submit all the steps at creation time, so that I keep control of the submission and the data dependencies between my Spark apps instead of relying on EMR to do so. I'll also set a timeout for EMR to kill itself as part of the cluster configs (EMR > 6.0.0).
- I'm not using awswrangler for cluster creation yet because its create-cluster method uses instance fleets rather than instance groups, and for this example I'm using instance groups. I'll change it in the future.
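To illustrate the extra boxes from the first bullet, a minimal sketch (the task names are made up; as far as I can tell, indexing a task result inside the flow context adds an implicit GetItem task, which is what shows up as the new node):
from prefect import Flow, task

@task
def task1() -> dict:
    return {"result1": 1, "result2": 2}

@task
def task2(arg1):
    print(arg1)

with Flow("getitem-example") as flow:
    out = task1()
    # out["result1"] inserts a GetItem task between task1 and task2,
    # which is why an extra node appears in the schematic
    task2(arg1=out["result1"])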
A question for you @Anna_Geller about that resource manager: if I needed to create multiple clusters and submit steps to each, how would I do that given the with … resource block syntax? Or is there an imperative one as well (like a singleton-object approach)?
My first thoughts are:
- having multiple flows (each with its own resource manager) and invoking them from a main flow
@Kevin_Kho: If you invoke a Flow from the main Flow, you need to define the resource manager or executor at the subflow level. Man have you seen Orion though? Cuz Orion lets you define this from the main flow level. Is your thinking just very in line with Orion?
@Anna_Geller: I would approach it exactly the way you described, with a separate flow run for each such use case. Ephemeral EMR clusters per run/job are actually a very common approach, and one even recommended by AWS.
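As an illustration, a rough sketch of a parent flow that kicks off one child flow run per cluster using StartFlowRun; the child flow name, project name, and the dataset parameter are hypothetical here, and each child flow would define its own EMRCluster resource manager internally:
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# one child flow run per EMR cluster; each child flow owns its own
# EMRCluster resource manager and tears its cluster down on cleanup
start_child = StartFlowRun(
    flow_name="example_emr_jobs-success_error",  # hypothetical child flow name
    project_name="carpe_poc",
    wait=True,  # block until the child flow run finishes
)

with Flow("emr-parent") as parent_flow:
    run_a = start_child(parameters={"dataset": "a"})  # hypothetical parameter
    run_b = start_child(parameters={"dataset": "b"})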
@Ricardo_Gaspar: @Anna_Geller creating a cluster takes a lot of time, so reusing it makes a lot of sense and is a common practice.
One can do it in different ways, but even Databricks recently added this ability: https://databricks.com/blog/2022/02/04/saving-time-and-costs-with-cluster-reuse-in-databricks-jobs.html
Databricks: How to Save Time and Costs With Cluster Reuse in Databricks Jobs
thanks a lot for your help
really appreciate it; you rock! :prefect: