python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
在之前的一篇文章中提到了用Django+Celery+Redis实现了异步任务队列,只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。
首先安装rabbitmq
Mac os直接运行brew命令安装
#安装服务 brew install rabbitmq #启动服务 brew services start rabbitmq
Win10系统就要下载安装包进行安装了,由于rabbitmq是基于erlang的,所以要首先安装erlang
1、首先,下载并运行Erlang for Windows 安装程序 (地址:http://www.erlang.org/downloads)下载完毕并安装(注 意:安装目录请选择默认目录)
2、下载 RabbitMQ,(地址:http://www.rabbitmq.com/download.html )(注意:安装目录请选择默认目录)
安装成功后,启用web管理UI,进入RabbitMQ Serverrabbitmq_server-3.6.6sbin,输入命令rabbitmq-plugins enable rabbitmq_management
在系统的开始菜单里找到RabbitMQ的启动菜单,启动服务
浏览器输入,http://localhost:15672/,使用默认用户guest/guest进入网页端控制台:
代表没有问题了
然后安装tornado和celery,注意指定版本号
pip3 install tornado==5.1.1 pip3 install celery ==3.1 pip3 install pika ==0.9.14 pip3 install tornado-celery pip3 install flower
需要注意一点,由于python3.7中async已经作为关键字存在,但是有的三方库还没有及时修正,导致它们自己声明的变量和系统关键字重名,所以我们要深入三方库的源码,帮他们修改async关键字为async_my,需要修改的文件夹和文件包含但不限于:
/site-packages/pika/adapters/libev_connection.py
/site-packages/celery下面的文件
/site-packages/kombu下面的文件夹
在tornado项目下新建一个任务队列文件task.py:
import time from celery import Celery from func_tool import mail C_FORCE_ROOT=True celery = Celery("tasks", broker="amqp://guest:guest@localhost:5672") celery.conf.CELERY_RESULT_BACKEND = "amqp" @celery.task def sleep(seconds): time.sleep(float(seconds)) return seconds @celery.task def sendmail(title,text,tomail): mail(title,text,tomail) return '发送邮件成功' if __name__ == "__main__": celery.start()
然后编写服务端代码:
from celery import Celery from tornado import gen import tcelery sys.path.append("..") import task #异步任务 class CeleryHandler(BaseHandler): @gen.coroutine def get(self): response = yield gen.Task(task.sendmail.apply_async,args=['你好','非常好','164850527@qq.com']) self.write('ok') self.finish()
路由器代码:
import tornado.web from views import Index import config #路由 class Application(tornado.web.Application): def __init__(self): handlers = [ (r"/celery", Index.CeleryHandler) ] super(Application,self).__init__(handlers,**config.setting)
程序入口代码server.py:
import tornado.ioloop import tornado.httpserver import config from application import Application if __name__ == "__main__": print('启动...') app = Application() httpServer = tornado.httpserver.HTTPServer(app) # httpServer.listen(8888) #绑定端口 httpServer.bind(config.options['port']) #开启5个子进程(默认1,若为None或者小于0,开启对应硬件的CPU核心数个子进程) httpServer.start(1) tornado.ioloop.IOLoop.current().start()
进入项目目录,分别启动tornado服务,celery服务,以及flower服务
python server.py celery -A task worker --loglevel=info celery flower -A task --broker=amqp://guest:guest@localhost:5672//
访问网址http://localhost:8000/celery 用来触发异步任务
后台服务显示任务返回值:
进入flower在线任务监控网址:http://localhost:5555/
至此,整个流程就走完了。
-
Next Post利用基于Python的Pelican打造一个自己的个人纯静态网站
-
Previous Post用户认证(Authentication)进化之路:由Basic Auth到Oauth2再到jwt