
I am looking for a performant (fast) and reliable (no exceptions expected on a normal day besides throttling) way to upload many (say 1000) small (say 2 KB) blobs to Azure. Azure blocks seem irrelevant, as each of my blobs is small enough to fit in a single HTTP call of its own.

AZCopy is not an option, as I am looking for a .NET solution, not a CLI.

I tried simple approaches (e.g., a vanilla Task.WhenAll combined with UploadAsync) and more sophisticated approaches using SemaphoreSlim and retries on throttling exceptions. Despite that, I always end up with either random exceptions whose cause I don't understand (like a System.ObjectDisposedException for a System.Net.Sockets.Socket object), or disappointing performance (the best I got was 16 s, which seems very long for a batch upload totaling about 2 MB). I don't know if my expectations are reasonable, but I was hoping for something around (numberOfBlobs / parallelism) * 200 ms, i.e. 4 s for 1000 blobs and 50 concurrent uploads.
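
For reference, the "vanilla" approach looked roughly like the sketch below (not the exact code I ran; it assumes the same numberOfBlobs and dataBytes fields as the full code further down, and requires System.Linq):

    // Fire one UploadAsync per blob and await them all.
    var tasks = Enumerable.Range(0, numberOfBlobs)
        .Select(async _ =>
        {
            var blobClient = containerClient.GetBlobClient(Guid.NewGuid().ToString());
            using var stream = new MemoryStream(dataBytes);
            await blobClient.UploadAsync(stream, overwrite: true);
        });
    await Task.WhenAll(tasks);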

Sharing my inefficient and unreliable semaphore-based code below; make sure you don't use it in your applications.

    const int numberOfBlobs = 1000;
    private static readonly string LogFilePath = @"C:\log.txt";
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);
    private Random random = new Random();
    private static int concurrentThreads = 50;
    private static byte[] dataBytes = Encoding.UTF8.GetBytes(new string('A', 2048));

    private static BlobUploadOptions options = new BlobUploadOptions
    {
        TransferOptions = new StorageTransferOptions
        {
            // Set the maximum number of workers that 
            // may be used in a parallel transfer.
            MaximumConcurrency = concurrentThreads,

            // 1MB.
            MaximumTransferSize = 1 * 1024 * 1024
        }
    };

    public static async Task Main(string[] args)
    {
        ThreadPool.SetMinThreads(concurrentThreads + 50, concurrentThreads + 50);

        ServicePointManager.DefaultConnectionLimit = concurrentThreads;
        
        //those don't seem to have any significant effect
        //ServicePointManager.Expect100Continue = false;
        //ServicePointManager.UseNagleAlgorithm = false;
        
        var p = new Program();
        var containerClient = await p.GetBlobContainerClient(ContainerUrl, ContainerName);
        //await p.ClearContainer(containerClient);
        await p.WriteAsyncWithSemaphores(containerClient);

    }

    private static async Task UploadStreamWithMetrics(BlobClient blobClient, MemoryStream stream)
    {
        Console.WriteLine($"Starting {blobClient.Name}");

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        await blobClient.UploadAsync(stream, options);

        stopwatch.Stop();
        Console.WriteLine($"Stopping {blobClient.Name} Time: {stopwatch.ElapsedMilliseconds}");
    }


    public async Task WriteAsyncWithSemaphores(BlobContainerClient containerClient)
    {
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        var tasks = new List<Task>();
        using (var throttler = new SemaphoreSlim(concurrentThreads))
        {
            for (int i = 0; i < numberOfBlobs; i++)
            {
                await throttler.WaitAsync().ConfigureAwait(false);
                Console.WriteLine("tokens available: " + throttler.CurrentCount);
                tasks.Add(UploadBlobAsync(containerClient, throttler));
                //tasks.Add(FakeTask(containerClient, throttler));
            }

            await Task.WhenAll(tasks);
        }

        stopwatch.Stop();
        System.IO.File.AppendAllText(LogFilePath, $"Method: WriteAsyncWithSemaphores, {numberOfBlobs} blobs, {concurrentThreads} Threads, Time: {stopwatch.ElapsedMilliseconds}" + Environment.NewLine);
    }



    private async Task UploadBlobAsync(BlobContainerClient containerClient, SemaphoreSlim throttler)
    {
        try
        {
            await UploadBlobWithRetryAsync(containerClient, Guid.NewGuid().ToString());
        }
        finally
        {
            throttler.Release();
        }
    }

    private static async Task UploadBlobWithRetryAsync(BlobContainerClient containerClient, string filename)
    {
        int retryCount = 0;
        bool success = false;

        while (!success && retryCount < 3) 
        {
            try
            {
                BlobClient blobClient = containerClient.GetBlobClient(filename);
                using (var stream = new MemoryStream(dataBytes))
                {
                    await UploadStreamWithMetrics(blobClient, stream);
                }
                success = true;
            }
            catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.TooManyRequests || ex.Status == (int)HttpStatusCode.ServiceUnavailable)
            {
                retryCount++;
                await Task.Delay(RetryDelay * retryCount); 
            }
        }
    }

EDIT: I noticed that the problem might be that, somehow, not all the available slots in the SemaphoreSlim are getting used, but I have not been able to find what prevents them from being allocated.

tokens available: 46
Starting f201eaec-9c6e-4cd7-965a-fe69e831cdc2
tokens available: 45
Starting e4d4ea9b-d396-4736-a25a-570125b564d5
tokens available: 44
Starting 9a52abae-3b9e-4835-8e0a-d17bb633f476
Stopping dd8b52d5-ab94-4708-8502-d1a5cee1bdf0 Time: 143
tokens available: 44
Starting 7851ced0-22b0-46ba-8e4a-3c97452231b7
Stopping a64b86a6-fa4f-4ea8-8e32-eb4c920b46a1 Time: 123
tokens available: 44
Starting 7ea658b3-a839-4b9b-a2c5-1f4a4553de2a
tokens available: 43
Starting cbe69d17-e436-4f47-96cf-c311a26865ca
tokens available: 42
Starting 9b7aed6a-31f1-4dd5-bca3-0528a5ecb0f7
Stopping c3fe9d26-fcc7-4f3f-a4dd-c12597070a9f Time: 152
Stopping f201eaec-9c6e-4cd7-965a-fe69e831cdc2 Time: 117
tokens available: 43
Starting 77cee6e0-3e06-4260-aff4-ab42560edb7a
Stopping e4d4ea9b-d396-4736-a25a-570125b564d5 Time: 128
Stopping 9a52abae-3b9e-4835-8e0a-d17bb633f476 Time: 124
tokens available: 44
Starting 64e4da60-698b-48a2-9ea3-5a31427e300e
Stopping 7851ced0-22b0-46ba-8e4a-3c97452231b7 Time: 124
tokens available: 44
Stopping 7ea658b3-a839-4b9b-a2c5-1f4a4553de2a Time: 114
Starting bc813234-0b87-4654-8c71-88b9f0d80519
Stopping cbe69d17-e436-4f47-96cf-c311a26865ca Time: 120
tokens available: 45

The result is that it doesn't actually run with the expected degree of parallelism, so while individual calls are fast (about 150 ms), the overall upload takes about 30 s, which checks out with the observations above: (1000 blobs / ~5 uploads actually in flight on average) * 150 ms = 30 s.

I created a FakeTask to confirm that all slots are normally used, in order to rule out potential environment problems on my side:

    private async Task FakeTask(BlobContainerClient containerClient, SemaphoreSlim throttler)
    {
        using (var stream = new MemoryStream(dataBytes))
        {
            await Task.Delay(random.Next(3000, 3001));
        }
        throttler.Release();
    }

output:

tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0
tokens available: 0
tokens available: 1
tokens available: 0

As the semaphore slots are properly consumed there, there must be something in UploadBlobWithRetryAsync that prevents or delays their allocation.
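
To double-check the actual degree of parallelism, one option (a sketch I did not have in the original code; the inFlight and maxInFlight fields are hypothetical helpers) is to count in-flight uploads with Interlocked and record the peak:

    private static int inFlight = 0;
    private static int maxInFlight = 0;

    private static async Task UploadWithConcurrencyTracking(BlobClient blobClient, MemoryStream stream)
    {
        // Count how many uploads are in flight right now.
        int current = Interlocked.Increment(ref inFlight);

        // Record the highest concurrency observed so far.
        int snapshot;
        while (current > (snapshot = Volatile.Read(ref maxInFlight)))
        {
            Interlocked.CompareExchange(ref maxInFlight, current, snapshot);
        }

        try
        {
            await blobClient.UploadAsync(stream, options); // same static options as above
        }
        finally
        {
            Interlocked.Decrement(ref inFlight);
        }
    }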

2 Answers


  1. Chosen as BEST ANSWER

    After a while, I eventually understood what caused my code to underperform and not use all the available slots: when I didn't run in Debug mode, performance improved by about 10x. I don't understand why FakeTask showed proper slot allocation in Debug mode while the UploadBlobAsync task didn't. Even if some synchronization is needed for Debug mode to work, I expected the run to either use the available slots or at least behave consistently across different tasks, especially since I had no breakpoints and there was still some degree of parallelism (about 5 uploads at a time), so that part remains a mystery. This finding eventually got me to an overall latency of about 3 s for 1000 blobs and 50 concurrent uploads, in line with the ~150 ms per-upload latency I measured against Azure.
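
    A small guard like the sketch below (not something I had in the original code) would have surfaced this earlier:

        // Warn when timings are taken under a debugger or in a Debug build.
        // Sketch only; Debugger is System.Diagnostics.Debugger.
        if (Debugger.IsAttached)
        {
            Console.WriteLine("Warning: debugger attached; upload timings will not be representative.");
        }
    #if DEBUG
        Console.WriteLine("Warning: Debug build; use 'dotnet run -c Release' for meaningful timings.");
    #endif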


  2. I increased the degree of parallelism and optimized the settings to improve the upload speed. I successfully uploaded 1000 2 KB blobs to Azure Storage within 4 to 5 seconds without any issues.

    Code :

    using System.Diagnostics;
    using System.Net;
    using Azure;
    using Azure.Storage.Blobs;
    using Azure.Storage.Blobs.Models;
    
    public class Program
    {
        private const int NumberOfBlobs = 1000; 
        private static readonly string LogFilePath = @"C:\log.txt";
        private static readonly string BlobsDirectoryPath = @"C:\blobs";
        private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);
        private const string ConnectionString = "<storage_connec_string>";
        private const string ContainerName = "<container_name>";
        public static async Task Main(string[] args)
        {
            ServicePointManager.DefaultConnectionLimit = 1000;
            ServicePointManager.Expect100Continue = false;
            ServicePointManager.UseNagleAlgorithm = false;
    
            var p = new Program();
            var containerClient = await p.GetBlobContainerClient(ConnectionString, ContainerName);
            await p.ClearContainer(containerClient);
            await p.UploadBlobsInParallel(containerClient);
        }
        private async Task<BlobContainerClient> GetBlobContainerClient(string connectionString, string containerName)
        {
            var containerClient = new BlobContainerClient(connectionString, containerName);
            await containerClient.CreateIfNotExistsAsync(PublicAccessType.None);
            return containerClient;
        }
        private async Task ClearContainer(BlobContainerClient containerClient)
        {
            await foreach (var blob in containerClient.GetBlobsAsync())
            {
                await containerClient.DeleteBlobAsync(blob.Name);
            }
        }
        private async Task UploadBlobsInParallel(BlobContainerClient containerClient)
        {
            var stopwatch = Stopwatch.StartNew();
            var semaphore = new SemaphoreSlim(200);
            var tasks = new List<Task>();
            var files = Directory.GetFiles(BlobsDirectoryPath);
            foreach (var filePath in files)
            {
                if (tasks.Count >= NumberOfBlobs)
                    break;
                await semaphore.WaitAsync();
                tasks.Add(Task.Run(async () =>
                {
                    try
                    {
                        await UploadBlobWithRetryAsync(containerClient, Path.GetFileName(filePath), filePath);
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                }));
            }
            await Task.WhenAll(tasks);
            stopwatch.Stop();
            Console.WriteLine($"{NumberOfBlobs} files uploaded successfully in {stopwatch.ElapsedMilliseconds}ms.");
            await File.AppendAllTextAsync(LogFilePath, $"Time: {stopwatch.ElapsedMilliseconds}ms{Environment.NewLine}");
        }
        private static async Task UploadBlobWithRetryAsync(BlobContainerClient containerClient, string filename, string filePath)
        {
            int retryCount = 0;
            bool success = false;
            while (!success && retryCount < 3)
            {
                try
                {
                    var blobClient = containerClient.GetBlobClient(filename);
                    using var stream = File.OpenRead(filePath);
                    await blobClient.UploadAsync(stream, overwrite: true);
                    success = true;
                }
                catch (RequestFailedException ex) when (ex.Status == 429 || ex.Status == 503) 
                {
                    retryCount++;
                    await Task.Delay(RetryDelay * retryCount);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to upload {filename}: {ex.Message}");
                    throw;
                }
            }
        }
    }
    

    Output :

    1000 blob files uploaded successfully in about 5 seconds.


    log.txt:

    The total time depends on the network speed; as the entries in log.txt show, the blob upload time changed with the network speed between runs.


    The 1000 2 KB blobs were uploaded successfully to Azure Blob Storage.

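    As a side note (not part of the run above), the Azure SDK can also retry throttled requests (429/503) on its own if you configure BlobClientOptions.Retry when creating the client, which makes the manual retry loop optional. A minimal sketch:

        // Let the SDK retry transient/throttling failures instead of a manual loop.
        // RetryMode comes from Azure.Core; the values here are illustrative, not tuned.
        var clientOptions = new BlobClientOptions
        {
            Retry =
            {
                MaxRetries = 5,
                Mode = RetryMode.Exponential,
                Delay = TimeSpan.FromSeconds(1),
                MaxDelay = TimeSpan.FromSeconds(16)
            }
        };
        var containerClient = new BlobContainerClient(ConnectionString, ContainerName, clientOptions);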
