Redis的Java客户端Jedis的八种调用模式(事务、管道、分布式)介绍

Redis的Java客户端Jedis的八种调用方式(事务、管道、分布式)介绍

jedis是一个著名的key-value存储系统,而作为其官方推荐的java版客户端jedis也非常强大和稳定,支持事务、管道及有jedis自身实现的分布式。

在这里对jedis关于事务、管道和分布式的调用方式做一个简单的介绍和对比:

一、普通同步方式

最简单和基础的调用方式,

[java] view plain copy
 
  1. @Test  
  2. public void test1Normal() {  
  3.     Jedis jedis = new Jedis("localhost");  
  4.     long start = System.currentTimeMillis();  
  5.     for (int i = 0; i < 100000; i++) {  
  6.         String result = jedis.set("n" + i, "n" + i);  
  7.     }  
  8.     long end = System.currentTimeMillis();  
  9.     System.out.println("Simple SET: " + ((end - start)/1000.0) + " seconds");  
  10.     jedis.disconnect();  
  11. }  

很简单吧,每次set之后都可以返回结果,标记是否成功。

二、事务方式(Transactions)

redis的事务很简单,他主要目的是保障,一个client发起的事务中的命令可以连续的执行,而中间不会插入其他client的命令。

看下面例子:

[java] view plain copy
 
  1. @Test  
  2. public void test2Trans() {  
  3.     Jedis jedis = new Jedis("localhost");  
  4.     long start = System.currentTimeMillis();  
  5.     Transaction tx = jedis.multi();  
  6.     for (int i = 0; i < 100000; i++) {  
  7.         tx.set("t" + i, "t" + i);  
  8.     }  
  9.     List<Object> results = tx.exec();  
  10.     long end = System.currentTimeMillis();  
  11.     System.out.println("Transaction SET: " + ((end - start)/1000.0) + " seconds");  
  12.     jedis.disconnect();  
  13. }  

我们调用jedis.watch(…)方法来监控key,如果调用后key值发生变化,则整个事务会执行失败。另外,事务中某个操作失败,并不会回滚其他操作。这一点需要注意。还有,我们可以使用discard()方法来取消事务。

三、管道(Pipelining)

有时,我们需要采用异步方式,一次发送多个指令,不同步等待其返回结果。这样可以取得非常好的执行效率。这就是管道,调用方法如下:

[java] view plain copy
 
  1. @Test  
  2. public void test3Pipelined() {  
  3.     Jedis jedis = new Jedis("localhost");  
  4.     Pipeline pipeline = jedis.pipelined();  
  5.     long start = System.currentTimeMillis();  
  6.     for (int i = 0; i < 100000; i++) {  
  7.         pipeline.set("p" + i, "p" + i);  
  8.     }  
  9.     List<Object> results = pipeline.syncAndReturnAll();  
  10.     long end = System.currentTimeMillis();  
  11.     System.out.println("Pipelined SET: " + ((end - start)/1000.0) + " seconds");  
  12.     jedis.disconnect();  
  13. }  

四、管道中调用事务

就Jedis提供的方法而言,是可以做到在管道中使用事务,其代码如下:

[java] view plain copy
 
  1. @Test  
  2. public void test4combPipelineTrans() {  
  3.     jedis = new Jedis("localhost");   
  4.     long start = System.currentTimeMillis();  
  5.     Pipeline pipeline = jedis.pipelined();  
  6.     pipeline.multi();  
  7.     for (int i = 0; i < 100000; i++) {  
  8.         pipeline.set("" + i, "" + i);  
  9.     }  
  10.     pipeline.exec();  
  11.     List<Object> results = pipeline.syncAndReturnAll();  
  12.     long end = System.currentTimeMillis();  
  13.     System.out.println("Pipelined transaction: " + ((end - start)/1000.0) + " seconds");  
  14.     jedis.disconnect();  
  15. }  

但是经测试(见本文后续部分),发现其效率和单独使用事务差不多,甚至还略微差点。

五、分布式直连同步调用

[java] view plain copy
 
  1. @Test  
  2. public void test5shardNormal() {  
  3.     List<JedisShardInfo> shards = Arrays.asList(  
  4.             new JedisShardInfo("localhost",6379),  
  5.             new JedisShardInfo("localhost",6380));  
  6.   
  7.     ShardedJedis sharding = new ShardedJedis(shards);  
  8.   
  9.     long start = System.currentTimeMillis();  
  10.     for (int i = 0; i < 100000; i++) {  
  11.         String result = sharding.set("sn" + i, "n" + i);  
  12.     }  
  13.     long end = System.currentTimeMillis();  
  14.     System.out.println("Simple@Sharing SET: " + ((end - start)/1000.0) + " seconds");  
  15.   
  16.     sharding.disconnect();  
  17. }  

