Storm and Redis Integration
Original author: 王宇
2016-11-07
Reference: http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html
Storm provides several standard bolts for Redis
RedisLookupBolt example
RedisStoreBolt example
Non-simple Bolt
RedisLookupBolt source code, for understanding WordCountRedisLookupMapper
Complete Kafka + Storm + Redis example
Dependencies
Build and run
Reference: http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html
Storm provides several standard bolts for Redis
- RedisLookupBolt: lookup (query Redis)
- RedisStoreBolt: store (write to Redis)
- AbstractRedisBolt: base class for implementing custom logic
RedisLookupBolt example
Look up a word's count from Redis.
class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
- Understanding the example
- As shown above, JedisPoolConfig provides the Redis-related configuration to RedisLookupBolt.
- RedisLookupMapper maps the relevant Redis data structure for RedisLookupBolt. For example, getKeyFromTuple() tells RedisLookupBolt which key to extract from the input tuple and look up in Redis.
- declareOutputFields() defines the output format of RedisLookupBolt (see the wiring sketch below).
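To make the flow concrete, here is a minimal wiring sketch (not from the referenced documentation) that continues from the snippet above. The component names and the WordSpout class are hypothetical stand-ins for any spout that emits a "word" field.

// Minimal wiring sketch: "word-spout" and WordSpout are hypothetical.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-spout", new WordSpout());
builder.setBolt("redis-lookup-bolt", lookupBolt).shuffleGrouping("word-spout");
// Downstream components receive tuples with the fields ("wordName", "count")
// declared by WordCountRedisLookupMapper.declareOutputFields().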
RedisStoreBolt example
class WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
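As a rough illustration of what ends up in Redis: with the HASH data type and the hashKey "wordCount", each (word, count) pair is written as a field/value entry of that hash. The Jedis snippet below is a sketch with made-up values ("hello", "3"), not part of the original example; it shows the equivalent of storing a single tuple.

// Illustrative only: the equivalent of storing one ("hello", "3") tuple
// into the "wordCount" hash configured by WordCountStoreMapper.
Jedis jedis = new Jedis("localhost", 6379);
jedis.hset("wordCount", "hello", "3");   // same effect as: HSET wordCount hello 3
jedis.close();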
Non-simple Bolt
Use AbstractRedisBolt to implement more complex logic.
public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
    private static final Random RANDOM = new Random();

    public LookupWordTotalCountBolt(JedisPoolConfig config) {
        super(config);
    }

    public LookupWordTotalCountBolt(JedisClusterConfig config) {
        super(config);
    }

    @Override
    public void execute(Tuple input) {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = getInstance();
            String wordName = input.getStringByField("word");
            String countStr = jedisCommands.get(wordName);
            if (countStr != null) {
                int count = Integer.parseInt(countStr);
                this.collector.emit(new Values(wordName, count));

                // print lookup result with low probability
                if (RANDOM.nextInt(1000) > 995) {
                    LOG.info("Lookup result - word : " + wordName + " / count : " + count);
                }
            } else {
                // skip
                LOG.warn("Word not found in Redis - word : " + wordName);
            }
        } finally {
            if (jedisCommands != null) {
                returnInstance(jedisCommands);
            }
            this.collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // wordName, count
        declarer.declare(new Fields("wordName", "count"));
    }
}
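Below is a minimal sketch of how this custom bolt might be constructed and wired. The component names and the word-emitting spout are hypothetical, and it assumes the counts live in plain Redis string keys, as the GET call above implies.

// Sketch only: build the bolt from a JedisPoolConfig and wire it into a topology.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost("localhost").setPort(6379).build();
LookupWordTotalCountBolt totalCountBolt = new LookupWordTotalCountBolt(poolConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-spout", new WordSpout());   // hypothetical spout emitting a "word" field
builder.setBolt("total-count-bolt", totalCountBolt)
        .fieldsGrouping("word-spout", new Fields("word"));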
RedisLookupBolt source code, for understanding WordCountRedisLookupMapper
Note in the code below how each Redis data type maps to a get-style Jedis call in execute(); for the HASH type used by WordCountRedisLookupMapper, the lookup is hget(additionalKey, key).
package org.apache.storm.redis.bolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import redis.clients.jedis.JedisCommands;
import java.util.List;
/**
* Basic bolt for querying from Redis and emits response as tuple.
* <p/>
* Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG, GEO
*/
public class RedisLookupBolt extends AbstractRedisBolt {
    private final RedisLookupMapper lookupMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    /**
     * Constructor for single Redis environment (JedisPool)
     * @param config configuration for initializing JedisPool
     * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
     */
    public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
        super(config);

        this.lookupMapper = lookupMapper;

        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    /**
     * Constructor for Redis Cluster environment (JedisCluster)
     * @param config configuration for initializing JedisCluster
     * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
     */
    public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
        super(config);

        this.lookupMapper = lookupMapper;

        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Tuple input) {
        String key = lookupMapper.getKeyFromTuple(input);
        Object lookupValue;

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();

            switch (dataType) {
                case STRING:
                    lookupValue = jedisCommand.get(key);
                    break;

                case LIST:
                    lookupValue = jedisCommand.lpop(key);
                    break;

                case HASH:
                    lookupValue = jedisCommand.hget(additionalKey, key);
                    break;

                case SET:
                    lookupValue = jedisCommand.scard(key);
                    break;

                case SORTED_SET:
                    lookupValue = jedisCommand.zscore(additionalKey, key);
                    break;

                case HYPER_LOG_LOG:
                    lookupValue = jedisCommand.pfcount(key);
                    break;

                case GEO:
                    lookupValue = jedisCommand.geopos(additionalKey, key);
                    break;

                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
            }

            List<Values> values = lookupMapper.toTuple(input, lookupValue);
            for (Values value : values) {
                collector.emit(input, value);
            }

            collector.ack(input);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally {
            returnInstance(jedisCommand);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        lookupMapper.declareOutputFields(declarer);
    }
}
Complete Kafka + Storm + Redis example
- Scenario
Refer to the word-count example in 《Storm与Kafka整合记录》. Building on that example, save the word-count results to Redis.
- Modify the topology to add the RedisStoreBolt
public class KafkaStormSample {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        String zkConnString = "localhost:2181";
        String topic = "my-first-topic";
        BrokerHosts hosts = new ZkHosts(zkConnString);

        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
        kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
        // kafkaSpoutConfig.forceFromStart = true;
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Creating RedisStoreBolt
        String host = "localhost";
        int port = 6379;
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

        // Assemble with topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");
        builder.setBolt("redis-store-bolt", storeBolt).shuffleGrouping("word-counter");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
- Modify CountBolt to emit to RedisStoreBolt
public class CountBolt implements IRichBolt {
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);

        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }

        // emit to redis-store-bolt
        Integer result = counters.get(str);
        this.collector.emit(new Values(str, String.valueOf(result)));

        collector.ack(input);
    }

    @Override
    public void cleanup() {
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Dependencies
jedis-2.9.0.jar
commons-pool2-2.4.2.jar
Add the two jars above to the CLASSPATH.
Build and run
- Start the services
- Start zookeeper
$ cd /opt/zookeeper
$ ./bin/zkServer.sh start
- Start redis
$ redis-server
- Start kafka
$ cd /opt/kafka
$ ./bin/kafka-server-start.sh config/server.properties
- Start storm
$ cd /opt/storm
$ ./bin/storm nimbus
$ ./bin/storm supervisor
- Input data
$ cd /opt/kafka
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic
Hello
My first message
My second message
- Run the example
$ java KafkaStormSample
- Query the result in redis-cli
$ redis-cli
127.0.0.1:6379> hvals wordCount