WSO2 ESB 4.8.1 PassThroughHttpSender does not return threads to the worker pool on exception

Problem description:

I think that org.apache.synapse.transport.passthru.PassThroughHttpSender, which is the default transport sender for HTTP in WSO2 ESB 4.8.1 (not sure about 4.9.0, I will check it later), does not return the borrowed thread to the worker pool on exception in some cases.

It seems to me that this happens when the exception occurs in an external sequence (not in the proxy that directly accepts the incoming request). For example, it occurs when you implement the store-and-forward pattern with a message store and a message processor.

This is my simple WSO2 ESB 4.8.1 configuration to reproduce the issue:

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
      <parameter name="cachableDuration">15000</parameter>
   </registry>
   <import name="fileconnector"
           package="org.wso2.carbon.connector"
           status="enabled"/>
   <proxy name="ProxyTest"
          transports="http https"
          startOnLoad="true"
          trace="disable">
      <target>
         <inSequence>
            <property name="FORCE_SC_ACCEPTED"
                      value="true"
                      scope="axis2"
                      type="STRING"/>
            <log level="custom">
               <property name="text" value="store message"/>
            </log>
            <store messageStore="TestXMS"/>
            <log level="custom">
               <property name="text" value="message stored"/>
            </log>
         </inSequence>
         <outSequence/>
         <faultSequence/>
      </target>
   </proxy>
   <localEntry key="ESBInstance">Test<description/>
   </localEntry>
   <endpoint name="HTTPEndpoint">
      <http method="post" uri-template="http://localhost/index.php">
         <timeout>
            <duration>10</duration>
            <responseAction>fault</responseAction>
         </timeout>
         <suspendOnFailure>
            <errorCodes>-1</errorCodes>
            <initialDuration>0</initialDuration>
            <progressionFactor>1.0</progressionFactor>
            <maximumDuration>0</maximumDuration>
         </suspendOnFailure>
         <markForSuspension>
            <errorCodes>-1</errorCodes>
         </markForSuspension>
      </http>
   </endpoint>
   <sequence name="fault">
      <log level="full">
         <property name="MESSAGE" value="Executing default 'fault' sequence"/>
         <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
         <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
      </log>
      <drop/>
   </sequence>
   <sequence name="TestSequence">
      <log level="full">
         <property name="text" value="message recieved"/>
      </log>
      <call>
         <endpoint key="HTTPEndpoint"/>
      </call>
      <log level="full">
         <property name="text" value="message processed"/>
      </log>
   </sequence>
   <sequence name="main">
      <in>
         <log level="full"/>
         <filter source="get-property('To')" regex="http://localhost:9000.*">
            <send/>
         </filter>
      </in>
      <out>
         <send/>
      </out>
      <description>The main sequence for the message mediation</description>
   </sequence>
   <messageStore name="TestXMS"/>
   <messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor"
                     name="TestMP"
                     messageStore="TestXMS">
      <parameter name="interval">1000</parameter>
      <parameter name="sequence">TestSequence</parameter>
      <parameter name="concurrency">1</parameter>
      <parameter name="is.active">true</parameter>
   </messageProcessor>
</definitions>

Configure the endpoint to send requests to an unreachable backend and just call ProxyTest 20 times or more in order to exhaust the internal pool of worker threads.

After the 20th request is received, new requests will still be accepted and stored in the message store, but the worker pool will be exhausted and messages will no longer be retried from the message store.
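The flooding step above can be sketched as a small JDK-only client. This is a hypothetical helper, not part of the original post: the proxy URL and port (8280 is the usual ESB nhttp port) are assumptions, so point it at your own ProxyTest endpoint. Because the proxy sets FORCE_SC_ACCEPTED, each request should come back as 202 Accepted even while the backend is unreachable.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical load sketch: fire more requests at the proxy than the worker
// pool has threads (about 20 in my tests). URL and port are assumptions.
public class FloodProxy {

    // Sends `count` POST requests to `url`; returns how many were answered
    // with 202 Accepted (the status FORCE_SC_ACCEPTED produces).
    public static int fire(String url, int count) throws Exception {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection();
            con.setRequestMethod("POST");
            con.setDoOutput(true);
            try (OutputStream os = con.getOutputStream()) {
                os.write("<test/>".getBytes("UTF-8"));
            }
            if (con.getResponseCode() == 202) {
                accepted++;
            }
            con.disconnect();
        }
        return accepted;
    }

    public static void main(String[] args) throws Exception {
        // Replace host/port/path with your own ESB instance.
        System.out.println(fire("http://localhost:8280/services/ProxyTest", 25) + " accepted");
    }
}
```

After roughly request number 20 the responses still arrive (the proxy thread accepting the request is not the one that leaks), but the log lines from TestSequence stop appearing.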

There must be source code available somewhere for PassThroughHttpSender... but it was quicker to decompile repository/components/plugins/synapse-nhttp-transport_2.1.2.wso2v4.jar and look inside to see the reason for the problem.

We should look inside the sendRequestContent method:

synchronized (msgContext) {
    while (!Boolean.TRUE.equals(msgContext.getProperty("WAIT_BUILDER_IN_STREAM_COMPLETE"))
            && !Boolean.TRUE.equals(msgContext.getProperty("PASSTHRU_CONNECT_ERROR"))) {
        try {
            log.info("msgContext before wait");
            msgContext.wait();
            log.info("msgContext after wait");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

This is where the process gets stuck on error. When an exception occurs (in another thread), it "forgets" to notify the current thread, and msgContext waits forever (actually until the server is restarted).
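The failure mode can be reproduced in isolation. Below is a minimal sketch using only the JDK (not WSO2 code) of the wait/notify handshake that sendRequestContent relies on: a worker thread waits on a shared context object until another thread both sets a flag and calls notifyAll() on it. If the error path does neither, the worker is parked forever; the class and method names here are invented for the sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the handshake in sendRequestContent (plain JDK, hypothetical names).
public class WaitNotifySketch {
    static final String ERROR_FLAG = "PASSTHRU_CONNECT_ERROR";

    // Worker side: same loop shape as the decompiled sendRequestContent code.
    static Thread startWorker(Map<String, Object> ctx) {
        Thread t = new Thread(() -> {
            synchronized (ctx) {
                while (!Boolean.TRUE.equals(ctx.get(ERROR_FLAG))) {
                    try {
                        ctx.wait();   // parked here until someone notifies
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        t.start();
        return t;
    }

    // Error side: what the fix adds - set the flag, then wake all waiters.
    // Setting the flag under the same lock prevents a lost wakeup.
    static void reportError(Map<String, Object> ctx) {
        synchronized (ctx) {
            ctx.put(ERROR_FLAG, Boolean.TRUE);
            ctx.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> ctx = new ConcurrentHashMap<>();
        Thread worker = startWorker(ctx);
        Thread.sleep(100);   // let the worker park in ctx.wait()
        reportError(ctx);    // comment this out and worker.join() never returns
        worker.join(2000);
        System.out.println("worker released: " + !worker.isAlive());
    }
}
```

Comment out the reportError call and the worker thread stays alive indefinitely, which is exactly the "borrowed thread never returned to the pool" symptom.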

So I slightly modified another class in the same package, DeliveryAgent, in the method errorConnecting. This method is used to catch a callback from the thread that connects to the target host. So when we catch the callback, we notify msgContext and tell it to continue, by adding a new synchronized block after the targetErrorHandler call:

public void errorConnecting(HttpRoute route, int errorCode, String message) {
    Queue<MessageContext> queue = (Queue) this.waitingMessages.get(route);
    if (queue != null) {
        MessageContext msgCtx = (MessageContext) queue.poll();
        if (msgCtx != null) {
            this.targetErrorHandler.handleError(msgCtx, errorCode,
                    "Error connecting to the back end", null, ProtocolState.REQUEST_READY);
            // added: wake up the thread waiting on msgCtx in sendRequestContent
            synchronized (msgCtx) {
                log.info("errorConnecting: notify message context about error");
                msgCtx.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
                msgCtx.notifyAll();
            }
        }
    } else {
        throw new IllegalStateException("Queue cannot be null for: " + route);
    }
}

I have done some tests and it looks like this fixes the problem of the "dead" threads. However, I am not sure whether it is a proper fix or not... any suggestions are welcome.

Link to the decompiled and modified source files: src.zip

I figured out the reason for the problem. It happens when the backend application accepts the request and does not reply anything to the ESB. As a result, the connection is closed by timeout. In this case, the ESB worker thread is not returned to the thread pool.
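To reproduce this root cause without a real misbehaving backend, one can use a "black hole" server: it accepts TCP connections and reads nothing, writes nothing, so the client side (the ESB in this scenario) gives up only via its own timeout. This is a hypothetical test helper, not part of the original patch; the class name is invented.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reproducer: a backend that accepts connections but never replies,
// forcing the caller to hit its response timeout (the scenario described above).
public class BlackHoleBackend implements AutoCloseable {
    private final ServerSocket server;
    private final Thread acceptor;
    private final List<Socket> held = new ArrayList<>(); // keep sockets open

    public BlackHoleBackend(int port) throws IOException {
        server = new ServerSocket(port); // port 0 picks a free port
        acceptor = new Thread(() -> {
            try {
                while (true) {
                    // Accept the connection, then intentionally do nothing:
                    // no read, no write, no close - the peer must time out.
                    synchronized (held) {
                        held.add(server.accept());
                    }
                }
            } catch (IOException e) {
                // server closed; exit the accept loop
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int port() {
        return server.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        server.close();
        synchronized (held) {
            for (Socket s : held) {
                s.close();
            }
        }
    }
}
```

Pointing HTTPEndpoint at such a backend (with the 10 ms `<duration>` timeout from the config above) triggers the leak on every request: each worker thread ends up parked in msgContext.wait() with nobody left to notify it.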

To overcome this problem, the org.apache.synapse.transport.passthru.TargetErrorHandler Java class needs to be patched:

synchronized (mc) {
    log.info("notify message context about error");
    mc.setProperty("PASSTHRU_CONNECT_ERROR", Boolean.TRUE);
    mc.notifyAll();
}

Put it right before the catch block.

It properly marks the message context as having an error and notifies all waiting objects, which in turn allows the ESB to return the request-processing thread to the thread pool. All this is true for 4.8.1.

I patched it and deployed it to a production environment, which processes several million requests per day, in January 2016. There have been no issues since then.