
I use Azure Service Bus Queues to collect messages that need to be posted to an API. I’m using an Azure trigger function on the Queue to process incoming messages.
Unfortunately, the API is unreliable so I’m working on adding my own retry schedule.

For example, the first time a message couldn’t be processed, I want to schedule another retry in 15 minutes, and after that 1h, 2h and so on. Once the maxDeliveryCount has been reached, messages should be sent to the DLQ.
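
A schedule like this can be expressed as a small pure function that maps the current delivery count to a delay. This is only a sketch: the name `retryDelayMs` is hypothetical, and it assumes that "and so on" means the delay keeps doubling after the first hour.

```typescript
const MINUTE_MS = 60 * 1000;
const HOUR_MS = 60 * MINUTE_MS;

// Hypothetical helper: maps a 1-based deliveryCount to a retry delay in ms.
// Assumed schedule: 15 min after the first failure, then 1h, 2h, 4h, ...
function retryDelayMs(deliveryCount: number): number {
  if (!Number.isInteger(deliveryCount) || deliveryCount < 1) {
    throw new RangeError("deliveryCount must be a positive integer");
  }
  if (deliveryCount === 1) {
    return 15 * MINUTE_MS;
  }
  // deliveryCount 2 -> 1h, 3 -> 2h, 4 -> 4h
  return Math.pow(2, deliveryCount - 2) * HOUR_MS;
}

// Usage: compute the enqueue time for the attempt after the second failure.
const nextAttempt = new Date(Date.now() + retryDelayMs(2));
```

Keeping the schedule in one function makes it easy to swap in a fixed table of delays if doubling turns out to be too aggressive.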

The scheduling of the messages is working; however, each scheduled message is seen as a new message, meaning that the deliveryCount is always 1. I’m using the @azure/service-bus NPM package to do the rescheduling in the Azure function.

Therefore, my question is:
How can I reschedule SB Queue messages in an Azure trigger function while increasing the deliveryCount of those messages?

(I’m aware that I could add my own deliveryCount property to the message, but I’d rather not pollute the message body with metadata.)
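
For reference, a custom counter does not have to live in the message body: `ServiceBusMessage` in @azure/service-bus has an `applicationProperties` field for custom metadata. The sketch below assumes a hypothetical property name `retryCount` and a hypothetical helper `buildRetryMessage`. The message type is declared locally so that the snippet stands alone.

```typescript
// Local stand-in for the subset of ServiceBusMessage used in this sketch.
interface RetryableMessage {
  body: unknown;
  contentType?: string;
  applicationProperties?: { [key: string]: number | boolean | string | Date | null };
}

// Hypothetical helper: returns the message to reschedule, or null once
// maxAttempts has been reached and the caller should give up.
function buildRetryMessage(
  body: unknown,
  receivedProperties: RetryableMessage["applicationProperties"],
  maxAttempts: number
): RetryableMessage | null {
  // retryCount = number of retries already scheduled for this message.
  const previous = receivedProperties?.retryCount;
  const retryCount = typeof previous === "number" ? previous : 0;
  // Attempts made so far = retryCount + 1 (the original delivery plus retries).
  if (retryCount + 1 >= maxAttempts) {
    return null;
  }
  return {
    body,
    contentType: "application/json",
    applicationProperties: { ...receivedProperties, retryCount: retryCount + 1 },
  };
}
```

A non-null result could be passed to `sender.scheduleMessages` together with the computed enqueue time. How the received properties are exposed to the trigger function depends on the Service Bus extension version, so treat that part as an assumption to verify.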

To provide some more details, this is an MRE:

import { AzureFunction, Context } from "@azure/functions";
import { ServiceBusClient } from "@azure/service-bus";

const serviceBusQueueTrigger: AzureFunction = async function (
  context: Context,
  msg: any
): Promise<void> {
  context.log("ServiceBus queue trigger function processed message", msg);

  const serviceBusClient = new ServiceBusClient(<connection_string>);
  const sender = serviceBusClient.createSender(<queue_name>);

  context.log("dequeueCount: ", context.bindingData.deliveryCount);

  try {
    // POST to API
  } catch (error) {
    // if an error is thrown here, trigger resubmits immediately
    // I want to wait proportionally to current deliveryCount
    await sender.scheduleMessages(
      {
        // resend the payload that the trigger received
        body: msg,
        contentType: "application/json",
      },
      // just an example of schedule time
      new Date(Date.now() + Math.pow(2, context.bindingData.deliveryCount) * 60 * 1000)
    );
  } finally {
    // release the sender and the underlying connection on every path
    await sender.close();
    await serviceBusClient.close();
  }
};

export default serviceBusQueueTrigger;

===

Edit: Although the Microsoft Docs still contain (vague) details about the built-in retry mechanism (which @Manish linked in his answer), the functionality seems to be no longer supported. The relevant discussion can be found here.

Therefore, my initial question stands: how can I build this mechanism using an Azure Service Bus Trigger Function? Thanks!

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.0.0, 5.0.0)"
  },
  "concurrency": {
    "dynamicConcurrencyEnabled": true,
    "snapshotPersistenceEnabled": true
  },
  "extensions": {
    "serviceBus": {
      "clientRetryOptions": {
        "mode": "exponential",
        "tryTimeout": "00:01:00",
        "delay": "00:01:00",
        "maxDelay": "03:00:00",
        "maxRetries": 3
      }
    }
  }
}

function.json

{
  "bindings": [
    {
      "name": "incoming",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "incoming",
      "connection": "SERVICEBUS"
    }
  ],
  "retry": {
    "strategy": "exponentialBackoff",
    "maxRetryCount": 5,
    "minimumInterval": "00:00:10",
    "maximumInterval": "00:15:00"
  },
  "scriptFile": "../dist/trigger/index.js"
}

2 Answers


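  1. The issue is that whenever you send a message to the queue, it is treated as a new message.
    How to fix it?
    You don’t have to push a new message to the queue. Instead, remove the try-catch and all the sending code, and the function app will take care of the rest. If the function throws, the runtime abandons the message, and Service Bus redelivers it with an incremented deliveryCount.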

    const serviceBusQueueTrigger: AzureFunction = async function (
      context: Context,
      msg: any
    ): Promise<void> {
      context.log("ServiceBus queue trigger function processed message", msg);
      context.log("dequeueCount: ", context.bindingData.deliveryCount);
      // POST to API
    }
    

    Please make sure your function.json file looks like the following (code copied from the link below):

    {
    "bindings": [
        {
        "queueName": "testqueue",
        "connection": "MyServiceBusConnection",
        "name": "myQueueItem",
        "type": "serviceBusTrigger",
        "direction": "in"
        }
    ],
    "disabled": false
    }
    

    For more information: https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=python-v2%2Cin-process%2Cextensionv5&pivots=programming-language-javascript

    For exponential retries, please make sure your host.json settings look like the following:

    {
        "version": "2.0",
        "extensions": {
            "serviceBus": {
                "clientRetryOptions":{
                    "mode": "exponential",
                    "tryTimeout": "00:01:00",
                    "delay": "00:00:00.80",
                    "maxDelay": "00:01:00",
                    "maxRetries": 3
                },
                "prefetchCount": 0,
                "transportType": "amqpWebSockets",
                "webProxy": "https://proxyserver:8080",
                "autoCompleteMessages": true,
                "maxAutoLockRenewalDuration": "00:05:00",
                "maxConcurrentCalls": 16,
                "maxConcurrentSessions": 8,
                "maxMessageBatchSize": 1000,
                "sessionIdleTimeout": "00:01:00",
                "enableCrossEntityTransactions": false
            }
        }
    }
    

    For more information: https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus?tabs=in-process%2Cextensionv5%2Cextensionv3&pivots=programming-language-javascript

    Edit 1: After reading the GitHub issue, we can try using the retry policy in function.json:

    {
        "disabled": false,
        "bindings": [
            {
                ....
            }
        ],
        "retry": {
            "strategy": "exponentialBackoff",
            "maxRetryCount": 5,
            "minimumInterval": "00:00:10",
            "maximumInterval": "00:15:00"
        }
    }
    

    Edit 2:
    Make sure you have installed the extension bundle.
    More info here

    {
        "version": "2.0",
        "extensionBundle": {
            "id": "Microsoft.Azure.Functions.ExtensionBundle",
            "version": "[3.3.0, 4.0.0)"
        }
    }
    
  2. @Manish I can confirm that the retry options in function.json work.

    @picklepick, to meet your requirement, you can use the retry options in function.json as Manish suggested and set the max delivery count on the queue to 1. All retries are then done at the function level using function.json, and the message is moved to the DLQ once they have all failed.

    You can also use a combination of the two (function level and Service Bus level), i.e. let the function retries complete, move the message back to the queue, and let it execute the function retries once again. For this, you will need to set the max delivery count to 2.

    So with these options, you can execute the retries using configuration without introducing additional retry code within the function itself.
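
As a rough illustration of how the two levels combine, the sketch below estimates the upper bound on function executions per message. It assumes that each delivery runs the function once plus up to maxRetryCount retries, which should be checked against the current runtime behaviour. The function name `maxExecutions` is hypothetical.

```typescript
// Hypothetical helper: upper bound on function executions for one message.
// Assumption: each delivery = 1 initial execution + up to maxRetryCount retries.
function maxExecutions(maxDeliveryCount: number, maxRetryCount: number): number {
  return maxDeliveryCount * (1 + maxRetryCount);
}

// Queue max delivery count 1, function.json maxRetryCount 5
console.log(maxExecutions(1, 5)); // 6
// Queue max delivery count 2, function.json maxRetryCount 5
console.log(maxExecutions(2, 5)); // 12
```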
