AWS Step Functions - How to manage long running tasks

AWS Step Functions - How to manage long running tasks
Table Of Contents

Managing and orchestrating multiple automation activities in the cloud may be a challenging task. In the article Cloud CRON - Scheduled Lambda Functions, we covered how to create and manage simple automation activities in the AWS cloud. By the term “simple,” we mean any tasks running no longer than 15 minutes (see AWS Lambda quotas). But what if our job runs longer?

Those scenarios we’ll cover in this article.

AWS Long-Running Jobs

There’re multiple ways of handling long-running workloads in AWS in a Serverless way:

  • AWS Batch is used for running jobs that are pulled from the jobs FIFO queue, executed using containerized environment and exit upon completion. Job execution time is unlimited.
  • AWS Fargate is a general purpose container platform where you can do a bit more. For example, you can bind your containers to the network and launch them as a services. This service is a bit more complex, then AWS Batch. Job execution time is unlimited.
  • AWS Lambda is a service for short-running jobs with maximum execution time of 15 minutes.
  • AWS Serverless and other purpose built services for Machine Learning or Media Processing.

AWS Lambda Long-Running Tasks

Long story short, AWS Lambda is not suited for long-running tasks. It is an ideal option for quick and predictable workloads that lasts no longer than 900 seconds or 15 minutes in total. If the process not ended by that time, AWS Lambda stops it automatically.

Run Long-Running Jobs In AWS

Here’s where AWS Step Functions come into play. AWS Step Functions is a workflow orchestration service. It allows you to describe your workflow (state machine) in a simple JSON structure. This workflow usually consists of multiple Lambda functions and other AWS services integrated with Step Functions. As soon as you described the workflow, AWS Step Functions visualizes it and makes it available for execution. You can visually track the execution process as every single step in the workflow highlights a green, yellow, or red color, making it super helpful to debug the workflow during development.

Boto3 for Python Step Functions

AWS Step Functions is the Lambda orchestration engine. It can execute Lambda functions natively. As soon as AWS Lambda supports Python execution runtime, the boto3 library is available for you out of the box.

If you’re making Custom Lambda runtime using Lambda Layers, you can install boto3 to your layer and attach it to the Lambda during its execution.

Implementing a long-running job

It’s a prevalent task for any process orchestration system. And we’ll build our solution on top of Iterating a Loop Using Lambda example from official AWS documentation.

As a long-running process, we’ll take the RDS instance snapshot creation process. Take a look at this AWS Step Function diagram:

Our AWS Step Functions state machine will consist of:

  • Task, which initiates create the RDS Snapshot process (implemented using Lambda Function)
  • Wait for 1-minute state
  • Task, which describes RDS Snapshot Lambda Function
  • Choise state, which checks DB Snapshot status and either waits, either continues flow control to two stubs
  • Pass state, which represents state machine success or failed execution status; in our case, both states do nothing, but you may use them to do extra work, like sending SNS notifications or calling other AWS services

RDS Snapshot Lambda Function

Here’s a source code for the Lambda function, which will launch a long-running job (RDS snapshot operation):

import boto3
import datetime
import math

RDS_CLIENT = boto3.client('rds')

def handler(event,context):
    print('Event: {}'.format(event))

    now = datetime.datetime.now()
    timestamp = math.ceil(datetime.datetime.timestamp(now))

    params = {
        'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
            timestamp
        ),
        'DBInstanceIdentifier': '${db_instance_identifier}'
    }

    response = RDS_CLIENT.create_db_snapshot(**params)

    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
        }
    }

Our function takes DBSnapshotIdentifier and DBInstanceIdentifier as parameters and calling create_db_snapshot() function. As a result of the operation, we’re sending Python dict to the state machine’s next stage.

Describe RDS Snapshot Lambda Function

The following Lambda function will help us understand if previously launched RDS snapshot operation has been finished or not by querying its status:

import boto3
import datetime

RDS_CLIENT = boto3.client('rds')

def handler(event,context):
    print('Event: {}'.format(event))
    db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
    db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']

    response = RDS_CLIENT.describe_db_snapshots(
        DBSnapshotIdentifier=db_snapshot_identifier,
        DBInstanceIdentifier=db_instance_identifier,
        SnapshotType='manual'
    )

    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
            'Status': response['DBSnapshots'][0]['Status']
        }
    }

This function is even simpler because it takes past parameters from the previous step, does describe_db_snapshots(), and returns the same dict with snapshot operation status.

Step Function Workflow

AWS Step Functions state machine description. Our graph consists of the following steps:

  • Create snapshot
  • Wait 1 minute
  • Get snapshot status
  • Snapshot completed?
  • Done
  • Failed

Here’s a complete workflow (state machine) description:

{
    "Comment": "State machine to create RDS DB instance Snapshot",
    "StartAt": "Create snapshot",
    "States": {
        "Create snapshot": {
            "Type": "Task",
            "Resource": "${create_snapshot_lambda_arn}",
            "Next": "Wait 1 minute"
        },
        "Wait 1 minute": {
            "Type": "Wait",
            "Seconds": 60,
            "Next": "Get snapshot status"
        },
        "Get snapshot status": {
            "Type": "Task",
            "Resource": "${get_snapshot_status_lambda_arn}",
            "Next": "Snapshot completed?"
        },
        "Snapshot completed?": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "available",
                    "Next": "Done"
                },
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "failed",
                    "Next": "Failed"
                }
            ],
            "Default": "Wait 1 minute"
        },
        "Done": {
            "Type": "Pass",
            "End": true
        },
        "Failed": {
            "Type": "Pass",
            "End": true
        }
    }
}

