I need to send data to an external API, but this API has a rate limit per endpoint (e.g. 60 requests per minute).
The data comes from Kafka, and every message goes to Redis first (because I can send a request with up to 200 items). So I use a simple cache to help me, and I can guarantee that if my server goes down, I won't lose any messages.
The problem is that there are moments when Kafka starts to send too many messages, and Redis starts to grow (more than 1 million messages waiting to be sent to the API), but we cannot make requests as fast as the messages come in. So we end up with a big delay.
My first code was simple:

    ExecutorService executor = Executors.newFixedThreadPool(1);

This works very well when there are few messages, and the delay is minimal.
So the first thing I did was change the executor to:

    ExecutorService executor = Executors.newCachedThreadPool();

That way I can spawn new threads on demand and make the requests to the external API faster, but then I run into the problem of the limit of requests per minute.
There are endpoints where I can make 300 requests per minute, others 500, others 30, and so on.
The code that I wrote is not very good, and this is for the company I work for, so I really need to make it better.
Every time I am about to call the external API, I call the makeRequest method. This method is synchronized; I know I could use a synchronized list instead, but I think a synchronized method is better in this situation.
    // This is an inner class
    private static class IntegrationType {

        // Timestamps of the most recent requests
        final Queue<Long> requests;
        // How many requests I can make per minute
        final int maxRequestsPerMinute;

        public IntegrationType(final int maxRequestsPerMinute) {
            this.maxRequestsPerMinute = maxRequestsPerMinute;
            this.requests = new LinkedList<>();
        }

        synchronized void makeRequest() {
            final long current = System.currentTimeMillis();
            requests.add(current);
            if (requests.size() >= maxRequestsPerMinute) {
                // The oldest request still inside the window
                final long first = requests.poll();
                // Elapsed time between the oldest and the current request
                final long differenceInSeconds = (current - first) / 1000;
                // If the one-minute window is not over yet, wait out the remainder
                if (differenceInSeconds <= 60) {
                    sleep(60 - differenceInSeconds);
                }
            }
        }

        void sleep(final long seconds) {
            try {
                Thread.sleep(seconds * 1000L);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
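Since each endpoint has its own limit, the limiters can be kept in a map keyed by endpoint. A minimal self-contained sketch of that idea (the endpoint paths and limits are illustrative, and SlidingWindowLimiter is a condensed version of the class above):

```java
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

public class PerEndpointLimiters {

    // Condensed version of the IntegrationType class above.
    static class SlidingWindowLimiter {
        private final Queue<Long> timestamps = new LinkedList<>();
        private final int maxPerMinute;

        SlidingWindowLimiter(final int maxPerMinute) {
            this.maxPerMinute = maxPerMinute;
        }

        synchronized void acquire() throws InterruptedException {
            final long now = System.currentTimeMillis();
            timestamps.add(now);
            if (timestamps.size() > maxPerMinute) {
                final long oldest = timestamps.poll();
                final long elapsed = now - oldest;
                // If the window is not over yet, sleep out the remainder of it.
                if (elapsed < 60_000L) {
                    Thread.sleep(60_000L - elapsed);
                }
            }
        }
    }

    // One limiter per endpoint; paths and limits are made up for the example.
    private static final Map<String, SlidingWindowLimiter> LIMITERS = new ConcurrentHashMap<>();
    static {
        LIMITERS.put("/itemsA", new SlidingWindowLimiter(300));
        LIMITERS.put("/itemsB", new SlidingWindowLimiter(30));
    }

    static void send(final String endpoint, final String batch) throws InterruptedException {
        LIMITERS.get(endpoint).acquire(); // blocks if the endpoint limit is reached
        // ... make the actual HTTP call here ...
    }
}
```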
So, is there a data structure that I could use?
What considerations should I take into account?
Thanks in advance.
2 Answers
I implemented something different from what @gthanop suggested.
Something I discovered is that the limits may change, so I might need to grow or shrink the blocking list. Another reason is that it would not be so easy to adapt our current code to that answer. And one more: we might use more than one instance, so we would need a distributed lock.
So I implemented something simpler, but not as efficient as @gthanop's answer.
Here is my code (adapted, because I cannot show the company code):
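The original snippet was not captured here. What follows is a minimal sketch of the kind of limiter described above, a fixed-window limiter backed by a Semaphore whose per-minute limit can be changed at runtime; the class and method names are illustrative, not the author's actual code. (As noted above, with more than one instance the semaphore would have to be replaced by something shared, e.g. a distributed lock.)

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Fixed-window limiter: at most maxPerMinute acquires succeed per one-minute
// window, and the limit can be grown or shrunk at runtime.
public class AdjustableRateLimiter {

    private final AtomicInteger maxPerMinute;
    private Semaphore window;
    private final ScheduledExecutorService resetter =
            Executors.newSingleThreadScheduledExecutor(r -> {
                final Thread t = new Thread(r, "rate-limit-reset");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    public AdjustableRateLimiter(final int maxPerMinute) {
        this.maxPerMinute = new AtomicInteger(maxPerMinute);
        this.window = new Semaphore(maxPerMinute);
        // Every minute, open a fresh window with the (possibly updated) limit.
        resetter.scheduleAtFixedRate(() -> {
            synchronized (this) {
                window = new Semaphore(this.maxPerMinute.get());
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    // The limits may change, so allow adjusting them while running.
    public void setMaxPerMinute(final int newLimit) {
        maxPerMinute.set(newLimit);
    }

    // Blocks until the current window still has a permit left.
    public void acquire() throws InterruptedException {
        while (true) {
            final Semaphore current;
            synchronized (this) {
                current = window;
            }
            // Wake up periodically in case the window was replaced meanwhile.
            if (current.tryAcquire(1, TimeUnit.SECONDS)) {
                return;
            }
        }
    }
}
```

This is less accurate than the sliding-window approach (all permits can be spent at the start of a window), which matches the trade-off described: simpler, but not as efficient.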
If I understand your problem correctly, you can use a BlockingQueue with a ScheduledExecutorService as follows.
BlockingQueues have the method put, which will only add the given element to the queue if there is available space; otherwise the call will wait (until there is free space). They also have the method take, which will only remove an element from the queue if there are any elements at all; otherwise the call will wait (until there is at least one element to take).
Specifically, you can use a LinkedBlockingQueue or an ArrayBlockingQueue, which can be given a fixed number of elements to hold at any given time. This fixed size means that you can submit with put as many requests as you like, but you will only take requests and process them once every second or so (so as to make 60 requests per minute, for example).
To instantiate a LinkedBlockingQueue with fixed size, just use the corresponding constructor (which accepts the size as the argument). LinkedBlockingQueue will take elements in FIFO order, according to its documentation.
To instantiate an ArrayBlockingQueue with fixed size, use the constructor which accepts the size but also the boolean flag named fair. If this flag is true, then the queue will take elements also in FIFO order.
Then you can have a ScheduledExecutorService (instead of waiting inside a loop), where you can submit a single Runnable which will take from the queue, make the communication with the external API and then wait for the required delay between communications.
Follows a simple demonstration example of the above:
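The original example was not captured here; below is a sketch consistent with the description above, assuming a plain String payload per request (names and the 500-element capacity are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThrottledDispatcher {

    public static void main(final String[] args) throws InterruptedException {
        // Bounded, fair queue: producers block in put() when it is full.
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(500, true);

        final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        // One take-and-send per second -> at most 60 requests per minute.
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                final String request = queue.take(); // waits if the queue is empty
                // Here you would make the actual call to the external API.
                System.out.println("sent: " + request);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, 1, TimeUnit.SECONDS);

        // Producer side: put() blocks when the queue is full.
        for (int i = 0; i < 3; i++) {
            queue.put("request-" + i);
        }

        TimeUnit.SECONDS.sleep(4); // let the demo drain the queue
        scheduler.shutdownNow();
    }
}
```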
Note that I used scheduleWithFixedDelay and not scheduleAtFixedRate. You can see in their documentation that the first one waits for the delay between the end of one run of the submitted Runnable and the start of the next, while the second one does not wait and just resubmits the Runnable every period time units. But we don't know how long the communication with the external API takes, so what if, for example, we scheduleAtFixedRate with a period of once every minute, but the request takes more than a minute to complete?... Then a new request would be submitted while the first one is not yet finished. That is why I used scheduleWithFixedDelay instead of scheduleAtFixedRate. But there is more: I used a single-thread scheduled executor service. Does that mean that if the first call is not finished, a second one cannot be started?... Well, it seems, if you take a look at the implementation of Executors#newSingleThreadScheduledExecutor(), that a second call may occur, because a single-thread core pool size does not mean that the pool is of fixed size.
Another reason I used scheduleWithFixedDelay is underflow of requests. For example, what about the queue being empty? Then the scheduling should also wait and not submit the Runnable again.
On the other hand, if we use scheduleWithFixedDelay, with say a delay of one second (1/60 of a minute) between schedulings, and more than 60 requests are submitted in a single minute, then this will surely make our throughput to the external API drop, because with scheduleWithFixedDelay we can only guarantee that at most 60 requests will be made to the external API. It can be less than that, but we don't want it to be; we would like to reach the limit every single time. If that is not a concern to you, then you can use the above implementation already.
But let's say you do care about reaching as close to the limit as possible every time; in that case, and as far as I know, you can do it with a custom scheduler, which would be a less clean solution than the first, but more time accurate.
Bottom line: with the above implementation, you need to make sure that the communication with the external API to serve the requests is as fast as possible.
Finally, I should warn you that I couldn't find out what happens if the BlockingQueue implementations I suggested are not put-ing in FIFO order. I mean, what if 2 requests arrive at almost the same time while the queue is full? They will both wait, but will the first one that arrived also be put into the queue first, or will the second one? I don't know. If you don't care about some requests being made to the external API out of order, then don't worry and use the code up to this point. If you do care, however, and you are able to put, for example, a serial number on each request, then you can use a PriorityQueue after the BlockingQueue, or even experiment with a PriorityBlockingQueue (which is unfortunately unbounded). That would complicate things even more, so I didn't post relevant code for the PriorityQueue. At least I did my best, and I hope I shed some good ideas. I am not saying this post is a complete solution to all your problems, but it is some considerations to start with.