一个十分完善的基于Socket的多服务器通信框架

一个非常完善的基于Socket的多服务器通信框架

一共4个文件

  • XServerReceiver : Socket接受处理XServer返回工具类 XServerReceiver充当服务器
  • XServerSender : Socket连接发送XServer服务器工具类 XServerSender充当客户端
  • ClientTest : 客户端测试程序
  • ServerTest : 服务端测试程序

XServerReceiver.java

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentLinkedQueue;

class XServerReceiverThread extends Thread {
    private Socket m_socket = null;
    private InputStream m_input = null;
    private OutputStream m_output = null;
    private BufferedReader m_br = null;
    private String m_clientHost = "";// 客户端的socket地址

    public XServerReceiverThread(Socket socket) {
        try {
            this.m_socket = socket;
            this.m_input = socket.getInputStream();
            this.m_output = socket.getOutputStream();
            this.m_br = new BufferedReader(new InputStreamReader(this.m_input, "UTF-8"));
            this.m_clientHost = m_socket.getRemoteSocketAddress().toString();
            this.m_clientHost = this.m_clientHost.substring(1);//因为m_clientHost为  /192.168.0.83:52177
            this.m_socket.setSoTimeout(3000);

            System.out.println("connection established from " + m_clientHost);
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }
    }

    public void run() {
        try {
            while (true) {
                String event = "";

                while (true) {
                    int one = this.m_br.read();
                    if (one == -1) {
                        closeHandles();
                        break;
                    }

                    event += String.valueOf((char) one);
                    if (one == '\1')// 一般消息尾部截止为一个不可见字符,这里假设为'\1'
                    {
                        XServerReceiver.PushEvent(event);
                        break;
                    }
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }
    }

    private void closeHandles() {
        System.out.println("connection closed from " + m_clientHost);

        try {
            if (this.m_br != null) {
                this.m_br.close();
                this.m_br = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (this.m_input != null) {
                this.m_input.close();
                this.m_input = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (this.m_output != null) {
                this.m_output.close();
                this.m_output = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (this.m_socket != null) {
                this.m_socket.close();
                this.m_socket = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

class XServerListenThread extends Thread {
    private ServerSocket server_socket = null;
    private boolean m_pleaseWait = true;

    public void run() {
        while (true) {
            try {
                if (server_socket == null || server_socket.isClosed() || !server_socket.isBound()) {
                    this.server_socket = new ServerSocket(5600);// XServer.TCP_PORT_FOR_OTHER_SERVER,这里假设为5600
                    System.out.println("TCP Processor is listening on " + 5600);
                    m_pleaseWait = false;
                }

                Socket socket = this.server_socket.accept();
                XServerReceiverThread xt = new XServerReceiverThread(socket);
                xt.start();
            } catch (BindException bex) {
                bex.printStackTrace();

                try {
                    sleep(5000);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public void Wait() {
        try {
            while (m_pleaseWait)
                Thread.sleep(10);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

class XServerHandlerThread extends Thread {
    public void run() {
        try {
            while (true) {
                String event = XServerReceiver.PopEvent();

                if (event == null) {
                    Thread.sleep(1000);
                    continue;
                }

                EventHandler(event);// 处理消息事件
                event = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private void EventHandler(String event) {
        System.out.println("Received event: " + event);
    }
}

/**
 * 
 * Socket接受处理XServer返回工具类
 * XServerReceiver充当服务器
 * 
 * @author vhreal
 * 
 */
public class XServerReceiver {
    static private ConcurrentLinkedQueue<String> m_eventQueue = new ConcurrentLinkedQueue<String>();
    static private XServerListenThread m_XServerListenThread = null;// 监听线程
    static private XServerHandlerThread m_XServerHandlerThread = null;// 处理线程

    static public void PushEvent(String event) {// 外部类调用XServerReceiver.PushEvent(event);
        m_eventQueue.add(event);
    }

    static public String PopEvent() {// 外部类调用XServerReceiver.PopEvent(event);
        if (m_eventQueue.size() == 0)
            return null;

        return m_eventQueue.remove();
    }

    static public void Start()// 外部类调用XServerReceiver.Start();
    {
        m_XServerListenThread = new XServerListenThread();
        m_XServerListenThread.start();
        m_XServerListenThread.Wait();

        m_XServerHandlerThread = new XServerHandlerThread();
        m_XServerHandlerThread.start();
    }
}

XServerSender .java

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentLinkedQueue;

class XServerSenderThread extends Thread {
    private Socket m_socket = null;
    private OutputStream m_output = null;
    private InputStream m_input = null;
    private InputStreamReader m_isr = null;
    private BufferedReader m_br = null;
    private ConcurrentLinkedQueue<String> m_Events = new ConcurrentLinkedQueue<String>();

    private String m_ServerHost = "192.168.0.83";
    private int m_ServerPort = 5600;

    public void run() {
        long lastkeepalive = 0;
        long now = 0;

        while (true) {
            try {
                if (m_socket == null || m_socket.isClosed()) {
                    Connect2XServer();
                    continue;
                }

                now = System.currentTimeMillis() / 1000;
                if (now - lastkeepalive > 2)// 2s发一次心跳
                {
                    RequestKeepAlive();
                    lastkeepalive = now;
                }

                String event = PopEvent();
                if (event == null) {
                    Thread.sleep(1000);
                    continue;
                }

                SendEvent2XServer(event);
                event = null;
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public void PushEvent(String event) {
        m_Events.add(event);
    }

    private String PopEvent() {
        if (m_Events.size() == 0)
            return null;

        return m_Events.remove();
    }

    private boolean SendEvent2XServer(String event) {
        try {
            m_output.write(event.getBytes());

            m_output.flush();
            return true;
        } catch (Exception ex) {
            closeHandles();
            return false;
        }
    }

    private boolean RequestKeepAlive() {
        try {
            String json = String
                    .format("{\"AttributeTimeinSecs\":\"%d\",\"AttributeEventName\":\"EventKeepAlive\"}",
                            System.currentTimeMillis() / 1000);
            m_output.write(json.getBytes());
            m_output.flush();
            return true;
        } catch (Exception ex) {
            closeHandles();
            return false;
        }
    }

    private boolean Connect2XServer() {
        try {
            System.out.println("connecting to XServer");

            SocketAddress endpoint = new InetSocketAddress(m_ServerHost, m_ServerPort);// XServer的Ip和端口号
            m_socket = new Socket();
            m_socket.setSoTimeout(3000);// 3s网络超时
            m_socket.connect(endpoint);

            m_output = m_socket.getOutputStream();
            m_input = m_socket.getInputStream();
            m_isr = new InputStreamReader(m_input, "UTF-8");
            m_br = new BufferedReader(m_isr);

            System.out.println("connected to XServer");
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }

        return false;
    }

    private void closeHandles() {
        try {
            if (m_isr != null) {
                m_isr.close();
                m_isr = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_br != null) {
                m_br.close();
                m_br = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_input != null) {
                m_input.close();
                m_input = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_output != null) {
                m_output.close();
                m_output = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            if (m_socket != null) {
                m_socket.close();
                m_socket = null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        try {
            Thread.sleep(1000);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

/**
 * 
 * Socket连接发送XServer服务器工具类 XServerSender充当客户端
 * 
 * @author vhreal
 * 
 */
public class XServerSender {
    static private XServerSenderThread m_SenderThread = null;

    static public void Start() {// 外部类调用XServerSender.Start();
        try {
            m_SenderThread = new XServerSenderThread();
            m_SenderThread.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    static public void SendEvent2XServer(String event) {// 外部类调用XServerSender.SendEvent2XServer(event);
        if (m_SenderThread != null) {
            m_SenderThread.PushEvent(event);
        }
    }
}

ClientTest.java

public class ClientTest {

    public static void main(String[] args) {
        XServerSender.Start();
        XClientThread xc = new XClientThread();
        xc.start();     
    }

}

class XClientThread extends Thread {
    public void run() {
        try {
            while (true) {
                String s = String.format("hello,Xsever!\1");
                XServerSender.SendEvent2XServer(s);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

ServerTest.java


public class ServerTest {

    public static void main(String[] args) {
        XServerReceiver.Start();
    }

}

测试结果
一个十分完善的基于Socket的多服务器通信框架

一个十分完善的基于Socket的多服务器通信框架