目录
项目代码所在: https://github.com/BruceDone/web_message_queue_app
- 背景
- 准备
- 实践
- 总结
背景
某个web api项目需要将api的每次调用情况都记录下来,统一收集之后存入数据库,然后每个小时进行汇总与统计。这样方便业务部门进行数据分析,逻辑流程如下
1 2 |
用户请求 -> web api(记录请求) -> 请求返回数据 并记录本次请求 -> 业务人员汇总数据 |
从上面的看出,api业务访问数据和记录本次的访问行为其实可以完全分开,对于用户的视角,只会关注本次的请求是否OK,而对于运营人员,更多的关注是本次访问的行为结果,所以,在逻辑上我们需要将两个动作分开,如果按照早期的做法,我们大可以做如下的动作
1 2 3 |
1.http 请求 返回数据 2.记录本次http请求成功与否到数据库中 |
每次的请求都绑定了两个动作, 如果并发多了(对于http应用来说是常态),对于日志数据库来说就是一个压力了,当然,我们可以做读写分离了来减轻数据库层面的压力,但是这样的耦合度太高了,对于用户来说api请求时间也有消耗,所以这种类似的异步操作,我们可以使用消息队列完成,本次,我主要从三种常用消息队列中间件来介绍
准备
- python
- docker-compose
- python-rq
安装redis环境
使用docker-compose 起一个redis 环境
创建一个redis的文件夹,在里面写入名字docker-compose.yml
的文件,内容如下
1 2 3 4 5 6 7 8 9 10 |
version: '2' services: redis: image: redis command: redis-server --requirepass "pythonrq" restart: always ports: - "6666:6379" |
使用命令docker-compose up -d
然后使用命令docker-compose ps
我们可以看到如下的输出
1 2 3 4 |
Name Command State Ports ------------------------------------------------------------------------------- redis_redis_1 docker-entrypoint.sh redis ... Up 0.0.0.0:6666->6379/tcp |
这样我们已经使用docker-compose 起了一个redis服务了
安装python-rq
1 2 |
pip install rq |
OK,我们准备一个简单的web api 项目, 本次选用的web框架为vibora
准备web框架
1 2 |
pip install vibora |
核心代码
app_factory.py
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# -*- coding: utf-8 -*- from vibora import Vibora from vibora.router import RouterStrategy from src.views.api import bp_api from src.settings import app_config from src.log.config import logger from redis import StrictRedis from rq import Queue def create_app(): app = Vibora(router_strategy=RouterStrategy.CLONE) # app.configure_static_files() app.add_blueprint(bp_api) app.logger = logger job_queue = Queue(connection=StrictRedis.from_url(app_config.redis_conn)) app.components.add(job_queue) return app |
我们在创建web app的时候,同时创建了一个组件,加入到了整个app runtime 中
api.py文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# -*- coding: utf-8 -*- import time from vibora.responses import JsonResponse from vibora.blueprints import Blueprint from rq import Queue from src.log.config import logger from src.job.word import count_words bp_api = Blueprint() @bp_api.route('/time') async def get_time(job_queue: Queue): logger.info('get user request') job_queue.enqueue(count_words, str(time.time())) return JsonResponse({'time': str(time.time())}) |
在每次请求的时候,我们都会将一个job通过queue 放入到我们的队列系统中(redis),然后能过起rq worker
这个命令,单独起一个进程来消费我们的任务
实践
由于我在git上已经写好了compose 文件以及dockerfile,所以你在装好docker-compose
之后,在docker-compose.yml
所在的目录,使用命令 docker-compose up -d
就可以看到整个应用了
使用命令
curl 127.0.0.1:5252/time
另外开一个窗口,使用命令
docker-compose logs -f
可以看到日志的滚动
总结
本次我们从
- 为什么
- 是什么
- 怎么样
三个思维的角度来初步的接触消息队列的意义,事实上,消息队列在解耦和中间件的角色里面,有至关重要的作用,当然,引入该组件也是当前系统不能够支撑业务数据了,系统的发展也是层层递进的,不用过早的优化与设计,下次我们来看rabbit mq 与我们的web系统结合
本次的代码已经放在git 上了:https://github.com/BruceDone/web_message_queue_app