I am new to Spark.
We are currently building a pipeline:

  1. Read events from a Kafka topic
  2. Enrich the data with a Redis lookup
  3. Write the events to a new Kafka topic

My problem is that the spark-redis library performs very well, but the lookup data stays static in my streaming job.

Although the data is refreshed in Redis, the change is not reflected in my DataFrame.
Spark reads the data once at startup and never updates it.
I load everything from Redis up front; in total it is about 1 million key-value strings.

What approaches can I use? I want to use Redis as a dynamic in-memory lookup.
The lookup table changes roughly once an hour.

Thanks.

Used libraries:
spark-redis-2.4.1.jar
commons-pool2-2.0.jar
jedis-3.2.0.jar

Here is the code part:

import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events") 


val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
        .alias("C"))


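import com.redislabs.provider.redis._
val sc = spark.sparkContext
// Read all matching key-value pairs from Redis once, when the job starts
val stringRdd = sc.fromRedisKV("PARAMETERS:*")
// Collect to the driver; this map is a snapshot that the UDF below captures
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key, ""))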



val curated_df = my_raw_events_df 
.select(

     ...
     $"C.SystemEntryDate".alias("RecordCreateDate")
    ,$"C.Profile".alias("ProfileCode")     
    ,lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")
    ,$"C.IdentityType"     
    ,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")     
     ...

).as("C")



import org.apache.spark.sql.streaming.Trigger

val query = curated_df
   .select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
   .option("topic", "curated-event")
   .option("checkpointLocation","/user/spark/checkPointLocation/xyz")
   .trigger(Trigger.ProcessingTime("30 seconds"))
   .start()

   query.awaitTermination()

2 Answers


  1. One option is to skip spark-redis and look up the values in Redis directly. This can be done with the df.mapPartitions function. There are some examples for Spark DStreams at https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/. The idea for Structured Streaming is similar. Be careful to handle the Redis connection properly: open it once per partition rather than once per record, and close it when the partition is done.
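
A minimal sketch of the direct-lookup idea in this answer, assuming a plain Jedis client and a trimmed-down, hypothetical event shape (`Event`, `EnrichedEvent`, `RedisLookup`, and the `host`/`port` parameters are illustrative names, not from the question). The key prefixes mirror the ones in the question's `lookup(...)` calls. One connection is opened per partition, and the results are materialised before the connection is closed, because the partition iterator is lazy.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import redis.clients.jedis.Jedis

// Hypothetical, trimmed-down shapes; the real events carry more fields.
case class Event(Profile: String, IdentityType: String)
case class EnrichedEvent(ProfileCode: String, ProfileName: String,
                         IdentityType: String, IdentityTypeName: String)

object RedisLookup {
  // Pure enrichment step. `get` stands in for a Redis GET, so this part
  // can be exercised without a Redis server.
  def enrich(events: Iterator[Event],
             get: String => Option[String]): Iterator[EnrichedEvent] =
    events.map { e =>
      // Same key layout as in the question; null codes become "" (like NVL).
      val profile  = Option(e.Profile).getOrElse("")
      val identity = Option(e.IdentityType).getOrElse("")
      EnrichedEvent(
        e.Profile,
        get("PARAMETERS:PROFILE||" + profile).getOrElse(""),
        e.IdentityType,
        get("PARAMETERS:IdentityType||" + identity).getOrElse(""))
    }

  // One Jedis connection per partition, per micro-batch.
  def enrichStream(spark: SparkSession, events: Dataset[Event],
                   host: String, port: Int): Dataset[EnrichedEvent] = {
    import spark.implicits._ // provides the encoder for EnrichedEvent
    events.mapPartitions { rows =>
      val jedis = new Jedis(host, port)
      try {
        // jedis.get returns null for a missing key, hence Option(...).
        // toVector forces the lazy iterator while the connection is open.
        enrich(rows, key => Option(jedis.get(key))).toVector.iterator
      } finally jedis.close()
    }
  }
}
```

With the question's stream, this could be wired in roughly as `RedisLookup.enrichStream(spark, my_raw_events_df.select($"C.Profile", $"C.IdentityType").as[Event], host, port)`. Because every micro-batch goes back to Redis, updated values are picked up without restarting the job. At around 1,000 records per trigger, one GET per key is probably acceptable; for larger batches, `MGET` or pipelining would cut the number of round trips.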

  2. Another solution is a stream-static join (see the Spark docs):

    Instead of collecting the Redis RDD to the driver, use the Redis DataFrame (see the spark-redis docs) as a static DataFrame and join it with your stream, like this:

    val redisStaticDf = spark.read. ...
    val streamingDf = spark.readStream. ...
    
    streamingDf.join(redisStaticDf, ...)   
    

    Since Spark's micro-batch execution engine re-evaluates the query on each trigger, the Redis DataFrame fetches the data on each trigger, so you get up-to-date data. (If you cache the DataFrame, it will not be refreshed.)
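
To make the stream-static join more concrete, here is a hedged sketch. It assumes the lookup values are stored as Redis hashes that the spark-redis DataFrame source can read (for example, one hash per profile under `PARAMETERS:PROFILE:<code>` with fields `code` and `name`), which is a different layout from the plain string keys in the question. The key layout, the field names, and the `StaticLookup` helper are all hypothetical; the Redis host and port are assumed to be set on the Spark session.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StaticLookup {
  // Static (batch) DataFrame over Redis hashes; deliberately not cached.
  // Assumed layout: each hash has the fields `code` and `name` (hypothetical).
  def profiles(spark: SparkSession): DataFrame =
    spark.read
      .format("org.apache.spark.sql.redis")
      .schema(StructType(Seq(
        StructField("code", StringType),
        StructField("name", StringType))))
      .option("keys.pattern", "PARAMETERS:PROFILE:*")
      .load()

  // Left outer join keeps events without a match; a missing name becomes "".
  def withProfileName(events: DataFrame, profiles: DataFrame): DataFrame =
    events
      .join(profiles, events("ProfileCode") === profiles("code"), "left_outer")
      .select(events("*"), coalesce(profiles("name"), lit("")).alias("ProfileName"))
}
```

The streaming side needs a `ProfileCode` column, as in the question's `curated_df`, and must be the left side of the left outer join. It is worth verifying the refresh behaviour in your own setup by changing a value in Redis while the query is running.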
