# -*- coding: utf-8 -*- """ 天网任务调度系统 负责任务的执行、重试、超时处理 """ import asyncio import logging from typing import Optional, Dict from datetime import datetime from .websocket_client_v2 import QingjianAPIClient from db.task_db.task_model import get_task_manager, TaskStatus, TaskPriority logger = logging.getLogger(__name__) # 配置常量 TIMEOUT_CHECK_INTERVAL_SEC = 300 # 超时检查间隔(5分钟) ERROR_RETRY_DELAY_SEC = 60 # 错误后重试延迟(1分钟) QUEUE_POLL_INTERVAL_SEC = 1 # 队列轮询间隔(秒) class TaskScheduler: """任务调度器""" def __init__(self, client: QingjianAPIClient = None): self.task_manager = get_task_manager() self.client = client self.running = False self.task_timeout_checker = None logger.info("任务调度器初始化完成") async def start(self): """启动任务调度器""" self.running = True logger.info("任务调度器已启动") # 启动超时检查任务 self.task_timeout_checker = asyncio.create_task(self._check_timeouts()) # 启动任务执行队列 await self._process_task_queue() async def stop(self): """停止任务调度器""" self.running = False if self.task_timeout_checker: self.task_timeout_checker.cancel() logger.info("任务调度器已停止") async def _check_timeouts(self): """检查超时任务""" while self.running: try: timeout_tasks = self.task_manager.get_timeout_tasks() for task in timeout_tasks: logger.warning(f"任务超时:{task['task_id']}") self.task_manager.cancel_task( task['task_id'], reason=f"任务超时({task['timeout_hours']}小时)" ) # 通知天网任务超时 await self._notify_tianwang(task['task_id'], 'timeout') # 每隔固定时间检查一次 await asyncio.sleep(TIMEOUT_CHECK_INTERVAL_SEC) except asyncio.CancelledError: break except Exception as e: logger.error(f"超时检查失败:{e}") await asyncio.sleep(ERROR_RETRY_DELAY_SEC) async def _process_task_queue(self): """处理任务队列""" # 持续监听,当有任务时执行 while self.running: try: # 这里实际应该从队列获取任务 # 简化处理:定期检查待触发任务 await asyncio.sleep(QUEUE_POLL_INTERVAL_SEC) except Exception as e: logger.error(f"任务队列处理失败:{e}") async def execute_task(self, task: dict) -> bool: """ 执行任务 Args: task: 任务数据 Returns: 是否执行成功 """ task_id = task['task_id'] try: # 更新状态为执行中 self.task_manager.update_task_status(task_id, TaskStatus.RUNNING) logger.info(f"开始执行任务:{task_id}") # 根据任务类型执行 task_type = task.get('type') if task_type == 'send_file_after_reply': success = await self._execute_send_file(task) elif task_type == 'send_message': success = await self._execute_send_message(task) elif task_type == 'send_link': success = await self._execute_send_link(task) else: logger.error(f"未知任务类型:{task_type}") success = False if success: self.task_manager.update_task_status( task_id, TaskStatus.COMPLETED, result={'executed_at': datetime.now().isoformat()} ) logger.info(f"任务执行成功:{task_id}") # 通知天网 await self._notify_tianwang(task_id, 'completed') else: # 失败重试逻辑 await self._handle_task_failure(task) return success except Exception as e: logger.error(f"任务执行失败:{task_id} - {e}") await self._handle_task_failure(task, error=str(e)) return False async def _execute_send_file(self, task: dict) -> bool: """执行发送文件任务""" try: customer_id = task.get('customer', {}).get('id') file_url = task.get('action', {}).get('file_url') message = task.get('action', {}).get('message', '这是您要的文件') if not self.client: logger.error("WebSocket 客户端未初始化") return False # 调用发送文件方法 # TODO: 实现 send_file 方法 logger.info(f"发送文件给 {customer_id}: {file_url}") return True except Exception as e: logger.error(f"发送文件失败:{e}") return False async def _execute_send_message(self, task: dict) -> bool: """执行发送消息任务""" try: customer_id = task.get('customer', {}).get('id') message = task.get('action', {}).get('message') if not self.client: return False # 发送消息 await self.client.send_message(customer_id, message) logger.info(f"发送消息给 {customer_id}: {message}") return True except Exception as e: logger.error(f"发送消息失败:{e}") return False async def _execute_send_link(self, task: dict) -> bool: """执行发送链接任务""" try: customer_id = task.get('customer', {}).get('id') link_url = task.get('action', {}).get('link_url') message = task.get('action', {}).get('message', '') full_message = f"{message}\n\n{link_url}" if message else link_url return await self._execute_send_message({ 'customer': {'id': customer_id}, 'action': {'message': full_message} }) except Exception as e: logger.error(f"发送链接失败:{e}") return False async def _handle_task_failure(self, task: dict, error: str = None): """处理任务失败""" task_id = task['task_id'] max_retry = task.get('max_retry', 3) retry_count = self.task_manager.increment_retry(task_id) if retry_count <= max_retry: # 重试 logger.warning(f"任务失败,{retry_count}/{max_retry} 次重试:{task_id}") await asyncio.sleep(60 * retry_count) # 递增重试间隔 await self.execute_task(task) else: # 超过最大重试次数,标记失败 self.task_manager.update_task_status( task_id, TaskStatus.FAILED, error_message=error or '超过最大重试次数' ) logger.error(f"任务最终失败:{task_id}") # 通知天网 await self._notify_tianwang(task_id, 'failed', error) async def _notify_tianwang(self, task_id: str, status: str, error: str = None): """通知天网任务状态""" try: # TODO: 实现天网回调 API # 可以通过 HTTP POST 通知天网 logger.info(f"通知天网 - 任务 {task_id} 状态:{status}") # 示例代码(实际使用时取消注释并配置天网 URL) ''' import httpx async with httpx.AsyncClient() as client: await client.post( 'http://127.0.0.1:6060/api/task/callback', json={ 'task_id': task_id, 'status': status, 'error': error, 'timestamp': datetime.now().isoformat() } ) ''' except Exception as e: logger.error(f"通知天网失败:{e}") def check_trigger_match(self, message: str, trigger: dict) -> bool: """ 检查消息是否匹配触发条件 Args: message: 客户消息内容 trigger: 触发条件配置 Returns: 是否匹配 """ trigger_type = trigger.get('type') if trigger_type == 'customer_reply': keyword = trigger.get('keyword') return keyword and keyword in message elif trigger_type == 'customer_keyword': keywords = trigger.get('keywords', []) if isinstance(keywords, str): import json keywords = json.loads(keywords) return any(kw in message for kw in keywords) elif trigger_type == 'customer_payment': # 付款检测逻辑 payment_keywords = ['已付款', '拍下了', '已下单', '付款了'] return any(kw in message for kw in payment_keywords) elif trigger_type == 'time_reach': # 时间判断 target_time = trigger.get('time') if target_time: try: target = datetime.fromisoformat(target_time) return datetime.now() >= target except: pass return False return False # 单例 _scheduler: Optional[TaskScheduler] = None def get_task_scheduler(client: QingjianAPIClient = None) -> TaskScheduler: """获取任务调度器单例""" global _scheduler if _scheduler is None: _scheduler = TaskScheduler(client) elif client and _scheduler.client is None: _scheduler.client = client return _scheduler