异步任务(Celery)详解 一、背景 二、原理 三、实现 四、实操 五、总结 六、坑

在开发中,我们可能经常会遇到一些需要执行时间很长的任务,如果放在前端,会让用户一直卡在那儿等待或者一直转圈圈,体验非常不好。为了改善这种体验,我赶紧上网搜索,果然,前人早已有解决办法了。那就是异步。在Django中,我们可以使用celery异步框架,我们可以把耗时的任务扔到后台,而前端给用户立即返回,待用户需要查看结果时,点击查看即可,并且可以随时看到任务执行的状态。

二、原理

Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。它是Python写的库,但是它实现的通讯协议也可以使用ruby,php,javascript等调用。异步任务除了消息队列的后台执行的方式,还是一种则是定时计划任务。

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图 

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

组件:

1、任务(tasks)--用户定义的函数,用于实现用户的功能,比如执行一个耗时很长的任务

2、中间介(Broker)--用于存放tasks的地方,但是这个中间介需要解决一个问题,就是可能需要存放非常非常多的tasks,而且要保证Worker能够从这里拿取

3、执行者(Worker)--用于执行tasks,也就是真正调用我们在tasks中定义的函数

4、存储(Backend)--把执行tasks返回的结果进行存储,以供用户查看或调用

 

三、实现

1、各模块功能

Celery中,以上组件具体功能如下:

任务模块 Task

包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

消息中间件 Broker

Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

任务执行单元 Worker

Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

任务结果存储 Backend

Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

2、实现步骤

使用 Celery 实现异步任务主要包含三个步骤:

  • 创建一个 Celery 实例 
  • 启动 Celery Worker 
  • 应用程序调用异步任务

3、操作流程

既然我们已经知道原理和实现步骤,那么就简单了,开搞吧。以下步骤基本上是按照celery官网最佳实践来操作的。

相关链接:http://docs.jinkan.org/docs/celery/django/first-steps-with-django.html

 

a、环境安装(RabbitMQ/Redis、Celery、django-celery、flower)

b、创建工程(工程:tcelery、应用:app01)

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

请注意:这个工程目录是适合于大的工程,小的工程可以直接把tasks放在celery.py文件中。我们大多数tasks都是位于app中,而且app一般不止一个,基本上都会有多个。

 

 c、新建文件

celery下面需要修改的文件:celery.py、__init__.py、settings文件

app01下面需要修改的文件:tasks.py文件

 

d、修改过程

1、修改settings文件,新增如下配置:

import djcelery    #导入包
djcelery.setup_loader() #加载tasks
BROKER_URL = 'redis://127.0.0.1:6379/0'  #指定broker
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' #指定结果存储位置为本地数据库
#CELERY_RESULT_BACKEND = 'redis://' #指定结果存储位置为redis
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  #指定计划任务为本地数据库配置的
#CELERY_RESULT_BACKEND = 'redis://'  #指定结果存放位置

  

2、__init__.py文件

#绝对导入,以免celery和标准库中的celery模块冲突
from __future__ import absolute_import

#以下导入时为了确保在Django启动时加载app,shared_task在app中会使用到
from .celery import app as celery_app

  

3、celery文件

from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tcelery.settings")  #设置celery可以在命令行中使用
app = Celery('tcelery', backend='amqp://guest@localhost//', broker='redis://localhost:6379/0')  #创建app实例,并指定backend和broker均为rabbitMQ
#app = Celery('tcelery', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.conf.CELERY_IGNORE_RESULT = False   #结果不忽略
#app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' #结果保存在redis中

app.config_from_object('django.conf:settings')  #从文件中加载实例
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)  #自动加载tasks,注意:他会去app下面查找tasks.py文件,所以我们必须将task放在tasks.py文件中

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

  

4、tasks.py

from tcelery import celery_app

@celery_app.task
def test(x,y):
    return x+y

  

5、settings文件

注意:前面settings文件已经修改过,这里再次提到,是需要把app和django-celery注册进入app

INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    "app01", 
    "djcelery",
)

  

以上配置修改完成后,我们按照如下方式启动django、woker、flower。

 异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

 异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

4、验证

a、命令行调用

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

 b、woker执行

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

