rabbitmq 安装及python调用 简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式) 什么是消息队列(MQ) 为什么要用消息队列? RabbitMQ

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

MQ全称为Message Queue 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。这样发布者和使用者都不用知道对方的存在。

首先看下队列

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

import queue

q = queue.Queue(maxsize=10) # 最多存放10个,默认FIFO

q.put(111)
q.put(222)
q.put(333)

print(q.get())
print(q.get())
print(q.get())
print(q.get())  # 会等待一直到有值
# print(q.get(block=False))  # 取不到值会报错

为什么要用消息队列?

       消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

       流量削峰指的是当访问量过大,放访问放入队列中,依次排队执行。

       接下来利用一个外卖系统的消息推送给大家解释下MQ的意义。

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

RabbitMQ

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

安装

1.安装erlang

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ一路下一步,选择安装路径

添加到电脑环境变量中

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

 进入cmd,输入erl,进入erlang代码编辑就代表安装成功

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

 2.安装RabbitMQ

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

RabbitMQ Service默认是自动勾选中的,这里我们取消勾选。不需要添加入系统服务中

 rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

 配置环境变量

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

 进入cmd,输入命令rabbitmq-plugins enable rabbitmq_management,这样就可以添加可视化插件了

 启动rabbitmq

 在cmd下启动,输入rabbitmq-server

在浏览器中输入地址

http://localhost:15672/

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ用户名 guest,密码 guest

关闭    关闭当前rabbitmq服务的cmd窗口即可

python调用rabbitmq

安装pika

pip install pika

简单模式

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

生产者:
  1 链接rabbitmq
  2 创建队列
  3 向指定的队列插入数据
消费者
  1 链接rabbitmq
  2 监听模式
  3 确定回调函数

# producer.py    生产者
import pika

# 1 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2 创建队列
channel.queue_declare(queue='hello')

# 3 向指定队列插入数据
channel.basic_publish(exchange='',  # 简单模式,可以设置交换机模式
                      routing_key='hello',  # 指定队列
                      body='Hello Yuan!')  # 向rabbitmq发送的内容

print(" [x] Sent 'Hello Yuan!'")
# consumer.py  消费者
import pika

# 1 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2 创建队列
channel.queue_declare(queue='hello')

# 3 向指定队列插入数据
channel.basic_publish(exchange='',  # 简单模式,可以设置交换机模式
                      routing_key='hello',  # 指定队列
                      body='Hello Yuan!')  # 向rabbitmq发送的内容

print(" [x] Sent 'Hello Yuan!'")

参数使用

应答参数

auto_ack为True代表默认应答,如果取走就会删除对应数据。如果发生消费者取走后代码还没处理报错,消费者再次重启但是数据已经丢失

把auto_ack改为False代表手动应答,如果消费者取走数据也不会删除对应数据。
回调函数中配合ch.basic_ack(delivery_tag=method.deliver_tag)删除对应数据,可以在做完处理后加入该语句
# producer.py
import pika

# 1 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2 创建队列
channel.queue_declare(queue='hello')

# 3 向指定队列插入数据
channel.basic_publish(exchange='',  # 简单模式,可以设置交换机模式
                      routing_key='hello',  # 指定队列
                      body='Hello Yuan!')  # 向rabbitmq发送的内容

print(" [x] Sent 'Hello Yuan!'")
# consumer.py
'''
auto_ack为True代表默认应答,如果取走就会删除对应数据。如果发生消费者取走后代码还没处理报错,消费者再次重启但是数据已经丢失

把auto_ack改为False代表手动应答,如果消费者取走数据也不会删除对应数据。
回调函数中配合ch.basic_ack(delivery_tag=method.deliver_tag)删除对应数据,可以在做完处理后加入该语句
'''
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列(因为不知道是生成者还是消费者先执行,所以也创建个队列,谁先运行谁先创建)
channel.queue_declare(queue='hello')

