动态设立Spring DefaultMessageListenerContainer 的messageSelector

动态设置Spring DefaultMessageListenerContainer 的messageSelector

Spring JMS可以帮助开发人员快速的使用MQ的发送与接收,在异步接收方面,Spring 提供了MessageListenerContainer的容器接收消息。通过研究源码发现DefaultMessageListenerContainer是支持动态改变messageSelector的。在DefaultMessageListenerContainer 中有个cacheLevel的属性默认是4,把它改动到2或1或0,数字分表代表

public static final int CACHE_NONE = 0;
public static final int CACHE_CONNECTION = 1;
public static final int CACHE_SESSION = 2;
public static final int CACHE_CONSUMER = 3;
public static final int CACHE_AUTO = 4;

在设置完cacheLevel后就可以动态设置messageSelector,Container就能用上最新的selector了。

Spring配置如下

<bean id="messageListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsConnectionFactory" />  
        <property name="destination" ref="receiverQueue" />  
        <property name="messageListener" ref="jmsReceiver" />  
        <property name="concurrentConsumers" value="10" />           
        <property name="messageSelector" value="CLIENT='DEMO'" />  
        <property name="cacheLevel" value="2"/>
    </bean> 

 修改messageSelector代码如下

DefaultMessageListenerContainer messageListenerContainer = (DefaultMessageListenerContainer) ac.getBean("messageListenerContainer");
		messageListenerContainer.setMessageSelector("CLIENT='DEMO2'");

 

源码分析:

//DefaultMessageListenerContainer类中, 每次收消息都会call的方法
private boolean invokeListener() throws JMSException {
initResourcesIfNecessary();
boolean messageReceived = DefaultMessageListenerContainer.this.receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
 }

//这里就是使用cacheLevel的地方,由于需要动态selector,所以需要每次重新生成consumer,//当cacheLevel<3的时候,this.consumer会为null
private void initResourcesIfNecessary() throws JMSException {
       if (DefaultMessageListenerContainer.this.getCacheLevel() <= 1) {
         updateRecoveryMarker();
       }
       else {
         if ((this.session == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 2)) {
           updateRecoveryMarker();
           this.session = DefaultMessageListenerContainer.this.createSession(DefaultMessageListenerContainer.this.getSharedConnection());
         }
         if ((this.consumer == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 3))
           this.consumer = DefaultMessageListenerContainer.this.createListenerConsumer(this.session);
       }
     }


//在这个方法中可以发现当传入consumer为null时,会生成一个新的consumer
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
     throws JMSException
   {
     Connection conToClose = null;
     Session sessionToClose = null;
     MessageConsumer consumerToClose = null;
     try {
       Session sessionToUse = session;
       boolean transactional = false;
       if (sessionToUse == null) {
         sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
           getConnectionFactory(), this.transactionalResourceFactory, true);
         transactional = sessionToUse != null;
       }
       if (sessionToUse == null) {
         Connection conToUse = null;
         if (sharedConnectionEnabled()) {
           conToUse = getSharedConnection();
         }
         else {
           conToUse = createConnection();
           conToClose = conToUse;
           conToUse.start();
         }
         sessionToUse = createSession(conToUse);
         sessionToClose = sessionToUse;
       }
       MessageConsumer consumerToUse = consumer;
       if (consumerToUse == null) {
         consumerToUse = createListenerConsumer(sessionToUse);
         consumerToClose = consumerToUse;
       }
       Message message = receiveMessage(consumerToUse);
       if (message != null) {
         if (this.logger.isDebugEnabled()) {
           this.logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 
             consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" + 
             sessionToUse + "]");
         }
         messageReceived(invoker, sessionToUse);
         boolean exposeResource = (!(transactional)) && (isExposeListenerSession()) && 
           (!(TransactionSynchronizationManager.hasResource(getConnectionFactory())));
         if (exposeResource)
           TransactionSynchronizationManager.bindResource(
             getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
         try
         {
           doExecuteListener(sessionToUse, message);
         }
         catch (Throwable ex) {
           if (status != null) {
             if (this.logger.isDebugEnabled()) {
               this.logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
             }
             status.setRollbackOnly();
           }
           handleListenerException(ex);
 
           if (ex instanceof JMSException)
             throw ((JMSException)ex);
         }
         finally
         {
           if (exposeResource)
             TransactionSynchronizationManager.unbindResource(getConnectionFactory());
         }
         return true;
       }
 
       if (this.logger.isTraceEnabled()) {
         this.logger.trace("Consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") + 
           "session [" + sessionToUse + "] did not receive a message");
       }
       noMessageReceived(invoker, sessionToUse);
       return false;
     }
     finally
     {
       JmsUtils.closeMessageConsumer(consumerToClose);
       JmsUtils.closeSession(sessionToClose);
       ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
     }
   }