外观
异步Redis Queue
ARG安装与使用
介绍
arq 是一个基于 Python asyncio 和 Redis 的轻量级、高性能异步任务队列系统,用于在后台执行任务(Job Queue)或实现 RPC。它被设计为 RQ(Redis Queue) 的现代、非阻塞替代品。
参考资料:arq官方文档
安装与配置
安装
python
pip install arq定义异步任务函数:app\api\v1\module_test\docs\tasks.py
- 第一个参数通常是
ctx(上下文),用于访问共享资源
python
import asyncio
async def send_welcome_email(ctx, user_email: str, user_name: str):
"""模拟发送欢迎邮件的耗时任务"""
print(f"开始执行任务: 向 {user_name} <{user_email}> 发送欢迎邮件")
# 模拟耗时操作,如连接邮件服务器
await asyncio.sleep(5)
print(f"欢迎邮件已发送至 {user_name}")
return {"status": "success", "recipient": user_email}配置全局 ARQ 连接池:app\config\a_redis_queue.py
python
from arq import create_pool
from arq.connections import RedisSettings
from .settings import settings
# 配置 Redis 连接
redis_settings = RedisSettings(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
database=settings.REDIS_DB,
password=settings.REDIS_PASSWORD,
)
# 全局 ARQ 连接池
_arq_pool = None
async def get_arq_pool():
"""
获取 ARQ 连接池实例
"""
global _arq_pool
if _arq_pool is None:
_arq_pool = await create_pool(redis_settings)
return _arq_pool
async def close_arq_pool():
"""
关闭 ARQ 连接池
"""
global _arq_pool
if _arq_pool is not None:
_arq_pool.close()
await _arq_pool.wait_closed()
_arq_pool = None配置 ARQ Worker:arq_worker.py
python
"""配置 ARQ Worker"""
from app.config.arq_config import redis_settings
from app.api.v1.module_rag.docs.tasks import send_welcome_email
from app.api.v1.module_rag.docs.tasks import parse_and_embed_doc_task
class WorkerSettings:
# 注册所有ARQ任务函数
functions = [send_welcome_email, parse_and_embed_doc_task]
redis_settings = redis_settings运行
编写接口触发ARQ任务:app\api\v1\module_test\docs\controller.py
- 注意:填写任务函数名字符串
python
@router.post("/send-email", summary="发送欢迎邮件")
async def send_email(user_email: str, user_name: str):
"""
发送欢迎邮件(测试)
"""
arq_pool = await get_arq_pool()
job = await arq_pool.enqueue_job("send_welcome_email", user_email, user_name)
return {
"message": "欢迎邮件将在后台发送。",
"job_id": job.job_id,
}启动 ARQ worker 执行任务
- 加
--burst:执行完当前任务后退出。 - 加
--watch .:监听代码变更自动重启(需安装watchfiles)。
shell
arq arq_worker.WorkerSettings运行Fastapi,访问接口,触发任务。
