diff --git a/qingjian_cs/app/agents.py b/qingjian_cs/app/agents.py index 8d382f4..4d94f0d 100644 --- a/qingjian_cs/app/agents.py +++ b/qingjian_cs/app/agents.py @@ -15,7 +15,7 @@ from .agent_tools import ( tool_extract_size_pairs, tool_is_meaningless_short, ) -from .config import OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL_NAME +from .config import AGENT_MAX_ITERS, OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL_NAME from .models import Decision, DecisionModel, RouteModel from .rules import rules_prompt @@ -66,7 +66,7 @@ class _AgentRuntime: formatter=OpenAIChatFormatter(), toolkit=toolkit, memory=InMemoryMemory(), - max_iters=8, + max_iters=max(1, AGENT_MAX_ITERS), ) @staticmethod diff --git a/qingjian_cs/app/client.py b/qingjian_cs/app/client.py index 3cf5d8f..2124355 100644 --- a/qingjian_cs/app/client.py +++ b/qingjian_cs/app/client.py @@ -1,12 +1,18 @@ import asyncio import json +import re import time from collections import defaultdict import websockets from .callbacks import post_tianwang_callback -from .config import AUTO_QUOTE_WAIT_SECONDS, MESSAGE_DEBOUNCE_SECONDS, QINGJIAN_WS_URI +from .config import ( + AUTO_QUOTE_WAIT_SECONDS, + MESSAGE_DEBOUNCE_SECONDS, + QINGJIAN_WS_URI, + SHORT_REPLY_MAX_CHARS, +) from .logger import setup_logger from .observability import activity_event, build_trace_id from .orchestrator import Orchestrator @@ -27,7 +33,7 @@ class QingjianClient: self.pending_images: dict[str, list[str]] = defaultdict(list) self.auto_quote_tasks: dict[str, asyncio.Task] = {} self.last_reply_key: dict[str, str] = {} - self.recent_outbound: dict[str, tuple[str, float]] = {} + self.recent_outbound: list[tuple[str, str, str, float]] = [] @staticmethod def _customer_key(data: dict) -> str: @@ -57,6 +63,7 @@ class QingjianClient: text = str(text or "").strip() if not text: return + text = self._shorten_reply(text) msg = { "msg_id": "", "acc_id": data.get("acc_id", ""), @@ -70,22 +77,47 @@ class QingjianClient: } activity_event(self.logger, "send_reply_attempt", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text) await self.send_message(msg) - self.recent_outbound[self._customer_key(data)] = (text, time.monotonic()) + self.recent_outbound.append((str(data.get("acc_id", "")), str(data.get("from_id", "")), text, time.monotonic())) + if len(self.recent_outbound) > 200: + self.recent_outbound = self.recent_outbound[-200:] activity_event(self.logger, "send_reply_success", trace_id=trace_id, customer_id=data.get("from_id", "-"), msg=text) + @staticmethod + def _clean_text(text: str) -> str: + t = str(text or "").strip() + t = re.sub(r"\s+", "", t) + return t + + def _shorten_reply(self, text: str) -> str: + max_len = max(8, int(SHORT_REPLY_MAX_CHARS)) + t = str(text or "").strip() + if len(t) <= max_len: + return t + parts = re.split(r"[。!?!?]", t) + head = next((p.strip() for p in parts if p and p.strip()), t) + if len(head) > max_len: + head = head[:max_len].rstrip(",,;;:: ") + return head or t[:max_len] + def _is_outbound_echo(self, data: dict, msg: str) -> bool: """ 轻简可能会把我方刚发送文本回推为“收到消息”。 - 对同 customer_key 的“短时间完全相同文本”做回环拦截,避免无限对话。 + 对“短时间完全相同文本”做回环拦截,兼容 acc/from 对调回推,避免无限对话。 """ - key = self._customer_key(data) - last = self.recent_outbound.get(key) - if not last: + in_acc = str(data.get("acc_id", "")) + in_from = str(data.get("from_id", "")) + in_msg = self._clean_text(msg) + now = time.monotonic() + if not in_msg: return False - last_msg, ts = last - if (time.monotonic() - ts) > 120: - return False - return str(msg or "").strip() == last_msg + for out_acc, out_to, out_msg, ts in reversed(self.recent_outbound): + if (now - ts) > 120: + break + if self._clean_text(out_msg) != in_msg: + continue + if (out_acc == in_acc and out_to == in_from) or (out_acc == in_from and out_to == in_acc): + return True + return False async def _handle_decision(self, data: dict, merged_msg: str, *, auto_quote: bool = False) -> None: key = self._customer_key(data) diff --git a/qingjian_cs/app/config.py b/qingjian_cs/app/config.py index f2bbc0d..a69e1f2 100644 --- a/qingjian_cs/app/config.py +++ b/qingjian_cs/app/config.py @@ -14,6 +14,9 @@ OPENAI_MODEL_NAME = os.getenv("OPENAI_MODEL_NAME", "doubao-seed-2-0-pro-260215") MESSAGE_DEBOUNCE_SECONDS = int(os.getenv("MESSAGE_DEBOUNCE_SECONDS", "6")) AUTO_QUOTE_WAIT_SECONDS = int(os.getenv("AUTO_QUOTE_WAIT_SECONDS", "18")) +AGENT_MAX_ITERS = int(os.getenv("AGENT_MAX_ITERS", "3")) +FAST_ROUTE_ENABLED = os.getenv("FAST_ROUTE_ENABLED", "1").strip() in {"1", "true", "True", "yes", "on"} +SHORT_REPLY_MAX_CHARS = int(os.getenv("SHORT_REPLY_MAX_CHARS", "28")) STORE_BACKEND = os.getenv("STORE_BACKEND", "sqlite").strip().lower() STORE_SQLITE_PATH = os.getenv("STORE_SQLITE_PATH", "").strip() diff --git a/qingjian_cs/app/logger.py b/qingjian_cs/app/logger.py index a22ccbf..4b91922 100644 --- a/qingjian_cs/app/logger.py +++ b/qingjian_cs/app/logger.py @@ -16,5 +16,7 @@ def setup_logger() -> logging.Logger: logging.getLogger("agentscope").setLevel(logging.ERROR) logging.getLogger("agentscope.formatter").setLevel(logging.ERROR) logging.getLogger("agentscope.agent").setLevel(logging.ERROR) + logging.getLogger("_openai_formatter").setLevel(logging.ERROR) + logging.getLogger("_react_agent").setLevel(logging.ERROR) return logger diff --git a/qingjian_cs/app/orchestrator.py b/qingjian_cs/app/orchestrator.py index a1a8d69..3122ead 100644 --- a/qingjian_cs/app/orchestrator.py +++ b/qingjian_cs/app/orchestrator.py @@ -3,8 +3,15 @@ from __future__ import annotations from typing import Any from .agents import AfterSalesAgent, PreSalesAgent, QuoteAgent, RiskAgent, RouterAgent +from .config import FAST_ROUTE_ENABLED from .models import Decision -from .rules import detect_intent, detect_order_status +from .rules import ( + detect_intent, + detect_order_status, + has_map_or_political_risk, + has_porn_risk, + requests_external_contact, +) from .state_machine import evolve_after_sales_state, migrate_state_schema from .store import ConversationStore @@ -35,9 +42,13 @@ class Orchestrator: "order_status": order_status, } - # 先风控 - risk_decision = await self.risk.decide(merged_ctx) - if risk_decision.action in {"transfer"}: + msg = str(context.get("msg", "") or "") + goods_name = str(context.get("goods_name", "") or "") + risk_hit = has_map_or_political_risk(msg, goods_name) or has_porn_risk(msg) or requests_external_contact(msg) + + # 命中硬风控才调用 RiskAgent,避免每条消息都先走一轮模型。 + if risk_hit: + risk_decision = await self.risk.decide(merged_ctx) route = "risk" new_state = evolve_after_sales_state( {**prev_state, **(risk_decision.state_patch or {})}, @@ -51,7 +62,23 @@ class Orchestrator: self.store.append_event(customer_key, "decision", {"route": route, "action": risk_decision.action, "reason": risk_decision.reason}) return route, risk_decision, new_state - route, route_reason = await self.router.route(merged_ctx) + route = "" + route_reason = "" + if FAST_ROUTE_ENABLED: + pending_images = int(context.get("pending_images", 0) or 0) + auto_quote_trigger = bool(context.get("auto_quote_trigger", False)) + if intent in {"pricing", "finish_or_quote_trigger"} and (pending_images > 0 or auto_quote_trigger): + route = "quote" + route_reason = "fast_route_quote_with_pending_images" + elif order_status == "refund": + route = "after_sales" + route_reason = "fast_route_refund" + elif intent in {"image", "greeting", "nonsense", "pricing", "finish_or_quote_trigger", "unknown"}: + route = "pre_sales" + route_reason = "fast_route_common_presales" + + if not route: + route, route_reason = await self.router.route(merged_ctx) if route == "quote": decision = await self.quote.decide(merged_ctx) diff --git a/qingjian_cs/app/rules.py b/qingjian_cs/app/rules.py index b21e395..11e07af 100644 --- a/qingjian_cs/app/rules.py +++ b/qingjian_cs/app/rules.py @@ -127,6 +127,7 @@ def rules_prompt() -> str: "10) 尺寸明显超大(如>=2m*2m): 提示需补图/重做边缘, 不要直接承诺一模一样。\n" "11) 店铺差异化: 按 acc_id/persona 口吻回复, 保持真人聊天。\n" "12) 最终输出只允许一个动作, 不能混合。\n" + "13) reply 必须简短: 优先 1 句, 一般不超过 28 个汉字, 禁止长段解释。\n" "输出格式:\n" '{"action":"reply|quote|transfer|noop","reply":"","transfer_msg":"","quote_mode":"flush_pending|analyze_current_or_recent|collect_only","reason":""}' )