memcached java上性能测试报告、分析与有关问题讨论

memcached java下性能测试报告、分析与问题讨论

我的项目原来使用静态HashMap来实现Key->Object的缓存,并且实现脏数据刷新.由于项目要改成集群部署.在单个jvm里运行的静态Hash结构已经无法处理脏数据问题.所以准备使用memcached做分布式缓存来解决.

从网上搜索到的资料来看 memcached能够接受较大量的请求.但其javaclient 由于大量使用同步语句、hashmap,读取流没有使用bufferedStream。造成性能并不好
为此在我的项目里我参考memcached的协议,自己实现一个客户端功能。经过测试后发现一些问题。

测试环境1:
windows xp home
迅驰T2250
1G内存
jdk1.5
server: memcached-1.2.1 nt版本
client: 自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器

测试方法
填充测试时填充10w个pojo。创建100个任务,每任务负责向memcached存储1000个pojo。
读取测试时读取10000个pojo 创建100个任务,每任务读取100个pojo。
平均值均按照线程内取平均值后,各线程再取平均值。
在进行上述测试的时候 cpu占用均在90%-100%,根据上述测试结果,开2个线程时效率最高。

thread count write avg/per obj write totall read avg/per obj read total
10 2.404991 ms 29s 1.709544 ms 2s
5 0.704780 ms 18s 1.333013 ms 2s
2 0.262194 ms 15s 0.414683 ms 2s






测试环境2:
AIX5.2
IBM P650  Power4 1.5G(64bit) *4
内存8G
jdk1.4
memcached-1.2.1
libevnet1.1b
client自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器

相同的测试方法。测试结果大跌眼睛  10线程时 读写速度为200ms/per object 比我的笔记本慢100倍。就没继续测试

测试环境3:
windows2000 server
xeon 1.5*4
内存8G
jdk1.5
测试时发现cpu占用不高于20%

thread count write avg/per obj write total read avg/per obj read total
20 10.266615ns 71s 23.21283ns 15s
10 4.341574ns 41s 13.30084ns 16s
5 1.298717ns 25s 9.33258ns 18s
2 1.298717ns 21s 4.02503ns 23s


 




初步测试到这里 发现的问题
1.是暂时没有达到网上宣传的1.5w个对象/s (目前测试cpu为瓶颈)
2.是aix下效率低的可怕,对aix不太熟悉。应该有些设置的参数,这点还要向大家请教。
3.超过2个线程效率就开始低。没有发挥多线程的优势,不知道大家在使用过程中有没有发现这一点,还是说我写的MemBufferedDriver有问题。

我的期望是读写速度均能稳定在0.1毫秒左右,至少 1w obj/s 这样才有可能不影响到现有系统的效率
另外大家在java下使用memcached的时候效率怎么样都是怎么用的呢

下面贴下我的client的实现
memcached的协议可以参考memcache包的doc/protocol.txt

主要是socket初次打开后不关闭,直接存放到ThreadLocal中,与memcached保持一个长练接。每次使用的时候判断连接可用还是需要重新连接。

其他存储和读取实现memcached协议。使用BufferedStream。
序列化采用实现Serializable,直接使用ObjectStream来实现。(由于都是pojo简单数据对象,尝试过实现Externalizable接口自己实现序列化和使用750个byte的String 来序列化,发现性能相差不多故放弃)