这个是分布式直接连接,并且是同步调用,每步执行都返回执行结果。类似地,还有异步管道调用。

六、分布式直连异步调用

[java] view plain copy
 
  1. @Test  
  2. public void test6shardpipelined() {  
  3.     List<JedisShardInfo> shards = Arrays.asList(  
  4.             new JedisShardInfo("localhost",6379),  
  5.             new JedisShardInfo("localhost",6380));  
  6.   
  7.     ShardedJedis sharding = new ShardedJedis(shards);  
  8.   
  9.     ShardedJedisPipeline pipeline = sharding.pipelined();  
  10.     long start = System.currentTimeMillis();  
  11.     for (int i = 0; i < 100000; i++) {  
  12.         pipeline.set("sp" + i, "p" + i);  
  13.     }  
  14.     List<Object> results = pipeline.syncAndReturnAll();  
  15.     long end = System.currentTimeMillis();  
  16.     System.out.println("Pipelined@Sharing SET: " + ((end - start)/1000.0) + " seconds");  
  17.   
  18.     sharding.disconnect();  
  19. }  

七、分布式连接池同步调用

如果,你的分布式调用代码是运行在线程中,那么上面两个直连调用方式就不合适了,因为直连方式是非线程安全的,这个时候,你就必须选择连接池调用。

[java] view plain copy
 
  1. @Test  
  2. public void test7shardSimplePool() {  
  3.     List<JedisShardInfo> shards = Arrays.asList(  
  4.             new JedisShardInfo("localhost",6379),  
  5.             new JedisShardInfo("localhost",6380));  
  6.   
  7.     ShardedJedisPool pool = new ShardedJedisPool(new JedisPoolConfig(), shards);  
  8.   
  9.     ShardedJedis one = pool.getResource();  
  10.   
  11.     long start = System.currentTimeMillis();  
  12.     for (int i = 0; i < 100000; i++) {  
  13.         String result = one.set("spn" + i, "n" + i);  
  14.     }  
  15.     long end = System.currentTimeMillis();  
  16.     pool.returnResource(one);  
  17.     System.out.println("Simple@Pool SET: " + ((end - start)/1000.0) + " seconds");  
  18.   
  19.     pool.destroy();  
  20. }  

上面是同步方式,当然还有异步方式。

八、分布式连接池异步调用

[java] view plain copy
 
  1. @Test  
  2. public void test8shardPipelinedPool() {  
  3.     List<JedisShardInfo> shards = Arrays.asList(  
  4.             new JedisShardInfo("localhost",6379),  
  5.             new JedisShardInfo("localhost",6380));  
  6.   
  7.     ShardedJedisPool pool = new ShardedJedisPool(new JedisPoolConfig(), shards);  
  8.   
  9.     ShardedJedis one = pool.getResource();  
  10.   
  11.     ShardedJedisPipeline pipeline = one.pipelined();  
  12.   
  13.     long start = System.currentTimeMillis();  
  14.     for (int i = 0; i < 100000; i++) {  
  15.         pipeline.set("sppn" + i, "n" + i);  
  16.     }  
  17.     List<Object> results = pipeline.syncAndReturnAll();  
  18.     long end = System.currentTimeMillis();  
  19.     pool.returnResource(one);  
  20.     System.out.println("Pipelined@Pool SET: " + ((end - start)/1000.0) + " seconds");  
  21.     pool.destroy();  
  22. }  

九、需要注意的地方

  1. 事务和管道都是异步模式。在事务和管道中不能同步查询结果。比如下面两个调用,都是不允许的:

    [java] view plain copy
     
    1.  Transaction tx = jedis.multi();  
    2.  for (int i = 0; i < 100000; i++) {  
    3.      tx.set("t" + i, "t" + i);  
    4.  }  
    5.  System.out.println(tx.get("t1000").get());  //不允许  
    6.   
    7.  List<Object> results = tx.exec();  
    8.   
    9.  …  
    10.  …  
    11.   
    12.  Pipeline pipeline = jedis.pipelined();  
    13.  long start = System.currentTimeMillis();  
    14.  for (int i = 0; i < 100000; i++) {  
    15.      pipeline.set("p" + i, "p" + i);  
    16.  }  
    17.  System.out.println(pipeline.get("p1000").get()); //不允许  
    18.   
    19.  List<Object> results = pipeline.syncAndReturnAll();  
  2. 事务和管道都是异步的,个人感觉,在管道中再进行事务调用,没有必要,不如直接进行事务模式。

  3. 分布式中,连接池的性能比直连的性能略好(见后续测试部分)。

  4. 分布式调用中不支持事务。

    因为事务是在服务器端实现,而在分布式中,每批次的调用对象都可能访问不同的机器,所以,没法进行事务。

