skip to Main Content

I’m inserting from a batch execution around 2100 items in a cosmos DB. Those item represent a product detail taken from a supplier.

I’m saving them in a CosmosDB with the following code

 private async Task SaveProductToDbAsync(IList<ResponseItem> products, ILogger log, CancellationToken cancellationToken)
    {
        using var client = new CosmosClient(_cosmosDbOptions.CosmosDbAccountEndpoint, _cosmosDbOptions.CosmosDbAccountKey);

        var database = client.GetDatabase(_cosmosDbOptions.CosmosDbDatabaseName);

        var container = database.GetContainer(_cosmosDbOptions.CosmosDbContainerId);

        int count = 0;
        int successInsert = 0;
        foreach (var item in products)
        {
            item.id = Utility.ToGuid(item.pk);

            await container.CreateItemAsync(item, new PartitionKey(item.id), cancellationToken: cancellationToken)
                .ContinueWith(itemResponse =>
                     {
                         if (itemResponse.IsCompletedSuccessfully)
                         {
                             successInsert++;
                             log.LogDebug($"CosmosDB Created item in Database with id: {item.id}");
                         }
                         else
                         {
                             AggregateException innerExceptions = itemResponse.Exception.Flatten();
                             if (innerExceptions.InnerExceptions.FirstOrDefault(
                                     innerEx => innerEx is CosmosException) is CosmosException cosmosException)
                             {
                                 log.LogError(
                                     $"CosmosDB Could not Create item in Database with id: {item.id}, StatusCode: {cosmosException.StatusCode}, Error: {cosmosException.Message}");
                             }
                             else
                             {
                                 log.LogError(
                                     $"CosmosDB Could not Create item in Database with id: {item.id}, Error: {innerExceptions.InnerExceptions.FirstOrDefault()}");
                             }
                         }
                     }, cancellationToken);

            //This is to avoid 429 CosmosDB error messages
            await Task.Delay(80, cancellationToken);
            count++;
        }

        log.LogInformation($"Inserted {successInsert}/{count} products");
    }

If I don’t put the Task.Delay(80) I got a ton of 429 errors.

But at the end of execution there’re almost 3 minutes wasted waiting.. I’m running this code in a WebJob.

3

Answers


  1. Add throttling-retry settings when you init your CosmosClient like this:

    CosmosClientBuilder clientBuilder = new CosmosClientBuilder(cosmosDbEndpoint, tokenCredential)
    .WithConnectionModeDirect()
    .WithThrottlingRetryOptions(maxRetryWaitTimeOnThrottledRequests: TimeSpan.FromSeconds(30), maxRetryAttemptsOnThrottledRequests: 9);
    
    var client = clientBuilder.Build();
    

    This way, the client will handle 429-retries internally for you and only error out once the defined threshold have been exceeded.

    Login or Signup to reply.
  2. You don’t need to guess how long to call Task.Delay. The Cosmos exception tells you.

    try
    {
        foreach (var item in items)
        {
            await container.CreateItemAsync(item, new PartitionKey(item.id));
        }
    }
    catch (CosmosException ex)
    {
        Console.WriteLine(ex.Message);
    
        if (ex.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        {
            TimeSpan delay = (TimeSpan)ex.RetryAfter;
            await Task.Delay(delay);
            await container.CreateItemAsync(record, new PartitionKey(id));
        }
    
    }
    
    Login or Signup to reply.
  3. As answered @MarkBrown and @Silent, you should use the RetryAfter property and configure MaxRetry properties to prevent looping uselessly.

    I had implementation that worked around this point :

        private async Task<U> SafeGetAsync<U>(IBaseEntityModel input, CancellationToken token, int retryQuery = 0)
                where U : class, IBaseEntityModel
    {
    try
    {
        // request your data
    }
    catch (DocumentClientException dce)
    {
        switch ((int)dce.StatusCode)
        {
            case 429: // RequestTooLarge exception - RU consumption - Should be handled by CosmosDBEventStore service from socle
                Thread.Sleep(dce.RetryAfter);
                break;
            case 408: // RequestTimeout exception - CPU Usage
            default:
                Thread.Sleep(TimeSpan.FromSeconds(++retryQuery));
                //Console.WriteLine("UpsertAsync Failed ({0}) : {1}", dce.InnerException.GetType(), dce.InnerException.Message);
                break;
        }
        return await SafeGetAsync<U>(input, token, retryQuery);
    }
    catch (Exception ex)
    {
        LogError($"Failed Operation - GetAsync : " +
            $"Message = {ex.Message} " +
            $"StackTrace = {ex.StackTrace} ");
    }
    

    Note that I also catch 408 issue that may happen if you overuse that CosmosDb, comparing his scaling and your usage.

    And configuration bellow :

    public static void ConfigureSocleServiceOptions(this IServiceCollection services, IConfiguration Configuration)
    {
        services.Configure<CosmosDBEventStoreOptions>(option);
        services.PostConfigure<CosmosDBEventStoreOptions>(options =>
        {
            options.RetryCount = 10;
        });
        services.PostConfigure<ExtraCosmosDBEventStoreOptions>(options =>
        {
            // RetryOptions
            options.MaxRetryAttemptsOnThrottledRequests = Configuration.GetSection("CosmosDb:EventStore:RetryOptions:MaxRetryAttemptsOnThrottledRequests").Get<int?>();
            options.MaxRetryWaitTimeInSeconds = Configuration.GetSection("CosmosDb:EventStore:RetryOptions:MaxRetryWaitTimeInSeconds").Get<int?>();
        });
    }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search