Java监视ActiveMQ的,但没有投票队列

问题描述:

我需要一些编程,监控的ActiveMQ的队列在java中。这意味着我需要登录时,排队队列的消息,当出队的消息。我的程序不能发送邮件或接收消息,只需要登录。

I need to program something that monitors an activemq queue in java. This means I need to log when a message is enqueued on the queue and when a message is dequeued. My program must not send messages or receives messages, it only needs to log.

我发现将消息传递和接收消息,但是这不是我想做的事情,只要登录如果外部进程对或关闭队列中的消息。

I found out to push messages and receive messages but this is not what I want to do, just log if an external process puts messages on or off the queue.

要更清楚我做了一个绘图

To make it more clear I made a drawing

我使用Apache的骆驼进行整合,
我routebuilder看起来像

I use apache camel to make the integration, my routebuilder looks like

public void configure() throws Exception {
        Processor queueProcessor = new QueueProcessor();

        from("activemq:queue:KBC").process(queueProcessor);
    }

它调用folowwing处理器

it calls the folowwing processor

@Override
    public void process(Exchange exchange) throws Exception {
        Trax_EventDao dao = new Trax_EventDao();
        dao.insert(new Trax_Event("Queue",exchange.getExchangeId(),"UP","KBC", new Time(new Date().getTime())));
    }

在DAO处理数据库连接,使记录的插入

The dao handles a database connection and makes an insert of a record

实际的问题是,当我把在队列中的消息,并在程序运行时,该消息得到了记录这是好的,但也立即得到调查,这是不行的。
我怎样才能让没有消息插入轮询?

The actual problem is that when I push a message on the queue and the program runs, the message got logged which is okay, but it also get polled immediately, which is not okay. How can I make the insert without the message being polled?

我终于做到了在写一个自己的亚军类,它使用一个queuebrowser。

What I finally did was writing an own runner class, which uses a queuebrowser.

我想用这个类来要做的就是

What I wanted to do with this class is


  1. 请与avtivemq的连接,并启动

  2. 请一个无限循环,它控制指定的队列。我对队列中的项目的列表。在每一个循环我检查这个

  3. 如果列表比队列的大小越大,有出列的项目。这意味着我需要循环,并核对哪些项目离队。
    Otherwhise我循环队列的枚举和元素添加到列表中,如果还不存在的话

  1. Make a connection with avtivemq and start it
  2. Make an endless loop, which controls the queue specified. I have a list of the items on the queue. At every loop I check this
  3. if the list is bigger than the size of the queue, there are items dequeued. This means I need to loop this and check which items are dequeued. Otherwhise I loop the enumeration of the queue and add elements to the list if they don't exist yet

package queueFeed;

import dao.ProcmonDao;
import dao.EventDao;
import domain.Event;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

import javax.jms.*;
import java.sql.SQLException;
import java.sql.Time;
import java.util.*;

public class QueueRunner {
    private ProcmonDao dao;
    private Connection connection;
    private String queueName;

    public QueueRunner() throws SQLException {
        dao = new EventDao();
    }

public void setConnection(String username, String password, String url) throws JMSException {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
    connection = factory.createConnection();
}

public void run() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueBrowser browser = session.createBrowser(new ActiveMQQueue(queueName));

    List<String> ids = new ArrayList<>();
    int queueSize = 0;
    int counter = 0;
    connection.start();

    while (true) {
        Enumeration enumeration = browser.getEnumeration();
        if (queueSize < ids.size()) {
            while (enumeration.hasMoreElements()) {
                Message message = (Message) enumeration.nextElement();
                ids.remove(message.getJMSMessageID());
                counter++;
            }

            if (ids.size() > 0 && ids.size() > 0) {
                Iterator<String> iterator = ids.iterator();
                while (iterator.hasNext()) {
                    String messageId = iterator.next();
                    dao.insert(new Event("Queue", messageId, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
                    iterator.remove();
                }
            }

            queueSize = counter;
            counter = 0;
        } else {

            while (enumeration.hasMoreElements()) {
                counter++;
                Message message = (Message) enumeration.nextElement();
                String id = message.getJMSMessageID();
                if (!ids.contains(id)) {
                    ids.add(id);
                    dao.insert(new Event("Queue", id, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
                }
            }
            queueSize = counter;
            counter = 0;
        }
    }
}

public void setQueueName(String queueName) {
    this.queueName = queueName;
}

public String getQueueName() {
    return this.queueName;
}

}

这工作还不完善。我觉得这是它一个小的逻辑问题。

This works not perfect yet. I think there is a small logical issue in it.