Apache Camel/ActiveMQ 优先级路由

问题描述:

我有两个具有相同使用者的 AMQ 队列.第一个队列 (Q1) 处理 97% 的消息,另一个队列 (Q2) 仅处理 3%.问题是 Q2 中的消息需要在消息排队后立即处理.所以我的问题是,当第 2 季度有消息可用时,我需要以某种方式暂停第一条路线以接收它的消费者.apache 骆驼路由看起来像这样:

I have two AMQ queues that have the same consumers. The first queue (Q1) handles 97% of the messages and the other (Q2) only 3%. Problem is that messages in Q2 need to process messages as soon as they are queued. So my problem is that when a message is available in Q2 I need somehow to suspend the first route to take it's consumers. The apache camel routing looks like this:

<route id="q1">
    <from uri="jms:recordAnalysisRequests" />
    <to uri="bean:analysisService" />
</route>
<route id="q2">
    <from uri="jms:recordAnalysisRequestsFastTrack" />
    <to uri="bean:analysisService" />
</route>

应该使用什么策略?我不认为我可以使用重新排序器,因为 Q1 可能有数千条消息排队,而且我无法将所有消息都放入重新排序器批次中.我正在查看路由节流,但我不知道该怎么做.另外我想知道是否可以通过zookeeper节点进行同步.如果此解决方案可行,我将再次需要一些指导.

What strategy should use? I don't think I can use resequencer because Q1 could have thousands of messages queued up and I cannot fit all in a resequencer batch. I was looking at the route throttling but I cannot figure out how to do it. Also I was wondering if I can synchronize through a zookeeper node. Again I will need some guidance here if this solution is viable.

您可以将所有消息放在一个队列中并使用消息优先级 http://activemq.apache.org/how-can-i-support-priority-queues.html

You could place all messages in a single queue and use message priority http://activemq.apache.org/how-can-i-support-priority-queues.html

第二个选项,使用 Camel Resequencer

Second option, use a Camel Resequencer

<route id="q1">
    <from uri="jms:recordAnalysisRequests" />
    <setHeader headerName="CustomPriority">
       <constant>2</constant>       
    </setHeader>
    <to uri="direct:analysisDirect" />
</route>
<route id="q2">
    <from uri="jms:recordAnalysisRequestsFastTrack" />
    <setHeader headerName="CustomPriority">
       <constant>1</constant>       
    </setHeader>
    <to uri="direct:analysisDirect" />
</route>
<route id="q3">
    <from uri="direct:analysisDirect">
    <resequence>
        <header>CustomPriority</header>
        <to uri="bean:analysisService" />
    </resequence>
</route>

第三个选项(Camel 2.12),而不是使用重新排序器,使用带有 PriorityBlockingQueue 的 SEDA 端点 https://camel.apache.org/seda.html#SEDA-ChoosingBlockingQueueimplementation

Third option (Camel 2.12), insteads of using a resequencer, use a SEDA endpoint with a PriorityBlockingQueue https://camel.apache.org/seda.html#SEDA-ChoosingBlockingQueueimplementation