Using Terraform to deploy S3->SQS->Lambda integration

One common integration pattern in modern enterprises is exchanging files through S3 buckets. Humans or automated systems upload files to an S3 bucket from various sources over HTTPS, SFTP, or SCP. As soon as a file is uploaded, it needs further processing. In this article, we’ll describe how this integration works and how to deploy it using Terraform.

Here’s what the architecture diagram for this integration looks like.

Using Terraform to deploy S3-SQS-Lambda integration - Architecture

Event structures

The integration is easy to understand once you look at the event messages the services send to each other when a file is uploaded to the S3 bucket.

Jumping ahead: the S3 event is embedded in the SQS message body, which is how the AWS Lambda function code gets access to it (shown below).

S3 event structure

S3-generated events are widely used and well documented across the internet.

Here’s what this event structure looks like:

{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2021-04-26T23:31:08.107Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:012850762433:admin"
         },
         "requestParameters":{
            "sourceIPAddress":"108.41.58.86"
         },
         "responseElements":{
            "x-amz-request-id":"YP7DR0F7H7R1GN1S",
            "x-amz-id-2":"WYvnoGQrVxe2LfV6yr/sDsZXj/QDL0vD02WQYn9zXg3jX2iKfq83omTmcOcIiuSUk4dTmRRDrhdNNzffoi8AeSBN7RHs2ab0"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"tf-s3-queue-20210426224851886600000002",
            "bucket":{
               "name":"amaksimov-s3-sqs-demo-bucket",
               "ownerIdentity":{
                  "principalId":"A1W385KKD8Q319"
               },
               "arn":"arn:aws:s3:::amaksimov-s3-sqs-demo-bucket"
            },
            "object":{
               "key":"4.+Beginner%27s+Guide+to+AWS+Step+functions+-+HelloWorld+Example.png",
               "size":9714,
               "eTag":"b21c122beffd36c0f0caabc4dbd8b16d",
               "sequencer":"0060874D3FC2FA681D"
            }
         }
      }
   ]
}

The S3 event consists of a list of records, each describing an object in the S3 bucket.

The most commonly used fields are:

  • event['Records'][*]['s3']['bucket']['name'] – the name of the bucket the file has been uploaded to
  • event['Records'][*]['s3']['object']['key'] – the file name and location within the S3 bucket
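
Since the object key arrives URL-encoded (note the %27 and + characters in the example above), here’s a minimal sketch of reading these fields from a parsed S3 event:

from urllib.parse import unquote_plus

for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    # Object keys arrive URL-encoded, so decode them before use
    key = unquote_plus(record['s3']['object']['key'])
    print(bucket, key)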

In addition to file upload events, S3 sends a test event (s3:TestEvent) when you first configure bucket notifications. It has the following structure:

{
   "Service":"Amazon S3",
   "Event":"s3:TestEvent",
   "Time":"2021-04-27T13:57:03.224Z",
   "Bucket":"amaksimov-s3-sqs-demo-bucket",
   "RequestId":"MDSYJ6FFMMZ75MJ8",
   "HostId":"bydBlxgzo+XD8x1szLD+YfeaN8DUtNoxEHsMDySKd1wuX1PKvuYx4h/Iw8uUM1wx/uImu1On5sI="
}

We’ll need to skip this event in our Lambda function code to avoid error messages.

SQS event structure

SQS events are widely used, and they have a similar structure.

{
   "Records":[
      {
         "messageId":"581db230-9853-4be3-a1fe-72c9a5b3e4d4",
         "receiptHandle":"AQEBAwV4m8sSkn5jDd1k/GBLco1znfiv+xT0KTRZdEhQE7clWhAcFlVusMR07RQsBo5ImrlIDafWwdzfX+ZqsuRQPGWE0CcsR6ga8yQTTtG6N1CpWuotJ69Ef55XILtkOMKS+7HR3Ek1oigests3bmx5eCj0QlsRR56qSpj0o1yOOLktLsUehPPTEmWmWXGGPoTc2GayxbnL6lCheolswgiMdE2u0qmbaKV6Ek3E4PyvPfzkOx8XGXIurYJCkFMGcpi0sWrus1WO+dzbm5NtOL9n8qAzjxaMyMyV+nXvy+EO1QCLu2CuX0/rhKfjoq0+txWm8tNVb27VKbwsRKrU12odmV9mbULuvKDU55CqNOMF+LZl8zdZzceegvK2wgfA8KjMmpJ5wQVWo0S8WqVpcJCKSJYhoh/XzqGde+1gQ957YR8=",
         "body":"{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventTime\":\"2021-04-26T23:25:17.884Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:012850762433:admin\"},\"requestParameters\":{\"sourceIPAddress\":\"108.41.58.86\"},\"responseElements\":{\"x-amz-request-id\":\"74CMGJPKH3HA1G87\",\"x-amz-id-2\":\"c52dEWNgb6rNUs7MNY20ArZHLgtNFiRJIhREfnNAnlLsXHotTUvLS7InfWnkniuawxPgTlkOkTKZICwIgsbfdHDZKQvL0LcV\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"tf-s3-queue-20210426224851886600000002\",\"bucket\":{\"name\":\"amaksimov-s3-sqs-demo-bucket\",\"ownerIdentity\":{\"principalId\":\"A1W385KKD8Q319\"},\"arn\":\"arn:aws:s3:::amaksimov-s3-sqs-demo-bucket\"},\"object\":{\"key\":\"6.+Beginner%27s+Guide+to+AWS+Step+functions+-+AWS+HelloWorld+example.png\",\"size\":458757,\"eTag\":\"e1148e80d0798b0e23502cbdae1fef58\",\"sequencer\":\"0060874BE06812C89A\"}}}]}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1619479521272",
            "SenderId":"AIDAJHIPRHEMV73VRJEBU",
            "ApproximateFirstReceiveTimestamp":"1619479521279"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"7195d8d0f011fac4dc115b59d3e86797",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-east-1:012850762433:amaksimov-s3-event-notification-queue",
         "awsRegion":"us-east-1"
      }
   ]
}

The SQS event consists of a list of records, each representing a message received from the SQS queue.

The most commonly used field here is:

  • event['Records'][*]['body'] – the text body of the SQS message (in our case, the JSON-encoded S3 event)

Demo Lambda function code

Now that we know both event structures, we can write a demo Lambda function to process them.

Here’s what the code looks like (index.py):

#!/usr/bin/env python3

import logging
import json

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

def handler(event, context):
    try:
        LOGGER.info('SQS EVENT: %s', event)
        for sqs_rec in event['Records']:
            s3_event = json.loads(sqs_rec['body'])
            LOGGER.info('S3 EVENT: %s', s3_event)
            # Skip the S3 test event and continue with the next record
            if s3_event.get('Event') == 's3:TestEvent':
                continue
            for s3_rec in s3_event['Records']:
                LOGGER.info('Bucket name: %s', s3_rec['s3']['bucket']['name'])
                LOGGER.info('Object key: %s', s3_rec['s3']['object']['key'])
    except Exception as exception:
        LOGGER.error('Exception: %s', exception)
        raise

This Lambda function code processes SQS events.

For every received record, the code extracts the S3 event from the SQS message body field.

It then checks for the S3 test event and skips it.

Finally, the code logs the event, the S3 bucket name, and the uploaded S3 object key.

This minimal working example is a handy starting point if you’d like to use it in your own project.
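
If you’d like to smoke-test the handler locally before deploying, you can invoke it with a hand-crafted SQS event. Here’s a minimal sketch (the bucket name and object key are just placeholders):

#!/usr/bin/env python3

import json
import logging

from index import handler

# Make LOGGER.info() messages visible when running locally
logging.basicConfig(level=logging.INFO)

# A synthetic S3 event wrapped in an SQS message body, mimicking
# what Lambda receives from the queue
s3_event = {
    'Records': [
        {'s3': {'bucket': {'name': 'my-test-bucket'},
                'object': {'key': 'example.png'}}}
    ]
}
sqs_event = {'Records': [{'body': json.dumps(s3_event)}]}

# The handler doesn't use the context argument, so None is acceptable here
handler(sqs_event, None)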

Terraform code

Now that we have a Lambda function and understand how all the pieces tie together, we can declare everything in Terraform.

Project structure

Here’s the directory structure we’ll use for the project:

.
 ├── lambda
 │   └── index.py
 ├── main.tf
 ├── provider.tf
 ├── s3.tf
 └── variables.tf
 1 directory, 5 files

Provider configuration

A simple AWS provider configuration is declared in the provider.tf file:

# AWS provider configuration
provider "aws" {
    region = var.region
}
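
Optionally, you can pin the AWS provider version in the same file, so future provider releases don’t unexpectedly change behavior (the version constraint below is just an example):

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 3.0"
    }
  }
}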

Variables

We parameterize our Terraform code with the following variables, defined in the variables.tf file:

variable "region" {
    default = "us-east-1"
    description = "AWS Region to deploy to"
}

variable "app_env" {
    default = "amaksimov"
    description = "Common prefix for all Terraform created resources"
}

S3 and SQS

The declaration and integration of the S3 bucket and the SQS queue live in the s3.tf file:

# SQS queue
resource "aws_sqs_queue" "queue" {
  name = "${var.app_env}-s3-event-notification-queue"

  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": "*",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:*:*:${var.app_env}-s3-event-notification-queue",
      "Condition": {
        "ArnEquals": { "aws:SourceArn": "${aws_s3_bucket.bucket.arn}" }
      }
    }
  ]
}
POLICY
}

# S3 bucket
resource "aws_s3_bucket" "bucket" {
  bucket = "${var.app_env}-s3-sqs-demo-bucket"
}

# S3 event filter
resource "aws_s3_bucket_notification" "bucket_notification" {
  bucket = aws_s3_bucket.bucket.id

  queue {
    queue_arn     = aws_sqs_queue.queue.arn
    events        = ["s3:ObjectCreated:*"]
  }
}

# Event source from SQS
resource "aws_lambda_event_source_mapping" "event_source_mapping" {
  event_source_arn = aws_sqs_queue.queue.arn
  enabled          = true
  function_name    = aws_lambda_function.sqs_processor.arn
  batch_size       = 1
}
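
If you only need to process some of the uploads, the queue block of the aws_s3_bucket_notification resource also supports optional key filters. Here’s a sketch of the same block with example prefix and suffix values:

  queue {
    queue_arn     = aws_sqs_queue.queue.arn
    events        = ["s3:ObjectCreated:*"]
    filter_prefix = "uploads/"
    filter_suffix = ".png"
  }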

Lambda function

We’ve put the Lambda function related resources and the function’s CloudWatch Log Group into the main.tf file:

# Data resource to archive Lambda function code
data "archive_file" "lambda_zip" {
    source_dir  = "${path.module}/lambda/"
    output_path = "${path.module}/lambda.zip"
    type        = "zip"
}

# Lambda function policy
resource "aws_iam_policy" "lambda_policy" {
    name        = "${var.app_env}-lambda-policy"
    description = "${var.app_env}-lambda-policy"
 
    policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Effect": "Allow",
      "Resource": "${aws_s3_bucket.bucket.arn}"
    },
    {
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Effect": "Allow",
      "Resource": "${aws_sqs_queue.queue.arn}"
    },
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}
EOF
}

# Lambda function role
resource "aws_iam_role" "iam_for_terraform_lambda" {
    name = "${var.app_env}-lambda-role"
    assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

# Role to Policy attachment
resource "aws_iam_role_policy_attachment" "terraform_lambda_iam_policy_basic_execution" {
    role = aws_iam_role.iam_for_terraform_lambda.id
    policy_arn = aws_iam_policy.lambda_policy.arn
}

# Lambda function declaration
resource "aws_lambda_function" "sqs_processor" {
    filename = "lambda.zip"
    source_code_hash = data.archive_file.lambda_zip.output_base64sha256
    function_name = "${var.app_env}-lambda"
    role = aws_iam_role.iam_for_terraform_lambda.arn
    handler = "index.handler"
    runtime = "python3.8"
}

# CloudWatch Log Group for the Lambda function
resource "aws_cloudwatch_log_group" "lambda_loggroup" {
    name = "/aws/lambda/${aws_lambda_function.sqs_processor.function_name}"
    retention_in_days = 14
}
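
Optionally, you can add an outputs.tf file to print the key resource identifiers after deployment (this file is not part of the project structure above; the output names are illustrative):

output "bucket_name" {
  value = aws_s3_bucket.bucket.id
}

output "queue_url" {
  value = aws_sqs_queue.queue.id
}

output "lambda_function_name" {
  value = aws_lambda_function.sqs_processor.function_name
}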

Deployment

Once we’ve created all the Terraform project files, we can deploy the integration using the following commands:

terraform init
terraform apply -auto-approve
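
If you’d like to review the execution plan before applying it, run:

terraform plan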

Testing

After the deployment finishes, we can upload any file to the created S3 bucket and see messages like the following in CloudWatch Logs.

Using Terraform to deploy S3-SQS-Lambda integration - CloudWatch Logs
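
For example, you can upload a test file and follow the function’s logs with the AWS CLI (assuming the resource names produced by the default variable values above):

# Upload a test file to trigger the S3 -> SQS -> Lambda chain
aws s3 cp ./test-file.png s3://amaksimov-s3-sqs-demo-bucket/

# Follow the Lambda function's CloudWatch Logs (requires AWS CLI v2)
aws logs tail /aws/lambda/amaksimov-lambda --follow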

Cleaning up

To clean up everything, you need to delete all the uploaded files from the S3 bucket and then execute the following Terraform command:

terraform destroy -auto-approve
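
To empty the bucket from the command line before running the destroy, you can use the AWS CLI (again assuming the demo bucket name):

aws s3 rm s3://amaksimov-s3-sqs-demo-bucket --recursive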

Summary

In this article, we’ve created a widely used integration building block consisting of an S3 bucket, an SQS queue, and a Lambda function. This building block guarantees that every S3 upload event is processed by the Lambda function in a serverless way.

We hope you found this article useful. If so, please help us spread it to the world!
