fix: harden image handling and update docs

This commit is contained in:
2026-03-08 13:20:18 +08:00
parent 2e3409d8c5
commit fddd879ba0
8 changed files with 180 additions and 37 deletions

View File

@@ -16,7 +16,7 @@ Orchestrator (防抖/去重/冷却/路由)
CustomerServiceBrain (PydanticAI Agent) CustomerServiceBrain (PydanticAI Agent)
├── lookup_chat_history_tool → 查询历史记录 ├── lookup_chat_history_tool → 查询历史记录
├── transfer_to_human_tool → 转接设计师 ├── transfer_to_human_tool → 转接设计师
└── check_order_status_tool → 订单查询 └── lookup_customer_orders_tool → 订单查询
QianniuAdapter → WebSocket → 回复客户 QianniuAdapter → WebSocket → 回复客户
``` ```
@@ -30,10 +30,13 @@ QianniuAdapter → WebSocket → 回复客户
| 智能接待 | 自动引导发图、问需求、转接设计师 | | 智能接待 | 自动引导发图、问需求、转接设计师 |
| 历史记忆 | AI 可调用工具查询完整聊天历史,避免重复提问 | | 历史记忆 | AI 可调用工具查询完整聊天历史,避免重复提问 |
| 自动转接 | 收到图片+需求后自动派单给在线设计师 | | 自动转接 | 收到图片+需求后自动派单给在线设计师 |
| 待转接池 | 设计师不在线时先入队,上线后自动补转接 |
| 转接冷却 | 转接后 120 秒内不再调用 AI,直接安抚 | | 转接冷却 | 转接后 120 秒内不再调用 AI,直接安抚 |
| 情绪识别 | 客户愤怒/投诉时自动转人工 | | 情绪识别 | 客户愤怒/投诉时自动转人工 |
| 消息防抖 | 合并短时间内的多条消息,避免重复回复 | | 消息防抖 | 合并短时间内的多条消息,避免重复回复 |
| 订单静默 | 订单通知/SKU 信息自动入库,不触发 AI | | 订单静默 | 订单通知/SKU 信息自动入库,不触发 AI |
| 图片兜底 | 识别 `msg_type=1` 图片包;即使无文本也不会被当心跳丢弃 |
| 出站防泄露 | 拦截 `<think>`、工具原文、历史摘要、订单摘要等内部内容 |
| 时段感知 | 根据时间区分"没上班"/"下班了"/"暂时不在" | | 时段感知 | 根据时间区分"没上班"/"下班了"/"暂时不在" |
| 图片分析 | 后台调用 Gemini 分析图片复杂度 | | 图片分析 | 后台调用 Gemini 分析图片复杂度 |
| 日报统计 | 每日自动生成客服数据报告 | | 日报统计 | 每日自动生成客服数据报告 |
@@ -95,6 +98,7 @@ curl http://localhost:6060/api/health
│ ├── chat_log_db.py # 聊天记录(SQLite/MySQL) │ ├── chat_log_db.py # 聊天记录(SQLite/MySQL)
│ ├── customer_db.py # 客户档案 │ ├── customer_db.py # 客户档案
│ ├── image_tasks_db.py # 图片任务 │ ├── image_tasks_db.py # 图片任务
│ ├── pending_transfer_db.py # 待转接队列(本地 SQLite)
│ └── task_db/ # 任务模型 │ └── task_db/ # 任务模型
├── services/ ├── services/
│ ├── dispatch_service.py # 设计师派单 │ ├── dispatch_service.py # 设计师派单
@@ -125,6 +129,8 @@ curl http://localhost:6060/api/health
| `OPENAI_MODEL` | 对话模型 | | `OPENAI_MODEL` | 对话模型 |
| `DB_TYPE` | 数据库类型(`sqlite` / `mysql`) | | `DB_TYPE` | 数据库类型(`sqlite` / `mysql`) |
| `MYSQL_HOST/PORT/USER/PASSWORD/DATABASE` | MySQL 连接信息 | | `MYSQL_HOST/PORT/USER/PASSWORD/DATABASE` | MySQL 连接信息 |
| `MYSQL_POOL_SIZE` | MySQL 连接池大小,默认 `10` |
| `MYSQL_POOL_WAIT_TIMEOUT` | 连接池等待超时(秒),默认 `10` |
| `WECHAT_WEBHOOK` | 企业微信通知 Webhook | | `WECHAT_WEBHOOK` | 企业微信通知 Webhook |
| `MESSAGE_DEBOUNCE_SECONDS` | 消息防抖时间(秒) | | `MESSAGE_DEBOUNCE_SECONDS` | 消息防抖时间(秒) |
| `DISPATCH_BASE_URL` | 派单服务地址 | | `DISPATCH_BASE_URL` | 派单服务地址 |
@@ -136,11 +142,23 @@ curl http://localhost:6060/api/health
## 消息处理流程 ## 消息处理流程
1. **WebSocket 接收** → 千牛原始消息 1. **WebSocket 接收** → 千牛原始消息
2. **适配器转换** → `StandardMessage`(统一格式) 2. **适配器转换** → `StandardMessage`(统一格式,识别 `msg_type`、递归提取图片 URL)
3. **Orchestrator 过滤** → 订单/SKU 静默入库、心跳过滤、商家回复入库 3. **图片兜底** → 纯图片包即使无文本/无直出 URL,也会标记为“已收到图片消息”
4. **防抖合并** → 2 秒窗口内多条消息合并为一条 4. **Orchestrator 过滤** → 订单/SKU 静默入库、心跳过滤、商家回复入库
5. **冷却检查** → 转接后 120 秒内直接安抚,不调 AI 5. **防抖合并** → 2 秒窗口内多条消息合并为一条
6. **AI 思考** → PydanticAI Agent 调用工具、生成回复 6. **冷却检查** → 转接后 120 秒内直接安抚,不调 AI
7. **转接截获** → 工具返回转接指令时直接发送,不经 AI 二次加工 7. **AI 思考** → PydanticAI Agent 调用工具、生成回复
8. **乱码清理** → 过滤 `<think>`、内部标记等泄露内容 8. **转接截获** → 工具返回转接指令时直接发送,不经 AI 二次加工
9. **发送回复** → 通过 WebSocket 回复客户,同时入库 9. **待转接入池** → 设计师不在线时记录原因,进入待转接队列,稍后自动补转
10. **出站安全过滤** → 过滤 `<think>`、历史摘要、订单摘要、工具原文、时间戳转述历史
11. **发送回复** → 通过 WebSocket 回复客户,同时入库
---
## 运行注意
- `db/pending_transfer.db` 是待转接池的本地 SQLite 数据文件,用于暂存“设计师不在线”的转接请求。
- `db/chat_log_db/chats.db` 是 SQLite 模式下的本地聊天库;当 `DB_TYPE=mysql` 时,线上主库仍然是 MySQL。
- 上面两个 `.db` 文件都属于运行时数据,不建议提交到 Git。
- 当前待转接池是单机本地队列;如果未来部署成多台应用机,需要改成共享存储,否则无法跨机器补转接。
- 出站消息在 Brain、Orchestrator、QianniuAdapter 三层都会做防泄露清洗;如果线上仍出现历史摘要外发,优先确认服务是否已经重启到最新代码。

