如何使用骆驼聚合将一条消息聚合到多个组中?
我正在尝试生成连续市场数据的汇总视图,这意味着我们需要计算每 2 条消息的总和值.假设输入的数据为:
I'm trying to generate a aggregate view of consecutive market data, which means we need to calculate the sum value every 2 message. say the data coming in as:
(V0,T0),(V1,T1),(V2,T2),(V3,T3)....
V
表示值 T
表示我们收到数据时的时间戳.
V
means value T
means timestamp when we receive the data.
我们需要为每 2 个点生成总和说:
We need to generate the sum for every 2 points say:
(R1=Sum(V0,V1),T1),(R2=Sum(V1,V2),T2),(R3=Sum(V2,V3),T3),....
任何建议我们如何使用 aggregator2
来做到这一点,或者我们需要为此编写一个处理器?
Any suggestion how can we do this by using aggregator2
or we need to write a processor for this?
在阅读 Aggregator 的源代码后,camel 只将一条消息聚合到一个组,为此我们必须构建一个聚合器".这是代码:
After reading the source code of Aggregator, it turns out that camel only aggregate one message to one group, we have to build a "aggregator" for this purpose. here is the code:
public abstract class GroupingGenerator<I> implements Processor {
private final EvictingQueue<I> queue;
private final int size;
public int getSize() {
return size;
}
public GroupingGenerator(int size) {
super();
this.size = size;
this.queue = EvictingQueue.create(size);
}
@SuppressWarnings("unchecked")
@Override
public void process(Exchange exchange) throws Exception {
queue.offer((I) exchange.getIn().getBody());
if (queue.size() != size) {
exchange.setProperty(Exchange.ROUTE_STOP, true);
return;
} else {
processGroup(queue, exchange);
}
}
protected abstract void processGroup(Collection<I> items, Exchange exchange);
}