skip to Main Content

I have a Lambda function that performs QLDB transactions. Because QLDB uses OCC (optimistic concurrency control), it’s necessary to add a SQS Fifo in front, making sure that potential conflicting transactions (in my case: transactions attached to a specific user) are processed in sequence.

Using the SQS SendMessage.waitForTaskToken can solve this problem, but it requires the use of multiple workflows.

Is it possible to add to a SQS and wait for the just-added message to be emitted from the SQS? I’m trying to avoid splitting this step function up.

2

Answers


  1. Based on my knowledge, instead of splitting the step function, you can create a solution that sends messages to the SQS FIFO queue and processes them sequentially using a Lambda function. The Lambda function can ensure that transactions attached to a specific user are processed one at a time.

    Let’s see some ways you can go about this:

    1. Create SQS FIFO Queue
    2. Send a message to SQS FIFO Queue
    3. Trigger the Lamda function
    4. Sequentially process the messages.
    #SendMessage to SQS FIFO:
    
    import boto3
    import json
    
    sqs = boto3.client('sqs')
    queue_url = 'https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>.fifo'
    
    def send_message(user_id, transaction_data):
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(transaction_data),
            MessageGroupId=str(user_id),
            MessageDeduplicationId=str(user_id) + "-" + str(transaction_data['transaction_id'])
        )
        return response
    
    
    #Lambda Function to Process SQS Messages:
    import boto3
    import json
    from qldb_helpers import process_transaction  # Your QLDB transaction logic
    
    def lambda_handler(event, context):
        for record in event['Records']:
            message_body = json.loads(record['body'])
            user_id = record['messageAttributes']['MessageGroupId']['stringValue']
            
            # Process the QLDB transaction
            process_transaction(user_id, message_body)
            
            # Delete the message after successful processing
            sqs = boto3.client('sqs')
            sqs.delete_message(
                QueueUrl=record['eventSourceARN'],
                ReceiptHandle=record['receiptHandle']
            )
    Login or Signup to reply.
  2. Lambda Function: This function is for processing QLDB transactions. Before executing the transaction, it publishes a message to the SQS FIFO queue, making sure the message group ID is the ID of that user.

    SQS FIFO Queue: This queue ensures that messages from the same user will be processed in order. When a message is sent to the queue, it should wait until the message is processed before sending the next.

    Seperate Lambda Function (Optional): You can have a separate lambda function for listening to messages from the SQS queue and do the processing. But since you want to avoid splitting your Step Function, you can directly have the SQS queue integrated with the Step Function.

    Step Function: The Step Function arranges the entire thing. It starts by using the Lambda function responsible for QLDB transactions. Then it waits for a message to be emitted from the SQS FIFO queue before doing the next step.

    import * as cdk from '@aws-cdk/core';
    import * as lambda from '@aws-cdk/aws-lambda';
    import * as sqs from '@aws-cdk/aws-sqs';
    import * as sfn from '@aws-cdk/aws-stepfunctions';
    import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
    
    export class QldbTransactionStack extends cdk.Stack {
      constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);
    
        // Create the SQS FIFO queue
        const queue = new sqs.Queue(this, 'TransactionQueue', {
          fifo: true
        });
    
        // Create the Lambda function for processing QLDB transactions
        const transactionFunction = new lambda.Function(this, 'TransactionFunction', {
          runtime: lambda.Runtime.NODEJS_14_X,
          handler: 'transaction.handler',
          code: lambda.Code.fromAsset('path/to/lambda/code'),
          environment: {
            QUEUE_URL: queue.queueUrl
          }
        });
    
        // Grant permissions to the Lambda function to send messages to the SQS queue
        queue.grantSendMessages(transactionFunction);
    
        // Define the Step Function
        const definition = new tasks.LambdaInvoke(this, 'ProcessTransaction', {
          lambdaFunction: transactionFunction,
          outputPath: '$.Payload'
        });
    
        // Add the wait for SQS message task
        const waitForMessage = new sfn.Wait(this, 'WaitForMessage', {
          time: sfn.WaitTime.secondsPath('$.wait_time_seconds')
        });
    
        // Define Step Function state machine
        const stateMachine = new sfn.StateMachine(this, 'TransactionStateMachine', {
          definition: definition
            .next(waitForMessage)
            .next(new tasks.SqsSendMessage(this, 'SendMessageToSQS', {
              queue,
              messageBody: sfn.TaskInput.fromJsonPathAt('$.message'),
              messageGroupId: sfn.JsonPath.stringAt('$.user_id'),
              messageDeduplicationId: sfn.JsonPath.stringAt('$.message_id')
            }))
            .next(definition),
          timeout: cdk.Duration.minutes(5)
        });
      }
    }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search