celery rabbit mq 详解
Celery介绍和基本使用
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
1)你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
2)你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福
Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,后面会讲
1.1 Celery有以下优点:
- 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活: 几乎celery的各个组件都可以被扩展及自定制
- Celery包含如下组件:
- Celery Beat:任务调度器,Beat进程会读取配置文件的内容周期性地将配置中到期需要执行的任务发送给任务队列。
- Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
- Broker:消息代理,或者叫做消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
- Producer:调用Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Result Backend:任务处理完后保存状态信息和结果,以供查询。celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
执行依靠
celery call 发送任务给这个 -->celery组件--->发送给rabbitmq,交给---->worker1|worker2
- 环境的部署
安装celery模块:pip install celery
基础代码worker端
from celery import Celery app = Celery('tasks',broker='redis://:alex3714@192.168.14.52') #‘tasks’给app起的名字 backend='redis://alex3714@localhost')#将结果写入到哪 @app.task def add(x,y): print("running...",x,y) return x+y @app.task def cmd(cmd_str): print('running cmd',cmd_str)
执行这个脚本: celery -A celery_test -l debug #当执行完这条命令后就会进入监听状态
#celery_test不用写后缀名, 指定模式。
调度端执行,会形成异步执行
from celery_test import add,cmd add(4,5) #这么执行的话只是本地显示结果。 add.delay(45,2) #让worker端执行 t1 = add.delay(45,2)当你赋值的时候worker端还会在执行一次 t1.get() #获取执行的结果
celery -A celery_test worker -l info #在开一个客户端。
#这次我们测试这个任务是怎么分发的。 t2 = add.delay(22,44) t2 = add.delay(22,44) t2 = add.delay(22,44) t2 = add.delay(22,44) #当你执行的很快的时候你会发现这个任务和rabbit MQ一样是抢任务的。 from celery import Celery app = Celery('tasks',broker='redis://:alex3714@192.168.14.52') #‘tasks’给app起的名字 backend='redis://alex3714@localhost')#将结果写入到哪 @app.task def add(x,y): print("running...",x,y) return x+y @app.task def cmd(cmd_str): print('running cmd',cmd_str) time.sleep(10) return time.time()
调度端
from celery_test import add,cmd t1 = cmd.delay('dsfsdf') t1.get() #这时你会发现程序是阻塞的,并没有实现异步的效果 #当然这个解决也是非常简单的 t1.get(timeout=1,propa) #这时你会发现提示你任务没有完成 t1.ready() #可以查看任务没有没有完成,返回布尔值的状态
基础测试代码celery_test.py测试端:
from celery import Celery app = Celery('proj',include=['proj.tasks']) #app是Celery类的实例,创建的时候添加了proj.tasks这个模块,也就是包含了proj/tasks.py这个文件。 app.config_from_object('proj.celeryconfig')把Celery配置存放进proj/celeryconfig.py文件,使用app.config_from_object加载配置。 if __name__ == '__main__' app.start()
存放任务函数的文件tasks.py
from __future__ import absolute_import from proj.celery import app @app.task #让这个任务生效的装饰器 def add(x, y): return x + y
配置文件celeryconfig.py
BROKER_URL = 'amqp://dongwm:123456@localhost:5672/web_develop' # 使用RabbitMQ作为消息代理 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了Redis CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
启动上面的代码 celery -A celery_test -l info #-A指定人物名,不需要后缀,-l指定模式
开启另外一个终端,用IPython调用add函数:
from celery_test import add r = add.delay(1,3) #在任务端执行一次 r #执行第二次 r.result #返回结果 r.status #返回状态 r.successful() #去执行结果 r.backend #保存到redis中 通过IPython触发的任务就完成了。任务的结果都需要根据上面提到的task_id获得,我们还可以用如下两种方式随时找到这个结果: task_id = '93288a00-94ee-4727-b815-53dc3474cf3f' In : add.AsyncResult(task_id).get() Out: 4 或者: In : from celery.result import AsyncResult In : AsyncResult(task_id).get() Out: 4
指定队列
Celery非常容易设置和运行,通常它会使用默认的名为celery的队列(可以通过CELERY_DEFAULT_QUEUE修改)用来存放任务。我们可以使用优先级不同的队列来确保高优先级的任务不需要等待就得到响应。
基于proj目录下的源码,我们创建一个projq目录,并对projq/celeryconfig.py添加如下配置:
from kombu import Queue CELERY_QUEUES = ( # 定义任务队列 Queue('default', routing_key='task.#'), # 路由键以“task.”开头的消息都进default队列 Queue('web_tasks', routing_key='web.#'), # 路由键以“web.”开头的消息都进web_tasks队列 ) CELERY_DEFAULT_EXCHANGE = 'tasks' # 默认的交换机名字为tasks CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # 默认的交换类型是topic CELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默认的路由键是task.default,这个路由键符合上面的default队列 CELERY_ROUTES = { 'projq.test.add': { # tasks.add的消息会进入web_tasks队列 'queue': 'web_tasks', 'routing_key': 'web.add', } }