使用SpringCloudStream整合RabbitMQ
如果项目中我们用的是RabbitMQ进行消息传输,随着后面的业务需求,我们需要向Kafka迁移,如果单纯去修改代码,那是很繁琐的。
那么怎么解决这种情况呢,既能使用RabbitMQ又可以快速切换KafKa?这时就用到了SpringCloudStream:
其可以屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。不过目前只支持RabbitMQ 和 Kafka。
通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种消息中间件的实现
inputs是消费者,outputs是生产者
Stream中的消息通信方式遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)
其主要流程如下图
Binder:很方便的连接中间件,屏蔽差异。
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置。
Source和Sink:简单理解为消息的生产者和消费者。
SpringBoot整合SpringCLoudStream:
1、添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.54</version> </dependency> <!-- Swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.7.0</version> </dependency> <!-- Spring Cloud Stream --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>2.1.3.RELEASE</version> <exclusions> <exclusion> <artifactId>objenesis</artifactId> <groupId>org.objenesis</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.1.3.RELEASE</version> </dependency>
2、配置文件application.yml
server:
port: 8088
spring:
application:
name: stream_demo
cloud:
stream:
binders: #需要绑定的rabbitmq的服务信息
defaultRabbit: #定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #配置rabbitmq连接环境
spring:
rabbitmq:
host: **.**.**.** #rabbitmq 服务器的地址
port: 5672 #rabbitmq 服务器端口
username: **** #rabbitmq 用户名
password: **** #rabbitmq 密码
virtual-host: / #虚拟路径
bindings: #服务的整合处理
#inputs 对应消费者,outputs 对应生产者
#Stream中的消息通信方式遵循了发布-订阅模式
#在Stream中,处于同一个组的多个消费者是竞争关系,就可以保证消息只被一个服务消费一次,而不同组是可以重复消费的。现在默认分组就是不同的,组流水号不一样。
#消费者宕机:如果未配置group,则消费者上线后无法消费之前的消息(消息丢失);如果配置了group,则消费上线后可以消费之前的消息(消息持久化)
testOutput: #生产者消息输出通道 ---> 消息输出通道 = 生产者相关的定义:Exchange & Queue
destination: exchange-test #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输出通道绑定到RabbitMQ的exchange-test交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit #设置要绑定的消息服务的具体设置,默认绑定RabbitMQ
group: testGroup #分组=Queue名称,如果不设置会使用默认的组流水号
testInput: #消费者消息输入通道 ---> 消息输入通道 = 消费者相关的定义:Exchange & Queue
destination: exchange-test #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输入通道绑定到RabbitMQ的exchange-test交换器。
content-type: application/json
default-binder: defaultRabbit
group: testGroup
3、Channel信道
package com.stream.api; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; /** * @ClassName: TestChannelProcessor * @Description: 定义Channel信道 * @Author: qiaojiacheng * @Date: 2021/3/10 3:20 下午 */ @Component public interface TestChannelProcessor { /** * 生产者消息输出通道(需要与配置文件中的保持一致) */ String TEST_OUTPUT = "testOutput"; /** * 消息生产 * * @return */ @Output(TEST_OUTPUT) MessageChannel testOutput(); /** * 消费者消息输入通道(需要与配置文件中的保持一致) */ String TEST_INPUT = "testInput"; /** * 消息消费 * * @return */ @Input(TEST_INPUT) SubscribableChannel testInput(); }
4、生产者
package com.stream.provider; import com.alibaba.fastjson.JSON; import com.stream.api.TestChannelProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binding.BinderAwareChannelResolver; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import java.util.HashMap; import java.util.Map; /** * @ClassName: TestMessageProducer * @Description: 生产者生产消息 * @Author: qiaojiacheng * @Date: 2021/3/10 3:21 下午 */ @EnableBinding(value = {TestChannelProcessor.class}) public class TestMessageProducer { @Autowired private BinderAwareChannelResolver channelResolver; /** * 生产消息 * * @param msg */ public void testSendMessage(String msg) { Map<String, Object> headers = new HashMap<>(); Map<String, Object> payload = new HashMap<>(); payload.put("msg", msg); System.err.println("生产者发送消息:" + JSON.toJSONString(payload)); channelResolver.resolveDestination(TestChannelProcessor.TEST_OUTPUT).send( MessageBuilder.createMessage(payload, new MessageHeaders(headers)) ); } }
5、发送消息的Controller
package com.stream.controller; import com.stream.provider.TestMessageProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @ClassName: TestController * @Description: 测试发送消息 * @Author: qiaojiacheng * @Date: 2021/3/10 3:23 下午 */ @RestController public class TestController { @Autowired private TestMessageProducer testMessageProducer; /** * 发送保存订单消息 * * @param message */ @GetMapping(value = "sendTestMessage") public void sendTestMessage(@RequestParam("message") String message) { //发送消息 testMessageProducer.testSendMessage(message); } }
6、消费者
package com.stream.provider; import com.stream.api.TestChannelProcessor; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; /** * @ClassName: TestMessageConsumer * @Description: 消费者消费消息 * @Author: qiaojiacheng * @Date: 2021/3/10 4:09 下午 */ @EnableBinding(TestChannelProcessor.class) public class TestMessageConsumer { @StreamListener(TestChannelProcessor.TEST_INPUT) public void testConsumeMessage(Message<String> message) { System.err.println("消费者消费消息:" + message.getPayload()); } }
7、swagger配置,方便测试
package com.stream.swagger; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.async.DeferredResult; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; /** * @author qiaojiacheng */ @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket createRestApi() { return new Docket(DocumentationType.SWAGGER_2) .genericModelSubstitutes(DeferredResult.class) .select() .paths(PathSelectors.any()) .build().apiInfo(apiInfo()); } private ApiInfo apiInfo() { return new ApiInfoBuilder().title("Stream server") .description("测试SpringCloudStream") .termsOfServiceUrl("https://spring.io/projects/spring-cloud-stream") .version("1.0").build(); } }
8、启动类
package com.stream; import com.stream.api.TestChannelProcessor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; @SpringBootApplication @EnableBinding(value= {TestChannelProcessor.class}) public class StreamDemoApplication { public static void main(String[] args) { SpringApplication.run(StreamDemoApplication.class, args); } }
访问swagger进行测试
控制台输出结果
查看rabbitmq控制台