skip to Main Content

I am learning leaky bucket algorithm and want to get my hand dirty by writing some simple code with redis plus golang http.

When I searched here with the keyword redis, leaky, bucket. There are many similar questions as shown in [1], which is nice. However I find I have a problem to understand the entire logic after going through those threads and wiki[2]. I suppose there is something I do not understand and am also not aware of it. So I would like to rephrase it again here; and please correct me if I get it wrong.

The pseudo code:

key := "ip address, token or anything that can be the representative of a client"
redis_queue_size := 5
interval_between_each_request := 7
request := obtain_http_request_from_somewhere()

if check_current_queue_size() < redis_queue_size:
    if is_queue_empty()
        add_request_to_the_queue() // zadd "ip1" now() now() // now() is something like seconds, milliseconds or nanoseconds e.g. t = 1
        process_request(request)
    else
        now := get_current_time()
        // add_request_to_... retrieves the first element in the queue
        // compute the expected timestamp to execute the request and its current time
        // e.g. zadd "ip1" <time of the first elment in the queue + interval_between_each_request> now
        add_request_to_redis_queue_with_timestamp(now, interval_between_each_request) // e.g. zadd "ip" <timestamp as score> <timestamp a request is allowed to be executed>
        // Below function check_the_time_left...() will check how many time left at which the current request need to wait. 
        // For instance, the first request stored in the queue with the command
        //    zadd "ip1" 1 1  // t = 1
        // and the second request arrives at t = 4 but it is allowed t be executed at t = 8
        //    zadd "ip1" 8 4 // where 4 := now, 8 := 1 + interval_between_each_request
        // so the N will be 4 
        N := check_the_time_left_for_the_current_request_to_execute(now, interval_between_each_request) 
        sleep(N) // now the request wait for 4 seconds before processing the request
        process_request(http_request_obj)
else
    return // discard request

I understand the part when queue is full, then the following requests will be discarded. However I suppose I may misunderstand when the queue is not full, how to reshape the incoming request so it can be executed in a fixed rate.

I appreciate any suggestions

[1]. https://stackoverflow.com/search?q=redis+leaky+bucket+&s=aa2eaa93-a6ba-4e31-9a83-68f791c5756e

[2]. https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue

2

Answers


  1. There are several ways you can implement a leaky bucket but there should be two separate parts for the process. One that puts things in the bucket and another that removes them at a set interval if there is anything to remove.

    You can use a separate goroutine that would consume the messages at a set interval. This would simplify your code since on one code path you would only have to look into the queue size and drop packets and another code path would just consume whatever there is.

    Login or Signup to reply.
    1. If this is for simple rate-limiting the sliding window approach using a sorted set is what we see implemented by most Redis users https://github.com/Redislabs-Solution-Architects/RateLimitingExample/blob/sliding_window/app.py

    2. If you are set on leaky bucket you might consider using a redis stream per consumerID (apiToken/IP Address etc) as follows

    request comes in for consumerID

    XADD requests-[consumerID] MAXLEN [BUCKET SIZE]

    spawn a go routine if necessary for that consumerID
    get current time
    if XLEN of requests-[consumerID] is 0 exit go routine

    XREAD COUNT [number_of_requests_per_period] BLOCK [time period – 1 ms] STREAMS requests-[consumerID]
    get the current time and sleep for the remainder of the time period

    https://redis.io/commands#stream details how streams work

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search