AWS Step Functions – How to manage long-running task

Laptop with code
Related Content

This article continues the Serverless articles series and covers creating and using AWS Step Functions workflow to manage long-running tasks in the AWS cloud. By the end of the article, we’ll deploy our workflow using CloudFormation.

Managing and orchestrating multiple automation activities in the cloud may be a challenging task. In the article Cloud CRON – Scheduled Lambda Functions, we covered how to create and manage simple automation activities in the AWS cloud. By the term “simple,” we mean tasks running no longer than 15 minutes (see AWS Lambda quotas). But what if our job runs longer? Those scenarios we’ll cover in this article.

Long-Running Jobs In AWS Cloud

Working with AWS Lambda in Python using Boto3

Working with AWS Lambda in Python u...
Working with AWS Lambda in Python using Boto3

There’re multiple ways of handling long-running workloads in AWS in a Serverless way:

  • AWS Batch runs jobs pulled from the jobs FIFO queue, executed using the containerized environment, and exiting upon completion. Job execution time is unlimited.
  • AWS Fargate is a general-purpose container platform where you can do more. For example, you can bind your containers to the network and launch them as services. This service is a bit more complex than AWS Batch. Job execution time is unlimited.
  • AWS Lambda is a service for short-running jobs with a maximum execution time of 15 minutes.
  • Purpose-built AWS services for Machine Learning or Media Processing, for example.

AWS Batch vs. Fargate

When it comes to running batch processing jobs or long-running tasks in the cloud, there are two major options: AWS Batch and Fargate. Both have their own pros, so it’s important to choose the right option for your needs.

AWS Batch is a managed service that lets you run batch processing jobs in the packed in Docker containers. As soon as the batch job finishes its execution, the container used to run it terminates.

AWS Fargate is a serverless compute engine that allows you to run Docker containers indefinitely. That means you can’t split tasks across multiple containers and execute them independently. Think, how you would solve the same problem using EC2 instances. You would use SQS topic as a task delivery mechanism to the pool of EC2 instances launched in Auto Scaling group. The same approach will work for task processing jobs in AWS Fargate but instead of using EC2 instances, you would process your tasks in Docker containers.

AWS Batch vs. Lambda

Long story short, AWS Lambda is not suited for long-running tasks. It is ideal for quick and predictable workloads lasting no longer than 900 seconds or 15 minutes. If the process has not ended by then, AWS Lambda stops it automatically. So, if you need to run long-running tasks, we recommend you use AWS Batch jobs.

Serverless long-running jobs in AWS

Here’s where AWS Step Functions come into play. AWS Step Functions service is a workflow orchestration service. It lets you describe your workflow (state machine) in a simple JSON structure. This workflow usually consists of multiple steps usually implemented using AWS Lambda functions or other AWS services integrated with Step Functions. As you describe the workflow, AWS Step Functions visualizes it and makes it available for execution. You can visually track the execution process as every step in the workflow highlights a green, yellow, or red color, making it super helpful to debug the workflow during development in Step Functions console.

It’s very important that AWS Step Functions can tie not only Lambda functions but also other Step Function workflows (state machines) together to build workflows of any complexity. From the first official release of AWS Step Functions it becomes extremely easy to build a complex workflow logic and orchestrate multiple tasks executed independently.

AWS Step Functions vs. Lambda

AWS Lambda is an ideal solution for simple tasks that can be executed within 15 minutes. If you’d like to build a complex serverless workflow where multiple Lambda functions are involved, you need to use AWS Step Functions.

AWS Step Functions for Python developers

AWS Step Functions is the Lambda orchestration engine. It can execute Lambda functions natively. As soon as Lambda function supports Python execution runtime, you can use the Boto3 library to interract with AWS services from your Step Function workflow.

Additionally, you can build a Custom Lambda runtime using Lambda Layers and add any library or third-party software to your layer to make it available for the Lambda function during its execution.

AWS Step Functions long-running task workflow

Let’s take a look at the simplest lAWS Step Functions long-running task workflow. This example has been built based on the Iterating a Loop Using Lambda example from official AWS documentation.

Step Functions - Workflow Orchestration Example

We’ll take the RDS instance snapshot creation process as a long-running process. Take a look at this AWS Step Function diagram:

Our AWS Step Functions state machine will consist of:

  • Task, which initiates creating the RDS Snapshot process (implemented using Lambda Function).
  • Wait for a 1-minute state.
  • Task, which describes RDS Snapshot Lambda Function.
  • Choice state checks DB Snapshot status and waits or continues flow control to two stubs.
  • Pass state represents state machine success or failed execution status; in our case, both states do nothing, but you may use them to do extra work, like sending SNS notifications or calling other AWS services.

