RabbitMQ入门 安装与环境配置 SpringBoot整合使用 不使用交换机 使用DirectExchange交换机 交换机和路由不会因为我们的代码删除而删除! 使用TopicExchange交换机 使用FanoutExchange交换机 生产者回调 消费者重回队列   安装与环境配置 SpringBoot整合使用 不使用交换机 使用DirectExchange交换机 交换机和路由不会因为我们的代码删除而删除! 使用TopicExchange交换机 使用FanoutExchange交换机 生产者回调 消费者重回队列

SpringBoot整合使用

不使用交换机

使用DirectExchange交换机

交换机和路由不会因为我们的代码删除而删除!

使用TopicExchange交换机

使用FanoutExchange交换机

生产者回调

消费者重回队列

 

安装与环境配置

本文为个人记录RabbitMq的学习笔记。

由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下erlang。

下载安装RabbitMq

1、2两个步骤的安装包,我是在本机Windows10安装的。如果你看到这里可以百度找一下安装博客,其实就是傻瓜式下一步 

链接:https://pan.baidu.com/s/1mUyOdBKcvoW3Lv8f4y8NAQ
提取码:41fm

配置erlang和RabbitMq的环境变量

RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

 RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

 RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

在cmd命令界面安装RabbitMq网页版控制台 rabbitmq-plugins enable rabbitmq_management

浏览器 http://localhost:15672/ 进入登录页面,账号密码都是 guest

SpringBoot整合使用

注意:当前我把生产者和消费者放到了两个项目中,是因为刚好搭建了springcloud项目,一步到位。你也可以当到同一个项目中。

需要先引入依赖,如果是多个项目则都需要引入

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
View Code

不使用交换机

