redis消息队列

正文

1、为什么要使用消息队列?

分析:一个用消息队列的人,不知道为啥用,这就有点尴尬。没有复习这点,很容易被问蒙,然后就开始胡扯了。
回答:这个问题,咱只答三个最主要的应用场景(不可否认还有其他的,但是只答三个主要的),即以下六个字:解耦、异步、削峰

(1)解耦

传统模式:
redis消息队列
传统模式的缺点

  • 系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

中间件模式:
redis消息队列
中间件模式的的优点

  • 将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

(2)异步

传统模式:
redis消息队列
传统模式的缺点

  • 一些非必要的业务逻辑以同步的方式运行,太耗费时间。

中间件模式:
redis消息队列
中间件模式的的优点

  • 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

(3)削峰

传统模式
redis消息队列
传统模式的缺点

  • 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式:
redis消息队列
中间件模式的的优点

  • 系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

2、使用了消息队列会有什么缺点?

分析:一个使用了MQ的项目,如果连这个问题都没有考虑过,就把MQ引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑!
回答:回答也很容易,从以下两个个角度来答

  • 系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低
  • 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

但是,我们该用还是要用的。

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。 定义:

  • 生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
  • 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。

那么如此多的MQ产品,为什么要使用redis作消息队列呢?以下附上一份总结了别人的一些report或blog的表格,以及当初用来说服整个team的一句结论。

redis消息队列

Redis is easy to use and configure since we have experience in Redis, and most importantly, its performance satisfies our requirement.

Then, how to use redis as a MQ?

首先,redis的队列实际在代码逻辑中不需要由我们自己实现,因此一个所谓的 RedisMQ 对象实际是一个 redis key以及对其操作的一些封装。

PubSub Mode:

redis 从 2.0.0 版本开始支持 pub/sub 指令。详情见 http://redis.io/topics/pubsub

实现思想很简单,Publisher调用redis的publish方法往特定的channel发送消息,Subscriber在初始化的时候要subscribe到该channel,一旦有消息就会立即接收。

比较简单的demo可参见:http://shift-alt-ctrl.iteye.com/blog/1867454 ,此链接博客中写得已较详细,本文便不再赘述

Producer/Consumer Mode:

该方法是借助redis的list结构实现的。

Producer调用redis的lpush往特定key里塞入消息,Consumer调用brpop去不断监听该key。

producer:

1 // producer code
2 String key = "demo:mq:test";
3 String msg = "hello world";
4 redisDao.lpush(key, msg);

consumer:

// consumer code
String key = "demo:mq:test";
while (true) {
    // block invoke
    List<String> msgs = redisDao.brpop(BLOCK_TIMEOUT, listKey);
    if (msgs == null) continue;
    String jobMsg = msgs.get(1);
    processMsg(jobMsg);
}

当有多个consumers的时候,它会按照brpop调用的顺序分派消息,并非随机。

BLOCK_TIMEOUT不建议设成infinity(有些redis驱动也直接不支持inifinity),我们目前设成30(单位是秒)情况良好。

在学习RPOPLPUSH命令的时候,官方文档中有提到安全队列和不安全的队列,一开始没有看懂,现在理解了做个笔记。

一般情况下,我们可以借助List来实现消息队列,比如一个客户端通过命令LPUSH(BLPUSH)把消息入队,另一个客户端通过命令RPOP(BRPOP)获取消息。这种方式实现的队列是不安全的。

为什么是不安全的呢?因为RPOP命令的特性:会移除list的队尾元素(消息),并将这个元素(消息)返回给客户端。这意味着该元素就只存在于客户端的上下文中,redis服务器中没有这个元素了,如果客户端在处理这个返回元素的过程崩溃了,那么这个元素就永远丢失了。这种情况导致:客户端虽然成功收到了消息,但是却没有处理它。

那怎么来实现一个安全的队列呢?可以使用redis的 RPOPLPUSH (或者其阻塞版本的 BRPOPLPUSH)命令。

redis的命令中,很多都提供了阻塞与非阻塞两个方式

例如 LPUSH 为非阻塞,BLPUSH 为阻塞方式

他们的区别是什么?

唯一的区别是当列表中没有元素时,BRPOP命令会一直阻塞住连接,直到有新元素加入,而RPOP会直接返回nil

实际应用的区别

需要从队列获取任务

如果用非阻塞的方式,代码会是这样

# 无限循环读取任务队列中的内容 

loop 

  $task = RPOR queue 

  if $task 

    # 如果有任务则执行 

    execute($task) 

  else 

    # 如果没有就等待1秒

    wait 1 second 

当队列中没有任务时,每秒都会调用一次RPOP命令查看是否有新任务,可能会白白浪费很多系统资源,如果在有新任务加入队列时就通知消费者就好了,这个需求就可以使用阻塞式命令来实现

loop 

# 如果队列中没有任务,BRPOP命令会一直阻塞

# 0 表示一直等待,永不过期 

$task = BRPOP queue, 0 

# 有返回值就继续执行 

execute($task[1])

RPOPLPUSH命令格式:RPOPLPUSH  source  destination 。RPOPLPUSH命令原子性地返回并移除 source 列表的最后一个元素, 并把该元素放入 destination 列表的头部。使用这个命令就可以实现安全队列。

因为使用 RPOPLPUSH 获取消息时,RPOPLPUSH 会把消息返给客户端,同时把该消息放入一个备份消息列表,并且这个过程是原子的,可以保证消息的安全。当客户端成功的处理了消息后,就可以把此消息从备份列表中移除了。如果客户端因为崩溃的原因没有处理某个消息,那么就可以从备份列表destination中重新获取并处理这个消息。

近两三年的kafka和阿里的rocketmq也很火,至于怎么选择,一部分是根据数据量,若数据量不大,容错要求不是极高,redis是个高效开发易维护的好选择;如果数据量很大或对消息准确性有一定要求,那应当考虑更成熟的消息队列产品比如kafka等。所以mq的选型并不是本文的重点,本文只是介绍一下基于redis 2.6的mq的简单封装实现。

https://www.cnblogs.com/rjzheng/p/8994962.html