Optionally, you can add error handling steps to your state machine but to simplify the example we’ll build only functional steps. For more information about error handling, check out the “Error handling in Step Functions” chapter of the official AWS documentation.

When you start Step Function execution, the Step Functions service executes the first workflow step and sends its output to the next step for further processing. Such a behaviour allows you to easily pass data between workflow steps.

Each workflow execution stores execution history logs which allows you to quickly identify execution metrics and failures in complex workflows.

RDS Snapshot Lambda Function

Here’s a source code for the Lambda function, which will launch a long-running job (RDS snapshot operation):

import boto3
import datetime
import math

RDS_CLIENT = boto3.client('rds')

def handler(event,context):
    print('Event: {}'.format(event))

    now = datetime.datetime.now()
    timestamp = math.ceil(datetime.datetime.timestamp(now))

    params = {
        'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
            timestamp
        ),
        'DBInstanceIdentifier': '${db_instance_identifier}'
    }

    response = RDS_CLIENT.create_db_snapshot(**params)

    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
        }
    }

Our function takes DBSnapshotIdentifier and DBInstanceIdentifier as parameters and calling create_db_snapshot() function. As a result of the operation, we’re sending Python dict to the state machine’s next stage.

Describe RDS Snapshot Lambda Function

The following Lambda function will help us understand if the previously launched RDS snapshot operation has been finished or not by querying its status:

import boto3
import datetime

RDS_CLIENT = boto3.client('rds')

def handler(event,context):
    print('Event: {}'.format(event))
    db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
    db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']

    response = RDS_CLIENT.describe_db_snapshots(
        DBSnapshotIdentifier=db_snapshot_identifier,
        DBInstanceIdentifier=db_instance_identifier,
        SnapshotType='manual'
    )

    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
            'Status': response['DBSnapshots'][0]['Status']
        }
    }

This function is even simpler because it takes past parameters from the previous step, does describe_db_snapshots(), and returns the same dict with snapshot operation status.

Step Function Workflow

Let’s define AWS Step Functions state machine workflow. Our graph consists of the following steps:

  • Create snapshot.
  • Wait 1 minute.
  • Get snapshot status.
  • Snapshot completed.
  • Done.
  • Failed.

Here’s a complete workflow (state machine) definition:

{
    "Comment": "State machine to create RDS DB instance Snapshot",
    "StartAt": "Create snapshot",
    "States": {
        "Create snapshot": {
            "Type": "Task",
            "Resource": "${create_snapshot_lambda_arn}",
            "Next": "Wait 1 minute"
        },
        "Wait 1 minute": {
            "Type": "Wait",
            "Seconds": 60,
            "Next": "Get snapshot status"
        },
        "Get snapshot status": {
            "Type": "Task",
            "Resource": "${get_snapshot_status_lambda_arn}",
            "Next": "Snapshot completed?"
        },
        "Snapshot completed?": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "available",
                    "Next": "Done"
                },
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "failed",
                    "Next": "Failed"
                }
            ],
            "Default": "Wait 1 minute"
        },
        "Done": {
            "Type": "Pass",
            "End": true
        },
        "Failed": {
            "Type": "Pass",
            "End": true
        }
    }
}

Complete CloudFormation Template

And finally, here’s the complete CloudFormation stack template, tying everything altogether and defining state machine execution:

AWSTemplateFormatVersion: 2010-09-09

Description: >
        This stack is creating RDS instance snapshot and wait till process finishes.

Parameters:

    DBInstanceIdentifier:
        Description: >
                        Identifier of RDS DB instance
        Type: String

