From a2119f3b6dbd6026131a79ca4e71f6a7f50c1e3a Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Mon, 9 Mar 2026 14:34:04 +0800 Subject: [PATCH] fix: harden outbound leak guard and title naming --- core/repository.py | 40 +++++++++++++++++++ core/websocket_send_flow.py | 51 +++++++++++++++++++++++-- services/service_auto_image_pipeline.py | 18 ++++++++- 3 files changed, 104 insertions(+), 5 deletions(-) diff --git a/core/repository.py b/core/repository.py index 52f584e..11e17f3 100644 --- a/core/repository.py +++ b/core/repository.py @@ -1,5 +1,6 @@ import logging import asyncio +import re from typing import Optional, List, Any from datetime import datetime from db.customer_db import db as customer_db @@ -8,6 +9,43 @@ from db.chat_log_db import log_message, get_conversation logger = logging.getLogger("cs_agent") +_OUTBOUND_BLOCK_MARKERS = ( + "【历史记录摘要】", + "【详细记录】", + "【订单摘要】", + "【订单详情】", + " str: + if not content: + return "" + cleaned = str(content).strip() + if "[转移会话]" in cleaned: + return cleaned + if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): + logger.warning("[Repository] 拦截到内部内容写入外发记录,替换为安全兜底回复") + return "我在帮你看记录,稍等哈" + for pattern in _HISTORY_LEAK_PATTERNS: + if re.search(pattern, cleaned): + logger.warning(f"[Repository] 检测到历史记录泄露模式,拦截出站入库: {pattern[:30]}...") + return "我在帮你看记录,稍等哈" + return cleaned + class DataRepository: """ 异步数据仓库:使用 asyncio.to_thread 屏蔽底层同步 IO 阻塞。 @@ -29,6 +67,8 @@ class DataRepository: msg_type: int = 0, ): """异步持久化存储聊天记录""" + if direction == "out" and int(msg_type or 0) == 0: + content = _sanitize_outbound_archive_text(content) # 将图片URL列表转为\n分隔的字符串 urls_str = "\n".join(image_urls) if image_urls else "" return await asyncio.to_thread( diff --git a/core/websocket_send_flow.py b/core/websocket_send_flow.py index f408376..e352b38 100644 --- a/core/websocket_send_flow.py +++ b/core/websocket_send_flow.py @@ -1,13 +1,54 @@ import json +import logging +import re import websockets +logger = logging.getLogger("cs_agent") + +_OUTBOUND_BLOCK_MARKERS = ( + "【历史记录摘要】", + "【详细记录】", + "【订单摘要】", + "【订单详情】", + " str: + if not content: + return "" + cleaned = str(content).strip() + if "[转移会话]" in cleaned: + return cleaned + if any(marker in cleaned for marker in _OUTBOUND_BLOCK_MARKERS): + logger.warning("[WebSocketSend] 拦截到内部内容外发,替换为安全兜底回复") + return "我在帮你看记录,稍等哈" + for pattern in _HISTORY_LEAK_PATTERNS: + if re.search(pattern, cleaned): + logger.warning(f"[WebSocketSend] 检测到历史记录泄露模式: {pattern[:30]}...") + return "我在帮你看记录,稍等哈" + return cleaned + async def send_text_flow(client, cy_id, acc_type, content): """主动发送文本消息。""" message = { "msg_id": "", "acc_id": "", - "msg": content, + "msg": _sanitize_outbound_text(content), "from_id": client.reply_id, "from_name": client.reply_id, "cy_id": cy_id, @@ -38,10 +79,12 @@ async def send_message_flow(client, message): """发送消息到服务器。""" if client.websocket and client.websocket.state == websockets.protocol.State.OPEN: try: - payload = message if isinstance(message, dict) else {} - msg_json = json.dumps(message, ensure_ascii=False) + payload = dict(message) if isinstance(message, dict) else {} + if int(payload.get("msg_type", 0) or 0) == 0: + payload["msg"] = _sanitize_outbound_text(payload.get("msg", "")) + msg_json = json.dumps(payload, ensure_ascii=False) await client.websocket.send(msg_json) - pretty = json.dumps(message, ensure_ascii=False, indent=2) + pretty = json.dumps(payload, ensure_ascii=False, indent=2) client.logger.info(f"[{client.get_time()}] 发送成功:\n{pretty}") client._activity_log( "send_message_success", diff --git a/services/service_auto_image_pipeline.py b/services/service_auto_image_pipeline.py index d131301..e7878fd 100644 --- a/services/service_auto_image_pipeline.py +++ b/services/service_auto_image_pipeline.py @@ -77,6 +77,22 @@ def _build_processing_prompt(intent: str, requirement_text: str, analysis: Dict) return f"根据客户需求“{req or '找原图'}”,严格参考原图元素与构图,生成完整干净的高质量素材图。" +def _build_upload_title(intent: str, analysis: Dict, requirement_text: str, idx: int) -> str: + analysis = analysis or {} + subject = _safe_name(str(analysis.get("subject") or ""), "") + proc_type = _safe_name(str(analysis.get("proc_type") or ""), "") + requirement = _safe_name(str(requirement_text or ""), "") + action = "修复" if intent == "repair" else "原图" + + parts = [part for part in (subject, proc_type, requirement) if part] + if parts: + base = "_".join(parts[:2]) + else: + base = "图片识别结果" + + return f"{base}_{action}_{idx}" + + class AutoImagePipelineService: def __init__(self): self.customer_db = CustomerDatabase() @@ -244,7 +260,7 @@ class AutoImagePipelineService: digest = hashlib.md5(f"{customer_id}|{acc_id}|{image_url}".encode("utf-8")).hexdigest()[:10] input_path = pipeline_root / f"{digest}_src{_suffix_from_url(image_url)}" output_path = pipeline_root / f"{digest}_out.png" - title = f"{_safe_name(customer_id, '客户')}_{'修复' if intent == 'repair' else '原图'}_{idx}" + title = _build_upload_title(intent, analysis, requirement_text, idx) prompt = _build_processing_prompt(intent, requirement_text, analysis) task_id = task_db.add_task( customer_id=customer_id,