Complete example

And finally, here’s the complete CloudFormation stack template, tying everything altogether:

AWSTemplateFormatVersion: 2010-09-09

Description: >
        This stack is creating RDS instance snapshot and wait till process finishes.

Parameters:

    DBInstanceIdentifier:
        Description: >
                        Identifier of RDS DB instance
        Type: String

Resources:

    LambdaIamRole:
        Type: 'AWS::IAM::Role'
        Properties:
            AssumeRolePolicyDocument:
                Version: 2012-10-17
                Statement:
                    -
                        Effect: Allow
                        Principal:
                            Service:
                                - lambda.amazonaws.com
                        Action:
                            - 'sts:AssumeRole'
            Path: /
            Policies:
                -
                    PolicyName: root
                    PolicyDocument:
                        Version: 2012-10-17
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - 'rds:CreateDBSnapshot'
                                    - 'rds:DescribeDBSnapshots'
                                Resource: '*'
                            -
                                Effect: Allow
                                Action:
                                    - 'logs:CreateLogGroup'
                                    - 'logs:CreateLogStream'
                                    - 'logs:PutLogEvents'
                                Resource: 'arn:aws:logs:*:*:*'

    CreateSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.6
            Timeout: 30
            Code:
                ZipFile:
                    !Sub
                        - |-
                            import boto3
                            import datetime
                            import math

                            RDS_CLIENT = boto3.client('rds')

                            def handler(event,context):
                                print('Event: {}'.format(event))

                                now = datetime.datetime.now()
                                timestamp = math.ceil(datetime.datetime.timestamp(now))

                                params = {
                                    'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
                                        timestamp
                                    ),
                                    'DBInstanceIdentifier': '${db_instance_identifier}'
                                }

                                response = RDS_CLIENT.create_db_snapshot(**params)

                                return {
                                    'DBSnapshot': {
                                        'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
                                        'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
                                    }
                                }                            
                        -
                            db_instance_identifier:
                                !Sub '${DBInstanceIdentifier}'

    DescribeSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.6
            Timeout: 30
            Code:
                ZipFile: |
                    import boto3
                    import datetime

                    RDS_CLIENT = boto3.client('rds')

                    def handler(event,context):
                        print('Event: {}'.format(event))
                        db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
                        db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']

                        response = RDS_CLIENT.describe_db_snapshots(
                            DBSnapshotIdentifier=db_snapshot_identifier,
                            DBInstanceIdentifier=db_instance_identifier,
                            SnapshotType='manual'
                        )

                        return {
                            'DBSnapshot': {
                                'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
                                'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
                                'Status': response['DBSnapshots'][0]['Status']
                            }
                        }                    

    StateMachineExecutionRole:
        Type: "AWS::IAM::Role"
        Properties:
            AssumeRolePolicyDocument:
                Version: "2012-10-17"
                Statement:
                    -
                        Effect: "Allow"
                        Principal:
                            Service:
                                - !Sub states.${AWS::Region}.amazonaws.com
                        Action: "sts:AssumeRole"
            Path: "/"
            Policies:
                -
                    PolicyName: StatesExecutionPolicy
                    PolicyDocument:
                        Version: "2012-10-17"
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - "lambda:InvokeFunction"
                                Resource: "*"

    StateMachine:
        Type: "AWS::StepFunctions::StateMachine"
        Properties:
            DefinitionString:
                !Sub
                    - |-
                        {
                            "Comment": "State machine to create RDS DB instance Snapshot",
                            "StartAt": "Create snapshot",
                            "States": {
                                "Create snapshot": {
                                    "Type": "Task",
                                    "Resource": "${create_snapshot_lambda_arn}",
                                    "Next": "Wait 1 minute"
                                },
                                "Wait 1 minute": {
                                    "Type": "Wait",
                                    "Seconds": 60,
                                    "Next": "Get snapshot status"
                                },
                                "Get snapshot status": {
                                    "Type": "Task",
                                    "Resource": "${get_snapshot_status_lambda_arn}",
                                    "Next": "Snapshot completed?"
                                },
                                "Snapshot completed?": {
                                    "Type": "Choice",
                                    "Choices": [
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "available",
                                            "Next": "Done"
                                        },
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "failed",
                                            "Next": "Failed"
                                        }
                                    ],
                                    "Default": "Wait 1 minute"
                                },
                                "Done": {
                                    "Type": "Pass",
                                    "End": true
                                },
                                "Failed": {
                                    "Type": "Pass",
                                    "End": true
                                }
                            }
                        }                        
                    -
                        create_snapshot_lambda_arn: !GetAtt [ CreateSnapshotLambda, Arn ]
                        get_snapshot_status_lambda_arn: !GetAtt [ DescribeSnapshotLambda, Arn ]

            RoleArn: !GetAtt [ StateMachineExecutionRole, Arn ]

Outputs:

    StateMachine:
        Value: !Ref StateMachine
        Description: Step Function workflow Arn

Summary

This article shows how to create a Step Function workflow (state machine) to manage long-running automation tasks - RDS Snapshot. It is a simple but powerful example, which should give you an idea, what else you can implement in the same way.

And of course, I hope this article will save you some amount of time. If you found this useful, please, help me to spread it to the world.

Opt-In & Stay Tuned!

Authors


Author avatar
Andrei Maksimov

I’m a passionate Cloud Infrastructure Architect with more than 15 years of experience in IT.

Let’s discuss your AWS questions if you still have them.

This post represents my personal experience and opinion about the topic.