java 代码
MemBufferedDriver 为client实现
  1. public class MemBufferedDriver {   
  2.     /**  
  3.      * 存放 连到cacheserver的socket连接  
  4.      */  
  5.     private final static ThreadLocal sockPool = new ThreadLocal();   
  6.     private static String serverAddress = "localhost:11211";   
  7.   
  8.     public static final byte[] BYTE_GET = new byte[]{10310111632};   
  9.     public static final byte[] BYTE_SET = new byte[]{11510111632};   
  10.     public static final byte[] BYTE_DELETE = new byte[]{10010110810111610132};   
  11.     public static final byte[] BYTE_CRLF = new byte[]{1310};   
  12.     public static final byte[] BYTE_SPACE = new byte[]{32};   
  13.   
  14.     public static final String SERVER_STATUS_DELETED = "DELETED";   
  15.     public static final String SERVER_STATUS_NOT_FOUND = "NOT_FOUND";   
  16.     public static final String SERVER_STATUS_STORED = "STORED";   
  17.     public static final String SERVER_STATUS_ERROR = "ERROR";   
  18.     public static final String SERVER_STATUS_END = "END";   
  19.     public static final String SERVER_STATUS_VALUE = "VALUE";   
  20.   
  21.     public static final String ENCODING_TYPE = "UTF-8";   
  22.   
  23.   
  24.     public static Socket getSocket() throws UnknownHostException, IOException {   
  25.         Socket s = (Socket) MemBufferedDriver.sockPool.get();   
  26.         if (s == null || s.isClosed()) {   
  27.             s = MemBufferedDriver.reconnect();   
  28.             MemBufferedDriver.sockPool.set(s);   
  29.         }   
  30.         return s;   
  31.     }   
  32.   
  33.     private static Socket reconnect() throws UnknownHostException, IOException {   
  34.         String[] ip = MemBufferedDriver.serverAddress.split(":");   
  35.         return new Socket(ip[0], Integer.parseInt(ip[1]));   
  36.   
  37.     }   
  38.   
  39.   
  40.     public Map getMulti(String[] keys) {   
  41.         Map map = new HashMap();   
  42.         if (keys == null || keys.length <= 0return map;   
  43.   
  44.         for (int i = 0; i < keys.length; i++) {   
  45.             Object o = get(keys[i]);   
  46.             if (o != null) map.put(keys[i], o);   
  47.   
  48.         }   
  49.         return map;   
  50.     }   
  51.   
  52.     public Object[] getMultiArray(String[] keys) {   
  53.         if (keys == null || keys.length <= 0return null;   
  54.   
  55.         Object[] o = new Object[keys.length];   
  56.         for (int i = 0; i < keys.length; i++)   
  57.             o[i] = get(keys[i]);   
  58.   
  59.         return o;   
  60.     }   
  61.   
  62.     public boolean set(String key, Object obj) {   
  63.         try {   
  64.             if (obj == null || key == null || "".equals(key)) throw new Exception("对象和key 不能为空");   
  65.             Socket s = MemBufferedDriver.getSocket();   
  66.             BufferedInputStream in = new BufferedInputStream(s.getInputStream());   
  67.             BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream());   
  68.   
  69.             key = encodeKey(key);   
  70.             int flag = 0;   
  71.   
  72.             //序列化对象   
  73.             byte[] bs = object2Byte(obj);   
  74.   
  75.             out.write(MemBufferedDriver.BYTE_SET);           //write cmd   
  76.             out.write(key.getBytes());     //write key   
  77.             out.write(MemBufferedDriver.BYTE_SPACE);   
  78.             out.write(String.valueOf(flag).getBytes());     //write flag   
  79.             out.write(MemBufferedDriver.BYTE_SPACE);   
  80.             out.write("0".getBytes());     //write expire date   
  81.             out.write(MemBufferedDriver.BYTE_SPACE);   
  82.             out.write(String.valueOf(bs.length).getBytes());     //object length   
  83.             out.write(MemBufferedDriver.BYTE_CRLF);   
  84.   
  85.             out.write(bs);   
  86.             out.write(MemBufferedDriver.BYTE_CRLF);   
  87.             out.flush();   
  88.   
  89.             String ret = readLine(in);   
  90.             return MemBufferedDriver.SERVER_STATUS_STORED.equals(ret);   
  91.         } catch (Exception e) {   
  92.             System.out.println(e.getMessage());   
  93.             return false;   
  94.         }   
  95.     }   
  96.   
  97.     public Object get(String key) {   
  98.         try {   
  99.             Socket s = MemBufferedDriver.getSocket();   
  100.             InputStream in = s.getInputStream();   
  101.             OutputStream out = s.getOutputStream();   
  102.             key = encodeKey(key);   
  103.             out.write(MemBufferedDriver.BYTE_GET);   
  104.             out.write(key.getBytes());   
  105.             out.write(MemBufferedDriver.BYTE_CRLF);   
  106.             out.flush();   
  107.   
  108.             return getObjectFromStream(in, out);   
  109.         } catch (Exception e) {   
  110.             System.out.println(e.getMessage());   
  111.             return null;   
  112.         }   
  113.     }   
  114.   
  115.     public boolean delete(String key) {   
  116.         try {   
  117.             Socket s = MemBufferedDriver.getSocket();   
  118.             InputStream in = s.getInputStream();   
  119.             OutputStream out = s.getOutputStream();   
  120.             key = encodeKey(key);   
  121.             out.write(MemBufferedDriver.BYTE_DELETE);   
  122.             out.write(key.getBytes());   
  123.             out.write(MemBufferedDriver.BYTE_CRLF);   
  124.             out.flush();   
  125.   
  126.             String ret = readLine(in);   
  127.             return MemBufferedDriver.SERVER_STATUS_DELETED.equals(ret) || MemBufferedDriver.SERVER_STATUS_NOT_FOUND.equals(ret);   
  128.         } catch (Exception e) {   
  129.             return false;   
  130.         }   
  131.     }   
  132.   
  133.     private Object getObjectFromStream(InputStream in, OutputStream out) throws IOException, ClassNotFoundException {   
  134.         String cmd = readLine(in);   
  135.         if (cmd.startsWith(MemBufferedDriver.SERVER_STATUS_VALUE)) {   
  136.             //return object   
  137.             String[] part = cmd.split(" ");   
  138.             String para = part[2];   
  139.             int length = Integer.parseInt(part[3]);   
  140.   
  141.             byte[] bs = new byte[length];   
  142.   
  143.             int count = 0;   
  144.             while (count < bs.length) count += in.read(bs, count, (bs.length - count));   
  145.             if (count != bs.length)   
  146.                 throw new IOException("读取数据长度错误");   
  147.             readLine(in);   
  148.             String endstr = readLine(in);   
  149.             if (MemBufferedDriver.SERVER_STATUS_END.equals(endstr))   
  150.                 return this.byte2Object(bs);   
  151.             else  
  152.                 System.out.println("结束标记错误");   
  153.   
  154.         }   
  155.         return null;   
  156.     }   
  157.   
  158.     private String encodeKey(String key) throws UnsupportedEncodingException {   
  159.         return URLEncoder.encode(key, MemBufferedDriver.ENCODING_TYPE);   
  160.     }   
  161.   
  162.   
  163.     private String readLine(InputStream in) throws IOException {   
  164.         ByteArrayOutputStream bos = new ByteArrayOutputStream();   
  165.         boolean eol = false;   
  166.         byte[] b = new byte[1];   
  167.         while (in.read(b, 01) != -1) {   
  168.             if (b[0] == 13) eol = true;   
  169.             else if (eol && b[0] == 10break;   
  170.             else  
  171.                 eol = false;   
  172.   
  173.             bos.write(b, 01);   
  174.         }   
  175.   
  176.   
  177.         if (bos.size() == 0return null;   
  178.         return bos.toString().trim();   
  179.     }   
  180.   
  181.   
  182.     private byte[] object2Byte(Object o) throws IOException {   
  183.         ByteArrayOutputStream b = new ByteArrayOutputStream();   
  184.         new ObjectOutputStream(b).writeObject(o);   
  185.         return b.toByteArray();   
  186.     }   
  187.   
  188.     private Object byte2Object(byte[] b) throws IOException, ClassNotFoundException {   
  189.         return new ObjectInputStream(new ByteArrayInputStream(b)).readObject();   
  190.     }   
  191.   
  192.   
  193.     public static void main(String[] args) throws Exception {   
  194.         MemBufferedDriver m = new MemBufferedDriver();   
  195.         System.out.println(m.set("a""DsSD"));   
  196.         System.out.println(m.get("a"));   
  197.     }   
  198.   
  199.     public static void setServerAddress(String serverAddress) {   
  200.         MemBufferedDriver.serverAddress = serverAddress;   
  201.     }   
  202. }  

 
java 代码
写入测试类

  1. public class Fill2Server extends Thread {   
  2.     public static int THREAD_COUNT = 2;   
  3.     public static Queue queue = new Queue();   
  4.     MemBufferedDriver md = new MemBufferedDriver();   
  5.   
  6.     public static void main(String[] args) throws Exception {   
  7.   
  8.         int size ;   
  9.         if (args.length == 3 && args[0] != null && args[1] != null) {   
  10.             MemDriver.setServerAddress(args[0]);   
  11.             size = Integer.parseInt(args[1]);   
  12.             THREAD_COUNT = Integer.parseInt(args[2]);   
  13.             new Fill2Server().doFill(size);   
  14.         } else  
  15.             System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2填充数量,不能小于10000,参数3为使用的线程数");   
  16.   
  17.     }   
  18.   
  19.     private void doFill(int size) throws InterruptedException {   
  20.         int taskCount = size / 1000;   //每个线程负责填充1000的对象   
  21.         for (int i = 0; i < taskCount; i++) {   
  22.             Task t = new Task();   
  23.             t.setTaskId(String.valueOf(i));   
  24.             queue.add(t);   
  25.         }   
  26.   
  27.         long time = System.currentTimeMillis();   
  28.         Thread tr[] = new Thread[THREAD_COUNT];   
  29.         for (int i = 0; i < THREAD_COUNT; i++) {   
  30.             FillThread ft = new FillThread();   
  31.             (tr[i] = new Thread(ft)).start();   
  32.         }   
  33.   
  34.         //监控填充完成   
  35.         while (true) {   
  36.             boolean flag = true;   
  37.             for (int i = 0; i < THREAD_COUNT; i++)   
  38.                 flag &= tr[i].isAlive();   
  39.   
  40.             if (!flag) break;   
  41.   
  42.             Thread.sleep(1000);   
  43.         }   
  44.   
  45.         time = System.currentTimeMillis() - time;   
  46.         System.out.println("任务完成,共用" + (time / 1000) + "s");   
  47.     }   
  48.   
  49.     class FillThread implements Runnable {   
  50.         public void run() {   
  51.             Task task;   
  52.             while (true) {   
  53.                 task = (Task) queue.get();   
  54.                 if (task == nullbreak;   
  55.                 long time = System.nanoTime();   
  56.                 for (int i = 0; i < 1000; i++) {   
  57.                     TestBO b = new TestBO();   
  58.                     md.set(task.getTaskId() + i, b);   
  59.                 }   
  60.                 time = System.nanoTime() - time;   
  61.                 System.out.println(Thread.currentThread().getName() + " avg " + (time / 1000) + " ns ");   
  62.             }   
  63.         }   
  64.     }   
  65. }  

 

java 代码
读取的测试方法
  1. public class GetFromServer extends Thread {   
  2.     public static int THREAD_COUNT = 2;   
  3.     public static Queue queue = new Queue();   
  4.     MemDriver md = new MemDriver();   
  5.   
  6.     public static void main(String[] args) throws Exception {   
  7.   
  8.         int size;   
  9.         if (args.length == 3 && args[0] != null && args[1] != null) {   
  10.             MemDriver.setServerAddress(args[0]);   
  11.             size = Integer.parseInt(args[1]);   
  12.             THREAD_COUNT = Integer.parseInt(args[2]);   
  13.             new GetFromServer().doFill(size);   
  14.         } else  
  15.             System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2读取数量不能小于1000,参数3为使用的线程数");   
  16.   
  17.     }   
  18.   
  19.     private void doFill(int size) throws InterruptedException {   
  20.         int taskCount = size / 100;   //每个线程负责填充1000的对象   
  21.         for (int i = 0; i < taskCount; i++) {   
  22.             Task t = new Task();   
  23.             t.setTaskId(String.valueOf(i));   
  24.             GetFromServer.queue.add(t);   
  25.         }   
  26.   
  27.         long time = System.currentTimeMillis();   
  28.         Thread tr[] = new Thread[GetFromServer.THREAD_COUNT];   
  29.         for (int i = 0; i < GetFromServer.THREAD_COUNT; i++) {   
  30.             GetFromServer.FillThread ft = new GetFromServer.FillThread();   
  31.             (tr[i] = new Thread(ft)).start();   
  32.         }   
  33.   
  34.         //监控填充完成   
  35.         while (true) {   
  36.             boolean flag = true;   
  37.             for (int i = 0; i < GetFromServer.THREAD_COUNT; i++)   
  38.                 flag &= tr[i].isAlive();   
  39.   
  40.             if (!flag) break;   
  41.   
  42.             Thread.sleep(1000);   
  43.         }   
  44.   
  45.   
  46.         time = System.currentTimeMillis() - time;   
  47.         System.out.println("任务完成,共用" + (time / 1000) + "s");   
  48.     }   
  49.   
  50.   
  51.     class FillThread implements 
13 楼 ftv_2001 2007-03-21  
robbin 写道
自己用cookie id +MemCached的解决方案取代HttpSession
那就说在Session Scope 里面只保存了用户信息?如果是这样,那么对MemCached的利用率好像低了点。
我怎么觉得如果能够重写HttpSession或者相关的实现,第一,可以充分提高性能,第二,遗留的一些应用可以不加修改的重新部署。
14 楼 robbin 2007-03-21  
ftv_2001 写道
robbin 写道
自己用cookie id +MemCached的解决方案取代HttpSession
那就说在Session Scope 里面只保存了用户信息?如果是这样,那么对MemCached的利用率好像低了点。
我怎么觉得如果能够重写HttpSession或者相关的实现,第一,可以充分提高性能,第二,遗留的一些应用可以不加修改的重新部署。


我说过了,用cookie id +cache,根本就不用session,哪有session scope?

不同的应用服务器的HttpSession的实现都不一样,我不反对你这样去做,但是这样做一来代码丧失了应用服务器兼容性,二来某些应用服务器的HttpSession实现也不是你想像的那么好写的,你不信的话可以自己试试看。而我的方案只不过几行代码而已。
15 楼 ohmyzi 2007-03-22  
codeutil 写道

昨天听同学说他的memecache用的是12G内存.存储大约1000万个键值对,查询速度很快.



浪费啊  千万键值也用不了2G的内存阿  难道他起了六个?
16 楼 garnoopy 2007-03-22  
robbin 写道
ftv_2001 写道
robbin 写道
自己用cookie id +MemCached的解决方案取代HttpSession
那就说在Session Scope 里面只保存了用户信息?如果是这样,那么对MemCached的利用率好像低了点。
我怎么觉得如果能够重写HttpSession或者相关的实现,第一,可以充分提高性能,第二,遗留的一些应用可以不加修改的重新部署。


我说过了,用cookie id +cache,根本就不用session,哪有session scope?

不同的应用服务器的HttpSession的实现都不一样,我不反对你这样去做,但是这样做一来代码丧失了应用服务器兼容性,二来某些应用服务器的HttpSession实现也不是你想像的那么好写的,你不信的话可以自己试试看。而我的方案只不过几行代码而已。


请问使用cookie id,有没有遇到安全问题?有没有必要给cookie id加密、解密?
17 楼 youcai 2007-03-22  
garnoopy 写道

请问使用cookie id,有没有遇到安全问题?有没有必要给cookie id加密、解密?

安全性等同于session,因为session使用的也是cookie id作为标识的。
18 楼 xiaoheng 2007-03-22  
socket.setTcpNoDelay(true);


这样性能应该可以提高。
19 楼 ftv_2001 2007-03-23  
robbin 写道
ftv_2001 写道
robbin 写道
自己用cookie id +MemCached的解决方案取代HttpSession
那就说在Session Scope 里面只保存了用户信息?如果是这样,那么对MemCached的利用率好像低了点。
我怎么觉得如果能够重写HttpSession或者相关的实现,第一,可以充分提高性能,第二,遗留的一些应用可以不加修改的重新部署。


我说过了,用cookie id +cache,根本就不用session,哪有session scope?

不同的应用服务器的HttpSession的实现都不一样,我不反对你这样去做,但是这样做一来代码丧失了应用服务器兼容性,二来某些应用服务器的HttpSession实现也不是你想像的那么好写的,你不信的话可以自己试试看。而我的方案只不过几行代码而已。

不能说你没有session Scope吧,只能说你的session不是java的session,而是更换成为了memCached内缓存,如果你有用户logout了,或者过期了,你是不是也要invalidate一下(做delete)?
我相信robbin用memcached的目的,无非就是为了解决集群时java session 复制性能低下问题。那么用memCached也就是提供一个集中式的缓存。换句话说,就是一个集中的session控制器。
当然,如果你拿来做Application 级别的cache,那就令当别论了。
另外,我觉得session这个概念的存在,是有非常的价值,你完全把session抛弃了,只能说某些应用比较合适,在某些应用下就很棘手,现在有了一个集中式的session控制器做为替代,所以我觉得怎么样无逢替代(兼容性,性能),是很有价值的。
20 楼 hiver 2007-03-23  
ls,用户信息也不一定要存在session吧,完全可以map.put(cookieId, user);
21 楼 diogin 2007-03-23  
session无非服务端一持久数据而已,通过sessionid这个cookie来做标识。通过memcache实现session有很多好处,除速度外,它还可以自动清除expired掉的记录。服务器集群时,本地session根本没戏,这时候完全可以把session搞到memcached里去,用sessionid当键名。如果还同时使用了cache,最好把缓存和session的数据分开成两个memcached daemon,否则cache一清扫,session数据也全没了。
22 楼 alin_ass 2007-03-27  
不是很喜欢memcache java client这种搞个连接池,每次发送的时候拿到一个连接,发送后阻塞在那里等回应,一般remote cache如果每次请求带个id的话,是可以在client端用一个链接做异步的,也就是request pool而不是connection pool
23 楼 firebody 2007-03-28  
robbin 写道
JavaEye现在也用到了memcache,用来做对象缓存和少量查询缓存,负载很低很低,CPU一般只有0.1%。

我使用memcache的java client来取代Java自己的HttpSession机制,然后对一个实际的web项目使用loadrunner进行压力测试,模拟20个用户,50个用户同时使用。压力测试结果表明,使用memcache取代httpsession几乎没有性能损失。(memcache java client要开socket pool)

原来使用httpSession的话,在这个压力下性能如何?
24 楼 robbin 2007-03-28  
firebody 写道
robbin 写道
JavaEye现在也用到了memcache,用来做对象缓存和少量查询缓存,负载很低很低,CPU一般只有0.1%。

我使用memcache的java client来取代Java自己的HttpSession机制,然后对一个实际的web项目使用loadrunner进行压力测试,模拟20个用户,50个用户同时使用。压力测试结果表明,使用memcache取代httpsession几乎没有性能损失。(memcache java client要开socket pool)

原来使用httpSession的话,在这个压力下性能如何?


性能几乎是一样的。
25 楼 alin_ass 2007-03-28  
这个帖子里的朋友有兴趣去写一个request pool而不是connection pool的mem client么?

或者谁能证明n个链接发数据和1个链接在数据吞吐量上有什么差别, connection pool只是为了编程上方便的做法
26 楼 xieke 2007-03-30  
memcache 对比其他一些 缓存系统如 JCS ,OSCache,有什么优势呢?
好像都是支持集群的吧?
27 楼 grantbb 2007-04-19  
xieke 写道
memcache 对比其他一些 缓存系统如 JCS ,OSCache,有什么优势呢?
好像都是支持集群的吧?


上面已经说了,memcache可以作为一种集中式的cache,而OSCache可以支持集群,但是集群中服务器多,数据更新比较频繁的情况下,集群见的数据广播也是很恐怖的意见事情。
28 楼 codeutil 2007-04-20  

qq新做的网站:http://city.qzone.qq.com ( 绝非作广告)

上面的搜索结果就是用的6台服务器的memcached来做的.
将搜索条件拼成key,然后去命中缓存.
每次查询的时间都在0.0x秒以下.



29 楼 zzeric 2007-04-22  
用memcached做集中session处理,是否容易造成单点故障?用双机解决?
30 楼 iunknown 2007-05-17  
alin_ass 写道
不是很喜欢memcache java client这种搞个连接池,每次发送的时候拿到一个连接,发送后阻塞在那里等回应,一般remote cache如果每次请求带个id的话,是可以在client端用一个链接做异步的,也就是request pool而不是connection pool


connection pool 的确是方便了编程,有点像服务器端的 OneThreadPerConnection 的编程风格。
如果转为 request pool ,就相当于是 event-driven 的编程风格了。这种 event-driven 风格的 api 对于使用者来说会带来比较大的困难,会增加客户端的编程难度。如果这个客户端是一个需求改变比较频繁,代码修改比较频繁的模块,那么这些增加的编程难度将带来很大的障碍。
另外对于 http 这种 request/response 方式的应用来说,在服务器端处理请求的时候,即使使用 request pool ,用异步方式来访问 memcached ,还是一样要在处理请求的 action 里等待这个异步操作的完成,相当于还是同步阻塞的,和 connection pool 没很大区别。如果客户端是一个批处理模式的应用,那么使用异步处理方式会快很多。

引用
这个帖子里的朋友有兴趣去写一个request pool而不是connection pool的mem client么?

或者谁能证明n个链接发数据和1个链接在数据吞吐量上有什么差别, connection pool只是为了编程上方便的做法


要实现 request pool 的 memclient,需要 memcached 服务器的支持。目前的 memcached 的协议并不支持。
因为要实现 request pool 的方式,这个 RequestID 是服务器必须要知道。memcached 的协议并不支持传递 RequetID 或者产生 RequestID。
服务器必须要知道 RequestID,是因为服务器必须要在 Response 中加上这个 RequestID ,这样客户端才能把 response 和 request 对应起来。RequestID 可以由客户端生成,传递给服务器,也可以由服务器在收到请求之后,产生一个 RequestID ,客户端每次发送请求之后,同步读这个 RequestID 。通常如果请求的处理时间比较长,可以考虑由服务器来产生,否则读取服务器产生的 RequestID 的消耗可能已经超过读取本来的 Response 了。
31 楼 MiMiEye 2007-05-31  
<br/>
<strong>timelyRain 写道:</strong><br/>
<div class='quote_div'>
<p><font>我的项目原来使用静态HashMap来实现Key-&gt;Object的缓存,并且实现脏数据刷新.由于项目要改成集群部署.在单个jvm里运行的静态Hash结构已经无法处理脏数据问题.所以准备使用memcached做分布式缓存来解决.</font></p>
<p><font>从网上搜索到的资料来看 memcached能够接受较大量的请求.但其javaclient 由于大量使用同步语句、hashmap,读取流没有使用bufferedStream。造成性能并不好<br/>
为此在我的项目里我参考memcached的协议,自己实现一个客户端功能。经过测试后发现一些问题。</font></p>
<p><font><strong>测试环境1:</strong><br/>
windows xp home <br/>
迅驰T2250<br/>
1G内存<br/>
jdk1.5<br/>
server: memcached-1.2.1 nt版本<br/>
client: 自己编写<br/>
用于传输的pojo 40个属性,序列化后为750个byte<br/>
测试client与server同台机器</font></p>
<p><font><strong>测试方法</strong><br/>
填充测试时填充10w个pojo。创建100个任务,每任务负责向memcached存储1000个pojo。<br/>
读取测试时读取10000个pojo 创建100个任务,每任务读取100个pojo。<br/>
平均值均按照线程内取平均值后,各线程再取平均值。<br/>
<font>在进行上述测试的时候 cpu占用均在90%-100%,根据上述测试结果,开2个线程时效率最高。</font></font></p>
<p><font>
</font></p><table cellspacing='1' border='1' align='left' summary='' style='width: 520px; height: 107px;' cellpadding='1' width='520'>
    <tbody>
        <tr>
            <td>thread count</td>
            <td>write avg/per obj</td>
            <td>write totall</td>
            <td>read avg/per obj</td>
            <td>read total</td>
        </tr>
        <tr>
            <td>10</td>
            <td><font><font>2.404991 ms</font></font></td>
            <td>29s</td>
            <td><font><font>1.709544 ms</font></font></td>
            <td>2s</td>
        </tr>
        <tr>
            <td>5</td>
            <td><font><font>0.704780 ms</font></font></td>
            <td>18s</td>
            <td><font><font>1.333013 ms</font></font></td>
            <td>2s</td>
        </tr>
        <tr>
            <td>2</td>
            <td><font><font>0.262194 m</font>s</font></td>
            <td>15s</td>
            <td><font><font>0.414683 ms</font></font></td>
            <td>2s</td>
        </tr>
    </tbody>
</table><font>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
</font><p/>
<p><font/></p>
<p><font><strong>测试环境2:</strong><br/>
AIX5.2<br/>
IBM P650  Power4 1.5G(64bit) *4<br/>
内存8G<br/>
jdk1.4<br/>
memcached-1.2.1<br/>
libevnet1.1b<br/>
client自己编写<br/>
用于传输的pojo 40个属性,序列化后为750个byte<br/>
测试client与server同台机器 </font></p>
<p><font>相同的测试方法。测试结果大跌眼睛  10线程时 读写速度为200ms/per object 比我的笔记本慢100倍。就没继续测试</font></p>
<p><font><strong>测试环境3:</strong><br/>
windows2000 server <br/>
xeon 1.5*4<br/>
内存8G<br/>
jdk1.5<br/>
测试时发现cpu占用不高于20%<br/>
</font></p><table cellspacing='1' border='1' align='left' summary='' style='width: 543px; height: 118px;' cellpadding='1' width='543'>
    <tbody>
        <tr>
            <td>thread count</td>
            <td>write avg/per obj</td>
            <td>write total</td>
            <td>read avg/per obj</td>
            <td>read total</td>
        </tr>
        <tr>
            <td>20</td>
            <td>10.266615ns</td>
            <td>71s</td>
            <td><font>23.21283ns</font></td>
            <td>15s</td>
        </tr>
        <tr>
            <td>10</td>
            <td><font>4.341574ns</font></td>
            <td>41s</td>
            <td><font>13.30084ns</font></td>
            <td>16s</td>
        </tr>
        <tr>
            <td>5</td>
            <td><font>1.298717ns</font></td>
            <td>25s</td>
            <td><font>9.33258ns</font></td>
            <td>18s</td>
        </tr>
        <tr>
            <td>2</td>
            <td><font>1.298717ns</font></td>
            <td>21s</td>
            <td><font>4.02503ns</font></td>
            <td>23s</td>
        </tr>
    </tbody>
</table><font>
<br/>
</font><p/>
<font>
<p><br/>
</p>
<p> </p>
<p><br/>
<br/>
</p>
<font>
<p><br/>
初步测试到这里 发现的问题<br/>
1.是暂时没有达到网上宣传的1.5w个对象/s (目前测试cpu为瓶颈)<br/>
2.是aix下效率低的可怕,对aix不太熟悉。应该有些设置的参数,这点还要向大家请教。<br/>
<font color='#ff00ff'>3.超过2个线程效率就开始低。没有发挥多线程的优势,不知道大家在使用过程中有没有发现这一点,还是说我写的MemBufferedDriver有问题。</font></p>
<p>我的期望是读写速度均能稳定在0.1毫秒左右,至少 1w obj/s 这样才有可能不影响到现有系统的效率<br/>
另外大家在java下使用memcached的时候效率怎么样都是怎么用的呢</p>
<p>下面贴下我的client的实现<br/>
memcached的协议可以参考memcache包的doc/protocol.txt</p>
<p>主要是socket初次打开后不关闭,直接存放到ThreadLocal中,与memcached保持一个长练接。每次使用的时候判断连接可用还是需要重新连接。<br/>
<br/>
其他存储和读取实现memcached协议。使用BufferedStream。<br/>
序列化采用实现Serializable,直接使用ObjectStream来实现。(由于都是pojo简单数据对象,尝试过实现Externalizable接口自己实现序列化和使用750个byte的String 来序列化,发现性能相差不多故放弃)<br/>
</p>
</font>
<div class='code_title'>java 代码<br/>
MemBufferedDriver 为client实现</div>
<div class='dp-highlighter'>
<div class='bar'/>
<ol class='dp-j'>
    <li class='alt'><span><span class='keyword'>public</span><span> </span><span class='keyword'>class</span><span> MemBufferedDriver {   </span></span> </li>
    <li class=''><span>    </span><span class='comment'>/** </span>  </li>
    <li class='alt'><span><span class='comment'>     * 存放 连到cacheserver的socket连接 </span> </span> </li>
    <li class=''><span><span class='comment'>     */</span><span>  </span></span> </li>
    <li class='alt'><span>    </span><span class='keyword'>private</span><span> </span><span class='keyword'>final</span><span> </span><span class='keyword'>static</span><span> ThreadLocal sockPool = </span><span class='keyword'>new</span><span> ThreadLocal();   </span> </li>
    <li class=''><span>    </span><span class='keyword'>private</span><span> </span><span class='keyword'>static</span><span> String serverAddress = </span><span class='string'>"localhost:11211"</span><span>;   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> </span><span class='keyword'>byte</span><span>[] BYTE_GET = </span><span class='keyword'>new</span><span> </span><span class='keyword'>byte</span><span>[]{</span><span class='number'>103</span><span>, </span><span class='number'>101</span><span>, </span><span class='number'>116</span><span>, </span><span class='number'>32</span><span>};   </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> </span><span class='keyword'>byte</span><span>[] BYTE_SET = </span><span class='keyword'>new</span><span> </span><span class='keyword'>byte</span><span>[]{</span><span class='number'>115</span><span>, </span><span class='number'>101</span><span>, </span><span class='number'>116</span><span>, </span><span class='number'>32</span><span>};   </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> </span><span class='keyword'>byte</span><span>[] BYTE_DELETE = </span><span class='keyword'>new</span><span> </span><span class='keyword'>byte</span><span>[]{</span><span class='number'>100</span><span>, </span><span class='number'>101</span><span>, </span><span class='number'>108</span><span>, </span><span class='number'>101</span><span>, </span><span class='number'>116</span><span>, </span><span class='number'>101</span><span>, </span><span class='number'>32</span><span>};   </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> </span><span class='keyword'>byte</span><span>[] BYTE_CRLF = </span><span class='keyword'>new</span><span> </span><span class='keyword'>byte</span><span>[]{</span><span class='number'>13</span><span>, </span><span class='number'>10</span><span>};   </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> </span><span class='keyword'>byte</span><span>[] BYTE_SPACE = </span><span class='keyword'>new</span><span> </span><span class='keyword'>byte</span><span>[]{</span><span class='number'>32</span><span>};   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> String SERVER_STATUS_DELETED = </span><span class='string'>"DELETED"</span><span>;   </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> String SERVER_STATUS_NOT_FOUND = </span><span class='string'>"NOT_FOUND"</span><span>;   </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> String SERVER_STATUS_STORED = </span><span class='string'>"STORED"</span><span>;   </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> String SERVER_STATUS_ERROR = </span><span class='string'>"ERROR"</span><span>;   </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> String SERVER_STATUS_END = </span><span class='string'>"END"</span><span>;   </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> String SERVER_STATUS_VALUE = </span><span class='string'>"VALUE"</span><span>;   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>final</span><span> String ENCODING_TYPE = </span><span class='string'>"UTF-8"</span><span>;   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> Socket getSocket() </span><span class='keyword'>throws</span><span> UnknownHostException, IOException {   </span> </li>
    <li class='alt'><span>        Socket s = (Socket) MemBufferedDriver.sockPool.get();   </span> </li>
    <li class=''><span>        </span><span class='keyword'>if</span><span> (s == </span><span class='keyword'>null</span><span> || s.isClosed()) {   </span> </li>
    <li class='alt'><span>            s = MemBufferedDriver.reconnect();   </span> </li>
    <li class=''><span>            MemBufferedDriver.sockPool.set(s);   </span> </li>
    <li class='alt'><span>        }   </span> </li>
    <li class=''><span>        </span><span class='keyword'>return</span><span> s;   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>private</span><span> </span><span class='keyword'>static</span><span> Socket reconnect() </span><span class='keyword'>throws</span><span> UnknownHostException, IOException {   </span> </li>
    <li class=''><span>        String[] ip = MemBufferedDriver.serverAddress.split(</span><span class='string'>":"</span><span>);   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>return</span><span> </span><span class='keyword'>new</span><span> Socket(ip[</span><span class='number'>0</span><span>], Integer.parseInt(ip[</span><span class='number'>1</span><span>]));   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> Map getMulti(String[] keys) {   </span> </li>
    <li class='alt'><span>        Map map = </span><span class='keyword'>new</span><span> HashMap();   </span> </li>
    <li class=''><span>        </span><span class='keyword'>if</span><span> (keys == </span><span class='keyword'>null</span><span> || keys.length &lt;= </span><span class='number'>0</span><span>) </span><span class='keyword'>return</span><span> map;   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; keys.length; i++) {   </span> </li>
    <li class='alt'><span>            Object o = get(keys[i]);   </span> </li>
    <li class=''><span>            </span><span class='keyword'>if</span><span> (o != </span><span class='keyword'>null</span><span>) map.put(keys[i], o);   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>return</span><span> map;   </span> </li>
    <li class=''><span>    }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> Object[] getMultiArray(String[] keys) {   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>if</span><span> (keys == </span><span class='keyword'>null</span><span> || keys.length &lt;= </span><span class='number'>0</span><span>) </span><span class='keyword'>return</span><span> </span><span class='keyword'>null</span><span>;   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>        Object[] o = </span><span class='keyword'>new</span><span> Object[keys.length];   </span> </li>
    <li class=''><span>        </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; keys.length; i++)   </span> </li>
    <li class='alt'><span>            o[i] = get(keys[i]);   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>return</span><span> o;   </span> </li>
    <li class=''><span>    }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>boolean</span><span> set(String key, Object obj) {   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>try</span><span> {   </span> </li>
    <li class=''><span>            </span><span class='keyword'>if</span><span> (obj == </span><span class='keyword'>null</span><span> || key == </span><span class='keyword'>null</span><span> || </span><span class='string'>""</span><span>.equals(key)) </span><span class='keyword'>throw</span><span> </span><span class='keyword'>new</span><span> Exception(</span><span class='string'>"对象和key 不能为空"</span><span>);   </span> </li>
    <li class='alt'><span>            Socket s = MemBufferedDriver.getSocket();   </span> </li>
    <li class=''><span>            BufferedInputStream in = </span><span class='keyword'>new</span><span> BufferedInputStream(s.getInputStream());   </span> </li>
    <li class='alt'><span>            BufferedOutputStream out = </span><span class='keyword'>new</span><span> BufferedOutputStream(s.getOutputStream());   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>            key = encodeKey(key);   </span> </li>
    <li class=''><span>            </span><span class='keyword'>int</span><span> flag = </span><span class='number'>0</span><span>;   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>            </span><span class='comment'>//序列化对象 </span><span>  </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>byte</span><span>[] bs = object2Byte(obj);   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_SET);           </span><span class='comment'>//write cmd </span><span>  </span> </li>
    <li class=''><span>            out.write(key.getBytes());     </span><span class='comment'>//write key </span><span>  </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_SPACE);   </span> </li>
    <li class=''><span>            out.write(String.valueOf(flag).getBytes());     </span><span class='comment'>//write flag </span><span>  </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_SPACE);   </span> </li>
    <li class=''><span>            out.write(</span><span class='string'>"0"</span><span>.getBytes());     </span><span class='comment'>//write expire date </span><span>  </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_SPACE);   </span> </li>
    <li class=''><span>            out.write(String.valueOf(bs.length).getBytes());     </span><span class='comment'>//object length </span><span>  </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_CRLF);   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>            out.write(bs);   </span> </li>
    <li class=''><span>            out.write(MemBufferedDriver.BYTE_CRLF);   </span> </li>
    <li class='alt'><span>            out.flush();   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>            String ret = readLine(in);   </span> </li>
    <li class=''><span>            </span><span class='keyword'>return</span><span> MemBufferedDriver.SERVER_STATUS_STORED.equals(ret);   </span> </li>
    <li class='alt'><span>        } </span><span class='keyword'>catch</span><span> (Exception e) {   </span> </li>
    <li class=''><span>            System.out.println(e.getMessage());   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>return</span><span> </span><span class='keyword'>false</span><span>;   </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> Object get(String key) {   </span> </li>
    <li class=''><span>        </span><span class='keyword'>try</span><span> {   </span> </li>
    <li class='alt'><span>            Socket s = MemBufferedDriver.getSocket();   </span> </li>
    <li class=''><span>            InputStream in = s.getInputStream();   </span> </li>
    <li class='alt'><span>            OutputStream out = s.getOutputStream();   </span> </li>
    <li class=''><span>            key = encodeKey(key);   </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_GET);   </span> </li>
    <li class=''><span>            out.write(key.getBytes());   </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_CRLF);   </span> </li>
    <li class=''><span>            out.flush();   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>            </span><span class='keyword'>return</span><span> getObjectFromStream(in, out);   </span> </li>
    <li class='alt'><span>        } </span><span class='keyword'>catch</span><span> (Exception e) {   </span> </li>
    <li class=''><span>            System.out.println(e.getMessage());   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>return</span><span> </span><span class='keyword'>null</span><span>;   </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>boolean</span><span> delete(String key) {   </span> </li>
    <li class=''><span>        </span><span class='keyword'>try</span><span> {   </span> </li>
    <li class='alt'><span>            Socket s = MemBufferedDriver.getSocket();   </span> </li>
    <li class=''><span>            InputStream in = s.getInputStream();   </span> </li>
    <li class='alt'><span>            OutputStream out = s.getOutputStream();   </span> </li>
    <li class=''><span>            key = encodeKey(key);   </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_DELETE);   </span> </li>
    <li class=''><span>            out.write(key.getBytes());   </span> </li>
    <li class='alt'><span>            out.write(MemBufferedDriver.BYTE_CRLF);   </span> </li>
    <li class=''><span>            out.flush();   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>            String ret = readLine(in);   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>return</span><span> MemBufferedDriver.SERVER_STATUS_DELETED.equals(ret) || MemBufferedDriver.SERVER_STATUS_NOT_FOUND.equals(ret);   </span> </li>
    <li class=''><span>        } </span><span class='keyword'>catch</span><span> (Exception e) {   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>return</span><span> </span><span class='keyword'>false</span><span>;   </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>private</span><span> Object getObjectFromStream(InputStream in, OutputStream out) </span><span class='keyword'>throws</span><span> IOException, ClassNotFoundException {   </span> </li>
    <li class=''><span>        String cmd = readLine(in);   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>if</span><span> (cmd.startsWith(MemBufferedDriver.SERVER_STATUS_VALUE)) {   </span> </li>
    <li class=''><span>            </span><span class='comment'>//return object </span><span>  </span> </li>
    <li class='alt'><span>            String[] part = cmd.split(</span><span class='string'>" "</span><span>);   </span> </li>
    <li class=''><span>            String para = part[</span><span class='number'>2</span><span>];   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>int</span><span> length = Integer.parseInt(part[</span><span class='number'>3</span><span>]);   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>byte</span><span>[] bs = </span><span class='keyword'>new</span><span> </span><span class='keyword'>byte</span><span>[length];   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>int</span><span> count = </span><span class='number'>0</span><span>;   </span> </li>
    <li class=''><span>            </span><span class='keyword'>while</span><span> (count &lt; bs.length) count += in.read(bs, count, (bs.length - count));   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>if</span><span> (count != bs.length)   </span> </li>
    <li class=''><span>                </span><span class='keyword'>throw</span><span> </span><span class='keyword'>new</span><span> IOException(</span><span class='string'>"读取数据长度错误"</span><span>);   </span> </li>
    <li class='alt'><span>            readLine(in);   </span> </li>
    <li class=''><span>            String endstr = readLine(in);   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>if</span><span> (MemBufferedDriver.SERVER_STATUS_END.equals(endstr))   </span> </li>
    <li class=''><span>                </span><span class='keyword'>return</span><span> </span><span class='keyword'>this</span><span>.byte2Object(bs);   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>else</span><span>  </span> </li>
    <li class=''><span>                System.out.println(</span><span class='string'>"结束标记错误"</span><span>);   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>return</span><span> </span><span class='keyword'>null</span><span>;   </span> </li>
    <li class=''><span>    }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>private</span><span> String encodeKey(String key) </span><span class='keyword'>throws</span><span> UnsupportedEncodingException {   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>return</span><span> URLEncoder.encode(key, MemBufferedDriver.ENCODING_TYPE);   </span> </li>
    <li class=''><span>    }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>private</span><span> String readLine(InputStream in) </span><span class='keyword'>throws</span><span> IOException {   </span> </li>
    <li class=''><span>        ByteArrayOutputStream bos = </span><span class='keyword'>new</span><span> ByteArrayOutputStream();   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>boolean</span><span> eol = </span><span class='keyword'>false</span><span>;   </span> </li>
    <li class=''><span>        </span><span class='keyword'>byte</span><span>[] b = </span><span class='keyword'>new</span><span> </span><span class='keyword'>byte</span><span>[</span><span class='number'>1</span><span>];   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>while</span><span> (in.read(b, </span><span class='number'>0</span><span>, </span><span class='number'>1</span><span>) != -</span><span class='number'>1</span><span>) {   </span> </li>
    <li class=''><span>            </span><span class='keyword'>if</span><span> (b[</span><span class='number'>0</span><span>] == </span><span class='number'>13</span><span>) eol = </span><span class='keyword'>true</span><span>;   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>else</span><span> </span><span class='keyword'>if</span><span> (eol &amp;&amp; b[</span><span class='number'>0</span><span>] == </span><span class='number'>10</span><span>) </span><span class='keyword'>break</span><span>;   </span> </li>
    <li class=''><span>            </span><span class='keyword'>else</span><span>  </span> </li>
    <li class='alt'><span>                eol = </span><span class='keyword'>false</span><span>;   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>            bos.write(b, </span><span class='number'>0</span><span>, </span><span class='number'>1</span><span>);   </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>if</span><span> (bos.size() == </span><span class='number'>0</span><span>) </span><span class='keyword'>return</span><span> </span><span class='keyword'>null</span><span>;   </span> </li>
    <li class=''><span>        </span><span class='keyword'>return</span><span> bos.toString().trim();   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>private</span><span> </span><span class='keyword'>byte</span><span>[] object2Byte(Object o) </span><span class='keyword'>throws</span><span> IOException {   </span> </li>
    <li class='alt'><span>        ByteArrayOutputStream b = </span><span class='keyword'>new</span><span> ByteArrayOutputStream();   </span> </li>
    <li class=''><span>        </span><span class='keyword'>new</span><span> ObjectOutputStream(b).writeObject(o);   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>return</span><span> b.toByteArray();   </span> </li>
    <li class=''><span>    }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>private</span><span> Object byte2Object(</span><span class='keyword'>byte</span><span>[] b) </span><span class='keyword'>throws</span><span> IOException, ClassNotFoundException {   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>return</span><span> </span><span class='keyword'>new</span><span> ObjectInputStream(</span><span class='keyword'>new</span><span> ByteArrayInputStream(b)).readObject();   </span> </li>
    <li class=''><span>    }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>void</span><span> main(String[] args) </span><span class='keyword'>throws</span><span> Exception {   </span> </li>
    <li class=''><span>        MemBufferedDriver m = </span><span class='keyword'>new</span><span> MemBufferedDriver();   </span> </li>
    <li class='alt'><span>        System.out.println(m.set(</span><span class='string'>"a"</span><span>, </span><span class='string'>"DsSD"</span><span>));   </span> </li>
    <li class=''><span>        System.out.println(m.get(</span><span class='string'>"a"</span><span>));   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>void</span><span> setServerAddress(String serverAddress) {   </span> </li>
    <li class=''><span>        MemBufferedDriver.serverAddress = serverAddress;   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>}  </span> </li>
</ol>
</div>
<p> <br/>
<strong>java 代码<br/>
写入测试类</strong></p>
<div class='dp-highlighter'>
<div class='bar'/>
<ol class='dp-j'>
    <li class='alt'><span><span class='keyword'>public</span><span> </span><span class='keyword'>class</span><span> Fill2Server </span><span class='keyword'>extends</span><span> Thread {   </span></span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>int</span><span> THREAD_COUNT = </span><span class='number'>2</span><span>;   </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> Queue queue = </span><span class='keyword'>new</span><span> Queue();   </span> </li>
    <li class=''><span>    MemBufferedDriver md = </span><span class='keyword'>new</span><span> MemBufferedDriver();   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>void</span><span> main(String[] args) </span><span class='keyword'>throws</span><span> Exception {   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        </span><span class='keyword'>int</span><span> size ;   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>if</span><span> (args.length == </span><span class='number'>3</span><span> &amp;&amp; args[</span><span class='number'>0</span><span>] != </span><span class='keyword'>null</span><span> &amp;&amp; args[</span><span class='number'>1</span><span>] != </span><span class='keyword'>null</span><span>) {   </span> </li>
    <li class=''><span>            MemDriver.setServerAddress(args[</span><span class='number'>0</span><span>]);   </span> </li>
    <li class='alt'><span>            size = Integer.parseInt(args[</span><span class='number'>1</span><span>]);   </span> </li>
    <li class=''><span>            THREAD_COUNT = Integer.parseInt(args[</span><span class='number'>2</span><span>]);   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>new</span><span> Fill2Server().doFill(size);   </span> </li>
    <li class=''><span>        } </span><span class='keyword'>else</span><span>  </span> </li>
    <li class='alt'><span>            System.out.println(</span><span class='string'>"参数1 连接服务器地址 ipaddress:port ,参数2填充数量,不能小于10000,参数3为使用的线程数"</span><span>);   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>private</span><span> </span><span class='keyword'>void</span><span> doFill(</span><span class='keyword'>int</span><span> size) </span><span class='keyword'>throws</span><span> InterruptedException {   </span> </li>
    <li class=''><span>        </span><span class='keyword'>int</span><span> taskCount = size / </span><span class='number'>1000</span><span>;   </span><span class='comment'>//每个线程负责填充1000的对象 </span><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; taskCount; i++) {   </span> </li>
    <li class=''><span>            Task t = </span><span class='keyword'>new</span><span> Task();   </span> </li>
    <li class='alt'><span>            t.setTaskId(String.valueOf(i));   </span> </li>
    <li class=''><span>            queue.add(t);   </span> </li>
    <li class='alt'><span>        }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>long</span><span> time = System.currentTimeMillis();   </span> </li>
    <li class=''><span>        Thread tr[] = </span><span class='keyword'>new</span><span> Thread[THREAD_COUNT];   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; THREAD_COUNT; i++) {   </span> </li>
    <li class=''><span>            FillThread ft = </span><span class='keyword'>new</span><span> FillThread();   </span> </li>
    <li class='alt'><span>            (tr[i] = </span><span class='keyword'>new</span><span> Thread(ft)).start();   </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        </span><span class='comment'>//监控填充完成 </span><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>while</span><span> (</span><span class='keyword'>true</span><span>) {   </span> </li>
    <li class=''><span>            </span><span class='keyword'>boolean</span><span> flag = </span><span class='keyword'>true</span><span>;   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; THREAD_COUNT; i++)   </span> </li>
    <li class=''><span>                flag &amp;= tr[i].isAlive();   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>            </span><span class='keyword'>if</span><span> (!flag) </span><span class='keyword'>break</span><span>;   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>            Thread.sleep(</span><span class='number'>1000</span><span>);   </span> </li>
    <li class='alt'><span>        }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>        time = System.currentTimeMillis() - time;   </span> </li>
    <li class=''><span>        System.out.println(</span><span class='string'>"任务完成,共用"</span><span> + (time / </span><span class='number'>1000</span><span>) + </span><span class='string'>"s"</span><span>);   </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>class</span><span> FillThread </span><span class='keyword'>implements</span><span> Runnable {   </span> </li>
    <li class=''><span>        </span><span class='keyword'>public</span><span> </span><span class='keyword'>void</span><span> run() {   </span> </li>
    <li class='alt'><span>            Task task;   </span> </li>
    <li class=''><span>            </span><span class='keyword'>while</span><span> (</span><span class='keyword'>true</span><span>) {   </span> </li>
    <li class='alt'><span>                task = (Task) queue.get();   </span> </li>
    <li class=''><span>                </span><span class='keyword'>if</span><span> (task == </span><span class='keyword'>null</span><span>) </span><span class='keyword'>break</span><span>;   </span> </li>
    <li class='alt'><span>                </span><span class='keyword'>long</span><span> time = System.nanoTime();   </span> </li>
    <li class=''><span>                </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; </span><span class='number'>1000</span><span>; i++) {   </span> </li>
    <li class='alt'><span>                    TestBO b = </span><span class='keyword'>new</span><span> TestBO();   </span> </li>
    <li class=''><span>                    md.set(task.getTaskId() + i, b);   </span> </li>
    <li class='alt'><span>                }   </span> </li>
    <li class=''><span>                time = System.nanoTime() - time;   </span> </li>
    <li class='alt'><span>                System.out.println(Thread.currentThread().getName() + </span><span class='string'>" avg "</span><span> + (time / </span><span class='number'>1000</span><span>) + </span><span class='string'>" ns "</span><span>);   </span> </li>
    <li class=''><span>            }   </span> </li>
    <li class='alt'><span>        }   </span> </li>
    <li class=''><span>    }   </span> </li>
    <li class='alt'><span>}  </span> </li>
</ol>
</div>
<p> </p>
<div class='code_title'>java 代码<br/>
读取的测试方法</div>
<div class='dp-highlighter'>
<div class='bar'/>
<ol class='dp-j'>
    <li class='alt'><span><span class='keyword'>public</span><span> </span><span class='keyword'>class</span><span> GetFromServer </span><span class='keyword'>extends</span><span> Thread {   </span></span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>int</span><span> THREAD_COUNT = </span><span class='number'>2</span><span>;   </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> Queue queue = </span><span class='keyword'>new</span><span> Queue();   </span> </li>
    <li class=''><span>    MemDriver md = </span><span class='keyword'>new</span><span> MemDriver();   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>    </span><span class='keyword'>public</span><span> </span><span class='keyword'>static</span><span> </span><span class='keyword'>void</span><span> main(String[] args) </span><span class='keyword'>throws</span><span> Exception {   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        </span><span class='keyword'>int</span><span> size;   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>if</span><span> (args.length == </span><span class='number'>3</span><span> &amp;&amp; args[</span><span class='number'>0</span><span>] != </span><span class='keyword'>null</span><span> &amp;&amp; args[</span><span class='number'>1</span><span>] != </span><span class='keyword'>null</span><span>) {   </span> </li>
    <li class=''><span>            MemDriver.setServerAddress(args[</span><span class='number'>0</span><span>]);   </span> </li>
    <li class='alt'><span>            size = Integer.parseInt(args[</span><span class='number'>1</span><span>]);   </span> </li>
    <li class=''><span>            THREAD_COUNT = Integer.parseInt(args[</span><span class='number'>2</span><span>]);   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>new</span><span> GetFromServer().doFill(size);   </span> </li>
    <li class=''><span>        } </span><span class='keyword'>else</span><span>  </span> </li>
    <li class='alt'><span>            System.out.println(</span><span class='string'>"参数1 连接服务器地址 ipaddress:port ,参数2读取数量不能小于1000,参数3为使用的线程数"</span><span>);   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>    </span><span class='keyword'>private</span><span> </span><span class='keyword'>void</span><span> doFill(</span><span class='keyword'>int</span><span> size) </span><span class='keyword'>throws</span><span> InterruptedException {   </span> </li>
    <li class=''><span>        </span><span class='keyword'>int</span><span> taskCount = size / </span><span class='number'>100</span><span>;   </span><span class='comment'>//每个线程负责填充1000的对象 </span><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; taskCount; i++) {   </span> </li>
    <li class=''><span>            Task t = </span><span class='keyword'>new</span><span> Task();   </span> </li>
    <li class='alt'><span>            t.setTaskId(String.valueOf(i));   </span> </li>
    <li class=''><span>            GetFromServer.queue.add(t);   </span> </li>
    <li class='alt'><span>        }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>long</span><span> time = System.currentTimeMillis();   </span> </li>
    <li class=''><span>        Thread tr[] = </span><span class='keyword'>new</span><span> Thread[GetFromServer.THREAD_COUNT];   </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; GetFromServer.THREAD_COUNT; i++) {   </span> </li>
    <li class=''><span>            GetFromServer.FillThread ft = </span><span class='keyword'>new</span><span> GetFromServer.FillThread();   </span> </li>
    <li class='alt'><span>            (tr[i] = </span><span class='keyword'>new</span><span> Thread(ft)).start();   </span> </li>
    <li class=''><span>        }   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        </span><span class='comment'>//监控填充完成 </span><span>  </span> </li>
    <li class='alt'><span>        </span><span class='keyword'>while</span><span> (</span><span class='keyword'>true</span><span>) {   </span> </li>
    <li class=''><span>            </span><span class='keyword'>boolean</span><span> flag = </span><span class='keyword'>true</span><span>;   </span> </li>
    <li class='alt'><span>            </span><span class='keyword'>for</span><span> (</span><span class='keyword'>int</span><span> i = </span><span class='number'>0</span><span>; i &lt; GetFromServer.THREAD_COUNT; i++)   </span> </li>
    <li class=''><span>                flag &amp;= tr[i].isAlive();   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>            </span><span class='keyword'>if</span><span> (!flag) </span><span class='keyword'>break</span><span>;   </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>            Thread.sleep(</span><span class='number'>1000</span><span>);   </span> </li>
    <li class='alt'><span>        }   </span> </li>
    <li class=''><span>  </span> </li>
    <li class='alt'><span>  </span> </li>
    <li class=''><span>        time = System.currentTimeMillis() - time;   </span> </li>
    <li class='alt'><span>        System.out.println(</span><span class='string'>"任务完成,共用"</span><span> + (time / </span><span class='number'>1000</span><span>) + </span><span class='string'>"s"</span></li></ol></div></font></div>
32 楼 davexin 2007-09-01  
codeutil 写道

昨天听同学说他的memecache用的是12G内存.存储大约1000万个键值对,查询速度很快.

不太相信能用到12G内存,估计4G就差不多了