
I'd like to stream large data files over REST directly to Azure Blob Storage.
Now I face the problem that sending a 250 MB file over REST to Azure Blob Storage works, but with 260 MB it gets stuck forever.

I made an example project where you can reproduce this behavior: https://github.com/git9999999/azure-blob-large-file-upload-problem

The question: is my reactive code correct?

https://github.com/git9999999/azure-blob-large-file-upload-problem/blob/main/downloader/src/main/java/com/azureproblem/blob/controller/AzureBlobBugDownloaderController.java#L131

    @GetMapping(path = "/trigger-download-to-blob/{fileSizeInMb}")
    public void triggerDownloadToBlob(@PathVariable int fileSizeInMb) {
        log.info("triggerDownload");

        // Stream the response body as a Flux<DataBuffer> instead of buffering it in memory
        var flux = this.webClient
            .get()
            .uri("/serve-file/" + fileSizeInMb)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .exchangeToFlux(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()));

        var destination = "TestDownloadToAzureBlobStorage" + System.currentTimeMillis() + ".pdf";

        var blobClientTarget = this.containerClient.getBlobClient(destination);

        // Write the flux into the blob's (synchronous) output stream, releasing each buffer once written
        try (var outputStream = blobClientTarget.getBlockBlobClient().getBlobOutputStream(this.parallelTransferOptions, null, null, null, null)) {
            DataBufferUtils.write(flux, outputStream)
                .map(DataBufferUtils::release)
                .blockLast(Duration.ofHours(22));
            outputStream.flush();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }

        log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!   end download of {}", destination);
    }

2 Answers


  1. Chosen as BEST ANSWER

    The problem was that I used the BlobContainerClient instead of the BlobContainerAsyncClient. The BlobContainerAsyncClient has dedicated APIs for handling a Flux.

    Here is the code that I use now:

    public void uploadAsync(Flux<DataBuffer> flux, Path destination) {
        BlobAsyncClient blobClientTarget = this.blobContainerAsyncClient.getBlobAsyncClient(relativePathUnix(destination));
        // BlobAsyncClient.upload consumes a Flux<ByteBuffer>, so copy each DataBuffer
        // into a ByteBuffer and release the original buffer afterwards
        blobClientTarget.upload(flux.map(dataBuffer -> {
            ByteBuffer buffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
            dataBuffer.toByteBuffer(buffer);
            DataBufferUtils.release(dataBuffer);
            return buffer;
        }), this.parallelTransferOptions).block();
    }
    

    And here is the example repo with the solution: https://github.com/git9999999/azure-blob-large-file-upload-problem

    Here is the ticket that solved the problem: https://github.com/Azure/azure-sdk-for-java/issues/35477
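
    For completeness, here is a minimal sketch of how the BlobContainerAsyncClient and the ParallelTransferOptions used above could be wired up. The connection string and the transfer settings are placeholder assumptions, not values from the repo:

        import com.azure.storage.blob.BlobContainerAsyncClient;
        import com.azure.storage.blob.BlobContainerClientBuilder;
        import com.azure.storage.blob.models.ParallelTransferOptions;

        public class AsyncClientConfig {

            // Hypothetical connection string; take it from your storage account settings
            private static final String CONNECTION_STRING = "<your-connection-string>";

            public static BlobContainerAsyncClient containerAsyncClient(String containerName) {
                // buildAsyncClient() yields the client whose upload(Flux<ByteBuffer>, ...) is used above
                return new BlobContainerClientBuilder()
                        .connectionString(CONNECTION_STRING)
                        .containerName(containerName)
                        .buildAsyncClient();
            }

            public static ParallelTransferOptions transferOptions() {
                // Illustrative values: 4 MB blocks, at most 4 parallel block uploads
                return new ParallelTransferOptions()
                        .setBlockSizeLong(4L * 1024 * 1024)
                        .setMaxConcurrency(4);
            }
        }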

  2.
    • The uploadFile method now accepts a Flux<DataBuffer> directly as a request part instead of using MultipartFile

    • The file data is streamed reactively with DataBufferUtils.write. Check the code below:

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.stereotype.Controller;
    import org.springframework.util.StopWatch;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestPart;
    import reactor.core.publisher.Flux;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.time.Duration;
    
    @Controller
    public class AzureBlobUploaderController {
    
        // AzureBlobContainerClient is assumed to be a custom helper that wraps the
        // Azure SDK's BlobContainerClient (it is not a class from the SDK itself)
        private final AzureBlobContainerClient containerClient;
        private final int bufferSize;
        private final int timeoutSeconds;
    
        public AzureBlobUploaderController(@Value("${azure.storage.containerName}") String containerName,
                                           @Value("${bufferSize}") int bufferSize,
                                           @Value("${timeoutSeconds}") int timeoutSeconds) {
            this.containerClient = new AzureBlobContainerClient(containerName);
            this.bufferSize = bufferSize;
            this.timeoutSeconds = timeoutSeconds;
        }
    
        @PostConstruct
        public void init() {
            // Create the container if it doesn't exist
            if (!containerClient.exists()) {
                containerClient.create();
            }
        }
    
        @PostMapping(path = "/upload")
        public void uploadFile(@RequestPart("file") Flux<DataBuffer> file) {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
    
            String destinationBlobName = "UploadedFile_" + System.currentTimeMillis() + ".bin";
            var blobClient = containerClient.getBlobClient(destinationBlobName);
            var blobOutputStream = blobClient.getBlockBlobClient().getBlobOutputStream();
    
            DataBufferUtils.write(file, blobOutputStream)
                    .map(DataBufferUtils::release) // write(...) does not release the buffers itself
                    .doOnError(throwable -> {
                        // Handle the error gracefully
                        stopWatch.stop();
                        System.out.println("Error occurred during data streaming: " + throwable.getMessage());
                    })
                    .doFinally(signal -> {
                        try {
                            blobOutputStream.flush();
                            blobOutputStream.close();
                            stopWatch.stop();
                            System.out.println("File uploaded successfully " + stopWatch.getTotalTimeMillis() + "ms");
                        } catch (IOException e) {
                            System.out.println("Error occurred while closing the output stream: " + e.getMessage());
                        }
                    })
                    .blockLast(Duration.ofSeconds(timeoutSeconds));
        }
    
        public static void main(String[] args) {
            // Replace with your Azure Blob Storage container name
            String containerName = "your-container-name";
    
            // Example placeholder values; replace with your buffer size and timeout settings
            int bufferSize = 8192;
            int timeoutSeconds = 300;
    
            AzureBlobUploaderController controller = new AzureBlobUploaderController(containerName, bufferSize, timeoutSeconds);
        }
    }
    
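    To exercise the endpoint, the file can be sent as a multipart POST. A minimal sketch using WebClient follows; the base URL and the local file path are placeholder assumptions:

        import org.springframework.core.io.FileSystemResource;
        import org.springframework.http.MediaType;
        import org.springframework.http.client.MultipartBodyBuilder;
        import org.springframework.web.reactive.function.BodyInserters;
        import org.springframework.web.reactive.function.client.WebClient;

        public class UploadClientExample {

            public static void main(String[] args) {
                // Hypothetical base URL; point it at the running uploader service
                WebClient client = WebClient.create("http://localhost:8080");

                // Send the local file as the "file" request part expected by uploadFile
                MultipartBodyBuilder builder = new MultipartBodyBuilder();
                builder.part("file", new FileSystemResource("/tmp/large-file.bin"));

                client.post()
                        .uri("/upload")
                        .contentType(MediaType.MULTIPART_FORM_DATA)
                        .body(BodyInserters.fromMultipartData(builder.build()))
                        .retrieve()
                        .toBodilessEntity()
                        .block(); // blocking only for this simple demo
            }
        }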

    Output:

    • With this POST request, I am able to upload a 1 GB file to my blob container.


    • Here I am able to see the file in my container.


    Updated code:

    public Mono<Void> uploadFileToBlobStorage(WebClient webClient, String containerName, String blobName, File file) {
        return webClient.put()
                .uri("/" + containerName + "/" + blobName)
                .header(HttpHeaders.CONTENT_LENGTH, String.valueOf(file.length()))
                .header(HttpHeaders.CONTENT_TYPE, "application/octet-stream")
                .body(BodyInserters.fromPublisher(getFileDataBufferFlux(file), DataBuffer.class))
                .retrieve()
                .bodyToMono(Void.class);
    }
    
    private Flux<DataBuffer> getFileDataBufferFlux(File file) {
        // DataBufferUtils.read(Path, DataBufferFactory, int) streams the file in chunks
        return DataBufferUtils.read(file.toPath(), new DefaultDataBufferFactory(), DEFAULT_BUFFER_SIZE)
                .doFinally(signalType -> file.delete()); // Delete the file after reading its data
    }
    
    • The getFileDataBufferFlux method reads the file in a reactive manner, returning a Flux<DataBuffer>. The uploadFileToBlobStorage method then uses the WebClient to perform a PUT request with the file's data as the request body.
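
    Note that calling Azure's Put Blob REST operation directly like this also requires the x-ms-blob-type header and authentication. A minimal invocation sketch, where the account, container, and file path are placeholder assumptions:

        // WebClient pointed at the storage account; Put Blob requires the
        // x-ms-blob-type header on every request
        WebClient webClient = WebClient.builder()
                .baseUrl("https://<account>.blob.core.windows.net")
                .defaultHeader("x-ms-blob-type", "BlockBlob")
                .build();

        // A SAS token query string (or another auth mechanism) would additionally
        // need to be appended to the request URL for the call to be authorized
        uploadFileToBlobStorage(webClient, "my-container", "large-file.bin", new File("/tmp/large-file.bin"))
                .block();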