Compare commits

...

2 Commits

4 changed files with 331 additions and 2 deletions

View File

@@ -8,6 +8,28 @@ from core.schema import StandardMessage, StandardResponse
logger = logging.getLogger("cs_agent") logger = logging.getLogger("cs_agent")
_OUTBOUND_BLOCK_MARKERS = (
"【历史记录摘要】",
"【详细记录】",
"【订单摘要】",
"【订单详情】",
"<think",
"think_never_used",
'[{"name":',
)
_HISTORY_LEAK_PATTERNS = [
r'\[\d{4}-\d{2}-\d{2}[^\]]*\]\s*(客户|客服)[:]',
r'\[\d{2}:\d{2}:\d{2}\]\s*(客户|客服|我)[:]',
r'(根据|查看|查询|翻看)(历史|聊天|对话)(记录|内容)',
r'历史(记录|对话|消息)(显示|表明|中)',
r'之前的(聊天|对话|记录)(中|里|显示)',
r'\d+条(历史|对话)?消息',
r'订单号[:]\s*\d{10,}',
r'(状态|金额|数量)[:].*(状态|金额|数量)[:]',
]
class QianniuAdapter(BaseAdapter): class QianniuAdapter(BaseAdapter):
""" """
千牛适配器:支持识别消息来源(客户 vs 商家人工)。 千牛适配器:支持识别消息来源(客户 vs 商家人工)。
@@ -30,6 +52,22 @@ class QianniuAdapter(BaseAdapter):
logger.warning(f"[QianniuAdapter] 读取转接分组配置失败: {e}") logger.warning(f"[QianniuAdapter] 读取转接分组配置失败: {e}")
return self._default_group_id return self._default_group_id
@staticmethod
def _sanitize_outbound_text(content: str) -> str:
if not content:
return ""
cleaned = str(content).strip()
if "[转移会话]" in cleaned:
return cleaned
if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS):
logger.warning("[QianniuAdapter] 拦截到内部内容外发,替换为安全兜底回复")
return "我在帮你看记录,稍等哈"
for pattern in _HISTORY_LEAK_PATTERNS:
if re.search(pattern, cleaned):
logger.warning(f"[QianniuAdapter] 检测到历史记录泄露模式: {pattern[:30]}...")
return "我在帮你看记录,稍等哈"
return cleaned
async def translate_inbound(self, raw: dict) -> Tuple[StandardMessage, str]: async def translate_inbound(self, raw: dict) -> Tuple[StandardMessage, str]:
""" """
返回: (标准消息, 消息方向) 返回: (标准消息, 消息方向)
@@ -81,6 +119,9 @@ class QianniuAdapter(BaseAdapter):
else: else:
content = res.reply_content content = res.reply_content
if res.msg_type == 0:
content = self._sanitize_outbound_text(content)
try: try:
logger.info( logger.info(
f"[REPLY->CUSTOMER] user={user_id} acc={acc_id} type={res.msg_type}\n{content}" f"[REPLY->CUSTOMER] user={user_id} acc={acc_id} type={res.msg_type}\n{content}"

View File

@@ -10,6 +10,13 @@ from core.adapters.qianniu_adapter import QianniuAdapter
from core.pydantic_ai_agent_v2 import CustomerServiceBrain from core.pydantic_ai_agent_v2 import CustomerServiceBrain
from core.events.event_bus import bus from core.events.event_bus import bus
from core.repository import repo from core.repository import repo
from db.pending_transfer_db import (
enqueue_pending_transfer,
claim_due_pending_transfers,
complete_pending_transfer,
retry_pending_transfer,
)
from services.dispatch_service import dispatch_service
logger = logging.getLogger("cs_agent") logger = logging.getLogger("cs_agent")
@@ -17,6 +24,8 @@ logger = logging.getLogger("cs_agent")
MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量 MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量
TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2分钟内不再调用AI TRANSFER_COOLDOWN_SEC = 120 # 转接冷却时间(秒)—— 转接后2分钟内不再调用AI
DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒) DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒)
PENDING_TRANSFER_POLL_SECONDS = 30
PENDING_TRANSFER_RETRY_SECONDS = 60
# 转接后安抚话术池(轮换使用,避免复读) # 转接后安抚话术池(轮换使用,避免复读)
_TRANSFER_CALM_REPLIES = [ _TRANSFER_CALM_REPLIES = [
@@ -70,6 +79,7 @@ class SystemOrchestrator:
self._debounce_tasks: Dict[str, asyncio.Task] = {} self._debounce_tasks: Dict[str, asyncio.Task] = {}
self._pending_messages: Dict[str, List[StandardMessage]] = {} self._pending_messages: Dict[str, List[StandardMessage]] = {}
self._user_locks: Dict[str, asyncio.Lock] = {} self._user_locks: Dict[str, asyncio.Lock] = {}
self._pending_transfer_task: Optional[asyncio.Task] = None
bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event) bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event)
@@ -86,6 +96,17 @@ class SystemOrchestrator:
self._user_locks[user_id] = asyncio.Lock() self._user_locks[user_id] = asyncio.Lock()
return self._user_locks[user_id] return self._user_locks[user_id]
def _should_run_pending_transfer_worker(self) -> bool:
worker_id = getattr(self.ws_client, "worker_id", -1) if self.ws_client else -1
return worker_id in (-1, 0)
def _ensure_background_tasks(self):
if not self._should_run_pending_transfer_worker():
return
if self._pending_transfer_task is None or self._pending_transfer_task.done():
self._pending_transfer_task = asyncio.create_task(self._process_pending_transfers_loop())
logger.info("[Orchestrator] 待转接轮询任务已启动")
@staticmethod @staticmethod
def _sanitize_outbound_text(text: str) -> str: def _sanitize_outbound_text(text: str) -> str:
if not text: if not text:
@@ -107,6 +128,7 @@ class SystemOrchestrator:
"""链路入口""" """链路入口"""
try: try:
if platform != "qianniu": return if platform != "qianniu": return
self._ensure_background_tasks()
std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data) std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data)
@@ -343,7 +365,9 @@ class SystemOrchestrator:
# E. 发送并记录时间 # E. 发送并记录时间
if std_res.should_reply: if std_res.should_reply:
std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content) std_res.reply_content = self._sanitize_outbound_text(std_res.reply_content)
std_res.metadata = {"acc_id": acc_id, "acc_type": acc_type} meta = dict(std_res.metadata or {})
meta.update({"acc_id": acc_id, "acc_type": acc_type})
std_res.metadata = meta
# 转接场景:先发一句安抚话,再发转接指令 # 转接场景:先发一句安抚话,再发转接指令
if "[转移会话]" in std_res.reply_content: if "[转移会话]" in std_res.reply_content:
@@ -358,6 +382,21 @@ class SystemOrchestrator:
await self.qianniu_adapter.translate_outbound(std_res, user_id) await self.qianniu_adapter.translate_outbound(std_res, user_id)
await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id) await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id)
if std_res.metadata.get("pending_transfer"):
reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip()
if reason:
pending_id = await asyncio.to_thread(
enqueue_pending_transfer,
customer_id=user_id,
acc_id=acc_id,
acc_type=acc_type,
platform=platform,
reason=reason,
)
logger.info(
f"[Orchestrator] 已加入待转接池: pending_id={pending_id} user={user_id} acc={acc_id}"
)
if "[转移会话]" in std_res.reply_content: if "[转移会话]" in std_res.reply_content:
self._last_transfer_time[session_key] = time.time() self._last_transfer_time[session_key] = time.time()
@@ -366,8 +405,75 @@ class SystemOrchestrator:
async def handle_outbound_event(self, user_id: str, platform: str, response: StandardResponse): async def handle_outbound_event(self, user_id: str, platform: str, response: StandardResponse):
if platform == "qianniu": if platform == "qianniu":
if response and response.msg_type == 0:
response.reply_content = self._sanitize_outbound_text(response.reply_content)
await self.qianniu_adapter.translate_outbound(response, user_id) await self.qianniu_adapter.translate_outbound(response, user_id)
async def _process_pending_transfers_loop(self):
while True:
try:
if not self.ws_client or not getattr(self.ws_client, "websocket", None):
await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS)
continue
rows = await asyncio.to_thread(claim_due_pending_transfers, 10)
if not rows:
await asyncio.sleep(PENDING_TRANSFER_POLL_SECONDS)
continue
for row in rows:
row_id = int(row["id"])
customer_id = str(row.get("customer_id") or "")
acc_id = str(row.get("acc_id") or "")
acc_type = str(row.get("acc_type") or "AliWorkbench")
reason = str(row.get("reason") or "").strip()
try:
designer_name = await dispatch_service.assign_designer(user_id=customer_id)
if not designer_name:
await asyncio.to_thread(
retry_pending_transfer,
row_id,
PENDING_TRANSFER_RETRY_SECONDS,
"designer_unavailable",
)
continue
notify = StandardResponse(
reply_content="设计师上线了,我给您转过去哈",
metadata={"acc_id": acc_id, "acc_type": acc_type},
)
transfer = StandardResponse(
reply_content=f"正在为您转接|[转移会话],{designer_name},无原因",
need_transfer=True,
metadata={"acc_id": acc_id, "acc_type": acc_type},
)
await self.qianniu_adapter.translate_outbound(notify, customer_id)
await repo.save_chat("qianniu", customer_id, notify.reply_content, "out", acc_id=acc_id)
await asyncio.sleep(0.5)
await self.qianniu_adapter.translate_outbound(transfer, customer_id)
await repo.save_chat("qianniu", customer_id, transfer.reply_content, "out", acc_id=acc_id)
self._last_transfer_time[f"{customer_id}@{acc_id}"] = time.time()
await asyncio.to_thread(complete_pending_transfer, row_id)
logger.info(
f"[Orchestrator] 待转接自动完成: pending_id={row_id} user={customer_id} designer={designer_name} reason={reason}"
)
except Exception as e:
logger.warning(f"[Orchestrator] 待转接处理失败 pending_id={row_id}: {e}")
await asyncio.to_thread(
retry_pending_transfer,
row_id,
PENDING_TRANSFER_RETRY_SECONDS,
str(e),
)
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"[Orchestrator] 待转接轮询异常: {e}")
await asyncio.sleep(PENDING_TRANSFER_RETRY_SECONDS)
# 全局单例 # 全局单例
orchestrator: Optional[SystemOrchestrator] = None orchestrator: Optional[SystemOrchestrator] = None
def init_orchestrator(ws_client): def init_orchestrator(ws_client):

