gRPC/C++ - How to detect client disconnection in an async server

Problem description:

I am using the code from this example to create my gRPC async server:

#include <memory>
#include <iostream>
#include <string>
#include <thread>

#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>

#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;

class ServerImpl final {
 public:
  ~ServerImpl() {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  // There is no shutdown handling in this code.
  void Run() {
    std::string server_address("0.0.0.0:50051");

    ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.

    //LINES ADDED BY ME TO IMPLEMENT KEEPALIVE 
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 2000);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 3000);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
    //END OF LINES ADDED BY ME

    builder.RegisterService(&service_);
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRpcs();
  }

 private:
  // Class encompassing the state and logic needed to serve a request.
  class CallData {
   public:
    // Take in the "service" instance (in this case representing an asynchronous
    // server) and the completion queue "cq" used for asynchronous communication
    // with the gRPC runtime.
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        // Make this instance progress to the PROCESS state.
        status_ = PROCESS;

        // As part of the initial CREATE state, we *request* that the system
        // start processing SayHello requests. In this request, "this" acts as
        // the tag uniquely identifying the request (so that different CallData
        // instances can serve different requests concurrently), in this case
        // the memory address of this CallData instance.
        service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  this);
      } else if (status_ == PROCESS) {
        // Spawn a new CallData instance to serve new clients while we process
        // the one for this CallData. The instance will deallocate itself as
        // part of its FINISH state.
        new CallData(service_, cq_);

        // The actual processing.
        std::string prefix("Hello ");
        reply_.set_message(prefix + request_.name());

        // And we are done! Let the gRPC runtime know we've finished, using the
        // memory address of this instance as the uniquely identifying tag for
        // the event.
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        GPR_ASSERT(status_ == FINISH);
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
      }
    }

   private:
    // The means of communication with the gRPC runtime for an asynchronous
    // server.
    Greeter::AsyncService* service_;
    // The producer-consumer queue for asynchronous server notifications.
    ServerCompletionQueue* cq_;
    // Context for the rpc, allowing us to tweak aspects of it such as the use
    // of compression, authentication, as well as to send metadata back to the
    // client.
    ServerContext ctx_;

    // What we get from the client.
    HelloRequest request_;
    // What we send back to the client.
    HelloReply reply_;

    // The means to get back to the client.
    ServerAsyncResponseWriter<HelloReply> responder_;

    // Let's implement a tiny state machine with the following states.
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;  // The current serving state.
  };

  // This can be run in multiple threads if needed.
  void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  Greeter::AsyncService service_;
  std::unique_ptr<Server> server_;
};

int main(int argc, char** argv) {
  ServerImpl server;
  server.Run();

  return 0;
}

Because I did some research and found out that I have to implement keepalive ( https://grpc.github.io/grpc/cpp/md_doc_keepalive.html ), I've added these lines:

builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 2000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 3000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);

So far so good: the server works and communication is flowing. However, how can I detect that the client has disconnected? The lines I added to enable the so-called keepalive mechanism don't seem to work for me.

Where is my mistake, and how can I detect on an async server when a client has disconnected, for whatever reason?

Answer:

Let me start with a little background information.

One thing that's important to understand about gRPC is that it uses HTTP/2 to multiplex many streams on a single TCP connection. Each gRPC call is a separate stream, regardless of whether the call is unary or streaming. Generically speaking, any gRPC call can have zero or more messages sent from both sides; a unary call is just a special case that has exactly one message from the client to the server followed by exactly one message from the server to the client.

We generally use the word "disconnected" to refer to the TCP connection breaking, not to an individual stream terminating, although sometimes people use the word in the opposite sense. I'm not sure which one you mean here, so I'll answer both.

The gRPC API exposes the stream lifecycle to the application, but it does not expose TCP connection lifecycle. The intent is that the library handles all of the details of managing TCP connections and hides them from the application -- we don't actually expose a way to tell when a connection has dropped, and you shouldn't need to care, because the library will automatically reconnect for you. :) The only case where this will be visible to the application is that if there are streams that are already in flight on an individual TCP connection when it fails, those streams will fail.

As I said, the library does expose the lifecycle of individual streams to the application; the lifetime of the stream is basically the lifetime of the CallData object in the code above. There are two ways to find out if the stream has been terminated. One is to explicitly call ServerContext::IsCancelled(). The other is to request an event on the CQ to asynchronously notify the application of cancellation via ServerContext::AsyncNotifyWhenDone().
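
Here is a minimal sketch of how the AsyncNotifyWhenDone() approach could be wired into the async example above. The Tag interface, the DoneTag helper and the done_seen_ / cancelled_ bookkeeping are illustrative scaffolding invented for this sketch, not part of the gRPC API; only AsyncNotifyWhenDone(), IsCancelled(), RequestSayHello(), Finish() and FinishWithError() are real gRPC calls. The important points are that AsyncNotifyWhenDone() must be registered before the RPC is requested, and that the object must not be deleted until both the Finish event and the done notification have come back from the completion queue, since both tags point into the same object.

// Illustrative sketch, not a drop-in replacement. Error/shutdown handling via
// the `ok` flag is omitted for brevity, and the bookkeeping assumes a single
// thread draining the completion queue, as in the example above.

// Common interface for everything placed on the completion queue, so the
// CQ loop can dispatch both CallData events and "done" notifications.
struct Tag {
  virtual ~Tag() = default;
  virtual void Proceed(bool ok) = 0;
};

class CallData : public Tag {
 public:
  CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
      : service_(service), cq_(cq), responder_(&ctx_), done_tag_(this) {
    // Must be registered before the RPC is requested; the tag is delivered
    // when the RPC terminates for any reason, including client cancellation.
    ctx_.AsyncNotifyWhenDone(&done_tag_);
    Proceed(true);
  }

  void Proceed(bool ok) override {
    if (status_ == CREATE) {
      status_ = PROCESS;
      service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
    } else if (status_ == PROCESS) {
      new CallData(service_, cq_);
      reply_.set_message("Hello " + request_.name());
      status_ = FINISH;
      responder_.Finish(reply_, Status::OK, this);
    } else {  // FINISH
      MaybeDelete();
    }
  }

 private:
  // Secondary tag, delivered once the RPC is done (finished or cancelled).
  struct DoneTag : Tag {
    explicit DoneTag(CallData* parent) : parent_(parent) {}
    void Proceed(bool /*ok*/) override {
      // IsCancelled() is safe to call here because the done tag has fired.
      parent_->cancelled_ = parent_->ctx_.IsCancelled();
      if (parent_->cancelled_) {
        std::cout << "Client cancelled the call or disconnected." << std::endl;
      }
      parent_->done_seen_ = true;
      parent_->MaybeDelete();
    }
    CallData* parent_;
  };

  // Both the Finish event and the done notification reference this object,
  // so only delete it once both have been seen.
  void MaybeDelete() {
    if (status_ == FINISH && done_seen_) delete this;
  }

  Greeter::AsyncService* service_;
  ServerCompletionQueue* cq_;
  ServerContext ctx_;
  HelloRequest request_;
  HelloReply reply_;
  ServerAsyncResponseWriter<HelloReply> responder_;
  enum CallStatus { CREATE, PROCESS, FINISH };
  CallStatus status_ = CREATE;
  DoneTag done_tag_;
  bool done_seen_ = false;
  bool cancelled_ = false;
};

// The loop in HandleRpcs() then dispatches through the common interface and
// should not assert on `ok`, since operations on a cancelled call can come
// back with ok == false:
//
//   GPR_ASSERT(cq_->Next(&tag, &ok));
//   static_cast<Tag*>(tag)->Proceed(ok);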

Note that in general, unary examples like the HelloWorld one above don't really need to worry about detecting stream cancellation, since the entire stream doesn't really last very long from the server's perspective. It's generally more useful in the case of a streaming call. But there are some exceptions, such as if you have a unary call that has to do a lot of expensive asynchronous work before sending a response.
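
For that kind of case, the cancelled_ flag maintained by DoneTag in the sketch above can be used to skip the remaining work. A purely illustrative fragment, where REPLY is a hypothetical extra state between PROCESS and FINISH that is reached once the expensive asynchronous work has completed on the CQ:

    if (status_ == REPLY) {
      status_ = FINISH;
      if (cancelled_) {
        // The client is already gone; don't bother building the reply.
        responder_.FinishWithError(Status::CANCELLED, this);
      } else {
        reply_.set_message("Hello " + request_.name());
        responder_.Finish(reply_, Status::OK, this);
      }
    }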

I hope this info is helpful.