Delayed execution queue in Storm: Kafka, Cassandra, Redis or Beanstalk?

Question:

I have a Storm topology that processes messages from Kafka and, depending on the task at hand, makes HTTP calls or saves data to Cassandra. I process the messages as soon as they arrive. However, a few messages are not processed completely because of the responses from external sources such as an HTTP server. I would like to implement an exponential backoff mechanism for retries, so that when the HTTP server does not respond or returns an error, the message is retried after some time. I can think of a few ways to achieve this, listed below. I would like to know which of them is the better solution, and whether there is any other fault-tolerant solution I could use. Since this is used to implement exponential backoff, each message will have a different delay time.
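
Since every message gets its own delay, that delay is usually derived from how many times the message has already failed. A minimal sketch of the calculation, assuming a capped schedule of initialDelayMs × multiplier^attempt (the class and parameter names are my own, not taken from Storm or Kafka):

```java
// Capped exponential backoff: delay(attempt) = initialDelayMs * multiplier^attempt,
// never more than maxDelayMs. Class and parameter names are illustrative assumptions.
final class ExponentialBackoff {
    private final long initialDelayMs;
    private final double multiplier;
    private final long maxDelayMs;

    ExponentialBackoff(long initialDelayMs, double multiplier, long maxDelayMs) {
        if (initialDelayMs < 0 || multiplier < 1.0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        this.initialDelayMs = initialDelayMs;
        this.multiplier = multiplier;
        this.maxDelayMs = maxDelayMs;
    }

    /** Wait before retry number attempt (0 = first retry), capped at maxDelayMs. */
    long delayMs(int attempt) {
        double delay = initialDelayMs * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    /** Absolute time at which a message that failed at failedAtMs becomes eligible again. */
    long retryAtMs(long failedAtMs, int attempt) {
        return failedAtMs + delayMs(attempt);
    }

    public static void main(String[] args) {
        ExponentialBackoff backoff = new ExponentialBackoff(1_000, 2.0, 60_000);
        for (int attempt = 0; attempt < 8; attempt++) {
            System.out.println("retry " + attempt + ": wait " + backoff.delayMs(attempt) + " ms");
        }
    }
}
```

With a 1 s initial delay, a multiplier of 2 and a 60 s cap, the printed waits are 1000, 2000, 4000, 8000, 16000, 32000, 60000 and 60000 ms; the cap keeps a persistently failing endpoint from pushing retries out indefinitely.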

  • Send it to another topic in Kafka that is consumed later. This is my preferred solution. I know we can use Kafka offsets to consume the message at a later stage; however, I could not find documentation or sample code for doing this. It would be really helpful if anyone could help me out with this.
  • Write the message to Cassandra / Redis, and write a scheduler that fetches the messages which are not yet processed and are ready to be consumed, and sends them to Kafka so that my Storm topology can consume them. (Existing solution in another legacy, non-Storm project.)
  • Send it to Beanstalk with a delay. (Existing solution in another legacy, non-Storm project. However, I would like to avoid this solution and use it only if I run out of options.)
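
The second option boils down to a store ordered by "ready at" time plus a periodic job that moves due messages back to Kafka. A minimal in-memory sketch, assuming a hypothetical DelayedRetryStore whose priority queue plays the role of the Cassandra table or Redis sorted set, and whose republish callback plays the role of the Kafka producer:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Sketch of option 2: park failed messages with a "ready at" time and let a scheduler
// republish the ones that are due. ASSUMPTIONS: the in-memory priority queue stands in
// for a Cassandra table or Redis sorted set scored by readyAtMs, and the republish
// callback stands in for a Kafka producer send. Neither is a real client API here.
final class DelayedRetryStore {
    /** A parked message and the earliest time (epoch ms) it may be retried. */
    static final class Parked {
        final String message;
        final int attempt;
        final long readyAtMs;

        Parked(String message, int attempt, long readyAtMs) {
            this.message = message;
            this.attempt = attempt;
            this.readyAtMs = readyAtMs;
        }
    }

    // Earliest readyAtMs first, similar to ZRANGEBYSCORE on a Redis sorted set.
    private final PriorityQueue<Parked> queue =
            new PriorityQueue<>(Comparator.comparingLong((Parked p) -> p.readyAtMs));

    /** Called from the bolt when the HTTP call fails. */
    synchronized void park(String message, int attempt, long readyAtMs) {
        queue.add(new Parked(message, attempt, readyAtMs));
    }

    /** One scheduler tick: hand every message due at nowMs to republish; returns how many. */
    int releaseDue(long nowMs, Consumer<Parked> republish) {
        List<Parked> due = new ArrayList<>();
        synchronized (this) {
            while (!queue.isEmpty() && queue.peek().readyAtMs <= nowMs) {
                due.add(queue.poll());
            }
        }
        // Publish outside the lock so a slow send does not block park().
        due.forEach(republish);
        return due.size();
    }

    synchronized int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        DelayedRetryStore store = new DelayedRetryStore();
        store.park("order-42", 1, 2_000);
        store.park("order-7", 3, 8_000);
        store.park("order-13", 0, 1_000);
        int released = store.releaseDue(5_000, p -> System.out.println("republish " + p.message));
        System.out.println(released + " released, " + store.size() + " still parked");
    }
}
```

Here main republishes order-13 and then order-42 and leaves order-7 parked. In a persistent version you would delete an entry only after the Kafka send has succeeded; this in-memory sketch removes entries before publishing, so a crash in between would lose them.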

This is pretty much what I would like to do. However, I am not able to find documentation on implementing delayProcessingUntil as mentioned in "Kafka - Delayed Queue implementation using high level consumer".
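
The idea behind delayProcessingUntil, as I understand it, is that each message in the retry topic carries the time before which it must not be processed, and the consumer waits until then before handling it. A minimal sketch of just that waiting step, assuming the due time is already available as epoch milliseconds; DelayUntil and Sleeper are hypothetical names, and no Kafka API is involved:

```java
import java.util.function.LongSupplier;

// Sketch of a "delayProcessingUntil" step for a delayed-retry consumer.
// ASSUMPTION: every retry message carries the absolute epoch-millis time before which
// it must not be processed (written by whoever published it to the retry topic).
// The clock and the sleep function are injected so the waiting logic can be tested.
final class DelayUntil {
    /** Sleeps for the given number of milliseconds (Thread::sleep in production). */
    interface Sleeper {
        void sleep(long ms) throws InterruptedException;
    }

    private final LongSupplier clock;
    private final Sleeper sleeper;

    DelayUntil(LongSupplier clock, Sleeper sleeper) {
        this.clock = clock;
        this.sleeper = sleeper;
    }

    /** Blocks until the clock reaches dueAtMs; returns the total time it asked to sleep. */
    long delayProcessingUntil(long dueAtMs) throws InterruptedException {
        long slept = 0;
        long now = clock.getAsLong();
        // Loop in case the clock has not quite reached dueAtMs when a sleep returns.
        while (now < dueAtMs) {
            long remaining = dueAtMs - now;
            sleeper.sleep(remaining);
            slept += remaining;
            now = clock.getAsLong();
        }
        return slept;
    }

    public static void main(String[] args) throws InterruptedException {
        // Production wiring: real wall clock and Thread.sleep.
        DelayUntil gate = new DelayUntil(System::currentTimeMillis, Thread::sleep);
        long dueAt = System.currentTimeMillis() + 200; // e.g. read from the message
        gate.delayProcessingUntil(dueAt);
        System.out.println("due time reached: " + (System.currentTimeMillis() >= dueAt));
    }
}
```

Two caveats with a real consumer: a long wait inside the poll loop can exceed the Java client's max.poll.interval.ms, so pausing the partition with KafkaConsumer.pause and resume is usually safer than sleeping; and because a partition is read in order, a message with a long delay holds back shorter-delay messages queued behind it, which matters here since every message has its own delay.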

In the past I have implemented scheduled jobs that read from a data store, as well as delays using Beanstalk, but I would prefer to use Kafka.

Answer:

The Kafka spout has exponential backoff message retry built in. You can configure the initial delay, the delay multiplier and the maximum delay through the spout configuration. If there is an error in the bolt, call collector.fail(input); after that, just leave it to the spout to do the retry.

https://github.com/apache/storm/blob/v0.10.0/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
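
To make the built-in behaviour concrete, here is a simplified model of the bookkeeping such a retry manager has to do: record each failed offset with its fail count, compute the next retry time with capped exponential backoff, and hand back the lowest offset that is due. It is a sketch under my own naming (BackoffRetryTracker, failed, retryStarted, acked, nextOffsetToRetry), not a copy of ExponentialBackoffMsgRetryManager. In storm-kafka 0.10 the three settings are, as far as I recall, the SpoutConfig fields retryInitialDelayMs, retryDelayMultiplier and retryDelayMaxMs; check them against the linked source for your version.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of spout-side exponential-backoff retry bookkeeping.
// NOT the storm-kafka ExponentialBackoffMsgRetryManager: class and method names are
// illustrative, and the clock is passed in explicitly so the logic is easy to test.
final class BackoffRetryTracker {
    private final long initialDelayMs;
    private final double multiplier;
    private final long maxDelayMs;

    // offset -> how many times the tuple emitted for that offset has failed
    private final Map<Long, Integer> failCounts = new HashMap<>();
    // offset -> earliest time (ms) at which the offset may be re-emitted
    private final Map<Long, Long> retryAtMs = new HashMap<>();

    BackoffRetryTracker(long initialDelayMs, double multiplier, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.multiplier = multiplier;
        this.maxDelayMs = maxDelayMs;
    }

    /** A bolt called collector.fail(...) on the tuple for this offset. */
    void failed(long offset, long nowMs) {
        int count = failCounts.merge(offset, 1, Integer::sum);
        double delay = initialDelayMs * Math.pow(multiplier, count - 1);
        retryAtMs.put(offset, nowMs + (long) Math.min(delay, (double) maxDelayMs));
    }

    /** The offset has been re-emitted; it now waits for its next ack or fail. */
    void retryStarted(long offset) {
        retryAtMs.remove(offset);
    }

    /** The tuple was finally acked: forget the offset. */
    void acked(long offset) {
        failCounts.remove(offset);
        retryAtMs.remove(offset);
    }

    /** Lowest offset whose backoff has elapsed at nowMs, or null if nothing is due. */
    Long nextOffsetToRetry(long nowMs) {
        Long next = null;
        for (Map.Entry<Long, Long> e : retryAtMs.entrySet()) {
            if (e.getValue() <= nowMs && (next == null || e.getKey() < next)) {
                next = e.getKey();
            }
        }
        return next;
    }

    public static void main(String[] args) {
        BackoffRetryTracker tracker = new BackoffRetryTracker(1_000, 2.0, 60_000);
        tracker.failed(17L, 0);
        System.out.println("due at 500 ms: " + tracker.nextOffsetToRetry(500));    // prints null
        System.out.println("due at 1000 ms: " + tracker.nextOffsetToRetry(1_000)); // prints 17
    }
}
```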