Working with SQS in Python using Boto3

Abhinav Dumbre

Amazon Simple Queue Service (Amazon SQS) is a distributed messaging system that lets you send, store, and receive messages between web services and software components at any scale, without losing messages or requiring other services or application components to be available all the time. Amazon SQS helps decouple and scale microservices, distributed systems, and serverless applications while eliminating the complexity and overhead of managing message-oriented middleware. This article covers how to create, list, configure, and delete SQS queues, and how to send, receive, and delete SQS messages in Python using the Boto3 library.

Prerequisites

To start interacting with Amazon SQS programmatically and making API calls to manage SQS queues, you have to configure your Python environment.

In general, here’s what you need to have installed:

  • Python 3
  • Boto3
  • AWS CLI tools

Alternatively, you can use a Cloud9 IDE.

What is an SQS queue?

Amazon SQS is a fast, reliable, and fully managed message queuing service. It stores messages until microservices and serverless applications process them: messages are generated by one set of components, called producers, and consumed by another set, called consumers.

Here’s an example architecture that guarantees every object uploaded to an S3 bucket is processed by a Lambda function.

[Figure: Using Terraform to deploy S3-SQS-Lambda integration (architecture)]

Amazon SQS is a fully managed service, making it an excellent choice for communication between independent systems and a reliable way to submit and receive messages from the queue.

SQS queue messages can contain up to 256 KB of data in any format such as JSON, XML, etc.
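The size limit above can be checked before sending. The following is a minimal sketch, assuming the 256 KB figure quoted above (taken here as 262,144 bytes) and measuring the body as UTF-8. The helper names are hypothetical and not part of Boto3.

```python
# Assumption: 256 KB means 262,144 bytes, as quoted in this article.
SQS_MAX_MESSAGE_BYTES = 256 * 1024


def message_size_in_bytes(body: str) -> int:
    """Return the size of the message body once encoded as UTF-8."""
    return len(body.encode("utf-8"))


def fits_in_one_message(body: str, limit: int = SQS_MAX_MESSAGE_BYTES) -> bool:
    """Check the body only; message attributes also count toward the limit."""
    return message_size_in_bytes(body) <= limit
```

Because message attributes count toward the same limit, a real check would need to leave some headroom for them.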

SQS offers two types of message queues. 

  • Standard queues: Standard queues provide maximum throughput, best-effort ordering, and at-least-once delivery.
  • FIFO queues: FIFO queues are intended to guarantee that messages are processed exactly once, in the exact order they are sent.

SQS use cases

  • Decouple live user requests from intensive background work.
  • Allocate tasks to multiple worker nodes.
  • Batch messages for future processing.

Connect to Amazon SQS using Boto3

The Boto3 library provides you with two ways to access APIs for managing AWS services:

  • The client gives you low-level access to the service API. For example, each call returns the API response data as a Python dictionary.
  • The resource allows you to use AWS services in a higher-level object-oriented way. For more information on the topic, take a look at AWS CLI vs botocore vs Boto3.

Here’s how we can instantiate the Boto3 SQS client to start working with Amazon SQS APIs:

import boto3

AWS_REGION = "us-east-1"

sqs_client = boto3.client("sqs", region_name=AWS_REGION)

Likewise, you can instantiate the Boto3 SQS resource:

import boto3

AWS_REGION = "us-east-1"

sqs_resource = boto3.resource("sqs", region_name=AWS_REGION)

Working with SQS queues

Amazon SQS provides an HTTP API over which applications can submit and read messages out of a messaging queue. An SQS queue works like a buffer between the application components that receive data and those components that process the data in your system.

Create a standard SQS queue

To create a standard SQS queue, you need to use the create_queue() method from the Boto3 resource.

Standard SQS queues are the default queue type. These queues support a nearly unlimited number of API calls (SendMessage, ReceiveMessage, or DeleteMessage) per second.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_resource = boto3.resource("sqs", region_name=AWS_REGION)


def create_queue(queue_name, delay_seconds, visibility_timeout):
    """
    Create a standard SQS queue
    """
    try:
        response = sqs_resource.create_queue(QueueName=queue_name,
                                             Attributes={
                                                 'DelaySeconds': delay_seconds,
                                                 'VisibilityTimeout': visibility_timeout
                                             })
    except ClientError:
        logger.exception(f'Could not create SQS queue - {queue_name}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_NAME = 'hands-on-cloud-standard-queue'
    DELAY_SECONDS = '0'
    VISIBILITY_TIMEOUT = '60'

    output = create_queue(QUEUE_NAME, DELAY_SECONDS, VISIBILITY_TIMEOUT)

    logger.info(
        f'Standard Queue {QUEUE_NAME} created. Queue URL - {output.url}')

The create_queue() method returns the Queue object which contains the queue resource URL.

The required argument is:

  • QueueName: The name of the new queue that you want to create.

The following limits apply to queue name:

  • A queue name can contain alphanumeric characters, hyphens (-), and underscores (_).
  • Max queue name limit is 80 characters.
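The naming limits above can be checked locally before calling create_queue(). This is a small sketch for standard queue names only; the function name is an assumption made for illustration.

```python
import re

# Alphanumeric characters, hyphens, and underscores; 1 to 80 characters.
_STANDARD_QUEUE_NAME = re.compile(r"[A-Za-z0-9_-]{1,80}")


def is_valid_standard_queue_name(name: str) -> bool:
    """Return True when the name satisfies the limits listed above."""
    return _STANDARD_QUEUE_NAME.fullmatch(name) is not None
```

Validating the name first gives a clearer error message than waiting for the API call to be rejected.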

Optional arguments used in the above example are:

  • Attributes: Specifies the attribute values for the queue. Some of the commonly used parameters are:
    • DelaySeconds: Messages are delayed by this value before being delivered.
    • VisibilityTimeout: Visibility timeout is the period in seconds where a particular message is only visible to a single consumer.
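Queue attribute values are passed as strings, which is easy to get wrong. The helper below is a hypothetical sketch that accepts integers, checks them against the ranges SQS accepts (0 to 900 seconds for DelaySeconds, 0 to 43,200 seconds for VisibilityTimeout), and returns the string-valued dictionary expected by the Attributes argument.

```python
def build_queue_attributes(delay_seconds: int = 0,
                           visibility_timeout: int = 30) -> dict:
    """Build the Attributes dictionary for create_queue().

    The helper name and defaults are illustrative; 30 seconds is the
    default visibility timeout SQS applies when none is given.
    """
    if not 0 <= delay_seconds <= 900:
        raise ValueError("DelaySeconds must be between 0 and 900")
    if not 0 <= visibility_timeout <= 43200:
        raise ValueError("VisibilityTimeout must be between 0 and 43200")
    # SQS expects attribute values as strings, not integers.
    return {
        "DelaySeconds": str(delay_seconds),
        "VisibilityTimeout": str(visibility_timeout),
    }
```

The result can be passed directly, for example as Attributes=build_queue_attributes(0, 60).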

Here’s an execution output:

Create a standard SQS queue

Create a FIFO SQS queue

To create a FIFO SQS queue, we need to use the create_queue() method from the Boto3 resource.

A First In First Out (FIFO) queue delivers each message once, and the message remains available until a consumer processes and deletes it.

FIFO queues guarantee message delivery order and do not allow duplicates in the queue. A FIFO queue supports up to 300 messages/sec without batching and 3000 messages/sec with batching.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_resource = boto3.resource("sqs", region_name=AWS_REGION)


def create_queue(queue_name, delay_seconds, visibility_timeout):
    """
    Create a First In First Out (FIFO) SQS queue
    """
    try:
        response = sqs_resource.create_queue(QueueName=queue_name,
                                             Attributes={
                                                 'DelaySeconds': delay_seconds,
                                                 'VisibilityTimeout': visibility_timeout,
                                                 'FifoQueue': 'true'
                                             })
    except ClientError:
        logger.exception(f'Could not create SQS queue - {queue_name}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_NAME = 'hands-on-cloud-fifo-queue.fifo'
    DELAY_SECONDS = '0'
    VISIBILITY_TIMEOUT = '60'

    output = create_queue(QUEUE_NAME, DELAY_SECONDS, VISIBILITY_TIMEOUT)

    logger.info(f'FIFO Queue {QUEUE_NAME} created. Queue URL - {output.url}')

The create_queue() method returns the Queue object which contains the queue resource URL.

The required argument is:

  • QueueName: The name of the new queue that you want to create.

The name of a FIFO queue must end with the .fifo suffix, can include only alphanumeric characters, hyphens, or underscores, and must be 1 to 80 characters long.
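The FIFO naming rule can be expressed as a single pattern. This is a sketch under the assumption that the .fifo suffix counts toward the 80-character limit, which leaves at most 75 characters for the rest of the name; the function name is hypothetical.

```python
import re

# Up to 75 name characters plus the 5-character ".fifo" suffix (80 in total).
_FIFO_QUEUE_NAME = re.compile(r"[A-Za-z0-9_-]{1,75}\.fifo")


def is_valid_fifo_queue_name(name: str) -> bool:
    """Return True when the name follows the FIFO naming rule above."""
    return _FIFO_QUEUE_NAME.fullmatch(name) is not None
```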

Required attribute:

  • FifoQueue defines a queue as FIFO. Valid values are true and false.

Optional Attributes are:

  • DelaySeconds: Messages are delayed by this value before being delivered.
  • VisibilityTimeout: Visibility timeout is the period in seconds where a particular message is only visible to a single consumer.
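Sending to a FIFO queue needs a MessageGroupId and, unless content-based deduplication is enabled on the queue, a MessageDeduplicationId. The sketch below shows one possible way to build the send_message() arguments, deriving the deduplication ID from a SHA-256 hash of the body. The helper name and the hashing choice are assumptions for illustration, not something this article's scripts use.

```python
import hashlib


def build_fifo_send_params(queue_url: str, body: str, group_id: str) -> dict:
    """Build keyword arguments for send_message() on a FIFO queue."""
    # Identical bodies produce identical IDs, so SQS can treat a repeat
    # send within the deduplication interval as a duplicate.
    dedup_id = hashlib.sha256(body.encode("utf-8")).hexdigest()
    return {
        "QueueUrl": queue_url,
        "MessageBody": body,
        "MessageGroupId": group_id,
        "MessageDeduplicationId": dedup_id,
    }
```

With a Boto3 client, the result would be used as sqs_client.send_message(**params). Messages that share a MessageGroupId are delivered in order relative to each other.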

Here’s an execution output:

Create a FIFO SQS queue

List all SQS queues

To get a list of all the queues, you need to use the queues collection from the Boto3 SQS resource. This Boto3 collection creates an iterable object of SQS resources.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_resource = boto3.resource("sqs", region_name=AWS_REGION)


def list_queues():
    """
    Creates an iterable of all Queue resources in the collection.
    """
    try:
        sqs_queues = []
        for queue in sqs_resource.queues.all():
            sqs_queues.append(queue)
    except ClientError:
        logger.exception('Could not list queues.')
        raise
    else:
        return sqs_queues


if __name__ == '__main__':
    lst_of_sqs_queues = list_queues()

    for queue in lst_of_sqs_queues:
        logger.info(f'Queue URL - {queue.url}')

The queues.all() method returns an iterable collection of all SQS queues. We can iterate over this collection to log each queue URL.
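When only the queue name is needed, it can be read from the last path segment of the queue URL. This is a small sketch; the helper name is hypothetical and the account ID in the usage note is a placeholder.

```python
def queue_name_from_url(queue_url: str) -> str:
    """Return the last path segment of a queue URL, which is the queue name."""
    return queue_url.rstrip("/").rsplit("/", 1)[-1]
```

For example, queue_name_from_url("https://sqs.us-east-1.amazonaws.com/123456789012/hands-on-cloud-standard-queue") gives "hands-on-cloud-standard-queue".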

Here’s an execution output:

List all SQS queues

Filter SQS queues

To filter SQS queues, you need to use the filter() method of the queues collection from the Boto3 SQS resource.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_resource = boto3.resource("sqs", region_name=AWS_REGION)


def filter_queues(queue_prefix):
    """
    Creates an iterable of filtered Queue resources in the collection.
    """
    try:
        sqs_queues = []

        filtered_queues = sqs_resource.queues.filter(
            QueueNamePrefix=queue_prefix)

        for queue in filtered_queues:
            sqs_queues.append(queue)
    except ClientError:
        logger.exception(f'Could not filter queues for prefix {queue_prefix}.')
        raise
    else:
        return sqs_queues


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_PREFIX = 'hands-on-cloud'

    lst_of_sqs_queues = filter_queues(QUEUE_PREFIX)

    logger.info(f'A list of SQS queue(s) starting with prefix {QUEUE_PREFIX}:')

    for queue in lst_of_sqs_queues:
        logger.info(f'Queue URL - {queue.url}')

Optional parameters used in the above example:

  • QueueNamePrefix: A queue name prefix to use for filtering the list results.

The filter() method returns an iterable collection containing all the filtered resources. If no filters are specified, the returned collection includes all queues.

Here’s an execution output:

Filter SQS queues

Get SQS queue URL

To get the URL of an existing SQS queue, you need to use the get_queue_url() method from the Boto3 library.

Note: in this example, we are using the boto3.client object.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def get_queue(queue_name):
    """
    Returns the URL of an existing Amazon SQS queue.
    """
    try:
        response = sqs_client.get_queue_url(QueueName=queue_name)['QueueUrl']

    except ClientError:
        logger.exception(f'Could not get the {queue_name} queue.')
        raise
    else:
        return response


if __name__ == '__main__':

    # Constants
    QUEUE_NAME = 'hands-on-cloud-standard-queue'
    queue = get_queue(QUEUE_NAME)
    logger.info(f'Queue URL - {queue}')

The get_queue_url() method requires one argument:

  • QueueName – The name of the queue whose URL is to be retrieved. Queue name must be within 80 characters limit. Valid values: alphanumeric characters, hyphens (-), and underscores (_).

The get_queue_url() method returns a queue URL.
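A common variation is to treat a missing queue as a normal outcome rather than an error. The sketch below assumes an SQS client is passed in (for example, boto3.client("sqs")) and catches the QueueDoesNotExist exception exposed on that client. The function name is hypothetical.

```python
def get_queue_url_or_none(sqs_client, queue_name: str):
    """Return the queue URL, or None when the queue does not exist."""
    try:
        return sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    except sqs_client.exceptions.QueueDoesNotExist:
        # Other errors (permissions, throttling, ...) still propagate.
        return None
```

Passing the client as an argument keeps the function easy to test with a stand-in object.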

Here’s an execution output:

Get SQS queue URL

Delete the SQS queue

To delete an SQS queue, we will use the delete_queue() method from the Boto3 library.

Note: when you delete a queue, any messages remaining in the queue are no longer available.

The delete operation might take up to 60 seconds to complete.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def delete_queue(queue_url):
    """
    Deletes the queue specified by the QueueUrl.
    """
    try:
        response = sqs_client.delete_queue(QueueUrl=queue_url)

    except ClientError:
        logger.exception(f'Could not delete the {queue_url} queue.')
        raise
    else:
        return response


if __name__ == '__main__':

    # Constants
    QUEUE_URL = '<your-queue-url>'
    queue = delete_queue(QUEUE_URL)
    logger.info(f'{QUEUE_URL} deleted successfully.')

Here’s an execution output:

Delete the SQS queue

List dead letter source queues

An SQS Dead Letter Queue (DLQ) is a queue that stores failed messages. In other words, a DLQ lets us keep messages that a consumer couldn’t process for some reason, such as an invalid message structure.

A DLQ stores messages that the source queue received successfully but that consumers failed to process even after multiple attempts.

We can set the maximum number of times a consumer can receive a message before SQS moves it to the DLQ. Several SQS queues can share a single dead letter queue.

However, SQS doesn’t create DLQ for you automatically. If you want to use DLQ, you need to create it yourself.

We will use the create_queue() method seen in the earlier section to create a Dead Letter Queue.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)
sqs_resource = boto3.resource("sqs", region_name=AWS_REGION)


def create_queue(queue_name, delay_seconds, visibility_timeout):
    """
    Create a standard SQS queue.
    """
    try:
        response = sqs_resource.create_queue(QueueName=queue_name,
                                             Attributes={
                                                 'DelaySeconds':
                                                 delay_seconds,
                                                 'VisibilityTimeout':
                                                 visibility_timeout
                                             })
    except ClientError:
        logger.exception(f'Could not create SQS queue - {queue_name}.')
        raise
    else:
        return response


def create_dead_letter_queue(dlq_name, delay_seconds, visibility_timeout):
    """
    Create a Dead letter queue.
    """
    try:
        response = sqs_resource.create_queue(QueueName=dlq_name,
                                             Attributes={
                                                 'DelaySeconds':
                                                 delay_seconds,
                                                 'VisibilityTimeout':
                                                 visibility_timeout
                                             })
    except ClientError:
        logger.exception(f'Could not create DLQ - {dlq_name}.')
        raise
    else:
        return response


def get_queue_arn(dlq_url):
    """
    Returns the ARN of the Dead Letter Queue.
    """
    try:
        response = sqs_client.get_queue_attributes(QueueUrl=dlq_url,
                                                   AttributeNames=['QueueArn'])
    except ClientError:
        logger.exception(f'Could not return DLQ ARN - {dlq_url}.')
        raise
    else:
        return response


def configure_queue_to_use_dlq(queue_url, redrive_policy):
    """
    Configure queue to send messages to dead letter queue
    """
    try:
        response = sqs_client.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={'RedrivePolicy': json.dumps(redrive_policy)})
    except ClientError:
        logger.exception(f'Could not set RedrivePolicy on - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_NAME = 'hands-on-cloud-standard-queue'
    DLQ_NAME = 'hands-on-cloud-standard-dead-letter-queue'
    DELAY_SECONDS = '0'
    VISIBILITY_TIMEOUT = '60'

    queue = create_queue(QUEUE_NAME, DELAY_SECONDS, VISIBILITY_TIMEOUT)

    logger.info(
        f'Standard Queue {QUEUE_NAME} created. Queue URL - {queue.url}')

    dead_letter_queue = create_dead_letter_queue(DLQ_NAME, DELAY_SECONDS,
                                                 VISIBILITY_TIMEOUT)

    dlq_arn = get_queue_arn(dead_letter_queue.url)['Attributes']['QueueArn']

    logger.info(f'Dead Letter Queue {DLQ_NAME} created. Queue ARN - {dlq_arn}')

    redrive_policy = {'deadLetterTargetArn': dlq_arn, 'maxReceiveCount': '10'}

    output = configure_queue_to_use_dlq(queue.url, redrive_policy)

    logger.info(
        f'{QUEUE_NAME} queue is configured to use {DLQ_NAME} as dead letter queue.'
    )

In the above example, we created a regular SQS queue and a second queue that acts as its Dead Letter Queue.

We used the boto3.resource object to create both queues and the boto3.client object to fetch the ARN of the dead letter queue with the get_queue_attributes() method.

The ARN is used as a reference when configuring the regular queue to send unprocessed messages to the dead letter queue.

After creating a dead letter queue, we need to configure other queues to route failed messages to it. To do that, we set the RedrivePolicy attribute on the source queue, which specifies the DLQ ARN (deadLetterTargetArn) and the maximum number of receives for a message (maxReceiveCount).
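The RedrivePolicy value is a JSON string, so it helps to build it in one place. This is a minimal sketch with a hypothetical helper name; the default of 10 receives mirrors the value used in the script above, and the ARN in the usage note is a placeholder.

```python
import json


def build_redrive_policy(dlq_arn: str, max_receive_count: int = 10) -> str:
    """Return the JSON string expected by the RedrivePolicy attribute."""
    if max_receive_count < 1:
        raise ValueError("maxReceiveCount must be at least 1")
    return json.dumps({
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receive_count),
    })
```

The returned string can be passed as Attributes={'RedrivePolicy': build_redrive_policy('arn:aws:sqs:us-east-1:123456789012:example-dlq')} to set_queue_attributes().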

Here’s an execution output:

Configure a queue to use Dead Letter Queue

To get a list of the queues whose RedrivePolicy attribute points to a given dead-letter queue, we will use the list_dead_letter_source_queues() method from the Boto3 library.

We will use a Boto3 paginator object to get the complete output from the list_dead_letter_source_queues() method.

Some AWS requests return incomplete output and therefore require subsequent requests to get the complete result. The process of sending subsequent requests to continue where a previous request left off is called pagination.
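To make the idea concrete, here is a sketch of what pagination looks like when done by hand: each response may carry a NextToken, which is sent back with the next request until no token is returned. The function name and page size are assumptions, and the client is passed in as an argument. In practice, a paginator does this bookkeeping for you.

```python
def iter_dead_letter_source_queues(sqs_client, dlq_url: str, page_size: int = 100):
    """Yield every source queue URL for a DLQ, following NextToken manually."""
    next_token = None
    while True:
        kwargs = {"QueueUrl": dlq_url, "MaxResults": page_size}
        if next_token:
            # Continue where the previous request left off.
            kwargs["NextToken"] = next_token
        page = sqs_client.list_dead_letter_source_queues(**kwargs)
        yield from page.get("queueUrls", [])
        next_token = page.get("NextToken")
        if not next_token:
            break
```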

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def list_dead_letter_source_queues(queue_url):
    """
    Get a list of queues that have the RedrivePolicy queue attribute configured with a dead-letter queue
    """
    try:
        paginator = sqs_client.get_paginator('list_dead_letter_source_queues')

        # paginate through all pages and merge them into a single result dict
        full_result = paginator.paginate(
            QueueUrl=queue_url).build_full_result()

        queues = []

        # 'queueUrls' may be absent when no source queues are configured
        for source_queue_url in full_result.get('queueUrls', []):
            queues.append(source_queue_url)

    except ClientError:
        logger.exception(f'Could not get source queues for - {queue_url}.')
        raise
    else:
        return queues


if __name__ == '__main__':
    # CONSTANTS
    DLQ_URL = '<dead-letter-queue-url>'

    queues = list_dead_letter_source_queues(DLQ_URL)

    logger.info('Dead letter source queues are: ')

    for queue in queues:
        logger.info(f'Queue URL - {queue}')

Replace <dead-letter-queue-url> with the dead letter queue URL created in the last code execution.

Here’s an execution output:

List dead letter source queues

Purge the SQS queue

To purge the SQS queue, you need to use the purge_queue() method of the Boto3 client.

Deleting all the messages from the queue can take up to 60 seconds.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def purge_queue(queue_url):
    """
    Deletes the messages in a specified queue
    """
    try:
        response = sqs_client.purge_queue(QueueUrl=queue_url)

    except ClientError:
        logger.exception(f'Could not purge the queue - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'

    purge = purge_queue(QUEUE_URL)

    logger.info(f'All messages from {QUEUE_URL} are purged.')

The purge_queue() method does not return any output response after a successful queue purge.

Here’s an execution output:

Purge the SQS queue

Set the SQS queue attributes

To set or update attributes such as DelaySeconds, MaximumMessageSize, RedrivePolicy, etc., for the existing queue, you need to use the set_queue_attributes() method from the Boto3 library.

For most attributes, changes can take up to 60 seconds to propagate throughout the Amazon SQS system.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def configure_queue_attributes(queue_url, delay_seconds, max_msg_size):
    """
    Configure queue attributes.
    """
    try:
        response = sqs_client.set_queue_attributes(QueueUrl=queue_url,
                                                   Attributes={
                                                       'DelaySeconds':
                                                       delay_seconds,
                                                       'MaximumMessageSize':
                                                       max_msg_size
                                                   })
    except ClientError:
        logger.exception(f'Could not set attributes on - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'
    DELAY_SECONDS = '15'
    MAX_MSG_SIZE = '2048'

    queue = configure_queue_attributes(QUEUE_URL, DELAY_SECONDS, MAX_MSG_SIZE)

    logger.info(f'Queue {QUEUE_URL} attributes updated.')

The set_queue_attributes() method requires two arguments:

  • QueueUrl – specifies the URL of the Amazon SQS queue whose attributes are set.
  • Attributes – specifies a dictionary object of attributes to set.

In the above example, we set DelaySeconds and MaximumMessageSize attributes for the queue.

The set_queue_attributes() method does not return any output response after successful code execution.

Here’s an execution output:

Set the SQS queue attributes

Set the SQS queue tags

To set the SQS queue tags, you need to use the tag_queue() method from the Boto3 library.

AWS tags are used for logical resources grouping, filtering, and cost allocation.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def apply_queue_tags(queue_url, tags):
    """
    Add resource tags to the specified Amazon SQS queue.
    """
    try:
        response = sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)
    except ClientError:
        logger.exception(f'Could not set tags on - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'
    TAGS = {
        'Name': 'hands-on-cloud-standard-queue',
        'Team': 'hands-on-cloud',
        'Type': 'standard'
    }

    queue = apply_queue_tags(QUEUE_URL, TAGS)

    logger.info(f'Resource tags applied to the queue - {QUEUE_URL}.')

Required parameters are:

  • QueueUrl – The URL of the queue.
  • Tags – A dictionary of tag key-value pairs to be added to the specified queue.

The tag_queue() method does not return any output response after successful execution.

Here’s an execution output:

Set the SQS queue tags

List SQS queue tags

To get SQS queue tags, you need to use the list_queue_tags() method from the Boto3 library.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def get_queue_tags(queue_url):
    """
    List all resource tags added to the specified Amazon SQS queue.
    """
    try:
        response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    except ClientError:
        logger.exception(f'Could not get tags for - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'

    tags = get_queue_tags(QUEUE_URL)

    logger.info('Tags for the queue: ')

    # 'Tags' may be missing from the response when the queue has no tags
    for key, value in tags.get('Tags', {}).items():
        logger.info(f'Tag Key: {key}, Tag Value: {value}')

Here’s an execution output:

Get SQS queue tags

Set the SQS queue permissions

To set permissions to the SQS queue, you need to use the add_permission() method from the Boto3 library.

SQS queue permissions are used to provide access to the queue for AWS services and IAM identities. When the SQS queue is created, you have full control access rights for the queue, and only you, the owner of the queue, can grant or deny permissions to the queue.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def add_access_permissions(queue_url, label, account_ids, actions):
    """
    Adds permission to a queue for a specific principal.
    """
    try:
        response = sqs_client.add_permission(QueueUrl=queue_url,
                                             Label=label,
                                             AWSAccountIds=account_ids,
                                             Actions=actions)
    except ClientError:
        logger.exception(f'Could not add permissions for - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'
    LABEL = 'HandOnCloudSendMessage'
    ACCOUNT_IDS = ['979450158315']
    ACTIONS = ['SendMessage', 'DeleteMessage']

    permissions = add_access_permissions(QUEUE_URL, LABEL, ACCOUNT_IDS,
                                         ACTIONS)

    logger.info(f'Permissions added to the queue with the label {LABEL}.')

The required arguments are:

  • QueueUrl: The URL of the SQS queue to which permissions are added. Queue URLs and names are case-sensitive.
  • Label: The unique identifier of the permission you’re setting. Maximum 80 characters. Only alphanumeric characters, hyphens (-), and underscores (_) are allowed.
  • AWSAccountIds: A list of AWS account IDs whose principals are to receive permissions.
  • Actions: The actions to be allowed for the specified principals.
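Behind the scenes, these arguments end up as a statement in the queue's access policy. The sketch below builds a roughly equivalent policy document by hand, which is useful when you need conditions that add_permission() cannot express. The helper name is hypothetical, and the exact statement that SQS generates may differ in detail, so treat this as an approximation.

```python
import json


def build_queue_policy(queue_arn: str, label: str,
                       account_ids: list, actions: list) -> str:
    """Return a JSON access policy granting actions to the given accounts."""
    statement = {
        "Sid": label,
        "Effect": "Allow",
        # Each account is referenced through its root principal ARN.
        "Principal": {
            "AWS": [f"arn:aws:iam::{account_id}:root"
                    for account_id in account_ids]
        },
        "Action": [f"sqs:{action}" for action in actions],
        "Resource": queue_arn,
    }
    return json.dumps({"Version": "2012-10-17", "Statement": [statement]})
```

Such a document would be applied through the Policy queue attribute with set_queue_attributes() rather than through add_permission().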

The add_permission() method does not return any output response after a successful operation.

Here’s an execution output:

Set the SQS queue permissions

Remove the SQS queue permissions

To remove permissions from the SQS queue, you need to use the remove_permission() method from the Boto3 library.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def remove_access_permissions(queue_url, label):
    """
    Revokes any permissions in the queue policy.
    """
    try:
        response = sqs_client.remove_permission(QueueUrl=queue_url,
                                                Label=label)
    except ClientError:
        logger.exception(f'Could not remove permissions for - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'
    LABEL = 'HandOnCloudSendMessage'

    permissions = remove_access_permissions(QUEUE_URL, LABEL)

    logger.info(f'Permissions {LABEL} removed from the queue.')

The required arguments are:

  • QueueUrl: The URL of the Amazon SQS queue from which permissions are removed.
  • Label: The identifier of the permission to remove.

The remove_permission() method does not return any output response after a successful operation.

Here’s an execution output:

Remove the SQS queue permissions

Processing SQS queue messages

The Boto3 library allows you to process SQS queue messages effectively. Here’s how it works:

Send message to the SQS queue

To send a message to the SQS queue, you need to use the send_message() method from the Boto3 library.

A message can include only XML, JSON, and unformatted text.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def send_queue_message(queue_url, msg_attributes, msg_body):
    """
    Sends a message to the specified queue.
    """
    try:
        response = sqs_client.send_message(QueueUrl=queue_url,
                                           MessageAttributes=msg_attributes,
                                           MessageBody=msg_body)
    except ClientError:
        logger.exception(f'Could not send message to the - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'
    MSG_ATTRIBUTES = {
        'Title': {
            'DataType': 'String',
            'StringValue': 'Working with SQS in Python using Boto3'
        },
        'Author': {
            'DataType': 'String',
            'StringValue': 'Abhinav D'
        }
    }
    MSG_BODY = 'Learn how to create, receive, delete and modify SQS queues and see the other functions available within the AWS.'

    msg = send_queue_message(QUEUE_URL, MSG_ATTRIBUTES, MSG_BODY)

    json_msg = json.dumps(msg, indent=4)

    logger.info(f'''
        Message sent to the queue {QUEUE_URL}.
        Response: \n{json_msg}''')

The required arguments are:

  • QueueUrl: The URL of the Amazon SQS queue to which a message is sent.
  • MessageBody: The message to send. The minimum size is one character. The maximum size is 256 KB.

The optional parameter used in the above example is:

  • MessageAttributes: Using message attributes, users can attach structured metadata to messages.

Message attributes are optional and separate from the message body, although they are sent alongside it.

Here’s an execution output:

Send message to the SQS queue
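Since the message body may be JSON and attributes follow a fixed structure, it can be handy to build the send_message() arguments from plain Python dictionaries. The following is a minimal sketch, not part of the original example: the helper names to_message_attributes() and build_send_kwargs() are hypothetical, and the size check only counts the body against the 256 KB limit mentioned above, which is a simplification.

```python
import json

# Size limit quoted in this article; checked against the body only (a simplification).
MAX_BODY_BYTES = 256 * 1024


def to_message_attributes(metadata):
    """Convert a flat dict into the MessageAttributes structure of send_message()."""
    attributes = {}
    for name, value in metadata.items():
        # Number attributes are also transported as strings, so both use StringValue.
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        attributes[name] = {
            'DataType': 'Number' if is_number else 'String',
            'StringValue': str(value),
        }
    return attributes


def build_send_kwargs(queue_url, payload, metadata):
    """Build keyword arguments for send_message() with a JSON-encoded body."""
    body = json.dumps(payload)
    if len(body.encode('utf-8')) > MAX_BODY_BYTES:
        raise ValueError('Message body exceeds 256 KB')
    return {
        'QueueUrl': queue_url,
        'MessageBody': body,
        'MessageAttributes': to_message_attributes(metadata),
    }


# Possible usage with the client from the example above (not executed here):
# sqs_client.send_message(
#     **build_send_kwargs(QUEUE_URL, {'order_id': 42}, {'Author': 'Abhinav D'}))
```

Keeping the argument construction separate from the API call makes it possible to check the payload without touching AWS.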

Read and delete messages from the SQS queue

To receive (read) a message from an SQS queue, you need to use the receive_message() method. To delete a message from an SQS queue, you need to use the delete_message() method.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def receive_queue_message(queue_url):
    """
    Retrieves messages from the specified queue (one per call by default;
    pass MaxNumberOfMessages to receive up to 10).
    """
    try:
        response = sqs_client.receive_message(QueueUrl=queue_url)
    except ClientError:
        logger.exception(
            f'Could not receive the message from the - {queue_url}.')
        raise
    else:
        return response


def delete_queue_message(queue_url, receipt_handle):
    """
    Deletes the specified message from the specified queue.
    """
    try:
        response = sqs_client.delete_message(QueueUrl=queue_url,
                                             ReceiptHandle=receipt_handle)
    except ClientError:
        logger.exception(
            f'Could not delete the message from the - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'

    messages = receive_queue_message(QUEUE_URL)

    # The 'Messages' key is absent from the response when no message was returned
    for msg in messages.get('Messages', []):
        msg_body = msg['Body']
        receipt_handle = msg['ReceiptHandle']

        logger.info(f'The message body: {msg_body}')

        logger.info('Deleting message from the queue...')

        delete_queue_message(QUEUE_URL, receipt_handle)

    logger.info(f'Received and deleted message(s) from {QUEUE_URL}.')

The required parameter for the receive_message() method is:

  • QueueUrl: The URL of the queue from which messages are received.

The required parameters for the delete_message() method are:

  • QueueUrl: The URL of the queue from which messages are deleted.
  • ReceiptHandle: The receipt handle associated with the message to delete.

Here’s a code execution output:

Read and delete messages from the SQS queue
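The receive-then-delete pattern above can be wrapped into a small loop that fetches messages in batches. This is only a sketch under a few assumptions: drain_queue() and its handler callback are hypothetical names, the client is passed in as an argument (for example, the sqs_client created above), and the loop stops at the first empty response, which with short polling does not strictly prove that the queue is empty.

```python
def drain_queue(sqs_client, queue_url, handler, max_batches=5):
    """Receive messages in batches, pass each one to handler, then delete it."""
    processed = 0
    for _ in range(max_batches):
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,         # the maximum allowed per call
            MessageAttributeNames=['All'],  # also return custom message attributes
        )
        # The 'Messages' key is absent when nothing was returned.
        messages = response.get('Messages', [])
        if not messages:
            break
        for msg in messages:
            handler(msg['Body'], msg.get('MessageAttributes', {}))
            # Delete only after the handler returned without raising, so a
            # failed message becomes visible again after the visibility timeout.
            sqs_client.delete_message(QueueUrl=queue_url,
                                      ReceiptHandle=msg['ReceiptHandle'])
            processed += 1
    return processed


# Possible usage (not executed here):
# drain_queue(sqs_client, QUEUE_URL, lambda body, attrs: print(body))
```

Deleting after processing, rather than before, is a deliberate choice here: if the handler raises, the message is not lost and can be received again.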

How to do SQS long polling?

Long polling reduces the cost of processing SQS messages by cutting down on empty responses: instead of returning immediately, a receive request waits up to 20 seconds for messages to arrive. The polling behavior is controlled by the ReceiveMessageWaitTimeSeconds queue attribute. By default, it is set to 0 (short polling).

Enable long polling on an existing SQS queue

To enable long polling on an existing SQS queue, you need to use the set_queue_attributes() method of the Boto3 library.

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def configure_queue_long_polling(queue_url, msg_rcv_wait_time):
    """
    Configures the queue for long polling.
    """
    try:
        response = sqs_client.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={'ReceiveMessageWaitTimeSeconds': msg_rcv_wait_time})
    except ClientError:
        logger.exception(f'Could not configure long polling on - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'
    MSG_RCV_WAIT_TIME = '20'

    queue = configure_queue_long_polling(QUEUE_URL, MSG_RCV_WAIT_TIME)

    logger.info(f'Queue {QUEUE_URL} configured for long polling.')

Long polling is enabled by setting the ReceiveMessageWaitTimeSeconds attribute to a value between 1 and 20 seconds in the set_queue_attributes() call.

Here’s an execution output:

Enable long polling on an existing SQS queue
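Long polling can also be requested for a single call, without changing the queue, by passing the WaitTimeSeconds parameter to receive_message(). The sketch below assumes a hypothetical helper name, receive_with_long_polling(), and takes the client as an argument (for example, the sqs_client created above).

```python
def receive_with_long_polling(sqs_client, queue_url, wait_seconds=20):
    """Receive messages, waiting up to wait_seconds for at least one to arrive."""
    if not 0 <= wait_seconds <= 20:
        raise ValueError('wait_seconds must be between 0 and 20')
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        # Applies to this call only and takes priority over the
        # queue-level ReceiveMessageWaitTimeSeconds attribute.
        WaitTimeSeconds=wait_seconds,
    )
    # The 'Messages' key is absent when the wait ended without any message.
    return response.get('Messages', [])


# Possible usage (not executed here):
# messages = receive_with_long_polling(sqs_client, QUEUE_URL)
```

The range check mirrors the 20-second upper bound described above, so an invalid value fails locally instead of in the API call.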

Get SQS queue attributes

To get SQS queue attributes, you need to use the get_queue_attributes() method of the Boto3 library.

The following code retrieves two queue attributes, DelaySeconds and VisibilityTimeout:

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sqs_client = boto3.client("sqs", region_name=AWS_REGION)


def get_queue_attributes(queue_url, attribute_names):
    """
    Gets attributes for the specified queue.
    """
    try:
        response = sqs_client.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=attribute_names)
    except ClientError:
        logger.exception(f'Could not get queue attributes - {queue_url}.')
        raise
    else:
        return response


if __name__ == '__main__':
    # CONSTANTS
    QUEUE_URL = '<your-queue-url>'
    ATTRIBUTE_NAMES = ['DelaySeconds', 'VisibilityTimeout']

    attributes = get_queue_attributes(QUEUE_URL, ATTRIBUTE_NAMES)['Attributes']

    logger.info(f'Queue attributes:\n{json.dumps(attributes, indent=4)}')

The required parameter is:

  • QueueUrl: The URL of the queue whose attribute information is retrieved.

Here’s a code execution output:

Get SQS queue attributes
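The same get_queue_attributes() call can be used to check roughly how many messages are waiting in a queue. The following is a small sketch: get_queue_depth() is a hypothetical helper name, the client is passed in as an argument, and the counts returned by SQS are approximate by design.

```python
def get_queue_depth(sqs_client, queue_url):
    """Return approximate counts of visible and in-flight messages."""
    names = ['ApproximateNumberOfMessages',
             'ApproximateNumberOfMessagesNotVisible']
    response = sqs_client.get_queue_attributes(QueueUrl=queue_url,
                                               AttributeNames=names)
    attributes = response.get('Attributes', {})
    # Attribute values are returned as strings, so convert them to integers.
    return {name: int(attributes.get(name, 0)) for name in names}


# Possible usage (not executed here):
# depth = get_queue_depth(sqs_client, QUEUE_URL)
```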

Summary

This article covered the essentials of working with Amazon SQS using the Boto3 library in Python. We’ve described how to create, receive, and delete SQS queues, how to work with SQS messages and long polling, and how to manage SQS queue permissions and tags.
