Skip to content

异步Redis Queue

ARG安装与使用

介绍

arq 是一个基于 Python asyncio 和 Redis 的轻量级、高性能异步任务队列系统,用于在后台执行任务(Job Queue)或实现 RPC。它被设计为 RQ(Redis Queue) 的现代、非阻塞替代品。

参考资料:arq官方文档

安装与配置

安装

python
pip install arq

定义异步任务函数:app\api\v1\module_test\docs\tasks.py

  • 第一个参数通常是 ctx(上下文),用于访问共享资源
python
import asyncio

async def send_welcome_email(ctx, user_email: str, user_name: str):
    """模拟发送欢迎邮件的耗时任务"""
    print(f"开始执行任务: 向 {user_name} <{user_email}> 发送欢迎邮件")
    # 模拟耗时操作,如连接邮件服务器
    await asyncio.sleep(5)
    print(f"欢迎邮件已发送至 {user_name}")
    return {"status": "success", "recipient": user_email}

配置全局 ARQ 连接池:app\config\a_redis_queue.py

python
from arq import create_pool
from arq.connections import RedisSettings

from .settings import settings

# 配置 Redis 连接
redis_settings = RedisSettings(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    database=settings.REDIS_DB,
    password=settings.REDIS_PASSWORD,
)

# 全局 ARQ 连接池
_arq_pool = None


async def get_arq_pool():
    """
    获取 ARQ 连接池实例
    """
    global _arq_pool
    if _arq_pool is None:
        _arq_pool = await create_pool(redis_settings)
    return _arq_pool


async def close_arq_pool():
    """
    关闭 ARQ 连接池
    """
    global _arq_pool
    if _arq_pool is not None:
        _arq_pool.close()
        await _arq_pool.wait_closed()
        _arq_pool = None

配置 ARQ Worker:arq_worker.py

python
"""配置 ARQ Worker"""

from app.config.arq_config import redis_settings
from app.api.v1.module_rag.docs.tasks import send_welcome_email
from app.api.v1.module_rag.docs.tasks import parse_and_embed_doc_task


class WorkerSettings:
    # 注册所有ARQ任务函数
    functions = [send_welcome_email, parse_and_embed_doc_task]
    redis_settings = redis_settings

运行

编写接口触发ARQ任务:app\api\v1\module_test\docs\controller.py

  • 注意:填写任务函数名字符串
python
@router.post("/send-email", summary="发送欢迎邮件")
async def send_email(user_email: str, user_name: str):
    """
    发送欢迎邮件(测试)
    """
    arq_pool = await get_arq_pool()
    job = await arq_pool.enqueue_job("send_welcome_email", user_email, user_name)
    return {
        "message": "欢迎邮件将在后台发送。",
        "job_id": job.job_id,
    }

启动 ARQ worker 执行任务

  • --burst:执行完当前任务后退出。
  • --watch .:监听代码变更自动重启(需安装 watchfiles)。
shell
arq arq_worker.WorkerSettings

运行Fastapi,访问接口,触发任务。

image-20260112104904308