Redis pub/sub 在Java中的实现

1.什么是pub/sub
Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。
同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,Redis作为一个pub/sub的server,在订阅者和发布者之间起到了消息路由的功能。

2.Redis pub/sub的实现
Redis通过publish和subscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅自己感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。

 3、创建一个Redis消息接收器

在任何基于消息传递的应用程序中,都有消息发布者和消息接收者。 要创建消息接收方,请使用一种方法来实现接收方以响应消息:

package com.example.spdemo.controller;

import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
/**接收消息的方法*/
public void receiveMessage(String message){

System.out.println("收到一条消息:"+message);
}
}

MessageReceiver是一个简单的POJO,它定义了一种接收消息的方法。 正如您将MessageReceiver注册为消息侦听器时所看到的,您可以根据需要命名消息处理方法。

4、注册侦听器并发送消息

Spring Data Redis提供了使用Redis发送和接收消息所需的所有组件。 具体来说,你需要配置:

  • 连接工厂
  • 消息侦听器容器
  • Redis模板

您将使用Redis模板发送消息,并且您将向MessageReceiver注册消息侦听器容器,以便它可以接收消息。 连接工厂驱动模板和消息侦听器容器,使它们能够连接到Redis服务器。

本示例使用Spring Boot的默认RedisConnectionFactory,它是基于Jedis Redis库的JedisConnectionFactory的一个实例。 连接工厂被注入到消息监听器容器和Redis模板中。

package com.example.spdemo.config;

import com.example.spdemo.controller.MessageReceiver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisConfig {
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean //相当于xml中的bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了一个叫chat 的通道
        container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
        //这个container 可以添加多个 messageListener
        return container;
    }
    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
    /**redis 读取内容的template */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

在listenerAdapter方法中定义的bean在容器中定义的消息侦听器容器中注册为消息侦听器,并将侦听“chat”主题上的消息。

由于MessageReceiver类是POJO,因此需要将其包装在实现AddMessageListener()所需的MessageListener接口的消息侦听器适配器中。

消息侦听器适配器还配置为在消息到达时调用Receiver上的receiveMessage()方法。

连接工厂和消息监听器容器bean都是您需要侦听消息的。

要发送消息,您还需要一个Redis模板。在这里,它是一个配置为StringRedisTemplate的bean,它是RedisTemplate的一个实现,它着重于Redis的常用用法,其中键和值都是`String`。

5、模拟发送消息

package com.example.spdemo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@EnableScheduling //开启定时器功能
@Component
public class MessageSender {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Scheduled(fixedRate = 2000) //间隔2s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息
    public void sendMessage(){
        stringRedisTemplate.convertAndSend("chat","信息内容为:"+String.valueOf(Math.random()));
    }
}