
Here is my code

      KinesisClient kinesisClient = KinesisClient.builder().build();
      PutRecordRequest putRecordRequest = new PutRecordRequest();
      putRecordRequest.setStreamName("stream-name");
      putRecordRequest.setPartitionKey("SomeString-" + UUID.randomUUID());
      putRecordRequest.setData(ByteBuffer.wrap(data));

      PutRecordResponse putRecordResponse = kinesisClient.putRecord(putRecordRequest);

For some strange reason it’s saying it can’t resolve the putRecord(PutRecordRequest) method even though it’s clearly there when I look at the decompiled jar file. It even autocompletes that method when I start to type it.

What could be going wrong here?

2 Answers


  1. Chosen as BEST ANSWER

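    I figured it out. For some reason my project had both com.amazonaws.services.kinesis.model (AWS SDK for Java V1) and software.amazon.awssdk.services.kinesis.model (V2) in it, and I was using the KinesisClient from one SDK and the PutRecordRequest from the other. The two PutRecordRequest classes share a simple name but are unrelated types, so the client's putRecord method does not accept the one from the other SDK.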
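
    For anyone hitting the same thing, here is a minimal sketch of a way to check which SDK generation a class name belongs to. SdkGenerationCheck, sdkGeneration and sameGeneration are hypothetical names made up for this illustration and are not part of either AWS SDK; the only facts it relies on are the two package prefixes mentioned above.

```java
// Hypothetical helper for illustration only; not part of either AWS SDK.
final class SdkGenerationCheck {

    // Package prefixes of the two SDK generations named in the answer above.
    private static final String V1_PREFIX = "com.amazonaws.";
    private static final String V2_PREFIX = "software.amazon.awssdk.";

    // Returns "v1", "v2", or "unknown" for a fully qualified class name.
    static String sdkGeneration(String fullyQualifiedClassName) {
        if (fullyQualifiedClassName.startsWith(V1_PREFIX)) {
            return "v1";
        }
        if (fullyQualifiedClassName.startsWith(V2_PREFIX)) {
            return "v2";
        }
        return "unknown";
    }

    // True only when both names come from the same known SDK generation.
    static boolean sameGeneration(String clientClassName, String requestClassName) {
        String client = sdkGeneration(clientClassName);
        return !client.equals("unknown") && client.equals(sdkGeneration(requestClassName));
    }

    public static void main(String[] args) {
        // The mix described above: a V2 client combined with a V1 request.
        String client = "software.amazon.awssdk.services.kinesis.KinesisClient";
        String request = "com.amazonaws.services.kinesis.model.PutRecordRequest";

        System.out.println("client:  " + sdkGeneration(client));
        System.out.println("request: " + sdkGeneration(request));
        System.out.println("compatible: " + sameGeneration(client, request));
    }
}
```

    In practice you can do the same check by eye: look at the import lines at the top of the file and make sure the client and every request and response class start with the same prefix.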


  2. To work with AWS services from Java, the best practice is to use the AWS SDK for Java V2. The package software.amazon.awssdk.services.kinesis.model belongs to V2. The com.amazonaws classes you are mixing in belong to V1, which is not recommended, as noted on this AWS page:

    AWS SDK Code Examples

    The full Java V2 code example for this operation is:

    package com.example.kinesis;
    
    //snippet-start:[kinesis.java2.putrecord.import]
    import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
    import software.amazon.awssdk.core.SdkBytes;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
    import software.amazon.awssdk.services.kinesis.model.KinesisException;
    import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
    import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
    //snippet-end:[kinesis.java2.putrecord.import]
    
    /**
     * Before running this Java V2 code example, set up your development environment, including your credentials.
     *
     * For more information, see the following documentation topic:
     *
     * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
     */
    public class StockTradesWriter {
    
        public static void main(String[] args) {
    
            final String usage = "\n" +
                "Usage:\n" +
                "    <streamName>\n\n" +
                "Where:\n" +
                "    streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream)\n\n";
    
            if (args.length != 1) {
                System.out.println(usage);
                System.exit(1);
            }
    
            String streamName = args[0];
            Region region = Region.US_EAST_1;
            KinesisClient kinesisClient = KinesisClient.builder()
                .region(region)
                .credentialsProvider(ProfileCredentialsProvider.create())
                .build();
    
            // Ensure that the Kinesis Stream is valid.
            validateStream(kinesisClient, streamName);
            setStockData(kinesisClient, streamName);
            kinesisClient.close();
        }
    
        // snippet-start:[kinesis.java2.putrecord.main]
        public static void setStockData( KinesisClient kinesisClient, String streamName) {
    
            try {
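                // StockTradeGenerator and StockTrade are helper classes from the same example package (not shown here).
                // Repeatedly send stock trades with a 100-millisecond wait in between.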
                StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();
    
                // Put in 50 Records for this example.
                int index = 50;
                for (int x=0; x<index; x++){
                    StockTrade trade = stockTradeGenerator.getRandomTrade();
                    sendStockTrade(trade, kinesisClient, streamName);
                    Thread.sleep(100);
                }
    
            } catch (KinesisException | InterruptedException e) {
                System.err.println(e.getMessage());
                System.exit(1);
            }
            System.out.println("Done");
        }
    
        private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient,
                                           String streamName) {
            byte[] bytes = trade.toJsonAsBytes();
    
            // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
            if (bytes == null) {
                System.out.println("Could not get JSON bytes for stock trade");
                return;
            }
    
            System.out.println("Putting trade: " + trade);
            PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // Use the ticker symbol as the partition key.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();
    
            try {
                kinesisClient.putRecord(request);
            } catch (KinesisException e) {
                System.err.println(e.getMessage());
            }
        }
    
        private static void validateStream(KinesisClient kinesisClient, String streamName) {
            try {
                DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamName(streamName)
                    .build();
    
                DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);
    
                if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) {
                    System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
                    System.exit(1);
                }
    
            } catch (KinesisException e) {
                System.err.println("Error found while describing the stream " + streamName);
                System.err.println(e);
                System.exit(1);
            }
        }
        // snippet-end:[kinesis.java2.putrecord.main]
    }
    

    You can find this AWS SDK for Java V2 example in the AWS Code Library here:

    Kinesis examples using SDK for Java 2.x

    If you prefer using GitHub, here is where all AWS Java V2 code examples are located:

    https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2
