Is the Azure Service Bus message lock not being renewed?

Problem description:

I built a service to support multiple queue subscriptions in Azure Service Bus, but I'm getting some odd behavior.

My subscription singleton class has a method that looks like this:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };


            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }

The idea is, you subscribe to Azure Service Bus for a specific type of message, and that directly corresponds to a queue. In your subscription, you pass in a delegate for how to process the message.

This seems to work... with one caveat.

No matter what ttl I pass in (it sets both MaxAutoRenewDuration and OperationTimeout), when a message takes a long time to process, the message is unlocked after about a minute, and another subscriber picks it up and starts processing it.

My understanding is that this is exactly what MaxAutoRenewDuration is supposed to prevent, but it doesn't seem to prevent anything.

Can anyone tell me what I need to do differently to make sure the consumer owns the message through to completion?

It turns out the remote process that the consumer was running was failing silently and not returning a failure status code (or anything else); the auto-refresh mechanism hung waiting for the result, so the message ended up timing out.

I'm not clear on how to prevent that, but once I fixed the issue on the remote process, the problem was no longer reproducible.
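
One possible way to guard against this kind of silent hang is to put an upper bound on how long the handler waits for the delegate. The sketch below is not part of my original service and does not touch the Service Bus SDK at all; `BoundedExecution` and `RunWithTimeoutAsync` are hypothetical names, and the approach (racing the work against `Task.Delay`) is an assumption about what would have helped, not something I verified against the original failure. It only turns a hang into a `TimeoutException`, so the existing `catch` block can log it and abandon the message.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedExecution
{
    // Hypothetical helper: awaits `work`, but throws TimeoutException if it has
    // not finished within `limit`. The token passed to `work` is cancelled on
    // timeout, so work that honors the token can stop early.
    public static async Task RunWithTimeoutAsync(
        Func<CancellationToken, Task> work, TimeSpan limit)
    {
        using (var cts = new CancellationTokenSource())
        {
            Task workTask = work(cts.Token);
            Task delayTask = Task.Delay(limit, cts.Token);

            // Whichever task finishes first decides the outcome.
            Task finished = await Task.WhenAny(workTask, delayTask);

            // Cancel the loser: either the timer or the still-running work.
            cts.Cancel();

            if (finished != workTask)
                throw new TimeoutException(
                    $"Message processing did not finish within {limit}.");

            // Re-await so an exception thrown by the work is propagated.
            await workTask;
        }
    }
}
```

In the handler above, this would replace the direct `await execution.Invoke(message);` with something like `await BoundedExecution.RunWithTimeoutAsync(_ => execution.Invoke(message), messageLifespan);`. Because `execution` takes no cancellation token, the hung work is not actually stopped in that case; the handler just stops waiting for it. Choosing a limit somewhat shorter than `MaxAutoRenewDuration` leaves time to abandon the message while the lock is still held.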

Moral of the story: if everything looks right and the message is still timing out, it seems the auto-refresh mechanism shares some resources with the asynchronous operations you are awaiting, so those operations are another place to look for failures.