skip to Main Content

I need to send data to an external api, but this API has a limit of requests per endpoint(i.e: 60 requests per minute).

The data come from Kafka, then every message goes to redis(because I can send a request with 200 items). So, I use a simple cache to help me, and I can guarantee that if my server goes down, I wont lose any message.

The problem is, that there are moments when the Kafka starts to send to many messages, then the redis starts to grow(more than 1 million of messages to send to the api), and we can not make requests too fast as messages come in. Then, we have a big delay.

My first code was simple: ExecutorService executor = Executors.newFixedThreadPool(1);
This works very well, when there are few messages, and the delay is minimal.

So, the first thing that I did was change the executor to: ExecutorService executor = Executors.newCachedThreadPool();
So I can demand new threads as I need to make the requests to the external api faster, but, I have the problem with the limit of requests per minute.

There are endpoints that I can make 300 requests per minutes, others 500, others 30 and so on.

The code that I did is not very good, and this is for the company that I work, so, I really need to make this better.

So, every time that I am going to request the external api, I call the makeRequest method, this method is synchronized, I know that I could use a synchonized list, but I think that a synchronized method is better at this situation.

// This is an inner class
private static class IntegrationType {

    final Queue<Long> requests; // This queue is used to store the timestamp of the requests
    final int maxRequestsPerMinute; // How many requests I can make per minute

    public IntegrationType(final int maxRequestsPerMinute) {
        this.maxRequestsPerMinute = maxRequestsPerMinute;
        this.requests = new LinkedList<>();
    }

    synchronized void makeRequest() {
        final long current = System.currentTimeMillis();
        requests.add(current);
        if (requests.size() >= maxRequestsPerMinute) {
            long first = requests.poll(); // gets the first request

            // The difference between the current request and the first request of the queue
            final int differenceInSeconds = (int) (current - first) / 1000;
           
            // if the difference is less than the maximum allowed
            if (differenceInSeconds <= 60) {
                // seconds to sleep.
                final int secondsToSleep = 60 - differenceInSeconds;
                sleep(secondsToSleep);
            }
        }
    }

