Spring集成错误“没有可用的输出通道或回复通道头"

问题描述:

我不知道为什么会出现异常

I am not sure why I am getting the exception

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

它只是一个简单的 IntegrationFlow,但不确定我在下面的代码中缺少什么.

Its just a simple IntegrationFlow but not sure what am I missing here in the code below.

  @Bean
  Exchange messageExchange() {
    return ExchangeBuilder
        .directExchange("attr")
        .durable(true)
        .build();
  }

  @Bean
  Queue queue() {
    return QueueBuilder
        .durable("attr_queue")
        .build();
  }

  @Bean
  Binding binding() {
    return BindingBuilder
        .bind(queue())
        .to(messageExchange())
        .with("attr_queue")
        .noargs();
  }

  @Bean
  IntegrationFlow deltaFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp
        .inboundAdapter(connectionFactory, queue()))
        .handle(String.class, (payload, headers) -> {
          if (payload.isEmpty()) {
            log.info("Payload empty");
          } else {
            log.info("Payload : " + payload);
          }
          return payload;
        })
        .get();
  }

我试图接触 Spring Integration,但不确定为什么会出现此异常.我要做的就是使用 inboundAdapter 从队列中读取数据,然后将其记录到控制台.代码运行良好,但是当我向队列发布消息时,出现此异常.使用 Amqp 适配器时是否必须始终指定 replyChanneloutput-channel ?

I was trying to get my hands on Spring Integration and was not sure why I am getting this exception. All I'm trying to do is to read from a queue using an inboundAdapter and just log it to the console. The code runs fine, but when I publish a message to the queue, I get this exception. Do I have to specify a replyChannel or output-channel always when using Amqp adapters?

不,这不是 AMQP 通道适配器问题.请看看你的 handle() - 你在那里返回了一些东西.之后没有任何东西可以处理该退货.那么,应该去哪里回复呢?对,进入 replyChannel 标题.但是等等,没有人,因为没有什么可以等待回复 - 通道适配器是单向组件.

No, that’s not AMQP Channel Adapter problem. Please, look at your handle() - you return something there. And there is nothing afterwards to handle that return. So, where should a reply go? Right, into the replyChannel header. But wait , there is no one because there is nothing to wait for the reply - the Channel Adapter is one-way component.

由于您对回复不做任何处理,并且框架无法从配置阶段假设您不会处理此回复,因此我们只会在运行时遇到该异常.它不能做出这样的假设,因为在 handle() 之前有一个消息通道,所以你可以从其他流发送带有 replyChannel 标头的消息等等.但是!由于这是您的代码并且您完全控制它,您可能会假设没有人会期望从那里得到答复,最好从此时停止流式传输.为此,最好使用单向 MessageHandler - 基于 handle() 变体或只返回 null 而不是 payload代码>.您也可以使用 channel(nullChannel") 来停止流式传输.

Since you do nothing with the reply and the Framework can’t make an assumption from the configuration phase that you are not going to handle this reply, we just have that exception at runtime. It can’t make that assumption because there is a message channel before that handle(), so you may send a message with replyChannel header from some other flow and so on. But! Since this is your code and you fully control it you may have an assumption that nobody is going to expect a reply from there and it would be better to stop streaming from this point. For this purpose it would be better to use one-way MessageHandler - based handle() variant or just return null instead of payload. You also may use channel("nullChannel") to stop streaming.