c、backend保存结果

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

d、flower结果查看

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

同样,我们也可以将结果保存在redis里面。

 

四、实操

1、效果

上面已经知道了原理和使用,那么下面就来进行实操吧,实操会让你感受celery的真正使用场景。

场景:模拟后台执行一个耗时的任务(一个加法的任务),然后通过前端查询执行结果。

效果:

模拟一个加法的任务,用户点击“运行”后,我们把这个任务放到后台运行,通过sleep(10)来模拟耗时任务,然后通过点击“查看任务”查看执行的情况。

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

再次查看执行情况:

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

2、配置

 基本配置上面已经具备了,下面只说修改的几个地方:

tasks.py

@celery_app.task
def test(x,y):
    """
    通过sleep来模拟需要执行很长时间的任务。
    :param x:
    :param y:
    :return:
    """
    sleep(10)
    return x+y

  

 views.py文件

#coding:utf-8
from django.shortcuts import render,HttpResponse,render_to_response
from models import Add
from .tasks import test,get_task_status
import datetime
import redis
import json
import time
# Create your views here.

def index(request):
    return  render_to_response('index.html')

def add_1(request):
    try:
        first = int(request.GET.get('first'))
    except:
        first = 0
    try:
        second =int(request.GET.get('second'))
    except:
        second = 0
    result = test.apply_async(args=(first,second))
    dd = Add(task_id=result.id,first=first,second=second,log_date=datetime.datetime.now())
    dd.save()
    return render_to_response('index.html')

# 任务结果
def results(request):
    #查询所有的任务信息
    start_time = time.time()
    new_result = {}
    rt_list = []
    rows = Add.objects.all()
    for r in rows:
        status,result = get_status_id(r.task_id)
        new_result["task_id"] = r.task_id
        new_result["first"] = r.first
        new_result["second"] = r.second
        new_result["log_date"] = r.log_date
        new_result["status"] = status
        new_result["result"] = result
        rt_list.append(new_result)
        new_result = {}
    end_time = time.time()
    rt = end_time - start_time
    print rt
    return render_to_response('result.html',{'rows':rt_list})


def get_status_id(task_id):
    """
    :param task_id:
    :return:
    坑:host填写主机名时,会耗时非常多,可以通过time获取,大概一次要1s
    task测试:这里
    """
    pool = redis.ConnectionPool(host='127.0.0.1',port=6379,db=0)
    r = redis.Redis(connection_pool=pool)
    task_id = 'celery-task-meta-'+task_id
    #start_time = time.time()
    try:
        status = json.loads(r.get(task_id)).get("status")
        result = json.loads(r.get(task_id)).get("result")
    except:
        status = 'Executing...'
        result = 0
    #end_time = time.time()
    #print 'time:%s' %(end_time-start_time)
    print status,result
    return status,result

  

 

五、总结

从原理和实现过程来看,celery的设计非常优秀,尤其是各模块的耦合,比如broker我们既可以使用redis、也可以使用rabbitMQ。

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

backend也一样支持很多种方式。

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

六、坑

1、redis执行时间慢

在本次试验的过程中遇到一个坑,通过python连接redis的时候,刚开始使用的是主机名:

pool = redis.ConnectionPool(host='localhost',port=6379,db=0)

  发现redis执行时间非常常,查询一条记录需要1s左右,查了好久没找到原因。

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

后来把主机名改为ip后,发现非常快:

异步任务(Celery)详解
一、背景
二、原理
三、实现
四、实操
五、总结
六、坑

 

更多详细内容请参阅celery官网。

http://docs.jinkan.org/docs/celery/index.html

2、celery:Unrecoverable error: AttributeError("'unicode' object has no attribute 'iteritems')

由于项目使用的django版本比较老,python2.7+django1.7下使用celery异步处理耗时请求。

错误提示:Unrecoverable error: AttributeError("'unicode' object has no attribute 'iteritems')

在*中发现了解决办法,地址:stack overflowcelery-github
问题的症结是redis的版本号为3.0以上,导致celery将其作为消息中间件的时候出现问题,给出的解决方案是安装3.0以下的redis版本。这里我们安装redis==2.10.6

pip install redis==2.10.6