package com.dang.springcloud.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class Send1 {

    //创建一个队列
    @Bean(value = "mq1")
    public Queue mq1() {
        return new Queue("mq1");
    }

    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g1")
    public String send() {
        //发送消息,将消息放入mq1的队列中
        amqpTemplate.convertAndSend("mq1", "我是一个字符串");
        System.out.println("1已发送消息");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive1 {
    //    监听 mq1消息队列
    @RabbitListener(queues = "mq1")
    public void receive(String msg) {
        System.out.println("1我接受到的消息是:" + msg);
    }
}
View Code

这种写法,发送者直接将消息投递到队列中,消费者从队列获取。思考一个问题,如果我们需要将一条消息投递到多个队列是不是这么写?

package com.dang.springcloud.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class Send2 {

    //创建一个队列
    @Bean(value = "mq2")
    public Queue mq2() {
        return new Queue("mq2");
    }


    //创建一个队列
    @Bean(value = "mq3")
    public Queue mq3() {
        return new Queue("mq3");
    }

    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g2")
    public String send() {
        //发送消息,将消息放入mq1的队列中
        amqpTemplate.convertAndSend("mq2", "我是一个字符串");
        amqpTemplate.convertAndSend("mq3", "我是一个字符串");
        System.out.println("23已发送消息");
        return "发送成功!";
    }

}
View Code 
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive2 {
    //    监听 mq2消息队列
    @RabbitListener(queues = "mq2")
    public void receive(String msg) {
        System.out.println("2我接受到的消息是:" + msg);
    }

    //    监听 mq3消息队列
    @RabbitListener(queues = "mq3")
    public void receive2(String msg) {
        System.out.println("3我接受到的消息是:" + msg);
    }
}
View Code

RabbitMQ在消息发送者和队列之间在抽象一个概念,交换机。消息发送者,不关心消息到底发送给谁,而是将消息投递给交换机,并且指定路由地址,交换机通过路由标记决定消息投递到哪个队列中。

使用DirectExchange交换机

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send2 {

    //创建一个队列
    @Bean(value = "mq2")
    public Queue mq2() {
        return new Queue("mq2");
    }


    //创建一个队列
    @Bean(value = "mq3")
    public Queue mq3() {
        return new Queue("mq3");
    }


    //Direct交换机
    @Bean(value = "ex1")
    DirectExchange ex1() {
        return new DirectExchange("ex1");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi1")
    Binding binding1() {
        return BindingBuilder.bind(mq2()).to(ex1()).with("k1");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi2")
    Binding binding2() {
        return BindingBuilder.bind(mq3()).to(ex1()).with("k1");
    }



    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g2")
    public String send() {
        amqpTemplate.convertAndSend("ex1", "k1", "我是一条消息");
        System.out.println("消息已投递给交换机ex1");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive2 {
    //    监听 mq2消息队列
    @RabbitListener(queues = "mq2")
    public void receive(String msg) {
        System.out.println("2我接受到的消息是:" + msg);
    }

    //    监听 mq3消息队列
    @RabbitListener(queues = "mq3")
    public void receive2(String msg) {
        System.out.println("3我接受到的消息是:" + msg);
    }
}
View Code

这种写法,我们还是需要创建多个队列,然后创建交换机,将队列和交换机绑定,此时我们需要填入一个路由地址。消息发送者直接将消息投递给交换机,并且指定路由。由交换机根据路由查询绑定的队列。上边代码片段,只有唯一的一个交换机,但是从这个交换机中查到了两个不同的路由,将消息投递给绑定的队列中。消费者则不改变写法。

交换机和路由不会因为我们的代码删除而删除!

RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

 RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

 RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

 RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

使用TopicExchange交换机

一下代码片段和上边的十分类似,只是使用了TopicExchange交换机。第二个绑定器使用的是 * 通配符。消息是可以被投递到mq5的。

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send3 {

    //创建一个队列
    @Bean(value = "mq4")
    public Queue mq4() {
        return new Queue("mq4");
    }


    //创建一个队列
    @Bean(value = "mq5")
    public Queue mq5() {
        return new Queue("mq5");
    }


    //Topic交换机
    @Bean(value = "ex2")
    TopicExchange ex2() {
        return new TopicExchange("ex2");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi3")
    Binding binding1() {
        return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi4")
    Binding binding2() {
        return BindingBuilder.bind(mq5()).to(ex2()).with("student.age.*");
    }



    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g3")
    public String send() {
        amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");
        System.out.println("消息已投递给交换机ex2");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive3 {
    //    监听 mq4消息队列
    @RabbitListener(queues = "mq4")
    public void receive(String msg) {
        System.out.println("4我接受到的消息是:" + msg);
    }

    //    监听 mq5消息队列
    @RabbitListener(queues = "mq5")
    public void receive2(String msg) {
        System.out.println("5我接受到的消息是:" + msg);
    }
}
View Code

 RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

接下来我们删除 student.age.* 的路由,换成 student.* 这样消息就不能转发到mq5了。可见 * 只能匹配一个词。

RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

删除掉所有绑定到mq5队列的路由,重新绑定。下面代码片段使用的 # 通配符,# 匹配多个词。

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send3 {

    //创建一个队列
    @Bean(value = "mq4")
    public Queue mq4() {
        return new Queue("mq4");
    }


    //创建一个队列
    @Bean(value = "mq5")
    public Queue mq5() {
        return new Queue("mq5");
    }


    //Topic交换机
    @Bean(value = "ex2")
    TopicExchange ex2() {
        return new TopicExchange("ex2");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi3")
    Binding binding1() {
        return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi4")
    Binding binding2() {
        return BindingBuilder.bind(mq5()).to(ex2()).with("student.#");
    }



    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g3")
    public String send() {
        amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");
        System.out.println("消息已投递给交换机ex2");
        return "发送成功!";
    }

}
View Code

RabbitMQ入门
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列
 
安装与环境配置
SpringBoot整合使用
不使用交换机
使用DirectExchange交换机
交换机和路由不会因为我们的代码删除而删除!
使用TopicExchange交换机
使用FanoutExchange交换机
生产者回调
消费者重回队列

使用FanoutExchange交换机

FanoutExchange没有路由的概念,到像是点对点的直接发送。

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
public class Send4 {

    //创建一个队列
    @Bean(value = "mq6")
    public Queue mq6() {
        return new Queue("mq6");
    }


    //创建一个队列
    @Bean(value = "mq7")
    public Queue mq7() {
        return new Queue("mq7");
    }


    //Fanout交换机
    @Bean(value = "ex3")
    FanoutExchange ex3() {
        return new FanoutExchange("ex3");
    }

    //绑定 将队列和交换机绑定
    @Bean(value = "bi5")
    Binding binding1() {
        return BindingBuilder.bind(mq6()).to(ex3());
    }

    //绑定 将队列和交换机绑定
    @Bean(value = "bi6")
    Binding binding2() {
        return BindingBuilder.bind(mq7()).to(ex3());
    }


    @Autowired
    private AmqpTemplate amqpTemplate;


    @GetMapping("g4")
    public String send() {
        //此处第二个路由参数必须给 null,否则不能成功将消息投递到队列
        amqpTemplate.convertAndSend("ex3", null,"我是一条消息");
        System.out.println("消息已投递给交换机ex3");
        return "发送成功!";
    }

}
View Code
package com.datang.springcloud.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive4 {
    //    监听 mq6消息队列
    @RabbitListener(queues = "mq6")
    public void receive(String msg) {
        System.out.println("6我接受到的消息是:" + msg);
    }

    //    监听 mq7消息队列
    @RabbitListener(queues = "mq7")
    public void receive2(String msg) {
        System.out.println("7我接受到的消息是:" + msg);
    }
}
View Code

生产者回调

在Spring配置文件中配置如下

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
View Code

注意看回调函数的写法

package com.dang.springcloud.mq;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Configuration
public class Send5 {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm-----"+correlationData);
                System.out.println("confirm-----"+b);
                System.out.println("confirm-----"+s);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("returnedMessage-----"+message);
                System.out.println("returnedMessage-----"+s);
                System.out.println("returnedMessage-----"+s1);
                System.out.println("returnedMessage-----"+s2);
            }
        });

        return rabbitTemplate;
    }

    //创建一个队列
    @Bean(value = "mq8")
    public Queue mq8() {
        return new Queue("mq8");
    }


    //Topic交换机
    @Bean(value = "ex4")
    TopicExchange ex4() {
        return new TopicExchange("ex4");
    }

    //绑定 将队列和交换机绑定, 并设置用于匹配键
    @Bean(value = "bi7")
    Binding binding1() {
        return BindingBuilder.bind(mq8()).to(ex4()).with("HaHaHa");
    }

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping(value = "g5")
    public String g5() {
        rabbitTemplate.convertAndSend("ex4", "HaHaHa", "我是一条消息");
        System.out.println("消息已投递给交换机ex4");
        return "发送成功";
    }

}
View Code

回调结果

交换机不对
confirm-----null
confirm-----false
confirm-----channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'ex' in vhost '/', class-id=60, method-id=40)


找不到路由
returnedMessage-----(Body:'我是一条消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
returnedMessage-----NO_ROUTE
returnedMessage-----ex4
returnedMessage-----HaHaHaww
confirm-----null
confirm-----true
confirm-----null

成功
confirm-----null
confirm-----true
confirm-----null
View Code

消费者重回队列

package com.datang.springcloud.mq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class Receive5 {
    //    监听 mq8消息队列
    @RabbitListener(queues = "mq8")
    public void receive(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("8我接受到的消息是:" + msg);
            int a = 1 / 0;
            // 第一个参数为队列的ID,第二个参数批处理,手动提交比当前ID小的。如果这个队列为正确的,那就可以把之前的提交
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            System.out.println("消费消息确认" + message.getMessageProperties().getConsumerQueue() + ",接收到了回调方法");
        } catch (Exception e) {
            //其实重发也没多大意义,一般都是做个日志,或者其他补偿。
            System.out.println("尝试重发:" + message.getMessageProperties().getConsumerQueue());
            //前两个参数和 basicAck 一样,最后一个为是否重回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);


        }
    }
}
View Code