Resources:

    LambdaIamRole:
        Type: 'AWS::IAM::Role'
        Properties:
            AssumeRolePolicyDocument:
                Version: 2012-10-17
                Statement:
                    -
                        Effect: Allow
                        Principal:
                            Service:
                                - lambda.amazonaws.com
                        Action:
                            - 'sts:AssumeRole'
            Path: /
            Policies:
                -
                    PolicyName: root
                    PolicyDocument:
                        Version: 2012-10-17
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - 'rds:CreateDBSnapshot'
                                    - 'rds:DescribeDBSnapshots'
                                Resource: '*'
                            -
                                Effect: Allow
                                Action:
                                    - 'logs:CreateLogGroup'
                                    - 'logs:CreateLogStream'
                                    - 'logs:PutLogEvents'
                                Resource: 'arn:aws:logs:*:*:*'

    CreateSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.6
            Timeout: 30
            Code:
                ZipFile:
                    !Sub
                        - |-
                            import boto3
                            import datetime
                            import math

                            RDS_CLIENT = boto3.client('rds')

                            def handler(event,context):
                                print('Event: {}'.format(event))

                                now = datetime.datetime.now()
                                timestamp = math.ceil(datetime.datetime.timestamp(now))

                                params = {
                                    'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
                                        timestamp
                                    ),
                                    'DBInstanceIdentifier': '${db_instance_identifier}'
                                }

                                response = RDS_CLIENT.create_db_snapshot(**params)

                                return {
                                    'DBSnapshot': {
                                        'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
                                        'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
                                    }
                                }                            
                        -
                            db_instance_identifier:
                                !Sub '${DBInstanceIdentifier}'

    DescribeSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.6
            Timeout: 30
            Code:
                ZipFile: |
                    import boto3
                    import datetime

                    RDS_CLIENT = boto3.client('rds')

                    def handler(event,context):
                        print('Event: {}'.format(event))
                        db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
                        db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']

                        response = RDS_CLIENT.describe_db_snapshots(
                            DBSnapshotIdentifier=db_snapshot_identifier,
                            DBInstanceIdentifier=db_instance_identifier,
                            SnapshotType='manual'
                        )

                        return {
                            'DBSnapshot': {
                                'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
                                'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
                                'Status': response['DBSnapshots'][0]['Status']
                            }
                        }                    

    StateMachineExecutionRole:
        Type: "AWS::IAM::Role"
        Properties:
            AssumeRolePolicyDocument:
                Version: "2012-10-17"
                Statement:
                    -
                        Effect: "Allow"
                        Principal:
                            Service:
                                - !Sub states.${AWS::Region}.amazonaws.com
                        Action: "sts:AssumeRole"
            Path: "/"
            Policies:
                -
                    PolicyName: StatesExecutionPolicy
                    PolicyDocument:
                        Version: "2012-10-17"
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - "lambda:InvokeFunction"
                                Resource: "*"

    StateMachine:
        Type: "AWS::StepFunctions::StateMachine"
        Properties:
            DefinitionString:
                !Sub
                    - |-
                        {
                            "Comment": "State machine to create RDS DB instance Snapshot",
                            "StartAt": "Create snapshot",
                            "States": {
                                "Create snapshot": {
                                    "Type": "Task",
                                    "Resource": "${create_snapshot_lambda_arn}",
                                    "Next": "Wait 1 minute"
                                },
                                "Wait 1 minute": {
                                    "Type": "Wait",
                                    "Seconds": 60,
                                    "Next": "Get snapshot status"
                                },
                                "Get snapshot status": {
                                    "Type": "Task",
                                    "Resource": "${get_snapshot_status_lambda_arn}",
                                    "Next": "Snapshot completed?"
                                },
                                "Snapshot completed?": {
                                    "Type": "Choice",
                                    "Choices": [
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "available",
                                            "Next": "Done"
                                        },
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "failed",
                                            "Next": "Failed"
                                        }
                                    ],
                                    "Default": "Wait 1 minute"
                                },
                                "Done": {
                                    "Type": "Pass",
                                    "End": true
                                },
                                "Failed": {
                                    "Type": "Pass",
                                    "End": true
                                }
                            }
                        }                        
                    -
                        create_snapshot_lambda_arn: !GetAtt [ CreateSnapshotLambda, Arn ]
                        get_snapshot_status_lambda_arn: !GetAtt [ DescribeSnapshotLambda, Arn ]

            RoleArn: !GetAtt [ StateMachineExecutionRole, Arn ]

Outputs:

    StateMachine:
        Value: !Ref StateMachine
        Description: Step Function workflow Arn

During the execution, CloudFormation will call Step Functions API, IAM API, and Lambda API to deploy the workflow and other resources from the template.

The Step Function execution process will take a snapshot of the DB instance provided as an input data (parameter) in the CloudFormation template.

Summary

This article covered creating a Step Function workflow (state machine) to manage long-running automation tasks – RDS Snapshot. It is a simple but powerful example that should give you an idea of what else you can implement in the same way.

I hope this article will save you some amount of time. If you found this useful, please, help me to spread it to the world.

Stay tuned!

LIKE THIS ARTICLE?
Facebook
Twitter
LinkedIn
Pinterest
WANT TO BE AN AUTHOR OF ANOTHER POST?

We’re looking for skilled technical authors for our blog!

Table of Contents