带有akka集群的akka流
我的akka流继续学习。我想将我的akka-streams应用程序与 akka集成在一起-cluster和DistributedPubSubMediator 。
My akka-streams learn-o-thon continues. I'd like to integrate my akka-streams application with akka-cluster and DistributedPubSubMediator.
添加对Publish的支持相当简单,但是我遇到的订阅部分遇到了麻烦。
Adding support for Publish is fairly straight forward, but the Subscribe part I'm having trouble with.
供参考,类型安全的示例:
class ChatClient(name: String) extends Actor {
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe("some topic", self)
def receive = {
case ChatClient.Message(from, text) =>
...process message...
}
}
我的问题是,如何将这个参与者与自己的流程整合在一起?如何在没有流背压的情况下确保发布消息?
My question is, how should I integrate this actor with my flow, and how should I ensure I'm getting publish messages in the absence of stream backpressure?
我正在尝试建立一个pubsub模型,其中一个流可以发布一条消息,而另一个流则可以使用它(如果已订阅)。
I'm trying to accomplish a pubsub model where one stream may publish a message and another stream would consume it (if subscribed).
您可能希望使Actor扩展ActorPublisher。然后,您可以从中创建一个Source并将其集成到您的流中。
You probably want to make your Actor extend ActorPublisher. Then you can create a Source from it and integrate that into your stream.
在此处查看ActorPublisher上的文档: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0 .3 / scala / stream-integrations.html
See the docs on ActorPublisher here: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html