skip to Main Content

I am trying to extract Twitter data using the REST API in Zeppelin. I tried both registerAsTable and registerTempTable, but neither works. Please help me resolve the error. I get the following error while running the Zeppelin tutorial code:

error: value registerAsTable is not a member of org.apache.spark.rdd.RDD[Tweet] ).foreachRDD(rdd=> rdd.registerAsTable("tweets")

3

Answers


  1. An RDD cannot be registered as a table, but a DataFrame can. Convert your RDD into a DataFrame and then register the resulting DataFrame as a temporary view (or save it as a table).

    You can convert an RDD into a DataFrame as follows:

    // Spark 2.x: reuse the running SparkSession instead of the deprecated `new SQLContext(sc)`.
    val spark = org.apache.spark.sql.SparkSession.builder.getOrCreate()
    import spark.implicits._ // provides rdd.toDF() for RDDs of case classes / tuples
    val df = rdd.toDF()
    // Register the DataFrame so it can be queried, e.g. `%sql select * from tweets`.
    df.createOrReplaceTempView("tweets")
    

    Refer to "How to convert rdd object to dataframe in spark" and http://spark.apache.org/docs/latest/sql-programming-guide.html

    Login or Signup to reply.
  2. In the Zeppelin interpreter settings, add the external dependency org.apache.bahir:spark-streaming-twitter_2.11:2.0.0 via the GUI, then run the following with Spark 2.0.1:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.{ SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel
    import scala.io.Source
    //import org.apache.spark.Logging
    import java.io.File
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import sys.process.stringSeqToProcess
    
    import scala.collection.mutable.HashMap
    /**
     * Configures the OAuth credentials for accessing Twitter by publishing them as
     * `twitter4j.oauth.*` JVM system properties.
     *
     * The `api*` names are mapped to twitter4j's `consumer*` names
     * (`apiKey` -> `consumerKey`, `apiSecret` -> `consumerSecret`); the access-token
     * names are used unchanged. Values are trimmed before being stored.
     *
     * @param apiKey            Twitter application (consumer) key
     * @param apiSecret         Twitter application (consumer) secret
     * @param accessToken       user access token
     * @param accessTokenSecret user access-token secret
     * @throws IllegalArgumentException if any value is null, empty or whitespace-only;
     *                                  in that case no property is set
     */
    def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String): Unit = {
      val configs = Seq(
        "apiKey" -> apiKey,
        "apiSecret" -> apiSecret,
        "accessToken" -> accessToken,
        "accessTokenSecret" -> accessTokenSecret)
      println("Configuring Twitter OAuth")
      // Validate everything first so a bad value never leaves the JVM half-configured.
      configs.foreach { case (key, value) =>
        if (Option(value).map(_.trim).forall(_.isEmpty))
          throw new IllegalArgumentException(s"Error setting authentication - value for $key not set")
      }
      configs.foreach { case (key, value) =>
        val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
        System.setProperty(fullKey, value.trim)
        // Do not echo the secret itself: notebook output is often saved and shared.
        println(s"\tProperty $fullKey set")
      }
      println()
    }
    
    
    // Configure Twitter credentials. Keep real keys out of the notebook: read them from
    // environment variables of the interpreter process (set these before starting Zeppelin).
    // A missing variable becomes "", which configureTwitterCredentials rejects with a
    // "value for <key> not set" error naming the missing value.
    val apiKey = sys.env.getOrElse("TWITTER_API_KEY", "")
    val apiSecret = sys.env.getOrElse("TWITTER_API_SECRET", "")
    val accessToken = sys.env.getOrElse("TWITTER_ACCESS_TOKEN", "")
    val accessTokenSecret = sys.env.getOrElse("TWITTER_ACCESS_TOKEN_SECRET", "")
    configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
    
    import org.apache.spark.streaming.twitter._

    // Zeppelin keeps the interpreter JVM alive between runs, and Spark allows only one
    // active StreamingContext per JVM. Stop any context left over from a previous run of
    // this paragraph, but keep the shared SparkContext `sc` alive.
    StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))

    // 2-second micro-batches on the interpreter's SparkContext.
    val ssc = new StreamingContext(sc, Seconds(2))

    // `None` = no explicit Authorization; the credentials come from the
    // twitter4j.oauth.* system properties (see the TwitterUtils.createStream docs).
    val tweets = TwitterUtils.createStream(ssc, None)
    // Each batch covers the statuses received during the last 10 seconds.
    val twt = tweets.window(Seconds(10))

    //twt.print

    // Reuse the already-running session (the deprecated `new SQLContext(sc)` is not needed).
    // NOTE(review): assumes this is the same session Zeppelin's %sql interpreter queries,
    // so the view registered below is visible there.
    val spark = org.apache.spark.sql.SparkSession.builder.getOrCreate()
    import spark.implicits._

    // One row of the `tweets` view: creation time in epoch seconds and the tweet text.
    case class Tweet(createdAt: Long, text: String)

    val tweet = twt.map(status =>
      Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
    )

    // An RDD cannot be registered as a table, so convert each batch to a DataFrame first.
    // createOrReplaceTempView supersedes the deprecated registerTempTable.
    tweet.foreachRDD(rdd => rdd.toDF().createOrReplaceTempView("tweets"))
    ssc.start()
    // To stop streaming without also stopping the interpreter's SparkContext:
    // ssc.stop(stopSparkContext = false)
    

    After that, run queries against the table in another Zeppelin paragraph:

    %sql select createdAt, text  from tweets   limit 50
    
    Login or Signup to reply.
  3. val tweetsPath = "/FileStore/tables/uy43p2971496606385819/testweet.json"
    
    // Read the file as JSON so each tweet field becomes its own queryable column.
    // (sc.textFile(...).toDF() would give a single string column `value` holding raw lines.)
    // NOTE: read.json expects one JSON object per line (JSON Lines) by default.
    val spark = org.apache.spark.sql.SparkSession.builder.getOrCreate()
    val inputs = spark.read.json(tweetsPath)
    inputs.createOrReplaceTempView("tweets")
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search