     void sleep( int seconds){
        try {
            Thread.sleep(seconds * 1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
     }
}

So, there is a Data Structure that I could use?
What considerations should I take?

Thanks in advance.

2

Answers


  1. Chosen as BEST ANSWER

    I implemented something different that what @gthanop suggested.

    Something that I discover, is that the limits may change. So, I might need to grow or shrink the blocking list. Another reason, would not be so easily to adapt our current code to this. And another one, we might use more than one instance, so we will need a distributed lock.

    So, I implement something more easily, but not so efficiently as the answer of @ghtanop.

    Here is my code(adapted, cause I can not show the company code):

    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Semaphore;
    
    public class Teste {
        
        private static enum ExternalApi {    
            A, B, C;
        }
    
        private static class RequestManager {
    
            private long firstRequest; // First request in one minute
        
            // how many request have we made
            private int requestsCount = 0;
        
            // A timer thread, it will execute at every minute, it will refresh the request count and the first request time
            private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        
            RequestManager() {
                final long initialDelay = 0L;
                final long fixedRate = 60;
        
                executor.scheduleAtFixedRate(() -> {
                    System.out.println("Clearing the current count!");
                    requestsCount = 0;
                    firstRequest = System.currentTimeMillis();
                }, initialDelay, fixedRate, TimeUnit.SECONDS);
            }
        
            void incrementRequest() {
                requestsCount++;
            }
        
            long getFirstRequest() {
                return firstRequest;
            }
        
        
            boolean requestsExceeded(final int requestLimit) {
                return requestsCount >= requestLimit;
            }
        
        }
    
        public static class RequestHelper {
    
            private static final byte SECONDS_IN_MINUTE = 60;
            private static final short MILLISECONDS_IN_SECOND = 1000;
            private static final byte ZERO_SECONDS = 0;
        
            // Table to support the time, and count of the requests
            private final Map<Integer, RequestManager> requests;
        
            // Table that contains the limits of each type of request
            private final Map<Integer, Integer> requestLimits;
        
            /**
             * We need an array of semaphores, because, we might lock the requests for ONE, but not for TWO
             */
            private final Semaphore[] semaphores;
        
            private RequestHelper(){
        
                // one semaphore for type
                semaphores = new Semaphore[ExternalApi.values().length];
                requests = new ConcurrentHashMap<>();
                requestLimits = new HashMap<>();
        
                for (final ExternalApi type : ExternalApi.values()) {
    
                    // Binary semaphore, must be fair, because we are updating things.
                    semaphores[type.ordinal()] = new Semaphore(1, true);
                }
            }
        
            /**
             * When my token expire, I must update this, because the limits might change.
             * @param limits the new api limits
             */
            protected void updateLimits(final Map<ExternalApi, Integer> limits) {
                limits.forEach((key, value) -> requestLimits.put(key.ordinal(), value));
            }
        
        
            /**
             * Increments the counter for the type of the request,
             * Using the mutual exclusion lock, we can handle and block other threads that are trying to
             * do a request to the api.
             * If the incoming requests are going to exceed the maximum, we will make the thread sleep for N seconds ( 60 - time since first request)
             * since we are using a Binary Semaphore, it will block incoming requests until the thread that is sleeping, wakeup and release the semaphore lock.
             *
             * @param type of the integration, Supp, List, PET etc ...
             */
            protected final void addRequest(final ExternalApi type) {
        
                // the index of this request
                final int requestIndex = type.ordinal();
        
                // we get the permit for the semaphore of the type
                final Semaphore semaphore = semaphores[requestIndex];
        
                // Try to acquire a permit, if no permit is available, it will block until one is available.
                semaphore.acquireUninterruptibly();
        
                ///gets the requestManager for the type
                final RequestManager requestManager = getRequest(requestIndex);
        
                // increments the number of requests
                requestManager.incrementRequest();
        
                if (requestManager.requestsExceeded(requestLimits.get(type.ordinal()))) {
        
                    // the difference in seconds between a minute - the time that we needed to reach the maximum of requests
                    final int secondsToSleep = SECONDS_IN_MINUTE - (int) (System.currentTimeMillis() - requestManager.getFirstRequest()) / MILLISECONDS_IN_SECOND;
        
                    // We reached the maximum in less than a minute
                    if (secondsToSleep > ZERO_SECONDS) {
                        System.out.printf("We reached the maximum of: %d per minute by: %s. We must wait for: %d before make a new request!n", requestLimits.get(type.ordinal()), type.name(), secondsToSleep);
                        sleep(secondsToSleep * MILLISECONDS_IN_SECOND);
                    }
                }
                // releases the semaphore
                semaphore.release();
            }
        
        
            private final void sleep(final long time) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        
            /**
             * Gets the first Request Manager, if it is the first request, it will create the
             * RequestManager object
             * @param index
             * @return a RequestManager instance
             */
            private RequestManager getRequest(final int index) {
                RequestManager request = requests.get(index);
                if(request == null) {
                    request = new RequestManager();
                    requests.put(index, request);
                }
                return request;
            }
        }
    
        public static void main(String[] args) {
            
            final RequestHelper requestHelper = new RequestHelper();
            
            final Map<ExternalApi, Integer> apiLimits = Map.of(ExternalApi.A, 30, ExternalApi.B, 60, ExternalApi.C, 90);
            
            // update the limits
            requestHelper.updateLimits(apiLimits);
    
            final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
            executor.scheduleWithFixedDelay(() -> {
                System.out.println("A new request is going to happen");
                requestHelper.addRequest(ExternalApi.A);
                sleep(65);
            }, 0, 100, TimeUnit.MILLISECONDS);
    
            executor.scheduleWithFixedDelay(() -> {
                System.out.println("B new request is going to happen");
                requestHelper.addRequest(ExternalApi.B);
                sleep(50);
            }, 0, 200, TimeUnit.MILLISECONDS);
    
            executor.scheduleWithFixedDelay(() -> {
                System.out.println("C new request is going to happen");
                requestHelper.addRequest(ExternalApi.C);
                sleep(30);
            }, 0, 300, TimeUnit.MILLISECONDS);
    
        }
        
        
        private static final void sleep(final long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } 
    }
    

  2. If understand your problem correctly, you can use a BlockingQueue with a ScheduledExecutorService as follows.

    BlockingQueues have the method put which will only add the given element at the queue if there is available space, otherwise the method call will wait (until there is free space). They also have the method take which will only remove an element from the queue if there are any elements at all, otherwise the method call will wait (until there is at least one element to take).

    Specifically you can use a LinkedBlockingQueue or an ArrayBlockingQueue which can be given with a fixed size of elements to hold at any given time. This fixed size means that you can submit with put as many requests as you like, but you will only take requests and process them once every second or something (so as to make 60 requests per minute for example).

    To instantiate a LinkedBlockingQueue with fixed size, just use the corresponding constructor (which accepts the size as the argument). LinkedBlockingQueue will take elements in FIFO order according to its documentation.

    To instantiate an ArrayBlockingQueue with fixed size, use the constructor which accepts the size but also the boolean flag named fair. If this flag is true then the queue will take elements also in FIFO order.

    Then you can have a ScheduledExecutorService (instead of waiting inside a loop) where you can submit a single Runnable which will take from the queue, make the communication with the external API and then wait for the required delay between communications.

    Follows a simple demonstration example of the above:

    import java.util.Objects;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Main {
        
        public static class RequestSubmitter implements Runnable {
            private final BlockingQueue<Request> q;
            
            public RequestSubmitter(final BlockingQueue<Request> q) {
                this.q = Objects.requireNonNull(q);
            }
            
            @Override
            public void run() {
                try {
                    q.put(new Request()); //Will block until available capacity.
                }
                catch (final InterruptedException ix) {
                    System.err.println("Interrupted!"); //Not expected to happen under normal use.
                }
            }
        }
        
        public static class Request {
            public void make() {
                try {
                    //Let's simulate the communication with the external API:
                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
                }
                catch (final InterruptedException ix) {
                    //Let's say here we failed to communicate with the external API...
                }
            }
        }
        
        public static class RequestImplementor implements Runnable {
            private final BlockingQueue<Request> q;
            
            public RequestImplementor(final BlockingQueue<Request> q) {
                this.q = Objects.requireNonNull(q);
            }
            
            @Override
            public void run() {
                try {
                    q.take().make(); //Will block until there is at least one element to take.
                    System.out.println("Request made.");
                }
                catch (final InterruptedException ix) {
                    //Here the 'taking' from the 'q' is interrupted.
                }
            }
        }
        
        public static void main(final String[] args) throws InterruptedException {
            
            /*The following initialization parameters specify that we
            can communicate with the external API 60 times per 1 minute.*/
            final int maxRequestsPerTime = 60;
            final TimeUnit timeUnit = TimeUnit.MINUTES;
            final long timeAmount = 1;
            
            final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
            //final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);
            
            //Submit some RequestSubmitters to the pool...
            final ExecutorService pool = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 50_000; ++i)
                pool.submit(new RequestSubmitter(q));
            
            System.out.println("Serving...");
            
            //Find out the period between communications with the external API:
            final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
            //We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.
            
            //The most important line probably:
            Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
        }
    }
    

