[经验] python下的消息队列选择-rq
目录
项目代码所在: https://github.com/BruceDone/web_message_queue_app
- 背景
- 准备
- 实践
- 总结
背景
某个web api项目需要将api的每次调用情况都记录下来,统一收集之后存入数据库,然后每个小时进行汇总与统计。这样方便业务部门进行数据分析,逻辑流程如下
用户请求 -> web api(记录请求) -> 请求返回数据 并记录本次请求 -> 业务人员汇总数据
从上面的看出,api业务访问数据和记录本次的访问行为其实可以完全分开,对于用户的视角,只会关注本次的请求是否OK,而对于运营人员,更多的关注是本次访问的行为结果,所以,在逻辑上我们需要将两个动作分开,如果按照早期的做法,我们大可以做如下的动作
1.http 请求 返回数据
2.记录本次http请求成功与否到数据库中
每次的请求都绑定了两个动作, 如果并发多了(对于http应用来说是常态),对于日志数据库来说就是一个压力了,当然,我们可以做读写分离了来减轻数据库层面的压力,但是这样的耦合度太高了,对于用户来说api请求时间也有消耗,所以这种类似的异步操作,我们可以使用消息队列完成,本次,我主要从三种常用消息队列中间件来介绍
准备
- python
- docker-compose
- python-rq
安装redis环境
使用docker-compose 起一个redis 环境
创建一个redis的文件夹,在里面写入名字docker-compose.yml
的文件,内容如下
version: '2'
services:
redis:
image: redis
command: redis-server --requirepass "pythonrq"
restart: always
ports:
- "6666:6379"
使用命令docker-compose up -d
然后使用命令docker-compose ps
我们可以看到如下的输出
Name Command State Ports
-------------------------------------------------------------------------------
redis_redis_1 docker-entrypoint.sh redis ... Up 0.0.0.0:6666->6379/tcp
这样我们已经使用docker-compose 起了一个redis服务了
安装python-rq
pip install rq
OK,我们准备一个简单的web api 项目, 本次选用的web框架为vibora
准备web框架
pip install vibora
核心代码
app_factory.py
代码
# -*- coding: utf-8 -*-
from vibora import Vibora
from vibora.router import RouterStrategy
from src.views.api import bp_api
from src.settings import app_config
from src.log.config import logger
from redis import StrictRedis
from rq import Queue
def create_app():
app = Vibora(router_strategy=RouterStrategy.CLONE)
# app.configure_static_files()
app.add_blueprint(bp_api)
app.logger = logger
job_queue = Queue(connection=StrictRedis.from_url(app_config.redis_conn))
app.components.add(job_queue)
return app
我们在创建web app的时候,同时创建了一个组件,加入到了整个app runtime 中
api.py文件
# -*- coding: utf-8 -*-
import time
from vibora.responses import JsonResponse
from vibora.blueprints import Blueprint
from rq import Queue
from src.log.config import logger
from src.job.word import count_words
bp_api = Blueprint()
@bp_api.route('/time')
async def get_time(job_queue: Queue):
logger.info('get user request')
job_queue.enqueue(count_words, str(time.time()))
return JsonResponse({'time': str(time.time())})
在每次请求的时候,我们都会将一个job通过queue 放入到我们的队列系统中(redis),然后能过起rq worker
这个命令,单独起一个进程来消费我们的任务
实践
由于我在git上已经写好了compose 文件以及dockerfile,所以你在装好docker-compose
之后,在docker-compose.yml
所在的目录,使用命令 docker-compose up -d
就可以看到整个应用了
使用命令
curl 127.0.0.1:5252/time
另外开一个窗口,使用命令
docker-compose logs -f
可以看到日志的滚动
总结
本次我们从
- 为什么
- 是什么
- 怎么样
三个思维的角度来初步的接触消息队列的意义,事实上,消息队列在解耦和中间件的角色里面,有至关重要的作用,当然,引入该组件也是当前系统不能够支撑业务数据了,系统的发展也是层层递进的,不用过早的优化与设计,下次我们来看rabbit mq 与我们的web系统结合
本次的代码已经放在git 上了:https://github.com/BruceDone/web_message_queue_app
- 原文作者:大鱼
- 原文链接:https://brucedone.com/archives/1178/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议. 进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。