Tracing a Netty failure in Storm triggered by "Client is being closed, and does not take requests any more"

While recently upgrading and debugging a Storm job, I ran into a very strange internal exception. The stack trace is as follows:
 backtype.storm.daemon.worker - Received invalid messages for unknown tasks. Dropping...
backtype.storm.daemon.worker - Received invalid messages for unknown tasks. Dropping...
backtype.storm.daemon.worker - Received invalid messages for unknown tasks. Dropping... 
2015-01-28 18:25:52 [Thread-11-worker-receiver-thread-0] WARN  backtype.storm.daemon.worker - Received invalid messages for unknown tasks. Dropping...
2015-01-28 18:29:12 [Thread-12-disruptor-worker-transfer-queue] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:96) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:81) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1681.invoke(disruptor.clj:95) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:457) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:662) [na:1.6.0_38]
Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.messaging.netty.Client.send(Client.java:183) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__4610$fn__4611.invoke(worker.clj:330) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__4610.invoke(worker.clj:328) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.disruptor$clojure_handler$reify__1668.onEvent(disruptor.clj:59) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:124) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        ... 6 common frames omitted
2015-01-28 18:29:12 [Thread-12-disruptor-worker-transfer-queue] ERROR backtype.storm.util - Halting process: ("Async loop died!")
java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:319) [storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1679.invoke(disruptor.clj:93) [storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:467) [storm-core-0.9.3-rc1.jar:0.9.3-rc1]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:662) [na:1.6.0_38]

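After running for a while (about an hour), the worker starts throwing the warning "Received invalid messages for unknown tasks. Dropping..." continuously, followed by "Async loop died!", and the worker then restarts over and over.
The corresponding supervisor log: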
2015-02-02 17:46:51 b.s.d.supervisor [INFO] Shutting down and clearing state for id eec730d3-7487-457d-b22c-29a3691d7236. Current supervisor time: 1422870408. State: :timed-out, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1422870376, :storm-id "xxxxtopology-TopologyUAT-412-1422861787", :executors #{[3 3] [136 136] [110 110] [83 83] [55 55] [29 29] [-1 -1]}, :port 6711}

2015-02-02 17:22:14 b.s.d.supervisor [INFO] Shutting down and clearing state for id d940a373-37d8-4dee-b206-0a42be3d22da. Current supervisor time: 1422868934. State: :disallowed, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1422868889, :storm-id "xxxtopology-TopologyUAT-412-1422861787", :executors #{[35 35] [144 144] [18 18] [117 117] [89 89] [63 63] [-1 -1]}, :port 6717}
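
One thing that can be read straight off these two log lines is how stale each worker heartbeat was when the supervisor acted on it. The sketch below only does that subtraction. The 30-second threshold is an assumption (it is what I believe the default of supervisor.worker.timeout.secs to be, so check your own storm.yaml), and the class and method names are made up for illustration.

```java
// Sketch: how old was each worker heartbeat when the supervisor shut the worker down?
class HeartbeatAge {
    // Assumed value of supervisor.worker.timeout.secs; not taken from the logs above.
    static final long ASSUMED_TIMEOUT_SECS = 30;

    // Both arguments are epoch seconds, as printed in the supervisor log.
    static long ageSecs(long supervisorTimeSecs, long heartbeatTimeSecs) {
        return supervisorTimeSecs - heartbeatTimeSecs;
    }

    public static void main(String[] args) {
        long timedOut = ageSecs(1422870408L, 1422870376L);   // worker on port 6711, State :timed-out
        long disallowed = ageSecs(1422868934L, 1422868889L); // worker on port 6717, State :disallowed
        System.out.println("port 6711 heartbeat age: " + timedOut + "s");
        System.out.println("port 6717 heartbeat age: " + disallowed + "s");
        System.out.println("port 6711 past assumed timeout: " + (timedOut > ASSUMED_TIMEOUT_SECS));
    }
}
```

The heartbeats were 32 and 45 seconds old respectively, so the workers had stopped heartbeating some time before the supervisor cleaned them up. That is consistent with the worker process having already halted itself after "Async loop died!".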

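This is all the information the logs provide. Neither the worker log nor the supervisor log points to a clear cause, so for the moment the problem could not be pinned down.
1. The Netty receive thread cannot find a receive queue for the incoming messages. Why?
Source: worker.clj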
(defn mk-transfer-local-fn [worker]
  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
        task->short-executor (:task->short-executor worker)
        task-getter (comp #(get task->short-executor %) fast-first)]
    (fn [tuple-batch]
      (let [grouped (fast-group-by task-getter tuple-batch)]
        (fast-map-iter [[short-executor pairs] grouped]
          (let [q (short-executor-receive-queue-map short-executor)]
            (if q  ;; if q is nil, the warning is logged: the message has no matching receive queue
              (disruptor/publish q pairs)
              (log-warn "Received invalid messages for unknown tasks. Dropping... ")
              )))))))
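
For readers less familiar with Clojure, the lookup above can be sketched in Java roughly as follows. This is an illustration only, not Storm's actual implementation: the class LocalTransferSketch, its fields, and the use of a plain List as a "queue" are all hypothetical stand-ins. It shows the same two-step lookup (task ID to executor, executor to receive queue) and the drop-with-warning branch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the logic of mk-transfer-local-fn; not Storm classes.
class LocalTransferSketch {
    // task ID -> executor ID hosted by this worker (stand-in for task->short-executor)
    final Map<Integer, Integer> taskToExecutor = new HashMap<>();
    // executor ID -> receive queue (stand-in for short-executor-receive-queue-map)
    final Map<Integer, List<String>> executorToQueue = new HashMap<>();
    int dropped = 0;

    void transfer(int taskId, String tuple) {
        Integer executor = taskToExecutor.get(taskId);
        List<String> queue = (executor == null) ? null : executorToQueue.get(executor);
        if (queue != null) {
            queue.add(tuple); // corresponds to (disruptor/publish q pairs)
        } else {
            dropped++;        // no queue for this task on this worker: warn and drop
            System.out.println("Received invalid messages for unknown tasks. Dropping... ");
        }
    }

    public static void main(String[] args) {
        LocalTransferSketch worker = new LocalTransferSketch();
        worker.taskToExecutor.put(3, 3);
        worker.executorToQueue.put(3, new ArrayList<String>());
        worker.transfer(3, "tuple-a");  // delivered to the queue of executor 3
        worker.transfer(99, "tuple-b"); // task 99 is unknown to this worker, so it is dropped
        System.out.println("queued=" + worker.executorToQueue.get(3).size()
                + " dropped=" + worker.dropped);
    }
}
```

In other words, the warning fires whenever a message arrives for a task ID that has no entry in this worker's maps. The code itself does not say why such messages are being sent to this worker.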

Checking the Netty configuration:
storm.messaging.netty.buffer_size 52428800
storm.messaging.netty.client_worker_threads 1
storm.messaging.netty.flush.check.interval.ms 10
storm.messaging.netty.max_retries 10
storm.messaging.netty.max_wait_ms 10000
storm.messaging.netty.min_wait_ms 5000
storm.messaging.netty.server_worker_threads 1
storm.messaging.netty.transfer.batch.size 262144
storm.messaging.transport backtype.storm.messaging.netty.Context
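This is our standard shared configuration, so if something is wrong it should not be the configuration itself. Could it be message transfer pressure? Delay?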

2. The supervisor's :timed-out and :disallowed states have even more possible causes: load, full GC (FGC), and so on.
Job monitoring data:
[Figure: job monitoring chart]

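The topology is configured with 30 workers. The monitoring chart shows that every few minutes nearly all of the workers restart, and that the worker restart events are essentially uncorrelated with the GC counts.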

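Taking these two points together, I suspected the problem was still on the Netty side. A colleague happened to be looking at the Storm UI and noticed something odd there: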
Spout section of the Storm UI:
[Figure: two Storm UI screenshots of the spout statistics]

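In principle, with no acker enabled these values should all be 0, yet the latency shown here is far too large. Checking the acker setting:
topology.acker.executors
This uses the default value, null. Looking at the code, I found that the higher-level Bolt wrapper our company provides calls ack() manually. In other words, when the acker count has not been set and ack() is still called, the tuple is still placed in the tuple tree and tracked.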

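So downstream users need to set topology.acker.executors=0 explicitly. Only then is acking really turned off, so that failed messages are not re-emitted over and over. Enabling acking also costs processing performance.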
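
A minimal sketch of that setting, kept free of Storm dependencies so that it runs on its own: the topology configuration is modelled as a plain Map, and the class AckerConfigSketch and its helper method are hypothetical. With Storm on the classpath, the equivalent call would be setNumAckers(0) on a backtype.storm.Config instance, which is itself a HashMap. As far as the Storm Config documentation describes it, leaving the key at null does not disable acking: Storm then creates one acker executor per worker, which would match the non-zero latency seen in the UI.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; models the topology config as a plain Map instead of backtype.storm.Config.
class AckerConfigSketch {
    static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";

    // Returns a copy of the given config with acker executors explicitly set to 0.
    static Map<String, Object> withAckersDisabled(Map<String, Object> base) {
        Map<String, Object> conf = new HashMap<>(base);
        conf.put(TOPOLOGY_ACKER_EXECUTORS, 0);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put(TOPOLOGY_ACKER_EXECUTORS, null); // the default value reported above
        Map<String, Object> conf = withAckersDisabled(defaults);
        System.out.println(TOPOLOGY_ACKER_EXECUTORS + " = " + conf.get(TOPOLOGY_ACKER_EXECUTORS));
    }
}
```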
After setting it to 0:
[Figure: three screenshots taken after the change]

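After several hours of running, the logs no longer show the exceptions described above.
To summarize:
1. If the program does not set the acker count but still calls ack(), Storm builds the tuple tree anyway.
2. We consume user logs, so there was never any need to enable acking. The problem described in point 1 puts pressure on Netty and causes some messages to fail to be received (I am not very familiar with Netty, so I cannot say exactly what the impact is), which ultimately makes the workers restart continuously.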

This article is the author's original work; please credit the source when reposting. Original author: Tony_老七. Original link: http://blog.****.net/tonylee0329/article/details/43488981