为多个主题创建多个消费者组
就我而言,我需要多个主题,每个主题都与多个消费者相关联.我想为每个主题设置一个消费者组.我在 kafka
.net 客户端中没有找到任何方法,以便我可以动态创建消费者组并将主题与该消费者组链接.我正在使用 kafka 0.9.0
版本,请告诉我是否需要更改为 kafka server
设置或在 Zookeeper
上?
In my case I need multiple topics with each topic being linked with multiple consumers. I want to set a consumer group for each topic. I did not find any method in kafka
.net client so that I can create consumer group dynamically and link the topic with that consumer group. I am using kafka 0.9.0
version, please tell me if I need to change to kafka server
setting or on Zookeeper
?
我使用 Microsoft .NET kafka 构建了一个快速原型,链接如下.不确定它是否解决了您的问题.
I'm built a quick prototype with Microsoft .NET kafka as link below. not sure it's solving your problem or not.
但是,我强烈建议您使用这个库,因为它包含比 kafka-net 多得多的功能(例如,支持 Zookeeper 维护偏移量、主题组等)
However, I'm hightly recommend you to use this library because it contain much more feature than kafka-net(e.g. supports zookeeper for maintaining offset, topic group, etc.)
https://github.com/Microsoft/CSharpClient-for-Kafka
示例代码
当消费者收到消息时,这将向 kafka 发送 10 条消息并输出消息到控制台.
This will send 10 message to kafka and output message to console when consumer got it.
static void Main(string[] args)
{
Task.Factory.StartNew(() =>
{
ConsumerConfiguration consumerConfig = new ConsumerConfiguration
{
AutoCommit = true,
AutoCommitInterval = 1000,
GroupId = "group1",
ConsumerId = "1",
AutoOffsetReset = OffsetRequest.SmallestTime,
NumberOfTries = 20,
ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)
};
var consumer = new ZookeeperConsumerConnector(consumerConfig, true);
var dictionaryMapping = new Dictionary<string, int>();
dictionaryMapping.Add("topic1", 1);
var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder());
var messageStream = streams["topic1"][0];
foreach (var message in messageStream.GetCancellable(new CancellationToken()))
{
Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload));
//If you set AutoCommit to false, you can commit by yourself from this command.
//consumer.CommitOffsets()
}
});
var brokerConfig = new BrokerConfiguration()
{
BrokerId = 1,
Host = "localhost",
Port = 9092
};
var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec;
config.ProducerRetries = 3;
config.RequiredAcks = -1;
var kafkaProducer = new Producer(config);
byte[] payloadData = Encoding.UTF8.GetBytes("Test Message");
var inputMessage = new Message(payloadData);
var data = new ProducerData<string, Message>("topic1", inputMessage);
for (int i = 0; i < 10; i++)
{
kafkaProducer.Send(data);
}
Console.ReadLine();
}
希望对您有所帮助.