Kafka Streams App: Count and Sum
Problem description:
I'm trying to create a KTable from a KGroupedStream to store the sum of the values for each key.
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
    .groupByKey()
    .aggregate(new Initializer<Long>() {
        @Override
        public Long apply() {
            return Long.MIN_VALUE;
        }
    }, new Aggregator<String, Long, Long>() {
        @Override
        public Long apply(final String key, final Long value, final Long aggregate) {
            aggregate += value;
            return aggregate;
        }
    }, Materialized.<String, Long, KeyValueStore<Byte, byte[]>>as("counts-store"));
But I get this error:
The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Materialized<String,Long,KeyValueStore<Byte,byte[]>>)
All the examples I've seen pass a Serde as the third argument, but when I try that I get a very similar error. (I think those examples might be from an older version, since they don't match the signature of the current implementation.)
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
    .groupByKey()
    .aggregate(new Initializer<Long>() {
        @Override
        public Long apply() {
            return Long.MIN_VALUE;
        }
    }, new Aggregator<String, Long, Long>() {
        @Override
        public Long apply(final String key, final Long value, final Long aggregate) {
            aggregate += value;
            return aggregate;
        }
    }, Serdes.Long());
Error:
The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Serde<Long>)
What am I doing wrong?
Using Kafka version: 2.1.0
Answer
There are a few issues in your code:
- For Materialized.as, you should pass org.apache.kafka.common.utils.Bytes instead of java.lang.Byte.
- You shouldn't modify a final variable: aggregate += value;
- You have to add the key and value types to the StreamsBuilder::stream call (builder.<String, Long>stream("streams-plaintext-input")); without them the stream is typed as <Object, Object>, as the error message shows.
After these modifications it should look more or less as follows:
KTable<String, Long> sum = builder.<String, Long>stream("streams-plaintext-input")
    .groupByKey()
    .aggregate(new Initializer<Long>() {
        @Override
        public Long apply() {
            return Long.MIN_VALUE;
        }
    }, new Aggregator<String, Long, Long>() {
        @Override
        public Long apply(final String key, final Long value, final Long aggregate) {
            return aggregate + value;
        }
    }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
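To make the initializer/aggregator contract easier to see, here is a minimal plain-Java sketch of a per-key fold. It does not use any Kafka classes: SumPerKeySketch, Agg, aggregate and sumPerKey are hypothetical names invented for this illustration, and the in-memory map only stands in for a state store. The sketch also starts from 0L rather than the Long.MIN_VALUE used above, on the assumption that a plain sum is wanted; starting from Long.MIN_VALUE would offset every result by that amount.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java stand-in for per-key aggregation; no Kafka classes are involved.
final class SumPerKeySketch {

    // Hypothetical interface mirroring the (key, value, aggregate) shape used in the answer.
    interface Agg<K, V, VR> {
        VR apply(K key, V value, VR aggregate);
    }

    // Folds records in order: the initializer supplies the starting aggregate the first
    // time a key is seen, and the aggregator's return value becomes the new aggregate.
    static <K, V, VR> Map<K, VR> aggregate(List<Map.Entry<K, V>> records,
                                           Supplier<VR> initializer,
                                           Agg<K, V, VR> aggregator) {
        Map<K, VR> store = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            VR current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return store;
    }

    static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> records) {
        // Return a new value instead of reassigning the parameter; start from 0L (assumption).
        return aggregate(records, () -> 0L, (key, value, agg) -> agg + value);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = List.of(
                Map.entry("a", 1L), Map.entry("b", 10L), Map.entry("a", 3L));
        System.out.println(sumPerKey(records)); // prints {a=4, b=10}
    }
}
```

Returning agg + value rather than reassigning the parameter is the same change the answer makes to the Aggregator, so the lambda would also compile if its parameters were declared final.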