Processing messages in parallel in Azure Service Bus
Problem: I've got tons of emails to send; at present there is an average of 10 emails in the queue at any point in time. My code processes the queue one message at a time; that is, it receives a message, processes it, and eventually sends the email. This causes a considerable delay in sending emails to users when they sign up for the service.
I've begun to think of modifying the code to process the messages in parallel, say 5 at a time, asynchronously. I'm imagining writing a method and using the CTP to call this method in parallel, say, 5 times.
I'm a little lost on how to implement this. The cost of making a mistake is very high, as users will be disappointed if things go wrong.
Request:
I need help writing code that processes messages in Azure Service Bus in parallel.
Thanks.
My code in a nutshell.
Public .. Run()
{
_myQueueClient.BeginReceive(ProcessUrgentEmails, _myQueueClient);
}
void ProcessUrgentEmails(IAsyncResult result)
{
// Cast the `result` to a QueueClient
// Called EndReceive to get a BrokeredMessage
// Processed the message, then called
sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
}
//This method is never called despite having it as callback function above.
void ProcessEndComplete(IAsyncResult result)
{
Trace.WriteLine("ENTERED ProcessEndComplete method...");
var bm = result.AsyncState as BrokeredMessage;
bm.EndComplete(result);
}
This page gives you performance tips when using Windows Azure Service Bus.
As for parallel processing, you could have a pool of threads for processing: every time you get a message, you grab a thread from that pool and assign it the message. You need to manage that pool yourself.
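One way to picture the managed-pool idea is a semaphore that hands out a fixed number of worker slots. The following is only a minimal sketch under stated assumptions: it uses plain strings as stand-ins for queue messages, and `BoundedWorkerPool`, `ProcessAllAsync`, and `PeakConcurrency` are hypothetical names introduced for illustration, not part of the Service Bus API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs handlers with at most maxWorkers active at once.
public sealed class BoundedWorkerPool
{
    private readonly SemaphoreSlim _slots;
    private int _running;
    private int _peak;

    public BoundedWorkerPool(int maxWorkers)
    {
        _slots = new SemaphoreSlim(maxWorkers);
    }

    // Highest number of handlers observed running at the same time.
    public int PeakConcurrency
    {
        get { return Volatile.Read(ref _peak); }
    }

    public async Task ProcessAllAsync(IEnumerable<string> messages, Func<string, Task> handler)
    {
        var tasks = new List<Task>();
        foreach (var message in messages)
        {
            // Wait until one of the worker slots is free.
            await _slots.WaitAsync();
            tasks.Add(Task.Run(() => RunOneAsync(message, handler)));
        }
        await Task.WhenAll(tasks);
    }

    private async Task RunOneAsync(string message, Func<string, Task> handler)
    {
        try
        {
            int now = Interlocked.Increment(ref _running);
            RecordPeak(now);
            await handler(message);
        }
        finally
        {
            // Always give the slot back, even if the handler throws.
            Interlocked.Decrement(ref _running);
            _slots.Release();
        }
    }

    private void RecordPeak(int now)
    {
        int seen;
        // Retry until the stored peak is at least `now`.
        while (now > (seen = Volatile.Read(ref _peak)) &&
               Interlocked.CompareExchange(ref _peak, now, seen) != seen)
        {
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var pool = new BoundedWorkerPool(5);
        var messages = Enumerable.Range(1, 20).Select(i => "email-" + i);
        // Task.Delay is a stand-in for the real work of sending one email.
        pool.ProcessAllAsync(messages, m => Task.Delay(50)).GetAwaiter().GetResult();
        Console.WriteLine("Peak concurrency: " + pool.PeakConcurrency);
    }
}
```

In a real worker the handler would process and complete a received message; the slot is released in a `finally` block so that a failing handler does not permanently shrink the pool.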
Or, you could retrieve multiple messages at once and process them using the TPL (PLINQ). For example, the BeginReceiveBatch/EndReceiveBatch methods let you retrieve multiple items from the queue asynchronously; you can then call "AsParallel" on the IEnumerable they return and process the messages on multiple threads.
A VERY simple, BARE-BONES sample:
var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);
messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
ProcessMessage(item);
});
That code retrieves 3 messages from the queue and processes them on "3 threads". (Note: it is not guaranteed to use 3 threads; .NET will analyze the system resources and use up to 3 threads if necessary.)
You could also remove the "WithDegreeOfParallelism" part, and .NET will use whatever threads it needs.
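To see that "WithDegreeOfParallelism" acts as an upper bound rather than a guarantee, you can count how many items are in flight at once. This is a rough sketch, assuming integers in place of real messages and `Thread.Sleep` in place of `ProcessMessage`; `MeasurePeak` is a hypothetical helper name.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class DegreeOfParallelismDemo
{
    // Runs `count` simulated messages through PLINQ and returns the highest
    // number of items that were being processed at the same time.
    public static int MeasurePeak(int count, int maxDegree)
    {
        int running = 0;
        int peak = 0;

        Enumerable.Range(1, count)
            .AsParallel()
            .WithDegreeOfParallelism(maxDegree)
            .ForAll(item =>
            {
                int now = Interlocked.Increment(ref running);

                // Record the new peak if this is the most work seen in flight.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                Thread.Sleep(20); // stand-in for ProcessMessage(item)
                Interlocked.Decrement(ref running);
            });

        return peak;
    }
}

public static class Program
{
    public static void Main()
    {
        int peak = DegreeOfParallelismDemo.MeasurePeak(12, 3);
        Console.WriteLine("Peak items in flight: " + peak);
    }
}
```

The reported peak should never exceed the requested degree, but it may be lower on a machine with few cores or a busy thread pool.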
At the end of the day there are multiple ways to do it; you have to decide which one works best for you.
UPDATE: Sample without ASYNC/AWAIT
This is a basic sample (without error checking) using the regular Begin/End async pattern.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1
{
public class WorkerRole : RoleEntryPoint
{
// The name of your queue
const string QueueName = "QUEUE_NAME";
const int MaxThreads = 3;
// QueueClient is thread-safe. Recommended that you cache
// rather than recreating it on every request
QueueClient Client;
bool IsStopped;
int dequeueRequests = 0;
public override void Run()
{
while (!IsStopped)
{
// Increment Request Counter
Interlocked.Increment(ref dequeueRequests);
Trace.WriteLine(dequeueRequests + " request(s) in progress");
Client.BeginReceive(new TimeSpan(0, 0, 10), ProcessUrgentEmails, Client);
// If we have made too many requests, wait for them to finish before requesting again.
while (dequeueRequests >= MaxThreads && !IsStopped)
{
System.Diagnostics.Trace.WriteLine(dequeueRequests + " requests in progress, waiting before requesting more work");
Thread.Sleep(2000);
}
}
}
void ProcessUrgentEmails(IAsyncResult result)
{
var qc = result.AsyncState as QueueClient;
var sendEmail = qc.EndReceive(result);
// We have received a message or the receive has timed out; either way, decrement the counter
Interlocked.Decrement(ref dequeueRequests);
// If we have a message, process it
if (sendEmail != null)
{
var r = new Random();
// Process the message
Trace.WriteLine("Processing message: " + sendEmail.MessageId);
System.Threading.Thread.Sleep(r.Next(10000));
// Mark it as completed
sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
}
}
void ProcessEndComplete(IAsyncResult result)
{
var bm = result.AsyncState as BrokeredMessage;
bm.EndComplete(result);
Trace.WriteLine("Completed message: " + bm.MessageId);
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// Create the queue if it does not exist already
string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.QueueExists(QueueName))
{
namespaceManager.CreateQueue(QueueName);
}
// Initialize the connection to Service Bus Queue
Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
IsStopped = false;
return base.OnStart();
}
public override void OnStop()
{
// Signal the Run loop first so it stops issuing new receive requests
IsStopped = true;
// Wait for all pending requests to finish (or time out) before closing
while (dequeueRequests > 0)
{
System.Diagnostics.Trace.WriteLine(dequeueRequests + " request(s), waiting before stopping");
Thread.Sleep(2000);
}
// Close the connection to Service Bus Queue
Client.Close();
base.OnStop();
}
}
}
Hope it helps.