I’m currently working on a project where I’ve set up AWS Lambda functions using the Serverless Framework to process jobs from an SQS queue and create entries in a DynamoDB table. The deployment seems to be successful, and there are no suspicious behaviors in the logs. However, I’m encountering an issue where the SQS queue remains empty, and the DynamoDB table shows no items.
This is what I’m trying to do. I’m done with L4 but I’m stuck in L5.
serverless.yml
service: challenge1
frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.8
  lambdaHashingVersion: '20201221'
  iamRoleStatements:
    - Effect: "Allow"
      Action: "dynamodb:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "apigateway:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "s3:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "sqs:*"
      Resource: "*"
  environment:
    DYNAMODB_CARDS_TABLE_NAME: challenge1
    S3_BUCKETNAME: serverlesschallenge-darla
    QUEUE_URL: https://sqs.us-east-1.amazonaws.com/874957933250/serverlesschallenge-darla

functions:
  prepareSQSjobS3:
    handler: handler.prepare_sqs_job
    events:
      - s3:
          bucket: serverlesschallenge-darla
          event: s3:ObjectCreated:Put
          existing: true
          rules:
            - suffix: .csv
  prepareSQSjobSQS:
    handler: handler.process_sqs_job
    events:
      - sqs:
          arn: "arn:aws:sqs:us-east-1:874957933250:serverlesschallenge-darla"

package:
  exclude:
    - venv/**
    - node_modules/**

resources:
  Resources:
    LoyaltyCardDynamodbTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: card_number
            AttributeType: S
          - AttributeName: email
            AttributeType: S
        KeySchema:
          - AttributeName: card_number
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:provider.environment.DYNAMODB_CARDS_TABLE_NAME}
        GlobalSecondaryIndexes:
          - IndexName: emailIndex
            KeySchema:
              - AttributeName: email
                KeyType: HASH
            Projection:
              ProjectionType: ALL

plugins:
  - serverless-python-requirements
handler.py
import json
import string
import random
import os
import boto3
import urllib.parse
import csv
import sys
from io import StringIO
from dynamodb_gateway import DynamodbGateway

s3 = boto3.client('s3')
sqs = boto3.client('sqs')
queue_url = os.getenv('QUEUE_URL')


#aws lambda trigger when theres new s3 file. reads line by line
def prepare_sqs_job(event, context):
    try:
        print(f"Received S3 event: {json.dumps(event)}")
        bucket_name = os.getenv("S3_BUCKETNAME")
        # Get the object details from the S3 event
        s3_record = event['Records'][0]['s3']
        bucket = s3_record['bucket']['name']
        file_key = urllib.parse.unquote_plus(s3_record['object']['key'], encoding='utf-8')
        # Download the file from S3
        response = s3.get_object(Bucket=bucket, Key=file_key)
        file_content = response['Body'].read().decode('utf-8')
        print(f"Object uploaded: s3://{bucket}/{file_key}")
        # Process CSV file and send each row as a message to SQS
        rows = [row for i, row in enumerate(csv.reader(StringIO(file_content))) if i > 0]
        message_attrs = {'AttributeName': {'StringValue': 'AttributeValue', 'DataType': 'String'}}
        for row in rows:
            print(row)
            sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=row[0],
                MessageAttributes=message_attrs,
            )
        message = 'Messages accepted!'
        print(message)
        response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}
    except Exception as e:
        print(f'Error: {str(e)}')
        response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}
    return response


def process_sqs_job(event, context):
    try:
        print(f"Received SQS event: {json.dumps(event)}")
        table_name = os.getenv("DYNAMODB_CARDS_TABLE_NAME")
        for record in event['Records']:
            # Parse JSON content from SQS message
            message_body = json.loads(record['body'])
            if isinstance(message_body, dict):
                # Extract necessary information from the message
                card_number = message_body.get('card_number')
                first_name = message_body.get('first_name')
                last_name = message_body.get('last_name')
                email = message_body.get('email')
                points = message_body.get('points')
                # Check if the email already exists in the DynamoDB table
                if email_exists(table_name, email):
                    print(f"Email {email} already used. Skipping...")
                    continue
                # Create a loyalty card in DynamoDB
                loyalty_card = {
                    "card_number": card_number,
                    "first_name": first_name,
                    "last_name": last_name,
                    "email": email,
                    "points": points
                }
                DynamodbGateway.upsert(
                    table_name=table_name,
                    mapping_data=[loyalty_card],
                    primary_keys=["card_number"]
                )
                print(f"Loyalty card created: {loyalty_card}")
        message = 'Messages processed successfully!'
        print(message)
        response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}
    except Exception as e:
        print(f'Error: {str(e)}')
        response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}
    return response


def email_exists(table_name, email):
    # Check if the email already exists in the DynamoDB table using GSI
    result = DynamodbGateway.query_index_by_partition_key(
        index_name="emailIndex",
        table_name=table_name,
        partition_key_name="email",
        partition_key_query_value=email
    )
    return bool(result)
Any insights or suggestions on troubleshooting this issue would be greatly appreciated :) I’m unsure why the SQS queue isn’t receiving anything and why the DynamoDB table remains empty despite seemingly successful Lambda function executions. Thanks!
I tried
try:
    res = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=row[0],
        MessageAttributes=message_attrs,
    )
    print(res)
except Exception as e:
    print(e)
and here are the CloudWatch logs for both functions.
/aws/lambda/challenge1-dev-prepareSQSjobSQS
2023/12/10/[$LATEST]f6cc4f59625f419a8dfa981cd3dcea3b
/aws/lambda/challenge1-dev-prepareSQSjobS3
2023/12/10/[$LATEST]e122778a13f6432196d82fd9c81a5db6
2 Answers
How do you know whether your API call to push messages to SQS succeeds or not? You need to add additional error handling and logging in your Lambda to ensure things are running as you expect.
Change:
To
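A minimal sketch of that kind of change, assuming the send_message call from the question. The helper name send_row and its sqs_client parameter are hypothetical, introduced here only so the client can be passed in; the point is to log the response (which includes a MessageId on success) and to re-raise on failure rather than swallow the error.

```python
import logging

logger = logging.getLogger(__name__)


def send_row(sqs_client, queue_url, body, message_attrs=None):
    """Send one message and log the outcome instead of failing silently.

    sqs_client is expected to behave like boto3.client('sqs');
    this helper is an illustration, not part of the original handler.
    """
    kwargs = {"QueueUrl": queue_url, "MessageBody": body}
    if message_attrs:
        kwargs["MessageAttributes"] = message_attrs
    try:
        res = sqs_client.send_message(**kwargs)
    except Exception:
        # Log the full traceback, then let the invocation fail visibly.
        logger.exception("send_message failed for queue %s", queue_url)
        raise
    logger.info("Sent message %s", res.get("MessageId"))
    return res
```

Inside prepare_sqs_job this could be called as send_row(sqs, queue_url, row[0], message_attrs) for each row, so that every send leaves a line in the CloudWatch logs.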
Based on the logs of process_sqs_job, it seems that the body is a string and not a dict. It's something like body='58544...'. Because of this, the condition if isinstance(message_body, dict): fails, since the value is a string and not a dict.
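One way to line the two functions up, sketched under the assumption that the CSV header row uses the same names the consumer reads (card_number, first_name, last_name, email, points): send each whole row as a JSON object instead of row[0], and have the consumer reject anything that is not an object. The helper names rows_to_message_bodies and parse_card are hypothetical.

```python
import csv
import json
from io import StringIO


def rows_to_message_bodies(file_content):
    """Turn CSV text into one JSON string per data row, keyed by the header names."""
    reader = csv.DictReader(StringIO(file_content))
    return [json.dumps(row) for row in reader]


def parse_card(body):
    """Decode an SQS message body and insist that it is a JSON object."""
    data = json.loads(body)
    if not isinstance(data, dict):
        # A bare value such as '58544' decodes to an int, not a dict.
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    return data
```

In the producer, each string from rows_to_message_bodies(file_content) would be passed as MessageBody; in the consumer, parse_card(record['body']) would replace the json.loads call and the isinstance check. Note that csv.DictReader yields every value as a string, so points arrives as text. Also, with an SQS event source, a handler that catches the exception and returns a 500 response still counts as a successful invocation, so the messages are deleted from the queue; letting the exception propagate keeps them available for retry.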