Working with Step Functions in Python using Boto3

Tuvshinsanaa Tuul

AWS Step Functions is a visual workflow service that allows you to orchestrate AWS services, automate business processes, and build serverless applications. Step Functions workflows manage failures, retries, parallelization, service integrations, and observability to help developers focus on higher-value business logic. This article will cover how to manage AWS Step Functions workflows (state machines) using the Boto3 library (Python SDK for AWS).

What is Step Functions

AWS Step Functions is an orchestration service that allows you to use AWS Lambda functions and other AWS services to build and automate business-critical workflows. It allows you to describe your processes as a series of event-driven steps, each of which is powered by AWS Lambda or another supported AWS service and is responsible for its own part of the process. Every workflow is described in the Amazon States Language (ASL), a JSON-based format.

We’ve recently covered Step Functions in the following articles:

In addition to those articles, we strongly recommend watching the “Building Business Workflows with AWS Step Functions” re:Invent video:

Working with Step Functions with Boto3

As an example, let’s create a workflow that consists of two Lambda functions and uses the Choice state.

Demo State Machine

  • ProcessTransaction – the Choice state that uses the TransactionType input value to choose the next step (process a purchase or process a refund)
  • ProcessPurchase – a Task state implemented by a Lambda function
  • ProcessRefund – a Task state implemented by a Lambda function

Here’s the workflow visualization:

Working with Step Functions in Python using Boto3 - Demo workflow

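Here’s the workflow definition, written as a Python dictionary (process_purchase_arn and process_refund_arn hold the ARNs of the two Lambda functions created below):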

{
    'Comment': 'Transaction Processor State Machine',
    'StartAt': 'ProcessTransaction',
    'States': {
        'ProcessTransaction': {
            'Type': 'Choice',
            'Choices': [
                {
                    'Variable': '$.TransactionType',
                    'StringEquals': 'PURCHASE',
                    'Next': 'ProcessPurchase'
                },
                {
                    'Variable': '$.TransactionType',
                    'StringEquals': 'REFUND',
                    'Next': 'ProcessRefund'
                }
            ]
        },
        'ProcessPurchase': {
            'Type': 'Task',
            'Resource': process_purchase_arn,
            'End': True
        },
        'ProcessRefund': {
            'Type': 'Task',
            'Resource': process_refund_arn,
            'End': True
        }
    }
}
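
To make the routing rule concrete, here is a minimal local sketch of how the Choice state above picks its next state. The helper name choose_next_state is hypothetical, and it only understands the Variable/StringEquals form used in this article; it is not part of Boto3 or of Step Functions itself.

```python
def choose_next_state(choice_state, data):
    """Return the name of the next state for the given input (illustrative helper)."""
    for rule in choice_state['Choices']:
        # '$.TransactionType' -> 'TransactionType' (only top-level paths are handled)
        key = rule['Variable'][2:]
        if data.get(key) == rule['StringEquals']:
            return rule['Next']
    # A real Choice state falls back to its optional 'Default' state;
    # without one, the execution fails with States.NoChoiceMatched.
    if 'Default' in choice_state:
        return choice_state['Default']
    raise ValueError('States.NoChoiceMatched')


process_transaction = {
    'Type': 'Choice',
    'Choices': [
        {'Variable': '$.TransactionType', 'StringEquals': 'PURCHASE', 'Next': 'ProcessPurchase'},
        {'Variable': '$.TransactionType', 'StringEquals': 'REFUND', 'Next': 'ProcessRefund'},
    ],
}

print(choose_next_state(process_transaction, {'TransactionType': 'PURCHASE'}))  # ProcessPurchase
print(choose_next_state(process_transaction, {'TransactionType': 'REFUND'}))    # ProcessRefund
```

Since the demo definition has no Default state, any other TransactionType value makes the execution fail, which is worth keeping in mind when testing the workflow.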

Create Step Functions workflow

To create a Step Functions workflow, you need to use the create_state_machine() method for the Boto3 SFN client.

For our demo, we first need to create the two Lambda functions that the state machine invokes.

For more information on working with AWS Lambda using Boto3, please look at our article on working with AWS Lambda in Python using Boto3.

Process purchase Lambda

This is the demo Lambda function that is supposed to process a purchase:

def lambda_handler(event, context):
    print('event:', event)

    response = {
        'TransactionType': event['TransactionType'],
        'Message': 'From Process Purchase',
    }

    return response

Process refund Lambda

This is the demo Lambda function that is supposed to process a refund:

def lambda_handler(event, context):
    print('event:', event)

    response = {
        'TransactionType': event['TransactionType'],
        'Message': 'From Process Refund',
    }

    return response
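
Because both handlers are plain Python functions, they can be exercised locally before deployment. A minimal sketch, using a hypothetical sample event and None in place of the Lambda context object (which these handlers never read):

```python
def lambda_handler(event, context):
    print('event:', event)

    response = {
        'TransactionType': event['TransactionType'],
        'Message': 'From Process Refund',
    }

    return response


# Hypothetical sample event, shaped like the state machine input
result = lambda_handler({'TransactionType': 'REFUND'}, None)
print(result)
```

Note that an event without the TransactionType key raises a KeyError in these handlers, so the Task state would fail for such input.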

Now, we need to zip our functions and upload them to AWS using Boto3:

Zip Lambda functions

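To create a Lambda function zip archive from Python code, use the shutil.make_archive() function. In the snippet below, output_filename is the archive path without the .zip extension, and dir_name is the directory that contains the function code: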

import shutil
shutil.make_archive(output_filename, 'zip', dir_name)
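
As a self-contained illustration, the sketch below builds an archive for one function and checks that the handler file ends up at the root of the zip. The directory layout (one folder per function, holding a file named after the function) is an assumption made for this example, and the handler body is a trimmed stand-in.

```python
import pathlib
import shutil
import tempfile
import zipfile

# Assumed layout: one folder per function, holding <function_name>.py
work_dir = pathlib.Path(tempfile.mkdtemp())
source_dir = work_dir / 'process_purchase'
source_dir.mkdir()
(source_dir / 'process_purchase.py').write_text(
    "def lambda_handler(event, context):\n"
    "    return {'TransactionType': event['TransactionType']}\n"
)

# base_name must not include the '.zip' suffix; make_archive appends it
archive_path = shutil.make_archive(
    str(work_dir / 'process_purchase'), 'zip', root_dir=str(source_dir)
)

# The handler file must sit at the root of the archive so that
# Handler='process_purchase.lambda_handler' can be resolved
with zipfile.ZipFile(archive_path) as archive:
    names = archive.namelist()
print(archive_path, names)
```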

Deploying Lambda functions

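Next, we need an IAM role that grants the Lambda functions their execution permissions.

For simplicity, we’ll reuse an existing role named LambdaBasicExecution. The code below assumes that this role already exists in your account, for example with the AWS managed AWSLambdaBasicExecutionRole policy attached.

For more information on working with IAM in Python using Boto3, check out our article on working with IAM in Python using Boto3.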

import boto3

iam_client = boto3.client('iam')
lambda_client = boto3.client('lambda')

with open('process_purchase.zip', 'rb') as f:
    process_purchase_zip = f.read()

with open('process_refund.zip', 'rb') as f:
    process_refund_zip = f.read()

role = iam_client.get_role(RoleName='LambdaBasicExecution')

process_purchase_response = lambda_client.create_function(
    FunctionName='process_purchase',
    Runtime='python3.9',
    Role=role['Role']['Arn'],
    Handler='process_purchase.lambda_handler',
    Code=dict(ZipFile=process_purchase_zip),
    Timeout=300,  # Seconds; Lambda allows up to 900
)

process_refund_response = lambda_client.create_function(
    FunctionName='process_refund',
    Runtime='python3.9',
    Role=role['Role']['Arn'],
    Handler='process_refund.lambda_handler',
    Code=dict(ZipFile=process_refund_zip),
    Timeout=300,  # Seconds; Lambda allows up to 900
)

print(process_purchase_response)
print(process_refund_response)

Here’s an execution output:

Create Lambda Functions
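
A newly created function can spend a short time in the Pending state before it is ready to be invoked. One way to handle this is to wait for the function to become Active right after creating it. The sketch below wraps both steps in a hypothetical deploy_function() helper that takes the Lambda client as a parameter; it relies on the function_active_v2 waiter of the Boto3 Lambda client.

```python
def deploy_function(lambda_client, name, zip_bytes, role_arn):
    """Create a function and block until Lambda reports it as Active."""
    response = lambda_client.create_function(
        FunctionName=name,
        Runtime='python3.9',
        Role=role_arn,
        # Assumes the archive contains <name>.py with a lambda_handler function
        Handler=f'{name}.lambda_handler',
        Code={'ZipFile': zip_bytes},
        Timeout=300,
    )
    # New functions start in the Pending state; the waiter polls until Active
    lambda_client.get_waiter('function_active_v2').wait(FunctionName=name)
    return response['FunctionArn']
```

With the variables from the script above, it could be called as deploy_function(lambda_client, 'process_purchase', process_purchase_zip, role['Role']['Arn']).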

Step Functions workflow execution IAM role

Now, let’s create an IAM role that the Step Functions workflow (state machine) can assume and that grants it permission to invoke Lambda functions:

import boto3
import json 

iam = boto3.client('iam')

role_policy = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "states.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

response = iam.create_role(
  RoleName='StepFunctionLambdaBasicExecution',
  AssumeRolePolicyDocument=json.dumps(role_policy),
)

attach_policy_response = iam.attach_role_policy(
    RoleName='StepFunctionLambdaBasicExecution',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaRole'
)

print(response)
print(attach_policy_response)

Here’s an execution output:

Grant Step Functions permissions to trigger Lambda functions
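
The AWS managed AWSLambdaRole policy allows lambda:InvokeFunction on every function in the account. If you prefer a narrower grant, one option is an inline policy limited to the two demo functions. The sketch below only builds the policy document; build_invoke_policy is a hypothetical helper, and the ARNs are placeholders.

```python
import json


def build_invoke_policy(function_arns):
    """Policy document allowing lambda:InvokeFunction on the given ARNs only."""
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Action': 'lambda:InvokeFunction',
                'Resource': list(function_arns),
            }
        ],
    }


# Placeholder ARNs for illustration; substitute the real function ARNs
policy = build_invoke_policy([
    'arn:aws:lambda:us-east-1:123456789012:function:process_purchase',
    'arn:aws:lambda:us-east-1:123456789012:function:process_refund',
])
policy_json = json.dumps(policy)
print(policy_json)
```

The resulting JSON string could then be attached with iam.put_role_policy(RoleName='StepFunctionLambdaBasicExecution', PolicyName='InvokeTransactionLambdas', PolicyDocument=policy_json), where the policy name is an arbitrary choice for this example.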

Step Functions workflow deployment

Finally, we can deploy the Step Functions workflow:

import boto3
import json

sfn_client = boto3.client('stepfunctions')
lambda_client = boto3.client('lambda')
iam_client = boto3.client('iam')

process_purchase_lambda = lambda_client.get_function(
    FunctionName='process_purchase'
)

process_refund_lambda = lambda_client.get_function(
    FunctionName='process_refund'
)

# Use the role created for Step Functions above, not the Lambda execution role
role = iam_client.get_role(RoleName='StepFunctionLambdaBasicExecution')

process_purchase_arn = process_purchase_lambda['Configuration']['FunctionArn']
process_refund_arn = process_refund_lambda['Configuration']['FunctionArn']

asl_definition = {
    'Comment': 'Transaction Processor State Machine',
    'StartAt': 'ProcessTransaction',
    'States': {
        'ProcessTransaction': {
            'Type': 'Choice',
            'Choices': [
                {
                    'Variable': '$.TransactionType',
                    'StringEquals': 'PURCHASE',
                    'Next': 'ProcessPurchase'
                },
                {
                    'Variable': '$.TransactionType',
                    'StringEquals': 'REFUND',
                    'Next': 'ProcessRefund'
                }
            ]
        },
        'ProcessPurchase': {
            'Type': 'Task',
            'Resource': process_purchase_arn,
            'End': True
        },
        'ProcessRefund': {
            'Type': 'Task',
            'Resource': process_refund_arn,
            'End': True
        }
    }
}

response = sfn_client.create_state_machine(
    name='ProcessTransactionStateMachine',
    definition=json.dumps(asl_definition),
    roleArn=role['Role']['Arn']
)

print(response)

Here’s an execution output:

Deploy the Step Functions workflow

Describe Step Functions workflow

To describe the Step Functions workflow, you need to use the describe_state_machine() method for the Boto3 SFN client.

import boto3

sfn_client = boto3.client('stepfunctions')

state_machine_arn = 'arn:aws:states:ap-northeast-1:585584209241:stateMachine:ProcessTransactionStateMachine'

response = sfn_client.describe_state_machine(
    stateMachineArn=state_machine_arn
)

print(response)

Here’s an execution output:

Describe Step Functions using Boto3
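
The describe_state_machine() response returns the workflow definition as a JSON string, so it has to be parsed before you can inspect individual states. A minimal sketch, using a hypothetical summarize_state_machine() helper and a hand-written stand-in for the real response:

```python
import json


def summarize_state_machine(description):
    """Pull commonly needed fields out of a describe_state_machine() response."""
    definition = json.loads(description['definition'])  # returned as a JSON string
    return {
        'name': description['name'],
        'status': description['status'],
        'start_at': definition['StartAt'],
        'states': sorted(definition['States']),
    }


# Hand-written stand-in for a real response, trimmed to the fields used above
sample_response = {
    'name': 'ProcessTransactionStateMachine',
    'status': 'ACTIVE',
    'definition': json.dumps({
        'StartAt': 'ProcessTransaction',
        'States': {'ProcessTransaction': {}, 'ProcessPurchase': {}, 'ProcessRefund': {}},
    }),
}
summary = summarize_state_machine(sample_response)
print(summary)
```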

Execute Step Functions workflow

To execute the Step Functions workflow, you need to use the start_execution() method for the Boto3 SFN client.

In our demo example, the Step Functions workflow expects the following JSON data structure as an input:

{
    "TransactionType": "PURCHASE"
}

Depending on the TransactionType value, the state machine invokes a different Lambda function.

Expected transaction types are:

  • PURCHASE
  • REFUND

Note: if there’s no input data for the Step Functions workflow, you still have to provide an empty JSON object, for example: "{}".

import boto3
import json

sfn_client = boto3.client('stepfunctions')

state_machine_arn = 'arn:aws:states:ap-northeast-1:585584209241:stateMachine:ProcessTransactionStateMachine'

response = sfn_client.start_execution(
    stateMachineArn=state_machine_arn,
    name='test1',
    input=json.dumps({ 'TransactionType': 'PURCHASE' })
)

print(response)

Here’s an execution output:

Execute Step Functions workflow

Based on the provided input, you’ll see different execution results:

Step Functions execution graph
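
The start_execution() call returns as soon as the execution has started, so its response contains an executionArn but not the result. To read the result from Python, you can poll describe_execution() until the execution leaves the RUNNING state. The sketch below does this with a hypothetical wait_for_execution() helper; the polling interval and timeout values are arbitrary choices.

```python
import json
import time


def wait_for_execution(sfn_client, execution_arn, poll_seconds=2, timeout_seconds=60):
    """Poll describe_execution() until the execution leaves the RUNNING state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        description = sfn_client.describe_execution(executionArn=execution_arn)
        if description['status'] != 'RUNNING':
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(f'{execution_arn} is still running')
        time.sleep(poll_seconds)
    # 'output' is a JSON string and is only present for successful executions
    output = json.loads(description['output']) if 'output' in description else None
    return description['status'], output
```

With the script above, it could be used as status, output = wait_for_execution(sfn_client, response['executionArn']). Also note that the name parameter of start_execution() is optional: if you omit it, Step Functions generates a unique execution name for you.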

List Step Functions workflows

To list Step Functions workflows, you need to use the list_state_machines() method for the Boto3 SFN client.

import boto3

sfn_client = boto3.client('stepfunctions')

response = sfn_client.list_state_machines()

print(response)

Here’s an execution output:

List Step Functions workflows
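
The list_state_machines() method returns results in pages, and each entry includes the state machine name and ARN. That makes it possible to look up an ARN by name instead of hard-coding it. A minimal sketch, assuming a hypothetical find_state_machine_arn() helper that takes the SFN client as a parameter:

```python
def find_state_machine_arn(sfn_client, name):
    """Return the ARN of the state machine with this name, or None if absent."""
    paginator = sfn_client.get_paginator('list_state_machines')
    for page in paginator.paginate():
        for machine in page['stateMachines']:
            if machine['name'] == name:
                return machine['stateMachineArn']
    return None
```

For example, find_state_machine_arn(sfn_client, 'ProcessTransactionStateMachine') would return the ARN used in the other snippets of this article, or None if the workflow has not been deployed.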

Delete Step Functions workflow

To delete a Step Functions workflow, you need to use the delete_state_machine() method for the Boto3 SFN client.

import boto3

sfn_client = boto3.client('stepfunctions')

state_machine_arn = 'arn:aws:states:ap-northeast-1:585584209241:stateMachine:ProcessTransactionStateMachine'

response = sfn_client.delete_state_machine(
    stateMachineArn=state_machine_arn
)

print(response)

Here’s an execution output:

Delete Step Functions workflow

Summary

In this article, we’ve covered how to manage AWS Step Functions using the Python Boto3 library. We’ve created a demo Step Functions workflow that invokes different Lambda functions depending on the provided input.