View File

@@ -3,6 +3,7 @@ import re
import hashlib import hashlib
import logging import logging
import time import time
import json
from typing import List, Optional, Any, Dict from typing import List, Optional, Any, Dict
from pydantic_ai import Agent, RunContext from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIChatModel from pydantic_ai.models.openai import OpenAIChatModel
@@ -236,6 +237,8 @@ class CustomerServiceBrain:
logger.info(f"[Brain] AI处理完成总耗时{elapsed:.1f}s") logger.info(f"[Brain] AI处理完成总耗时{elapsed:.1f}s")
# ===== 详细日志AI 的思考过程和工具调用 ===== # ===== 详细日志AI 的思考过程和工具调用 =====
pending_transfer_reason = ""
pending_transfer_error = ""
try: try:
all_msgs = result.all_messages() all_msgs = result.all_messages()
for idx, m in enumerate(all_msgs): for idx, m in enumerate(all_msgs):
@@ -247,9 +250,20 @@ class CustomerServiceBrain:
tool_name = getattr(part, 'tool_name', '?') tool_name = getattr(part, 'tool_name', '?')
tool_args = getattr(part, 'args', {}) tool_args = getattr(part, 'args', {})
logger.info(f"[AI思考] 步骤{idx+1} 调用工具: {tool_name}({tool_args})") logger.info(f"[AI思考] 步骤{idx+1} 调用工具: {tool_name}({tool_args})")
if tool_name == "transfer_to_human_tool":
if isinstance(tool_args, str):
try:
tool_args = json.loads(tool_args)
except Exception:
tool_args = {"reason": tool_args}
if isinstance(tool_args, dict):
pending_transfer_reason = str(tool_args.get("reason") or "").strip()
elif part_kind == 'tool-return': elif part_kind == 'tool-return':
content = str(getattr(part, 'content', ''))[:200] content = str(getattr(part, 'content', ''))[:200]
logger.info(f"[AI思考] 步骤{idx+1} 工具返回: {content}") logger.info(f"[AI思考] 步骤{idx+1} 工具返回: {content}")
full_content = str(getattr(part, 'content', ''))
if full_content.startswith("ERROR_DESIGNER_"):
pending_transfer_error = full_content
elif part_kind == 'text': elif part_kind == 'text':
content = str(getattr(part, 'content', ''))[:150] content = str(getattr(part, 'content', ''))[:150]
if content.strip(): if content.strip():
@@ -298,7 +312,13 @@ class CustomerServiceBrain:
return StandardResponse( return StandardResponse(
reply_content=reply_text, reply_content=reply_text,
need_transfer=need_transfer, need_transfer=need_transfer,
metadata={"acc_id": msg.acc_id, "acc_type": msg.acc_type} metadata={
"acc_id": msg.acc_id,
"acc_type": msg.acc_type,
"pending_transfer": bool(pending_transfer_error and pending_transfer_reason),
"pending_transfer_reason": pending_transfer_reason,
"pending_transfer_error": pending_transfer_error,
}
) )
except Exception as e: except Exception as e:

