Amazon Elastic MapReduce (Amazon EMR) is a big data platform that enables data engineers and scientists to process large amounts of data at scale. Amazon EMR uses open-source tools like Apache Spark, Hive, HBase, and Presto to run large-scale analyses more cheaply than a traditional on-premises cluster. This Boto3 EMR tutorial covers how to use the Boto3 library (AWS SDK for Python) to automate Amazon EMR cluster management.
If you’re new to the Boto3 library, check the Introduction to Boto3 library article.
Using Boto3 EMR for cluster management
Amazon EMR is a highly scalable service that gives you full control over the compute capacity you spawn for your workloads. To interact with the Amazon EMR service through its APIs, you can use the emr client of the Boto3 library.
Creating a single-node EMR cluster
To create an Amazon EMR cluster with a specified configuration, you need to use the run_job_flow() method of the Boto3 library. In addition to the Boto3 documentation, we recommend you review the RunJobFlow API documentation.
A cluster requires a minimal set of configuration options:
- Name – a unique name that identifies the cluster.
- ReleaseLabel – selects the version of the open-source big data applications such as Spark and Hive.
- Instances – defines the compute configuration of your cluster, for example, the number of instances, instance types, termination protection, etc.
- ServiceRole – the IAM role assumed by the Amazon EMR service to access AWS resources on your behalf.
- JobFlowRole – the IAM role assumed by the cluster's EC2 instances, also called the instance profile or EC2 role.
The following code creates a simple EMR cluster that consists of one single Master node:
import boto3
import json
client = boto3.client('emr', region_name='us-east-1')
response = client.run_job_flow(
Name="Boto3 test cluster",
ReleaseLabel='emr-6.3.0',
Instances={
'MasterInstanceType': 'm4.large',
'InstanceCount': 1,
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'Ec2KeyName': 'EMR Key Pair'
},
VisibleToAllUsers=True,
ServiceRole='EMR_DefaultRole',
JobFlowRole='EMR_EC2_DefaultRole',
AutoScalingRole="EMR_AutoScaling_DefaultRole"
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

You’ll also see the new starting cluster in the AWS web console:
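The run_job_flow() response contains the new cluster's JobFlowId, which most of the management calls below expect as the ClusterId. Here's a minimal sketch of extracting it; the response values are hypothetical, not real output:

```python
# A trimmed, hypothetical run_job_flow() response for illustration only
sample_response = {
    'JobFlowId': 'j-EXAMPLE1234567',
    'ClusterArn': 'arn:aws:elasticmapreduce:us-east-1:111122223333:cluster/j-EXAMPLE1234567',
}

# The JobFlowId doubles as the ClusterId for later describe/terminate calls
cluster_id = sample_response['JobFlowId']
print(cluster_id)
```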

Create EMR cluster with an instance group
To create an Amazon EMR cluster with an instance group, you need to use the run_job_flow() method of the Boto3 library.
Every EMR cluster consists of instances organized into three categories of nodes:
- Master Nodes (Driver nodes) handle cluster management, job assignment, and tracking, and provide access to user applications. You can spin up multiple Master nodes for higher availability.
- Core Nodes (Data nodes) handle data storage. They can also run task daemons to perform parallel computations.
- Task Nodes (Executors) run task daemons for parallel computations.
Use the InstanceGroups parameter to specify every node category:
import boto3
import json
client = boto3.client('emr', region_name='us-east-1')
response = client.run_job_flow(
Name="Boto3 test cluster",
ReleaseLabel='emr-6.3.0',
Instances={
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'InstanceGroups': [
{
'Name': 'Master',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm4.large',
'InstanceCount': 1,
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp2',
'SizeInGB': 10
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': False
}
},
{
'Name': 'Core',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm4.large',
'InstanceCount': 1,
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp2',
'SizeInGB': 10
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': False
}
},
{
'Name': 'Task',
'Market': 'ON_DEMAND',
'InstanceRole': 'TASK',
'InstanceType': 'm4.large',
'InstanceCount': 1,
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp2',
'SizeInGB': 10
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': False
}
},
],
'Ec2KeyName':'EMR Key Pair'
},
VisibleToAllUsers=True,
ServiceRole='EMR_DefaultRole',
JobFlowRole='EMR_EC2_DefaultRole',
AutoScalingRole="EMR_AutoScaling_DefaultRole"
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))
The execution results in the following output:

The code above uses parameter definitions similar to those in the previous section. Under the Instances key, there is a new InstanceGroups key that defines a list of nodes for every node type.
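Since the three group definitions above differ only in name, role, and count, a small helper can reduce the repetition. This is a sketch; the helper name and defaults are ours, not part of the Boto3 API:

```python
def make_instance_group(name, role, count, volume_gb=10):
    """Build one EMR instance group definition (On-Demand, gp2 EBS volume)."""
    return {
        'Name': name,
        'Market': 'ON_DEMAND',
        'InstanceRole': role,
        'InstanceType': 'm4.large',
        'InstanceCount': count,
        'EbsConfiguration': {
            'EbsBlockDeviceConfigs': [
                {
                    'VolumeSpecification': {'VolumeType': 'gp2', 'SizeInGB': volume_gb},
                    'VolumesPerInstance': 1,
                },
            ],
            'EbsOptimized': False,
        },
    }

# The three groups from the example above, without repetition
instance_groups = [
    make_instance_group('Master', 'MASTER', 1),
    make_instance_group('Core', 'CORE', 1),
    make_instance_group('Task', 'TASK', 1),
]
```

You can then pass the list as the 'InstanceGroups' value inside the Instances argument of run_job_flow().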

List EMR clusters
To list Amazon EMR clusters, you need to use the list_clusters() method of the Boto3 library. Optionally, you can provide a time frame to search by the cluster creation date or specify a cluster state.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))
The code above lists cluster information:
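The response contains a Clusters list whose entries carry the cluster Id, Name, and Status. A quick way to work with it is to reduce the response to an id-to-name mapping; the sample response below is hypothetical, trimmed to the relevant keys:

```python
# A trimmed, hypothetical list_clusters() response for illustration only
sample_response = {
    'Clusters': [
        {'Id': 'j-1ABCDEFGHIJKL', 'Name': 'Boto3 test cluster',
         'Status': {'State': 'WAITING'}},
        {'Id': 'j-2MNOPQRSTUVWX', 'Name': 'Nightly ETL',
         'Status': {'State': 'WAITING'}},
    ]
}

# Reduce the response to an id -> name mapping for quick lookups
clusters = {c['Id']: c['Name'] for c in sample_response['Clusters']}
print(clusters)
```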

Describe EMR cluster
To describe an Amazon EMR cluster, you need to use the describe_cluster() method of the Boto3 library. This method provides details of the cluster specified by the ClusterId argument.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'TERMINATED'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.describe_cluster(
ClusterId=cluster_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Describe EMR cluster instances
To list and describe the instances of an Amazon EMR cluster, you need to use the list_instances() method of the Boto3 library.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.list_instances(
ClusterId=cluster_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Terminate EMR cluster
To terminate an Amazon EMR cluster, you need to use the terminate_job_flows() method of the Boto3 library. This method can terminate multiple clusters provided in the JobFlowIds argument.
import boto3
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.terminate_job_flows(
JobFlowIds=[cluster_id]
)
print(response)
Here’s an execution output:

The AWS web console will instantly reflect the terminate request:

List EMR cluster bootstrap actions
To list bootstrap actions for the Amazon EMR cluster, you need to use the list_bootstrap_actions() method of the Boto3 library. This method returns the S3 locations of the EMR cluster bootstrap scripts.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.list_bootstrap_actions(
ClusterId=cluster_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))

Working with EMR scaling policies using Boto3
There are two ways of managing auto-scaling for your EMR cluster:
- Regular auto-scaling policy – this approach is similar to setting up an auto-scaling policy for your EC2 instances: you choose the CloudWatch metrics you'd like to monitor and scale based on.
- Managed auto-scaling policy – this approach was introduced by AWS in July 2020. Managed scaling lets you automatically increase or decrease the number of instances or units in your cluster based on workload. EMR continuously evaluates cluster metrics to make scaling decisions that optimize your clusters for cost and speed. Managed scaling is available for clusters with either instance groups or instance fleets.
Consider reading the Introducing Amazon EMR Managed Scaling – Automatically Resize Clusters to Lower Cost article for more information.
Attach managed scaling policy to the EMR cluster
To attach a managed scaling policy to the Amazon EMR cluster, you need to use the put_managed_scaling_policy() method of the Boto3 library. The ClusterId and ManagedScalingPolicy are required arguments for this method. The ManagedScalingPolicy defines the minimum and maximum number of instances for cluster autoscaling operations.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.put_managed_scaling_policy(
ClusterId=cluster_id,
ManagedScalingPolicy={
'ComputeLimits': {
'UnitType': 'Instances',
'MinimumCapacityUnits': 1,
'MaximumCapacityUnits': 3
}
}
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Describe EMR cluster managed scaling policy
To describe the managed scaling policy of the Amazon EMR cluster, you need to use the get_managed_scaling_policy() method of the Boto3 library.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.get_managed_scaling_policy(
ClusterId=cluster_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Remove EMR cluster managed scaling policy
To remove the managed scaling policy from the Amazon EMR cluster, you need to use the remove_managed_scaling_policy() method of the Boto3 library.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.remove_managed_scaling_policy(
ClusterId=cluster_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Attach auto-scaling policy to the EMR cluster
To attach an auto-scaling policy to the Amazon EMR cluster, you need to use the put_auto_scaling_policy() method of the Boto3 library.
Auto-scaling policies give you the flexibility to scale an EMR instance group based on its CloudWatch metrics.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.list_instances(
ClusterId=cluster_id
)
instancegroup_id = response['Instances'][0]['InstanceGroupId']
response = client.put_auto_scaling_policy(
ClusterId=cluster_id,
InstanceGroupId=instancegroup_id,
AutoScalingPolicy={
'Constraints': {
'MinCapacity': 1,
'MaxCapacity': 2
},
'Rules': [
{
'Name': 'Scale Up',
'Description': 'Add one instance when available YARN memory runs low',
'Action': {
'SimpleScalingPolicyConfiguration': {
'AdjustmentType': 'CHANGE_IN_CAPACITY',
'ScalingAdjustment': 1,
'CoolDown': 120
}
},
'Trigger': {
'CloudWatchAlarmDefinition': {
'ComparisonOperator': 'LESS_THAN_OR_EQUAL',
'EvaluationPeriods': 1,
'MetricName': 'YARNMemoryAvailablePercentage',
'Period': 300,
'Statistic': 'AVERAGE',
'Threshold': 15,
'Unit': 'PERCENT'
}
}
},
]
}
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Remove auto-scaling policy from the EMR cluster
To remove the auto-scaling policy from the Amazon EMR cluster, you need to use the remove_auto_scaling_policy() method of the Boto3 library.
This method requires the ClusterId and InstanceGroupId arguments.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.list_instances(
ClusterId=cluster_id
)
instancegroup_id = response['Instances'][0]['InstanceGroupId']
response = client.remove_auto_scaling_policy(
ClusterId=cluster_id,
InstanceGroupId=instancegroup_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
The Boto3 client removes the auto-scaling policy and generates the following output:

Working with EMR cluster termination policy using Boto3
You can terminate clusters in the STARTING, RUNNING, or WAITING states. A cluster in the WAITING state must be terminated, or it runs indefinitely, generating charges to your account. You can also terminate a cluster that fails to leave the STARTING state or is unable to complete a step.
An auto-termination policy defines the amount of idle time in seconds after which a cluster automatically terminates.
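Since IdleTimeout is expressed in seconds, a small helper can make the intent explicit when you build the policy. This is a sketch; the function name is ours, not part of the Boto3 API:

```python
def auto_termination_policy(idle_minutes):
    """Build the AutoTerminationPolicy argument; EMR expects the timeout in seconds."""
    return {'IdleTimeout': idle_minutes * 60}

# Terminate the cluster after two hours of idle time
policy = auto_termination_policy(120)
print(policy)
```

You can pass the resulting dict directly as the AutoTerminationPolicy argument of put_auto_termination_policy().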
Describe auto-termination policy of the EMR cluster
To describe the auto-termination policy of the Amazon EMR cluster, you need to use the get_auto_termination_policy() method of the Boto3 library. The method's output reveals the idle timeout, after which the cluster will be terminated.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.get_auto_termination_policy(
ClusterId=cluster_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Set auto-termination policy of the EMR cluster
To set the auto-termination policy for the Amazon EMR cluster, you need to use the put_auto_termination_policy() method of the Boto3 library. The IdleTimeout argument defines the idle time (in seconds) after which the EMR cluster will be terminated.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.put_auto_termination_policy(
ClusterId=cluster_id,
AutoTerminationPolicy={
'IdleTimeout': 120
}
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Remove auto-termination policy from the EMR cluster
To remove the auto-termination policy from the Amazon EMR cluster, you need to use the remove_auto_termination_policy() method of the Boto3 library.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.remove_auto_termination_policy(
ClusterId=cluster_id
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Working with EMR instance fleets using Boto3
An instance fleet defines flexible provisioning options for the EC2 instances of an EMR cluster. Each fleet can mix up to five different instance types, and EMR provisions capacity from whichever configurations are available, which helps when Spot Instance prices spike.
Here’s an example of creating a cluster with instance fleets:
import boto3
import json
client = boto3.client('emr', region_name='us-east-1')
response = client.run_job_flow(
Name="Boto3 test cluster",
ReleaseLabel='emr-6.3.0',
Instances={
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'InstanceFleets': [
{
'Name': 'Master',
'InstanceFleetType': 'MASTER',
'TargetOnDemandCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'm4.large',
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp2',
'SizeInGB': 10
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': False
}
},
],
'LaunchSpecifications': {
'OnDemandSpecification': {
'AllocationStrategy': 'lowest-price'
}
}
},
{
'Name': 'Core',
'InstanceFleetType': 'CORE',
'TargetOnDemandCapacity': 1,
'InstanceTypeConfigs': [
{
'InstanceType': 'm4.large',
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp2',
'SizeInGB': 10
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': False
}
},
],
'LaunchSpecifications': {
'OnDemandSpecification': {
'AllocationStrategy': 'lowest-price'
}
}
},
],
'Ec2KeyName': 'EMR Key Pair'
},
VisibleToAllUsers=True,
ServiceRole='EMR_DefaultRole',
JobFlowRole='EMR_EC2_DefaultRole'
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))
Add instance fleet to the EMR cluster
To add an instance fleet to the Amazon EMR cluster, you need to use the add_instance_fleet() method of the Boto3 library. The fleet-related attributes depend on whether you configure Spot or On-Demand instances.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.add_instance_fleet(
ClusterId=cluster_id,
InstanceFleet=
{
"InstanceFleetType":"TASK",
"TargetOnDemandCapacity":1,
"TargetSpotCapacity":0,
"LaunchSpecifications":{},
"InstanceTypeConfigs":[
{
"WeightedCapacity":1,
"EbsConfiguration":
{
"EbsBlockDeviceConfigs":
[
{
"VolumeSpecification":
{
"SizeInGB":32,
"VolumeType":"gp2"
},
"VolumesPerInstance":2}]},
"BidPriceAsPercentageOfOnDemandPrice":100,
"InstanceType":"m4.xlarge"
}
]
}
}
}
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

The provisioning of the instances is triggered for the Task nodes:

Modify instance fleet of the EMR cluster
To modify the instance fleet of the Amazon EMR cluster, you need to use the modify_instance_fleet() method of the Boto3 library.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.list_instances(
ClusterId=cluster_id
)
fleet_id = response['Instances'][0]['InstanceFleetId']
response = client.modify_instance_fleet(
ClusterId=cluster_id,
InstanceFleet={
'InstanceFleetId': fleet_id,
'TargetOnDemandCapacity': 4,
'TargetSpotCapacity': 0
}
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Similar to the previous section, a successful modification of the instance fleet returns an empty success response:

Here's what the cluster resizing looks like after the changes:

Working with EMR cluster instance groups using Boto3
Instance groups provide a far simpler setup than instance fleets. Based on the workload, you can create up to 50 instance groups per cluster (one master instance group, one core instance group, and up to 48 optional task instance groups). By default, an account is limited to 20 running instances.
Add instance group to the EMR cluster
To add an instance group to the Amazon EMR cluster, you need to use the add_instance_groups() method of the Boto3 library. Each created instance group has its own ID, which is useful for identifying a specific task instance group for termination or modification. EMR only allows adding task instance groups to a running job flow.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'RUNNING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.add_instance_groups(
JobFlowId=cluster_id,
InstanceGroups=[
{
'Name': 'Task',
'Market': 'ON_DEMAND',
'InstanceRole': 'TASK',
'InstanceType': 'm4.large',
'InstanceCount': 1,
'EbsConfiguration': {
'EbsBlockDeviceConfigs': [
{
'VolumeSpecification': {
'VolumeType': 'gp2',
'SizeInGB': 10
},
'VolumesPerInstance': 1
},
],
'EbsOptimized': False
}
},
]
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Modify instance group of the EMR cluster
To modify the instance group of the Amazon EMR cluster, you need to use the modify_instance_groups() method of the Boto3 library. This method allows you to modify parameters such as node counts and shrink policies. InstanceGroupId identifies the instance group to be modified, and the InstanceCount key defines the target size of the instance group.
import boto3
import json
from datetime import datetime
client = boto3.client('emr', region_name='us-east-1')
response = client.list_clusters(
CreatedAfter=datetime(2021, 9, 1),
CreatedBefore=datetime(2021, 9, 30),
ClusterStates=[
'WAITING'
]
)
cluster_id = response['Clusters'][0]['Id']
response = client.list_instances(
ClusterId=cluster_id
)
instance_group_id = response['Instances'][0]['InstanceGroupId']
response = client.modify_instance_groups(
ClusterId=cluster_id,
InstanceGroups=[
{
'InstanceGroupId': instance_group_id,
'InstanceCount': 3
},
]
)
print (json.dumps(response, indent=4, sort_keys=True, default=str))
Here’s an execution output:

Summary
This article covered how to create, modify, scale, and terminate Amazon EMR clusters using the Boto3 library.