I’m currently working on a project where I’ve set up AWS Lambda functions with the Serverless Framework to process jobs from an SQS queue and create entries in a DynamoDB table. The deployment succeeds and there is nothing suspicious in the logs. However, the SQS queue remains empty and the DynamoDB table shows no items.

This is what I’m trying to do. I’m done with L4, but I’m stuck on L5.

serverless.yml

service: challenge1
frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.8
  lambdaHashingVersion: '20201221'
  iamRoleStatements:
    - Effect: "Allow"
      Action: "dynamodb:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "apigateway:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "s3:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "sqs:*"
      Resource: "*"
  environment:
    DYNAMODB_CARDS_TABLE_NAME: challenge1 
    S3_BUCKETNAME: serverlesschallenge-darla
    QUEUE_URL: https://sqs.us-east-1.amazonaws.com/874957933250/serverlesschallenge-darla

functions:
  prepareSQSjobS3:
    handler: handler.prepare_sqs_job
    events:
      - s3:
          bucket: serverlesschallenge-darla
          event: s3:ObjectCreated:Put
          existing: true
          rules:
            - suffix: .csv
  prepareSQSjobSQS:
    handler: handler.process_sqs_job
    events:
      - sqs:
          arn: "arn:aws:sqs:us-east-1:874957933250:serverlesschallenge-darla"

package:
  exclude:
    - venv/**
    - node_modules/**

resources:
  Resources:
    LoyaltyCardDynamodbTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: card_number
            AttributeType: S
          - AttributeName: email
            AttributeType: S
        KeySchema:
          - AttributeName: card_number
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:provider.environment.DYNAMODB_CARDS_TABLE_NAME}
        GlobalSecondaryIndexes:
            - IndexName: emailIndex
              KeySchema:
                - AttributeName: email
                  KeyType: HASH
              Projection:
                ProjectionType: ALL

plugins:
  - serverless-python-requirements

handler.py

import json
import string
import random
import os
import boto3
import urllib.parse
import csv
import sys
from io import StringIO

from dynamodb_gateway import DynamodbGateway

s3 = boto3.client('s3')
sqs = boto3.client('sqs')
queue_url = os.getenv('QUEUE_URL')

# AWS Lambda trigger: fires when a new file lands in S3, reads the CSV, and queues one SQS message per row
def prepare_sqs_job(event, context):
    try:
        print(f"Received S3 event: {json.dumps(event)}")

        bucket_name = os.getenv("S3_BUCKETNAME")

        # Get the object details from the S3 event
        s3_record = event['Records'][0]['s3']
        bucket = s3_record['bucket']['name']
        file_key = urllib.parse.unquote_plus(s3_record['object']['key'], encoding='utf-8')

        # Download the file from S3
        response = s3.get_object(Bucket=bucket, Key=file_key)
        file_content = response['Body'].read().decode('utf-8')
        print(f"Object uploaded: s3://{bucket}/{file_key}")

        # Process CSV file and send each row as a message to SQS
        rows = [row for i, row in enumerate(csv.reader(StringIO(file_content))) if i > 0]

        # Placeholder message attribute; the attribute name and value here are arbitrary
        message_attrs = {'AttributeName': {'StringValue': 'AttributeValue', 'DataType': 'String'}}
        for row in rows:
            print(row)
            sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=row[0],
                MessageAttributes=message_attrs,
            )

        message = 'Messages accepted!'
        print(message)
        response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}

    except Exception as e:
        print(f'Error: {str(e)}')
        response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}

    return response

def process_sqs_job(event, context):
    try:
        print(f"Received SQS event: {json.dumps(event)}")

        table_name = os.getenv("DYNAMODB_CARDS_TABLE_NAME")

        for record in event['Records']:
            # Parse JSON content from SQS message
            message_body = json.loads(record['body'])

            if isinstance(message_body, dict):
                # Extract necessary information from the message
                card_number = message_body.get('card_number')
                first_name = message_body.get('first_name')
                last_name = message_body.get('last_name')
                email = message_body.get('email')
                points = message_body.get('points')

                # Check if the email already exists in the DynamoDB table
                if email_exists(table_name, email):
                    print(f"Email {email} already used. Skipping...")
                    continue

                # Create a loyalty card in DynamoDB
                loyalty_card = {
                    "card_number": card_number,
                    "first_name": first_name,
                    "last_name": last_name,
                    "email": email,
                    "points": points
                }

                DynamodbGateway.upsert(
                    table_name=table_name,
                    mapping_data=[loyalty_card],
                    primary_keys=["card_number"]
                )

                print(f"Loyalty card created: {loyalty_card}")

        message = 'Messages processed successfully!'
        print(message)
        response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}

    except Exception as e:
        print(f'Error: {str(e)}')
        response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}

    return response


def email_exists(table_name, email):
    # Check if the email already exists in the DynamoDB table using GSI
    result = DynamodbGateway.query_index_by_partition_key(
        index_name="emailIndex",
        table_name=table_name,
        partition_key_name="email",
        partition_key_query_value=email
    )

    return bool(result)
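
For context, DynamodbGateway (imported above) is a helper module that isn't shown here. Assuming it is a thin wrapper over boto3's Table.query, the GSI lookup above is roughly equivalent to this direct boto3 call (a sketch; the index and key names mirror the ones used above):

import boto3
from boto3.dynamodb.conditions import Key

def email_exists_boto3(table_name, email):
    # Query the emailIndex GSI for any item with this email
    table = boto3.resource("dynamodb").Table(table_name)
    resp = table.query(
        IndexName="emailIndex",
        KeyConditionExpression=Key("email").eq(email),
        Limit=1,
    )
    return resp["Count"] > 0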

Any insights or suggestions on troubleshooting this issue would be greatly appreciated. :) I’m unsure why the SQS queue isn’t receiving anything and why the DynamoDB table remains empty despite seemingly successful Lambda executions. Thanks!

I tried

try:
    res = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=row[0],
        MessageAttributes=message_attrs,
    )
    print(res)
except Exception as e:
    print(e)

and here are the CloudWatch logs for both functions.

/aws/lambda/challenge1-dev-prepareSQSjobSQS
2023/12/10/[$LATEST]f6cc4f59625f419a8dfa981cd3dcea3b

/aws/lambda/challenge1-dev-prepareSQSjobS3
2023/12/10/[$LATEST]e122778a13f6432196d82fd9c81a5db6
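
(If it helps to inspect them: the streams above can be pulled with boto3's CloudWatch Logs client; a minimal sketch using the S3-trigger function's group and stream names from above:)

import boto3

logs = boto3.client("logs")
resp = logs.get_log_events(
    logGroupName="/aws/lambda/challenge1-dev-prepareSQSjobS3",
    logStreamName="2023/12/10/[$LATEST]e122778a13f6432196d82fd9c81a5db6",
)
for event in resp["events"]:
    print(event["message"])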

2 Answers


  1. How do you know whether your API call to push messages to SQS succeeds or not? You need to add error handling and logging in your Lambda to confirm things are running as you expect.

    Change:

    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=row[0],
        MessageAttributes=message_attrs,
    )

    To:

    try:
        res = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=row[0],
            MessageAttributes=message_attrs,
        )
        print(res)
    except Exception as e:
        print(e)
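
    A successful send_message call returns a dict containing a MessageId, so printing res shows whether anything was actually accepted. One caveat with only printing the exception: the invocation still succeeds, so the function looks healthy even when every send fails. A variant that surfaces the failure (a sketch, keeping the question's variable names):

    try:
        res = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=row[0],
            MessageAttributes=message_attrs,
        )
        # send_message returns a MessageId on success; log it for traceability
        print(f"Sent message: {res['MessageId']}")
    except Exception as e:
        print(f"send_message failed: {e}")
        raise  # re-raise so the invocation shows up as failed in CloudWatch
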
  2. Based on the logs of process_sqs_job, it seems that the body is a string, not a dict: it looks something like body='58544...'. Because of this, the check if isinstance(message_body, dict): is never true, so each record is silently skipped and nothing is written to DynamoDB.
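
    That matches what prepare_sqs_job sends: MessageBody=row[0] is just the first CSV column as a plain string. One way to get a dict on the consuming side is to send each row as JSON instead (a sketch, assuming the CSV's header row names the columns card_number, first_name, last_name, email, points):

    import csv
    import json
    from io import StringIO

    # DictReader keys each row by the header row, so every row is already a dict
    for row in csv.DictReader(StringIO(file_content)):
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(row),  # json.loads(record['body']) then yields a dict
            MessageAttributes=message_attrs,
        )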
