Work queues工作队列模式
- 不需要设置交换机(使用默认的交换机)
- 简单模式和工作队列模式都是点对点模式。其他的都是发布与订阅模式。
- 点对点就是一个消息只能被一个消费者消费。消费成功就删除了。
- 工作队列模式对比简单模式,多个消费者共同消费同一个队列中的消息。
- 工作队列模式的应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。(消费者项目多部署几台服务)

- 简单模式-生产者代码demo:
public class Producer {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址
connectionFactory.setHost("192.168.137.130");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/");
//连接用户名;默认为guest
connectionFactory.setUsername("admin");
//连接密码;默认为guest
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/**
* queue 参数1:队列名称
* durable 参数2:是否定义持久化队列,当mq重启之后,还在
* exclusive 参数3:是否独占本次连接
* ① 是否独占,只能有一个消费者监听这个队列
* ② 当connection关闭时,是否删除队列
* autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除
* arguments 参数5:队列其它参数
*/
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:配置信息
* 参数4:消息内容
*/
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.137.130");//ip
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/");//虚拟机 默认值/
factory.setUsername("admin");//用户名
factory.setPassword("123456");//密码
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("simple_queue",true,false,false,null);
// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息,true是自动确认,手动确认需要在上面回调函数里加channel.basicAck(设置确认标签,是否一次性确认签收所有消息);
3. callback:回调对象
*/
// 消费者类似一个监听程序,主要是用来监听消息。
channel.basicConsume("simple_queue",true,consumer);
//关闭信道和连接。如果不关闭,就会处于堵塞状态,对应的队列有了消息,可以立即进行消费。
//channel.close();
//connection.close();
}
}
- 工作队列模式-生产者代码和消费者代码其实跟简单模式差不多。只是复制了几份消费者代码执行。