施用传统IO包编写的Servlet多线程监听程序

使用传统IO包编写的Servlet多线程监听程序

下面说一下我的需求:

1.要实现JAVA与VC++的SOCKET通信。

2.要使实现监听多个端口。

3.要在Servlet启动时将监听启动。

4.要实时将数据推送到页面。

———————————————————————————————————————————————————————

根据需求写的线程监听代码如下:

java 代码
  1. package com.test.youCompany.comet;   
  2.   
  3. import java.io.BufferedReader;   
  4. import java.io.IOException;   
  5. import java.io.InputStreamReader;   
  6. import java.io.OutputStreamWriter;   
  7. import java.io.PrintWriter;   
  8. import java.net.ServerSocket;   
  9. import java.net.Socket;   
  10. import java.util.Date;   
  11. import java.util.HashMap;   
  12. import java.util.Iterator;   
  13. import java.util.List;   
  14. import java.util.Map;   
  15. import java.util.Vector;   
  16.   
  17. import net.sf.json.JSONArray;   
  18. import net.sf.json.JSONException;   
  19.   
  20. import com.test.youCompany.core.common.Constants;   
  21. import com.test.youCompany.core.json.JsonObjectFactory;   
  22. import com.test.youCompany.core.util.StringUtils;   
  23. import com.test.youCompany.domain.Yc;   
  24. import com.test.youCompany.domain.Yx;   
  25.   
  26. import dojox.cometd.Bayeux;   
  27. import dojox.cometd.Client;   
  28.   
  29. public class ServerThread extends Thread   
  30. {   
  31.     public static Bayeux bayeux;   
  32.     public static Client client;   
  33.     public static final String YC = "yc";   
  34.     public static final String YX = "yx";   
  35.     public static final String TRIP = "trip";   
  36.     public static final String PRONUM = "command";   
  37.        
  38.     Socket clientRequest;//用户连接的通信套接字    
  39.     ServerSocket serverSocket;   
  40.     BufferedReader input; //输入流    
  41.     PrintWriter output; //输出流    
  42.     private String popedom = "";//线程权限   
  43.     //  serverThread的构造器   
  44.     public ServerThread(int s)    
  45.     {       
  46.         ServerSocket rServer = null;   
  47.         try  
  48.         {   
  49.                
  50.             rServer = new ServerSocket(s);   
  51.             System.out.println("Welcome to the server!");   
  52.             System.out.println(new Date());   
  53.             System.out.println("The server is ready!");   
  54.             System.out.println("Port: " + rServer.getLocalPort());   
  55.         }   
  56.         catch (IOException e)   
  57.         {   
  58.             // TODO Auto-generated catch block   
  59.             e.printStackTrace();   
  60.         }   
  61.            
  62.         this.serverSocket=rServer;//    接收receiveServer传来的套接字    
  63.     }    
  64.     public ServerThread(int s,String yt)    
  65.     {       
  66.         ServerSocket rServer = null;   
  67.         try  
  68.         {   
  69.                
  70.             rServer = new ServerSocket(s);   
  71.             System.out.println("Welcome to the server!");   
  72.             System.out.println(new Date());   
  73.             System.out.println("The server is ready!");   
  74.             System.out.println("Port: " + rServer.getLocalPort());   
  75.             this.setPopedom(yt);   
  76.         }   
  77.         catch (IOException e)   
  78.         {   
  79.             // TODO Auto-generated catch block   
  80.             e.printStackTrace();   
  81.         }   
  82.            
  83.         this.serverSocket=rServer;//    接收receiveServer传来的套接字    
  84.     }      
  85.     public static void setBayeux(Bayeux bayeux)   
  86.     {   
  87.         ServerThread.bayeux = bayeux;   
  88.         ServerThread.client =bayeux.newClient(Constants.DFDATACLENT, null);   
  89. //      System.out.println("设置协议成功.");   
  90.     }   
  91.     public void run()   
  92.     {   //线程的执行方法    
  93.         InputStreamReader reader;    
  94.         boolean done=false;   
  95.         while(!done)   
  96.         {    
  97.             try  
  98.             {   
  99.                 this.clientRequest = this.serverSocket.accept();   
  100. //              System.out.println("New connection accepted "+ clientRequest.getInetAddress() + ":" + clientRequest.getPort());   
  101.                 if(clientRequest != null)   
  102.                 {   
  103.                     reader=new InputStreamReader(clientRequest.getInputStream());   
  104.                     input=new BufferedReader(reader);   
  105.                        
  106.                     while (true)   
  107.                     {   
  108.                         String message = input.readLine();   
  109.                         if (message == null)   
  110.                             break;   
  111.                            
  112. //                      System.out.println("以下是从端口" + serverSocket.getLocalPort() + "中得到数据");   
  113.                         if(this.popedom.equalsIgnoreCase(ServerThread.YC))   
  114.                         {   
  115.                             if(message.endsWith("]]"))   
  116.                                 sendDataToView(message,ServerThread.YC);   
  117.                         }   
  118.                         else if(this.popedom.equalsIgnoreCase(ServerThread.YX))   
  119.                         {   
  120.                             if(message.endsWith("]]"))   
  121.                                 sendDataToView(message,ServerThread.YX);   
  122.                         }   
  123.                         else if(this.popedom.equalsIgnoreCase(ServerThread.TRIP))   
  124.                         {   
  125.                                
  126.                         }   
  127.                         else if(this.popedom.equalsIgnoreCase(ServerThread.PRONUM))   
  128.                         {   
  129.                                
  130.                         }   
  131. //                      showMessage(message);   
  132. //                      sender(message.);   
  133.                     }   
  134.                     try  
  135.                     {   
  136.                         Thread.sleep(300);   
  137.                     }   
  138.                     catch (InterruptedException e)   
  139.                     {   
  140.                         // TODO Auto-generated catch block   
  141.                         e.printStackTrace();   
  142.                     }   
  143.                 }   
  144.                 else  
  145.                 {   
  146.                     try  
  147.                     {   
  148.                         Thread.sleep(300);   
  149.                     }   
  150.                     catch (InterruptedException e)   
  151.                     {   
  152.                         // TODO Auto-generated catch block   
  153.                         e.printStackTrace();   
  154.                     }   
  155.                 }   
  156.             }   
  157.             catch(IOException e)   
  158.             {    
  159.                 System.out.println(e.getMessage());   
  160.             }    
  161.         }//end of while    
  162.         try  
  163.         {    
  164.             clientRequest.close(); //关闭套接字    
  165.         }   
  166.         catch(IOException e)   
  167.         {    
  168.             System.out.println(e.getMessage());    
  169.         }    
  170.   
  171.     }   
  172.     public  void sender(String command)   
  173.     {   
  174.         OutputStreamWriter writer;    
  175.         try  
  176.         { //初始化输入、输出流    
  177.             writer=new OutputStreamWriter(clientRequest.getOutputStream());    
  178.             output=new PrintWriter(writer,true);    
  179.         }   
  180.         catch(IOException e)   
  181.         {    
  182.             System.out.println(e.getMessage());   
  183.         }    
  184.         output.println(command);//客户机连接欢迎词    
  185.     }   
  186.     /**  
  187.      * 将数据传送到页面。  
  188.      * @param message  
  189.      */  
  190.     private  void sendDataToView(String message, String str)   
  191.     {   
  192.         if(bayeux != null && client != null)   
  193.         {   
  194.             String result;   
  195.             Map msg = new HashMap();   
  196.             if(this.popedom.equalsIgnoreCase(ServerThread.YC))   
  197.             {   
  198.                 result = getYcDataView(message);   
  199.                 if(result != null)   
  200.                     msg.put("chat", result);   
  201.             }   
  202.             else if(this.popedom.equalsIgnoreCase(ServerThread.YX))   
  203.             {   
  204.                 result = getYxDataView(message);   
  205.                 if(result != null)   
  206.                 {   
  207.                     msg.put("chat", result);   
  208.                        
  209.                 }   
  210.             }   
  211.             bayeux.publish(client,"/dfChat/" + str,msg,""+msg.hashCode());   
  212. //          System.out.println("数据已发出,请确认。");   
  213.         }   
  214.     }   
  215.     private void pushDataToBus(List<yc></yc> list)   
  216.     {   
  217.            
  218.     }   
  219.     @SuppressWarnings("unchecked")   
  220.     private String getYcDataView(String message)   
  221.     {   
  222.         int i = 0;   
  223.         int j = 0;   
  224.         try  
  225.         {   
  226.             List <list></list>  ycData =  JsonObjectFactory.getNomalList(message);   
  227.             List<yc></yc> jsonData = new Vector<yc></yc>();   
  228.             for(Iterator <list></list>  equipment = ycData.iterator(); equipment.hasNext();i++)   
  229.             {   
  230.                 List ycList = equipment.next();   
  231.                 for(Iterator ycValue = ycList.iterator();ycValue.hasNext();j++)   
  232.                 {   
  233.                     Yc singleData = new Yc();   
  234.                     singleData.setModtime(new Date());   
  235.                     try  
  236.                     {   
  237.                         singleData.setValue((Double)ycValue.next());   
  238.                     }   
  239.                     catch(ClassCastException e)   
  240.                     {   
  241.                         singleData.setValue(Double.valueOf(ycValue.next().toString() + ".0"));   
  242.                     }   
  243.                     singleData.setStationid(i);   
  244.                     singleData.setYcid(j);   
  245.                     jsonData.add(singleData);   
  246.                        
  247.                 }   
  248.                 j = 0;   
  249.             }   
  250.             pushDataToBus(jsonData);   
  251.             return JSONArray.fromCollection(jsonData, StringUtils.getStrings(Constants.YCNONEED)).toString();   
  252.         }   
  253.         catch(JSONException e)   
  254.         {   
  255.             System.out.println(e.getMessage());   
  256.             return null;   
  257.         }   
  258.         catch(Exception e)   
  259.         {   
  260.             System.out.println(e.getMessage());   
  261.             return null;   
  262.         }   
  263.     }   
  264.     @SuppressWarnings("unchecked")   
  265.     private String getYxDataView(String message)   
  266.     {   
  267.         int i = 0;   
  268.         int j = 0;   
  269.         try  
  270.         {   
  271.             List> ycData =  JsonObjectFactory.getNomalList(message);   
  272.             List<yx></yx> jsonData = new Vector<yx></yx>();   
  273.             for(Iterator> equipment = ycData.iterator(); equipment.hasNext();i++)   
  274.             {   
  275.                 List<integer></integer> yxList = equipment.next();   
  276.                 for(Iterator<integer></integer> yxValue = yxList.iterator();yxValue.hasNext();j++)   
  277.                 {   
  278.                     Yx singleData = new Yx();   
  279.                     singleData.setStationid(i);   
  280.                     singleData.setYxid(j);   
  281.                     singleData.setValue(yxValue.next());   
  282.                     jsonData.add(singleData);   
  283.                 }   
  284.                 j = 0;   
  285.             }   
  286.            
  287.             return JSONArray.fromCollection(jsonData, StringUtils.getStrings(Constants.YXNONEED)).toString();   
  288.         }   
  289.         catch(JSONException e)   
  290.         {   
  291.             System.out.println(e.getMessage());   
  292.             return null;   
  293.         }   
  294.     }      
  295.     private void setPopedom(String popedom)   
  296.     {   
  297.         this.popedom = popedom;   
  298.     }   
  299.        
  300. }   

