AWS Lambda - How to process DynamoDB streams

AWS Lambda – How to process DynamoDB streams

Imagine you decided to launch a Serverless project at Amazon Web Services. In most cases, AWS Lambda and DynamoDB would be your technology choice. This article will cover one of the most common use cases of AWS Lambda – DynamoDB streams processing.

As soon as your project grows, you may start looking for a solution for the following use-cases:

  • Replicate DynamoDB tables to other AWS Regions.
  • Send the data from DynamoDB table to a real-time analytics system.
  • Send the data from DynamoDB table to ElasticSearch for full-text search.
  • Send a notification depending on the data inserted to the database.
  • Do more complex automation depending on the database data changes.

The simplest way to solve those problems is to process the Amazon DynamoDB stream. And that’s where AWS Lambda functions can help. They can do anything you want each time an item in the DynamoDB table is inserted or updated.

This article will show how to trigger AWS Lambda in case of such events.

What are DynamoDB Streams

DynamoDB Streams API is a serverless database that supports key-value and document data structures. It is an excellent service that can automatically scale and continuously backup your data.

DynamoDB Streams is a technology, which allows you to get notified when your DynamoDB table is updated.

Some features of the DynamoDB Streams:

  • Up to two Lambda functions can be subscribed to a single stream.
  • Streamed exactly once and delivery guaranteed.
  • Strictly ordered by key.
  • Durable and scalable.
  • Sub-second latency.
  • 24-hour data retention.

DynamoDB streams consist of Shards. DynamoDB Streams writes a stream record with the primary key attributes of the modified items to a shard. The number of shards equals the number of DynamoDB partitions. Each shard acts as a container for multiple stream records. We’ll need it at the end of the article to tune Lambda executions.

AWS Lambda processes DynamoDB Streams - Shards

What is AWS Lambda

AWS Lambda is an event-driven computing service that can execute your code in response to many different events. No need to manage any computing resources from your side. That’s what it means whenever you hear Serverless.

AWS Lambda is the fastest way to process DynamoDB streams. It reads records from the stream and invokes your code synchronously, providing it modified records from the stream.

CloudFormation Template

Let’s create a DynamoDB table with a demo Lambda function, which will log the data from your stream to CloudWatch Logs (cloudformation.yaml):

AWSTemplateFormatVersion: 2010-09-09
Description: >
  This stack creates DynamoDB table and subscribe looging Lambda function to
  DynamoDB stream.  
Resources:
  rLoggingFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: '300'
      Handler: index.handler
      Role: !GetAtt rLambdaRole.Arn
      Code:
        ZipFile: |
          import logging
          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)
          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)          
  rLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        # Allow Lambda to assume this role
        - Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
          Action:
          - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: LambdaRolePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
            # Allow Lambda to write logs to CloudWatch
            - Effect: Allow
              Action:
              - logs:CreateLogGroup
              - logs:CreateLogStream
              - logs:PutLogEvents
              Resource: arn:aws:logs:*:*:*
            # Allow Lambda to read from the DynamoDB stream
            - Effect: Allow
              Action:
              - dynamodb:DescribeStream
              - dynamodb:GetRecords
              - dynamodb:GetShardIterator
              - dynamodb:ListStreams
              Resource: "*"
  rDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
  rDynamoDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      # The maximum number of DB items to send to Lambda
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt rDynamoDBTable.StreamArn
      FunctionName: !GetAtt rLoggingFunction.Arn
      # Always start at the tail of the Stream
      StartingPosition: LATEST
  rLambdaFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Join
        - ''
        - - '/aws/lambda/'
          -  !Ref rLoggingFunction
      RetentionInDays: 14
Outputs:
  rLoggingFunctionName:
    Value: !Ref rLoggingFunction
  oDynamoDBTableName:
    Value: !Ref rDynamoDBTable

The code here is pretty straightforward. We have:

  • rLoggingFunction – Lambda function declaration, which logs all incoming stream events from DynamoDB.
  • rLambdaRole – Lambda function role, which allows Lambda to read from DynamoDB Stream.
  • rDynamoDBTable – DynamoDB table declaration; StreamSpecification, determines which DB changes to be sent to the Stream.
  • rDynamoDBTableStream – connection of DynamoDB Stream and Lambda function.
  • rLambdaFunctionLogGroup – CloudWatch Log Group to store Lambda execution logs.

Several options for StreamViewType:

  • KEYS_ONLY — Only the key attributes of the modified item.
  • NEW_IMAGE — The entire item, as it appears after it was modified.
  • OLD_IMAGE — The entire item, as it appeared before it was modified.
  • NEW_AND_OLD_IMAGES — Both the new and the old images of the item.

Deploy

To deploy the stack, run the following command:

