
I am trying to implement S3 Select in a Spring Boot app to query a Parquet file in an S3 bucket, but I am only getting a partial result from the S3 Select output. Please help me identify the issue. I am using the AWS SDK for Java v2.

The JSON output printed in the console is about 65k characters in total.

I am using Eclipse and tried unchecking "Limit console output" in the console preferences, which did not help.

Here is the code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompressionType;
import software.amazon.awssdk.services.s3.model.EndEvent;
import software.amazon.awssdk.services.s3.model.ExpressionType;
import software.amazon.awssdk.services.s3.model.InputSerialization;
import software.amazon.awssdk.services.s3.model.JSONOutput;
import software.amazon.awssdk.services.s3.model.OutputSerialization;
import software.amazon.awssdk.services.s3.model.ParquetInput;
import software.amazon.awssdk.services.s3.model.RecordsEvent;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream.EventType;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;

public class ParquetSelect {
    
    private static final String BUCKET_NAME = "<bucket-name>";
    private static final String KEY = "<object-key>";
    private static final String QUERY = "select * from S3Object s";
    public static S3AsyncClient s3;
    

    public static void selectObjectContent() {
        Handler handler = new Handler();
        
        SelectQueryWithHandler(handler).join();

        RecordsEvent recordsEvent = (RecordsEvent) handler.receivedEvents.stream()
                                                                         .filter(e -> e.sdkEventType() == EventType.RECORDS)
                                                                         .findFirst()
                                                                         .orElse(null);

        System.out.println(recordsEvent.payload().asUtf8String());

    }

    private static CompletableFuture<Void> SelectQueryWithHandler(SelectObjectContentResponseHandler handler) {
        InputSerialization inputSerialization = InputSerialization.builder()
                                                                  .parquet(ParquetInput.builder().build())
                                                                  .compressionType(CompressionType.NONE)
                                                                  .build();


        OutputSerialization outputSerialization = OutputSerialization.builder()
                                                                     .json(JSONOutput.builder().build())
                                                                     .build();


        SelectObjectContentRequest select = SelectObjectContentRequest.builder()
                                                                      .bucket(BUCKET_NAME)
                                                                      .key(KEY)
                                                                      .expression(QUERY)
                                                                      .expressionType(ExpressionType.SQL)
                                                                      .inputSerialization(inputSerialization)
                                                                      .outputSerialization(outputSerialization)
                                                                      .build();

        return s3.selectObjectContent(select, handler);
    }

    private static class Handler implements SelectObjectContentResponseHandler {
        private SelectObjectContentResponse response;
        private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
        private Throwable exception;

        @Override
        public void responseReceived(SelectObjectContentResponse response) {
            this.response = response;
        }

        @Override
        public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
            publisher.subscribe(receivedEvents::add);
        }

        @Override
        public void exceptionOccurred(Throwable throwable) {
            exception = throwable;
        }

        @Override
        public void complete() {
        }
    }
    
}

2 Answers


  1. I see you are using selectObjectContent(). Have you tried calling the s3AsyncClient.getObject() method instead? Does that work for you?

    For example, the following code gets a PDF file from an Amazon S3 bucket and writes it to a local file.

    package com.example.s3.async;
    // snippet-start:[s3.java2.async_stream_ops.complete]
    
    // snippet-start:[s3.java2.async_stream_ops.import]
    import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
    import software.amazon.awssdk.core.async.AsyncResponseTransformer;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.GetObjectResponse;
    import java.nio.file.Paths;
    import java.util.concurrent.CompletableFuture;
    // snippet-end:[s3.java2.async_stream_ops.import]
    
    // snippet-start:[s3.java2.async_stream_ops.main]
    
    /**
     * Before running this Java V2 code example, set up your development environment, including your credentials.
     *
     * For more information, see the following documentation topic:
     *
     * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
     */
    
    public class S3AsyncStreamOps {
    
        public static void main(String[] args) {
    
            final String usage = "\n" +
                    "Usage:\n" +
                    "    <bucketName> <objectKey> <path>\n\n" +
                    "Where:\n" +
                    "    bucketName - The name of the Amazon S3 bucket (for example, bucket1). \n\n" +
                    "    objectKey - The name of the object (for example, book.pdf). \n" +
                    "    path - The local path to the file (for example, C:/AWS/book.pdf). \n" ;
    
            if (args.length != 3) {
                System.out.println(usage);
                System.exit(1);
            }
    
            String bucketName = args[0];
            String objectKey = args[1];
            String path = args[2];
            ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create();
            Region region = Region.US_EAST_1;
            S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
                    .region(region)
                    .credentialsProvider(credentialsProvider)
                    .build();
    
            GetObjectRequest objectRequest = GetObjectRequest.builder()
                    .bucket(bucketName)
                    .key(objectKey)
                    .build();
    
            CompletableFuture<GetObjectResponse> futureGet = s3AsyncClient.getObject(objectRequest,
                    AsyncResponseTransformer.toFile(Paths.get(path)));
    
            futureGet.whenComplete((resp, err) -> {
                try {
                    if (resp != null) {
                        System.out.println("Object downloaded. Details: "+resp);
                    } else {
                        err.printStackTrace();
                    }
                } finally {
                   // Only close the client when you are completely done with it.
                    s3AsyncClient.close();
                }
            });
            futureGet.join();
        }
    }
    
  2. The problem is that you may receive multiple RecordsEvent events, each carrying only part of the data in its payload, so you have to iterate over all of them and join the payloads:

    fun selectS3ObjectContent(socRequest: SelectObjectContentRequest): InputStream {
        val handler = S3SelectObjectContentHandler()
        asyncS3.selectObjectContent(socRequest, handler).join()
    
        return if (handler.receivedEvents.filterIsInstance<RecordsEvent>().isEmpty())
            InputStream.nullInputStream()
        else
            handler.receivedEvents.filterIsInstance<RecordsEvent>().map {
                it.payload().asInputStream()
            }.reduce { l, r ->
                SequenceInputStream(l, r)
            }
    }
    
    class S3SelectObjectContentHandler : SelectObjectContentResponseHandler {
        private var response: SelectObjectContentResponse? = null
        var receivedEvents = ArrayList<SelectObjectContentEventStream>()
        private var exception: Throwable? = null
    
        override fun responseReceived(response: SelectObjectContentResponse?) {
            this.response = response
        }
    
        override fun onEventStream(publisher: SdkPublisher<SelectObjectContentEventStream>) {
            publisher.subscribe { event ->
                receivedEvents.add(event)
            }
        }
    
        override fun exceptionOccurred(throwable: Throwable?) {
            exception = throwable
        }
    
        override fun complete() { /* no-op */ }
    }
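
    Since the question is in Java, here is a minimal sketch of the same joining step using only the standard library. The class name RecordChunkJoiner and the method joinChunks are hypothetical names chosen for illustration. The sketch assumes each element of the list is the raw payload of one RecordsEvent, in arrival order (for example, obtained with payload().asByteArray() instead of asUtf8String()). It joins the bytes first and decodes once, because a multi-byte UTF-8 character could be split across two chunks.

    ```java
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class RecordChunkJoiner {

        // Concatenates the raw payload chunks in order and decodes the result once.
        // Assumption: each byte[] is the payload of one RecordsEvent.
        public static String joinChunks(List<byte[]> chunks) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            for (byte[] chunk : chunks) {
                buffer.write(chunk, 0, chunk.length);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        }

        public static void main(String[] args) {
            String original = "{\"city\":\"Z\u00fcrich\"}\n";
            byte[] whole = original.getBytes(StandardCharsets.UTF_8);

            // Split in the middle of the two-byte character to mimic a chunk boundary.
            byte[] first = Arrays.copyOfRange(whole, 0, 11);
            byte[] second = Arrays.copyOfRange(whole, 11, whole.length);

            String joined = joinChunks(Arrays.asList(first, second));
            System.out.println(joined.equals(original)); // prints "true"
        }
    }
    ```

    In the question's code, this would replace the findFirst() call: filter receivedEvents for every event of type RECORDS, collect each payload as a byte array, and pass the list to a helper like this one.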
    