    Note that I used scheduleWithFixedDelay and not scheduleAtFixedRate. You can see in their documentation that the first one will wait for the delay between the end of the call of the submitted Runnable to start the next one, while the second one will not wait and just resubmit the Runnable every period time units. But we don’t know how long does it take to communicate with the external API, so what if for example we scheduleAtFixedRate with a period of once every minute, but the request takes more than a minute to be completed?… Then a new request would be submitted while the first one is not yet finished. So that is why I used scheduleWithFixedDelay instead of scheduleAtFixedRate. But there is more: I used a single thread scheduled executor service. Does that mean that if the first call is not finished, then a second cannot be started?… Well it seems, if you take a look at the implementation of Executors#newSingleThreadScheduledExecutor(), that a second call may occur because single thread core pool size, does not mean that the pool is of fixed size.

    Another reason that I used scheduleWithFixedDelay is because of underflow of requests. For example what about the queue being empty? Then the scheduling should also wait and not submit the Runnable again.

    On the other hand, if we use scheduleWithFixedDelay, with say a delay of 1/60f seconds between scheduling, and there are submitted more than 60 requests in a single minute, then this will surely make our throughput to the external API drop, because with scheduleWithFixedDelay we can guarantee that at most 60 requests will be made to the external API. It can be less than that, but we don’t want it to be. We would like to reach the limit every single time. If that’s not a concern to you, then you can use the above implementation already.

    But let’s say you do care to reach as close to the limit as possible every time, in which case and as far as I know, you can do this with a custom scheduler, which would be less clean solution than the first, but more time accurate.

    Bottomline, with the above implementation, you need to make sure that the communication with the external API to serve the requests is as fast as possible.

    Finaly, I should warn you to consider that I couldn’t find what happens if the BlockingQueue implementations I suggested are not puting in FIFO order. I mean, what if 2 requests arrive at almost the same time while the queue is full? They will both wait, but will the first one which arrived be waiting and get puted first, or the second one be puted first? I don’t know. If you don’t care about some requests being made at the external API out of order, then don’t worry and use the code up to this point. If you do care however, and you are able to put for example a serial number at each request, then you can use a PriorityQueue after the BlockingQueue, or even experiment with PriorityBlockingQueue (which is unfortunately unbounded). That would complicate things even more, so I didn’t post relevant code with the PriorityQueue. At least I did my best and I hope I shed some good ideas. I am not saying this post is a complete solution to all your problems, but it is some considerations to start with.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search