Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom Avro deserializer

Problem description:

I am upgrading my Spark Scala app's Kafka API to v0.10. I used to have a custom method for deserializing messages, which arrive in byte-string format.

I have realized there is a way to pass a StringDeserializer or ByteArrayDeserializer as a parameter for either the key or the value.

However, I cannot find any information on how to create a custom Avro schema deserializer so that my kafkaStream can use it when I createDirectStream and consume data from Kafka.

Is it possible?

It is possible. You need to implement the Deserializer<T> interface defined in org.apache.kafka.common.serialization, and then point key.deserializer or value.deserializer to your custom class via the ConsumerStrategy[K, V] class that holds the Kafka parameters. For example:

import java.util

import org.apache.kafka.common.serialization.Deserializer

class AvroDeserializer extends Deserializer[Array[Byte]] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ???
  override def close(): Unit = ???
  override def deserialize(topic: String, bytes: Array[Byte]): Array[Byte] = ???
}
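To give an idea of what the deserialize body could contain, here is a rough sketch using Avro's GenericDatumReader to decode the bytes into a GenericRecord. The inline schema and the GenericAvroDeserializer name are placeholders for illustration; in practice you would load the writer schema from wherever you keep it (a file, a schema registry, etc.):

```scala
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical schema for illustration; substitute the writer schema of your topic.
object AvroPayload {
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Payload","fields":[{"name":"body","type":"string"}]}""")
}

class GenericAvroDeserializer extends Deserializer[GenericRecord] {
  private val reader = new GenericDatumReader[GenericRecord](AvroPayload.schema)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  // Decode the raw Kafka bytes into an Avro GenericRecord.
  override def deserialize(topic: String, bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}
```

Note that if you deserialize to a rich type like GenericRecord on the consumer side, the records must still be handled safely when Spark ships them across the cluster; deserializing to Array[Byte] and decoding inside the job sidesteps that concern.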

Then:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.with.AvroDeserializer

val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[AvroDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sometopic")
// The value type must match what AvroDeserializer produces: Array[Byte].
val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  PreferConsistent,
  Subscribe[String, Array[Byte]](topics, kafkaParams)
)