JMS的IBM MQ提供程序:如何自动回滚消息?

问题描述:

应用中的工作版本

  • IBM AllClient版本:'com.ibm.mq:com.ibm.mq.allclient:9.1.1.0'
  • org.springframework:spring-jms:4.3.9.RELEASE
  • javax.jms:javax.jms-api:2.0.1

我的要求是,如果消息处理失败(例如,使用者不可用)(例如,数据库不可用),则消息将保留在队列中或放回队列中(如果可能的话) ).这是因为消息的顺序很重要,因此必须按照接收消息的顺序来使用消息. Java应用程序是单线程的.

My requirement is that in case of the failure of a message processing due to say, consumer not being available (eg. DB is unavailable), the message remains in the queue or put back on the queue (if that is even possible). This is because the order of the messages is important, messages have to be consumed in the same order that they are received. The Java app is single-threaded.

我尝试了以下

@Override
public void onMessage(Message message)
{
   try{
      if(message instanceOf Textmessage)
      {
      }
   
      :

      : 
      throw new Exception("Test");// Just to test the retry
    }
    catch(Exception ex)
    {
            try
            {
                int temp = message.getIntProperty("JMSXDeliveryCount");
                throw new RuntimeException("Redlivery attempted ");
                // At this point, I am expecting JMS to put the message back into the queue.
                // But it is actually put into the Bakout queue.
            }
            catch(JMSException ef)
            {
                String temp = ef.getMessage();
            }

    }
}

我已经在spring.xml中为jmsContainer bean进行了设置.

I have set this in my spring.xml for the jmsContainer bean.

    <property name="sessionTransacted" value="true" />

上面的代码有什么问题?

What is wrong with the code above ?

如果将消息放回队列中是不切实际的,那么如何浏览消息,对其进行处理,以及如果成功则提取消息(这样消息就被消耗掉了,不再在队列中了)? IBM JMS提供程序是否支持这种情况?

And if putting the message back in the queue is not practical, how can one browse the message, process it and, if successful, pull the message (so it is consumed and no longer on the queue) ? Is this scenario supported in IBM provider for JMS?

IBM MQ本地队列具有BOTHRESH(1).

The IBM MQ Local queue has BOTHRESH(1).

要保留消息顺序,一种方法可能是作为回滚策略的一部分临时停止消息侦听器.查看 Spring Boot文档对于DefaultMessageListenerContainer,有一种stop(Runnable callback)方法.我已经尝试过在回滚中使用它,如下所示.

To preserve message ordering, one approach might be to stop the message listener temporarily as part of your rollback strategy. Looking at the Spring Boot doc for DefaultMessageListenerContainer there is a stop(Runnable callback) method. I've experimented with using this in a rollback as follows.

为了确保我的侦听器是单线程的,在我的DefaultJmsListenerContainerFactory上设置了containerFactory.setConcurrency("1").

To ensure my Listener is single threaded, on my DefaultJmsListenerContainerFactory I set containerFactory.setConcurrency("1").

在我的侦听器中,设置一个id

In my Listener, I set an id

@JmsListener(destination = "DEV.QUEUE.2", containerFactory = "listenerTwoFactory", concurrency="1", id="listenerTwo")

并检索DefaultMessageListenerContainer实例.

JmsListenerEndpointRegistry reg = context.getBean(JmsListenerEndpointRegistry.class);
DefaultMessageListenerContainer mlc = (DefaultMessageListenerContainer) reg.getListenerContainer("listenerTwo");

为了进行测试,我检查了JMSXDeliveryCount并将异常抛出回滚.

For testing, I check JMSXDeliveryCount and throw an exception to rollback.

retryCount = Integer.parseInt(msg.getStringProperty("JMSXDeliveryCount"));
if (retryCount < 5) {
    throw new Exception("Rollback test "+retryCount);
}

在侦听器的catch处理中,我在DefaultMessageListenerContainer实例上调用stop(Runnable callback),并传入如下定义的新类ContainerTimedRestart.

In the Listener's catch processing, I call stop(Runnable callback) on the DefaultMessageListenerContainer instance and pass in a new class ContainerTimedRestart as defined below.

//catch processing here and decide to rollback
mlc.stop(new ContainerTimedRestart(mlc,delay));
System.out.println("#### "+getClass().getName()+" Unable to process message.");
throw new Exception();

ContainerTimedRestart扩展了Runnable,并且DefaultMessageListenerContainer负责在stop调用完成时调用run()方法.

ContainerTimedRestart extends Runnable and DefaultMessageListenerContainer is responsible for invoking the run() method when the stop call completes.

public class ContainerTimedRestart implements Runnable {

  //Container instance to restart.
  private DefaultMessageListenerContainer theMlc;

  //Default delay before restart in mills.
  private long theDelay = 5000L;

  //Basic constructor for testing.
  public ContainerTimedRestart(DefaultMessageListenerContainer mlc, long delay) {
    theMlc = mlc;
    theDelay = delay;
  }

  public void run(){
    //Validate container instance.

    try {
      System.out.println("#### "+getClass().getName()+"Waiting for "+theDelay+" millis.");
      Thread.sleep(theDelay);
      System.out.println("#### "+getClass().getName()+"Restarting container.");
      theMlc.start();
      System.out.println("#### "+getClass().getName()+"Container started!");
    } catch (InterruptedException ie) {
      ie.printStackTrace();

      //Further checks and ensure container is in correct state.
      //Report errors.
    }
  }

我在队列中加载了三个带有有效负载"a","b"和"c"的消息.分别启动监听器.

I loaded my queue with three messages with payloads "a", "b", and "c" respectively and started the listener.

在我的队列管理器上检查DEV.QUEUE.2,我看到IPPROCS(1)确认只有一个应用程序句柄打开了队列.每次滚动五次后,将按顺序处理这些消息,并且两次回滚尝试之间的延迟为5秒.

Checking DEV.QUEUE.2 on my queue manager I see IPPROCS(1) confirming only one application handle has the queue open. The messages are processed in order after each is rolled five times and with a 5 second delay between rollback attempts.