Tracing a Netty failure in Storm: "Client is being closed, and does not take requests any more"




Recently, while upgrading and debugging a Storm topology, a very strange internal exception showed up. The stack trace follows:
2015-01-28 18:25:52 [Thread-11-worker-receiver-thread-0] WARN backtype.storm.daemon.worker - Received invalid messages for unknown tasks. Dropping...
2015-01-28 18:29:12 [Thread-12-disruptor-worker-transfer-queue] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:96) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:81) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.disruptor$consume_loop_STAR_$fn__1681.invoke(disruptor.clj:95) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:457) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_38]
Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
at backtype.storm.messaging.netty.Client.send(Client.java:183) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__4610$fn__4611.invoke(worker.clj:330) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__4610.invoke(worker.clj:328) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.disruptor$clojure_handler$reify__1668.onEvent(disruptor.clj:59) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:124) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
... 6 common frames omitted
2015-01-28 18:29:12 [Thread-12-disruptor-worker-transfer-queue] ERROR backtype.storm.util - Halting process: ("Async loop died!")
java.lang.RuntimeException: ("Async loop died!")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:319) [storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.disruptor$consume_loop_STAR_$fn__1679.invoke(disruptor.clj:93) [storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:467) [storm-core-0.9.3-rc1.jar:0.9.3-rc1]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_38]
After running for a while (roughly an hour), the worker keeps throwing "Received invalid messages for unknown tasks. Dropping..." warnings, followed by "Async loop died!", and then the worker restarts again and again.
The corresponding supervisor log:
2015-02-02 17:46:51 b.s.d.supervisor [INFO] Shutting down and clearing state for id eec730d3-7487-457d-b22c-29a3691d7236. Current supervisor time: 1422870408. State: :timed-out, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1422870376, :storm-id "xxxxtopology-TopologyUAT-412-1422861787", :executors #{[3 3] [136 136] [110 110] [83 83] [55 55] [29 29] [-1 -1]}, :port 6711}
2015-02-02 17:22:14 b.s.d.supervisor [INFO] Shutting down and clearing state for id d940a373-37d8-4dee-b206-0a42be3d22da. Current supervisor time: 1422868934. State: :disallowed, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1422868889, :storm-id "xxxtopology-TopologyUAT-412-1422861787", :executors #{[35 35] [144 144] [18 18] [117 117] [89 89] [63 63] [-1 -1]}, :port 6717}
That is all the information the logs give. Neither the worker log nor the supervisor log reveals a clear cause, and for a while the problem could not be pinpointed.
1. The Netty receiving thread cannot deliver messages to a receive queue. Why?
Source code: worker.clj
(defn mk-transfer-local-fn [worker]
  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
        task->short-executor (:task->short-executor worker)
        task-getter (comp #(get task->short-executor %) fast-first)]
    (fn [tuple-batch]
      (let [grouped (fast-group-by task-getter tuple-batch)]
        (fast-map-iter [[short-executor pairs] grouped]
          (let [q (short-executor-receive-queue-map short-executor)]
            (if q ;; if q is nil the message has no matching receive queue and the warning below is logged
              (disruptor/publish q pairs)
              (log-warn "Received invalid messages for unknown tasks. Dropping... "))))))))
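The logic of mk-transfer-local-fn can be sketched in plain Java (this is an illustration, not Storm's actual code): incoming messages are grouped by their target task, each group is looked up in the task-to-receive-queue map, and any group whose task has no registered queue is dropped with exactly the warning seen in the logs.

```java
import java.util.*;

// Plain-Java sketch of mk-transfer-local-fn: route messages to per-task
// receive queues, dropping messages addressed to unknown tasks.
public class TransferLocalSketch {
    public static void main(String[] args) {
        // task id -> receive queue; task 7 has no queue (e.g. a stale assignment)
        Map<Integer, Deque<String>> receiveQueues = new HashMap<>();
        receiveQueues.put(1, new ArrayDeque<>());

        List<Map.Entry<Integer, String>> batch = List.of(
            Map.entry(1, "tupleA"),
            Map.entry(7, "tupleB"));   // addressed to an unknown task

        int dropped = 0;
        for (Map.Entry<Integer, String> msg : batch) {
            Deque<String> q = receiveQueues.get(msg.getKey());
            if (q != null) {
                q.add(msg.getValue());  // normal path: publish to the queue
            } else {
                // this branch corresponds to the log line
                // "Received invalid messages for unknown tasks. Dropping..."
                dropped++;
            }
        }
        System.out.println(receiveQueues.get(1).size() + " " + dropped);
    }
}
```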
Checking the Netty configuration:
storm.messaging.netty.buffer_size | 52428800
storm.messaging.netty.client_worker_threads | 1
storm.messaging.netty.flush.check.interval.ms | 10
storm.messaging.netty.max_retries | 10
storm.messaging.netty.max_wait_ms | 10000
storm.messaging.netty.min_wait_ms | 5000
storm.messaging.netty.server_worker_threads | 1
storm.messaging.netty.transfer.batch.size | 262144
storm.messaging.transport | backtype.storm.messaging.netty.Context
This is our standard company-wide configuration, so the problem should not lie in the configuration itself. Could it be message transfer pressure, or delay?
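The three retry settings above bound how long the Netty client keeps trying to reconnect before giving up and refusing further requests. The exact backoff formula in backtype.storm.messaging.netty.Client is version-specific; the sketch below only illustrates how a hypothetical exponential backoff clamped between min_wait_ms and max_wait_ms, for max_retries attempts, limits the total retry window.

```java
// Hypothetical reconnect backoff schedule (NOT Storm's exact formula):
// exponential growth from min_wait_ms, clamped to max_wait_ms.
public class RetryBackoffSketch {
    static long waitMs(int attempt, long minWaitMs, long maxWaitMs) {
        long wait = minWaitMs * (1L << attempt);          // exponential growth
        return Math.min(Math.max(wait, minWaitMs), maxWaitMs);
    }

    public static void main(String[] args) {
        long total = 0;
        for (int i = 0; i < 10; i++) {                    // max_retries = 10
            total += waitMs(i, 5000, 10000);              // min 5000ms, max 10000ms
        }
        // Under these assumptions the client gives up after at most ~95s.
        System.out.println(total);
    }
}
```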
2. The supervisor's :timed-out and :disallowed states have even more possible causes: load, full GC, and so on.
Task monitoring data:
The topology is configured with 30 workers. The monitoring graphs show that essentially all workers restart every few minutes, and the worker restarts show basically no correlation with GC counts.
Considering both points above, we still suspected the problem was on the Netty side. Just then, a colleague looking at the Storm UI spotted something odd:
[Figure: spout statistics shown in the Storm UI]
With ackers not enabled, these numbers should all be zero, yet the latency shown here is far too large. Checking the acker setting:
topology.acker.executors |
The setting was left at its default, null. Reading the code, we found that the Bolt wrapper our company provides calls ack() manually. In other words, when you have not configured ackers but ack() is still called, the tuple is nevertheless placed into the tuple tree and tracked. Downstream users therefore need to set topology.acker.executors=0 explicitly; only then is acking really off and failed messages are no longer replayed endlessly. Keeping acking on also costs processing performance.
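The fix can be applied either in the topology-submission code (Config#setNumAckers(0) in Storm's Java API) or as a configuration override; a minimal config fragment:

```yaml
# Disable ackers so tuples are not tracked in the tuple tree,
# even if bolt code calls ack() unconditionally.
topology.acker.executors: 0
```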
After setting it to 0: having run for several hours, the logs showed none of the exceptions described above.
To summarize:
1. If the topology has not configured ackers but ack() is called anyway, Storm will still build the tuple tree;
2. We consume user logs, so there was no need to enable acking in the first place. The tracking caused by (1) put pressure on Netty and produced messages that could not be received (I am not familiar enough with Netty to pin down the exact mechanism), which ultimately made the workers restart over and over.
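The tuple-tree tracking mentioned in point 1 is implemented by Storm's ackers with an XOR checksum: every anchor tuple id is XORed into a running "ack val" when emitted and XORed in again when acked, so the value returns to zero exactly when every tuple in the tree has been acked. A minimal plain-Java illustration (hypothetical ids, not Storm's actual Acker class):

```java
import java.util.Random;

// Demonstrates the XOR bookkeeping behind Storm's tuple-tree tracking.
public class AckerXorSketch {
    public static void main(String[] args) {
        Random rnd = new Random(42);
        long ackVal = 0;

        // spout emits the root tuple with a random 64-bit id
        long rootId = rnd.nextLong();
        ackVal ^= rootId;

        // a bolt acks the root and emits two anchored children in one update
        long child1 = rnd.nextLong();
        long child2 = rnd.nextLong();
        ackVal ^= rootId ^ child1 ^ child2;

        // downstream bolts ack both children without emitting further tuples
        ackVal ^= child1;
        ackVal ^= child2;

        // ackVal == 0 exactly when every emitted tuple has been acked
        System.out.println(ackVal == 0);
    }
}
```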
This article is original work; please credit the source when reposting. Author: Tony_老七. Original link: http://blog.****.net/tonylee0329/article/details/43488981