# File: tw/core/orchestrator.py
#
# NOTE: the hosting viewer warned that this file contains Unicode characters
# that may be confused with other (ASCII look-alike) characters. Review string
# and regex literals with that in mind before editing them.
import logging
import asyncio
import re
import time
import json
from datetime import datetime
from typing import Optional, List, Any, Dict
from collections import deque
from core.schema import StandardMessage, StandardResponse
from core.adapters.qianniu_adapter import QianniuAdapter
from core.pydantic_ai_agent_v2 import CustomerServiceBrain
from core.events.event_bus import bus
from core.repository import repo
from db.pending_transfer_db import (
enqueue_pending_transfer,
claim_due_pending_transfers,
complete_pending_transfer,
retry_pending_transfer,
)
from services.dispatch_service import dispatch_service
from services.service_auto_image_pipeline import auto_image_pipeline_service
logger = logging.getLogger("cs_agent")
# Configuration constants
MSG_DEDUP_CAPACITY = 200 # capacity of the message-id de-duplication cache
TRANSFER_COOLDOWN_SEC = 120 # transfer cooldown (seconds): the AI is not called within 2 minutes after a transfer
DEBOUNCE_SECONDS = 2.0 # message debounce delay (seconds)
# Sleep between polls of the pending-transfer queue when there is no websocket or no due row.
PENDING_TRANSFER_POLL_SECONDS = 30
# Delay passed to retry_pending_transfer(); also the back-off after a polling-loop error.
PENDING_TRANSFER_RETRY_SECONDS = 60
# A transfer older than this many seconds is no longer considered for a retry.
TRANSFER_RETRY_WINDOW_SEC = 300
# Minimum number of seconds between two retry transfers for the same session.
TRANSFER_RETRY_GAP_SEC = 45
# Pool of calming replies used after a transfer (rotated to avoid repeating the same line)
_TRANSFER_CALM_REPLIES = [
"我在帮你催了哈,稍等下",
"已经转了哈,马上就来",
"收到,设计师在赶来了哈",
"好的亲,稍等一下哈",
"在催了在催了,马上哈",
]
# Substrings that mark internal content; an outbound reply containing any of
# them is replaced by a safe fallback reply.
_OUTBOUND_BLOCK_MARKERS = (
"【历史记录摘要】",
"【详细记录】",
"【订单摘要】",
"【订单详情】",
"<think",
"think_never_used",
'[{"name":',
)
# Marker that identifies a transfer command inside a reply.
_TRANSFER_COMMAND_MARKER = "[转移会话]"
# Patterns that detect chat-history formatting (easily leaked when the AI paraphrases history).
# NOTE(review): the `[:]` classes below contain only the ASCII colon; confirm
# whether a full-width colon was also intended (the viewer flagged ambiguous
# Unicode characters in this file).
_HISTORY_LEAK_PATTERNS = [
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]', # e.g. "[2026-03-07 12:00:00] 客户:"
r'\[\d{2}:\d{2}:\d{2}\]\s*(客户|客服|我)[:]', # e.g. "[12:00:00] 客户:"
r'(根据|查看|查询|翻看)(历史|聊天|对话)(记录|内容)', # e.g. "根据历史记录" (according to the history)
r'历史(记录|对话|消息)(显示|表明|中)', # e.g. "历史记录显示" (the history shows)
r'之前的(聊天|对话|记录)(中|里|显示)', # e.g. "之前的聊天中" (in the earlier chat)
r'\d+条(历史|对话)?消息', # e.g. "共30条历史消息" (30 history messages in total)
r'订单号[:]\s*\d{10,}', # e.g. "订单号:xxxxxxxxxx" (order number)
r'(状态|金额|数量)[:].*(状态|金额|数量)[:]', # two "status/amount/quantity:" fields in a row
]
class SystemOrchestrator:
    """Top-level orchestrator for inbound customer-service messages.

    Responsibilities visible in this class:

    * de-duplicate inbound messages by message id;
    * debounce and merge bursts of messages per ``user@shop`` session;
    * after a transfer to a human designer, answer with rotating canned
      replies for ``TRANSFER_COOLDOWN_SEC`` instead of calling the AI;
    * re-issue a transfer that looks like it never connected;
    * poll the pending-transfer queue in a background task;
    * replace outbound text that looks like leaked internal content.
    """

    # Seconds during which an identical auto-processing job is not re-triggered.
    _AUTO_PIPELINE_DEDUP_SEC = 600

    # Patterns (applied with ``re.match`` to the stripped text) for messages
    # that are stored silently and never answered by the AI: a bare amount
    # notice, SKU detail lines, and an amount line followed by a bullet.
    # NOTE(review): the first pattern is not anchored at the end, so a customer
    # message that merely starts with an amount (e.g. "50元...") also matches.
    # Confirm that this is intended.
    _SILENT_ORDER_PATTERNS = (
        r'^[\s\n]*金?额?[:]?\s*[\d.]+\s*元',
        r'^[\s\n]*(备注[:]|数量[:]|款式[:]|定制[:])',
        r'^[\s\n]*金额[:]\s*[\d.]+元\s*●',
    )

    def __init__(self, ws_client=None):
        """Create adapters and per-session state, then subscribe to outbound events.

        Args:
            ws_client: websocket client handed to ``QianniuAdapter``. Its
                ``worker_id`` and ``websocket`` attributes are consulted by
                the pending-transfer worker. May be ``None``.
        """
        self.ws_client = ws_client
        self.qianniu_adapter = QianniuAdapter(ws_client)
        self.brain = CustomerServiceBrain()

        # 1. Message-id de-duplication (bounded FIFO).
        self._processed_msg_ids = deque(maxlen=MSG_DEDUP_CAPACITY)

        # 2. Transfer cooldown: session_key -> time of the last transfer.
        self._last_transfer_time: Dict[str, float] = {}
        self._transfer_calm_idx: Dict[str, int] = {}  # rotation index into the calming replies

        # 3. Debounce state, all keyed by session_key.
        self._debounce_seconds = DEBOUNCE_SECONDS
        self._debounce_tasks: Dict[str, asyncio.Task] = {}
        self._pending_messages: Dict[str, List[StandardMessage]] = {}
        self._user_locks: Dict[str, asyncio.Lock] = {}

        self._pending_transfer_task: Optional[asyncio.Task] = None
        self._last_retry_transfer_time: Dict[str, float] = {}
        self._auto_pipeline_jobs: Dict[str, float] = {}
        # Strong references to fire-and-forget tasks: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks = set()

        bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event)

    def _spawn_background(self, coro) -> asyncio.Task:
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @staticmethod
    def _has_transfer_intent(text: str) -> bool:
        """Return True when *text* contains a keyword asking for a human agent."""
        if not text:
            return False
        t = str(text)
        keywords = ("转人工", "转接", "人工客服", "人工", "设计师", "叫人", "找人")
        return any(k in t for k in keywords)

    def _get_user_lock(self, user_id: str) -> asyncio.Lock:
        """Return the lock registered under *user_id*, creating it on first use."""
        lock = self._user_locks.get(user_id)
        if lock is None:
            lock = self._user_locks[user_id] = asyncio.Lock()
        return lock

    def _should_run_pending_transfer_worker(self) -> bool:
        """Return True when this instance should poll pending transfers.

        The worker runs when there is no client or no ``worker_id`` (-1) and
        on the client whose ``worker_id`` is 0.
        """
        worker_id = getattr(self.ws_client, "worker_id", -1) if self.ws_client else -1
        return worker_id in (-1, 0)

    def _ensure_background_tasks(self):
        """Start the pending-transfer polling task if it is not running."""
        if not self._should_run_pending_transfer_worker():
            return
        if self._pending_transfer_task is None or self._pending_transfer_task.done():
            self._pending_transfer_task = asyncio.create_task(self._process_pending_transfers_loop())
            logger.info("[Orchestrator] 待转接轮询任务已启动")

    @staticmethod
    def _parse_history_ts(ts: Any) -> Optional[datetime]:
        """Parse a ``YYYY-MM-DD HH:MM:SS[.ffffff]`` timestamp; return None on failure."""
        text = str(ts or "").strip()
        if not text:
            return None
        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"):
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        return None

    def _find_stalled_transfer(self, history: List[dict]) -> Optional[dict]:
        """Detect a recent transfer that the customer is still chasing.

        A transfer counts as stalled when the most recent assistant message
        carrying the transfer marker is at most ``TRANSFER_RETRY_WINDOW_SEC``
        old, the user wrote again afterwards, and the assistant has not
        replied since.

        Returns:
            ``{"timestamp", "elapsed", "content"}`` for the stalled transfer,
            or ``None``.
        """
        if not history:
            return None

        last_transfer_idx = -1
        for idx in range(len(history) - 1, -1, -1):
            item = history[idx]
            if item.get("role") == "assistant" and _TRANSFER_COMMAND_MARKER in str(item.get("content") or ""):
                last_transfer_idx = idx
                break
        if last_transfer_idx < 0:
            return None

        transfer_item = history[last_transfer_idx]
        transfer_at = self._parse_history_ts(transfer_item.get("timestamp"))
        if not transfer_at:
            return None
        # The parsed datetime is naive, so .timestamp() reads it as local time.
        elapsed = time.time() - transfer_at.timestamp()
        if elapsed < 0 or elapsed > TRANSFER_RETRY_WINDOW_SEC:
            return None

        after_transfer = history[last_transfer_idx + 1:]
        if not any(item.get("role") == "user" for item in after_transfer):
            return None
        # Every assistant message after the *last* transfer command is by
        # construction not a transfer command, so any such message means the
        # conversation moved on.
        if any(item.get("role") == "assistant" for item in after_transfer):
            return None

        return {
            "timestamp": transfer_at,
            "elapsed": elapsed,
            "content": str(transfer_item.get("content") or ""),
        }

    async def _retry_stalled_transfer_if_needed(
        self,
        session_key: str,
        user_id: str,
        platform: str,
        acc_id: str,
        acc_type: str,
        history: List[dict],
    ) -> Optional[StandardResponse]:
        """Build a fresh transfer response when the previous transfer stalled.

        Returns ``None`` when nothing stalled, when the last retry was less
        than ``TRANSFER_RETRY_GAP_SEC`` ago, or when no designer is available.
        """
        stalled = self._find_stalled_transfer(history)
        if not stalled:
            return None

        last_retry_at = self._last_retry_transfer_time.get(session_key, 0.0)
        if time.time() - last_retry_at < TRANSFER_RETRY_GAP_SEC:
            logger.info(
                f"[Orchestrator] 转接补发冷却中,先不重复补转: user={user_id} acc={acc_id}"
            )
            return None

        logger.info(
            f"[Orchestrator] 检测到疑似转接未接上,准备补发转接: "
            f"user={user_id} acc={acc_id} elapsed={stalled['elapsed']:.0f}s"
        )
        designer_name = await dispatch_service.assign_designer(user_id=user_id)
        if not designer_name:
            logger.info(f"[Orchestrator] 补发转接失败,当前仍无可用设计师: user={user_id} acc={acc_id}")
            return None

        self._last_retry_transfer_time[session_key] = time.time()
        return StandardResponse(
            reply_content=f"正在为您转接|[转移会话],{designer_name},无原因",
            need_transfer=True,
            metadata={
                "acc_id": acc_id,
                "acc_type": acc_type,
                "transfer_prelude": "我再帮您转一下哈",
                "retry_transfer": True,
            },
        )

    @staticmethod
    def _sanitize_outbound_text(text: str) -> str:
        """Return *text* stripped, or a safe fallback when it looks like a leak.

        Transfer commands pass through untouched. Text containing a blocked
        marker or matching a history-leak pattern is replaced by a fixed
        fallback reply.
        """
        if not text:
            return ""
        cleaned = str(text).strip()
        if _TRANSFER_COMMAND_MARKER in cleaned:
            return cleaned
        if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
            logger.warning("[Orchestrator] 拦截到内部内容外发,替换为安全兜底回复")
            return "我在帮你看记录,稍等哈"
        # Check the history-leak patterns.
        for pattern in _HISTORY_LEAK_PATTERNS:
            if re.search(pattern, cleaned):
                logger.warning(f"[Orchestrator] 检测到历史记录泄露模式: {pattern[:30]}...")
                return "我在帮你看记录,稍等哈"
        return cleaned

    @staticmethod
    def _extract_designer_name(transfer_cmd: str) -> str:
        """Return the designer name embedded in a transfer command, or ``""``."""
        text = str(transfer_cmd or "").strip()
        match = re.search(r"\[转移会话\],([^,]+),", text)
        return str(match.group(1)).strip() if match else ""

    @staticmethod
    def _infer_processing_intent(requirement_text: str, history: Optional[List[dict]] = None) -> str:
        """Return ``"repair"`` when the requirement or any user message in
        *history* mentions a repair keyword, otherwise ``"find_original"``."""
        combined_parts = [str(requirement_text or "").lower()]
        for item in history or []:
            if item.get("role") == "user":
                combined_parts.append(str(item.get("content") or "").lower())
        combined = "\n".join(combined_parts)
        repair_keywords = ("修复", "高清", "清晰", "放大", "老照片")
        if any(k in combined for k in repair_keywords):
            return "repair"
        return "find_original"

    @staticmethod
    def _collect_recent_image_urls(history: List[dict], fallback_urls: Optional[List[str]] = None) -> List[str]:
        """Collect distinct image URLs, *fallback_urls* first, then newest user messages.

        User messages are scanned newest-first. URLs come from the
        ``image_urls`` field (a list, or a string split on newlines and ``#``)
        and from ``http(s)://`` links in the content. Scanning stops after the
        message that brings the total to five or more, so the result may hold
        more than five URLs.
        """
        urls: List[str] = []
        seen = set()

        def add_url(url: str):
            value = str(url or "").strip()
            if not value or value in seen:
                return
            seen.add(value)
            urls.append(value)

        for url in fallback_urls or []:
            add_url(url)

        for item in reversed(history or []):
            if item.get("role") != "user":
                continue
            raw_urls = item.get("image_urls") or []
            if isinstance(raw_urls, str):
                for part in re.split(r"[\n#]+", raw_urls):
                    add_url(part)
            elif isinstance(raw_urls, list):
                for part in raw_urls:
                    add_url(part)
            content = str(item.get("content") or "")
            for match in re.findall(r"https?://[^\s#]+", content):
                add_url(match)
            if len(urls) >= 5:
                break
        return urls

    def _schedule_auto_pipeline(
        self,
        *,
        session_key: str,
        customer_id: str,
        acc_id: str,
        designer_name: str,
        requirement_text: str,
        history: List[dict],
        image_urls: Optional[List[str]] = None,
    ):
        """Start the automatic image pipeline in the background.

        Nothing is scheduled when no customer image is found, or when the same
        session/designer/intent/image combination was scheduled within
        ``_AUTO_PIPELINE_DEDUP_SEC`` seconds.
        """
        resolved_urls = self._collect_recent_image_urls(history, image_urls)
        if not resolved_urls:
            logger.info(f"[Orchestrator] 自动处理跳过:未找到客户图片 user={customer_id} acc={acc_id}")
            return

        intent = self._infer_processing_intent(requirement_text, history)
        joined_urls = "|".join(resolved_urls)
        # The full signature string is the key: no hash collisions.
        signature = f"{session_key}|{designer_name}|{intent}|{joined_urls}"

        now = time.time()
        window = self._AUTO_PIPELINE_DEDUP_SEC
        # Drop expired entries so the table cannot grow without bound.
        self._auto_pipeline_jobs = {
            key: started for key, started in self._auto_pipeline_jobs.items() if now - started < window
        }
        if signature in self._auto_pipeline_jobs:
            logger.info(f"[Orchestrator] 自动处理已在近期触发,跳过重复任务 user={customer_id} acc={acc_id}")
            return
        self._auto_pipeline_jobs[signature] = now

        async def _runner():
            try:
                result = await auto_image_pipeline_service.process_and_notify(
                    session_key=session_key,
                    customer_id=customer_id,
                    acc_id=acc_id,
                    designer_name=designer_name,
                    requirement_text=requirement_text,
                    image_urls=resolved_urls,
                    intent=intent,
                )
                logger.info(
                    f"[Orchestrator] 自动处理完成 user={customer_id} acc={acc_id} "
                    f"ok={result.get('success')} uploaded={len(result.get('uploaded') or [])}"
                )
            except Exception as e:
                logger.warning(f"[Orchestrator] 自动处理失败 user={customer_id} acc={acc_id}: {e}")

        self._spawn_background(_runner())

    @classmethod
    def _is_silent_order_message(cls, msg_text: str) -> bool:
        """Return True for order packets, bare amount notices and SKU detail lines."""
        if "[系统订单信息]" in msg_text:
            return True
        stripped = msg_text.strip()
        return any(re.match(pattern, stripped) for pattern in cls._SILENT_ORDER_PATTERNS)

    async def on_raw_message_received(self, platform: str, raw_data: dict):
        """Pipeline entry point for one raw platform event.

        Only the ``"qianniu"`` platform is handled. Order-like messages and
        seller ("out") messages are stored without triggering the AI. Other
        inbound messages are de-duplicated by id and queued for debounced
        processing. Exceptions are logged, never raised.
        """
        try:
            if platform != "qianniu":
                return
            self._ensure_background_tasks()
            std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data)

            # Key fix: user_id must never be empty.
            user_id = std_msg.user_id or str(raw_data.get("cy_id") or raw_data.get("from_id") or "unknown")
            std_msg.user_id = user_id
            # Shop isolation: the same customer is handled independently per shop.
            session_key = f"{user_id}@{std_msg.acc_id}"

            # Order packets / bare amount notices / SKU info are stored
            # silently and do not trigger an AI reply.
            msg_text = std_msg.content or ""
            if self._is_silent_order_message(msg_text):
                await self._handle_order_packet(platform, std_msg)
                logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态")
                await repo.save_chat(
                    platform,
                    user_id,
                    msg_text,
                    "in",
                    acc_id=std_msg.acc_id,
                    msg_type=std_msg.msg_type,
                )
                return

            preview = msg_text.replace("\n", "\\n")
            if len(preview) > 120:
                preview = preview[:120] + "..."
            logger.info(
                f"[监听消息] dir={direction} user={user_id} acc={std_msg.acc_id} "
                f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}"
            )

            # Filter heartbeats. A message with msg_type 1 is kept even when
            # its image URL has not arrived yet.
            if std_msg.msg_type != 1 and not msg_text.strip() and not std_msg.image_urls:
                return

            # A manual reply from the seller is stored silently.
            if direction == "out":
                await repo.save_chat(
                    platform,
                    user_id,
                    std_msg.content,
                    "out",
                    acc_id=std_msg.acc_id,
                    msg_type=std_msg.msg_type,
                )
                return

            # De-duplicate by message id.
            if std_msg.msg_id:
                if std_msg.msg_id in self._processed_msg_ids:
                    return
                self._processed_msg_ids.append(std_msg.msg_id)

            # Enter the debounce window (session_key keeps shops apart).
            # NOTE(review): cancel() also interrupts a task that already left
            # its debounce sleep and is mid-processing. Confirm that
            # "newest message restarts processing" is the intended behaviour.
            previous = self._debounce_tasks.get(session_key)
            if previous is not None:
                previous.cancel()
            self._pending_messages.setdefault(session_key, []).append(std_msg)
            self._debounce_tasks[session_key] = asyncio.create_task(
                self._debounced_process(session_key, user_id, platform)
            )
        except Exception as e:
            # logger.exception keeps the traceback, matching _debounced_process.
            logger.exception(f"[Orchestrator] 处理失败: {e}")

    @staticmethod
    def _coerce_number(value: Any, caster, default):
        """Return ``caster(value)``, or *default* when the value is missing or malformed."""
        if value is None or value == "":
            return default
        try:
            return caster(value)
        except (TypeError, ValueError):
            return default

    async def _handle_order_packet(self, platform: str, msg: StandardMessage):
        """Update task price/outcome and upsert the structured order record.

        Best effort: any exception is logged as a warning and swallowed.
        """
        try:
            from core.order_helpers import parse_order_info
            from db.chat_log_db import upsert_order

            content = msg.content or ""
            # Tolerate a parser that returns None for unrecognised packets.
            info = parse_order_info(content) or {}

            price_match = re.search(r"金额[:]\s*([\d\.]+)\s*元", content)
            if price_match:
                # "[\d\.]+" can match strings such as "1.2.3"; skip those
                # rather than aborting the whole packet.
                price = self._coerce_number(price_match.group(1), float, None)
                if price is not None:
                    await repo.update_task_price(platform, msg.user_id, price)

            if any(k in content for k in ["买家已付款", "卖家已发货"]):
                await repo.update_task_outcome(platform, msg.user_id, "deal_success")
            elif any(k in content for k in ["退款", "已关闭", "已取消"]):
                await repo.update_task_outcome(platform, msg.user_id, "refunded")

            # Structured write into the customer_orders table.
            order_id = info.get("order_id", "")
            if order_id and msg.user_id and msg.user_id != "unknown":
                title_match = re.search(r"商品标题[:]\s*([^\s]+(?:\s+[^\s订]+)*)", content)
                product_title = title_match.group(1).strip() if title_match else ""
                # A None/empty/malformed amount or quantity falls back to 0
                # so the order row is still written.
                amount = self._coerce_number(info.get("amount"), float, 0.0)
                quantity = self._coerce_number(info.get("quantity"), int, 0)
                order_status = info.get("order_status", "")
                buyer_note = info.get("buyer_note", "")
                await asyncio.to_thread(
                    upsert_order,
                    customer_id=msg.user_id,
                    order_id=order_id,
                    order_status=order_status,
                    acc_id=msg.acc_id,
                    product_title=product_title,
                    amount=amount,
                    quantity=quantity,
                    buyer_note=buyer_note,
                )
                logger.info(f"[订单入库] user={msg.user_id} order={order_id} status={order_status} amount={amount}")
        except Exception as e:
            logger.warning(f"[Orchestrator] 订单消息处理异常: {e}")

    async def _analyze_images_background(self, session_key: str, image_urls: List[str], requirement_text: str = ""):
        """Analyse the first image in the background and store the result on the
        customer profile (used for data calibration). Best effort: failures
        are logged and swallowed."""
        try:
            from services.service_image_analyzer import image_analyzer_service
            from db.customer_db import CustomerDatabase

            db = CustomerDatabase()
            profile = db.get_customer(session_key)
            if profile is None:
                logger.warning(f"[ImageAnalysis] 后台分析失败: no profile for session={session_key}")
                return

            updated = False
            for url in (image_urls or [])[:1]:
                try:
                    result = await image_analyzer_service.analyze(url, customer_requirement=requirement_text)
                    result_json = json.dumps(result, ensure_ascii=False)

                    # Latest analysis.
                    profile.last_image_analysis = result_json
                    profile.last_image_url = url
                    profile.last_gemini_prompt = result.get("gemini_prompt", "")
                    profile.last_aspect_ratio = result.get("aspect_ratio", "1:1")
                    profile.last_perspective = result.get("perspective", "no")

                    # Analysis history, capped at the 20 most recent entries.
                    if profile.image_analysis_history is None:
                        profile.image_analysis_history = []
                    profile.image_analysis_history.append(result_json)
                    if len(profile.image_analysis_history) > 20:
                        profile.image_analysis_history = profile.image_analysis_history[-20:]

                    # Complexity history, capped at the 10 most recent entries.
                    complexity = result.get("complexity", "normal")
                    if profile.complexity_history is None:
                        profile.complexity_history = []
                    profile.complexity_history.append(complexity)
                    if len(profile.complexity_history) > 10:
                        profile.complexity_history = profile.complexity_history[-10:]

                    # Distinct image types seen so far. An unset history is
                    # initialised like the two histories above instead of
                    # silently dropping the value.
                    proc_type = result.get("proc_type", "")
                    if proc_type:
                        if profile.image_type_history is None:
                            profile.image_type_history = []
                        if proc_type not in profile.image_type_history:
                            profile.image_type_history.append(proc_type)

                    updated = True
                    logger.debug(f"[ImageAnalysis] session={session_key} 分析完成: {result.get('subject', '?')} | {complexity}")
                except Exception as e:
                    logger.warning(f"[ImageAnalysis] 单张图片分析失败: {e}")
                    continue

            # Persist only when an analysis actually completed; otherwise the
            # "saved" log line below would be misleading.
            if updated:
                db.save_customer(profile)
                logger.info(f"[ImageAnalysis] session={session_key} 分析结果已保存到数据库")
        except Exception as e:
            logger.warning(f"[ImageAnalysis] 后台分析失败: {e}")

    @staticmethod
    def _merge_pending_messages(messages) -> tuple:
        """Merge one debounce window into ``(combined_content, image_urls)``.

        Identical stripped contents are kept once (first occurrence) and
        joined with newlines. Image URLs are de-duplicated in arrival order.
        """
        seen_contents = set()
        unique_parts: List[str] = []
        all_image_urls: List[str] = []
        for m in messages:
            text = (m.content or "").strip()
            if text and text not in seen_contents:
                seen_contents.add(text)
                unique_parts.append(text)
            for url in m.image_urls or []:
                if url not in all_image_urls:
                    all_image_urls.append(url)
        return "\n".join(unique_parts), all_image_urls

    async def _deliver_response(
        self,
        std_res,
        *,
        session_key: str,
        user_id: str,
        platform: str,
        acc_id: str,
        acc_type: str,
        requirement_text: str,
        history: List[dict],
        image_urls: List[str],
    ):
        """Sanitise, send and persist *std_res*, then handle transfer bookkeeping."""
        std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content)
        meta = dict(std_res.metadata or {})
        meta.update({"acc_id": acc_id, "acc_type": acc_type})
        std_res.metadata = meta

        is_transfer = _TRANSFER_COMMAND_MARKER in std_res.reply_content
        designer_name = self._extract_designer_name(std_res.reply_content) if is_transfer else ""

        # Transfer case: send a short reassuring line before the command.
        if is_transfer:
            transfer_prelude = str(std_res.metadata.get("transfer_prelude") or "").strip()
            greet = StandardResponse(
                reply_content=transfer_prelude or "收到,我叫设计师来看下哈",
                metadata={"acc_id": acc_id, "acc_type": acc_type}
            )
            await self.qianniu_adapter.translate_outbound(greet, user_id)
            await repo.save_chat(
                platform,
                user_id,
                greet.reply_content,
                "out",
                acc_id=acc_id,
                msg_type=greet.msg_type,
            )
            await asyncio.sleep(0.5)

        await self.qianniu_adapter.translate_outbound(std_res, user_id)
        await repo.save_chat(
            platform,
            user_id,
            std_res.reply_content,
            "out",
            acc_id=acc_id,
            msg_type=std_res.msg_type,
        )

        if std_res.metadata.get("pending_transfer"):
            reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip()
            if reason:
                pending_id = await asyncio.to_thread(
                    enqueue_pending_transfer,
                    customer_id=user_id,
                    acc_id=acc_id,
                    acc_type=acc_type,
                    platform=platform,
                    reason=reason,
                )
                logger.info(
                    f"[Orchestrator] 已加入待转接池: pending_id={pending_id} user={user_id} acc={acc_id}"
                )

        if is_transfer:
            # Start the cooldown and kick off automatic image processing.
            self._last_transfer_time[session_key] = time.time()
            self._schedule_auto_pipeline(
                session_key=session_key,
                customer_id=user_id,
                acc_id=acc_id,
                designer_name=designer_name,
                requirement_text=requirement_text,
                history=history,
                image_urls=image_urls,
            )

    async def _debounced_process(self, session_key: str, user_id: str, platform: str):
        """Wait out the debounce window, then process the session's queued messages.

        Steps: merge the queued messages, persist them, start background image
        analysis, then pick a response (stalled-transfer retry, cooldown
        calming reply, or the AI) and deliver it. Cancellation is swallowed
        and other exceptions are logged.
        """
        try:
            # Start time, taken before the debounce wait.
            process_start = time.time()
            await asyncio.sleep(self._debounce_seconds)

            async with self._get_user_lock(session_key):
                messages = self._pending_messages.pop(session_key, [])
                if not messages:
                    return
                debounce_elapsed = time.time() - process_start
                logger.info(f"[计时] user={user_id} 防抖等待完成: {debounce_elapsed:.1f}s")

                # A. Merge and repair metadata (identical contents within one
                #    debounce window are kept once).
                combined_content, all_image_urls = self._merge_pending_messages(messages)
                last_msg = messages[-1]
                acc_id = last_msg.acc_id
                acc_type = last_msg.acc_type
                # The merged message still needs a msg_id, otherwise
                # StandardMessage validation fails.
                merged_msg_id = last_msg.msg_id if last_msg.msg_id else f"merged-{user_id}-{int(time.time() * 1000)}"
                final_msg = StandardMessage(
                    platform=platform,
                    msg_id=merged_msg_id,
                    user_id=user_id,
                    content=combined_content,
                    msg_type=last_msg.msg_type,
                    image_urls=all_image_urls,
                    acc_id=acc_id,
                    acc_type=acc_type
                )

                # B. Persist.
                db_start = time.time()
                db_content = combined_content
                if all_image_urls:
                    db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}"
                await repo.save_chat(
                    platform,
                    user_id,
                    db_content,
                    "in",
                    acc_id=acc_id,
                    image_urls=all_image_urls,
                    msg_type=final_msg.msg_type,
                )
                db_elapsed = time.time() - db_start
                logger.info(f"[计时] user={user_id} 消息入库: {db_elapsed:.2f}s")

                # B2. Background image analysis (does not block the main flow).
                if all_image_urls:
                    self._spawn_background(
                        self._analyze_images_background(session_key, all_image_urls, combined_content)
                    )

                history_start = time.time()
                history = await repo.get_chat_history(user_id, limit=12, acc_id=acc_id)
                history_elapsed = time.time() - history_start
                logger.info(f"[计时] user={user_id} 查询历史: {history_elapsed:.2f}s (共{len(history)}条)")
                # The message just stored is passed separately as final_msg,
                # so drop it from the history given to the AI.
                ai_history = history[:-1] if history and history[-1].get("content") == db_content else history

                # C. Quick follow-up after a transfer that apparently never
                #    connected: re-issue the transfer first.
                std_res = await self._retry_stalled_transfer_if_needed(
                    session_key=session_key,
                    user_id=user_id,
                    platform=platform,
                    acc_id=acc_id,
                    acc_type=acc_type,
                    history=history,
                )

                # D. Cooldown: shortly after a transfer, send a canned calming
                #    reply instead of calling the AI.
                last_transfer = self._last_transfer_time.get(session_key, 0)
                cooldown_elapsed = time.time() - last_transfer
                is_in_cooldown = cooldown_elapsed < TRANSFER_COOLDOWN_SEC
                if std_res is None and is_in_cooldown:
                    idx = self._transfer_calm_idx.get(session_key, 0)
                    calm_reply = _TRANSFER_CALM_REPLIES[idx % len(_TRANSFER_CALM_REPLIES)]
                    self._transfer_calm_idx[session_key] = idx + 1
                    logger.info(f"[Orchestrator] 转接冷却中({cooldown_elapsed:.0f}s),直接安抚: {calm_reply}")
                    std_res = StandardResponse(
                        reply_content=calm_reply,
                        metadata={"acc_id": acc_id, "acc_type": acc_type}
                    )

                if std_res is None:
                    # E. Normal flow: let the AI think.
                    ai_start = time.time()
                    std_res = await self.brain.think_and_reply(final_msg, history=ai_history)
                    ai_elapsed = time.time() - ai_start
                    total_elapsed = time.time() - process_start
                    logger.info(f"[计时] user={user_id} AI思考: {ai_elapsed:.1f}s | 总耗时: {total_elapsed:.1f}s")

                # F. Send and record the transfer time.
                if std_res.should_reply:
                    await self._deliver_response(
                        std_res,
                        session_key=session_key,
                        user_id=user_id,
                        platform=platform,
                        acc_id=acc_id,
                        acc_type=acc_type,
                        requirement_text=combined_content,
                        history=history,
                        image_urls=all_image_urls,
                    )
        except asyncio.CancelledError:
            # Superseded by a newer message for the same session.
            pass
        except Exception as e:
            logger.exception(f"[Orchestrator] 处理失败: {e}")
        finally:
            # Remove our own registry entry so finished tasks do not pile up.
            # A newer task registered under the same key is left alone.
            if self._debounce_tasks.get(session_key) is asyncio.current_task():
                self._debounce_tasks.pop(session_key, None)

    async def handle_outbound_event(self, user_id: str, platform: str, response: StandardResponse):
        """Event-bus handler for ``MESSAGE_OUTBOUND``: sanitise and send on qianniu.

        Only responses with ``msg_type == 0`` are sanitised.
        """
        if platform == "qianniu":
            if response and response.msg_type == 0:
                response.reply_content = self._sanitize_outbound_text(response.reply_content)
            await self.qianniu_adapter.translate_outbound(response, user_id)

    async def _transfer_pending_row(self, row) -> None:
        """Try to transfer one claimed pending-transfer row to a designer.

        The row is re-queued when no designer is available or when sending
        fails. Once the transfer command has been sent the row is marked
        complete immediately, so a failure in the later best-effort
        bookkeeping cannot cause a second transfer.
        """
        row_id = int(row["id"])
        customer_id = str(row.get("customer_id") or "")
        acc_id = str(row.get("acc_id") or "")
        acc_type = str(row.get("acc_type") or "AliWorkbench")
        reason = str(row.get("reason") or "").strip()
        session_key = f"{customer_id}@{acc_id}"

        try:
            designer_name = await dispatch_service.assign_designer(user_id=customer_id)
            if not designer_name:
                await asyncio.to_thread(
                    retry_pending_transfer,
                    row_id,
                    PENDING_TRANSFER_RETRY_SECONDS,
                    "designer_unavailable",
                )
                return

            notify = StandardResponse(
                reply_content="设计师上线了,我给您转过去哈",
                metadata={"acc_id": acc_id, "acc_type": acc_type},
            )
            transfer = StandardResponse(
                reply_content=f"正在为您转接|[转移会话],{designer_name},无原因",
                need_transfer=True,
                metadata={"acc_id": acc_id, "acc_type": acc_type},
            )
            await self.qianniu_adapter.translate_outbound(notify, customer_id)
            await repo.save_chat(
                "qianniu",
                customer_id,
                notify.reply_content,
                "out",
                acc_id=acc_id,
                msg_type=notify.msg_type,
            )
            await asyncio.sleep(0.5)
            await self.qianniu_adapter.translate_outbound(transfer, customer_id)
            # The transfer is out: complete the row before anything else can fail.
            await asyncio.to_thread(complete_pending_transfer, row_id)
        except Exception as e:
            logger.warning(f"[Orchestrator] 待转接处理失败 pending_id={row_id}: {e}")
            await asyncio.to_thread(
                retry_pending_transfer,
                row_id,
                PENDING_TRANSFER_RETRY_SECONDS,
                str(e),
            )
            return

        self._last_transfer_time[session_key] = time.time()
        try:
            await repo.save_chat(
                "qianniu",
                customer_id,
                transfer.reply_content,
                "out",
                acc_id=acc_id,
                msg_type=transfer.msg_type,
            )
            history = await repo.get_chat_history(customer_id, limit=12, acc_id=acc_id)
            self._schedule_auto_pipeline(
                session_key=session_key,
                customer_id=customer_id,
                acc_id=acc_id,
                designer_name=designer_name,
                requirement_text=reason,
                history=history,
            )
        except Exception as e:
            # Best-effort bookkeeping; the transfer itself already succeeded.
            logger.warning(f"[Orchestrator] 待转接后续处理失败 pending_id={row_id}: {e}")

        logger.info(
            f"[Orchestrator] 待转接自动完成: pending_id={row_id} user={customer_id} designer={designer_name} reason={reason}"
        )

    async def _process_pending_transfers_loop(self):
        """Poll the pending-transfer queue until cancelled.

        Sleeps ``PENDING_TRANSFER_POLL_SECONDS`` when there is no websocket or
        no due row, and ``PENDING_TRANSFER_RETRY_SECONDS`` after an unexpected
        error.
        """
        while True:
            try:
                if not self.ws_client or not getattr(self.ws_client, "websocket", None):
                    await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS)
                    continue
                rows = await asyncio.to_thread(claim_due_pending_transfers, 10)
                if not rows:
                    await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS)
                    continue
                for row in rows:
                    await self._transfer_pending_row(row)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.warning(f"[Orchestrator] 待转接轮询异常: {e}")
                await asyncio.sleep(PENDING_TRANSFER_RETRY_SECONDS)
# Process-wide singleton; stays None until init_orchestrator() has been called.
orchestrator: Optional[SystemOrchestrator] = None


def init_orchestrator(ws_client):
    """Create the global ``SystemOrchestrator`` for *ws_client* and return it.

    Each call replaces the previously stored instance.
    """
    global orchestrator
    instance = SystemOrchestrator(ws_client)
    orchestrator = instance
    return instance