使用spring-rabbit插件实现RabbitMQ讯息的发送和接收
本文不介绍AMQP和RabbitMQ的基础知识,请参考链接: http://www.ostest.cn/archives/497 ,介绍的非常详细。
本文主要通过一个小的demo,来举例说明如何使用spring-rabbit插件来实现RabbitMQ消息的发送和接收,发送端称为生产者,接收端称为消费者。
1. 给pom.xml文件中添加rabbitmq相关依赖
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring.version>3.1.1.RELEASE</spring.version> <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>${spring.rabbit.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.2.2</version> </dependency> </dependencies>上述protobuf-java依赖用于序列化和反序列化RabbitMQ的消息。
2. 生产者的xml配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties" /> <!-- 使用annotation 自动注册bean,并保证@Required,@Autowired的属性被注入 --> <context:component-scan base-package="com.tracy" /> <!-- 创建rabbit ConnectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" /> <!-- 创建RabbitAdmin,用来管理exchange、queue、bindings --> <rabbit:admin id="containerAdmin" connection-factory="connectionFactory" /> <!-- 指定protobuf为消息队列格式 --> <bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean> <!-- 创建发送消息模板auditTemplate --> <rabbit:template id="auditTemplate" connection-factory="connectionFactory" exchange="${rabbitmq.exchange}" routing-key="${rabbitmq.routingKey}" message-converter="protoMessageConverter" /> </beans>
上述配置文件中,<rabbit:template>中的exchange声明将消息发送到名为ui_ex_test的交换器,routing-key指定消息应当路由到名为audit的队列,message-converter指定使用protobuf作为数据的交换格式。
连接RabbitMQ服务器的相关信息放到了rabbitmq.properties文件中,此文件位于src/main/resources的根目录下,具体内容为:
rabbitmq.host = 10.0.3.123 rabbitmq.username = guest rabbitmq.password = guest rabbitmq.exchange = ui_ex_test rabbitmq.routingKey = audit
3. 生产者和消费者共用的proto格式约定
有关于protobuf的介绍,请参考本人的上一篇博文,地址http://tracywen.iteye.com/blog/2106402
person_msg.proto文件内容为:
package com.tracy.rabbitmq.proto; option java_package = "com.tracy.rabbitmq.proto"; option java_outer_classname = "PersonMsgProtos"; message Person { // ID(必需) required int32 id = 1; // 姓名(必需) required string name = 2; // email(可选) optional string email = 3; // 朋友(集合) repeated string friends = 4; }
4. 生产者主函数
package com.tracy.server; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.tracy.rabbitmq.proto.PersonMsgProtos; /** * 发送消息主函数 * * @author tracy_cui * */ public class Sender { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-sender.xml"); RabbitTemplate template = (RabbitTemplate) context.getBean("auditTemplate"); // 按照定义的Proto结构,创建一个Person PersonMsgProtos.Person.Builder personBuilder = PersonMsgProtos.Person.newBuilder(); personBuilder.setId(1); personBuilder.setName("tracy"); personBuilder.setEmail("tracy_cui@xxx.com"); personBuilder.addFriends("wang"); personBuilder.addFriends("yang"); PersonMsgProtos.Person person = personBuilder.build(); // 将该Java对象发送给rabbit:template绑定的message-converter template.convertAndSend(person); } }
5. 消息格式转换插件protobuf messageconverter
此插件由生产者和消费者公用,createMessage由生产者调用,convertProto2Object由消费者调用
package com.tracy.rabbitmq.converter; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import com.google.protobuf.InvalidProtocolBufferException; import com.tracy.rabbitmq.proto.PersonMsgProtos; /** * * ProtoBuf & object格式转换 * * @author tracy_cui * */ public class ProtobufMessageConverter extends AbstractMessageConverter{ /** * object转换为ProtoBuf, 发送消息 */ @Override public Message createMessage(Object object, MessageProperties messageProperties) { System.out.println("发送转换的消息"); PersonMsgProtos.Person person = (PersonMsgProtos.Person)object; byte[] byteArray = person.toByteArray(); Message message = new Message(byteArray, messageProperties); return message; } @Override public Object fromMessage(Message message) throws MessageConversionException { return null; } /** * ProtoBuf转换为object, 接收消息 */ public Object convertProto2Object(Message message) throws InvalidProtocolBufferException{ byte[] byteArray = message.getBody(); PersonMsgProtos.Person parsePerson = PersonMsgProtos.Person.parseFrom(byteArray); return parsePerson; } }
6. 消费者的xml配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd" > <context:property-placeholder location="classpath:rabbitmq.properties" /> <!-- 使用annotation 自动注册bean,并保证@Required,@Autowired的属性被注入 --> <context:component-scan base-package="com.tracy" /> <!-- 创建rabbit ConnectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" /> <!-- 创建RabbitAdmin,用来管理exchange、queue、bindings --> <rabbit:admin id="containerAdmin" connection-factory="connectionFactory" /> <!-- 声明队列 --> <rabbit:queue name="audit_queue" durable="false" exclusive="false" auto-delete="false" auto-declare="true"/> <!-- 声明direct类型的交换器 --> <rabbit:direct-exchange name="${rabbitmq.exchange}" durable="false" auto-delete="false" auto-declare="true"> <!-- 将交换器与队列、路由key绑定 --> <rabbit:bindings> <rabbit:binding queue="audit_queue" key="${rabbitmq.routingKey}"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 声明两个监听器 --> <bean id="auditListenerOne" class="com.tracy.rabbitmq.listener.AuditListenerOne" /> <bean id="auditListenerTwo" class="com.tracy.rabbitmq.listener.AuditListenerTwo" /> <!-- 指定protobuf为消息队列格式 --> <bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean> <!-- 将两个监听器绑定到声明的队列中 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" prefetch="1" message-converter="protoMessageConverter"> <rabbit:listener ref="auditListenerOne" queue-names="audit_queue" /> <rabbit:listener ref="auditListenerTwo" queue-names="audit_queue" /> </rabbit:listener-container> <!-- 创建spring线程池,多线程对收到的数据进行处理 --> <task:executor id="MessageQueue-Executor" pool-size="2-5" queue-capacity="50" rejection-policy="CALLER_RUNS" keep-alive="2000"/> <task:annotation-driven executor="MessageQueue-Executor"/> </beans>
上述配置文件中,rabbit连接工厂的定义与生产者一致,消费者的配置与生产者的配置区别在于以下几点:
a. 定义了名称为audit_queue的队列,声明队列的作用是消费exchange中的消息;
b. 声明交换器,并与队列、路由key绑定,即将exchange收到的消息发送到bindkey=audit的队列中;
c. 声明了两个监听器,用于监听audit_queue中的消息;
d. 使用spring线程池对收到的数据进行处理。
7. 消费者的监听器
package com.tracy.rabbitmq.listener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.beans.factory.annotation.Autowired; /** * 监听rabbitMQ消息 * * @author tracy_cui * */ public class AuditListenerOne implements MessageListener{ private static final Logger logger = LoggerFactory.getLogger(AuditListenerOne.class); @Autowired private AuditListenerHandler auditListenerHandler; public AuditListenerOne() { logger.info("[****************] MessageQueue waiting for messages..."); } @Override public void onMessage(Message message) { try { auditListenerHandler.handleMessage(message); } catch (Exception e) { e.printStackTrace(); } } }
8. 消费者对收到的数据进行处理
package com.tracy.rabbitmq.listener; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import com.tracy.rabbitmq.converter.ProtobufMessageConverter; import com.tracy.rabbitmq.proto.PersonMsgProtos; import com.google.protobuf.InvalidProtocolBufferException; /** * Message处理 * * @author tracy_cui * */ @Component public class AuditListenerHandler { private static final Logger logger = LoggerFactory.getLogger(AuditListenerHandler.class); @Autowired private ProtobufMessageConverter messageConverter; /** * 使用Spring线程池 */ @Async public void handleMessage(Message message) throws Exception{ logger.info("[****************] handleMessage thread : " + Thread.currentThread().getName()); PersonMsgProtos.Person person = this.convertMessage(message); if(person != null){ System.out.println("id : " + person.getId()); System.out.println("name : " + person.getName()); System.out.println("email : " + person.getEmail()); List<String> friendLists = person.getFriendsList(); for(String friend : friendLists){ System.out.println("friend :" + friend); } } } /** * 将ProtoBuf转换为Entity */ private PersonMsgProtos.Person convertMessage(Message message){ PersonMsgProtos.Person person = null; try { Object object = messageConverter.convertProto2Object(message); if(object instanceof PersonMsgProtos.Person){ person = (PersonMsgProtos.Person)object; }else{ logger.warn("[****************] object is not a instance of CreativeAuditProtos.ui_audit_t"); } } catch (InvalidProtocolBufferException e) { logger.warn("[****************] convert message error, InvalidProtocolBuffer"); e.printStackTrace(); } return person; } }
消费者收到的数据截图:
本项目完整代码已使用git托管,地址:https://coding.net/u/tracywen/p/RabbitMQ/git