
This Lambda is triggered by SQS; it fetches messages from the queue and updates a DynamoDB table. Two policies are attached to the Lambda's execution role – one so it can read from SQS (get queue) and one so it can put_item into the DynamoDB table.

import boto3
import json
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

queue = boto3.resource('sqs', region_name='us-east-1').get_queue_by_name(QueueName="erjan")
table = boto3.resource('dynamodb', region_name='us-east-1').Table('Votes')

def process_message(message):
    logging.info('----------process_message----------------------')
    logging.info('-------------SQS auto generated msg------------------------')
    logging.info(type(message))

    try:
        logging.info('----------process_message----------------------')

        payload = message.message_attributes
        voter = payload['voter']['StringValue']
        vote  = payload['vote']['StringValue']
        logging.info("Voter: %s, Vote: %s", voter, vote)
        update_count(vote)
        message.delete()
    except Exception as e:
        # log the exception instead of silently swallowing it
        logging.error('-----EXCEPTION in process_message-----: %s', e)



def update_count(vote):
    logging.info('update count....')
    if vote == 'b':
        logging.info('vote is b - update...')

        response = table.get_item(Key = {'voter':'count'})
        item = response['Item']
        item['b'] +=1
        table.put_item(Item = item)
            
    elif vote == 'a':
        logging.info('vote is a - update...')
        
        table.update_item(
            Key={'voter': 'count'},
            UpdateExpression="ADD a :incr",
            ExpressionAttributeValues={':incr': 1})



def lambda_handler(event,context):


    logging.info('--------inside main-------')

    try:
        logging.info('--------------------------------------')
        logging.info(event)
        logging.info('------------------------inside try - queue.receive_messages-------------')
        messages = queue.receive_messages(MessageAttributeNames=['vote','voter'])
        logging.info(messages)
        logging.info('--------------------------------------')

        for message in messages:
            logging.info('----------every msg -------------')
            print('----------every msg -------------')
            process_message(message)
        
        return {'statusCode': 200, 'body': '{"status": "success"}'}

    except Exception as e:
        logging.error(e)
        return {'statusCode': 500, 'body': '{"status": "error"}'}
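
For reference, the two permissions described at the top might be expressed roughly as an inline policy on the Lambda's execution role, something like the sketch below (the role name, account ID, and ARNs are placeholders, not values from the question):

import json
import boto3

# Sketch of an inline execution-role policy: read/delete from the queue and
# read/write the Votes table. Names and ARNs below are placeholders.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:us-east-1:123456789012:erjan"
        },
        {
            "Effect": "Allow",
            "Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem"],
            "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/Votes"
        }
    ]
}

boto3.client('iam').put_role_policy(
    RoleName='vote-lambda-role',          # placeholder role name
    PolicyName='sqs-dynamodb-access',
    PolicyDocument=json.dumps(policy)
)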
       
  

the test event:

{
  "Records": [
    {
      "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
      "receiptHandle": "MessageReceiptHandle",
      "body": "",
      "attributes": {
        "ApproximateReceiveCount": "1"
       
      },
      "messageAttributes": {
        "vote": {
          "Type": "String",
          "StringValue": "b"
        },
        "voter": {
          "Type": "String",
          "StringValue": "count"
        }
      },
      "awsRegion": "us-east-1"
    }
  ]
}
   

Running the test event only returns a 200 success result, but it does not show any of the logs or prints from the lambda_handler() function.

I checked the CloudWatch logs and got the same output – just three lines: START RequestId, END RequestId, and REPORT. The Lambda only polls the SQS queue (which does exist); the event and context arguments are not used at all.

But it does not even print the basic logging.info('--------inside main-------') or the other logs under try:

        logging.info('--------------------------------------')
        logging.info(event)
        logging.info('------------------------inside try - queue.receive_messages-------------')

2 Answers


  1. Chosen as BEST ANSWER

    For some reason, just adding the SQS trigger in the AWS console did not work. I had to add a resource-based policy statement in the function's configuration so that SQS is allowed to invoke it.

    Then you can send test messages (events) from the SQS console using the "Send and receive messages" button.

    Under triggers there should be two things: SQS and Lambda.
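
    If you prefer to set this up from code instead of the console, a rough boto3 sketch of the same two pieces might look like this (the function name, account ID, and ARNs below are placeholders, not values from the question):

    import boto3

    lambda_client = boto3.client('lambda', region_name='us-east-1')

    # Resource-based policy statement: allow the SQS service to invoke the function.
    lambda_client.add_permission(
        FunctionName='vote-processor',                        # placeholder name
        StatementId='AllowSQSInvoke',
        Action='lambda:InvokeFunction',
        Principal='sqs.amazonaws.com',
        SourceArn='arn:aws:sqs:us-east-1:123456789012:erjan'  # placeholder account id
    )

    # Event source mapping: this is what the "Add trigger" button creates.
    lambda_client.create_event_source_mapping(
        EventSourceArn='arn:aws:sqs:us-east-1:123456789012:erjan',
        FunctionName='vote-processor',
        BatchSize=10
    )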


  2. The payload is being read from the wrong object.

    payload = message.message_attributes
    

    It should be:

    payload = record['messageAttributes']
    

    That is the main thing, but there are a few other points as well. You need to parse the JSON body to be able to use its properties, and while you probably have a single event at the moment, once the SQS events scale you will need to handle them in a loop:

    import json

    for record in event['Records']:
        payload = json.loads(record['body'], parse_float=str)
        voter = record['messageAttributes']['voter']['stringValue']
    

    I found this blog useful for your case, especially the Python examples given in step 2: https://hevodata.com/learn/sqs-to-dynamodb/
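
    Putting those pieces together, a minimal record-driven handler might look roughly like this (a sketch only, reusing the Votes table from the question; the attribute key casing is an assumption, see the comment):

    import boto3

    table = boto3.resource('dynamodb', region_name='us-east-1').Table('Votes')

    def lambda_handler(event, context):
        # An SQS-triggered Lambda receives its messages in event['Records'];
        # there is no need to poll the queue with receive_messages().
        for record in event['Records']:
            attrs = record['messageAttributes']
            # Key casing can differ between console test events ('StringValue')
            # and real SQS deliveries ('stringValue'), so check your actual events.
            vote = attrs['vote'].get('StringValue') or attrs['vote'].get('stringValue')

            # ADD creates the counter attribute if it is missing and increments it atomically.
            table.update_item(
                Key={'voter': 'count'},
                UpdateExpression='ADD #v :incr',
                ExpressionAttributeNames={'#v': vote},
                ExpressionAttributeValues={':incr': 1},
            )

        # With an SQS trigger, successfully processed messages are deleted by the
        # event source mapping, so no explicit message.delete() is needed here.
        return {'statusCode': 200, 'body': '{"status": "success"}'}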
