RabbitMQ学习笔记

常识

  • 开发语言是erlang,安装的话要先安装语言环境
  • 中间件要考虑的:
    1. 持久化
    2. 高可用(集群,主从)
  • kafka性能最高
  • 名词
    1. Broker / RabbitMQ Server / Message Broker 接受和分发消息的应用
    2. Exchange 一个交换机可以绑定多个队列
    3. Queue 交换机和队列在Broker里
    4. Producer
    5. Consumer
    6. connection c/s之间的一个tcp/ip链接
    7. Channel 一个链接有多个信道,信道是为了减少链接开销,是一种逻辑链接
    8. vhost 一个vhost里有交换机和队列

AMQP协议

协议是在 tcp/ip协议基础之上约定的,tcp/ip过于简单,就是在tcp/ip上做个封装(http也不符合需求)

  • MQTT协议
  • OpenMana
  • Kafka协议

消息持久化

支持文件存储(自己定义的文件存储格式来做持久化)

消息分发机制

RabbitMQ kafka
发布订阅
轮训分发(公平)
公平分发(能者多劳)
重发
消息拉取

简单队列模式

getChannel

factory = new connectionFactory()
factory  -> setHost  setUsername  setPassword
Connection conn = factory.newConn()
Channel chan = conn.newChan()

生产者:

  1. 哪个队列declQueue
chan = getChan()
chan.declQueue(queue_name,isdurable,exclusive消息共享,autoDelete自动删除,arguments其他参数)
Message = "hello"
chan.publish(exchange哪个交换机,queue_name 路由key,props 其他参数,message.getBytes()消息体)

消费者:

  1. 哪个队列
  2. 消费成功是否自动应答
  3. 成功和不成功的回调
chan = getChan()
chan.Consume(queue,true,declconsume,sucess)

工作队列模式 Working Queues

多个工作线程处理一个消息队列(一个消息只能处理一次),采用轮询分发

一样的,多开几个消费线程就是

消息应答

消费者把消息处理完了之后,给队列一个应答,然后队列才删除消息

  1. 自动应答
    只要他接到消息立马给应答,但是其实后续还有很多处理
    所以在高吞吐量的情况下还是可能丢失消息
  2. 手动应答
    可以批量应答multiple(跟网络那个批量确认差不多,建议为false)
    basicAck肯定
void deliverMessage(consumeTag, message){
    // do something .....
    message.getbody()
    // over
    chan.basicAck(message.getEnvelop(),message.getTag(),multiple=false)
}
chan.consume(autoAck=false,deliverMessage)

basicNack否定
basicReject否定

重新入队

如果没有ack,队列知道消息丢了,然后消息重新入队,给另一个消费者,总之就是要保证消息不要丢失

持久化

  1. 队列持久化
  2. 消息持久化
channel.basicPublish(exchange,queue_name,messageProperties.PERSISTENT,message.getBytes)

负载均衡

公平分发

能者多劳,在消费者那里设置这个,谁能消费就把消息给谁

channel.basicQos(perfectCount=1)

预取值

相当于权值,相当于设置堆积到信道上的消息有多少条

channel.basicQos(perfectCount=x)

发布确认

消息持久化了之后再给生产者发消息确认

单个

同步的,发一个确认一个,上一个没确认下一个也不发。缺点明显,就是慢

while(true){
    channel.publish()
    bool flag = chan.waitForConfirms()
    if(flag){
        printf("sucess")
    }
}

批量

仍然是同步的,但是一旦出问题无法确认是具体哪条消息出问题(没有被确认)

int batchSize = 100
for (i:1~1000) {
    chan.publish()
    if(i%batchSize==0){
        chan.waitForConfirms()
    }
}

异步批量

producer只需要发消息就够了,然后是异步的,broker会对消息处理

chan.confirmSelect()
ConcurrentSkipListMap<int,String> confirms
//准备消息监听器
ackCallback(){
    //收到了,删除掉已经确认的
    confirms.delete()
}
nackcallback(){
    //没收到,做些操作,比如放回队列
}
chan.addConfirmListener(ackCallback,nackCallback) //这是多线程
//直接
for (i : message) {
    chan.publish(message)
    confirms.put(num,message)
}

交换机

消息不会直接发送给队列,只能发送给交换机,交换机拿到消息,通过RoutingKey来决定把消息路由到哪个队列里
exchange -> RoutingKey -> Queue

临时队列

不带有持久化,队列名称让服务器来给我们随机出来,断开连接之后,临时队列自动销毁

string queue_name = chan.declQueue().getQueue()

Fanout

广播,交换机把消息广播到所有队列

chan = getChan()
chan.exchangeDecl(exchange_name, exchange_type="fanout")
chan.queueDecl().getQueue()
chan.queueBind(queue_name,exchange_name,routingKey)

chan.publish(exchange_name,routing_key,props,)

Direct

就是绑定的RoutingKey不一样,可以多重绑定啥的

chan = getChan()
chan.exchangeDecl(exchange_name, exchange_type="fanout")
chan.queueDecl(queue_name1,callback)
chan.queueDecl(queue_name2,callback)
chan.queueBind(queue_name1,exchange_name,routingKey)
chan.queueBind(queue_name2,exchange_name,routingKey)

chan.publish(exchange_name,routing_key,props,)

Topic

类似于正则表达式

死信

消息在消息队列里,但是出于某些原因无法被消费者消费,为了防止这些消息过期,需要把他们放到死信队列里

  • 来源
  1. 消息TTL过期
  2. 队列满了,加不了新消息
  3. basic.reject,basic.nack requeue=false
  • dead_exchange和dead_queue
chan = getChan()
chan.exchangeDel(normal, type=direct)
chan.exchangeDel(dead, type=direct)
map<string, object> arguments = new hashMap  //参数设置
arguments.put("x-dead-letter-exchange",dead_exchange)  //第一个参数是固定的
arguments.put("x-dead-letter-routing-key","routing_key") 
chan.queueDecl(normal_queue_name,durable,exclusive,autoDelete,arguments=arguments)

延迟队列

结构和死信的ttl状态很像,就是normal_queue没有消费者

  • 使用场景
    1. 某个事件发生后一段时间,某个事件发生前一段时间 需要做某件事
    2. 订单十分钟内未支付,注册成功三天没登录,预定会议前10分提醒
arguments.put("x-dead-letter-exchange",dead_exchange)  //第一个参数是固定的,指定死信队列
arguments.put("x-dead-letter-routing-key","routing_key")
arguments.put("x-message-ttl",4000)  //第一个参数是固定的