January 26, 2023

AWS Step Functions Long Running Task Example

Share this

By Andrei Maksimov

February 6, 2021

boto3, cloudformation, lambda, python, rds, snapshot, step-functions

Enjoy what I do? Consider buying me a coffee ☕️

  • Home
  • AWS
  • AWS Step Functions Long Running Task Example

Managing and orchestrating multiple automation activities in the cloud may be a challenging task. In the article Cloud CRON – Scheduled Lambda Functions, we covered how to create and manage simple automation activities in the AWS cloud. By the term “simple,” we mean tasks running no longer than 15 minutes (see AWS Lambda quotas). But what if our job runs longer? Those scenarios we’ll cover in this article.

This article continues the Serverless articles series and covers creating and using AWS Step Functions workflow to manage long-running tasks in the AWS cloud. This article demonstrates how to implement an AWS Step Functions long-running task.

Long-Running Jobs In AWS Cloud

There’re multiple ways of handling long-running workloads in AWS in a Serverless way:

  • AWS Batch runs jobs pulled from the jobs FIFO queue, executed using the containerized environment, and exiting upon completion. Job execution time is unlimited.
  • AWS Fargate is a general-purpose container platform where you can do more. For example, you can bind your containers to the network and launch them as services. This service is a bit more complex than AWS Batch. Job execution time is unlimited.
  • AWS Lambda is a service for short-running jobs with a maximum execution time of 15 minutes.
  • Purpose-built AWS services for Machine Learning or Media Processing, for example.

AWS Batch vs. Fargate

When running batch processing jobs or long-running tasks in the cloud, there are two major options: AWS Batch and Fargate. Both have their pros, so it’s important to choose the right option for your needs.

AWS Batch is a managed service that lets you run batch-processing jobs packed in Docker containers. As soon as the batch job finishes its execution, the container used to run it terminates.

AWS Fargate is a serverless compute engine that allows you to run Docker containers indefinitely. You can’t split tasks across multiple containers and execute them independently. Think about how you would solve the same problem using EC2 instances. You would use the SQS topic as a task delivery mechanism to the pool of EC2 instances launched in the Auto Scaling group. The same approach will work for task processing jobs in AWS Fargate, but instead of using EC2 instances, you would process your tasks in Docker containers.

AWS Batch vs. Lambda

Long story short, AWS Lambda is not suited for long-running tasks. It is ideal for quick and predictable workloads lasting no longer than 900 seconds or 15 minutes. If the process has not ended by then, AWS Lambda stops it automatically. So, if you need to run long-running tasks, we recommend you use AWS Batch jobs.

Serverless long-running jobs in AWS

Here’s where AWS Step Functions come into play. AWS Step Functions service is a workflow orchestration service. It lets you describe your workflow (state machine) in a simple JSON structure. This workflow usually consists of multiple steps implemented using AWS Lambda functions or other AWS services integrated with Step Functions. As you describe the workflow, AWS Step Functions visualizes it and makes it available for execution. You can visually track the execution process as every step in the workflow highlights a green, yellow, or red color, making it super helpful to debug the workflow during development in the Step Functions console.

It’s very important that AWS Step Functions can tie Lambda functions and other Step Function workflows (state machines) together to build workflows of any complexity. From the first official release of AWS Step Functions, it has become straightforward to build complex workflow logic and orchestrate multiple tasks executed independently.

AWS Step Functions vs. Lambda

AWS Lambda is an ideal solution for simple tasks that can be executed within 15 minutes. If you’d like to build a complex serverless workflow with multiple Lambda functions, consider using AWS Step Functions as an AWS Lambda alternative for long-running jobs.

AWS Step Functions for Python developers

AWS Step Functions is the Lambda orchestration engine. It can execute Lambda functions natively. Once the Lambda function supports Python execution runtime, you can use the Boto3 library to interact with AWS services from your Step Function workflow.

Additionally, you can build a Custom Lambda runtime using Lambda Layers and add any library or third-party software to your layer to make it available for the Lambda function during its execution.

AWS Step Functions long-running task workflow

Let’s look at the simplest lAWS Step Functions in long-running task workflow. This example has been built based on the Iterating a Loop Using Lambda example from official AWS documentation.

AWS Step Functions long running task workflow

We’ll take the RDS instance snapshot creation process as a long-running process. Take a look at this AWS Step Function diagram:

Our AWS Step Functions state machine will consist of:

  • Task, which initiates creating the RDS Snapshot process (implemented using Lambda Function).
  • Wait for a 1-minute state.
  • Task, which describes RDS Snapshot Lambda Function.
  • Choice state checks DB Snapshot status and waits or continues flow control to two stubs.
  • Pass state represents state machine success or failed execution status; in our case, both states do nothing, but you may use them to do extra work, like sending SNS notifications or calling other AWS services.

Optionally, you can add error-handling steps to your state machine, but we’ll build only functional steps to simplify the example. For more information about error handling, check out the “Error handling in Step Functions” chapter of the official AWS documentation.

