HBASE 代码翻阅笔记-1 - PUT-3-提交任务2(基于0.94.12)

HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.94.12)
上一篇把提交任务的主流程整理了下,遗留了连接、发送请求、处理响应三个核心流程,今天就继续吧。
首先创建了一个Call对象,这是HBaseClient的一个内部类,代码很简单,贴出来完事。
protected class Call {
        final int id;                                       // call id
        final Writable param;                               // parameter
        Writable value;                               // value, null if error
        IOException error;                            // exception, null if value
        boolean done;                                 // true when call is done
        long startTime;

        protected Call(Writable param) {
            this.param = param;
            this.startTime = System.currentTimeMillis();
            synchronized (HBaseClient.this) {
                this.id = counter++;
            }
        }

        /**
         * Indicate when the call is complete and the
         * value or error are available.  Notifies by default.
         */
        protected synchronized void callComplete() {
            this.done = true;
            notify();                                 // notify caller
        }

        /**
         * Set the exception when there is an error.
         * Notify the caller the call is done.
         *
         * @param error exception thrown by the call; either local or remote
         */
        public synchronized void setException(IOException error) {
            this.error = error;
            callComplete();
        }

        /**
         * Set the return value when there is no error.
         * Notify the caller the call is done.
         *
         * @param value return value of the call.
         */
        public synchronized void setValue(Writable value) {
            this.value = value;
            callComplete();
        }

        public long getStartTime() {
            return this.startTime;
        }
    }


接着是HBaseClient的getConnection方法,这里主要是检查缓存并返回或者创建一个Connection对象,该对象是Thread的一个子类
    protected Connection getConnection(InetSocketAddress addr,
              Class<? extends VersionedProtocol> protocol,
                                       User ticket,
                                       int rpcTimeout,
                                       Call call)
            throws IOException, InterruptedException {
        if (!running.get()) {
            // the client is stopped
            throw new IOException("The client is stopped");
        }
        Connection connection;
    /* we could avoid this allocation for each RPC by having a
     * connectionsId object and with set() method. We need to manage the
     * refs for keys in HashMap properly. For now its ok.
     */
        ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
        synchronized (connections) {
            connection = connections.get(remoteId);//这里用CHM做了一个缓存
            if (connection == null) {
                connection = createConnection(remoteId);
                connections.put(remoteId, connection);//如果没有连接则创建一个
            }
        }
        connection.addCall(call);

        //we don't invoke the method below inside "synchronized (connections)"
        //block above. The reason for that is if the server happens to be slow,
        //it will take longer to establish a connection and that will slow the
        //entire system down.
        //Moreover, if the connection is currently created, there will be many threads
        // waiting here; as setupIOstreams is synchronized. If the connection fails with a
        // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
        // 这里讲述了为什么setupIOstream方法不放在同步块里,因为这是一个同步操作,如果某一个服务端
        // 变慢的话,这样会拖死整个系统。即便连上了,该方法检查的超时异常也会导致灯带同步块的线程都失效
        connection.setupIOstreams();
        return connection;
    }



见识一下传说中会拖死整个系统的方法
       /**
         * Connect to the server and set up the I/O streams. It then sends
         * a header to the server and starts
         * the connection thread that waits for responses.
         *
         * @throws java.io.IOException e
         */
        protected synchronized void setupIOstreams()
                throws IOException, InterruptedException {

            if (socket != null || shouldCloseConnection.get()) {
                return;
            }//检查连接是否已经关闭

            if (failedServers.isFailedServer(remoteId.getAddress())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Not trying to connect to " + remoteId.getAddress() +
                            " this server is in the failed servers list");
                }
                IOException e = new FailedServerException(
                        "This server is in the failed servers list: " + remoteId.getAddress());
                markClosed(e);
                close();
                throw e;
            }//检查连接指向的主机是不是已经失效

            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Connecting to " + remoteId);
                }
                setupConnection();//这里是建立连接
                this.in = new DataInputStream(new BufferedInputStream
                        (new PingInputStream(NetUtils.getInputStream(socket))));
                this.out = new DataOutputStream
                        (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
                writeHeader();//初次创建连接需要发送同步头

                // update last activity time
                touch();

                // start the receiver thread after the socket connection has been set up
                start();//开始执行
            } catch (Throwable t) {
                failedServers.addToFailedServers(remoteId.address);
                IOException e;
                if (t instanceof IOException) {
                    e = (IOException) t;
                } else {
                    e = new IOException("Could not set up IO Streams", t);
                }
                markClosed(e);
                close();

                throw e;
            }
        }


