有没有办法从Netty中的channel.write()返回自定义承诺?
我目前正在努力实现隐私保护数据挖掘算法.对于不同方之间的交流,我使用的是 Netty 4.0 .双方之间的通信流程如下:
I'm currently working on implementing a privacy preserving data mining algorithm. For the communication part between the different parties I'm using Netty 4.0. The communication flow between the parties looks like this:
-- multiplicationMsg --> ... -- multiplicationMsg -->
P_{1} P_{N}
<-- multiplicationMsg -- ... <-- multiplicationMsg --
其中,P_{1}
是发起并控制整个计算的主控方.安全多方乘法的逻辑位于Netty ChannelHandler
中.还有另一种安全添加协议.
where P_{1}
is the master party that initiates and controls the whole computation. The logic for the secure multi-party multiplication is located in Netty ChannelHandler
s. There is also another protocol for secure addition.
目前,我使用类似的解决方案,例如此核心团队的Norman Maurer展示了a>,以通知子协议计算是否已完成.但这有点像与框架作斗争.
At the moment I use a similar solution like this, shown by Norman Maurer from Netty core team, to get informed if a sub-protocol computation has finished. But that feels a bit like fighting against the framework.
是否有一种方法可以从channel.write(msg)
获得自定义承诺,该承诺将在ChannelPipeline
中创建并实现?在上面的示例中,当multiplicationMsg
返回到P_{1}
时应实现该目标.
Is there a way to get a custom promise from channel.write(msg)
, that will be created and fulfilled in the ChannelPipeline
? In my example above, it should be fulfilled when multiplicationMsg
arrives back at P_{1}
.
编辑1
这是我通常从ChannelPipeline
外部编写消息的方式:
This is what I normally do to write a message from outside of the ChannelPipeline
:
ChannelFuture f = channel.write(msg);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
//do something with the future
}
});
如果可以将数据写入套接字或发生故障,则将满足上述示例中的ChannelFuture
f
.但是除了ChannelFuture
之外,我还需要一种方法来获取自定义Future
,就像这样:
The ChannelFuture
f
from the example above will be fulfilled, if the data could be written to the socket or if a failure occurs. But I need a way do get back a custom Future
in addition to the ChannelFuture
, somehow like:
ChannelFuture f = channel.write(msg);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// I need something like the following
if(future.isSuccess()) {
Future myFuture = future.getMyFuture();
}
}
});
有很多方法可以做到,这是一个基于netty的示例:
There are many ways to do it, here's one example building on top of netty:
在管道外部,使用包含ChannelFuture
(来自连接初始化)的类(例如,IoClient
)内的公共方法发送消息.该方法将如下所示:
From outside the pipeline, send the message using a public method inside a class (let's say, IoClient
) containing the ChannelFuture
(from the connection initialization). The method would look something like this:
public MyCustomFuture send(String msg) {
MyCustomFuture responseFuture = new MyCustomFuture();
channelFuture.channel().pipeline().get(MyAppClientHandler.class).setResponseFuture(responseFuture);
channelFuture.channel().writeAndFlush(msg);
return responseFuture;
}
MyCustomFuture
是我们创建的用于实现netty的Future
接口的自定义类,因此它的实例将代理我们的消息. MyAppClientHandler
是要兑现承诺的净额管道(在responseFuture
中),.setResponseFuture(...)
将代理添加到管道中.
MyCustomFuture
is the custom class we create implementing netty's Future
interface, so it's instance will proxy our message. MyAppClientHandler
is the netty pipe to be fulfilling the promise (in responseFuture
), and .setResponseFuture(...)
adds the proxy to the pipe.
取决于通道的初始化,channelFuture.channel()
可能仍然是null
,为我们提供了NullPointerException
.因此,我们需要更改上面的代码以从回调内部插入代理:
Depending on the initialization of the channel, channelFuture.channel()
might still be null
, giving us a NullPointerException
. So we need to change the code above to insert the proxy from inside a callback:
public MyCustomFuture send(final String msg) {
final MyCustomFuture responseFuture = new MyCustomFuture();
channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channelFuture.channel().pipeline()
.get(MyAppClientHandler.class).setResponseFuture(responseFuture);
channelFuture.channel().writeAndFlush(msg);
}
});
return responseFuture;
}
关于MyCustomFuture
的另一件事是,它将需要一个setter方法:
One more thing about MyCustomFuture
is that it will need a setter method:
public void set(String msg) throws InterruptedException {
if (state == State.DONE) {
return;
}
blockingReplyHolder.put(msg);
state = State.DONE;
}
顾名思义,
blockingReplyHolder
是实现的字段,其中包含实现诺言的消息,如果仍然不存在,则阻止该消息(请检查
blockingReplyHolder
, as the name suggests, is the implementation's field that holds the message that fulfills the promise and blocks if it still doesn't exist (check Future)
对.现在,当期望的消息到达管道MyAppClientHandler
时,我们可以实现如下承诺:
Right. Now, when the expected message reaches the pipe MyAppClientHandler
, we can fulfill the promise like:
protected void channelRead(ChannelHandlerContext ctx, String msg) throws Exception {
responseFuture.set(msg);
}
生成的自定义API的用法为:
The resulting custom API's usage would be:
MyCustomFuture future = ioClient.send(message);
// do other stuff if needed
String response = future.get(); // waits if necessary
// make use of the response
此答案来自我正在玩的示例.
This answer was born from a example I was toying with.