Kafka's createDirectStream
Spark Streaming's Kafka API (KafkaUtils) provides two interfaces for obtaining a stream directly: createStream and createDirectStream.
createStream is the simpler of the two: given just a topic map, a group id, and a ZooKeeper quorum, you get a stream. The brokers and offsets are a black box, managed internally with no control needed, but in real projects that lack of control is often a problem. Part of the source:
```scala
/**
 * Create an input stream that pulls messages from Kafka Brokers.
 * @param ssc          StreamingContext object
 * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId      The group id for this consumer
 * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *                     in its own thread
 * @param storageLevel Storage level to use for storing the received objects
 *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)] = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000")
  createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, storageLevel)
}
```
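For reference, a minimal sketch of a typical createStream call. The ZooKeeper hosts, topic name, and group id below are placeholders, not values from any real deployment:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CreateStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("createStream-demo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // topic -> number of consumer threads reading that topic
    val topics = Map("my-topic" -> 1)

    // zkQuorum, groupId and topics are all that is required;
    // brokers and offsets are handled internally by the receiver.
    val stream = KafkaUtils.createStream(
      ssc, "zk-host1:2181,zk-host2:2181", "my-group", topics)

    // Each element is (key, value); print the values only.
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the receiver commits offsets to ZooKeeper on its own schedule, there is no hook for choosing where consumption resumes after a restart, which is exactly the lack of control noted above.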
createDirectStream talks to Kafka directly, so you must persist offsets yourself; the method's scaladoc explains this quite clearly. Part of the source:
```scala
/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver. This stream can guarantee that each message
 * from Kafka is included in transformations exactly once (see points below).
 *
 * Points to note:
 *  - No receivers: This stream does not use any receiver. It directly queries Kafka
 *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
 *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
 *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
 *    You can access the offsets used in each batch from the generated RDDs (see
 *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
 *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
 *    in the [[StreamingContext]]. The information on consumed offset can be
 *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
 *  - End-to-end semantics: This stream ensures that every record is effectively received and
 *    transformed exactly once, but gives no guarantees on whether the transformed data are
 *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
 *    that the output operation is idempotent, or use transactions to output records atomically.
 *    See the programming guide for more details.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
 *   to be set with Kafka broker(s) (NOT zookeeper servers) specified in
 *   host1:port1,host2:port2 form.
 * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
 *   starting point of the stream
 * @param messageHandler Function for translating each message and metadata into the desired type
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return DStream of R
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
```
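To make the manual offset handling concrete, here is a hedged sketch of calling this overload and reading back the offsets of each batch via HasOffsetRanges. The broker addresses, topic name, partition count, and the starting offsets of 0 are all assumptions for illustration; in a real job, fromOffsets would be loaded from wherever the previous run persisted its offsets, and offsetRanges would be written back there after each batch is processed:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("createDirectStream-demo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Brokers, NOT zookeeper servers, per the scaladoc above.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092,broker2:9092")

    // Inclusive starting offsets per topic/partition. Hardcoded here;
    // normally these come from your own offset store.
    val fromOffsets = Map(
      TopicAndPartition("my-topic", 0) -> 0L,
      TopicAndPartition("my-topic", 1) -> 0L)

    // messageHandler: turn each message plus metadata into (key, value).
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { rdd =>
      // The RDDs generated by the direct stream carry their offset ranges.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process rdd here, then persist offsetRanges to your own store.
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that the asInstanceOf[HasOffsetRanges] cast only works on the RDDs produced directly by the stream, before any transformation, so the offsets must be captured in the first foreachRDD on the stream itself.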