十、测试

运行上面的代码,进行测试,其结果如下:

[java] view plain copy
 
  1. Simple SET: 5.227 seconds  
  2.   
  3. Transaction SET: 0.5 seconds  
  4. Pipelined SET: 0.353 seconds  
  5. Pipelined transaction: 0.509 seconds  
  6.   
  7. Simple@Sharing SET: 5.289 seconds  
  8. Pipelined@Sharing SET: 0.348 seconds  
  9.   
  10. Simple@Pool SET: 5.039 seconds  
  11. Pipelined@Pool SET: 0.401 seconds  

另外,经测试分布式中用到的机器越多,调用会越慢。上面是2片,下面是5片:

[java] view plain copy
 
  1. Simple@Sharing SET: 5.494 seconds  
  2. Pipelined@Sharing SET: 0.51 seconds  
  3. Simple@Pool SET: 5.223 seconds  
  4. Pipelined@Pool SET: 0.518 seconds  

下面是10片:

[java] view plain copy
 
  1. Simple@Sharing SET: 5.9 seconds  
  2. Pipelined@Sharing SET: 0.794 seconds  
  3. Simple@Pool SET: 5.624 seconds  
  4. Pipelined@Pool SET: 0.762 seconds  

下面是100片:

[java] view plain copy
 
  1. Simple@Sharing SET: 14.055 seconds  
  2. Pipelined@Sharing SET: 8.185 seconds  
  3. Simple@Pool SET: 13.29 seconds  
  4. Pipelined@Pool SET: 7.767 seconds  

分布式中,连接池方式调用不但线程安全外,根据上面的测试数据,也可以看出连接池比直连的效率更好。

十一、完整的测试代码