Servlet代码如下:

 

java 代码
  1. package com.test.youCompany.comet;   
  2.   
  3. import javax.servlet.ServletConfig;   
  4. import javax.servlet.ServletException;   
  5. import javax.servlet.http.HttpServlet;   
  6.   
  7. import com.test.youCompany.core.common.Constants;   
  8. import com.test.youCompany.core.common.DFPropertyOwner;   
  9. import com.test.youCompany.core.util.StringUtils;   
  10.   
  11. public class JMessagePushServlet extends HttpServlet   
  12. {   
  13.   
  14.     /**  
  15.      *   
  16.      */  
  17.     private static final long serialVersionUID = -832833032007994994L;   
  18.     final int RECEIVE_PORT = 9090;   
  19.     public void init(ServletConfig servletConfig) throws ServletException   
  20.     {   
  21.         super.init(servletConfig);   
  22.         Thread serverThread = null;   
  23.         try  
  24.         {   
  25.                
  26.             String [] ports = StringUtils.getStrings(DFPropertyOwner.getKeyValue("ArrPort", Constants.GLOBAL_PROPERTIES));   
  27.             String [] portsRight = StringUtils.getStrings(DFPropertyOwner.getKeyValue("PortRights", Constants.GLOBAL_PROPERTIES));   
  28.             for(int i = 0;i
  29.             {   
  30.                 if(i < portsRight.length)   
  31.                     serverThread = new ServerThread(Integer.valueOf(ports[i]),portsRight[i]);//监控项目   
  32.                 else  
  33.                     serverThread = new ServerThread(Integer.valueOf(ports[i]));//没有指定监控项目   
  34.                 serverThread.setName(Constants.THREANNAMES + i);   
  35.                 serverThread.start();   
  36.             }   
  37.             System.out.println("线程开始启动");   
  38.         }   
  39.         catch (Exception e)   
  40.         {   
  41.             System.out.println(e.getMessage());   
  42.         }           
  43.     }      
  44.        
  45. }  

以下是WEB.xml配置

java 代码
  1. <servlet></servlet>    
  2.         <servlet-name></servlet-name>MessageServer    
  3.         class>com.test.youCompany.comet.JMessagePushServletclass>    
  4.         <load-on-startup></load-on-startup>1    
  5.        

———————————————————————————————————————————————————————

在使用过程这种方法还是有一些弊端,它的效率不高,在处理大的数据时可能产生错误,比如接受到的是坏数据,等。在网上略看了一下NIO的实现,以后我尝试。

1 楼 liuweipeng 2007-11-29  
楼主,你好,看了你的帖子,和我要做的项目很相似,和你的思路基本一样,就是在实时将数据推送到页面的方面有些问题,我采用的dwr框架的长连接技术,但是无法长期保持连接,超过一定时间后就出现异常,请问楼主在这方面是如何解决的!
2 楼 myworkfirst 2007-11-29  
  二楼的朋友,你好. 我现在也在开发 java程序与vc++程序之间的通讯.不过没用到servlet. 对于你的问题,我认为你可能没有发送心跳包(有的也称之为测试包,或者链路包). 如果是长连接,一般会有连接时间,长时间不发心跳包,服务端就会断开连接.
   解决办法: 开启一个 Timer,间隔几秒之内,发送心跳包.心跳包很小.
3 楼 liuweipeng 2007-11-29  
谢谢三楼朋友,冒昧的问一句,你没有用serverlet,你采用什么了?
4 楼 dengtl 2007-11-29  
数据接收后,把其推送到页面部分可以做优化。
将其推送到一个缓冲区,由一个线程专门处理数据推送,这样可能效率稍微高一点。
5 楼 liuweipeng 2007-11-29  
对于多用户同时访问的时候,暂不考虑性能,如何对于实时消息如何区分开,并且发送给不同的用户,也就是对于多线程方面如何处理,请指教。
6 楼 myworkfirst 2007-11-29  
liuweipeng 写道
谢谢三楼朋友,冒昧的问一句,你没有用serverlet,你采用什么了?

    我是做的 c/s结构,不是 web开发.  所以直接写类,就行了. 没用到 servlet.
7 楼 myworkfirst 2007-11-29  
liuweipeng 写道
对于多用户同时访问的时候,暂不考虑性能,如何对于实时消息如何区分开,并且发送给不同的用户,也就是对于多线程方面如何处理,请指教。

    消息,有消息包. 分为消息头,和消息体. 解开包头和包体,就可以区分各个用户了.对于消息结构,你们应该有定义吧! 对于你所说的发送给不同的用户,包头内应该有.得到消息后,可以放到 集合里,然后再分发出去.
不知道你们那的程序设计是否是这样. 这就看具体情况了.
    我的java程序,对于vc++程序,是客户端,但对于用户,就是服务器.
8 楼 dengtl 2007-11-29  
同意楼上的。
另外,能否做一个线程池,对单个用户请求从线程池中分配一个线程与客户端进行交互,这样就不存在区分用户了。
9 楼 dengtl 2007-11-29  
对于长连接,我没有这方面的经验。直观感觉来看,长连接只所以存在是不希望频繁的进行TCP/IP握手处理,但如果长时间占用,势必也会对服务器端造成压力。我的建议是每当用户请求建立一个长连接时,则分配一个TimeoutSocket(可以由一般的Socket加上延时关闭功能装饰),当用户上一次传输请求时间超过某一时间后,则断开连接。这当然要客户端配合才行,客户端应该在传输之前检测该连接是否有效,无效则从新建立连接。
这样,或许能够解决长连接所产生的不足。
10 楼 liuweipeng 2007-11-29  
通过连接池的话,如果管理每个客户端呢,楼上说的心跳检查,那不是成了轮询吗?还不是变成了定时的模式,有没有demo啊?
11 楼 ken1984 2007-11-29  
JAVA有个NIO比较火,虽然里面的效率不怎么样,但总比一片空白好,等JDK7支持EPOLL时必然又会火一下了。
12 楼 techno_it 2007-12-04  
liuweipeng 写道
楼主,你好,看了你的帖子,和我要做的项目很相似,和你的思路基本一样,就是在实时将数据推送到页面的方面有些问题,我采用的dwr框架的长连接技术,但是无法长期保持连接,超过一定时间后就出现异常,请问楼主在这方面是如何解决的!
不知这位仁兄采用的是什么WEB服务?Tomcate6.0?
13 楼 techno_it 2007-12-04  
liuweipeng 写道
通过连接池的话,如果管理每个客户端呢,楼上说的心跳检查,那不是成了轮询吗?还不是变成了定时的模式,有没有demo啊?
demo是有的。<img src='/javascripts/fckeditor/editor/images/smiley/msn/regular_smile.gif' alt=''/>
14 楼 liuweipeng 2007-12-07  
techno_it ,可以把demo发一个给我吗?谢谢,liuweipeng_lwp@163.com
15 楼 tomcat4 2008-02-19  
我现在要作的是ServerSocket采用Java写,客户端的Client采用C写的,不知道通信要注意什么问题,会不会出现乱码?有经验的朋友帮忙解答一下!
16 楼 techno_it 2008-02-28  
要注意,在c段发文的时候在最后加一“\n”别的应该没什么了。
17 楼 ssuupv 2008-02-28  
如果并发不高的话,可以用我方法
18 楼 drbeckham 2008-02-29  
用MINA吧,效率很好,也支持线程池,我自己测试过在普通配置(C2.6, 512MB)下Winxp SP2的环境MINA能够最大接收上万个Socket连接,不过如果每个连接活动很频繁的话,实际能承受的连接数要打个折的
Java的NIO在Windows下实际就是对完成端口的封装
JDK6.0好像在Linux下支持EPOLL,性能应该还要好一点
不要用Servlet,Servlet有些东西不好控制的,特别是线程
19 楼 skydream 2008-02-29  
drbeckham 写道

JDK6.0好像在Linux下支持EPOLL,性能应该还要好一点


这里稍微补充一下,这个支持是从jdk5.0 update9之后开始支持,也就是说jdk5.0后期的版本也是可以从中获益的。

如果用jdk5.0,可以不必非升级到6.0,换成后面的小版本了。

7.0据说要支持真正的aio,这个是个好消息。
20 楼 fishelf 2008-03-29  
...................