# 确定回调函数
def callback(ch,method,properties,body): # body是拿到的数据
    print(' [x] Received %r' % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 确定监听队列参数(并没有真的监听,只是确定参数)
channel.basic_consume(queue='hello',    # 确定监听的队列
                      auto_ack=False,    # 默认应答改为手动应答
                      on_message_callback=callback)

print(' [x] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # 开始监听,会hang住

持久化参数

当生产者把数据放入队列中,生产者还没取数据,rabbitmq崩了,会导致数据丢失,需要把数据持久化,存到硬盘上
# producer.py
'''
当生产者把数据放入队列中,生产者还没取数据,rabbitmq崩了,会导致数据丢失,需要把数据持久化
'''
import pika

# 1 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2 创建可持久化队列(注意队列一旦确定是持久化就是持久化,非持久化就是非持久化,不能变更)
channel.queue_declare(queue='hello3',durable=True)

# 3 向指定队列插入数据(向可持久队列插入数据,可以通过properties参数设定插入的数据是否要持久化,不加properties参数就是非持久化数据)
channel.basic_publish(exchange='',  # 简单模式,可以设置交换机模式
                      routing_key='hello3',  # 指定队列
                      body='Hello Alex!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )  # 向rabbitmq发送的内容

print(" [x] Sent 'Hello Alex!'")
# consumer.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列(消费者也别忘了设置可持久化队列)
channel.queue_declare(queue='hello3',durable=True)

# 确定回调函数
def callback(ch,method,properties,body): # body是拿到的数据
    print(' [x] Received %r' % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 确定监听队列参数(并没有真的监听,只是确定参数)
channel.basic_consume(queue='hello3',    # 确定监听的队列
                      auto_ack=False,    # 默认应答改为手动应答
                      on_message_callback=callback)

print(' [x] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # 开始监听,会hang住

分发参数(公平分发)

当生产者把数据放入队列中,有多个生产者。默认会采取轮询的分发方式,假如a生产者启动,b生产者再启动,c生产者最后启动。
那么每次生产者放入一个数据,会按照a,b,c,a,b...的消费者启动顺序依次分发

问题是:假如消费者a启动,再启动b。a消费者处理很慢,b消费者处理很快。会出现,a拿到,b拿到。然后a还没处理忘,b完成了。数据又要给a,只能等a处理完
这种情况下,最好不要采用轮询分发,采用公平分发

改为公平分发模式(只要在消费者中添加 channel.basic_qos(prefetch_count=1) , 生产者代码不变 )
# producer.py
'''
当生产者把数据放入队列中,有多个生产者。默认会采取轮询的分发方式,假如a生产者启动,b生产者再启动,c生产者最后启动。
那么每次生产者放入一个数据,会按照a,b,c,a,b...的消费者启动顺序依次分发

问题是:假如消费者a启动,再启动b。a消费者处理很慢,b消费者处理很快。会出现,a拿到,b拿到。然后a还没处理忘,b完成了。数据又要给a,只能等a处理完
这种情况下,最好不要采用轮询分发,采用公平分发
'''
import pika

# 1 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2 创建队列
channel.queue_declare(queue='hello4')

# 3 向指定队列插入数据(向可持久队列插入数据,可以通过properties参数设定插入的数据是否要持久化,不加properties参数就是非持久化数据)
channel.basic_publish(exchange='',  # 简单模式,可以设置交换机模式
                      routing_key='hello4',  # 指定队列
                      body='Hello 111',
                      )  # 向rabbitmq发送的内容

print(" [x] Sent 'Hello Alex!'")
# consumer.py
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello4')

# 确定回调函数
def callback(ch,method,properties,body): # body是拿到的数据
    time.sleep(3)   # 可以多开几个设定不同的时间,来测试效果
    print(' [x] Received %r' % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 公平分发
channel.basic_qos(prefetch_count=1)

# 确定监听队列参数(并没有真的监听,只是确定参数)
channel.basic_consume(queue='hello4',    # 确定监听的队列
                      auto_ack=False,    # 默认应答改为手动应答
                      on_message_callback=callback)

print(' [x] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # 开始监听,会hang住

交换机模式

发布订阅

生产者创建一个交换机,每个消费者创建一个队列并绑定交换机。当生产者发布消息进交换机,而交换机又是发布订阅模式,它会向所有绑定它的队列发送一份数据。

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

 例如外卖系统,订单信息发给骑士系统,商家系统,后台系统

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

 发布订阅模式代码

# producer.py
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'
))
channel = connection.channel()

# 声明一个名为logs,类型为fanout的交换机
channel.exchange_declare(exchange='logs',   # 交换机的名字
                         exchange_type='fanout') # fanout:发布订阅模式参数

# 向logs交换机插入数据"info: Hello World!"
message = 'info: Hello World!'
channel.basic_publish(exchange='logs',  # 名为logs的交换机
                      routing_key='',
                      body=message)     # 要插入的数据

print(" [x] Sent %r" % message)
connection.close()

订阅者代码相比于简单模式,多了开头声明(创建交换机),创建队列名字唯一,绑定队列到指定交换机上

import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 声明一个名为logs,类型为fanout的交换机(当消费者先启动,没有交换机时,提前创建交换机,和简单模型里创建队列目的相同)
channel.exchange_declare(exchange='logs',   # 交换机的名字
                         exchange_type='fanout') # fanout:发布订阅模式参数

# 创建队列
result = channel.queue_declare('',exclusive=True)   # exclusive为True,系统创建一个唯一的名字
queue_name = result.method.queue    # 获取创建队列的名字
print(queue_name)

# 将制定队列绑定到交换机上
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [x] Waiting for messages. To exit press CTRL+C')

def callback(ch,method,properties,body):
    print(' [x] Received %r' % body)

channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()   # 开始监听,会hang住

关键字匹配

 rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

用于日志系统

 案例代码

# producer.py
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'
))
channel = connection.channel()

# 声明一个名为logs,类型为fanout的交换机
channel.exchange_declare(exchange='logs2',   # 交换机的名字
                         exchange_type='direct') # 关键字模式

# 向logs交换机插入数据"info: Hello World!"
message = 'error: Hello World!'
channel.basic_publish(exchange='logs2',  # 名为logs的交换机
                      routing_key='error', # 发送消息的关键字,如果和消费者绑定交换机的关键字相同,消费者就会收到
                      body=message)     # 要插入的数据

print(" [x] Sent %r" % message)
connection.close()
# consumer.py
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 声明一个名为logs,类型为fanout的交换机(当消费者先启动,没有交换机时,提前创建交换机,和简单模型里创建队列目的相同)
channel.exchange_declare(exchange='logs2',   # 交换机的名字
                         exchange_type='direct') # 交换机模式  关键字模式

# 创建队列
result = channel.queue_declare('',exclusive=True)   # exclusive为True,系统创建一个唯一的名字
queue_name = result.method.queue    # 获取创建队列的名字
print(queue_name)

# 将制定队列绑定到交换机上,绑定关键字,如果要绑定多个关键字要重复绑定语句,可以用for循环
channel.queue_bind(exchange='logs2',
                   queue=queue_name,
                   routing_key='error') # 绑定关键字

channel.queue_bind(exchange='logs2',
                   queue=queue_name,
                   routing_key='info') # 绑定关键字

channel.queue_bind(exchange='logs2',
                   queue=queue_name,
                   routing_key='warning') # 绑定关键字

print(' [x] Waiting for messages. To exit press CTRL+C')

def callback(ch,method,properties,body):
    print(' [x] Received %r' % body)

channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()   # 开始监听,会hang住

通配符模式

在关键字绑定的基础上,可以多关键字进行模糊匹配。类似正则匹配,但是只有#和。#匹配一个或多个单词,匹配一个词

rabbitmq  安装及python调用    简单模式,参数使用(应答参数, 持久化参数, 分发参数), 交换机模式(发布订阅, 关键字匹配, 模糊匹配模式)
什么是消息队列(MQ)
为什么要用消息队列?
RabbitMQ

# producer.py
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'
))
channel = connection.channel()

# 声明一个名为logs,类型为fanout的交换机
channel.exchange_declare(exchange='logs3',   # 交换机的名字
                         exchange_type='topic') # 通配符模式

# 向logs交换机插入数据"info: Hello World!"
message = 'usa.weather....'
channel.basic_publish(exchange='logs3',  # 名为logs的交换机
                      routing_key='usa.weather',
                      body=message)     # 要插入的数据

print(" [x] Sent %r" % message)
connection.close()
# consumer.py
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 声明一个名为logs,类型为fanout的交换机(当消费者先启动,没有交换机时,提前创建交换机,和简单模型里创建队列目的相同)
channel.exchange_declare(exchange='logs3',   # 交换机的名字
                         exchange_type='topic') # 交换机模式  通配符模式

# 创建队列
result = channel.queue_declare('',exclusive=True)   # exclusive为True,系统创建一个唯一的名字
queue_name = result.method.queue    # 获取创建队列的名字
print(queue_name)

# 将制定队列绑定到交换机上,绑定关键字,如果要绑定多个关键字要重复绑定语句,可以用for循环
channel.queue_bind(exchange='logs3',
                   queue=queue_name,
                   routing_key='usa.#') # 绑定关键字

print(' [x] Waiting for messages. To exit press CTRL+C')

def callback(ch,method,properties,body):
    print(' [x] Received %r' % body)

channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()   # 开始监听,会hang住