施用传统IO包编写的Servlet多线程监听程序
使用传统IO包编写的Servlet多线程监听程序
我是做的 c/s结构,不是 web开发. 所以直接写类,就行了. 没用到 servlet.
消息,有消息包. 分为消息头,和消息体. 解开包头和包体,就可以区分各个用户了.对于消息结构,你们应该有定义吧! 对于你所说的发送给不同的用户,包头内应该有.得到消息后,可以放到 集合里,然后再分发出去.
不知道你们那的程序设计是否是这样. 这就看具体情况了.
我的java程序,对于vc++程序,是客户端,但对于用户,就是服务器.
JDK6.0好像在Linux下支持EPOLL,性能应该还要好一点
这里稍微补充一下,这个支持是从jdk5.0 update9之后开始支持,也就是说jdk5.0后期的版本也是可以从中获益的。
如果用jdk5.0,可以不必非升级到6.0,换成后面的小版本了。
7.0据说要支持真正的aio,这个是个好消息。
下面说一下我的需求:
1.要实现JAVA与VC++的SOCKET通信。
2.要使实现监听多个端口。
3.要在Servlet启动时将监听启动。
4.要实时将数据推送到页面。
———————————————————————————————————————————————————————
根据需求写的线程监听代码如下:
java 代码
- package com.test.youCompany.comet;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.OutputStreamWriter;
- import java.io.PrintWriter;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Vector;
- import net.sf.json.JSONArray;
- import net.sf.json.JSONException;
- import com.test.youCompany.core.common.Constants;
- import com.test.youCompany.core.json.JsonObjectFactory;
- import com.test.youCompany.core.util.StringUtils;
- import com.test.youCompany.domain.Yc;
- import com.test.youCompany.domain.Yx;
- import dojox.cometd.Bayeux;
- import dojox.cometd.Client;
- public class ServerThread extends Thread
- {
- public static Bayeux bayeux;
- public static Client client;
- public static final String YC = "yc";
- public static final String YX = "yx";
- public static final String TRIP = "trip";
- public static final String PRONUM = "command";
- Socket clientRequest;//用户连接的通信套接字
- ServerSocket serverSocket;
- BufferedReader input; //输入流
- PrintWriter output; //输出流
- private String popedom = "";//线程权限
- // serverThread的构造器
- public ServerThread(int s)
- {
- ServerSocket rServer = null;
- try
- {
- rServer = new ServerSocket(s);
- System.out.println("Welcome to the server!");
- System.out.println(new Date());
- System.out.println("The server is ready!");
- System.out.println("Port: " + rServer.getLocalPort());
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- this.serverSocket=rServer;// 接收receiveServer传来的套接字
- }
- public ServerThread(int s,String yt)
- {
- ServerSocket rServer = null;
- try
- {
- rServer = new ServerSocket(s);
- System.out.println("Welcome to the server!");
- System.out.println(new Date());
- System.out.println("The server is ready!");
- System.out.println("Port: " + rServer.getLocalPort());
- this.setPopedom(yt);
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- this.serverSocket=rServer;// 接收receiveServer传来的套接字
- }
- public static void setBayeux(Bayeux bayeux)
- {
- ServerThread.bayeux = bayeux;
- ServerThread.client =bayeux.newClient(Constants.DFDATACLENT, null);
- // System.out.println("设置协议成功.");
- }
- public void run()
- { //线程的执行方法
- InputStreamReader reader;
- boolean done=false;
- while(!done)
- {
- try
- {
- this.clientRequest = this.serverSocket.accept();
- // System.out.println("New connection accepted "+ clientRequest.getInetAddress() + ":" + clientRequest.getPort());
- if(clientRequest != null)
- {
- reader=new InputStreamReader(clientRequest.getInputStream());
- input=new BufferedReader(reader);
- while (true)
- {
- String message = input.readLine();
- if (message == null)
- break;
- // System.out.println("以下是从端口" + serverSocket.getLocalPort() + "中得到数据");
- if(this.popedom.equalsIgnoreCase(ServerThread.YC))
- {
- if(message.endsWith("]]"))
- sendDataToView(message,ServerThread.YC);
- }
- else if(this.popedom.equalsIgnoreCase(ServerThread.YX))
- {
- if(message.endsWith("]]"))
- sendDataToView(message,ServerThread.YX);
- }
- else if(this.popedom.equalsIgnoreCase(ServerThread.TRIP))
- {
- }
- else if(this.popedom.equalsIgnoreCase(ServerThread.PRONUM))
- {
- }
- // showMessage(message);
- // sender(message.);
- }
- try
- {
- Thread.sleep(300);
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- else
- {
- try
- {
- Thread.sleep(300);
- }
- catch (InterruptedException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- catch(IOException e)
- {
- System.out.println(e.getMessage());
- }
- }//end of while
- try
- {
- clientRequest.close(); //关闭套接字
- }
- catch(IOException e)
- {
- System.out.println(e.getMessage());
- }
- }
- public void sender(String command)
- {
- OutputStreamWriter writer;
- try
- { //初始化输入、输出流
- writer=new OutputStreamWriter(clientRequest.getOutputStream());
- output=new PrintWriter(writer,true);
- }
- catch(IOException e)
- {
- System.out.println(e.getMessage());
- }
- output.println(command);//客户机连接欢迎词
- }
- /**
- * 将数据传送到页面。
- * @param message
- */
- private void sendDataToView(String message, String str)
- {
- if(bayeux != null && client != null)
- {
- String result;
- Map msg = new HashMap();
- if(this.popedom.equalsIgnoreCase(ServerThread.YC))
- {
- result = getYcDataView(message);
- if(result != null)
- msg.put("chat", result);
- }
- else if(this.popedom.equalsIgnoreCase(ServerThread.YX))
- {
- result = getYxDataView(message);
- if(result != null)
- {
- msg.put("chat", result);
- }
- }
- bayeux.publish(client,"/dfChat/" + str,msg,""+msg.hashCode());
- // System.out.println("数据已发出,请确认。");
- }
- }
- private void pushDataToBus(List<yc></yc> list)
- {
- }
- @SuppressWarnings("unchecked")
- private String getYcDataView(String message)
- {
- int i = 0;
- int j = 0;
- try
- {
- List <list></list> ycData = JsonObjectFactory.getNomalList(message);
- List<yc></yc> jsonData = new Vector<yc></yc>();
- for(Iterator <list></list> equipment = ycData.iterator(); equipment.hasNext();i++)
- {
- List ycList = equipment.next();
- for(Iterator ycValue = ycList.iterator();ycValue.hasNext();j++)
- {
- Yc singleData = new Yc();
- singleData.setModtime(new Date());
- try
- {
- singleData.setValue((Double)ycValue.next());
- }
- catch(ClassCastException e)
- {
- singleData.setValue(Double.valueOf(ycValue.next().toString() + ".0"));
- }
- singleData.setStationid(i);
- singleData.setYcid(j);
- jsonData.add(singleData);
- }
- j = 0;
- }
- pushDataToBus(jsonData);
- return JSONArray.fromCollection(jsonData, StringUtils.getStrings(Constants.YCNONEED)).toString();
- }
- catch(JSONException e)
- {
- System.out.println(e.getMessage());
- return null;
- }
- catch(Exception e)
- {
- System.out.println(e.getMessage());
- return null;
- }
- }
- @SuppressWarnings("unchecked")
- private String getYxDataView(String message)
- {
- int i = 0;
- int j = 0;
- try
- {
- List> ycData = JsonObjectFactory.getNomalList(message);
- List<yx></yx> jsonData = new Vector<yx></yx>();
- for(Iterator> equipment = ycData.iterator(); equipment.hasNext();i++)
- {
- List<integer></integer> yxList = equipment.next();
- for(Iterator<integer></integer> yxValue = yxList.iterator();yxValue.hasNext();j++)
- {
- Yx singleData = new Yx();
- singleData.setStationid(i);
- singleData.setYxid(j);
- singleData.setValue(yxValue.next());
- jsonData.add(singleData);
- }
- j = 0;
- }
- return JSONArray.fromCollection(jsonData, StringUtils.getStrings(Constants.YXNONEED)).toString();
- }
- catch(JSONException e)
- {
- System.out.println(e.getMessage());
- return null;
- }
- }
- private void setPopedom(String popedom)
- {
- this.popedom = popedom;
- }
- }
Servlet代码如下:
java 代码
- package com.test.youCompany.comet;
- import javax.servlet.ServletConfig;
- import javax.servlet.ServletException;
- import javax.servlet.http.HttpServlet;
- import com.test.youCompany.core.common.Constants;
- import com.test.youCompany.core.common.DFPropertyOwner;
- import com.test.youCompany.core.util.StringUtils;
- public class JMessagePushServlet extends HttpServlet
- {
- /**
- *
- */
- private static final long serialVersionUID = -832833032007994994L;
- final int RECEIVE_PORT = 9090;
- public void init(ServletConfig servletConfig) throws ServletException
- {
- super.init(servletConfig);
- Thread serverThread = null;
- try
- {
- String [] ports = StringUtils.getStrings(DFPropertyOwner.getKeyValue("ArrPort", Constants.GLOBAL_PROPERTIES));
- String [] portsRight = StringUtils.getStrings(DFPropertyOwner.getKeyValue("PortRights", Constants.GLOBAL_PROPERTIES));
- for(int i = 0;i
- {
- if(i < portsRight.length)
- serverThread = new ServerThread(Integer.valueOf(ports[i]),portsRight[i]);//监控项目
- else
- serverThread = new ServerThread(Integer.valueOf(ports[i]));//没有指定监控项目
- serverThread.setName(Constants.THREANNAMES + i);
- serverThread.start();
- }
- System.out.println("线程开始启动");
- }
- catch (Exception e)
- {
- System.out.println(e.getMessage());
- }
- }
- }
以下是WEB.xml配置
java 代码
- <servlet></servlet>
- <servlet-name></servlet-name>MessageServer
- class>com.test.youCompany.comet.JMessagePushServletclass>
- <load-on-startup></load-on-startup>1
———————————————————————————————————————————————————————
在使用过程这种方法还是有一些弊端,它的效率不高,在处理大的数据时可能产生错误,比如接受到的是坏数据,等。在网上略看了一下NIO的实现,以后我尝试。
1 楼
liuweipeng
2007-11-29
楼主,你好,看了你的帖子,和我要做的项目很相似,和你的思路基本一样,就是在实时将数据推送到页面的方面有些问题,我采用的dwr框架的长连接技术,但是无法长期保持连接,超过一定时间后就出现异常,请问楼主在这方面是如何解决的!
2 楼
myworkfirst
2007-11-29
二楼的朋友,你好. 我现在也在开发 java程序与vc++程序之间的通讯.不过没用到servlet. 对于你的问题,我认为你可能没有发送心跳包(有的也称之为测试包,或者链路包). 如果是长连接,一般会有连接时间,长时间不发心跳包,服务端就会断开连接.
解决办法: 开启一个 Timer,间隔几秒之内,发送心跳包.心跳包很小.
解决办法: 开启一个 Timer,间隔几秒之内,发送心跳包.心跳包很小.
3 楼
liuweipeng
2007-11-29
谢谢三楼朋友,冒昧的问一句,你没有用serverlet,你采用什么了?
4 楼
dengtl
2007-11-29
数据接收后,把其推送到页面部分可以做优化。
将其推送到一个缓冲区,由一个线程专门处理数据推送,这样可能效率稍微高一点。
将其推送到一个缓冲区,由一个线程专门处理数据推送,这样可能效率稍微高一点。
5 楼
liuweipeng
2007-11-29
对于多用户同时访问的时候,暂不考虑性能,如何对于实时消息如何区分开,并且发送给不同的用户,也就是对于多线程方面如何处理,请指教。
6 楼
myworkfirst
2007-11-29
liuweipeng 写道
谢谢三楼朋友,冒昧的问一句,你没有用serverlet,你采用什么了?
我是做的 c/s结构,不是 web开发. 所以直接写类,就行了. 没用到 servlet.
7 楼
myworkfirst
2007-11-29
liuweipeng 写道
对于多用户同时访问的时候,暂不考虑性能,如何对于实时消息如何区分开,并且发送给不同的用户,也就是对于多线程方面如何处理,请指教。
消息,有消息包. 分为消息头,和消息体. 解开包头和包体,就可以区分各个用户了.对于消息结构,你们应该有定义吧! 对于你所说的发送给不同的用户,包头内应该有.得到消息后,可以放到 集合里,然后再分发出去.
不知道你们那的程序设计是否是这样. 这就看具体情况了.
我的java程序,对于vc++程序,是客户端,但对于用户,就是服务器.
8 楼
dengtl
2007-11-29
同意楼上的。
另外,能否做一个线程池,对单个用户请求从线程池中分配一个线程与客户端进行交互,这样就不存在区分用户了。
另外,能否做一个线程池,对单个用户请求从线程池中分配一个线程与客户端进行交互,这样就不存在区分用户了。
9 楼
dengtl
2007-11-29
对于长连接,我没有这方面的经验。直观感觉来看,长连接只所以存在是不希望频繁的进行TCP/IP握手处理,但如果长时间占用,势必也会对服务器端造成压力。我的建议是每当用户请求建立一个长连接时,则分配一个TimeoutSocket(可以由一般的Socket加上延时关闭功能装饰),当用户上一次传输请求时间超过某一时间后,则断开连接。这当然要客户端配合才行,客户端应该在传输之前检测该连接是否有效,无效则从新建立连接。
这样,或许能够解决长连接所产生的不足。
这样,或许能够解决长连接所产生的不足。
10 楼
liuweipeng
2007-11-29
通过连接池的话,如果管理每个客户端呢,楼上说的心跳检查,那不是成了轮询吗?还不是变成了定时的模式,有没有demo啊?
11 楼
ken1984
2007-11-29
JAVA有个NIO比较火,虽然里面的效率不怎么样,但总比一片空白好,等JDK7支持EPOLL时必然又会火一下了。
12 楼
techno_it
2007-12-04
liuweipeng 写道
楼主,你好,看了你的帖子,和我要做的项目很相似,和你的思路基本一样,就是在实时将数据推送到页面的方面有些问题,我采用的dwr框架的长连接技术,但是无法长期保持连接,超过一定时间后就出现异常,请问楼主在这方面是如何解决的!
不知这位仁兄采用的是什么WEB服务?Tomcate6.0?
13 楼
techno_it
2007-12-04
liuweipeng 写道
通过连接池的话,如果管理每个客户端呢,楼上说的心跳检查,那不是成了轮询吗?还不是变成了定时的模式,有没有demo啊?
demo是有的。<img src='/javascripts/fckeditor/editor/images/smiley/msn/regular_smile.gif' alt=''/>
14 楼
liuweipeng
2007-12-07
techno_it ,可以把demo发一个给我吗?谢谢,liuweipeng_lwp@163.com
15 楼
tomcat4
2008-02-19
我现在要作的是ServerSocket采用Java写,客户端的Client采用C写的,不知道通信要注意什么问题,会不会出现乱码?有经验的朋友帮忙解答一下!
16 楼
techno_it
2008-02-28
要注意,在c段发文的时候在最后加一“\n”别的应该没什么了。
17 楼
ssuupv
2008-02-28
如果并发不高的话,可以用我方法
18 楼
drbeckham
2008-02-29
用MINA吧,效率很好,也支持线程池,我自己测试过在普通配置(C2.6, 512MB)下Winxp SP2的环境MINA能够最大接收上万个Socket连接,不过如果每个连接活动很频繁的话,实际能承受的连接数要打个折的
Java的NIO在Windows下实际就是对完成端口的封装
JDK6.0好像在Linux下支持EPOLL,性能应该还要好一点
不要用Servlet,Servlet有些东西不好控制的,特别是线程
Java的NIO在Windows下实际就是对完成端口的封装
JDK6.0好像在Linux下支持EPOLL,性能应该还要好一点
不要用Servlet,Servlet有些东西不好控制的,特别是线程
19 楼
skydream
2008-02-29
drbeckham 写道
JDK6.0好像在Linux下支持EPOLL,性能应该还要好一点
这里稍微补充一下,这个支持是从jdk5.0 update9之后开始支持,也就是说jdk5.0后期的版本也是可以从中获益的。
如果用jdk5.0,可以不必非升级到6.0,换成后面的小版本了。
7.0据说要支持真正的aio,这个是个好消息。
20 楼
fishelf
2008-03-29
...................