redis 2 基本使用+Pipeline+事务
分类:
IT文章
•
2022-05-17 11:03:04
一 redis key的设计
越短,而且要完整表达含义,可以缩写,但必须文档留存好说明
user:001
tm:order:001 order:1
一般以业务,功能模块或者表名开头,后跟主键(或能表示数据唯一性的值)
二 客户端连接redis
普通实现
import redis.clients.jedis.Jedis;
public class jedistest {
public static void main(String[] args) {
try {
String host = "xx.kvstore.aliyuncs.com";//控制台显示访问地址
int port = 6379;
//直接new一个jedis对象
Jedis jedis = new Jedis(host, port);
//鉴权信息
jedis.auth("password");//password
String key = "redis";
String value = "aliyun-redis";
//select db默认为0
jedis.select(1);
//set一个key
jedis.set(key, value);
System.out.println("Set Key " + key + " Value: " + value);
//get 设置进去的key
String getvalue = jedis.get(key);
System.out.println("Get Key " + key + " ReturnValue: " + getvalue);
jedis.quit();
jedis.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
View Code
使用了连接池
package com.springboot.demo.base.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@Configuration
@PropertySource("classpath:redis.properties")
@Slf4j
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.max-wait}")
private long maxWaitMillis;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.block-when-exhausted}")
private boolean blockWhenExhausted;
@Bean
public JedisPool redisPoolFactory() throws Exception{
log.info("JedisPool注入成功!!");
log.info("redis地址:" + host + ":" + port);
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
// 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted);
// 是否启用pool的jmx管理功能, 默认true
jedisPoolConfig.setJmxEnabled(true);
JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password);
return jedisPool;
}
}
View Code
从连接池中获取jedis对象
/**
* 通过key获取储存在redis中的value
* 并释放连接
*
* @param key
* @param indexdb 选择redis库 0-15
* @return 成功返回value 失败返回null
*/
public String get(String key,int indexdb) {
Jedis jedis = null;
String value = null;
try {
jedis = jedisPool.getResource();
jedis.select(indexdb);
value = jedis.get(key);
log.info(value);
} catch (Exception e) {
log.error(e.getMessage());
} finally {
//归还redis对象
returnResource(jedisPool, jedis);
//或者 jedis.close()
}
return value;
}
View Code
redis的resp协议,无论是传输还是存储,都是按这个数据格式
如 set name james ----> resp协议包 -----> redis 服务端
resp协议包的格式如下
*3 //组数
$3 //字段长度
set // 字段名
$4 //key长度
name //key的字段名
$5 //值的长度
james //值
手写redis
//基于tcp协议 socket实现 redis客户端
public static String set(Socket socket, String key, String value) throws Exception {
StringBuilder str = new StringBuilder();
//1个resp协议格式的数据
str.append("*3").append("
"); //组数
str.append("$3").append("
");//字段长度
str.append("set").append("
");//字段
str.append("$").append(key.getBytes().length).append("
");//key长度
str.append(key).append("
"); //key 字段名
str.append("$").append(value.getBytes().length).append("
");//value长度
str.append(value).append("
"); //value的值
socket.getOutputStream().write(str.toString().getBytes());
byte[] response = new byte[2048];
socket.getInputStream().read(response);
return new String(response);
}
View Code
三 Pipeline详解
1 pipeline出现的背景:
redis客户端执行一条命令分4个过程:
发送命令--->命令排队--->命令执行--->返回结果
这个过程称为Round trip time(简称RTT, 往返时间),mget mset等批量此操作有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不支持批量操作,需要消耗N次RTT ,这个时候需要pipeline来解决这个问题。
2 pepeline的性能
1) 未使用pipeline执行N条命令

特别频繁的redis操作,大部分时间都是花在了网络上,网络时间比redis执行时间要长很多,特别是异地机房更加明显
2)、使用了pipeline执行N条命令

3)、两者性能对比

4 )具体实现
@Test
public void pipeCompare() {
Jedis redis = new Jedis("192.168.1.111", 6379);
redis.auth("12345678");//授权密码 对应redis.conf的requirepass密码
Map<String, String> data = new HashMap<String, String>();
redis.select(8);//使用第8个库
redis.flushDB();//清空第8个库所有数据
// hmset
long start = System.currentTimeMillis();
// 直接hmset 循环10000次往redis里写数据
for (int i = 0; i < 10000; i++) {
data.clear(); //清空map
data.put("k_" + i, "v_" + i);
redis.hmset("key_" + i, data); //循环执行10000条数据插入redis
}
long end = System.currentTimeMillis();
System.out.println(" 共插入:[" + redis.dbSize() + "]条 .. ");
System.out.println("1,未使用PIPE批量设值耗时" + (end - start) / 1000 + "秒..");
redis.select(8);
redis.flushDB();
// 使用pipeline
Pipeline pipe = redis.pipelined();
start = System.currentTimeMillis();
//循环10000次,值都放到了Pileline里了
for (int i = 0; i < 10000; i++) {
data.clear();
data.put("k_" + i, "v_" + i);
pipe.hmset("key_" + i, data); //将值封装到PIPE对象,此时并未执行,还停留在客户端
}
pipe.sync(); //将封装后的PIPE一次性发给redis
end = System.currentTimeMillis();
System.out.println(" PIPE共插入:[" + redis.dbSize() + "]条 .. ");
System.out.println("2,使用PIPE批量设值耗时" + (end - start) / 1000 + "秒 ..");
View Code
性能对比

使用了Pipeline性能高效了很多倍,可以优化吞吐量
四、原生批命令(mset, mget)与Pipeline对比
1 原生批命令是原子性,pipeline是非原子性
(原子性概念:一个事务是一个不可分割的最小工作单位,要么都成功要么都失败。原子操作是指你的一个业务逻辑必须是不可拆分的. 处理一件事情要么都成功,要么都失败,原子不可拆分)
2 原生批命令一命令多个key, 但pipeline支持多命令(存在事务),非原子性
3 原生批命令是服务端实现,而pipeline需要服务端与客户端共同完成
五、Pipeline正确使用方式
使用pipeline组装的命令个数不能太多,不然数据量过大,增加客户端的等待时间,还可能造成网络阻塞,可以将大量命令的拆分多个小的pipeline命令完成。
六 具体实现
/**
* 删除多个字符串key 并释放连接
*
* @param keys*
* @return 成功返回value 失败返回null
*/
public boolean mdel(List<String> keys) {
Jedis jedis = null;
boolean flag = false;
try {
jedis = pool.getResource();//从连接借用Jedis对象
Pipeline pipe = jedis.pipelined();//获取jedis对象的pipeline对象
for(String key:keys){
pipe.del(key); //将多个key放入pipe删除指令中
}
pipe.sync(); //执行命令,完全此时pipeline对象的远程调用
flag = true;
} catch (Exception e) {
pool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
returnResource(pool, jedis);
}
return flag;
}
View Code
七 Redis事务
Redis事务的概念:
Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。
总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。
Redis事务没有隔离级别的概念:
批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行,也就不存在事务内的查询要看到事务里的更新,事务外查询不能看到。
Redis不保证原子性:
Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。
Redis事务的三个阶段:
Redis事务相关命令:
watch key1 key2 ... : 监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断 ( 类似乐观锁 )
multi : 标记一个事务块的开始( queued )
exec : 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 )
discard : 取消事务,放弃事务块中的所有命令
unwatch : 取消watch对所有key的监控
Redis事务使用案例:
(1)正常执行

(2)放弃事务

(3)若在事务队列中存在命令性错误(类似于java编译性错误),则执行EXEC命令时,所有命令都不会执行

(4)若在事务队列中存在语法性错误(类似于java的1/0的运行时异常),则执行EXEC命令时,其他正确命令会被执行,错误命令抛出异常。

(5)使用watch
案例一:使用watch检测balance,事务期间balance数据未变动,事务执行成功

案例二:使用watch检测balance,在开启事务后(标注1处),在新窗口执行标注2中的操作,更改balance的值,模拟其他客户端在事务执行期间更改watch监控的数据,然后再执行标注1后命令,执行EXEC后,事务未成功执行。


一但执行 EXEC 开启事务的执行后,无论事务使用执行成功, WARCH 对变量的监控都将被取消。
故当事务执行失败后,需重新执行WATCH命令对变量进行监控,并开启新的事务进行操作。
总结:
watch指令类似于乐观锁,在事务提交时,如果watch监控的多个KEY中任何KEY的值已经被其他客户端更改,则使用EXEC执行事务时,事务队列将不会被执行,同时返回Nullmulti-bulk应答以通知调用者事务执行失败。