工作中使用Kafka接收消息代码整理
工作中使用Kafka接收消息代码整理
前段时间在工作中用到了Kafka接收消息,这里把相关代码简单整理一下:
// Receives messages from a Kafka topic using the ZooKeeper-based high-level
// consumer API. Expects zkServer, groupId and topicId to be defined by the
// surrounding code.
Properties props = new Properties();
props.put("zookeeper.connect", zkServer);
props.put("group.id", groupId); // managed through the ZooKeeper cluster
ConsumerConfig conf = new ConsumerConfig(props);
kafka.javaapi.consumer.ConsumerConnector consumer =
        kafka.consumer.Consumer.createJavaConsumerConnector(conf);
try {
    // Ask for a single stream for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topicId, 1);

    // Keys are decoded as strings; message payloads are decoded into
    // LogInfoBean by DiggerEventDecoder.
    Map<String, List<KafkaStream<String, LogInfoBean>>> consumerMap =
            consumer.createMessageStreams(
                    topicCountMap,
                    new StringDecoder(new VerifiableProperties()),
                    new DiggerEventDecoder());

    List<KafkaStream<String, LogInfoBean>> streams = consumerMap.get(topicId);
    KafkaStream<String, LogInfoBean> stream = streams.get(0);
    ConsumerIterator<String, LogInfoBean> it = stream.iterator();

    // hasNext() blocks the calling thread while no message is available.
    while (it.hasNext()) {
        LogInfoBean msgBean = it.next().message();
        String logMessage = msgBean.getMsg();
        // Process the received message.
        // ...
    }
} finally {
    // Always shut the connector down so it is not leaked when the loop
    // ends or message handling throws.
    consumer.shutdown();
}