如何指定使用 Spring Cloud Stream 向 RabbitMQ 发送消息的超时时间?

问题描述:

我们在发送消息的过程中遇到网络问题,这导致所有线程都处于阻塞状态.我们使用 org.springframework.cloud:spring-cloud-stream:2.0.1.RELEASEorg.springframework:spring-messaging:5.0.8.RELEASE向 RabbitMQ 代理发送消息.绑定接口:

We had an network issue in the middle of sending a message and this was causing all the threads to be in Blocked state. We're using org.springframework.cloud:spring-cloud-stream:2.0.1.RELEASE and org.springframework:spring-messaging:5.0.8.RELEASE for sending message to RabbitMQ broker. Binding interface:

interface MessagingSource {
    @Output("bindingTargetName")
    fun messageChannelOutput(): MessageChannel
}

用法:

 val isSent = messageSource.messageChannelOutput().send(message)

MessageChannel#send(Message, long) 方法也有以毫秒为单位的超时时间作为第二个参数,但它在 org.springframework.integration.channel.AbstractSubscribableChannel#doSend 方法:

MessageChannel#send(Message, long) method also has the timeout in milliseconds as the second parameter, but it further ignored in org.springframework.integration.channel.AbstractSubscribableChannel#doSend method:

@Override
protected boolean doSend(Message<?> message, long timeout) { // timeout is ignored in this method
    try {
        return getRequiredDispatcher().dispatch(message);
    }
    catch (MessageDispatchingException e) {
        String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
        throw new MessageDeliveryException(message, description, e);
    }
}

你能解释为什么超时参数被忽略以及我如何配置它以避免长时间阻塞状态吗?

Can you explain why the timeout parameter is ignored and how i can configure it to avoid long blocking state?

谢谢!

通道 sendTimeout 仅适用于通道本身可以阻塞的情况,例如一个 QueueChannel 具有当前已满的有界队列;调用者将阻塞,直到队列中有可用空间或发生超时.

The channel sendTimeout only applies if the channel itself can block, e.g. a QueueChannel with a bounded queue that is currently full; the caller will block until either space becomes available in the queue, or the timeout occurs.

在这种情况下,块位于通道的下游,因此 sendTimeout 无关紧要(无论如何,它是一个无论如何都无法阻止的 DirectChannel,订阅的处理程序直接在调用线程上调用).

In this case, the block is downstream of the channel so the sendTimeout is irrelevant (in any case, it's a DirectChannel which can't block anyway, the subscribed handler is called directly on the calling thread).

你看到的实际阻塞很可能是在rabbitmq客户端的socket.write()中,它没有超时并且不可中断;调用线程无法执行任何操作来超时"写入.

The actual blocking you are seeing is most likely in the socket.write() in the rabbitmq client, which does not have a timeout and is not interruptible; there is nothing that can be done by the calling thread to "time out" the write.

我知道的唯一可能的解决方案是通过在连接工厂上调用 resetConnection() 来强制关闭兔子连接.

The only possible solution I am aware of is to force close the rabbit connection by calling resetConnection() on the connection factory.