基于Celery + Redis的分布式任务管理系统,支持多种任务类型和动态任务管理。
- 任务类型支持:
- 立即执行任务
- 延时执行任务
- 定时执行任务(Cron表达式)
- 任务管理:
- 手动定义任务
- 动态添加任务(传递代码与函数名)
- API接口任务
- 任务添加、删除、停止
- 生产环境特性:
- 任务状态监控
- 错误处理和重试机制
- 日志记录
- 性能监控
- 安全认证
task_manage/
├── celery_app.py # Celery应用配置
├── tasks/ # 任务模块
│ ├── __init__.py
│ ├── base_tasks.py # 基础任务类
│ └── task_manager.py # 任务管理器
├── api/ # API接口
│ ├── __init__.py
│ ├── routes.py # 路由定义
│ └── models.py # 数据模型
├── config/ # 配置文件
│ ├── __init__.py
│ └── settings.py # 系统配置
├── utils/ # 工具模块
│ ├── __init__.py
│ ├── logger.py # 日志工具
│ └── security.py # 安全工具
├── requirements.txt # 依赖包
├── docker-compose.yml # Docker配置
└── README.md # 项目文档
pip install -r requirements.txtdocker-compose up -d rediscelery -A celery_app worker --loglevel=infocelery -A celery_app beat --loglevel=infopython api/app.pyPOST /api/tasks- 创建任务GET /api/tasks- 获取任务列表GET /api/tasks/{task_id}- 获取任务详情DELETE /api/tasks/{task_id}- 删除任务POST /api/tasks/{task_id}/stop- 停止任务
{
"name": "示例任务",
"task_type": "immediate", // immediate, delayed, scheduled
"function_code": "def hello_world():\n return 'Hello World!'",
"function_name": "hello_world",
"args": [],
"kwargs": {},
"delay_seconds": 60, // 延时执行时间(秒)
"cron_expression": "0 0 * * *" // Cron表达式
}- Redis连接配置
- Celery配置
- 日志配置
- 安全配置
- 任务执行状态监控
- 错误日志记录
- 性能指标收集