[dj]分布式任务队列-celery实战

celery用于异步处理耗时任务

celery特性

方便查看定时任务的执行情况, 如 是否成功, 当前状态, 执行任务花费的时间等.
使用功能齐备的管理后台或命令行添加,更新,删除任务.
方便把任务和配置管理相关联.
可选 多进程, Eventlet 和 Gevent 三种模型并发执行.
提供错误处理机制.
提供多种任务原语, 方便实现任务分组,拆分,和调用链.
支持多种消息代理和存储后端.
Celery 是语言无关的.它提供了python 等常见语言的接口支持.

脚本命令效果是, 执行命令,返回此次任务的id,任务放后台执行不用等待.

在web中效果是: 提交任务, 刷新页面, 任务正在执行中...
	我们希望
		获取任务执行状态
		获取任务执行结果
		暂停任务
		终止(取消)(撤销)任务
- 耗时调用
	用户注册(提高并发, 发邮件)
	执行linux耗时脚本等

- 定时调用
	到多久后执行

- 周期调用

celery组件

[dj]分布式任务队列-celery实战

user:    用户通过执行脚本/页面点击投递任务到broker, 让woker处理
beats:   定时投递任务到broker, 让woker处理
broker:  消息队列, 支持redis or rabbitmq
worker:  处理耗时任务(干活的)
Backend: 结果存储 ,支持redis or rabbitmq or mongodb, mysql等

工作流: betats/用户投递任务->broker->worker->backend

注: 把mq里的信息称为: message
任务(函数+参数)提交 deplay/apply_sync执行了: 普通任务
任务(函数+参数)不想立刻执行, 先签名保存, 作为子任务被被任务编排画布调用: 称为 signature


测试耗时任务

- 下面版本linux和windows兼容.
pip install redis==2.10.5
pip install celery==3.1.25

docker run -d -p 5672:5672 rabbitmq
docker run -d -p 6379:6379 redis
docker run -d -p 11211:11211 memcached
docker run -d -p 27017:27017 mongo

- 更新的版本(仅linux)
pip install -U "celery[redis]"
  • 测试耗时/周期任务
    参考
# tasks.py

from celery import Celery
from celery.schedules import crontab
import time

from celery.task import periodic_task

app = Celery('tasks', broker='redis://localhost:6379/0', backend="redis://localhost:6379/1", )


# 异步任务
@app.task
def send_mail(email):
    print("send mail to ", email)
    time.sleep(15)
    return "[send_mail]: exec success"


# 定时任务(每分钟投递一次任务)
@periodic_task(run_every=crontab())
def some_task():
    print('periodic task test!!!!!')
    time.sleep(5)
    print('success')
    return "[periodic_task]some_task: exec succ!!"
# 用户投递任务到worker
	耗时任务提交很快
	不管worker是否在线,都可以随意提交任务
	
	deplay函数是apply_async快捷方式

# user.py
from tasks import send_mail

def register():
    import time
    start = time.time()
    print("1. 插入记录到数据库")
    print("2. celery 帮我发邮件")
    send_mail.delay("xx@gmail.com")
    print("3. 告诉用户注册成功")
    print("耗时:%s 秒 " % (time.time() - start))

if __name__ == '__main__':
    register()


for i in `seq 1 20`;do python3 user.py;done
# beats周期投递任务到worker

celery -A tasks beat --loglevel=info
# 启动worker.  
	-A --app=APP    Celery instance所在py文件名. 
	会将tasks.py @app.task装饰的任务全部加载

export C_FORCE_ROOT='true'
celery -A tasks worker --loglevel=info  

# 可以看到积压的任务会陆续被worker执行

子任务和任务流

设计任务流

有时我们并不想简单的将任务发送到队列中,我们想将一个任务函数(由参数和执行选项组成)作为一个参数传递给另外一个函数中,
为了实现此目标,Celery使用一种叫做signatures的东西。

普通函数, 加括号, 带参数, 就执行了. 或提交给broker异步执行.
但有时需要将这些任务编排, 先把每个子任务 函数+参数保存起来(签名), 编排好后, 按照画布上的设计流程执行即可.

signature()包含了以下参数:
	任务调用的 arguments(参数,即任务本身的参数,像add(x,y)中的参数)
	keyword arguments(关键字参数,就是debug=false,true这类参数)
	execution options(执行选项,比如运行时间countdown,到期时间expirt)。
# 创建任务函数
@celery_app.task
def my_task1(a, b):
    print("任务函数(my_task1)正在执行....")
    return a + b

# 创建签名并编排
from celery import group
from celery import signature

t1 = signature(my_task1,args=(1, 2),countdown=1)

my_group = group(t1,t2,t3)

//signature的快捷方式 .s
result = chain(add.s(2, 2), add.s(4), add.s(8))() 

画布(canvas): 设计Celery的工作流(任务编排)

celerycanvas.py
官文参考
celery之Chains, Groups, Chords, Map & Starmap, Chunks
子任务支持如下 5 种原语,实现工作流. 原语表示由若干指令组成的, 用于完成一定功能的过程.

celery -A myproject worker -l info
	-A:指定 celery 实例所在哪个模块中
	-l: loglevel

