弹簧集成单元测试出站通道适配器

弹簧集成单元测试出站通道适配器

问题描述:

我具有以下配置

<int:channel id="notificationChannel" datatype="com.mycompany.integration.NotificationMessage">
        <int:queue message-store="jdbc-message-store" capacity="1000" />
    </int:channel>

    <int:outbound-channel-adapter ref="notificationHandler"
        method="handle" channel="notificationChannel" >
        <int:poller max-messages-per-poll="100" fixed-delay="60000"
            time-unit="MILLISECONDS" >
            <int:transactional isolation="DEFAULT" />
        </int:poller>
    </int:outbound-channel-adapter>

现在我要对此进行单元测试,我需要等待消息在测试中正确处理,我使用拦截器对其进行了尝试,但这不起作用,因为我只能在消息传递时进行同步,但无法成功进行同步邮件的处理.实现在处理完成后发送回复,但这意味着将仅执行此操作以使我的单元测试工作,而在生产环境中,消息头中将没有设置ReplyChannel.如何在不成功在messageHandler中实现请求的情况下实现成功处理请求的同步?

now i want to unit-test this, i need to wait for the message being processed correctly in the test, i tried it with an interceptor but that doesn't work because i could only sync on message delivery but not on successful processing of the message. implement sending a reply when the procesing is done but this would mean that would implement this only to make my unit-test work, in production there wouldn't be a replyChannel set in the message-header. how can i realize syncing on successful processing of the request without implementing it in the messageHandler?

如果您使用的是Spring Integration 2.2.x,则可以通过建议来做到这一点...

If you are using Spring Integration 2.2.x, you can do this with an advice...

public class CompletionAdvice extends AbstractRequestHandlerAdvice {

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        Object result = callback.execute();
        latch.countDown();
        return result;
    }

    public CountDownLatch getLatch() {
        return latch;
    }

}

在测试环境中,使用Bean工厂后处理器将建议添加到适配器的处理程序中.

In your test environment, add the advice to the adapter's handler with a bean factory post processor.

public class AddCompletionAdvice implements BeanFactoryPostProcessor {

    private final Collection<String> handlers;

    private final Collection<String> replyProducingHandlers;

    public AddCompletionAdvice(Collection<String> handlers, Collection<String> replyProducingHandlers) {
        this.handlers = handlers;
        this.replyProducingHandlers = replyProducingHandlers;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanName : handlers) {
            defineAdviceAndInject(beanFactory, beanName, beanName + "CompletionAdvice");
        }
        for (String beanName : replyProducingHandlers) {
            String handlerBeanName = beanFactory.getAliases(beanName + ".handler")[0];
            defineAdviceAndInject(beanFactory, handlerBeanName, beanName + "CompletionAdvice");
        }
    }

    private void defineAdviceAndInject(ConfigurableListableBeanFactory beanFactory, String beanName, String adviceBeanName) {
        BeanDefinition serviceHandler = beanFactory.getBeanDefinition(beanName);
        BeanDefinition advice = new RootBeanDefinition(CompletionAdvice.class);
        ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(adviceBeanName, advice);
        serviceHandler.getPropertyValues().add("adviceChain", new RuntimeBeanReference(adviceBeanName));
    }

}

将后处理器添加到配置<bean class="foo.AddCompletionAdvice" />.

Add the post processor to the config <bean class="foo.AddCompletionAdvice" />.

最后,将建议注入您的测试用例中

Finally, inject the advice(s) into your test case

@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class TestAdvice {

    @Autowired
    private CompletionAdvice fooCompletionAdvice;

    @Autowired
    private CompletionAdvice barCompletionAdvice;

    @Autowired
    private MessageChannel input;

    @Test
    public void test() throws Exception {
        Message<?> message = new GenericMessage<String>("Hello, world!");
        input.send(message);
        assertTrue(fooCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
        assertTrue(barCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
    }

}

并等待闩锁.

<int:publish-subscribe-channel id="input"/>

<int:outbound-channel-adapter id="foo" channel="input" ref="x" method="handle"/>

<int:service-activator id="bar" input-channel="input" ref="x"/>

<bean class="foo.AddCompletionAdvice">
    <constructor-arg name="handlers">
        <list>
            <value>foo</value>
        </list>
    </constructor-arg>
    <constructor-arg name="replyProducingHandlers">
        <list>
            <value>bar</value>
        </list>
    </constructor-arg>
</bean>

<bean id="x" class="foo.Foo" />

我将这些类添加到要点

已更新,为最终消费者(无答复)和产生答复的消费者提供一般情况.

Updated to provide a general case for ultimate consumers (no reply) and reply producing consumers.