Java,如何管理线程读取套接字(websocket)?
我有一个WebSocket服务器.
I have a WebSocket server.
我的服务器创建了一个新线程来处理新连接.该线程一直存在,直到websocket断开为止.
my server create a new thread for handle a new connection.the thread is live until websocket break.
我的问题:对于1_000_000个连接,我需要1_000_000个线程.我如何通过线程处理许多websocket?没有等待?
my problem: for 1_000_000 connections i need 1_000_000 threads. how i can handle many websockets by a thread? without wait?
ServerSocket server;
private ExecutorService executor = new ThreadPoolExecutor(1_000_000 , 1_000_000 , 7, TimeUnit.SECONDS, queue, threadFactory);
try
{
server = new ServerSocket(port);
}
catch (IOException e) {}
while (true)
{
Socket client = null;
try
{
client = server.accept();
Runnable r = new Runnable()
{
run()
{
// this is simple, original is complete WebSocket imp
client.getInputStream().read();
}
};
executor.execute(r);
}
catch (IOException e) {}
}
考虑一下,您具有套接字映射,每当收到一条消息到服务器时,您都将获得消息和相关的套接字!
Think about this you have a map of sockets and every time a message received to server you will get message and related socket !
此操作是使用OS(linux,windows,unix,mac-OS ...)内核完成的!
this operation done with OS(linux , windows , unix , mac-OS , ...) kernel !
因此您可以在一个线程中处理一百万个连接!
so you can handle a million connection just in one thread !
我们将此称为 Non-Blocking套接字,这意味着它们从不阻止您的线程进行读写,也不会阻止任何其他操作,例如accept和...!
we call this None-Blocking sockets which means they never block your thread to read or write or any other operation such as accept and ... !
java有一个软件包可以处理这个问题! java.nio.*
java has a package to handle this ! java.nio.*
它是如何工作的?
- 用于处理IO操作的线程
- 一个选择器,用于选择哪个套接字具有操作以及什么类型的操作
- ByteBuffer来处理读写操作,而不是在阻塞套接字中使用socket.stream
您还可以使用多个线程和选择器(每个选择器都有其自己的线程)
看这个例子:
NoneBlockingServer.java :
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NoneBlockingServer {
public static void main(String[] args) throws Exception
{
runServer("localhost" , 5050);
}
private final static void runServer(String host , int port)throws Exception {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(host, port));
serverSocketChannel.configureBlocking(false); //config to be a none-blocking serve-socket
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//register to selector for operation ACCEPT !
//also you can use selectionKey for some other stuffs !
while (true) {
int numberOfReadSockets = selector.select();
//it will wait until a socket(s) be ready for some io operation
//or other threads call selector.wakeup()
if(numberOfReadSockets==0){
//maybe selector.wakeup() called
//do some sync operations here !
continue; // continue selecting !
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext())
{
SelectionKey key = keys.next();
keys.remove(); //remove selected key from current selection !
//handle selected key
if(key.isValid() && key.isReadable())
{
//it means this socket is valid and has data to read
SocketChannel socketChannel =
(SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(100); // allocate 100 bytes for buffer
//maybe you must use an allocated buffer for each connection
// instead of allocate for each operation
int read = socketChannel.read(buffer);
if(read<0)
{
//need to close channel !
socketChannel.close(); // explicitly remove from selector
System.out.println("CONNECTION CLOSED");
continue; //socket closed and other operations will skip
}else
{
buffer.flip(); // you need to learn work with ByteBuffers
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
//maybe convert it to String
String msg = new String(bytes);
//use msg !
System.out.println("MESSAGE : "+msg);
key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
//set interestOps to WRIT and READ to write hello back message !
key.attach(ByteBuffer.wrap("Hello Client !".getBytes("UTF-8")));
//wrap a array of bytes using wrap and attach it to selectionKey
}
}
if(key.isValid() && key.isWritable())
{
//it means this socket is valid and have space to write data !
SocketChannel socketChannel =
(SocketChannel) key.channel();
//you must represent data you want to write to this socket
//maybe attached to selection key !
ByteBuffer dataToWrite = (ByteBuffer) key.attachment();
//key.attachment here to help u have some meta data about each socket
//use it smart !
int write = socketChannel.write(dataToWrite);
if(write<0)
{
//so means some error occurs better to close it !
socketChannel.close();
System.out.println("CONNECTION CLOSED !"); //log
continue;//as socket closed we will skip next operations !
}else if(!dataToWrite.hasRemaining())
{
//so all data putted to buffer !
key.interestOps(SelectionKey.OP_READ); // just need to read !
}
}
if(key.isValid() && key.isAcceptable())
{
ServerSocketChannel server =
(ServerSocketChannel) key.channel();//just server channels has accept operation
SocketChannel socketChannel = server.accept(); //accept it !
socketChannel.configureBlocking(false); // config none-blocking mode
socketChannel.register(selector , SelectionKey.OP_READ);
//also you can register for multiple operation using | operation
//for example register for both read and write SelectionKey.READ|SelectionKey.WRITE
//also you can change is late using key.interestOps(int ops)
System.out.println("NEW CONNECTION"); //log
}
//there is another type of key, key.isConnectable()
//google it !
}
}
}
}
,这里是 BlockingClient.java :
import java.net.InetSocketAddress;
import java.net.Socket;
public class BlockingClient {
//using blocking sockets !
public static void main(String[] args)throws Exception
{
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost" , 5050));
socket.getOutputStream()
.write("Hello Server".getBytes("UTF-8"));
byte[] buffer = new byte[100];
int len = socket.getInputStream().read(buffer);
System.out.println(new String(buffer , 0 , len , "UTF-8"));
socket.close();
}
}
在此示例中,我们将"Hello Server"消息从阻止客户端"发送到无阻止服务器",服务器将响应"Hello Client"消息!
at this example we send Hello Server message from Blocking Client to None-Blocking Server and server will response Hello Client message !
快跑!
祝你好运