skip to Main Content

With Radisson, simply receiving events is enough to add new items to the list. To do this, you need to do the following:

object Test extends App {
  val redisson = Redisson.create()
  val events = redisson.getQueue[String]("minio_events", new StringCodec())
  events.addListener(new ListAddListener() {
    override def onListAdd(o: String): Unit = println(o)

Difficulties begin when it needs to be wrapped in ZIO. How can I wrap this event in ZIO or ZStream to start the chain of event processing?



  1. ZStream is pull based, that means you’ll have to pull the data from minio_events in some way

    val redisson = Redisson.create()    
    val bqueue : RQueue[String] = redisson.getQueue("minio_events", new StringCodec())
    val pollQueue = 
        .effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty

    This creates a ZIO[Any, Throwable, String] representing your polling operation which can now be turned into a ZStream by calling the ZStream.fromEffect method

      .foreach(s => putStrLn(s))

    If you place this code inside a zio.App main function, you’ll see that it runs only once. So we need to make it run forever and retry it until if finds another element

      .foreach(s => putStrLn(s))  
    Login or Signup to reply.
  2. It looks like Redisson supports converting the RedissonClient into a reactive streams client which there is a zio-interop for. But if you just want to work directly with the java interface I think you can do something like this (note I haven’t actually tested this):

    object Test extends zio.App {
       def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = 
           (for {
             // Construct the client in the scope of the stream so it shuts down when done
             c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))
             // Variant of effectAsync* that lets you specify an interrupter
             s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
               val queue = c.getQueue[String]("", new StringCodec())
               val listenerId = queue.addListener(new ListAddListener {
                 // Invoke the callback by passing in a ZIO with a single chunk
                 def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
               // Return a cancellation handler.
          } { zio.console.putStrLn(s) }).exitCode
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top