Issue when storing data from Spark-Streaming to Cassandra

Problem description:

A Spark Streaming context reads a stream from RabbitMQ with a batch interval of 30 seconds. I want to modify the values of a few columns in the corresponding rows already in Cassandra and then store the data back to Cassandra. For that I need to check whether a row for a particular primary key exists in Cassandra; if it does, fetch it and do the necessary operation. The problem is that I create the StreamingContext on the driver while the actions are performed on the workers, so they cannot get the StreamingContext object because it is not serialized and sent to the workers, and I get this error: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext. I also know that we cannot access the StreamingContext inside foreachRDD. But how do I achieve the same functionality here without getting the serialization error?

I have looked at a few examples here but they didn't help.

Here is the code snippet:

    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
    receiverStream.start()
    val lines = receiverStream.map(EventData.fromString(_))
    lines.foreachRDD { x => if (x.toLocalIterator.nonEmpty) {
      x.foreachPartition { it => for (tuple <- it) {
        val cookieid  = tuple.cookieid
        val sessionid = tuple.sessionid
        val logdate   = tuple.logdate
        val EventRows = ssc.cassandraTable("SparkTest", CassandraTable).select("*")
          .where("cookieid = '" + cookieid + "' and logdate = '" + logdate + "' and sessionid = '" + sessionid + "'")

        // some logic to check whether the row exists or not for the cookieid

      } } } }

The SparkContext cannot be serialized and passed across multiple workers on possibly different nodes. If you need to do something like this you can use foreachPartition or mapPartitions. Otherwise, do this within the function that gets passed around:

    CassandraConnector(SparkWriter.conf).withSessionDo { session =>
      ...
      session.executeAsync(<CQL Statement>)
    }
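
A fuller sketch of that pattern applied to the question's use case, not taken from the original answer: the table name events, the column status, and the value "processed" are placeholders, the EventData fields (cookieid, sessionid, logdate) come from the question's snippet, and SparkWriter.conf is assumed to carry spark.cassandra.connection.host.

    import com.datastax.spark.connector.cql.CassandraConnector

    // The connector is created on the driver; it is serializable and safe to use inside closures.
    val connector = CassandraConnector(SparkWriter.conf)

    lines.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        connector.withSessionDo { session =>          // one Cassandra session per partition
          partition.foreach { tuple =>
            val rs = session.execute(
              "SELECT * FROM \"SparkTest\".events WHERE cookieid = ? AND logdate = ? AND sessionid = ?",
              tuple.cookieid, tuple.logdate, tuple.sessionid)
            val row = rs.one()                        // null if no row exists for this key
            if (row != null) {
              // placeholder update: change it to the columns you actually need to modify
              session.execute(
                "UPDATE \"SparkTest\".events SET status = ? WHERE cookieid = ? AND logdate = ? AND sessionid = ?",
                "processed", tuple.cookieid, tuple.logdate, tuple.sessionid)
            }
          }
        }
      }
    }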

And in the SparkConf you need to give the Cassandra details:

  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.ui.enabled", "true")
    .set("spark.executor.memory", "8g")
    //  .set("spark.executor.core", "4")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "/ephemeral/spark-events")
    //to avoid disk space issues - default is /tmp
    .set("spark.local.dir", "/ephemeral/spark-scratch")
    .set("spark.cleaner.ttl", "10000")
    .set("spark.cassandra.connection.host", cassandraip)
    .setMaster("spark://10.255.49.238:7077")

The Java CSVParser is a library class that is not serializable, so Spark cannot send it to possibly different nodes if you call map or foreach on the RDD. One workaround is to use mapPartitions, in which case one full partition is processed on one Spark node, so the parser does not need to be serialized for each call. Example:

    val rdd_inital_parse = rdd.mapPartitions(pLines)

    def pLines(lines: Iterator[String]) = {
      val parser = new CSVParser() // cannot be serialized; this would fail if created inside rdd.map(pLines)
      lines.map(x => parseCSVLine(x, parser.parseLine))
    }
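
For context, a self-contained variant of the same pattern; the answer does not say which library CSVParser comes from, so opencsv is assumed here, and parsePartitions is a hypothetical name:

    import au.com.bytecode.opencsv.CSVParser
    import org.apache.spark.rdd.RDD

    // The parser is built once per partition on the worker, so it never has to be serialized.
    def parsePartitions(lines: RDD[String]): RDD[Array[String]] =
      lines.mapPartitions { it =>
        val parser = new CSVParser()
        it.map(line => parser.parseLine(line))
      }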