skip to Main Content

I am writing a very simple lambda in JS. Its main purpose is to take the message from SQS (that i am triggering manually from AWS console) and then publish to kinesis stream through a put record. I have given the required permission to the lambda for PutRecord. But most of the time i am getting the below error.

  "errorType": "Runtime.UnhandledPromiseRejection",
  "errorMessage": "Error [ERR_HTTP2_STREAM_CANCEL]: The pending stream has been canceled (caused by: connect ETIMEDOUT 34.223.45.15:443)",
  "trace": [
    "Runtime.UnhandledPromiseRejection: Error [ERR_HTTP2_STREAM_CANCEL]: The pending stream has been canceled (caused by: connect ETIMEDOUT 34.223.45.15:443)",
    "    at process.<anonymous> (file:///var/runtime/index.mjs:1276:17)",
    "    at process.emit (node:events:517:28)",
    "    at emit (node:internal/process/promises:149:20)",
    "    at processPromiseRejections (node:internal/process/promises:283:27)",
    "    at process.processTicksAndRejections (node:internal/process/task_queues:96:32)"
  ]
}

However surprisingly sometimes it would work too especially when i deploy by changing the partitionKey to a different random value.

import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";

export const KINESIS_CLIENT = new KinesisClient([
  {
    httpOptions: {
      connectTimeout: 10000,
    },
    maxRetries: 1,
    region: 'us-west-2',
  },
]);

export const handler = async (event) => {

  const promises = [];

  for (const {messageId, body} of event.Records) {
    processEvent(body)
    promises.push(processEvent(body));
  }

  const responses = await Promise.allSettled(promises);
  responses.forEach((response) => {
    if (response.status !== "fulfilled") {
      throw Error(JSON.stringify(responses));
    }
  });
};

export async function processEvent(body) {
  const newBody = JSON.parse(body);
  newBody['field'] = 'Random';

  await KINESIS_CLIENT.send(
      new PutRecordCommand({
        Data: new TextEncoder().encode(JSON.stringify(newBody)),
        StreamName: 'InputEventStream',
        PartitionKey: '3', <--- I change this and sometimes after redeployment it seems to work
      }),
  );
}```

2

Answers


  1. While you already set connectTimeout to 10 seconds, you might want to set both the socketTimeout (for read/write operations) and maxAttempts (for retries) explicitly.

    export const KINESIS_CLIENT = new KinesisClient({
      region: 'us-west-2',
      maxAttempts: 3, // Allows more retry attempts
      requestHandler: new NodeHttpHandler({
        connectionTimeout: 20000, // 20 seconds connection timeout
        socketTimeout: 20000 // 20 seconds socket timeout
      })
    });
    

    Improved Error Handling

    promises.push(processEvent(body).catch((err) => {
      console.error("Error processing record:", err);
      return err; // Capture error instead of failing immediately
    }));
    
    Login or Signup to reply.
  2. You are already setting connectTimeout: 10000 in your KinesisClient. This seems fine, but it’s possible that your function needs more time to establish the connection, especially under high load or poor network conditions.

    Increase timeout settings: You can try increasing the timeout value to see if it helps.
    Retry logic: AWS SDK automatically retries on failures, but you can add more robust retry logic or backoff strategies.
    Example of adjusting httpOptions for a longer timeout:

    js
    Copy code
    export const KINESIS_CLIENT = new KinesisClient({
    region: ‘us-west-2’,
    maxRetries: 3, // More retries in case of failure
    httpOptions: {
    connectTimeout: 20000, // Increase to 20 seconds
    timeout: 30000, // Increase overall request timeout
    },
    });

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search