
I have a requirement to read data from a Redis cache in Flink, but per the requirements the cache data will be refreshed on average every two hours. While going through the documentation and other Stack Overflow questions, I saw some people suggest using the Apache Bahir driver to connect to Redis, but that driver is outdated and not maintained by the Flink project. I am using a RichSinkFunction in Flink to connect to Redis, but I am unable to figure out whether there is any way to refresh the cache from the sink function. Is it recommended to use a sink function in Flink to connect to Redis?

2 Answers


  1. While the existing Redis sinks may not support this out of the box, you may be able to handle it explicitly with a process function in your pipeline that makes the appropriate call to Redis (the exact call depends on your configuration).

    You could implement this with keyed state that has an explicit TTL matching your refresh interval (two hours in this case). As each record passes through, the function checks whether that state has expired; if it has, it refreshes the cache and writes the state again, which restarts the two-hour TTL. Keyed state is only available on a keyed stream, so this needs to be a KeyedProcessFunction. It might look something like:

    import org.apache.flink.api.common.state.StateTtlConfig
    import org.apache.flink.api.common.state.ValueState
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Assumes a String key. State (and its TTL) is scoped per key, so each key refreshes
    // on its own schedule; key by a constant for one global refresh (at the cost of parallelism).
    class RedisCheckFunction : KeyedProcessFunction<String, YourClass, YourClass>() {
        // Marker that is present while the last refresh is still fresh
        private lateinit var refreshed: ValueState<Boolean>

        override fun open(parameters: Configuration) {
            super.open(parameters)

            val descriptor = ValueStateDescriptor("redis-refreshed-marker", Types.BOOLEAN)

            // Expire the marker two hours after it was last written
            descriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.hours(2))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build()
            )

            refreshed = runtimeContext.getState(descriptor)
        }

        override fun processElement(record: YourClass, context: Context, collector: Collector<YourClass>) {
            // value() returns null if the marker was never written or has expired
            if (refreshed.value() == null) {
                refreshRedisCache()
                refreshed.update(true) // writing again restarts the two-hour TTL
            }

            collector.collect(record)
        }

        private fun refreshRedisCache() {
            // Omitted for brevity (Redis does not speak HTTP; use a Redis client such as Jedis or Lettuce)
        }
    }


    This can act as a pass-through step before your sink (or, if you are writing a custom sink, the same check can go inside the sink itself). Because it relies on keyed state, apply it after a keyBy:

    yourStream
        .keyBy { it.key } // hypothetical String key field on YourClass
        .process(RedisCheckFunction())
        .addSink(...)
    
  2. It might be worthwhile to build a custom Redis sink on top of Flink's Generic Asynchronous Base Sink (`AsyncSinkBase`, introduced in FLIP-171).

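If keyed state is not an option (for example, inside a plain RichSinkFunction on a non-keyed stream, as in the question), the same two-hour check from the first answer can be kept as a simple in-memory timestamp. Below is a minimal, Flink-independent Kotlin sketch of that idea; `RefreshGate` and its `clock` parameter are hypothetical names, not part of Flink or any Redis client, and the clock is injectable only so the logic can be exercised without waiting two hours.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper (not a Flink or Redis API): decides whether Redis-backed
// data is due for a refresh, mirroring the two-hour TTL idea.
class RefreshGate(
    private val interval: Duration,
    private val clock: () -> Instant = { Instant.now() },
) {
    // null means "never refreshed", which behaves like expired state
    private var lastRefresh: Instant? = null

    // Returns true (and records the refresh time) once the interval has elapsed.
    fun shouldRefresh(): Boolean {
        val now = clock()
        val last = lastRefresh
        if (last == null || Duration.between(last, now) >= interval) {
            lastRefresh = now
            return true
        }
        return false
    }
}

fun main() {
    var now = Instant.parse("2024-01-01T00:00:00Z")
    val gate = RefreshGate(Duration.ofHours(2)) { now }

    println(gate.shouldRefresh()) // true: first call, nothing loaded yet
    now = now.plus(Duration.ofMinutes(90))
    println(gate.shouldRefresh()) // false: only 90 minutes have elapsed
    now = now.plus(Duration.ofMinutes(30))
    println(gate.shouldRefresh()) // true: two hours since the last refresh
}
```

In a Flink function you would presumably create the gate in `open()` and call `shouldRefresh()` at the top of `invoke()` or `processElement()`, reloading from Redis when it returns true. Unlike keyed TTL state, this timestamp lives only in memory: each parallel subtask refreshes independently, and the schedule starts over after a restart because it is not checkpointed.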