如何告诉芹菜工作者停止接受任务?如何检查芹菜工作者任务是否正在运行?
问题描述:
场景:
- 在由Python/Flask Web应用程序和使用Celery进行的后台任务组成的服务器上运行的系统
- Web应用程序和celery工作者都作为新贵工作运行(Nginx后面的Web应用程序)
-
使用以下脚本完成生产部署:
- System running on a server consisting of a Python/Flask web application and background tasks using Celery
- Both web application and celery workers are run as upstart jobs (Web app behind Nginx)
Deployment to production is done with a script that:
- 停止暴发户
- 将代码推送到服务器
- 运行任何数据库迁移
- 开始新贵工作
如何增强部署脚本,以便执行以下操作?
How can I enhance the deployment script so it does the following?:
- 告诉芹菜工人停止接受任务
- 等待直到当前正在运行的芹菜任务完成
- 停止暴发户
- 将代码推送到服务器
- 运行任何数据库迁移
- 开始新贵工作
- Tell the celery worker to stop accepting tasks
- Wait until any currently running celery tasks are finished
- Stop the upstart jobs
- Push code to server
- Run any db migrations
- Start the upstart jobs
答
作为部署的一部分运行的以下脚本解决了该问题:
The following script, run as part of the deployment solved the problem:
import time
from celery.app.control import Control
from myapp.tasks import celery # my application's Celery app
if __name__ == "__main__":
control = Control(celery)
control.cancel_consumer("celery") # queue name, must probably be specified once per queue, but my app uses a single queue
inspect = control.inspect()
while True:
active = inspect.active()
running_jobs = []
for key, value in active.items():
running_jobs.extend(value)
if len(running_jobs) > 0:
print("{} jobs running: {}".format(len(running_jobs), ", ".join(job["name"] for job in running_jobs)))
time.sleep(10)
else:
print("No running jobs")
break