celery -A myproject beat -l info
	beta周期性的向任务队列中放入任务(是依赖worker去处理的)

如下: 调用task装饰的func.apply_async就是异步执行,或者调用send_task。

app = Celery("task", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

@app.task
def trange(limit):
    return range(limit)

result = add.apply_async((2, 2), link=add.s(16))
  • (Chains)串行调用: 可以将signature任务按照顺序执行,前一个任务的输出是后一个任务的输入,结果是最后一个signature任务的输出。
from celery import chain
result = chain(add.s(2, 2), add.s(4), add.s(8))() # 2 + 2 + 4 + 8
result.get() # 16
result.parent.parent.graph
# cd959635-47d2-4368-bdf1-ab969f9ce0e4(1)
#      6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)
# 6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)
# 87eac0e5-1f1b-4c0d-a27a-dbf7e7ccd925(2)
#      cd959635-47d2-4368-bdf1-ab969f9ce0e4(1)
#           6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)
  • (Groups)并行调用:可以让signature并行执行,返回的结果是所有signature任务返回结果组成的数组。
from celery import group
result = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))() # 2 + 2, 4 + 4, 8 + 8
result.get() # [14, 8, 16]
  • (Chords)先并行,后串行: 把并行signature任务的结果列表输入到串行调用,进行汇总,是reduce过程。
from celery import group, chain, chord
result = chain(group(add.s(2, 2), add.s(4, 4), add.s(8, 8)), tsum.s())() # sum([2 + 2, 4 + 4, 8 + 8])
result.get() # 28
result = chord((add.s(2, 2), add.s(4, 4), add.s(8, 8)), tsum.s())()
result.get() # 28
  • (Map)对并行调用的结果各自汇总
~tsum.map([range(2), range(2), range(2)]) # [1, 1, 1]
result = chain(trange.s(2), group(tsum.s(), tsum.s(), tsum.s()))()
result.get() # [1, 1, 1]
  • (Starmap)对并行调用的结果各自汇总,汇总参数是tuple,相当于*args
~add.starmap(zip(range(10), range(10))) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  • (chunks)对大任务进行分割,分成小块执行,提高性能,将结果收集成列表。
result = add.chunks(zip(range(100), range(100)), 10)()
result.get()
#[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
# [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
# [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
# [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
# [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
# [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
# [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
# [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
# [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
# [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

配置和监控

flower:监控和远程操作celery

celery的配置
celery的配置高低版本对比

Celery 监控和管理向导

celery -A tasks shell  #调试, 类似django management.py shell

signal: 任务执行进度查看

# 
@app.task
def Task_A(message):
    # 这里就是在更新进度
    Task_A.update_state(state='PROGRESS', meta={'progress': 0})
    time.sleep(10)
    Task_A.update_state(state='PROGRESS', meta={'progress': 30})
    time.sleep(10)
    return message


# 查看进度
def get_task_status(task_id):
    task = Task_A.AsyncResult(task_id)

    status = task.state
    progress = 0
    
    if status == u'SUCCESS':
        progress = 100
    elif status == u'FAILURE':
        progress = 0
    elif status == 'PROGRESS':
        progress = task.info['progress']

    return {'status': status, 'progress': progress}
r = xxx.delay()
r.ready()     # 查看任务状态,返回布尔值,  任务执行完成, 返回 True, 否则返回 False.
r.get(timeout=1) # 获取任务执行结果,可以设置等待时间
r.result      # 任务执行结果.
r.state       # PENDING, START, SUCCESS,任务当前的状态
r.status      # PENDING, START, SUCCESS,任务当前的状态
r.successful  # 任务成功返回true
r.traceback   # 如果任务抛出了一个异常,你也可以获取原始的回溯信息
r.wait()      # 等待任务完成, 返回任务执行结果,很少使用;
from task import add,test_mes
import sys

def pm(body):
    res = body.get('result')
    if body.get('status') == 'PROGRESS':
        sys.stdout.write('
任务进度: {0}%'.format(res.get('p')))
        sys.stdout.flush()
    else:
        print '
'
        print res
r = test_mes.delay()
print r.get(on_message=pm, propagate=False)
  • 方法3: signal回调方式获取状态

信号可以帮助我们了解任务执行情况, 分析任务运行的瓶颈. Celery 支持 7 种信号类型.
任务状态回调

钩子函数 状态值 说明
before_task_publish - -
after_task_publish - -
task_prerun PENDING 任务等待中
task_postrun STARTED 任务已开始
task_retry RETRY 任务将被重试
task_success SUCCESS 任务执行成功
task_failure FAILURE 任务执行失败
task_revoked REVOKED 任务取消

每个任务的声明周期都会触发

@after_task_publish.connect
def task_sent_handler(sender=None, body=None, **kwargs):
    print("hello world")
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))

终止任务

from celery.task.control import inspect
inspect().active() # 对应celery命令也可以查询,在监控那章节

这将列出正在处理的活动任务的列表。 你可以在那里获得任务的id 。 一旦获得了任务的id,就可以通过使用以下终止任务

from celery.task.control import revoke
revoke(task_id, terminate=True)