Files
tw/core/task_scheduler.py
2026-03-06 12:44:57 +08:00

295 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
天网任务调度系统
负责任务的执行、重试、超时处理
"""
import asyncio
import logging
from typing import Optional, Dict
from datetime import datetime
from .websocket_client_v2 import QingjianAPIClient
from db.task_db.task_model import get_task_manager, TaskStatus, TaskPriority
logger = logging.getLogger(__name__)
# 配置常量
TIMEOUT_CHECK_INTERVAL_SEC = 300 # 超时检查间隔5分钟
ERROR_RETRY_DELAY_SEC = 60 # 错误后重试延迟1分钟
QUEUE_POLL_INTERVAL_SEC = 1 # 队列轮询间隔(秒)
class TaskScheduler:
"""任务调度器"""
def __init__(self, client: QingjianAPIClient = None):
self.task_manager = get_task_manager()
self.client = client
self.running = False
self.task_timeout_checker = None
logger.info("任务调度器初始化完成")
async def start(self):
"""启动任务调度器"""
self.running = True
logger.info("任务调度器已启动")
# 启动超时检查任务
self.task_timeout_checker = asyncio.create_task(self._check_timeouts())
# 启动任务执行队列
await self._process_task_queue()
async def stop(self):
"""停止任务调度器"""
self.running = False
if self.task_timeout_checker:
self.task_timeout_checker.cancel()
logger.info("任务调度器已停止")
async def _check_timeouts(self):
"""检查超时任务"""
while self.running:
try:
timeout_tasks = self.task_manager.get_timeout_tasks()
for task in timeout_tasks:
logger.warning(f"任务超时:{task['task_id']}")
self.task_manager.cancel_task(
task['task_id'],
reason=f"任务超时({task['timeout_hours']}小时)"
)
# 通知天网任务超时
await self._notify_tianwang(task['task_id'], 'timeout')
# 每隔固定时间检查一次
await asyncio.sleep(TIMEOUT_CHECK_INTERVAL_SEC)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"超时检查失败:{e}")
await asyncio.sleep(ERROR_RETRY_DELAY_SEC)
async def _process_task_queue(self):
"""处理任务队列"""
# 持续监听,当有任务时执行
while self.running:
try:
# 这里实际应该从队列获取任务
# 简化处理:定期检查待触发任务
await asyncio.sleep(QUEUE_POLL_INTERVAL_SEC)
except Exception as e:
logger.error(f"任务队列处理失败:{e}")
async def execute_task(self, task: dict) -> bool:
"""
执行任务
Args:
task: 任务数据
Returns:
是否执行成功
"""
task_id = task['task_id']
try:
# 更新状态为执行中
self.task_manager.update_task_status(task_id, TaskStatus.RUNNING)
logger.info(f"开始执行任务:{task_id}")
# 根据任务类型执行
task_type = task.get('type')
if task_type == 'send_file_after_reply':
success = await self._execute_send_file(task)
elif task_type == 'send_message':
success = await self._execute_send_message(task)
elif task_type == 'send_link':
success = await self._execute_send_link(task)
else:
logger.error(f"未知任务类型:{task_type}")
success = False
if success:
self.task_manager.update_task_status(
task_id,
TaskStatus.COMPLETED,
result={'executed_at': datetime.now().isoformat()}
)
logger.info(f"任务执行成功:{task_id}")
# 通知天网
await self._notify_tianwang(task_id, 'completed')
else:
# 失败重试逻辑
await self._handle_task_failure(task)
return success
except Exception as e:
logger.error(f"任务执行失败:{task_id} - {e}")
await self._handle_task_failure(task, error=str(e))
return False
async def _execute_send_file(self, task: dict) -> bool:
"""执行发送文件任务"""
try:
customer_id = task.get('customer', {}).get('id')
file_url = task.get('action', {}).get('file_url')
message = task.get('action', {}).get('message', '这是您要的文件')
if not self.client:
logger.error("WebSocket 客户端未初始化")
return False
# 调用发送文件方法
# TODO: 实现 send_file 方法
logger.info(f"发送文件给 {customer_id}: {file_url}")
return True
except Exception as e:
logger.error(f"发送文件失败:{e}")
return False
async def _execute_send_message(self, task: dict) -> bool:
"""执行发送消息任务"""
try:
customer_id = task.get('customer', {}).get('id')
message = task.get('action', {}).get('message')
if not self.client:
return False
# 发送消息
await self.client.send_message(customer_id, message)
logger.info(f"发送消息给 {customer_id}: {message}")
return True
except Exception as e:
logger.error(f"发送消息失败:{e}")
return False
async def _execute_send_link(self, task: dict) -> bool:
"""执行发送链接任务"""
try:
customer_id = task.get('customer', {}).get('id')
link_url = task.get('action', {}).get('link_url')
message = task.get('action', {}).get('message', '')
full_message = f"{message}\n\n{link_url}" if message else link_url
return await self._execute_send_message({
'customer': {'id': customer_id},
'action': {'message': full_message}
})
except Exception as e:
logger.error(f"发送链接失败:{e}")
return False
async def _handle_task_failure(self, task: dict, error: str = None):
"""处理任务失败"""
task_id = task['task_id']
max_retry = task.get('max_retry', 3)
retry_count = self.task_manager.increment_retry(task_id)
if retry_count <= max_retry:
# 重试
logger.warning(f"任务失败,{retry_count}/{max_retry} 次重试:{task_id}")
await asyncio.sleep(60 * retry_count) # 递增重试间隔
await self.execute_task(task)
else:
# 超过最大重试次数,标记失败
self.task_manager.update_task_status(
task_id,
TaskStatus.FAILED,
error_message=error or '超过最大重试次数'
)
logger.error(f"任务最终失败:{task_id}")
# 通知天网
await self._notify_tianwang(task_id, 'failed', error)
async def _notify_tianwang(self, task_id: str, status: str, error: str = None):
"""通知天网任务状态"""
try:
# TODO: 实现天网回调 API
# 可以通过 HTTP POST 通知天网
logger.info(f"通知天网 - 任务 {task_id} 状态:{status}")
# 示例代码(实际使用时取消注释并配置天网 URL
'''
import httpx
async with httpx.AsyncClient() as client:
await client.post(
'http://127.0.0.1:6060/api/task/callback',
json={
'task_id': task_id,
'status': status,
'error': error,
'timestamp': datetime.now().isoformat()
}
)
'''
except Exception as e:
logger.error(f"通知天网失败:{e}")
def check_trigger_match(self, message: str, trigger: dict) -> bool:
"""
检查消息是否匹配触发条件
Args:
message: 客户消息内容
trigger: 触发条件配置
Returns:
是否匹配
"""
trigger_type = trigger.get('type')
if trigger_type == 'customer_reply':
keyword = trigger.get('keyword')
return keyword and keyword in message
elif trigger_type == 'customer_keyword':
keywords = trigger.get('keywords', [])
if isinstance(keywords, str):
import json
keywords = json.loads(keywords)
return any(kw in message for kw in keywords)
elif trigger_type == 'customer_payment':
# 付款检测逻辑
payment_keywords = ['已付款', '拍下了', '已下单', '付款了']
return any(kw in message for kw in payment_keywords)
elif trigger_type == 'time_reach':
# 时间判断
target_time = trigger.get('time')
if target_time:
try:
target = datetime.fromisoformat(target_time)
return datetime.now() >= target
except:
pass
return False
return False
# 单例
_scheduler: Optional[TaskScheduler] = None
def get_task_scheduler(client: QingjianAPIClient = None) -> TaskScheduler:
"""获取任务调度器单例"""
global _scheduler
if _scheduler is None:
_scheduler = TaskScheduler(client)
elif client and _scheduler.client is None:
_scheduler.client = client
return _scheduler