AWS Step Functions – How to manage long running tasks

Andrei Maksimov

This article continues the Serverless article series and covers how to create and use an AWS Step Functions workflow to manage long-running tasks in the AWS cloud. By the end of the article, we’ll deploy the workflow using CloudFormation.

Managing and orchestrating multiple automation activities in the cloud may be a challenging task. In the article Cloud CRON – Scheduled Lambda Functions, we covered how to create and manage simple automation activities in the AWS cloud. By the term “simple,” we mean any task that runs no longer than 15 minutes (see AWS Lambda quotas). But what if our job runs longer?

We’ll cover those scenarios in this article.

Long-Running Jobs In AWS Cloud

There are multiple ways of handling long-running workloads in AWS in a Serverless way:

  • AWS Batch runs jobs that are pulled from a FIFO job queue, executed in a containerized environment, and exit upon completion. Job execution time is unlimited.
  • AWS Fargate is a general-purpose container platform where you can do a bit more. For example, you can bind your containers to the network and launch them as services. This service is a bit more complex than AWS Batch. Job execution time is unlimited.
  • AWS Lambda is a service for short-running jobs with a maximum execution time of 15 minutes.
  • Other serverless, purpose-built AWS services, for example for Machine Learning or Media Processing.

AWS Lambda And Long-Running Tasks

Long story short, AWS Lambda is not suited for long-running tasks. It is an ideal option for quick and predictable workloads that last no longer than 900 seconds (15 minutes) in total. If the process has not finished by that time, AWS Lambda stops it automatically.
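To illustrate the time limit, here is a minimal sketch of a function that watches its own remaining time and stops before Lambda cuts it off. The get_remaining_time_in_millis() method is part of the Lambda context object in Python; the FakeContext class, the process_items() helper, and the 10-second safety margin are assumptions made up for this example so that it can run locally.

```python
class FakeContext:
    """Local stand-in for the Lambda context object (illustration only)."""

    def __init__(self, remaining_ms):
        self._remaining_ms = remaining_ms

    def get_remaining_time_in_millis(self):
        return self._remaining_ms


def process_items(items, context, safety_margin_ms=10_000):
    """Process items until the invocation gets close to its timeout.

    Returns (processed, leftover) so the caller can hand the leftover
    items to another invocation.
    """
    processed = []
    for index, item in enumerate(items):
        # Stop early instead of being terminated in the middle of the work
        if context.get_remaining_time_in_millis() < safety_margin_ms:
            return processed, items[index:]
        processed.append(item.upper())  # placeholder for the real work
    return processed, []


if __name__ == "__main__":
    print(process_items(["a", "b"], FakeContext(remaining_ms=60_000)))
    print(process_items(["a", "b"], FakeContext(remaining_ms=5_000)))
```

This only helps with work that can be split into chunks. A single operation that takes longer than 15 minutes still needs a different approach.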

Run Long-Running Jobs In AWS

Here’s where AWS Step Functions comes into play. AWS Step Functions is a workflow orchestration service. It allows you to describe your workflow (state machine) in a simple JSON structure. This workflow usually consists of multiple Lambda functions and other AWS services integrated with Step Functions. As soon as you have described the workflow, AWS Step Functions visualizes it and makes it available for execution. You can visually track the execution process because every step in the workflow is highlighted in green, yellow, or red, which is very helpful for debugging the workflow during development.
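To make the “simple JSON structure” more concrete, here is a minimal sketch of a two-state definition built as a Python dict and serialized with json.dumps(). The Lambda ARN and the state names are made up for illustration and are not part of this article’s workflow.

```python
import json

# Hypothetical ARN, used only for illustration
HELLO_LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:hello"

definition = {
    "Comment": "Minimal two-state workflow",
    "StartAt": "Say hello",
    "States": {
        # Task state: invokes the Lambda function, then moves on
        "Say hello": {
            "Type": "Task",
            "Resource": HELLO_LAMBDA_ARN,
            "Next": "Finish",
        },
        # Succeed state: ends the execution successfully
        "Finish": {"Type": "Succeed"},
    },
}

definition_json = json.dumps(definition, indent=4)
print(definition_json)
```

Building the definition as a dict first is just one option. It makes typos in the JSON syntax impossible, but a hand-written JSON file works equally well.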

Boto3 For Python Step Functions

AWS Step Functions is the Lambda orchestration engine. It can execute Lambda functions natively. Because AWS Lambda provides a Python runtime, the boto3 library is available to you out of the box.

If you’re building a custom Lambda runtime using Lambda Layers, you can install boto3 into your layer and attach the layer to the Lambda function, so the library is available during its execution.

Implementing A Long-Running Job

Waiting for a long-running job to finish is a common task for any process orchestration system. We’ll build our solution on top of the Iterating a Loop Using Lambda example from the official AWS documentation.

Step Functions - Workflow Orchestration Example

As a long-running process, we’ll take the RDS instance snapshot creation process. Take a look at this AWS Step Function diagram:

Our AWS Step Functions state machine will consist of:

  • A Task state, which starts the RDS snapshot creation process (implemented as a Lambda function).
  • A Wait state, which pauses for 1 minute.
  • A Task state, which runs the Describe RDS Snapshot Lambda function.
  • A Choice state, which checks the DB snapshot status and either goes back to the Wait state or passes control to one of two stubs.
  • Two Pass states, which represent the successful and failed outcomes of the state machine; in our case, both states do nothing, but you may use them to do extra work, like sending SNS notifications or calling other AWS services.
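The control flow of the states listed above can be sketched in plain Python. This is only a local imitation of what Step Functions does for us, not how the service is implemented. The run_snapshot_workflow() function and the two fake callables are hypothetical names introduced for this example.

```python
import time


def run_snapshot_workflow(create_snapshot, describe_snapshot,
                          wait_seconds=60, sleep=time.sleep):
    """Imitate the state machine: create, then wait and poll until done."""
    state = create_snapshot({})                  # Task: create snapshot
    while True:
        sleep(wait_seconds)                      # Wait state
        state = describe_snapshot(state)         # Task: get snapshot status
        status = state["DBSnapshot"]["Status"]   # Choice state
        if status == "available":
            return "Done", state
        if status == "failed":
            return "Failed", state
        # Any other status: loop back to the Wait state


# Fake "Lambda functions" so the sketch runs without AWS access
_statuses = iter(["creating", "creating", "available"])


def fake_create(event):
    return {"DBSnapshot": {"DBSnapshotIdentifier": "demo-snapshot",
                           "DBInstanceIdentifier": "demo-db"}}


def fake_describe(state):
    return {"DBSnapshot": dict(state["DBSnapshot"], Status=next(_statuses))}


waits = []  # records the requested waits instead of really sleeping
outcome, final_state = run_snapshot_workflow(fake_create, fake_describe,
                                             sleep=waits.append)
print(outcome, final_state["DBSnapshot"]["Status"], len(waits))
```

The important difference is that a loop like this would have to keep running for the whole duration of the snapshot, while the state machine only invokes short Lambda functions and does the waiting itself.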

RDS Snapshot Lambda Function

Here’s the source code for the Lambda function, which launches a long-running job (the RDS snapshot operation):

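import boto3
import datetime
import math

RDS_CLIENT = boto3.client('rds')

def handler(event, context):
    print('Event: {}'.format(event))

    # A Unix timestamp (rounded up) distinguishes snapshots taken at different times
    now = datetime.datetime.now()
    timestamp = math.ceil(datetime.datetime.timestamp(now))

    # ${db_instance_identifier} is a placeholder that CloudFormation's !Sub
    # replaces with the real DB instance identifier at deployment time
    params = {
        'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
            timestamp
        ),
        'DBInstanceIdentifier': '${db_instance_identifier}'
    }

    # Start the snapshot; the call returns before the snapshot is complete
    response = RDS_CLIENT.create_db_snapshot(**params)

    # Pass both identifiers on to the next state of the state machine
    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
        }
    }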

Our function builds the DBSnapshotIdentifier and DBInstanceIdentifier parameters and calls the create_db_snapshot() function. As a result of the operation, it passes a Python dict to the next state of the state machine.
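If you want to try the parameter-building logic outside of CloudFormation, one possible variation is to move it into a small helper that takes the instance identifier as an argument. The build_snapshot_params() name and the “mydb” identifier are assumptions for this sketch; the article’s function gets the identifier through the template placeholder instead.

```python
import datetime
import math


def build_snapshot_params(db_instance_identifier, now=None):
    """Return keyword arguments suitable for create_db_snapshot()."""
    # Allow a fixed time to be injected so the result is predictable in tests
    now = now or datetime.datetime.now(datetime.timezone.utc)
    timestamp = math.ceil(now.timestamp())
    return {
        "DBSnapshotIdentifier": "{}-rds-snapshot-{}".format(
            db_instance_identifier, timestamp
        ),
        "DBInstanceIdentifier": db_instance_identifier,
    }


fixed_time = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
params = build_snapshot_params("mydb", fixed_time)
print(params)
```

The resulting dict can be passed to the RDS client in the same way as in the function above, with create_db_snapshot(**params).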

Describe RDS Snapshot Lambda Function

The following Lambda function tells us whether the previously launched RDS snapshot operation has finished by querying its status:

import boto3

RDS_CLIENT = boto3.client('rds')

def handler(event, context):
    print('Event: {}'.format(event))
    # Identifiers produced by the previous state of the state machine
    db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
    db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']

    response = RDS_CLIENT.describe_db_snapshots(
        DBSnapshotIdentifier=db_snapshot_identifier,
        DBInstanceIdentifier=db_instance_identifier,
        SnapshotType='manual'
    )

    # Return the same structure plus the current snapshot status
    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
            'Status': response['DBSnapshots'][0]['Status']
        }
    }

This function is even simpler: it takes the parameters passed from the previous step, calls describe_db_snapshots(), and returns the same dict with the snapshot operation status added.
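One way to check this logic without touching a real database is to pass the RDS client in as an argument and replace it with a stub. The sketch below follows the same steps as the function above; the describe_snapshot() wrapper and the StubRdsClient class are hypothetical and exist only for this local test.

```python
def describe_snapshot(event, rds_client):
    """Same logic as the handler, with the RDS client passed in."""
    snapshot = event["DBSnapshot"]
    response = rds_client.describe_db_snapshots(
        DBSnapshotIdentifier=snapshot["DBSnapshotIdentifier"],
        DBInstanceIdentifier=snapshot["DBInstanceIdentifier"],
        SnapshotType="manual",
    )
    found = response["DBSnapshots"][0]
    return {
        "DBSnapshot": {
            "DBSnapshotIdentifier": found["DBSnapshotIdentifier"],
            "DBInstanceIdentifier": found["DBInstanceIdentifier"],
            "Status": found["Status"],
        }
    }


class StubRdsClient:
    """Stand-in for the boto3 RDS client that returns a canned response."""

    def __init__(self, status):
        self.status = status

    def describe_db_snapshots(self, **kwargs):
        return {
            "DBSnapshots": [
                {
                    "DBSnapshotIdentifier": kwargs["DBSnapshotIdentifier"],
                    "DBInstanceIdentifier": kwargs["DBInstanceIdentifier"],
                    "Status": self.status,
                }
            ]
        }


event = {
    "DBSnapshot": {
        "DBSnapshotIdentifier": "demo-snapshot",
        "DBInstanceIdentifier": "demo-db",
    }
}
result = describe_snapshot(event, StubRdsClient("creating"))
print(result["DBSnapshot"]["Status"])
```

In the deployed function, the real boto3 client takes the place of the stub.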

Step Function Workflow

Now let’s describe the AWS Step Functions state machine. Our graph consists of the following steps:

  • Create snapshot.
  • Wait 1 minute.
  • Get snapshot status.
  • Snapshot completed?
  • Done.
  • Failed.

Here’s a complete workflow (state machine) description:

{
    "Comment": "State machine to create RDS DB instance Snapshot",
    "StartAt": "Create snapshot",
    "States": {
        "Create snapshot": {
            "Type": "Task",
            "Resource": "${create_snapshot_lambda_arn}",
            "Next": "Wait 1 minute"
        },
        "Wait 1 minute": {
            "Type": "Wait",
            "Seconds": 60,
            "Next": "Get snapshot status"
        },
        "Get snapshot status": {
            "Type": "Task",
            "Resource": "${get_snapshot_status_lambda_arn}",
            "Next": "Snapshot completed?"
        },
        "Snapshot completed?": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "available",
                    "Next": "Done"
                },
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "failed",
                    "Next": "Failed"
                }
            ],
            "Default": "Wait 1 minute"
        },
        "Done": {
            "Type": "Pass",
            "End": true
        },
        "Failed": {
            "Type": "Pass",
            "End": true
        }
    }
}
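Because the states refer to each other by name, a typo in a “Next” or “Default” value is an easy mistake to make. As a rough sketch, a few lines of Python can check that every transition points to an existing state before you deploy. The find_broken_transitions() helper is an assumption of this example and covers only the fields used in this workflow; the “arn:placeholder” values stand in for the real Lambda ARNs.

```python
import json


def find_broken_transitions(definition):
    """Return (state, target) pairs whose target is not a defined state."""
    states = definition["States"]
    broken = []
    if definition["StartAt"] not in states:
        broken.append(("StartAt", definition["StartAt"]))
    for name, state in states.items():
        targets = []
        if "Next" in state:
            targets.append(state["Next"])
        if "Default" in state:
            targets.append(state["Default"])
        for choice in state.get("Choices", []):
            targets.append(choice["Next"])
        for target in targets:
            if target not in states:
                broken.append((name, target))
    return broken


definition = json.loads("""
{
    "StartAt": "Create snapshot",
    "States": {
        "Create snapshot": {"Type": "Task", "Resource": "arn:placeholder", "Next": "Wait 1 minute"},
        "Wait 1 minute": {"Type": "Wait", "Seconds": 60, "Next": "Get snapshot status"},
        "Get snapshot status": {"Type": "Task", "Resource": "arn:placeholder", "Next": "Snapshot completed?"},
        "Snapshot completed?": {
            "Type": "Choice",
            "Choices": [
                {"Variable": "$.DBSnapshot.Status", "StringEquals": "available", "Next": "Done"},
                {"Variable": "$.DBSnapshot.Status", "StringEquals": "failed", "Next": "Failed"}
            ],
            "Default": "Wait 1 minute"
        },
        "Done": {"Type": "Pass", "End": true},
        "Failed": {"Type": "Pass", "End": true}
    }
}
""")

print(find_broken_transitions(definition))
```

For the definition above, the list of broken transitions is empty.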

Complete CloudFormation Template

And finally, here’s the complete CloudFormation stack template, tying everything together:

AWSTemplateFormatVersion: 2010-09-09

Description: >
        This stack creates an RDS instance snapshot and waits until the process finishes.

Parameters:

    DBInstanceIdentifier:
        Description: >
                        Identifier of RDS DB instance
        Type: String

Resources:

    LambdaIamRole:
        Type: 'AWS::IAM::Role'
        Properties:
            AssumeRolePolicyDocument:
                Version: 2012-10-17
                Statement:
                    -
                        Effect: Allow
                        Principal:
                            Service:
                                - lambda.amazonaws.com
                        Action:
                            - 'sts:AssumeRole'
            Path: /
            Policies:
                -
                    PolicyName: root
                    PolicyDocument:
                        Version: 2012-10-17
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - 'rds:CreateDBSnapshot'
                                    - 'rds:DescribeDBSnapshots'
                                Resource: '*'
                            -
                                Effect: Allow
                                Action:
                                    - 'logs:CreateLogGroup'
                                    - 'logs:CreateLogStream'
                                    - 'logs:PutLogEvents'
                                Resource: 'arn:aws:logs:*:*:*'

    CreateSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.12
            Timeout: 30
            Code:
                ZipFile:
                    !Sub
                        - |-
                            import boto3
                            import datetime
                            import math

                            RDS_CLIENT = boto3.client('rds')

                            def handler(event,context):
                                print('Event: {}'.format(event))

                                now = datetime.datetime.now()
                                timestamp = math.ceil(datetime.datetime.timestamp(now))

                                params = {
                                    'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
                                        timestamp
                                    ),
                                    'DBInstanceIdentifier': '${db_instance_identifier}'
                                }

                                response = RDS_CLIENT.create_db_snapshot(**params)

                                return {
                                    'DBSnapshot': {
                                        'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
                                        'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
                                    }
                                }                            
                        -
                            db_instance_identifier:
                                !Sub '${DBInstanceIdentifier}'

    DescribeSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.12
            Timeout: 30
            Code:
                ZipFile: |
                    import boto3
                    import datetime

                    RDS_CLIENT = boto3.client('rds')

                    def handler(event,context):
                        print('Event: {}'.format(event))
                        db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
                        db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']

                        response = RDS_CLIENT.describe_db_snapshots(
                            DBSnapshotIdentifier=db_snapshot_identifier,
                            DBInstanceIdentifier=db_instance_identifier,
                            SnapshotType='manual'
                        )

                        return {
                            'DBSnapshot': {
                                'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
                                'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
                                'Status': response['DBSnapshots'][0]['Status']
                            }
                        }                    

    StateMachineExecutionRole:
        Type: "AWS::IAM::Role"
        Properties:
            AssumeRolePolicyDocument:
                Version: "2012-10-17"
                Statement:
                    -
                        Effect: "Allow"
                        Principal:
                            Service:
                                - !Sub states.${AWS::Region}.amazonaws.com
                        Action: "sts:AssumeRole"
            Path: "/"
            Policies:
                -
                    PolicyName: StatesExecutionPolicy
                    PolicyDocument:
                        Version: "2012-10-17"
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - "lambda:InvokeFunction"
                                Resource: "*"

    StateMachine:
        Type: "AWS::StepFunctions::StateMachine"
        Properties:
            DefinitionString:
                !Sub
                    - |-
                        {
                            "Comment": "State machine to create RDS DB instance Snapshot",
                            "StartAt": "Create snapshot",
                            "States": {
                                "Create snapshot": {
                                    "Type": "Task",
                                    "Resource": "${create_snapshot_lambda_arn}",
                                    "Next": "Wait 1 minute"
                                },
                                "Wait 1 minute": {
                                    "Type": "Wait",
                                    "Seconds": 60,
                                    "Next": "Get snapshot status"
                                },
                                "Get snapshot status": {
                                    "Type": "Task",
                                    "Resource": "${get_snapshot_status_lambda_arn}",
                                    "Next": "Snapshot completed?"
                                },
                                "Snapshot completed?": {
                                    "Type": "Choice",
                                    "Choices": [
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "available",
                                            "Next": "Done"
                                        },
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "failed",
                                            "Next": "Failed"
                                        }
                                    ],
                                    "Default": "Wait 1 minute"
                                },
                                "Done": {
                                    "Type": "Pass",
                                    "End": true
                                },
                                "Failed": {
                                    "Type": "Pass",
                                    "End": true
                                }
                            }
                        }                        
                    -
                        create_snapshot_lambda_arn: !GetAtt [ CreateSnapshotLambda, Arn ]
                        get_snapshot_status_lambda_arn: !GetAtt [ DescribeSnapshotLambda, Arn ]

            RoleArn: !GetAtt [ StateMachineExecutionRole, Arn ]

Outputs:

    StateMachine:
        Value: !Ref StateMachine
        Description: Step Functions workflow ARN
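Once the stack is deployed, the state machine can be started from the AWS console or from code. Here is a minimal sketch that starts an execution and polls until it reaches a final status. It relies on the start_execution() and describe_execution() calls of the boto3 Step Functions client; the run_and_wait() helper, the 30-second polling interval, and the FakeSfnClient class (used only to run the sketch without AWS access) are assumptions of this example.

```python
import json
import time

FINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def run_and_wait(sfn_client, state_machine_arn, payload=None,
                 poll_seconds=30, sleep=time.sleep):
    """Start an execution and return its final status."""
    started = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(payload or {}),
    )
    execution_arn = started["executionArn"]
    while True:
        described = sfn_client.describe_execution(executionArn=execution_arn)
        if described["status"] in FINAL_STATUSES:
            return described["status"]
        sleep(poll_seconds)


class FakeSfnClient:
    """Stand-in client that reports RUNNING twice, then SUCCEEDED."""

    def __init__(self):
        self._statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])

    def start_execution(self, stateMachineArn, input):
        return {"executionArn": stateMachineArn + ":demo-execution"}

    def describe_execution(self, executionArn):
        return {"executionArn": executionArn, "status": next(self._statuses)}


final_status = run_and_wait(FakeSfnClient(), "arn:placeholder",
                            sleep=lambda seconds: None)
print(final_status)

# Against AWS (requires boto3 and credentials), pass a real client and the
# state machine ARN from the stack output:
#   import boto3
#   run_and_wait(boto3.client("stepfunctions"), "<state machine ARN>")
```

Keep in mind that both end states in this workflow are Pass states, so the execution finishes with the SUCCEEDED status in either branch. To find out whether the snapshot itself is available or failed, look at the snapshot status in the execution output.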

Summary

In this article, we covered how to create a Step Functions workflow (state machine) to manage a long-running automation task: creating an RDS snapshot. It is a simple but powerful example that should give you an idea of what else you can implement in the same way.

I hope this article saves you some time. If you found it useful, please help me spread it to the world.

Stay tuned!
