Java监视ActiveMQ的,但没有投票队列
我需要一些编程,监控的ActiveMQ的队列在java中。这意味着我需要登录时,排队队列的消息,当出队的消息。我的程序不能发送邮件或接收消息,只需要登录。
I need to program something that monitors an activemq queue in java. This means I need to log when a message is enqueued on the queue and when a message is dequeued. My program must not send messages or receives messages, it only needs to log.
我发现将消息传递和接收消息,但是这不是我想做的事情,只要登录如果外部进程对或关闭队列中的消息。
I found out to push messages and receive messages but this is not what I want to do, just log if an external process puts messages on or off the queue.
要更清楚我做了一个绘图
To make it more clear I made a drawing
我使用Apache的骆驼进行整合,
我routebuilder看起来像
I use apache camel to make the integration, my routebuilder looks like
public void configure() throws Exception {
Processor queueProcessor = new QueueProcessor();
from("activemq:queue:KBC").process(queueProcessor);
}
它调用folowwing处理器
it calls the folowwing processor
@Override
public void process(Exchange exchange) throws Exception {
Trax_EventDao dao = new Trax_EventDao();
dao.insert(new Trax_Event("Queue",exchange.getExchangeId(),"UP","KBC", new Time(new Date().getTime())));
}
在DAO处理数据库连接,使记录的插入
The dao handles a database connection and makes an insert of a record
实际的问题是,当我把在队列中的消息,并在程序运行时,该消息得到了记录这是好的,但也立即得到调查,这是不行的。
我怎样才能让没有消息插入轮询?
The actual problem is that when I push a message on the queue and the program runs, the message got logged which is okay, but it also get polled immediately, which is not okay. How can I make the insert without the message being polled?
我终于做到了在写一个自己的亚军类,它使用一个queuebrowser。
What I finally did was writing an own runner class, which uses a queuebrowser.
我想用这个类来要做的就是
What I wanted to do with this class is
- 请与avtivemq的连接,并启动
- 请一个无限循环,它控制指定的队列。我对队列中的项目的列表。在每一个循环我检查这个
-
如果列表比队列的大小越大,有出列的项目。这意味着我需要循环,并核对哪些项目离队。
Otherwhise我循环队列的枚举和元素添加到列表中,如果还不存在的话
- Make a connection with avtivemq and start it
- Make an endless loop, which controls the queue specified. I have a list of the items on the queue. At every loop I check this
if the list is bigger than the size of the queue, there are items dequeued. This means I need to loop this and check which items are dequeued. Otherwhise I loop the enumeration of the queue and add elements to the list if they don't exist yet
package queueFeed;
import dao.ProcmonDao;
import dao.EventDao;
import domain.Event;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.*;
import java.sql.SQLException;
import java.sql.Time;
import java.util.*;
public class QueueRunner {
private ProcmonDao dao;
private Connection connection;
private String queueName;
public QueueRunner() throws SQLException {
dao = new EventDao();
}
public void setConnection(String username, String password, String url) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
connection = factory.createConnection();
}
public void run() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(new ActiveMQQueue(queueName));
List<String> ids = new ArrayList<>();
int queueSize = 0;
int counter = 0;
connection.start();
while (true) {
Enumeration enumeration = browser.getEnumeration();
if (queueSize < ids.size()) {
while (enumeration.hasMoreElements()) {
Message message = (Message) enumeration.nextElement();
ids.remove(message.getJMSMessageID());
counter++;
}
if (ids.size() > 0 && ids.size() > 0) {
Iterator<String> iterator = ids.iterator();
while (iterator.hasNext()) {
String messageId = iterator.next();
dao.insert(new Event("Queue", messageId, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
iterator.remove();
}
}
queueSize = counter;
counter = 0;
} else {
while (enumeration.hasMoreElements()) {
counter++;
Message message = (Message) enumeration.nextElement();
String id = message.getJMSMessageID();
if (!ids.contains(id)) {
ids.add(id);
dao.insert(new Event("Queue", id, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
}
}
queueSize = counter;
counter = 0;
}
}
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getQueueName() {
return this.queueName;
}
}
这工作还不完善。我觉得这是它一个小的逻辑问题。
This works not perfect yet. I think there is a small logical issue in it.