When you start Step Function execution, the Step Functions service executes the first workflow step and sends its output to the next step for further processing. Such behavior allows you to pass data between workflow steps easily.

Each workflow execution stores execution history logs, which allows you to identify execution metrics and failures in complex workflows quickly.

RDS Snapshot Lambda Function

Here’s a source code for the Lambda function, which will launch a long-running job (RDS snapshot operation):

import boto3
import datetime
import math
RDS_CLIENT = boto3.client('rds')
def handler(event,context):
    print('Event: {}'.format(event))
    now = datetime.datetime.now()
    timestamp = math.ceil(datetime.datetime.timestamp(now))
    params = {
        'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
            timestamp
        ),
        'DBInstanceIdentifier': '${db_instance_identifier}'
    }
    response = RDS_CLIENT.create_db_snapshot(**params)
    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
        }
    }

Our function takes DBSnapshotIdentifier and DBInstanceIdentifier as parameters and calling create_db_snapshot() function. As a result of the operation, we’re sending Python dict to the state machine’s next stage.

Describe RDS Snapshot Lambda Function

The following Lambda function will help us understand if the previously launched RDS snapshot operation has been finished or not by querying its status:

import boto3
import datetime
RDS_CLIENT = boto3.client('rds')
def handler(event,context):
    print('Event: {}'.format(event))
    db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
    db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']
    response = RDS_CLIENT.describe_db_snapshots(
        DBSnapshotIdentifier=db_snapshot_identifier,
        DBInstanceIdentifier=db_instance_identifier,
        SnapshotType='manual'
    )
    return {
        'DBSnapshot': {
            'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
            'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
            'Status': response['DBSnapshots'][0]['Status']
        }
    }

This function is even simpler because it takes past parameters from the previous step, does describe_db_snapshots(), and returns the same dict with snapshot operation status.

Step Function Workflow

Let’s define AWS Step Functions state machine workflow. Our graph consists of the following steps:

  • Create snapshot.
  • Wait 1 minute.
  • Get snapshot status.
  • Snapshot completed.
  • Done.
  • Failed.

Here’s a complete workflow (state machine) definition:

{
    "Comment": "State machine to create RDS DB instance Snapshot",
    "StartAt": "Create snapshot",
    "States": {
        "Create snapshot": {
            "Type": "Task",
            "Resource": "${create_snapshot_lambda_arn}",
            "Next": "Wait 1 minute"
        },
        "Wait 1 minute": {
            "Type": "Wait",
            "Seconds": 60,
            "Next": "Get snapshot status"
        },
        "Get snapshot status": {
            "Type": "Task",
            "Resource": "${get_snapshot_status_lambda_arn}",
            "Next": "Snapshot completed?"
        },
        "Snapshot completed?": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "available",
                    "Next": "Done"
                },
                {
                    "Variable": "$.DBSnapshot.Status",
                    "StringEquals": "failed",
                    "Next": "Failed"
                }
            ],
            "Default": "Wait 1 minute"
        },
        "Done": {
            "Type": "Pass",
            "End": true
        },
        "Failed": {
            "Type": "Pass",
            "End": true
        }
    }
}

Complete CloudFormation Template

And finally, here’s the complete CloudFormation stack template, tying everything all together and defining state machine execution:

AWSTemplateFormatVersion: 2010-09-09
Description: >
        This stack is creating RDS instance snapshot and wait till process finishes.
Parameters:
    DBInstanceIdentifier:
        Description: >
                        Identifier of RDS DB instance
        Type: String
