How to process more than 10 concurrent messages from an AWS SQS FIFO queue with Spring Integration

Problem description:

I want to be able to process more than 10 SQS messages at a time using a Spring Integration Workflow.

From this question, the recommendation was to use an ExecutorChannel. I updated my code but still have the same symptoms.

How to execute a Spring Integration flow in multiple threads to consume more Amazon SQS queue messages in parallel?

After making this update, my application requests 10 messages, processes those, and only after I make the call to amazonSQSClient.deleteMessage near the end of the flow will it accept another 10 messages from the SQS queue.

The application uses an SQS FIFO queue.

Is there something else I'm missing, or is this an unavoidable symptom of using SqsMessageDeletionPolicy.NEVER and then deleting the messages at the end of the flow? Accepting the messages at the beginning of the flow isn't really an option due to other constraints.

Here are the relevant snippets of code, with some simplifications, but I hope they express the problem.

Queue configuration

@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    executor.setConcurrencyLimit(50);
    return executor;
}

@Bean
@Qualifier("inputChannel")
public ExecutorChannel inputChannel() {
    return new ExecutorChannel(inputChannelTaskExecutor());
}

I also tried a ThreadPoolTaskExecutor instead of the SimpleAsyncTaskExecutor, with the same result, but I'll include it too in case it offers other insight.

@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(50);
    executor.setQueueCapacity(50);
    executor.setThreadNamePrefix("spring-async-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.afterPropertiesSet();
    executor.initialize();
    return executor;
}

SQS channel adapter

@Bean
public SqsMessageDrivenChannelAdapter changeQueueMessageAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSQSClient, changeQueue);
    adapter.setOutputChannel(inputChannel);
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
    return adapter;
}


@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500, TimeUnit.MILLISECONDS).maxMessagesPerPoll(10);
}

Simplified main flow

A common scenario for us is to get a number of Branch edits in a short period of time. This flow only 'cares' that at least one edit has happened. The messageTransformer extracts an id from the payload document and puts it in the header dsp_docId which we then use to aggregate on (we use this id in a few other places, so we felt a header made sense rather than doing all the work in a custom aggregator).
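
For context, a minimal sketch of what messageTransformer might look like (the BranchDocument payload type and its getId() accessor are assumptions for illustration; only the dsp_docId header name comes from the flow below):

import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

public class DocIdTransformer {

    // Copy the incoming message and expose the document id as the dsp_docId
    // header, which the aggregator uses as its correlation key.
    @Transformer
    public Message<BranchDocument> addDocIdHeader(Message<BranchDocument> message) {
        return MessageBuilder.fromMessage(message)
                             .setHeader("dsp_docId", message.getPayload().getId())
                             .build();
    }
}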

The provisioningServiceActivator retrieves the latest version of the Branch, then the router decides whether it needs further transforms (in which case it sends it to the transformBranchChannel) or it can be sent onto our PI instance (via the sendToPiChannel).

The transform flow (not shown; I don't think you need it) eventually leads into the send-to-PI flow; it just does more work first.

The listingGroupProcessor captures all the aws_receiptHandle headers and adds them to a new header as a |-separated list.
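
A minimal sketch of such a MessageGroupProcessor, assuming the combined handles go into a hypothetical dsp_receiptHandles header:

import java.util.stream.Collectors;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

@Bean
public MessageGroupProcessor listingGroupProcessor() {
    return group -> {
        // Join every receipt handle in the group into one |-separated string so
        // the whole batch can be deleted at the end of the flow.
        String receiptHandles = group.getMessages().stream()
                                     .map(m -> (String) m.getHeaders().get("aws_receiptHandle"))
                                     .collect(Collectors.joining("|"));
        Message<?> last = group.getOne();
        return MessageBuilder.withPayload(last.getPayload())
                             .copyHeaders(last.getHeaders())
                             .setHeader("dsp_receiptHandles", receiptHandles)
                             .build();
    };
}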

The sendToPi flow (and the errorFlow) ends with a call to a custom handler that takes care of deleting all the SQS messages referred to by that list of aws_receiptHandle strings.
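
The handler itself isn't shown here; a minimal sketch, assuming the hypothetical dsp_receiptHandles header from above and a changeQueueUrl field holding the queue URL, could look like:

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import org.springframework.messaging.handler.annotation.Header;

public class SqsBatchDeleteHandler {

    private final AmazonSQS amazonSQSClient;
    private final String changeQueueUrl;

    public SqsBatchDeleteHandler(AmazonSQS amazonSQSClient, String changeQueueUrl) {
        this.amazonSQSClient = amazonSQSClient;
        this.changeQueueUrl = changeQueueUrl;
    }

    // Delete every SQS message referenced by the |-separated receipt handles,
    // acknowledging the whole batch only at the end of the flow.
    public void deleteMessages(@Header("dsp_receiptHandles") String receiptHandles) {
        for (String handle : receiptHandles.split("\\|")) {
            amazonSQSClient.deleteMessage(new DeleteMessageRequest(changeQueueUrl, handle));
        }
    }
}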

@Bean
IntegrationFlow sqsListener() {
    return IntegrationFlows.from(inputChannel)
                           .transform(messageTransformer)
                           .aggregate(a -> a.correlationExpression("1")
                                            .outputProcessor(listingGroupProcessor)
                                            .autoStartup(true)
                                            .correlationStrategy(message -> message.getHeaders().get("dsp_docId"))
                                            .groupTimeout(messageAggregateTimeout)  // currently 25s
                                            .expireGroupsUponCompletion(true)
                                            .sendPartialResultOnExpiry(true)
                                            .get())

                           .handle(provisioningServiceActivator, "handleStandard")
                           .route(Branch.class, branch -> (branch.isSuppressed() == null || !branch.isSuppressed()),
                                  routerSpec -> routerSpec.channelMapping(true, "transformBranchChannel")
                                                          .resolutionRequired(false)
                                                          .defaultOutputToParentFlow())

                           .channel(sendtoPiChannel)
                           .get();
}

I thought I'd post this as an answer, since it solves my problem and may help others. As an answer it's more likely to be found than an edit to the original question, which could be overlooked.

Firstly, I should have noted that we're using a FIFO queue.

The issue was actually further up the chain, where we were setting the MessageGroupId to a simple value that described the source of the data. This meant we had very large message groups.

From the ReceiveMessage documentation you can see that it quite sensibly stops you requesting more messages from that group in this scenario, as it would be impossible to guarantee the order should a message need to be put back on the queue.

Updating the code that posts the messages to set an appropriate MessageGroupId meant that the ExecutorChannel then worked as expected.

While messages with a particular MessageGroupId are invisible, no more messages belonging to the same MessageGroupId are returned until the visibility timeout expires. You can still receive messages with another MessageGroupId as long as it is also visible.
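
For illustration, the producer-side change might look roughly like this (the queue URL, payload, and documentId variables are assumptions; the key point is a per-document MessageGroupId instead of one group per data source):

import java.util.UUID;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class ChangePublisher {

    private final AmazonSQS amazonSQSClient;
    private final String changeQueueUrl;

    public ChangePublisher(AmazonSQS amazonSQSClient, String changeQueueUrl) {
        this.amazonSQSClient = amazonSQSClient;
        this.changeQueueUrl = changeQueueUrl;
    }

    // A per-document group id lets the FIFO queue release messages from many
    // groups in parallel while still keeping order within each document.
    public void publishChange(String documentId, String payloadJson) {
        amazonSQSClient.sendMessage(new SendMessageRequest()
                .withQueueUrl(changeQueueUrl)
                .withMessageBody(payloadJson)
                .withMessageGroupId(documentId)   // previously one value per data source
                .withMessageDeduplicationId(UUID.randomUUID().toString()));
    }
}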