Kafka and Storm Integration
Written by: 王宇
2016-10-27
Concepts of Storm Integration
Creating the Bolts
Submitting the Topology
Versions
Compilation
Execution
Concepts of Storm Integration
- Integration scenario
A spout is a source of streams. For example, a spout may read tuples off a Kafka topic and emit them as a stream. A bolt consumes input streams, processes them, and possibly emits new streams. Bolts can do anything from running functions, filtering tuples, and doing streaming aggregations and joins to talking to databases, and more. Each node in a Storm topology executes in parallel. A topology runs indefinitely until you terminate it. Storm automatically reassigns any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.
- BrokerHosts - ZkHosts & StaticHosts
BrokerHosts is an interface, and ZkHosts and StaticHosts are its two main implementations. ZkHosts tracks the Kafka brokers dynamically by maintaining their details in ZooKeeper, while StaticHosts is used to set the Kafka brokers and their details manually/statically. ZkHosts is the simple and fast way to access a Kafka broker.
- KafkaConfig API
This API is used to define configuration settings for the Kafka cluster. The signature of KafkaConfig is defined as follows:
public KafkaConfig(BrokerHosts hosts, String topic)
- hosts − The BrokerHosts can be ZkHosts / StaticHosts.
- topic − the topic name.
- SpoutConfig API
SpoutConfig is an extension of KafkaConfig that supports additional ZooKeeper information.
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
- hosts − The BrokerHosts can be any implementation of the BrokerHosts interface.
- topic − the topic name.
- zkRoot − the ZooKeeper root path.
- id − The spout stores the state of the offsets it has consumed in ZooKeeper. The id should uniquely identify your spout.
- SchemeAsMultiScheme
SchemeAsMultiScheme dictates how the ByteBuffer consumed from Kafka gets transformed into a Storm tuple. It adapts a single-tuple Scheme implementation to Storm's MultiScheme interface; wrapping StringScheme, as the code below does, decodes each message as a plain string.
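As a rough illustration of what StringScheme does with each fetched message, the stdlib-only sketch below (DecodeSketch is a made-up name for illustration, not the storm-kafka API) decodes a ByteBuffer into a UTF-8 string:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DecodeSketch {
    // Illustrative stand-in for the decoding step StringScheme performs:
    // turn the raw ByteBuffer fetched from Kafka into a UTF-8 String.
    static String deserializeString(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer raw = ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(deserializeString(raw)); // prints "hello kafka"
    }
}
```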
- KafkaSpout API
KafkaSpout is the spout implementation that integrates with Storm. It fetches messages from a Kafka topic and emits them into the Storm ecosystem as tuples. KafkaSpout gets its configuration details from SpoutConfig.
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

// Create the SpoutConfig object
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName,
      "/" + topicName, UUID.randomUUID().toString());

// Convert the ByteBuffer to String
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// Assign SpoutConfig to KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Creating the Bolts
- For the Bolt interface definition, see 《Storm学习笔记》 (the Storm study notes)
- Example bolts that split sentences and count word occurrences
- SplitBolt.java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context,
         OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      for (String word : words) {
         word = word.trim();
         if (!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
      }
      collector.ack(input);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
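The core of SplitBolt.execute can be exercised outside of Storm. The sketch below (SplitSketch is a hypothetical helper class, not part of the topology) reproduces the same trim / drop-empty / lower-case logic on a plain string:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Same normalization as SplitBolt.execute: split on spaces,
    // trim each token, drop empties, and lower-case the rest.
    static List<String> splitWords(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Double spaces and trailing blanks are filtered out.
        System.out.println(splitWords("Hello  Kafka Storm ")); // prints [hello, kafka, storm]
    }
}
```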
- CountBolt.java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class CountBolt implements IRichBolt {
   Map<String, Integer> counters;
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context,
         OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      if (!counters.containsKey(str)) {
         counters.put(str, 1);
      } else {
         Integer c = counters.get(str) + 1;
         counters.put(str, c);
      }
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for (Map.Entry<String, Integer> entry : counters.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
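CountBolt's tally logic can likewise be tried in plain Java. CountSketch below is a made-up helper for illustration; it keeps the same per-word counter map that CountBolt maintains across execute calls:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountSketch {
    // Same tally as CountBolt.execute: one counter per distinct word,
    // incremented on every occurrence.
    static Map<String, Integer> countWords(List<String> words) {
        Map<String, Integer> counters = new HashMap<>();
        for (String word : words) {
            counters.put(word, counters.getOrDefault(word, 0) + 1);
        }
        return counters;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
            countWords(java.util.Arrays.asList("test", "message", "test"));
        System.out.println(counts.get("test"));    // prints 2
        System.out.println(counts.get("message")); // prints 1
    }
}
```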
Submitting the Topology
- For the Topology concepts, see 《Storm学习笔记》 (the Storm study notes)
- KafkaStormSample.java
import java.util.UUID;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";

      BrokerHosts hosts = new ZkHosts(zkConnString);
      SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic,
            "/" + topic, UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      // kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
      builder.setBolt("word-splitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-splitter");

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
Versions
- Zookeeper: zookeeper-3.5.2-alpha.tar.gz
- Curator: 2.9.1
- SLF4j: slf4j-1.7.21.tar.gz
- Kafka: kafka_2.11-0.10.1.0
- Storm : apache-storm-1.0.2.tar.gz
- JSON: json-simple-1.1.1.jar
- JDK: 1.8.0
Compilation
- Dependencies
- Curator
Before compiling, the Kafka-Storm integration needs the Curator ZooKeeper client Java library. The paths of these jars must be added to the CLASSPATH:
curator-client-2.9.1.jar
curator-framework-2.9.1.jar
- JSON
json-simple-1.1.1.jar
- Storm-Kafka lib
- Kafka lib
- Storm lib
- SLF4J
Once the CLASSPATH contains both the Kafka and Storm libs, SLF4J produces a conflict; removing Kafka's SLF4J jar from the CLASSPATH resolves it.
- Compile command
$ javac *.java
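Putting the dependency notes above together, a compile invocation might look like the following. The /opt install paths and jar locations are assumptions to adapt to your own machine, not a verified layout:

```shell
# Assumed install paths -- adjust for your machine.
# Per the SLF4J note above, make sure Kafka's slf4j jar does not end up on the CLASSPATH.
export CLASSPATH=".:/opt/storm/lib/*:/opt/storm/external/storm-kafka/*:/opt/kafka/libs/*"
export CLASSPATH="$CLASSPATH:curator-client-2.9.1.jar:curator-framework-2.9.1.jar:json-simple-1.1.1.jar"

javac *.java
```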
Execution
- Start the services: ZooKeeper, Kafka, Storm
$ cd /opt/zookeeper
$ ./bin/zkServer.sh start
$ cd /opt/kafka
$ ./bin/kafka-server-start.sh config/server.properties
$ cd /opt/storm
$ ./bin/storm nimbus
$ ./bin/storm supervisor
- Create the topic "my-first-topic"
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-first-topic
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
- Type messages into the Kafka producer CLI
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic
hello
kafka
storm
spark
test message
another test message
- Run the example
$ java KafkaStormSample
- Output
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2