目录

项目代码所在: https://github.com/BruceDone/web_message_queue_app

  • 背景
  • 准备
  • 实践
  • 总结

背景

某个web api项目需要将api的每次调用情况都记录下来,统一收集之后存入数据库,然后每个小时进行汇总与统计。这样方便业务部门进行数据分析,逻辑流程如下

用户请求 -> web api(记录请求) -> 请求返回数据 并记录本次请求 -> 业务人员汇总数据  

从上面的看出,api业务访问数据和记录本次的访问行为其实可以完全分开,对于用户的视角,只会关注本次的请求是否OK,而对于运营人员,更多的关注是本次访问的行为结果,所以,在逻辑上我们需要将两个动作分开,如果按照早期的做法,我们大可以做如下的动作

1.http 请求 返回数据
2.记录本次http请求成功与否到数据库中

每次的请求都绑定了两个动作, 如果并发多了(对于http应用来说是常态),对于日志数据库来说就是一个压力了,当然,我们可以做读写分离了来减轻数据库层面的压力,但是这样的耦合度太高了,对于用户来说api请求时间也有消耗,所以这种类似的异步操作,我们可以使用消息队列完成,本次,我主要从三种常用消息队列中间件来介绍

准备

  • python
  • docker-compose
  • python-rq

安装redis环境

使用docker-compose 起一个redis 环境

创建一个redis的文件夹,在里面写入名字docker-compose.yml 的文件,内容如下

    version: '2'
    
    services:
      redis:
        image: redis
            command: redis-server --requirepass "pythonrq"
        restart: always
        ports:
          - "6666:6379"

使用命令docker-compose up -d

然后使用命令docker-compose ps我们可以看到如下的输出

        Name                   Command               State           Ports
    -------------------------------------------------------------------------------
    redis_redis_1   docker-entrypoint.sh redis ...   Up      0.0.0.0:6666->6379/tcp

这样我们已经使用docker-compose 起了一个redis服务了

安装python-rq

    pip install rq

OK,我们准备一个简单的web api 项目, 本次选用的web框架为vibora

准备web框架

    pip install vibora

核心代码

app_factory.py 代码

# -*- coding: utf-8 -*-
from vibora import Vibora
from vibora.router import RouterStrategy
from src.views.api import bp_api

from src.settings import app_config
from src.log.config import logger
from redis import StrictRedis
from rq import Queue


def create_app():
    app = Vibora(router_strategy=RouterStrategy.CLONE)
    # app.configure_static_files()
    app.add_blueprint(bp_api)
    app.logger = logger

    job_queue = Queue(connection=StrictRedis.from_url(app_config.redis_conn))
    app.components.add(job_queue)

    return app

我们在创建web app的时候,同时创建了一个组件,加入到了整个app runtime 中

api.py文件

# -*- coding: utf-8 -*-
import time

from vibora.responses import JsonResponse
from vibora.blueprints import Blueprint
from rq import Queue

from src.log.config import logger
from src.job.word import count_words

bp_api = Blueprint()


@bp_api.route('/time')
async def get_time(job_queue: Queue):
    logger.info('get user request')

    job_queue.enqueue(count_words, str(time.time()))
    return JsonResponse({'time': str(time.time())})

在每次请求的时候,我们都会将一个job通过queue 放入到我们的队列系统中(redis),然后能过起rq worker这个命令,单独起一个进程来消费我们的任务

实践

由于我在git上已经写好了compose 文件以及dockerfile,所以你在装好docker-compose之后,在docker-compose.yml所在的目录,使用命令 docker-compose up -d就可以看到整个应用了

使用命令
curl 127.0.0.1:5252/time
另外开一个窗口,使用命令

docker-compose logs -f
可以看到日志的滚动

总结

本次我们从

  • 为什么
  • 是什么
  • 怎么样

三个思维的角度来初步的接触消息队列的意义,事实上,消息队列在解耦和中间件的角色里面,有至关重要的作用,当然,引入该组件也是当前系统不能够支撑业务数据了,系统的发展也是层层递进的,不用过早的优化与设计,下次我们来看rabbit mq 与我们的web系统结合

本次的代码已经放在git 上了:https://github.com/BruceDone/web_message_queue_app