Files
tw/core/orchestrator.py
2026-03-06 15:06:06 +08:00

320 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import asyncio
import re
import time
import json
from typing import Optional, List, Any, Dict
from collections import deque
from core.schema import StandardMessage, StandardResponse
from core.adapters.qianniu_adapter import QianniuAdapter
from core.pydantic_ai_agent_v2 import CustomerServiceBrain
from core.events.event_bus import bus
from core.repository import repo
logger = logging.getLogger("cs_agent")

# --- Tunables ---------------------------------------------------------------
# Capacity of the recently-seen msg_id FIFO used for inbound de-duplication.
MSG_DEDUP_CAPACITY: int = 200
# Seconds after a hand-off to a human during which the AI is not called
# (120 s = 2 minutes); the buyer receives a canned reply instead.
TRANSFER_COOLDOWN_SEC: int = 120
# Seconds to wait for further buyer messages before answering a burst once.
DEBOUNCE_SECONDS: float = 2.0

# Canned "please wait" replies sent during the transfer cooldown. They are
# used in rotation so consecutive replies are not identical.
_TRANSFER_CALM_REPLIES = [
    "我在帮你催了哈,稍等下",
    "已经转了哈,马上就来",
    "收到,设计师在赶来了哈",
    "好的亲,稍等一下哈",
    "在催了在催了,马上哈",
]
class SystemOrchestrator:
    """
    Top-level orchestrator for customer-service messages.

    Visible responsibilities:

    * translate inbound Qianniu payloads through ``QianniuAdapter``;
    * store order / price / SKU notices and seller-side ("out") messages
      without triggering an AI reply;
    * drop duplicate inbound messages by ``msg_id`` (bounded FIFO cache);
    * debounce bursts of buyer messages per ``user_id@acc_id`` session and
      answer the merged batch once;
    * for ``TRANSFER_COOLDOWN_SEC`` after a hand-off to a human, answer with
      rotating canned replies instead of calling the AI.
    """

    # Detectors for notices that are stored silently (no AI reply). Both the
    # ASCII ':' and the full-width colon (U+FF1A) are accepted because Chinese
    # platform text commonly uses the full-width form.
    # NOTE(review): in _PRICE_ONLY_RE "金", "额" and the colon are all optional,
    # so any text starting with "<number>元" (e.g. a buyer asking "100元能做吗")
    # is treated as a notice and never answered -- confirm this is intended.
    _PRICE_ONLY_RE = re.compile(r"^\s*金?额?[:\uff1a]?\s*[\d.]+\s*元")
    _SKU_ONLY_RE = re.compile(r"^\s*(?:备注|数量|款式|定制)[:\uff1a]")
    # Kept for readability; any match is also a _PRICE_ONLY_RE match.
    _SKU_AMOUNT_RE = re.compile(r"^\s*金额[:\uff1a]\s*[\d.]+元\s*●")

    # Extractors used on order notices. The price group only captures a
    # well-formed number so float() cannot fail on e.g. "." and abort the
    # remaining order handling.
    _ORDER_PRICE_RE = re.compile(r"金额[:\uff1a]\s*(\d+(?:\.\d+)?)\s*元")
    _PRODUCT_TITLE_RE = re.compile(r"商品标题[:\uff1a]\s*([^\s]+(?:\s+[^\s订]+)*)")

    # Marker in a reply that requests a hand-off to a human agent.
    _TRANSFER_MARKER = "[转移会话]"
    # Short acknowledgement sent right before a hand-off reply.
    _TRANSFER_GREETING = "收到,我叫设计师来看下哈"

    def __init__(self, ws_client=None):
        self.ws_client = ws_client
        self.qianniu_adapter = QianniuAdapter(ws_client)
        self.brain = CustomerServiceBrain()
        # 1. msg_id de-duplication: bounded FIFO of recently seen ids.
        self._processed_msg_ids = deque(maxlen=MSG_DEDUP_CAPACITY)
        # 2. Transfer cooldown: session_key -> time.time() of the last hand-off.
        self._last_transfer_time: Dict[str, float] = {}
        self._transfer_calm_idx: Dict[str, int] = {}  # rotation index into the calm replies
        # 3. Debounce state.
        self._debounce_seconds = DEBOUNCE_SECONDS
        # Only tasks still inside their debounce sleep live here, so a newer
        # message can cancel them without aborting a batch being answered.
        self._debounce_tasks: Dict[str, asyncio.Task] = {}
        self._pending_messages: Dict[str, List[StandardMessage]] = {}
        self._user_locks: Dict[str, asyncio.Lock] = {}
        # Strong references to fire-and-forget tasks: the event loop keeps
        # only weak references, so unreferenced tasks may be collected mid-run.
        self._background_tasks: set = set()
        bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event)

    @staticmethod
    def _has_transfer_intent(text: str) -> bool:
        """Return True if *text* contains one of the hand-off keywords."""
        if not text:
            return False
        t = str(text)
        keywords = ("转人工", "转接", "人工客服", "人工", "设计师", "叫人", "找人")
        return any(k in t for k in keywords)

    def _get_user_lock(self, user_id: str) -> asyncio.Lock:
        """Return the lock for *user_id* (callers pass a session key), creating it on first use."""
        if user_id not in self._user_locks:
            self._user_locks[user_id] = asyncio.Lock()
        return self._user_locks[user_id]

    def _spawn(self, coro) -> asyncio.Task:
        """Start *coro* as a task and hold a strong reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @classmethod
    def _is_silent_notice(cls, text: str) -> bool:
        """Return True if *text* is an order / price / SKU notice to store without an AI reply."""
        if "[系统订单信息]" in text:
            return True
        stripped = text.strip()
        return bool(
            cls._PRICE_ONLY_RE.match(stripped)
            or cls._SKU_ONLY_RE.match(stripped)
            or cls._SKU_AMOUNT_RE.match(stripped)
        )

    async def on_raw_message_received(self, platform: str, raw_data: dict):
        """Pipeline entry point for one raw platform payload.

        Only ``platform == "qianniu"`` is handled. Order-like notices and
        seller-side messages are stored silently; buyer messages are
        de-duplicated by ``msg_id`` and queued for the per-session debounce.
        Exceptions are logged (with traceback) rather than raised.
        """
        try:
            if platform != "qianniu":
                return
            std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data)
            # Guarantee a non-empty user_id, falling back to raw payload ids.
            user_id = std_msg.user_id or str(raw_data.get("cy_id") or raw_data.get("from_id") or "unknown")
            std_msg.user_id = user_id
            # Shop isolation: the same buyer in different shops is a separate session.
            session_key = f"{user_id}@{std_msg.acc_id}"

            # Order / price / SKU notices: store silently, no AI reply.
            msg_text = std_msg.content or ""
            if self._is_silent_notice(msg_text):
                await self._handle_order_packet(platform, std_msg)
                logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态")
                await repo.save_chat(platform, user_id, msg_text, "in", acc_id=std_msg.acc_id)
                return

            preview = msg_text.replace("\n", "\\n")
            if len(preview) > 120:
                preview = preview[:120] + "..."
            logger.info(
                f"[监听消息] dir={direction} user={user_id} acc={std_msg.acc_id} "
                f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}"
            )

            # Drop heartbeats / empty frames.
            if not msg_text.strip() and not std_msg.image_urls:
                return

            # Seller's manual reply: store silently.
            if direction == "out":
                await repo.save_chat(platform, user_id, std_msg.content, "out", acc_id=std_msg.acc_id)
                return

            # msg_id de-duplication.
            if std_msg.msg_id:
                if std_msg.msg_id in self._processed_msg_ids:
                    return
                self._processed_msg_ids.append(std_msg.msg_id)

            # Queue for the debounce, keyed by session so shops stay isolated.
            self._pending_messages.setdefault(session_key, []).append(std_msg)
            self._schedule_debounce(session_key, user_id, platform)
        except Exception as e:
            logger.exception(f"[Orchestrator] 处理失败: {e}")

    def _schedule_debounce(self, session_key: str, user_id: str, platform: str) -> None:
        """(Re)start the debounce timer for *session_key*.

        The previous timer is cancelled only while it is still waiting out
        its window: ``_debounced_process`` removes itself from
        ``_debounce_tasks`` once the window elapses, so a batch that is
        already being answered is never aborted.
        """
        previous = self._debounce_tasks.get(session_key)
        if previous is not None and not previous.done():
            previous.cancel()
        self._debounce_tasks[session_key] = self._spawn(
            self._debounced_process(session_key, user_id, platform)
        )

    def _forget_debounce_task(self, session_key: str) -> None:
        """Drop *session_key*'s debounce entry if it refers to the running task."""
        if self._debounce_tasks.get(session_key) is asyncio.current_task():
            del self._debounce_tasks[session_key]

    async def _handle_order_packet(self, platform: str, msg: StandardMessage):
        """Update task price/outcome from an order notice and upsert the order row.

        Best-effort: any exception is logged as a warning and swallowed.
        """
        try:
            from core.order_helpers import parse_order_info
            from db.chat_log_db import upsert_order

            content = msg.content or ""
            info = parse_order_info(content)  # presumably a dict of parsed fields -- used via .get()

            price_match = self._ORDER_PRICE_RE.search(content)
            if price_match:
                await repo.update_task_price(platform, msg.user_id, float(price_match.group(1)))

            if any(k in content for k in ("买家已付款", "卖家已发货")):
                await repo.update_task_outcome(platform, msg.user_id, "deal_success")
            elif any(k in content for k in ("退款", "已关闭", "已取消")):
                await repo.update_task_outcome(platform, msg.user_id, "refunded")

            # Structured write into the customer_orders table.
            order_id = info.get("order_id", "")
            if order_id and msg.user_id and msg.user_id != "unknown":
                title_match = self._PRODUCT_TITLE_RE.search(content)
                product_title = title_match.group(1).strip() if title_match else ""
                amount = float(info.get("amount", 0))
                quantity = int(info.get("quantity", 0))
                order_status = info.get("order_status", "")
                buyer_note = info.get("buyer_note", "")
                # upsert_order is synchronous; run it off the event loop.
                await asyncio.to_thread(
                    upsert_order,
                    customer_id=msg.user_id,
                    order_id=order_id,
                    order_status=order_status,
                    acc_id=msg.acc_id,
                    product_title=product_title,
                    amount=amount,
                    quantity=quantity,
                    buyer_note=buyer_note,
                )
                logger.info(f"[订单入库] user={msg.user_id} order={order_id} status={order_status} amount={amount}")
        except Exception as e:
            logger.warning(f"[Orchestrator] 订单消息处理异常: {e}")

    @staticmethod
    def _append_capped(items: Optional[list], item: Any, cap: int) -> list:
        """Append *item* to *items* (a new list if None) and return it trimmed to the last *cap* entries."""
        items = [] if items is None else items
        items.append(item)
        return items[-cap:] if len(items) > cap else items

    async def _analyze_images_background(self, session_key: str, image_urls: List[str]):
        """Analyse *image_urls* and store the results on the customer profile (for data labelling).

        The profile is loaded and saved under *session_key*. Per-image
        failures are logged and skipped; any other failure is logged as a
        warning. Nothing is raised.
        """
        try:
            from services.service_image_analyzer import image_analyzer_service
            from db.customer_db import CustomerDatabase

            db = CustomerDatabase()
            profile = db.get_customer(session_key)
            if profile is None:
                # NOTE(review): unclear whether get_customer can return None; without
                # this guard every image would be analysed and then fail on assignment.
                logger.warning(f"[ImageAnalysis] session={session_key} profile not found, skipped")
                return
            for url in image_urls:
                try:
                    result = await image_analyzer_service.analyze(url)
                    result_json = json.dumps(result, ensure_ascii=False)
                    # Latest analysis.
                    profile.last_image_analysis = result_json
                    profile.last_image_url = url
                    profile.last_gemini_prompt = result.get("gemini_prompt", "")
                    profile.last_aspect_ratio = result.get("aspect_ratio", "1:1")
                    profile.last_perspective = result.get("perspective", "no")
                    # Analysis history, last 20 kept.
                    profile.image_analysis_history = self._append_capped(
                        profile.image_analysis_history, result_json, 20
                    )
                    # Complexity history, last 10 kept.
                    complexity = result.get("complexity", "normal")
                    profile.complexity_history = self._append_capped(
                        profile.complexity_history, complexity, 10
                    )
                    # Distinct image types seen; initialised like the other histories.
                    proc_type = result.get("proc_type", "")
                    if proc_type:
                        if profile.image_type_history is None:
                            profile.image_type_history = []
                        if proc_type not in profile.image_type_history:
                            profile.image_type_history.append(proc_type)
                    logger.debug(f"[ImageAnalysis] session={session_key} 分析完成: {result.get('subject', '?')} | {complexity}")
                except Exception as e:
                    logger.warning(f"[ImageAnalysis] 单张图片分析失败: {e}")
                    continue
            db.save_customer(profile)
            logger.info(f"[ImageAnalysis] session={session_key} 分析结果已保存到数据库")
        except Exception as e:
            logger.warning(f"[ImageAnalysis] 后台分析失败: {e}")

    async def _debounced_process(self, session_key: str, user_id: str, platform: str):
        """Wait out the debounce window, then answer the session's pending batch once.

        A newer message may cancel this task only during the sleep. Once the
        window elapses the task leaves ``_debounce_tasks``, so it can no longer
        be cancelled mid-processing (which would drop the popped batch
        unanswered). Cancellation is propagated; other errors are logged.
        """
        try:
            await asyncio.sleep(self._debounce_seconds)
        finally:
            # Runs whether the window elapsed or this timer was superseded;
            # removes the entry only if it still points at this task.
            self._forget_debounce_task(session_key)
        try:
            async with self._get_user_lock(session_key):
                messages = self._pending_messages.pop(session_key, [])
                if not messages:
                    return
                await self._process_batch(session_key, user_id, platform, messages)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.exception(f"[Orchestrator] 处理失败: {e}")

    @staticmethod
    def _merge_contents(messages: List[StandardMessage]) -> str:
        """Join stripped, non-empty contents with newlines, keeping only the first of identical texts."""
        seen = set()
        parts = []
        for m in messages:
            c = (m.content or "").strip()
            if c and c not in seen:
                seen.add(c)
                parts.append(c)
        return "\n".join(parts)

    @staticmethod
    def _merge_image_urls(messages: List[StandardMessage]) -> List[str]:
        """Collect image URLs from *messages* in first-seen order, without duplicates."""
        return list(dict.fromkeys(url for m in messages for url in m.image_urls))

    def _next_calm_reply(self, session_key: str) -> str:
        """Return the next canned cooldown reply for *session_key*, rotating through the pool."""
        idx = self._transfer_calm_idx.get(session_key, 0)
        self._transfer_calm_idx[session_key] = idx + 1
        return _TRANSFER_CALM_REPLIES[idx % len(_TRANSFER_CALM_REPLIES)]

    async def _process_batch(self, session_key: str, user_id: str, platform: str,
                             messages: List[StandardMessage]):
        """Persist one merged batch and send a single reply. Caller holds the session lock."""
        # A. Merge the batch; metadata comes from the newest message.
        last = messages[-1]
        acc_id = last.acc_id
        acc_type = last.acc_type
        combined_content = self._merge_contents(messages)
        all_image_urls = self._merge_image_urls(messages)
        # The merged message still needs a msg_id, otherwise StandardMessage validation fails.
        merged_msg_id = last.msg_id if last.msg_id else f"merged-{user_id}-{int(time.time() * 1000)}"
        final_msg = StandardMessage(
            platform=platform,
            msg_id=merged_msg_id,
            user_id=user_id,
            content=combined_content,
            msg_type=last.msg_type,
            image_urls=all_image_urls,
            acc_id=acc_id,
            acc_type=acc_type,
        )

        # B. Persist.
        db_content = combined_content
        if all_image_urls:
            db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}"
        await repo.save_chat(platform, user_id, db_content, "in", acc_id=acc_id, image_urls=all_image_urls)

        # B2. Background image analysis (does not block the reply).
        if all_image_urls:
            self._spawn(self._analyze_images_background(session_key, all_image_urls))

        # C. Transfer cooldown: reply with a canned message, skip the AI.
        cooldown_elapsed = time.time() - self._last_transfer_time.get(session_key, 0)
        if cooldown_elapsed < TRANSFER_COOLDOWN_SEC:
            calm_reply = self._next_calm_reply(session_key)
            logger.info(f"[Orchestrator] 转接冷却中({cooldown_elapsed:.0f}s),直接安抚: {calm_reply}")
            std_res = StandardResponse(
                reply_content=calm_reply,
                metadata={"acc_id": acc_id, "acc_type": acc_type},
            )
        else:
            # D. Normal path: ask the AI.
            history = await repo.get_chat_history(user_id, limit=10, acc_id=acc_id)
            # The batch was just saved; drop it so it is not passed twice.
            if history and history[-1].get('content') == db_content:
                history = history[:-1]
            std_res = await self.brain.think_and_reply(final_msg, history=history)

        # E. Send and record.
        if std_res.should_reply:
            await self._send_reply(platform, user_id, session_key, std_res, acc_id, acc_type)

    async def _send_reply(self, platform: str, user_id: str, session_key: str,
                          std_res: StandardResponse, acc_id, acc_type):
        """Send *std_res* (preceded by a greeting for hand-offs), store it, and start the cooldown on hand-off."""
        std_res.metadata = {"acc_id": acc_id, "acc_type": acc_type}
        # Evaluated once, before sending, so the cooldown decision does not
        # depend on what the outbound path does to reply_content.
        is_transfer = self._TRANSFER_MARKER in (std_res.reply_content or "")
        if is_transfer:
            greet = StandardResponse(
                reply_content=self._TRANSFER_GREETING,
                metadata={"acc_id": acc_id, "acc_type": acc_type},
            )
            await self.qianniu_adapter.translate_outbound(greet, user_id)
            await repo.save_chat(platform, user_id, greet.reply_content, "out", acc_id=acc_id)
            await asyncio.sleep(0.5)
        await self.qianniu_adapter.translate_outbound(std_res, user_id)
        await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id)
        if is_transfer:
            self._last_transfer_time[session_key] = time.time()

    async def handle_outbound_event(self, user_id: str, platform: str, response: StandardResponse):
        """Bus handler for MESSAGE_OUTBOUND: forward *response* to Qianniu."""
        if platform == "qianniu":
            await self.qianniu_adapter.translate_outbound(response, user_id)
# Process-wide singleton; populated by init_orchestrator().
orchestrator: Optional[SystemOrchestrator] = None


def init_orchestrator(ws_client):
    """Create the process-wide ``SystemOrchestrator`` bound to *ws_client*.

    The new instance replaces any previous one in the module-level
    ``orchestrator`` global and is also returned to the caller.
    """
    global orchestrator
    instance = SystemOrchestrator(ws_client)
    orchestrator = instance
    return instance