Orchestrating Async Workflows with AWS Step Functions and Flask

by: Sharath Vijayan Nair

January 06, 2025

Introduction

In a modern microservices architecture, handling asynchronous workflows is essential for scalability and modularity. This post walks you through setting up a microservice architecture in which:

  • A Python Flask-based backend service handles API requests.
  • AWS Step Functions orchestrate asynchronous workflows.
  • AWS Lambda functions trigger the workflows and execute specific tasks.

Architecture Overview

  1. Flask Service: Acts as the backend entry point, invoking AWS Lambda functions to trigger workflows.
  2. AWS Step Functions: Manages the sequence of tasks defined in an orchestrator template.
  3. AWS Lambda: Acts as the intermediary, finding and triggering the appropriate AWS Step Functions state machine.

Here’s how you can set up this architecture step by step.


Step 1: Set Up Your Flask Backend Service

First, create a Python Flask application that includes a function to invoke an AWS Lambda function asynchronously.

Code Example: backend-service

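import json
import os

import boto3
from flask import Flask, request, jsonify, current_app

app = Flask(__name__)
# Set at import time so the value also exists under `flask run` or a WSGI server,
# not only when the file is executed directly.
app.config["ENVIRONMENT"] = os.environ.get("ENVIRONMENT", "dev")

def call_lambda(func_name, payload):
    """Invoke a Lambda function asynchronously (fire-and-forget)."""
    client = boto3.client("lambda")
    # InvocationType="Event" queues the request and returns without waiting for the result.
    client.invoke(
        FunctionName=func_name,
        InvocationType="Event",
        Payload=payload
    )

@app.route("/trigger-workflow", methods=["POST"])
def trigger_workflow():
    payload = request.get_json()
    # The Lambda name is derived from the environment, e.g. trigger-lambda-dev.
    lambda_name = f"trigger-lambda-{current_app.config['ENVIRONMENT']}"
    call_lambda(lambda_name, json.dumps(payload))
    return jsonify({"message": "Workflow triggered!"}), 200

if __name__ == "__main__":
    app.run(debug=True)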

Steps:

  1. Install Flask and Boto3:
    pip install flask boto3
  2. Save the code as app.py and run the Flask application:
    python app.py
  3. Test the endpoint by sending a POST request to /trigger-workflow with a JSON payload.
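
The invocation logic can also be checked without AWS credentials. The sketch below is an illustration, not part of the original service: it assumes a variant of call_lambda that receives the client as an argument, and FakeLambdaClient is a hypothetical stand-in for boto3.client("lambda") that only records calls.

```python
import json


class FakeLambdaClient:
    """Hypothetical stand-in for boto3.client("lambda"); records invoke() calls."""

    def __init__(self):
        self.calls = []

    def invoke(self, **kwargs):
        self.calls.append(kwargs)
        # Lambda acknowledges asynchronous ("Event") invocations with status 202.
        return {"StatusCode": 202}


def call_lambda(client, func_name, payload):
    """Variant of call_lambda that takes the client as a parameter (an assumption)."""
    response = client.invoke(
        FunctionName=func_name,
        InvocationType="Event",
        Payload=payload,
    )
    return response["StatusCode"]


fake = FakeLambdaClient()
status = call_lambda(fake, "trigger-lambda-dev", json.dumps({"key": "value"}))
print(status, fake.calls[0]["FunctionName"])
```

Passing the client in makes it easy to swap the stub for a real boto3 client in production while keeping the route handler testable.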

Step 2: Create the Lambda Function

The Lambda function acts as the trigger for your AWS Step Functions state machine.

Code Example: trigger-lambda.py

import os
import json
import boto3
from botocore.exceptions import ClientError
import logging

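logger = logging.getLogger(__name__)
# Without this, the INFO messages below are dropped: the effective default level is WARNING.
logger.setLevel(logging.INFO)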

def find(stepfunctions_client, state_machine_name):
    """
    Finds a state machine by name.
    """
    state_machine_arn = None
    try:
        paginator = stepfunctions_client.get_paginator("list_state_machines")
        for page in paginator.paginate():
            for machine in page["stateMachines"]:
                if machine["name"] == state_machine_name:
                    state_machine_arn = machine["stateMachineArn"]
                    break
            if state_machine_arn:
                break
        if state_machine_arn:
            logger.info("Found state machine %s with ARN %s.", state_machine_name, state_machine_arn)
        else:
            logger.info("Couldn't find state machine %s.", state_machine_name)
    except ClientError:
        logger.exception("Couldn't find state machine %s.", state_machine_name)
        raise
    return state_machine_arn

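def lambda_handler(event, context):
    sfn_client = boto3.client("stepfunctions")
    state_machine_name = f'orchestration-{os.environ["ENVIRONMENT"]}'
    state_machine_arn = find(sfn_client, state_machine_name)
    if state_machine_arn is None:
        # Fail loudly instead of passing None to start_execution.
        raise RuntimeError(f"State machine {state_machine_name} not found.")
    sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(event)
    )
    return {"statusCode": 200, "body": json.dumps("Step Function Triggered!")}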

Steps:

  1. Save this file as trigger-lambda.py.
  2. Deploy it to AWS Lambda with an IAM role that allows states:ListStateMachines and states:StartExecution. The Flask service's own credentials need lambda:InvokeFunction on this function.
  3. Set the environment variable ENVIRONMENT to match your setup (e.g., dev, prod).
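
As a rough illustration of the permissions involved, the sketch below builds a least-privilege policy document for the trigger Lambda's role. The helper name build_trigger_policy and the sample region and account ID are assumptions made for this example; adapt the ARNs to your own account.

```python
import json


def build_trigger_policy(region, account_id, environment):
    """Build an IAM policy document for the trigger Lambda's role (illustrative helper)."""
    state_machine_arn = (
        f"arn:aws:states:{region}:{account_id}:stateMachine:orchestration-{environment}"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Needed by find(), which pages through all state machines.
                "Effect": "Allow",
                "Action": "states:ListStateMachines",
                "Resource": "*",
            },
            {
                # Needed by start_execution(), scoped to one state machine.
                "Effect": "Allow",
                "Action": "states:StartExecution",
                "Resource": state_machine_arn,
            },
        ],
    }


# Placeholder region and account ID, used only for the example.
policy = build_trigger_policy("us-east-1", "123456789012", "dev")
print(json.dumps(policy, indent=2))
```

The printed JSON can be attached to the role as an inline policy, alongside the usual permissions for writing Lambda logs.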

Step 3: Define Your Step Functions Workflow

Create a JSON template to define your Step Functions workflow.

Example: orchestrator.tmpl.json

{
  "Comment": "Workflow",
  "StartAt": "Task1",
  "States": {
    "Task1": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:Task1",
      "Next": "Task2"
    },
    "Task2": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:Task2",
      "End": true
    }
  }
}

Steps:

  1. Replace REGION and ACCOUNT_ID with your AWS region and account ID.
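  2. Save the file and create a state machine from it in AWS Step Functions, using an execution role that is allowed to invoke the Task1 and Task2 Lambda functions.
  3. Name the state machine orchestration-ENVIRONMENT, where ENVIRONMENT matches the Lambda function's ENVIRONMENT variable (for example, orchestration-dev). The trigger Lambda looks the state machine up by this name.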
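
The placeholder substitution can be scripted. The following is a minimal sketch, assuming the template is available as a string; render_definition is a hypothetical helper, and the region and account ID are sample values. It fills in the placeholders and checks that StartAt and every Next target refer to a defined state.

```python
import json

TEMPLATE = """
{
  "Comment": "Workflow",
  "StartAt": "Task1",
  "States": {
    "Task1": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:Task1",
      "Next": "Task2"
    },
    "Task2": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:Task2",
      "End": true
    }
  }
}
"""


def render_definition(template, region, account_id):
    """Fill in REGION and ACCOUNT_ID, then sanity-check the state references."""
    rendered = template.replace("REGION", region).replace("ACCOUNT_ID", account_id)
    definition = json.loads(rendered)  # raises ValueError on malformed JSON
    states = definition["States"]
    if definition["StartAt"] not in states:
        raise ValueError("StartAt does not name a defined state")
    for name, state in states.items():
        target = state.get("Next")
        if target is not None and target not in states:
            raise ValueError(f"State {name} points to unknown state {target}")
    return definition


# Sample values; replace with your own region and account ID.
definition = render_definition(TEMPLATE, "us-east-1", "123456789012")
print(json.dumps(definition, indent=2))
```

The resulting JSON string is what you would paste into the console or pass as the definition when creating the state machine, for example through boto3's create_state_machine together with a name and roleArn.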

Step 4: Test the Integration

  1. Start the Flask server and send a POST request to /trigger-workflow:
    curl -X POST http://127.0.0.1:5000/trigger-workflow -H "Content-Type: application/json" -d '{"key": "value"}'
  2. Check the AWS Step Functions console to see the triggered workflow and its execution status.
  3. Review the logs in Amazon CloudWatch for the Lambda function and, if logging is enabled on the state machine, for Step Functions.
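
Execution status can also be checked from code instead of the console. The sketch below polls describe_execution until the execution reaches a terminal status. It assumes you have the execution ARN (start_execution returns it as executionArn, which the trigger Lambda in this post does not currently pass back); wait_for_execution and FakeSfnClient are hypothetical names, and the fake client stands in for boto3.client("stepfunctions") so the example runs offline.

```python
import time

# Statuses after which an execution no longer changes.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(sfn_client, execution_arn, poll_seconds=2.0, max_polls=30):
    """Poll describe_execution until a terminal status or until max_polls is reached."""
    for _ in range(max_polls):
        response = sfn_client.describe_execution(executionArn=execution_arn)
        status = response["status"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"Execution {execution_arn} did not finish in time")


class FakeSfnClient:
    """Hypothetical stub that returns a scripted sequence of statuses."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe_execution(self, executionArn):
        return {"status": next(self._statuses)}


fake = FakeSfnClient(["RUNNING", "RUNNING", "SUCCEEDED"])
final_status = wait_for_execution(
    fake,
    "arn:aws:states:us-east-1:123456789012:execution:orchestration-dev:example",
    poll_seconds=0,
)
print(final_status)
```

With a real client, keep poll_seconds at a second or more to avoid API throttling.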

Conclusion

Combining Flask, AWS Lambda, and Step Functions lets the API respond immediately while the workflow runs in the background. Because each piece has a single responsibility, the architecture stays modular, scales with demand, and can be tailored to various use cases. Experiment with this setup and adapt it to your project requirements.
