295 lines
10 KiB
Python
295 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
天网任务调度系统
|
||
负责任务的执行、重试、超时处理
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
from typing import Optional, Dict
|
||
from datetime import datetime
|
||
from .websocket_client_v2 import QingjianAPIClient
|
||
from db.task_db.task_model import get_task_manager, TaskStatus, TaskPriority
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 配置常量
|
||
TIMEOUT_CHECK_INTERVAL_SEC = 300 # 超时检查间隔(5分钟)
|
||
ERROR_RETRY_DELAY_SEC = 60 # 错误后重试延迟(1分钟)
|
||
QUEUE_POLL_INTERVAL_SEC = 1 # 队列轮询间隔(秒)
|
||
|
||
class TaskScheduler:
|
||
"""任务调度器"""
|
||
|
||
def __init__(self, client: QingjianAPIClient = None):
|
||
self.task_manager = get_task_manager()
|
||
self.client = client
|
||
self.running = False
|
||
self.task_timeout_checker = None
|
||
logger.info("任务调度器初始化完成")
|
||
|
||
async def start(self):
|
||
"""启动任务调度器"""
|
||
self.running = True
|
||
logger.info("任务调度器已启动")
|
||
|
||
# 启动超时检查任务
|
||
self.task_timeout_checker = asyncio.create_task(self._check_timeouts())
|
||
|
||
# 启动任务执行队列
|
||
await self._process_task_queue()
|
||
|
||
async def stop(self):
|
||
"""停止任务调度器"""
|
||
self.running = False
|
||
if self.task_timeout_checker:
|
||
self.task_timeout_checker.cancel()
|
||
logger.info("任务调度器已停止")
|
||
|
||
async def _check_timeouts(self):
|
||
"""检查超时任务"""
|
||
while self.running:
|
||
try:
|
||
timeout_tasks = self.task_manager.get_timeout_tasks()
|
||
for task in timeout_tasks:
|
||
logger.warning(f"任务超时:{task['task_id']}")
|
||
self.task_manager.cancel_task(
|
||
task['task_id'],
|
||
reason=f"任务超时({task['timeout_hours']}小时)"
|
||
)
|
||
# 通知天网任务超时
|
||
await self._notify_tianwang(task['task_id'], 'timeout')
|
||
|
||
# 每隔固定时间检查一次
|
||
await asyncio.sleep(TIMEOUT_CHECK_INTERVAL_SEC)
|
||
|
||
except asyncio.CancelledError:
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"超时检查失败:{e}")
|
||
await asyncio.sleep(ERROR_RETRY_DELAY_SEC)
|
||
|
||
async def _process_task_queue(self):
|
||
"""处理任务队列"""
|
||
# 持续监听,当有任务时执行
|
||
while self.running:
|
||
try:
|
||
# 这里实际应该从队列获取任务
|
||
# 简化处理:定期检查待触发任务
|
||
await asyncio.sleep(QUEUE_POLL_INTERVAL_SEC)
|
||
|
||
except Exception as e:
|
||
logger.error(f"任务队列处理失败:{e}")
|
||
|
||
async def execute_task(self, task: dict) -> bool:
|
||
"""
|
||
执行任务
|
||
|
||
Args:
|
||
task: 任务数据
|
||
|
||
Returns:
|
||
是否执行成功
|
||
"""
|
||
task_id = task['task_id']
|
||
|
||
try:
|
||
# 更新状态为执行中
|
||
self.task_manager.update_task_status(task_id, TaskStatus.RUNNING)
|
||
logger.info(f"开始执行任务:{task_id}")
|
||
|
||
# 根据任务类型执行
|
||
task_type = task.get('type')
|
||
|
||
if task_type == 'send_file_after_reply':
|
||
success = await self._execute_send_file(task)
|
||
elif task_type == 'send_message':
|
||
success = await self._execute_send_message(task)
|
||
elif task_type == 'send_link':
|
||
success = await self._execute_send_link(task)
|
||
else:
|
||
logger.error(f"未知任务类型:{task_type}")
|
||
success = False
|
||
|
||
if success:
|
||
self.task_manager.update_task_status(
|
||
task_id,
|
||
TaskStatus.COMPLETED,
|
||
result={'executed_at': datetime.now().isoformat()}
|
||
)
|
||
logger.info(f"任务执行成功:{task_id}")
|
||
|
||
# 通知天网
|
||
await self._notify_tianwang(task_id, 'completed')
|
||
else:
|
||
# 失败重试逻辑
|
||
await self._handle_task_failure(task)
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
logger.error(f"任务执行失败:{task_id} - {e}")
|
||
await self._handle_task_failure(task, error=str(e))
|
||
return False
|
||
|
||
async def _execute_send_file(self, task: dict) -> bool:
|
||
"""执行发送文件任务"""
|
||
try:
|
||
customer_id = task.get('customer', {}).get('id')
|
||
file_url = task.get('action', {}).get('file_url')
|
||
message = task.get('action', {}).get('message', '这是您要的文件')
|
||
|
||
if not self.client:
|
||
logger.error("WebSocket 客户端未初始化")
|
||
return False
|
||
|
||
# 调用发送文件方法
|
||
# TODO: 实现 send_file 方法
|
||
logger.info(f"发送文件给 {customer_id}: {file_url}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送文件失败:{e}")
|
||
return False
|
||
|
||
async def _execute_send_message(self, task: dict) -> bool:
|
||
"""执行发送消息任务"""
|
||
try:
|
||
customer_id = task.get('customer', {}).get('id')
|
||
message = task.get('action', {}).get('message')
|
||
|
||
if not self.client:
|
||
return False
|
||
|
||
# 发送消息
|
||
await self.client.send_message(customer_id, message)
|
||
logger.info(f"发送消息给 {customer_id}: {message}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送消息失败:{e}")
|
||
return False
|
||
|
||
async def _execute_send_link(self, task: dict) -> bool:
|
||
"""执行发送链接任务"""
|
||
try:
|
||
customer_id = task.get('customer', {}).get('id')
|
||
link_url = task.get('action', {}).get('link_url')
|
||
message = task.get('action', {}).get('message', '')
|
||
|
||
full_message = f"{message}\n\n{link_url}" if message else link_url
|
||
|
||
return await self._execute_send_message({
|
||
'customer': {'id': customer_id},
|
||
'action': {'message': full_message}
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送链接失败:{e}")
|
||
return False
|
||
|
||
async def _handle_task_failure(self, task: dict, error: str = None):
|
||
"""处理任务失败"""
|
||
task_id = task['task_id']
|
||
max_retry = task.get('max_retry', 3)
|
||
|
||
retry_count = self.task_manager.increment_retry(task_id)
|
||
|
||
if retry_count <= max_retry:
|
||
# 重试
|
||
logger.warning(f"任务失败,{retry_count}/{max_retry} 次重试:{task_id}")
|
||
await asyncio.sleep(60 * retry_count) # 递增重试间隔
|
||
await self.execute_task(task)
|
||
else:
|
||
# 超过最大重试次数,标记失败
|
||
self.task_manager.update_task_status(
|
||
task_id,
|
||
TaskStatus.FAILED,
|
||
error_message=error or '超过最大重试次数'
|
||
)
|
||
logger.error(f"任务最终失败:{task_id}")
|
||
|
||
# 通知天网
|
||
await self._notify_tianwang(task_id, 'failed', error)
|
||
|
||
async def _notify_tianwang(self, task_id: str, status: str, error: str = None):
|
||
"""通知天网任务状态"""
|
||
try:
|
||
# TODO: 实现天网回调 API
|
||
# 可以通过 HTTP POST 通知天网
|
||
logger.info(f"通知天网 - 任务 {task_id} 状态:{status}")
|
||
|
||
# 示例代码(实际使用时取消注释并配置天网 URL)
|
||
'''
|
||
import httpx
|
||
async with httpx.AsyncClient() as client:
|
||
await client.post(
|
||
'http://127.0.0.1:6060/api/task/callback',
|
||
json={
|
||
'task_id': task_id,
|
||
'status': status,
|
||
'error': error,
|
||
'timestamp': datetime.now().isoformat()
|
||
}
|
||
)
|
||
'''
|
||
|
||
except Exception as e:
|
||
logger.error(f"通知天网失败:{e}")
|
||
|
||
def check_trigger_match(self, message: str, trigger: dict) -> bool:
|
||
"""
|
||
检查消息是否匹配触发条件
|
||
|
||
Args:
|
||
message: 客户消息内容
|
||
trigger: 触发条件配置
|
||
|
||
Returns:
|
||
是否匹配
|
||
"""
|
||
trigger_type = trigger.get('type')
|
||
|
||
if trigger_type == 'customer_reply':
|
||
keyword = trigger.get('keyword')
|
||
return keyword and keyword in message
|
||
|
||
elif trigger_type == 'customer_keyword':
|
||
keywords = trigger.get('keywords', [])
|
||
if isinstance(keywords, str):
|
||
import json
|
||
keywords = json.loads(keywords)
|
||
return any(kw in message for kw in keywords)
|
||
|
||
elif trigger_type == 'customer_payment':
|
||
# 付款检测逻辑
|
||
payment_keywords = ['已付款', '拍下了', '已下单', '付款了']
|
||
return any(kw in message for kw in payment_keywords)
|
||
|
||
elif trigger_type == 'time_reach':
|
||
# 时间判断
|
||
target_time = trigger.get('time')
|
||
if target_time:
|
||
try:
|
||
target = datetime.fromisoformat(target_time)
|
||
return datetime.now() >= target
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
return False
|
||
|
||
|
||
# 单例
|
||
_scheduler: Optional[TaskScheduler] = None
|
||
|
||
def get_task_scheduler(client: QingjianAPIClient = None) -> TaskScheduler:
|
||
"""获取任务调度器单例"""
|
||
global _scheduler
|
||
if _scheduler is None:
|
||
_scheduler = TaskScheduler(client)
|
||
elif client and _scheduler.client is None:
|
||
_scheduler.client = client
|
||
return _scheduler
|