feat: enforce full AI outbound generation and reduce template replies

This commit is contained in:
2026-03-02 11:09:26 +08:00
parent 6433708597
commit 9d0276be41
5 changed files with 415 additions and 34 deletions

View File

@@ -8,7 +8,7 @@ import secrets
import time
import hashlib
from collections import deque
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, List
from utils.observability import emit_activity, build_trace_id
@@ -170,6 +170,8 @@ class QingjianAPIClient:
self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts
self._outbound_semantic_seen: dict = {} # customer_key -> {semantic_key: ts}
self._outbound_class_seen: dict = {} # customer_key -> {reply_class: ts}
self._outbound_template_seen: dict = {} # customer_key -> {template_family: ts}
self._unreplied_followup_sent: dict = {} # customer_key -> monotonic ts补偿消息节流
self._inbound_log_seen: dict = {} # signature -> monotonic ts防重复写入
self._outbound_log_seen: dict = {} # signature -> monotonic ts防重复写入
self._tianwang_callback_url = (
@@ -179,6 +181,7 @@ class QingjianAPIClient:
self._tianwang_agent_name = os.getenv("TIANWANG_AGENT_NAME", "终结者").strip() or "终结者"
self._reply_guard_enabled = os.getenv("AI_REPLY_GUARD_ENABLED", "true").lower() in ("1", "true", "yes")
self._reply_guard_verbose = os.getenv("AI_REPLY_GUARD_VERBOSE", "false").lower() in ("1", "true", "yes")
self._force_ai_generate_reply = os.getenv("FORCE_AI_GENERATE_ALL_REPLIES", "true").lower() in ("1", "true", "yes")
# 延迟加载任务模块(避免循环导入)
self.task_scheduler = None
@@ -429,6 +432,22 @@ class QingjianAPIClient:
return "ack"
return "general"
@staticmethod
def _template_family(reply: str) -> str:
    """Classify a reply into a high-frequency template family (for fatigue suppression).

    Rules are evaluated in order and the first match wins. Each rule is a
    family name plus keyword groups: every group must contribute at least one
    keyword found in the stripped reply.

    Returns:
        The family name, or "" when the reply is empty/blank or matches no rule.
    """
    text = (reply or "").strip()
    if not text:
        return ""

    # (family, groups): all groups must match; within a group any keyword suffices.
    rules = (
        ("collect_remind", (("需求我记上了",), ("继续发图",))),
        ("quote_defer", (("这批图过一遍", "收齐了", "收好了"), ("总价", "报价"))),
        ("collect_ack", (("图片收到了",), ("继续发",))),
        ("fallback_ack", (("好嘞,你稍等下,我这边看一下",),)),
    )
    for family, groups in rules:
        if all(any(keyword in text for keyword in group) for group in groups):
            return family
    return ""
def _outbound_arbiter(self, original_msg: dict, reply_content: str, trace_id: str) -> tuple[bool, str]:
"""
统一出站裁决层:
@@ -447,11 +466,17 @@ class QingjianAPIClient:
class_window = max(20, int(os.getenv("AI_OUTBOUND_CLASS_DEDUPE_SECONDS", "90")))
except Exception:
class_window = 90
try:
template_window = max(120, int(os.getenv("AI_OUTBOUND_TEMPLATE_FATIGUE_SECONDS", "600")))
except Exception:
template_window = 600
sem_bucket = self._outbound_semantic_seen.setdefault(key, {})
cls_bucket = self._outbound_class_seen.setdefault(key, {})
tpl_bucket = self._outbound_template_seen.setdefault(key, {})
self._prune_seen(sem_bucket, now_mono, ttl_sec=max(sem_window * 2, 240))
self._prune_seen(cls_bucket, now_mono, ttl_sec=max(class_window * 2, 180))
self._prune_seen(tpl_bucket, now_mono, ttl_sec=max(template_window * 2, 1200))
if sem_key and (now_mono - sem_bucket.get(sem_key, 0.0)) < sem_window:
self._activity_log(
@@ -466,6 +491,19 @@ class QingjianAPIClient:
)
return False, "semantic_duplicate"
template_family = self._template_family(reply_content)
if template_family and (now_mono - tpl_bucket.get(template_family, 0.0)) < template_window:
self._activity_log(
"outbound_arbiter_block",
trace_id=trace_id,
acc_id=original_msg.get("acc_id", ""),
customer_id=original_msg.get("from_id", ""),
reason="template_fatigue",
template_family=template_family,
msg=reply_content,
)
return False, "template_fatigue"
if reply_class in {"quote", "collect", "ack"} and (now_mono - cls_bucket.get(reply_class, 0.0)) < class_window:
self._activity_log(
"outbound_arbiter_block",
@@ -481,16 +519,199 @@ class QingjianAPIClient:
if sem_key:
sem_bucket[sem_key] = now_mono
cls_bucket[reply_class] = now_mono
if template_family:
tpl_bucket[template_family] = now_mono
self._activity_log(
"outbound_arbiter_pass",
trace_id=trace_id,
acc_id=original_msg.get("acc_id", ""),
customer_id=original_msg.get("from_id", ""),
reply_class=reply_class,
template_family=template_family,
semantic_key=sem_key[:80] if sem_key else "",
)
return True, "pass"
async def _unreplied_followup_loop(self):
    """
    Periodic compensation task: for conversations whose last message came from
    the customer and has gone unanswered for a long time, send one natural
    follow-up.

    Returns immediately when the agent is disabled or missing. Otherwise it
    sleeps for UNREPLIED_FOLLOWUP_SCAN_SECONDS (default 90, minimum 30; re-read
    on every iteration) and then runs one scan, until ``self.running`` becomes
    falsy or the task is cancelled. Scan errors are logged and the loop goes on.
    """
    if not self.enable_agent or not self.agent:
        return
    while self.running:
        # Parse the interval outside the main try: if a malformed env value
        # raised before the sleep, the generic handler below would swallow it
        # and the loop would spin without ever sleeping.
        try:
            scan_interval = max(30, int(os.getenv("UNREPLIED_FOLLOWUP_SCAN_SECONDS", "90")))
        except (TypeError, ValueError):
            scan_interval = 90
        try:
            await asyncio.sleep(scan_interval)
            await self._scan_and_send_unreplied_followups()
        except asyncio.CancelledError:
            break
        except Exception as e:
            self._activity_log("unreplied_followup_loop_error", error=str(e))
async def _scan_and_send_unreplied_followups(self):
    """
    Scan recent chat sessions and send one AI-composed follow-up to each
    session whose latest logged message is inbound and still unanswered.

    Tuning comes from environment variables (default, clamp):
      - UNREPLIED_FOLLOWUP_IDLE_MINUTES (12, min 5): idle time before following up.
      - UNREPLIED_FOLLOWUP_MAX_AGE_MINUTES (180, min idle_minutes): sessions idle
        longer than this are ignored.
      - UNREPLIED_FOLLOWUP_COOLDOWN_SECONDS (3600, min 300): per-customer cooldown.
      - UNREPLIED_FOLLOWUP_LIMIT (40, min 10): maximum follow-ups per scan.
    If any value fails to parse, all four fall back to their defaults.

    A session is skipped when it is not owned by this worker, its last row is
    not inbound, it is outside the idle window, it is still in cooldown, or the
    last message is a plain closing/acknowledgement phrase. The DB connection
    is closed on exit regardless of errors.
    """
    from db import chat_log_db as cdb

    try:
        idle_minutes = max(5, int(os.getenv("UNREPLIED_FOLLOWUP_IDLE_MINUTES", "12")))
        max_age_minutes = max(idle_minutes, int(os.getenv("UNREPLIED_FOLLOWUP_MAX_AGE_MINUTES", "180")))
        followup_cd = max(300, int(os.getenv("UNREPLIED_FOLLOWUP_COOLDOWN_SECONDS", "3600")))
        limit = max(10, int(os.getenv("UNREPLIED_FOLLOWUP_LIMIT", "40")))
    except Exception:
        idle_minutes, max_age_minutes, followup_cd, limit = 12, 180, 3600, 40

    now = datetime.now()
    window_start = (now - timedelta(minutes=max_age_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    conn = None
    try:
        conn = cdb._get_conn()
        # Over-fetch candidates (limit * 6) because most sessions are filtered out below.
        rows = conn.execute(
            cdb._sql(
                """
                SELECT acc_id, customer_id, MAX(id) AS last_id
                FROM chat_logs
                WHERE timestamp >= ?
                GROUP BY acc_id, customer_id
                ORDER BY MAX(id) DESC
                LIMIT ?
                """
            ),
            (window_start, limit * 6),
        ).fetchall()
        sessions = [dict(r) for r in rows]
        sent = 0
        for s in sessions:
            if sent >= limit:
                break
            acc_id = str(s.get("acc_id", "") or "")
            cid = str(s.get("customer_id", "") or "")
            if not acc_id or not cid:
                continue
            ckey = f"{acc_id}:{cid}"
            if not self._is_owned_by_this_worker(ckey):
                continue
            last = conn.execute(
                cdb._sql(
                    """
                    SELECT id, direction, message, timestamp, customer_name, acc_id, platform
                    FROM chat_logs
                    WHERE acc_id = ? AND customer_id = ?
                    ORDER BY id DESC
                    LIMIT 1
                    """
                ),
                (acc_id, cid),
            ).fetchone()
            if not last:
                continue
            last = dict(last)
            # Only sessions where the customer spoke last need a follow-up.
            if str(last.get("direction", "")) != "in":
                continue
            last_ts = last.get("timestamp")
            if isinstance(last_ts, datetime):
                last_dt = last_ts
            else:
                try:
                    last_dt = datetime.strptime(str(last_ts)[:19], "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    # One malformed row must not abort the scan for every other session.
                    self._activity_log(
                        "unreplied_followup_bad_timestamp",
                        acc_id=acc_id,
                        customer_id=cid,
                        timestamp=str(last_ts)[:40],
                    )
                    continue
            idle_s = (now - last_dt).total_seconds()
            if idle_s < idle_minutes * 60 or idle_s > max_age_minutes * 60:
                continue
            now_mono = time.monotonic()
            # time.monotonic() has an undefined reference point and can be smaller
            # than the cooldown (e.g. shortly after boot), so a 0.0 default would
            # make a never-contacted customer look "recently contacted".
            last_sent = self._unreplied_followup_sent.get(ckey)
            if last_sent is not None and (now_mono - last_sent) < followup_cd:
                continue
            last_message = str(last.get("message", "") or "")
            # Do not disturb the customer again after an obvious closing/ack phrase.
            # NOTE(review): the repeated empty-string entries look like characters
            # lost somewhere upstream - confirm the intended phrases.
            last_msg = last_message.strip().lower()
            if last_msg in {"好的", "", "ok", "收到", "", ""}:
                continue
            customer_name = self.to_chinese(last.get("customer_name", "") or cid)
            platform = str(last.get("platform", "") or "AliWorkbench")
            followup = await self._compose_ai_scene_reply(
                original_msg={
                    "acc_id": acc_id,
                    "from_id": cid,
                    "from_name": customer_name,
                    "acc_type": platform,
                    "msg": last_message,
                },
                scene="unreplied_followup",
                intent_hint="客户上一条消息还没接上,先自然承接并请对方补一句当前要处理的图或要求。",
                fallback="刚看到你消息了,我在的。你把要处理的图或要求再发我一下,我马上接着看。",
            )
            # Synthetic inbound message: send_reply() needs an "original message" to answer.
            fake = {
                "acc_id": acc_id,
                "from_id": cid,
                "from_name": customer_name,
                "cy_id": cid,
                "cy_name": customer_name,
                "acc_type": platform,
                "msg": last_message,
                "msg_type": 0,
            }
            await self.send_reply(fake, followup)
            self._unreplied_followup_sent[ckey] = now_mono
            sent += 1
            self._activity_log(
                "unreplied_followup_sent",
                acc_id=acc_id,
                customer_id=cid,
                idle_seconds=int(idle_s),
                last_msg=last_message[:120],
                reply=followup,
            )
    finally:
        # Best-effort close; a failure here must not mask the original error.
        try:
            if conn:
                conn.close()
        except Exception:
            pass
async def _compose_ai_scene_reply(
    self,
    *,
    original_msg: dict,
    scene: str,
    intent_hint: str,
    fallback: str,
) -> str:
    """
    Ask the AI agent to write a scene-specific reply instead of using a fixed template.

    Args:
        original_msg: Message dict; the keys read here are msg_id, acc_id,
            from_id, acc_type and msg.
        scene: Scene label placed in the prompt (also used to build a default msg_id).
        intent_hint: Description of what the reply should accomplish.
        fallback: Text returned whenever no usable AI reply is produced.

    Returns:
        The stripped AI output, or ``fallback`` when the agent is unavailable,
        the output is empty, the output looks like a control/transfer command,
        or any exception is raised along the way.
    """
    agent_ready = self.enable_agent and self.agent and AgentDeps
    if not agent_ready:
        return fallback

    acc_id = str(original_msg.get("acc_id", "") or "")
    customer_id = str(original_msg.get("from_id", "") or "")
    try:
        default_msg_id = f"{scene}_gen"
        deps = AgentDeps(
            msg_id=str(original_msg.get("msg_id", "") or default_msg_id),
            acc_id=acc_id,
            from_id=customer_id,
            platform=str(original_msg.get("acc_type", "") or ""),
        )
        customer_msg = self.to_chinese(str(original_msg.get("msg", "") or ""))
        prompt = "".join(
            (
                "你是淘宝客服,直接生成一条发给客户的话。\n",
                f"场景: {scene}\n",
                f"意图: {intent_hint}\n",
                f"客户原话: {customer_msg}\n",
                "要求: 1-2句自然口语不要模板腔不要新增价格/承诺;只输出最终回复。\n",
            )
        )
        result = await self.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[])
        out = str(getattr(result, "output", "") or "").strip()
        # Control commands and transfer markers must never be sent as chat text.
        looks_like_command = (
            out.startswith("话术|") or "[转移会话]" in out or "TRANSFER_REQUESTED" in out
        )
        if not out or looks_like_command:
            return fallback
        self._activity_log(
            "ai_scene_reply_generated",
            acc_id=acc_id,
            customer_id=customer_id,
            scene=scene,
            generated=out[:160],
        )
        return out
    except Exception as e:
        self._activity_log(
            "ai_scene_reply_error",
            acc_id=acc_id,
            customer_id=customer_id,
            scene=scene,
            error=str(e),
        )
        return fallback
async def receive_messages(self):
"""持续接收消息"""
try:
@@ -1296,7 +1517,7 @@ class QingjianAPIClient:
return
# 标记本批次已自动触发,避免同内容循环“马上报价”。
self._auto_quote_done_sig[capture_key] = capture_sig
# 直接置为可报价,然后走“发完了,报价吧”触发既有报价链路
# 直接置为可报价,走内部自动报价入口(不伪造客户语句)。
self.agent._mark_quote_ready(st)
self.agent._sync_pending_quote_state(capture_cid, st)
self._activity_log(
@@ -1308,7 +1529,7 @@ class QingjianAPIClient:
notify_msg = CustomerMessage(
msg_id="auto_quote_idle_trigger",
acc_id=capture_data.get('acc_id', ''),
msg="发完了,报价吧",
msg="__AUTO_QUOTE_INTERNAL_TRIGGER__",
from_id=capture_cid,
from_name=self.to_chinese(capture_data.get('from_name', '') or capture_data.get('cy_name', '')),
cy_id=capture_data.get('cy_id', ''),
@@ -1318,7 +1539,7 @@ class QingjianAPIClient:
goods_name=self.to_chinese(capture_data.get('goods_name', '')) if capture_data.get('goods_name') else None,
goods_order=self.to_chinese(capture_data.get('goods_order', '')) if capture_data.get('goods_order') else None,
)
response = await self.agent.process_message(notify_msg)
response = await self.agent.build_auto_quote_reply(st, notify_msg)
if response.should_reply and response.reply and not response.need_transfer:
await self.send_reply(capture_data, response.reply)
self._activity_log(
@@ -1636,7 +1857,12 @@ class QingjianAPIClient:
logger.info(f"系统询单命中 | 店铺:{acc_id} | 客户:{customer_id} | action:{action}")
if action == "reply":
reply = policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。"
reply = await self._compose_ai_scene_reply(
original_msg=data,
scene="system_inquiry_reply",
intent_hint="这是系统客服询单消息,简短确认已收到并说明会跟进即可。",
fallback=(policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。"),
)
await self.send_reply(data, reply)
metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id)
return True
@@ -1976,6 +2202,10 @@ class QingjianAPIClient:
return
reply_content = self._colloquialize_outbound_reply(reply_content)
reply_content = await self._ai_generate_outbound_reply(
original_msg=original_msg,
reply_content=str(reply_content or ""),
)
# 同一客户外发限流N 秒内最多 1 条
try:
@@ -2058,6 +2288,62 @@ class QingjianAPIClient:
reply["_trace_id"] = trace_id
await self.send_message(reply)
async def _ai_generate_outbound_reply(self, original_msg: dict, reply_content: str) -> str:
    """
    Forced AI outbound generation layer.

    Ordinary text replies are treated as an intent draft and rewritten by the
    AI agent before sending. The stripped draft is returned unchanged when:
      - it is empty;
      - it is a control command / transfer instruction ("话术|" prefix,
        "[转移会话]" or "TRANSFER_REQUESTED");
      - it is a bare http(s) link;
      - forced generation is off or the agent is unavailable;
      - the AI output is empty, is itself a control/transfer marker, or the
        AI call raises.

    Args:
        original_msg: Message dict; the keys read here are msg_id, acc_id,
            from_id, acc_type and msg.
        reply_content: Draft reply text.

    Returns:
        The AI-generated reply, or the stripped draft in the cases above.
    """
    text = (reply_content or "").strip()
    if not text:
        return text

    def _is_control(s: str) -> bool:
        # One predicate for both the draft and the generated text so the two
        # checks cannot drift apart (the output check used to miss
        # "TRANSFER_REQUESTED", letting an internal marker reach the customer).
        return s.startswith("话术|") or "[转移会话]" in s or "TRANSFER_REQUESTED" in s

    if _is_control(text):
        return text
    if re.fullmatch(r"https?://\S+", text):
        return text
    if not self._force_ai_generate_reply or not self.enable_agent or not self.agent or not AgentDeps:
        return text
    try:
        deps = AgentDeps(
            msg_id=str(original_msg.get("msg_id", "") or "outbound_generate"),
            acc_id=str(original_msg.get("acc_id", "") or ""),
            from_id=str(original_msg.get("from_id", "") or ""),
            platform=str(original_msg.get("acc_type", "") or ""),
        )
        customer_msg = self.to_chinese(str(original_msg.get("msg", "") or ""))
        prompt = (
            "你是淘宝客服外发文案生成器。请根据“回复意图草稿”生成最终发给客户的话。\n"
            "要求:\n"
            "1) 保留原意,不新增价格/承诺/流程;\n"
            "2) 自然像真人聊天,不用固定模板句;\n"
            "3) 1-2句\n"
            "4) 只输出最终回复文本。\n\n"
            f"客户原话: {customer_msg}\n"
            f"回复意图草稿: {text}\n"
        )
        result = await self.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[])
        out = str(getattr(result, "output", "") or "").strip()
        if not out:
            return text
        # Never let the model replace a plain reply with a control/transfer marker.
        if _is_control(out):
            return text
        self._activity_log(
            "ai_generate_reply",
            acc_id=str(original_msg.get("acc_id", "") or ""),
            customer_id=str(original_msg.get("from_id", "") or ""),
            draft=text[:160],
            generated=out[:160],
        )
        return out
    except Exception as e:
        # Best effort: on any failure send the original draft rather than nothing.
        self._activity_log(
            "ai_generate_reply_error",
            acc_id=str(original_msg.get("acc_id", "") or ""),
            customer_id=str(original_msg.get("from_id", "") or ""),
            error=str(e),
        )
        return text
def _colloquialize_outbound_reply(self, text: Any) -> Any:
"""统一外发口语化处理,避免机械话术。"""
if not isinstance(text, str):
@@ -2393,6 +2679,11 @@ class QingjianAPIClient:
except Exception as e:
logger.info(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}")
# 未回复会话补偿(可关闭)
if os.getenv("UNREPLIED_FOLLOWUP_ENABLED", "true").lower() in ("1", "true", "yes"):
tasks.append(self._unreplied_followup_loop())
logger.info(f"[{self.get_time()}] 未回复会话补偿任务已启动")
await asyncio.gather(*tasks)