skip to Main Content

With Radisson, simply receiving events is enough to add new items to the list. To do this, you need to do the following:

object Test extends App {
  val redisson = Redisson.create()
  val events = redisson.getQueue[String]("minio_events", new StringCodec())
  events.addListener(new ListAddListener() {
    override def onListAdd(o: String): Unit = println(o)
  })
}

Difficulties begin when it needs to be wrapped in ZIO. How can I wrap this event in ZIO or ZStream to start the chain of event processing?

2

Answers


  1. ZStream is pull based, that means you’ll have to pull the data from minio_events in some way

    val redisson = Redisson.create()    
    val bqueue : RQueue[String] = redisson.getQueue("minio_events", new StringCodec())
        
    val pollQueue = 
      ZIO
        .effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty
        .someOrFail(NoElementsOnStream)
    

    This creates a ZIO[Any, Throwable, String] representing your polling operation which can now be turned into a ZStream by calling the ZStream.fromEffect method

    ZStream
      .fromEffect(pollQueue)
      .foreach(s => putStrLn(s))
      .exitCode
    

    If you place this code inside a zio.App main function, you’ll see that it runs only once. So we need to make it run forever and retry it until if finds another element

    ZStream
      .fromEffect(pollQueue)
      .retry(Schedule.spaced(1.second))
      .forever
      .foreach(s => putStrLn(s))  
      .exitCode
    
    Login or Signup to reply.
  2. It looks like Redisson supports converting the RedissonClient into a reactive streams client which there is a zio-interop for. But if you just want to work directly with the java interface I think you can do something like this (note I haven’t actually tested this):

    object Test extends zio.App {
       def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = 
           (for {
             // Construct the client in the scope of the stream so it shuts down when done
             c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))
    
             // Variant of effectAsync* that lets you specify an interrupter
             s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
               val queue = c.getQueue[String]("", new StringCodec())
               val listenerId = queue.addListener(new ListAddListener {
    
                 // Invoke the callback by passing in a ZIO with a single chunk
                 def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
               })
    
               // Return a cancellation handler.
               Left(UIO(queue.removeListener(listenerId)))
            }
          } { zio.console.putStrLn(s) }).exitCode
    }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search