Resources:
    LambdaIamRole:
        Type: 'AWS::IAM::Role
        Properties:
            AssumeRolePolicyDocument:
                Version: 2012-10-17
                Statement:
                    -
                        Effect: Allow
                        Principal:
                            Service:
                                - lambda.amazonaws.com
                        Action:
                            - 'sts:AssumeRole'
            Path: /
            Policies:
                -
                    PolicyName: root
                    PolicyDocument:
                        Version: 2012-10-17
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - 'rds:CreateDBSnapshot'
                                    - 'rds:DescribeDBSnapshots'
                                Resource: '*'
                            -
                                Effect: Allow
                                Action:
                                    - 'logs:CreateLogGroup'
                                    - 'logs:CreateLogStream'
                                    - 'logs:PutLogEvents'
                                Resource: 'arn:aws:logs:*:*:*'
    CreateSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.6
            Timeout: 30
            Code:
                ZipFile:
                    !Sub
                        - |-
                            import boto3
                            import datetime
                            import math
                            RDS_CLIENT = boto3.client('rds')
                            def handler(event,context):
                                print('Event: {}'.format(event))
                                now = datetime.datetime.now()
                                timestamp = math.ceil(datetime.datetime.timestamp(now))
                                params = {
                                    'DBSnapshotIdentifier': '${db_instance_identifier}-rds-snapshot-{}'.format(
                                        timestamp
                                    ),
                                    'DBInstanceIdentifier': '${db_instance_identifier}'
                                }
                                response = RDS_CLIENT.create_db_snapshot(**params)
                                return {
                                    'DBSnapshot': {
                                        'DBSnapshotIdentifier': response['DBSnapshot']['DBSnapshotIdentifier'],
                                        'DBInstanceIdentifier': response['DBSnapshot']['DBInstanceIdentifier']
                                    }
                                }                            
                        -
                            db_instance_identifier:
                                !Sub '${DBInstanceIdentifier}'
    DescribeSnapshotLambda:
        Type: AWS::Lambda::Function
        Properties:
            Handler: index.handler
            Role: !GetAtt LambdaIamRole.Arn
            Runtime: python3.6
            Timeout: 30
            Code:
                ZipFile: |
                    import boto3
                    import datetime
                    RDS_CLIENT = boto3.client('rds')
                    def handler(event,context):
                        print('Event: {}'.format(event))
                        db_snapshot_identifier = event['DBSnapshot']['DBSnapshotIdentifier']
                        db_instance_identifier = event['DBSnapshot']['DBInstanceIdentifier']
                        response = RDS_CLIENT.describe_db_snapshots(
                            DBSnapshotIdentifier=db_snapshot_identifier,
                            DBInstanceIdentifier=db_instance_identifier,
                            SnapshotType='manual'
                        )
                        return {
                            'DBSnapshot': {
                                'DBSnapshotIdentifier': response['DBSnapshots'][0]['DBSnapshotIdentifier'],
                                'DBInstanceIdentifier': response['DBSnapshots'][0]['DBInstanceIdentifier'],
                                'Status': response['DBSnapshots'][0]['Status']
                            }
                        }                    
    StateMachineExecutionRole:
        Type: "AWS::IAM::Role"
        Properties:
            AssumeRolePolicyDocument:
                Version: "2012-10-17"
                Statement:
                    -
                        Effect: "Allow"
                        Principal:
                            Service:
                                - !Sub states.${AWS::Region}.amazonaws.com
                        Action: "sts:AssumeRole"
            Path: "/"
            Policies:
                -
                    PolicyName: StatesExecutionPolicy
                    PolicyDocument:
                        Version: "2012-10-17"
                        Statement:
                            -
                                Effect: Allow
                                Action:
                                    - "lambda:InvokeFunction"
                                Resource: "*"
    StateMachine:
        Type: "AWS::StepFunctions::StateMachine"
        Properties:
            DefinitionString:
                !Sub
                    - |-
                        {
                            "Comment": "State machine to create RDS DB instance Snapshot",
                            "StartAt": "Create snapshot",
                            "States": {
                                "Create snapshot": {
                                    "Type": "Task",
                                    "Resource": "${create_snapshot_lambda_arn}",
                                    "Next": "Wait 1 minute"
                                },
                                "Wait 1 minute": {
                                    "Type": "Wait",
                                    "Seconds": 60,
                                    "Next": "Get snapshot status"
                                },
                                "Get snapshot status": {
                                    "Type": "Task",
                                    "Resource": "${get_snapshot_status_lambda_arn}",
                                    "Next": "Snapshot completed?"
                                },
                                "Snapshot completed?": {
                                    "Type": "Choice",
                                    "Choices": [
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "available",
                                            "Next": "Done"
                                        },
                                        {
                                            "Variable": "$.DBSnapshot.Status",
                                            "StringEquals": "failed",
                                            "Next": "Failed"
                                        }
                                    ],
                                    "Default": "Wait 1 minute"
                                },
                                "Done": {
                                    "Type": "Pass",
                                    "End": true
                                },
                                "Failed": {
                                    "Type": "Pass",
                                    "End": true
                                }
                            }
                        }                        
                    -
                        create_snapshot_lambda_arn: !GetAtt [ CreateSnapshotLambda, Arn ]
                        get_snapshot_status_lambda_arn: !GetAtt [ DescribeSnapshotLambda, Arn ]
            RoleArn: !GetAtt [ StateMachineExecutionRole, Arn ]
Outputs:
    StateMachine:
        Value: !Ref StateMachine
        Description: Step Function workflow Arn

During the execution, CloudFormation will call Step Functions API, IAM API, and Lambda API to deploy the workflow and other resources from the template.

The Step Function execution process will take a snapshot of the DB instance provided as input data (parameter) in the CloudFormation template.

Summary

This article covered creating a Step Function workflow (state machine) to manage long-running automation tasks – RDS Snapshot. It is a simple but powerful example that should give you an idea of what else you can implement in the same way.

Andrei Maksimov

I’m a passionate Cloud Infrastructure Architect with more than 20 years of experience in IT. In addition to the tech, I'm covering Personal Finance topics at https://amaksimov.com.

Any of my posts represent my personal experience and opinion about the topic.

  • {"email":"Email address invalid","url":"Website address invalid","required":"Required field missing"}

    Related Posts

    Comprehensive Guide to Install Boto3 Python
    Python Boto3 Session: A Comprehensive Guide

    Andrei Maksimov

    11/17/2023

    Ultimate Guide to Amazon Bedrock

    Ultimate Guide to Amazon Bedrock
    AWS Proxies: Enhancing Data Collection and Security

    Subscribe now to get the latest updates!

    >