[java] view plain copy
 
  1. package com.example.nosqlclient;  
  2.   
  3. import java.util.Arrays;  
  4. import java.util.List;  
  5.   
  6. import org.junit.AfterClass;  
  7. import org.junit.BeforeClass;  
  8. import org.junit.Test;  
  9.   
  10. import redis.clients.jedis.Jedis;  
  11. import redis.clients.jedis.JedisPoolConfig;  
  12. import redis.clients.jedis.JedisShardInfo;  
  13. import redis.clients.jedis.Pipeline;  
  14. import redis.clients.jedis.ShardedJedis;  
  15. import redis.clients.jedis.ShardedJedisPipeline;  
  16. import redis.clients.jedis.ShardedJedisPool;  
  17. import redis.clients.jedis.Transaction;  
  18.   
  19. import org.junit.FixMethodOrder;  
  20. import org.junit.runners.MethodSorters;  
  21.   
  22. @FixMethodOrder(MethodSorters.NAME_ASCENDING)  
  23. public class TestJedis {  
  24.   
  25.     private static Jedis jedis;  
  26.     private static ShardedJedis sharding;  
  27.     private static ShardedJedisPool pool;  
  28.   
  29.     @BeforeClass  
  30.     public static void setUpBeforeClass() throws Exception {  
  31.         List<JedisShardInfo> shards = Arrays.asList(  
  32.                 new JedisShardInfo("localhost",6379),  
  33.                 new JedisShardInfo("localhost",6379)); //使用相同的ip:port,仅作测试  
  34.   
  35.   
  36.         jedis = new Jedis("localhost");   
  37.         sharding = new ShardedJedis(shards);  
  38.   
  39.         pool = new ShardedJedisPool(new JedisPoolConfig(), shards);  
  40.     }  
  41.   
  42.     @AfterClass  
  43.     public static void tearDownAfterClass() throws Exception {  
  44.         jedis.disconnect();  
  45.         sharding.disconnect();  
  46.         pool.destroy();  
  47.     }  
  48.   
  49.     @Test  
  50.     public void test1Normal() {  
  51.         long start = System.currentTimeMillis();  
  52.         for (int i = 0; i < 100000; i++) {  
  53.             String result = jedis.set("n" + i, "n" + i);  
  54.         }  
  55.         long end = System.currentTimeMillis();  
  56.         System.out.println("Simple SET: " + ((end - start)/1000.0) + " seconds");  
  57.     }  
  58.   
  59.     @Test  
  60.     public void test2Trans() {  
  61.         long start = System.currentTimeMillis();  
  62.         Transaction tx = jedis.multi();  
  63.         for (int i = 0; i < 100000; i++) {  
  64.             tx.set("t" + i, "t" + i);  
  65.         }  
  66.         //System.out.println(tx.get("t1000").get());  
  67.   
  68.         List<Object> results = tx.exec();  
  69.         long end = System.currentTimeMillis();  
  70.         System.out.println("Transaction SET: " + ((end - start)/1000.0) + " seconds");  
  71.     }  
  72.   
  73.     @Test  
  74.     public void test3Pipelined() {  
  75.         Pipeline pipeline = jedis.pipelined();  
  76.         long start = System.currentTimeMillis();  
  77.         for (int i = 0; i < 100000; i++) {  
  78.             pipeline.set("p" + i, "p" + i);  
  79.         }  
  80.         //System.out.println(pipeline.get("p1000").get());  
  81.         List<Object> results = pipeline.syncAndReturnAll();  
  82.         long end = System.currentTimeMillis();  
  83.         System.out.println("Pipelined SET: " + ((end - start)/1000.0) + " seconds");  
  84.     }  
  85.   
  86.     @Test  
  87.     public void test4combPipelineTrans() {  
  88.         long start = System.currentTimeMillis();  
  89.         Pipeline pipeline = jedis.pipelined();  
  90.         pipeline.multi();  
  91.         for (int i = 0; i < 100000; i++) {  
  92.             pipeline.set("" + i, "" + i);  
  93.         }  
  94.         pipeline.exec();  
  95.         List<Object> results = pipeline.syncAndReturnAll();  
  96.         long end = System.currentTimeMillis();  
  97.         System.out.println("Pipelined transaction: " + ((end - start)/1000.0) + " seconds");  
  98.     }  
  99.   
  100.     @Test  
  101.     public void test5shardNormal() {  
  102.         long start = System.currentTimeMillis();  
  103.         for (int i = 0; i < 100000; i++) {  
  104.             String result = sharding.set("sn" + i, "n" + i);  
  105.         }  
  106.         long end = System.currentTimeMillis();  
  107.         System.out.println("Simple@Sharing SET: " + ((end - start)/1000.0) + " seconds");  
  108.     }  
  109.   
  110.     @Test  
  111.     public void test6shardpipelined() {  
  112.         ShardedJedisPipeline pipeline = sharding.pipelined();  
  113.         long start = System.currentTimeMillis();  
  114.         for (int i = 0; i < 100000; i++) {  
  115.             pipeline.set("sp" + i, "p" + i);  
  116.         }  
  117.         List<Object> results = pipeline.syncAndReturnAll();  
  118.         long end = System.currentTimeMillis();  
  119.         System.out.println("Pipelined@Sharing SET: " + ((end - start)/1000.0) + " seconds");  
  120.     }  
  121.   
  122.     @Test  
  123.     public void test7shardSimplePool() {  
  124.         ShardedJedis one = pool.getResource();  
  125.   
  126.         long start = System.currentTimeMillis();  
  127.         for (int i = 0; i < 100000; i++) {  
  128.             String result = one.set("spn" + i, "n" + i);  
  129.         }  
  130.         long end = System.currentTimeMillis();  
  131.         pool.returnResource(one);  
  132.         System.out.println("Simple@Pool SET: " + ((end - start)/1000.0) + " seconds");  
  133.     }  
  134.   
  135.     @Test  
  136.     public void test8shardPipelinedPool() {  
  137.         ShardedJedis one = pool.getResource();  
  138.   
  139.         ShardedJedisPipeline pipeline = one.pipelined();  
  140.   
  141.         long start = System.currentTimeMillis();  
  142.         for (int i = 0; i < 100000; i++) {  
  143.             pipeline.set("sppn" + i, "n" + i);  
  144.         }  
  145.         List<Object> results = pipeline.syncAndReturnAll();  
  146.         long end = System.currentTimeMillis();  
  147.         pool.returnResource(one);  
  148.         System.out.println("Pipelined@Pool SET: " + ((end - start)/1000.0) + " seconds");  
  149.     }  
  150. }