162
db/pending_transfer_db.py Normal file
View File

@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
"""
待转接队列(本地 SQLite
用于在设计师不在线时暂存转接请求,待设计师上线后自动转接。
"""
import os
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict
_DB_PATH = os.path.join(os.path.dirname(__file__), "pending_transfer.db")
def _get_conn() -> sqlite3.Connection:
conn = sqlite3.connect(_DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
os.makedirs(os.path.dirname(_DB_PATH), exist_ok=True)
with _get_conn() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS pending_transfers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
acc_id TEXT DEFAULT '',
acc_type TEXT DEFAULT '',
platform TEXT DEFAULT 'qianniu',
reason TEXT DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending',
retry_count INTEGER NOT NULL DEFAULT 0,
next_retry_at TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
completed_at TEXT DEFAULT '',
last_error TEXT DEFAULT ''
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_pending_status_retry ON pending_transfers(status, next_retry_at)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_pending_customer_acc ON pending_transfers(customer_id, acc_id)"
)
conn.commit()
init_db()
def enqueue_pending_transfer(
customer_id: str,
acc_id: str,
acc_type: str,
platform: str,
reason: str,
) -> int:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with _get_conn() as conn:
row = conn.execute(
"""
SELECT id
FROM pending_transfers
WHERE customer_id = ? AND acc_id = ? AND status IN ('pending', 'processing')
ORDER BY id DESC
LIMIT 1
""",
(customer_id, acc_id),
).fetchone()
if row:
conn.execute(
"""
UPDATE pending_transfers
SET acc_type = ?, platform = ?, reason = ?, status = 'pending',
next_retry_at = ?, updated_at = ?, last_error = ''
WHERE id = ?
""",
(acc_type, platform, reason, now, now, row["id"]),
)
conn.commit()
return int(row["id"])
cur = conn.execute(
"""
INSERT INTO pending_transfers
(customer_id, acc_id, acc_type, platform, reason, status, retry_count, next_retry_at, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?)
""",
(customer_id, acc_id, acc_type, platform, reason, now, now, now),
)
conn.commit()
return int(cur.lastrowid)
def claim_due_pending_transfers(limit: int = 10) -> List[Dict]:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with _get_conn() as conn:
conn.execute("BEGIN IMMEDIATE")
rows = conn.execute(
"""
SELECT *
FROM pending_transfers
WHERE status = 'pending' AND next_retry_at <= ?
ORDER BY created_at ASC, id ASC
LIMIT ?
""",
(now, limit),
).fetchall()
ids = [int(r["id"]) for r in rows]
if ids:
placeholders = ",".join("?" for _ in ids)
conn.execute(
f"""
UPDATE pending_transfers
SET status = 'processing', updated_at = ?
WHERE id IN ({placeholders})
""",
(now, *ids),
)
conn.commit()
return [dict(r) for r in rows]
def complete_pending_transfer(row_id: int):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with _get_conn() as conn:
conn.execute(
"""
UPDATE pending_transfers
SET status = 'completed', completed_at = ?, updated_at = ?, last_error = ''
WHERE id = ?
""",
(now, now, row_id),
)
conn.commit()
def retry_pending_transfer(row_id: int, delay_seconds: int = 60, error: str = ""):
now = datetime.now()
next_retry = now + timedelta(seconds=max(delay_seconds, 5))
now_s = now.strftime("%Y-%m-%d %H:%M:%S")
next_s = next_retry.strftime("%Y-%m-%d %H:%M:%S")
with _get_conn() as conn:
conn.execute(
"""
UPDATE pending_transfers
SET status = 'pending',
retry_count = retry_count + 1,
next_retry_at = ?,
updated_at = ?,
last_error = ?
WHERE id = ?
""",
(next_s, now_s, error[:500], row_id),
)
conn.commit()