
I found this repository on GitHub: Long Polling Redis.

In Spring Boot, we can use a deferred request to hold the client request for several seconds (AppMsgController.java#L72).

The response is sent back to the client once the deferred request is filled with the result (AppMsgHandler.java#L74) or once the timeout is reached.

I also noticed that this mechanism can be implemented in Java with CompletableFuture, using completeOnTimeout.

But I wonder: can we use something similar with Kotlin coroutines?

2 Answers


  1. In Kotlin coroutines there is the Deferred type, which is similar to CompletableFuture in that it represents a value that is not yet available but probably will be in the future (unless an error occurs or an exception is thrown). @Joffrey pointed out that there is also CompletableDeferred, which is even closer to CompletableFuture because it lets the user manually call complete or completeExceptionally.
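
    To make the CompletableDeferred idea concrete, here is a minimal sketch of waiting for a manually completed value with a time limit. The helper name awaitOrNull and the simulated "listener" coroutine are assumptions for illustration and are not part of the linked repository; it only needs kotlinx.coroutines on the classpath.

    ```kotlin
    import kotlinx.coroutines.CompletableDeferred
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.withTimeoutOrNull

    // Hypothetical helper: waits until someone completes the deferred,
    // or gives up after timeoutMillis and returns null.
    suspend fun <T> awaitOrNull(pending: CompletableDeferred<T>, timeoutMillis: Long): T? =
        withTimeoutOrNull(timeoutMillis) { pending.await() }

    fun main() = runBlocking {
        val pending = CompletableDeferred<String>()

        // Simulates another component (for example a message listener)
        // filling in the result a little later.
        launch {
            delay(100)
            pending.complete("new message")
        }

        // Completed well before the timeout, so the value is returned.
        println(awaitOrNull(pending, timeoutMillis = 1_000))

        // Nobody completes this one, so the call returns null after the timeout.
        val neverCompleted = CompletableDeferred<String>()
        println(awaitOrNull(neverCompleted, timeoutMillis = 200))
    }
    ```

    Cancelling the wait does not cancel the CompletableDeferred itself, so it can still be completed later by whoever holds it.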

    Deferreds can easily be created with the async extension function on CoroutineScope. If you want to set a timeout, Kotlin has you covered with the withTimeout function, which cancels the block of code after a given time and throws a TimeoutCancellationException.

    Note that withTimeout should be inside async and not the other way around.

    Take a look at this example: https://pl.kotl.in/uYe12ds7g
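
    In case the playground link is unavailable, the following is a small sketch of the same arrangement, with withTimeout placed inside async. It is not the code behind the link; slowLookup and lookupWithTimeout are hypothetical names, and the "timed out" fallback string is just one possible way to handle the exception.

    ```kotlin
    import kotlinx.coroutines.TimeoutCancellationException
    import kotlinx.coroutines.async
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.withTimeout

    // Hypothetical slow operation standing in for "wait until a result is available".
    suspend fun slowLookup(workMillis: Long): String {
        delay(workMillis)
        return "result after $workMillis ms"
    }

    // async creates the Deferred; withTimeout sits inside it and limits the work.
    suspend fun lookupWithTimeout(workMillis: Long, timeoutMillis: Long): String = coroutineScope {
        val deferred = async {
            try {
                withTimeout(timeoutMillis) { slowLookup(workMillis) }
            } catch (e: TimeoutCancellationException) {
                "timed out" // fallback value, so await() does not rethrow the timeout
            }
        }
        deferred.await()
    }

    fun main() = runBlocking {
        println(lookupWithTimeout(workMillis = 100, timeoutMillis = 1_000)) // finishes in time
        println(lookupWithTimeout(workMillis = 1_000, timeoutMillis = 100)) // hits the timeout
    }
    ```

    If a null result is easier to handle than an exception, withTimeoutOrNull can replace the try/catch.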

  2. As @Spitzbueb said, you could do something similar with CompletableDeferred.

    However, if you don’t need to support the clear() and count() methods, you could probably also simplify things by replacing the ConcurrentHashMap with a simple MutableSharedFlow<Unit> that broadcasts "pings" from Redis.

    In onMessage, you could emit Unit into the mutable shared flow to notify subscribers, and then you can simply implement your request mechanism by awaiting the first element on the shared flow and making the readSubset request:

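    import kotlinx.coroutines.channels.BufferOverflow
    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.flow.first
    import kotlinx.coroutines.withTimeoutOrNull
    import org.slf4j.Logger
    import org.slf4j.LoggerFactory
    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.data.redis.connection.Message
    import org.springframework.data.redis.connection.MessageListener
    import java.nio.charset.Charset
    import java.nio.charset.StandardCharsets

    // AppMsg and AppMsgRepo come from the original repository.
    class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {

        // A shared flow without any buffer rejects tryEmit while subscribers are waiting,
        // so give it one slot and drop the oldest ping on overflow (pings carry no data).
        private val events = MutableSharedFlow<Unit>(
            extraBufferCapacity = 1,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        )

        suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
            // Answer immediately if there is already something to return.
            val currentMsgs = appMsgRepo.readSubset(start)
            if (currentMsgs.isNotEmpty()) {
                return currentMsgs
            }
            // Otherwise suspend until the next ping or the timeout, whichever comes first.
            val newMessages = withTimeoutOrNull(timeoutMillis) {
                events.first()
                appMsgRepo.readSubset(start)
            }
            return newMessages ?: emptyList()
        }

        override fun onMessage(message: Message, pattern: ByteArray?) {
            LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
            // Non-suspending notification of all requests that are currently waiting.
            events.tryEmit(Unit)
        }

        companion object {
            private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
            private val UTF8: Charset = StandardCharsets.UTF_8
        }
    }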
    

    The controller can then simply call requestMessages (provided you make your controller use suspend functions with Spring WebFlux).
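
    To try the wait-for-ping idea without Spring or Redis, here is a self-contained sketch that only needs kotlinx.coroutines. InMemoryMsgHandler, publish and the in-memory message list are hypothetical stand-ins for the handler, onMessage and the repository. The flow is created with extraBufferCapacity = 1 here, on the assumption that pings are sent with tryEmit from non-suspending code, which a shared flow without a buffer would reject while subscribers are waiting.

    ```kotlin
    import kotlinx.coroutines.channels.BufferOverflow
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.flow.first
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.withTimeoutOrNull
    import java.util.concurrent.CopyOnWriteArrayList

    // Hypothetical in-memory stand-in for the repository plus listener.
    class InMemoryMsgHandler {
        private val messages = CopyOnWriteArrayList<String>()

        // One buffer slot lets tryEmit succeed; dropping the oldest ping is
        // harmless because a ping carries no data.
        private val events = MutableSharedFlow<Unit>(
            extraBufferCapacity = 1,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        )

        private fun readSubset(start: Int): List<String> = messages.drop(start)

        suspend fun requestMessages(start: Int, timeoutMillis: Long): List<String> {
            val current = readSubset(start)
            if (current.isNotEmpty()) return current
            // Suspend until the next ping or the timeout, whichever comes first.
            return withTimeoutOrNull(timeoutMillis) {
                events.first()
                readSubset(start)
            } ?: emptyList()
        }

        // Plays the role of onMessage: store the message, then notify waiters.
        fun publish(message: String) {
            messages.add(message)
            events.tryEmit(Unit)
        }
    }

    fun main() = runBlocking {
        val handler = InMemoryMsgHandler()
        launch {
            delay(100)
            handler.publish("hello")
        }
        // Suspends until the publish above, then returns the stored message.
        println(handler.requestMessages(start = 0, timeoutMillis = 1_000))
        // Nothing new after index 1, so this returns an empty list after the timeout.
        println(handler.requestMessages(start = 1, timeoutMillis = 200))
    }
    ```

    One limitation of this simple version: a ping that arrives between the initial read and the subscription in events.first() is missed, so that request waits for the next ping or for the timeout.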