aws cloudformation create-stack \
    --stack-name DynamoDB-Streams-Test \
    --template-body file://cloudformation.yaml \
    --capabilities CAPABILITY_IAM

Let’s get CloudFormation stack outputs to test our LambdaFunction.

To get DynamoDB table name:

export TABLE_NAME=$(aws cloudformation describe-stacks \
    --stack-name DynamoDB-Streams-Test \
    --query "Stacks[0].Outputs[?OutputKey=='oDynamoDBTableName'].OutputValue" \
    --output text)
echo $TABLE_NAME

To get Lambda Funtion name:

export LAMBDA_NAME=$(aws cloudformation describe-stacks \
    --stack-name DynamoDB-Streams-Test \
    --query "Stacks[0].Outputs[?OutputKey=='rLoggingFunctionName'].OutputValue" \
    --output text)
echo $LAMBDA_NAME

Testing

Use the following test to check if your Lambda function is successfully created.

We will invoke the Lambda function manually using the invoke AWS Lambda CLI command. First, let’s trigger an event in DynamoDB to enable DynamoDB streams.

DynamoDB Stream example: input.json.

{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}

We will execute the following event using the invoke command.

aws lambda invoke \
    --function-name $LAMBDA_NAME \
    --payload fileb://input.txt \
    outputfile.txt

You will get the following output:

{
    "StatusCode": 200,
    "ExecutedVersion": "$LATEST"
}

Here’s how to check CloudWatch logs as well to see the stream records:

export LOG_GROUP_NAME="/aws/lambda/$LAMBDA_NAME"
export LOG_STREAM_NAME=$(aws logs describe-log-streams \
    --log-group-name "$LOG_GROUP_NAME" \
    --query "logStreams[0].logStreamName" \
    --output text)
aws logs get-log-events \
    --log-group-name $LOG_GROUP_NAME \
    --log-stream-name $LOG_STREAM_NAME

Cleaning up

To delete stack and clean up everything, run the following command:

aws cloudformation delete-stack \
    --stack-name DynamoDB-Streams-Test

Tune DynamoDB Stream processing

rDynamoDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
        BatchSize: 1
        Enabled: True
        EventSourceArn: !GetAtt rDynamoDBTable.StreamArn
        FunctionName: !GetAtt rLoggingFunction.Arn
        StartingPosition: LATEST

In our example, the Lambda function is invoked every time the record is available in the stream. For significant workloads that may lead to inefficient Lambda executions. To avoid such behavior, we can tweak DynamoDB Stream.

Configuration tuning

You may check the official documentation for a complete list of options, but the following parameters are most helpful:

  • BatchSize: number of records to send to Lambda function (default: 100, max 1000).
  • MaximumBatchingWindowInSeconds – the amount of time in seconds to wait before sending records to Lambda function (min: 0, max: 300).

Here’s how our example can look like:

rDynamoDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
        BatchSize: 100
        MaximumBatchingWindowInSeconds: 300
        Enabled: True
        EventSourceArn: !GetAtt rDynamoDBTable.StreamArn
        FunctionName: !GetAtt rLoggingFunction.Arn
        StartingPosition: LATEST

Our Lambda function will receive a batch of 100 records or a smaller batch, but not often in 5 minutes.

Monitoring

To keep an eye on your DynamoDB Streams processing with Change Data Capture(CDC), it is worth creating a CloudWatch Dashboard and including the following metrics.

DynamoDB Streams:

  • ReturnedRecordsCount / ReturnedBytes.
  • UserErrors.

Lambda function:

  • Errors.
  • IteratorAge.
  • Throttles.
  • Duration.

Also, you may find those Serverless monitoring solutions quite helpful.

Error handling

At the end of 2019, AWS released Failure-Handling Features For DynamoDB EventSources. It means that now you have:

  • MaximumRetryAttempts – maximum attempts before skipping the batch.
  • MaximumRecordAgeInSeconds – skip processing a data record when it has reached its Maximum Record Age.
  • BisectBatchOnFunctionError – Lambda recursively breaks the impacted batch of records into two when a function returns an error and retries them separately.
  • DestinationConfig – An Amazon SQS queue or Amazon SNS topic destination for discarded records.

Common issues

The following issues are common for DynamoDB Streams processing:

  • IteratorAge is growing rapidly.
  • Rapid growth in Lambda concurrency.

AWS provided an excellent framework (a list of questions) that may help solve those issues in their deck Mastering AWS Lambda streaming event sources.

Summary

In this article, we created simple Lambda functions to log streams of your DynamoDB table to CloudWatch. Also, we paid attention to DynamoDB Streams processing tuning, monitoring, and error handling. I hope you can evolve this example yourself to cover your needs.

We hope you found this helpful article! If so, please, help us to spread it to the world!

Stay tuned!

Similar Posts