Python与RabbitMQ交互
RabbitMQ 消息队列
成熟的中间件RabbitMQ、ZeroMQ、ActiveMQ等等
RabbitMQ使用erlang语言开发,使用RabbitMQ前要安装erlang语言
RabbitMQ允许不同应用、程序间交互数据
python中的Threading queue只能允许单进程内多线程交互的
python中的MultiProcessing queue只能允许父进程与子进程或同父进程的多个子进程交互
RabbitMQ启动:
1.windows中默认安装成功,在服务列表中会显示自动启动
2.Linux中使用命令rabbitmq-server start
RabbitMQ支持不同的语言,对于不同语言有相应的模块,这些模式支持使用开发语言连接RabbitMQ
Python连接RabbitMQ模块有:
1.pika主流模块
2.Celery分布式消息队列
3.Haigha提供了一个简单的使用客户端库来与AMQP代理进行交互的方法
使用RabbitMQ前,首先阅读开始文档: http://www.rabbitmq.com/getstarted.html
简单的发送接收实例
默认情况下,使用同一队列的进程,接收消息方使用轮询的方式,依次获取消息
对于一条消息的接收来说,只有当接收方收到消息,并处理完消息,给RabbitMQ发送ack,队列中的消息才会删除
如果在处理的过程中socket断开,那么消息自动转接到下一个接收方
producer.py
__author__ = 'Cq' import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) #声明一个管道 channel = connection.channel() #声明queue,这个队列在RabbitMQ中生成,发送方和接收方使用同一个队列 channel.queue_declare(queue='hello2', durable=True) #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello2',#队列名称 body='Hello World!', properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ) )#body消息内容 print(" [x] Sent 'Hello World!'") connection.close()
consumer.py
__author__ = 'Cq' import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. #发送方和接收方不知道谁首先连接到RabbitMQ,双方连接上来都先声明一个队列 channel.queue_declare(queue='hello2', durable=True) def callback(ch, method, properties, body): print("recived message...") # time.sleep(30) print(" [x] Received %r" % body) #处理完成消息后,主动要向RabbitMQ发送ack ch.basic_ack(delivery_tag=method.delivery_tag) #ch --> 管道内存对象的地址 #method --> 指定各种参数 #properties --> #python3 socket等发送网络包都是byte格式 #如果队列里还有1条消息未处理完,将不能接收新的消息 channel.basic_qos(prefetch_count=1) #声明接收收消息变量 channel.basic_consume(callback,#收到消息后执行的回调函数 queue='hello2',) #no_ack=True)#执行完callback函数后,默认会发送ack给RabbitMQ print(' [*] Waiting for messages. To exit press CTRL+C') #开始接收消息,不停循环接收,没有消息挂起等待 channel.start_consuming()