View File

@@ -2,7 +2,7 @@ import re
import logging import logging
import json import json
from pathlib import Path from pathlib import Path
from typing import List, Tuple from typing import List, Tuple, Any
from core.adapters.base import BaseAdapter from core.adapters.base import BaseAdapter
from core.schema import StandardMessage, StandardResponse from core.schema import StandardMessage, StandardResponse
@@ -78,6 +78,10 @@ class QianniuAdapter(BaseAdapter):
acc_id = str(raw.get("acc_id") or raw.get("shop_id") or "") acc_id = str(raw.get("acc_id") or raw.get("shop_id") or "")
from_id = str(raw.get("from_id") or raw.get("cy_id") or "") from_id = str(raw.get("from_id") or raw.get("cy_id") or "")
msg_text = str(raw.get("msg") or raw.get("content") or "") msg_text = str(raw.get("msg") or raw.get("content") or "")
raw_msg_type = self._safe_int(raw.get("msg_type"), 0)
image_urls = self._extract_inbound_image_urls(raw, msg_text)
if raw_msg_type == 1 and not msg_text.strip():
msg_text = "【系统:已收到图片消息】"
# 判断方向:如果 from_id 包含了店铺名或 acc_id通常说明是商家自己在说话 # 判断方向:如果 from_id 包含了店铺名或 acc_id通常说明是商家自己在说话
# 或者逆向接口通常有一个特定的标识,这里我们做一个通用的逻辑判断 # 或者逆向接口通常有一个特定的标识,这里我们做一个通用的逻辑判断
@@ -96,7 +100,8 @@ class QianniuAdapter(BaseAdapter):
user_id=user_id, user_id=user_id,
user_name=str(raw.get("from_name", "")), user_name=str(raw.get("from_name", "")),
content=msg_text, content=msg_text,
image_urls=self._extract_urls(msg_text), msg_type=raw_msg_type,
image_urls=image_urls,
acc_id=acc_id, acc_id=acc_id,
acc_type=str(raw.get("acc_type") or "AliWorkbench"), acc_type=str(raw.get("acc_type") or "AliWorkbench"),
raw_data=raw raw_data=raw
@@ -135,3 +140,52 @@ class QianniuAdapter(BaseAdapter):
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
candidates = re.findall(r'https?://[^\s#]+', text) candidates = re.findall(r'https?://[^\s#]+', text)
return [u for u in candidates if any(ext in u.lower() for ext in image_exts)] return [u for u in candidates if any(ext in u.lower() for ext in image_exts)]
@staticmethod
def _safe_int(value: Any, default: int = 0) -> int:
    """Convert ``value`` to ``int``, falling back to ``default`` on failure.

    Args:
        value: Any object, e.g. a raw payload field that may be missing,
            a number, or numeric text.
        default: Value returned when ``int(value)`` cannot be computed.

    Returns:
        ``int(value)`` when the conversion succeeds, otherwise ``default``.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None or containers; ValueError: non-numeric text or NaN;
        # OverflowError: infinite floats. Anything else is a real bug and
        # should surface instead of being silently mapped to the default.
        return default
def _extract_inbound_image_urls(self, raw: dict, msg_text: str) -> List[str]:
    """Collect the distinct image URLs carried by an inbound packet.

    Candidates are taken first from ``msg_text`` (via ``_extract_urls``) and
    then from the whole ``raw`` payload (via ``_find_image_urls_in_obj``).

    Args:
        raw: The raw inbound packet.
        msg_text: The text content already extracted from the packet.

    Returns:
        Stripped URL strings in first-seen order, without duplicates.
    """
    collected = []
    known = set()

    def remember(candidate: str):
        # Empty candidates carry nothing worth keeping.
        if not candidate:
            return
        text = str(candidate).strip()
        if not text or text in known:
            return
        # Keep only values that ``_extract_urls`` itself accepts.
        if not self._extract_urls(text):
            return
        known.add(text)
        collected.append(text)

    # URLs written directly in the message text take precedence in ordering.
    for candidate in self._extract_urls(msg_text):
        remember(candidate)
    # Then anything found elsewhere in the payload.
    for candidate in self._find_image_urls_in_obj(raw):
        remember(candidate)
    return collected
def _find_image_urls_in_obj(self, obj: Any) -> List[str]:
    """Gather image URLs from every string nested inside ``obj``.

    Strings are passed to ``_extract_urls``; dict values and the items of
    lists, tuples and sets are visited recursively. Dict keys, ``None`` and
    any other value types are ignored.

    Args:
        obj: An arbitrarily nested structure to scan.

    Returns:
        URLs in traversal order; duplicates are not removed here.
    """

    def iter_urls(node: Any):
        if isinstance(node, str):
            yield from self._extract_urls(node)
        elif isinstance(node, dict):
            for child in node.values():
                yield from iter_urls(child)
        elif isinstance(node, (list, tuple, set)):
            for child in node:
                yield from iter_urls(child)
        # None and every other type contribute nothing.

    return list(iter_urls(obj))

View File

@@ -121,7 +121,9 @@ async def lookup_chat_history_tool(
line = f"[{ts}] {role}{msg}" line = f"[{ts}] {role}{msg}"
lines.append(line) lines.append(line)
if r["direction"] == "in": if r["direction"] == "in":
if "已收到" in msg and "" in msg: msg_type = int(r.get("msg_type") or 0)
image_urls = str(r.get("image_urls", "") or "").strip()
if msg_type == 1 or image_urls or ("已收到" in msg and "" in msg):
has_images = True has_images = True
if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]): if any(k in msg for k in ["找原图", "修复", "高清", "去背景", "抠图", "做衣服", "打印"]):
customer_needs.append(msg[:60]) customer_needs.append(msg[:60])

View File

@@ -148,7 +148,14 @@ class SystemOrchestrator:
if is_order or is_price_only or is_sku_only or is_sku_amount: if is_order or is_price_only or is_sku_only or is_sku_amount:
await self._handle_order_packet(platform, std_msg) await self._handle_order_packet(platform, std_msg)
logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态") logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态")
await repo.save_chat(platform, user_id, msg_text, "in", acc_id=std_msg.acc_id) await repo.save_chat(
platform,
user_id,
msg_text,
"in",
acc_id=std_msg.acc_id,
msg_type=std_msg.msg_type,
)
return return
preview = (std_msg.content or "").replace("\n", "\\n") preview = (std_msg.content or "").replace("\n", "\\n")
@@ -159,12 +166,20 @@ class SystemOrchestrator:
f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}" f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}"
) )
# 过滤心跳 # 过滤心跳;图片消息即使暂时没拿到 URL也不能直接丢掉
if not (std_msg.content or "").strip() and not std_msg.image_urls: return if std_msg.msg_type != 1 and not (std_msg.content or "").strip() and not std_msg.image_urls:
return
# 如果是商家人工回复,静默入库 # 如果是商家人工回复,静默入库
if direction == "out": if direction == "out":
await repo.save_chat(platform, user_id, std_msg.content, "out", acc_id=std_msg.acc_id) await repo.save_chat(
platform,
user_id,
std_msg.content,
"out",
acc_id=std_msg.acc_id,
msg_type=std_msg.msg_type,
)
return return
# ID 去重 # ID 去重
@@ -326,7 +341,15 @@ class SystemOrchestrator:
db_start = time.time() db_start = time.time()
db_content = combined_content db_content = combined_content
if all_image_urls: db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}" if all_image_urls: db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}"
await repo.save_chat(platform, user_id, db_content, "in", acc_id=acc_id, image_urls=all_image_urls) await repo.save_chat(
platform,
user_id,
db_content,
"in",
acc_id=acc_id,
image_urls=all_image_urls,
msg_type=final_msg.msg_type,
)
db_elapsed = time.time() - db_start db_elapsed = time.time() - db_start
logger.info(f"[计时] user={user_id} 消息入库: {db_elapsed:.2f}s") logger.info(f"[计时] user={user_id} 消息入库: {db_elapsed:.2f}s")
@@ -376,11 +399,25 @@ class SystemOrchestrator:
metadata={"acc_id": acc_id, "acc_type": acc_type} metadata={"acc_id": acc_id, "acc_type": acc_type}
) )
await self.qianniu_adapter.translate_outbound(greet, user_id) await self.qianniu_adapter.translate_outbound(greet, user_id)
await repo.save_chat(platform, user_id, greet.reply_content, "out", acc_id=acc_id) await repo.save_chat(
platform,
user_id,
greet.reply_content,
"out",
acc_id=acc_id,
msg_type=greet.msg_type,
)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
await self.qianniu_adapter.translate_outbound(std_res, user_id) await self.qianniu_adapter.translate_outbound(std_res, user_id)
await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id) await repo.save_chat(
platform,
user_id,
std_res.reply_content,
"out",
acc_id=acc_id,
msg_type=std_res.msg_type,
)
if std_res.metadata.get("pending_transfer"): if std_res.metadata.get("pending_transfer"):
reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip() reason = str(std_res.metadata.get("pending_transfer_reason") or "").strip()
@@ -450,10 +487,24 @@ class SystemOrchestrator:
) )
await self.qianniu_adapter.translate_outbound(notify, customer_id) await self.qianniu_adapter.translate_outbound(notify, customer_id)
await repo.save_chat("qianniu", customer_id, notify.reply_content, "out", acc_id=acc_id) await repo.save_chat(
"qianniu",
customer_id,
notify.reply_content,
"out",
acc_id=acc_id,
msg_type=notify.msg_type,
)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
await self.qianniu_adapter.translate_outbound(transfer, customer_id) await self.qianniu_adapter.translate_outbound(transfer, customer_id)
await repo.save_chat("qianniu", customer_id, transfer.reply_content, "out", acc_id=acc_id) await repo.save_chat(
"qianniu",
customer_id,
transfer.reply_content,
"out",
acc_id=acc_id,
msg_type=transfer.msg_type,
)
self._last_transfer_time[f"{customer_id}@{acc_id}"] = time.time() self._last_transfer_time[f"{customer_id}@{acc_id}"] = time.time()
await asyncio.to_thread(complete_pending_transfer, row_id) await asyncio.to_thread(complete_pending_transfer, row_id)

View File

@@ -195,9 +195,13 @@ class CustomerServiceBrain:
user_content = msg.content or "" user_content = msg.content or ""
# 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接 # 客户已发图:告知 AI 图已收到,引导问需求,但不要直接转接
if msg.image_urls: has_image_message = bool(msg.image_urls) or msg.msg_type == 1
if has_image_message:
image_count = max(len(msg.image_urls), 1)
if user_content.startswith("【系统:已收到图片消息"):
user_content = ""
user_content = ( user_content = (
f"【系统通知:客户已发送 {len(msg.image_urls)} 张图片,图已收到不要再让客户发图。" f"【系统通知:客户已发送 {image_count} 张图片,图已收到不要再让客户发图。"
f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?" f"你现在必须先问客户:这张是找原图还是高清修复?有什么具体要求?"
f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}" f"等客户明确回答后才能转接,严禁跳过问需求直接转接!】\n{user_content}"
) )

View File

@@ -18,7 +18,16 @@ class DataRepository:
# --- 聊天记录 (异步化) --- # --- 聊天记录 (异步化) ---
async def save_chat(self, platform: str, user_id: str, content: str, direction: str, acc_id: str = "", image_urls: list = None): async def save_chat(
self,
platform: str,
user_id: str,
content: str,
direction: str,
acc_id: str = "",
image_urls: list = None,
msg_type: int = 0,
):
"""异步持久化存储聊天记录""" """异步持久化存储聊天记录"""
# 将图片URL列表转为\n分隔的字符串 # 将图片URL列表转为\n分隔的字符串
urls_str = "\n".join(image_urls) if image_urls else "" urls_str = "\n".join(image_urls) if image_urls else ""
@@ -29,6 +38,7 @@ class DataRepository:
direction=direction, direction=direction,
platform=platform, platform=platform,
acc_id=acc_id, acc_id=acc_id,
msg_type=msg_type,
image_urls=urls_str image_urls=urls_str
) )
@@ -42,6 +52,8 @@ class DataRepository:
{ {
"role": role, "role": role,
"content": r["message"], "content": r["message"],
"msg_type": r.get("msg_type", 0),
"image_urls": r.get("image_urls", ""),
"timestamp": r.get("timestamp", ""), "timestamp": r.get("timestamp", ""),
} }
) )

View File

@@ -38,34 +38,36 @@ async def send_message_flow(client, message):
"""发送消息到服务器。""" """发送消息到服务器。"""
if client.websocket and client.websocket.state == websockets.protocol.State.OPEN: if client.websocket and client.websocket.state == websockets.protocol.State.OPEN:
try: try:
payload = message if isinstance(message, dict) else {}
msg_json = json.dumps(message, ensure_ascii=False) msg_json = json.dumps(message, ensure_ascii=False)
await client.websocket.send(msg_json) await client.websocket.send(msg_json)
pretty = json.dumps(message, ensure_ascii=False, indent=2) pretty = json.dumps(message, ensure_ascii=False, indent=2)
client.logger.info(f"[{client.get_time()}] 发送成功:\n{pretty}") client.logger.info(f"[{client.get_time()}] 发送成功:\n{pretty}")
data = message.get("data", {}) if isinstance(message, dict) else {}
client._activity_log( client._activity_log(
"send_message_success", "send_message_success",
trace_id=message.get("_trace_id", "") if isinstance(message, dict) else "", trace_id=payload.get("_trace_id", ""),
acc_id=data.get("acc_id", ""), acc_id=payload.get("acc_id", ""),
customer_id=data.get("cy_id", ""), customer_id=payload.get("cy_id") or payload.get("from_id", ""),
msg_type=data.get("msg_type", 0), msg_type=payload.get("msg_type", 0),
msg=data.get("msg", ""), msg=payload.get("msg", ""),
) )
except Exception as e: except Exception as e:
client.logger.info(f"[{client.get_time()}] 发送失败: {e}") client.logger.info(f"[{client.get_time()}] 发送失败: {e}")
payload = message if isinstance(message, dict) else {}
client._activity_log( client._activity_log(
"send_message_error", "send_message_error",
trace_id=message.get("_trace_id", ""), trace_id=payload.get("_trace_id", ""),
acc_id=message.get("acc_id", ""), acc_id=payload.get("acc_id", ""),
customer_id=message.get("from_id", ""), customer_id=payload.get("cy_id") or payload.get("from_id", ""),
error=str(e), error=str(e),
) )
else: else:
client.logger.info(f"[{client.get_time()}] 错误: 连接未打开") client.logger.info(f"[{client.get_time()}] 错误: 连接未打开")
payload = message if isinstance(message, dict) else {}
client._activity_log( client._activity_log(
"send_message_skipped", "send_message_skipped",
trace_id=message.get("_trace_id", ""), trace_id=payload.get("_trace_id", ""),
reason="socket_not_open", reason="socket_not_open",
acc_id=message.get("acc_id", ""), acc_id=payload.get("acc_id", ""),
customer_id=message.get("from_id", ""), customer_id=payload.get("cy_id") or payload.get("from_id", ""),
) )

View File

@@ -388,7 +388,7 @@ def get_conversation(customer_id: str, limit: int = 200, acc_id: str = "") -> Li
with _get_conn() as conn: with _get_conn() as conn:
rows = conn.execute(_sql(""" rows = conn.execute(_sql("""
SELECT * FROM ( SELECT * FROM (
SELECT id, direction, message, msg_type, timestamp, acc_id SELECT id, direction, message, msg_type, timestamp, acc_id, image_urls
FROM chat_logs FROM chat_logs
WHERE customer_id = ? WHERE customer_id = ?
ORDER BY timestamp DESC, id DESC ORDER BY timestamp DESC, id DESC