Apache Camel/ActiveMQ 优先级路由
我有两个具有相同使用者的 AMQ 队列.第一个队列 (Q1) 处理 97% 的消息,另一个队列 (Q2) 仅处理 3%.问题是 Q2 中的消息需要在消息排队后立即处理.所以我的问题是,当第 2 季度有消息可用时,我需要以某种方式暂停第一条路线以接收它的消费者.apache 骆驼路由看起来像这样:
I have two AMQ queues that have the same consumers. The first queue (Q1) handles 97% of the messages and the other (Q2) only 3%. Problem is that messages in Q2 need to process messages as soon as they are queued. So my problem is that when a message is available in Q2 I need somehow to suspend the first route to take it's consumers. The apache camel routing looks like this:
<route id="q1">
<from uri="jms:recordAnalysisRequests" />
<to uri="bean:analysisService" />
</route>
<route id="q2">
<from uri="jms:recordAnalysisRequestsFastTrack" />
<to uri="bean:analysisService" />
</route>
应该使用什么策略?我不认为我可以使用重新排序器,因为 Q1 可能有数千条消息排队,而且我无法将所有消息都放入重新排序器批次中.我正在查看路由节流,但我不知道该怎么做.另外我想知道是否可以通过zookeeper节点进行同步.如果此解决方案可行,我将再次需要一些指导.
What strategy should use? I don't think I can use resequencer because Q1 could have thousands of messages queued up and I cannot fit all in a resequencer batch. I was looking at the route throttling but I cannot figure out how to do it. Also I was wondering if I can synchronize through a zookeeper node. Again I will need some guidance here if this solution is viable.
您可以将所有消息放在一个队列中并使用消息优先级 http://activemq.apache.org/how-can-i-support-priority-queues.html
You could place all messages in a single queue and use message priority http://activemq.apache.org/how-can-i-support-priority-queues.html
第二个选项,使用 Camel Resequencer
Second option, use a Camel Resequencer
<route id="q1">
<from uri="jms:recordAnalysisRequests" />
<setHeader headerName="CustomPriority">
<constant>2</constant>
</setHeader>
<to uri="direct:analysisDirect" />
</route>
<route id="q2">
<from uri="jms:recordAnalysisRequestsFastTrack" />
<setHeader headerName="CustomPriority">
<constant>1</constant>
</setHeader>
<to uri="direct:analysisDirect" />
</route>
<route id="q3">
<from uri="direct:analysisDirect">
<resequence>
<header>CustomPriority</header>
<to uri="bean:analysisService" />
</resequence>
</route>
第三个选项(Camel 2.12),而不是使用重新排序器,使用带有 PriorityBlockingQueue 的 SEDA 端点 https://camel.apache.org/seda.html#SEDA-ChoosingBlockingQueueimplementation
Third option (Camel 2.12), insteads of using a resequencer, use a SEDA endpoint with a PriorityBlockingQueue https://camel.apache.org/seda.html#SEDA-ChoosingBlockingQueueimplementation