建立连接
      protected synchronized void setupConnection() throws IOException {
            short ioFailures = 0;
            short timeoutFailures = 0;
            while (true) {
                try {
                    this.socket = socketFactory.createSocket();
                    //设置nodelay,hbase.ipc.client.tcpnodelay,默认false 
                    //比较关键的参数,默认值坑爹,关键是配置文件里面还没有
                    this.socket.setTcpNoDelay(tcpNoDelay);
                    //设置keepAlive,hbase.ipc.client.tcpkeepalive,默认true
                    this.socket.setKeepAlive(tcpKeepAlive);
                    // connection time out is 20s
                    NetUtils.connect(this.socket, remoteId.getAddress(),
                            getSocketTimeout(conf));//ipc.socket.timeout,默认20000,配置文件里面也没有哦亲
                    if (remoteId.rpcTimeout > 0) {
                        pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
                    }
                    this.socket.setSoTimeout(pingInterval);//这个ipc.ping.interval,默认60s,还是没有配置哦亲
                    return;
                } catch (SocketTimeoutException toe) {
          /* The max number of retries is 45,
           * which amounts to 20s*45 = 15 minutes retries.
           */
                    handleConnectionFailure(timeoutFailures++, maxRetries, toe);
                } catch (IOException ie) {
                    handleConnectionFailure(ioFailures++, maxRetries, ie);
                }
            }
        }


连接完了,开始执行,逻辑很简单,等待响应,然后关闭连接
      public void run() {
            if (LOG.isDebugEnabled())
                LOG.debug(getName() + ": starting, having connections "
                        + connections.size());

            try {
                while (waitForWork()) {//wait here for work - read or close connection
                    receiveResponse();
                }
            } catch (Throwable t) {
                LOG.warn("Unexpected exception receiving call responses", t);
                markClosed(new IOException("Unexpected exception receiving call responses", t));
            }

            close();

            if (LOG.isDebugEnabled())
                LOG.debug(getName() + ": stopped, remaining connections "
                        + connections.size());
        }
       
        protected void receiveResponse() {
            if (shouldCloseConnection.get()) {
                return;
            }
            touch();

            try {
                // See HBaseServer.Call.setResponse for where we write out the response.
                // It writes the call.id (int), a flag byte, then optionally the length
                // of the response (int) followed by data.
                // 接受响应,一个int型的id,一个消息长度也是int型,后续是真实的数据
                // Read the call id.
                int id = in.readInt();

                if (LOG.isDebugEnabled())
                    LOG.debug(getName() + " got value #" + id);
                Call call = calls.get(id);

                // Read the flag byte
                byte flag = in.readByte();
                boolean isError = ResponseFlag.isError(flag);
                if (ResponseFlag.isLength(flag)) {
                    // Currently length if present is unused.
                    in.readInt();
                }
                int state = in.readInt(); // Read the state.  Currently unused.
                if (isError) {
                    if (call != null) {
                        //noinspection ThrowableInstanceNeverThrown
                        call.setException(new RemoteException(WritableUtils.readString(in),
                                WritableUtils.readString(in)));
                    }
                } else {
                    Writable value = ReflectionUtils.newInstance(valueClass, conf);
                    value.readFields(in);                 // read value
                    // it's possible that this call may have been cleaned up due to a RPC
                    // timeout, so check if it still exists before setting the value.
                    if (call != null) {
                        call.setValue(value);
                    }
                }
                calls.remove(id);
            } catch (IOException e) {
                if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
                    // Clean up open calls but don't treat this as a fatal condition,
                    // since we expect certain responses to not make it by the specified
                    // {@link ConnectionId#rpcTimeout}.
                    closeException = e;
                } else {
                    // Since the server did not respond within the default ping interval
                    // time, treat this as a fatal condition and close this connection
                    markClosed(e);
                }
            } finally {
                if (remoteId.rpcTimeout > 0) {
                    cleanupCalls(remoteId.rpcTimeout);
                }
            }
        }


倒回来看看sendParam方法,比较简单了
      protected void sendParam(Call call) {
            if (shouldCloseConnection.get()) {
                return;
            }

            // For serializing the data to be written.

            final DataOutputBuffer d = new DataOutputBuffer();
            try {
                if (LOG.isDebugEnabled())
                    LOG.debug(getName() + " sending #" + call.id);

                d.writeInt(0xdeadbeef); // placeholder for data length
                d.writeInt(call.id);
                call.param.write(d);
                byte[] data = d.getData();
                int dataLength = d.getLength();
                // fill in the placeholder
                Bytes.putInt(data, 0, dataLength - 4);
                //noinspection SynchronizeOnNonFinalField
                synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
                    out.write(data, 0, dataLength);
                    out.flush();
                }
            } catch (IOException e) {
                markClosed(e);
            } finally {
                //the buffer is just an in-memory buffer, but it is still polite to
                // close early
                IOUtils.closeStream(d);
            }
        }



到此为止,整个客户端的PUT操作流程全部读完,收工。准备啃服务端。