Files
tw2/qingjian_cs/app/agents.py
jimi 4e5557bcc3
Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled
perf: fast-route orchestration and short-reply guard for qingjian
2026-03-02 19:12:32 +08:00

193 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
import sys
from pathlib import Path
from typing import Any
from .agent_tools import (
tool_detect_external_contact,
tool_detect_intent,
tool_detect_order_status,
tool_detect_risk,
tool_extract_image_urls,
tool_extract_size_pairs,
tool_is_meaningless_short,
)
from .config import AGENT_MAX_ITERS, OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL_NAME
from .models import Decision, DecisionModel, RouteModel
from .rules import rules_prompt
def _ensure_agentscope_importable() -> None:
    """Put the repository's ``src`` directory at the front of ``sys.path``.

    The directory is looked up two levels above the directory that contains
    this file. Nothing happens when that directory does not exist or when its
    path is already present in ``sys.path``.
    """
    candidate = Path(__file__).resolve().parents[2] / "src"
    if not candidate.exists():
        return
    entry = str(candidate)
    if entry not in sys.path:
        # Front insertion so this checkout wins over any other copy on the path.
        sys.path.insert(0, entry)
class _AgentRuntime:
    """Shared runtime: one ReActAgent wired to the chat model and the local tools.

    Subclasses only supply a name and a system prompt. The static helpers turn
    an agent reply into a plain ``dict`` regardless of whether the payload
    arrived as structured metadata or as JSON embedded in free text.
    """

    def __init__(self, name: str, sys_prompt: str):
        """Build the toolkit, the chat model and the ReAct agent.

        Args:
            name: Agent name forwarded to ``ReActAgent``.
            sys_prompt: System prompt forwarded to ``ReActAgent``.

        Raises:
            RuntimeError: If ``OPENAI_API_KEY`` is empty.
        """
        _ensure_agentscope_importable()
        # Imported lazily so the sys.path fix-up above runs before the import.
        from agentscope.agent import ReActAgent
        from agentscope.formatter import OpenAIChatFormatter
        from agentscope.memory import InMemoryMemory
        from agentscope.message import Msg
        from agentscope.model import OpenAIChatModel
        from agentscope.tool import Toolkit

        if not OPENAI_API_KEY:
            raise RuntimeError("OPENAI_API_KEY 未设置")

        # Kept on the instance so callers can build messages without importing agentscope.
        self.Msg = Msg

        toolkit = Toolkit()
        for tool in (
            tool_detect_intent,
            tool_extract_image_urls,
            tool_detect_order_status,
            tool_extract_size_pairs,
            tool_detect_risk,
            tool_detect_external_contact,
            tool_is_meaningless_short,
        ):
            toolkit.register_tool_function(tool)

        model = OpenAIChatModel(
            model_name=OPENAI_MODEL_NAME,
            api_key=OPENAI_API_KEY,
            stream=False,
            client_kwargs={"base_url": OPENAI_BASE_URL},
            generate_kwargs={"temperature": 0.1},
        )
        self.agent = ReActAgent(
            name=name,
            sys_prompt=sys_prompt,
            model=model,
            formatter=OpenAIChatFormatter(),
            toolkit=toolkit,
            memory=InMemoryMemory(),
            # Guard against a zero or negative configured iteration budget.
            max_iters=max(1, AGENT_MAX_ITERS),
        )

    @staticmethod
    def _extract_json(text: str) -> dict[str, Any] | None:
        """Return the first JSON object embedded in ``text``.

        Each ``{`` is tried in turn as the start of an object and the first one
        that decodes to a ``dict`` wins. Unlike a greedy first-``{``-to-last-``}``
        match, this still succeeds when the reply contains several objects or
        stray braces after the payload.

        Args:
            text: Free-form model output; ``None`` or empty is accepted.

        Returns:
            The decoded object, or ``None`` when no JSON object is found.
        """
        source = text or ""
        decoder = json.JSONDecoder()
        start = source.find("{")
        while start != -1:
            try:
                candidate, _end = decoder.raw_decode(source, start)
            except (ValueError, RecursionError):
                # Not valid JSON from this brace; try the next one.
                candidate = None
            if isinstance(candidate, dict):
                return candidate
            start = source.find("{", start + 1)
        return None

    @staticmethod
    def _msg_to_text(msg: Any) -> str:
        """Best-effort conversion of an agent reply to plain text.

        Tries ``msg.get_text_content()`` first, then ``msg.content`` (a string,
        or a list of blocks whose non-blank ``text`` entries are joined with
        newlines), and finally falls back to ``str(msg)``.

        Args:
            msg: Reply object returned by the agent.

        Returns:
            The extracted text (possibly empty for a list without text blocks).
        """
        try:
            getter = getattr(msg, "get_text_content", None)
            if callable(getter):
                value = getter()
                if isinstance(value, str):
                    return value
        except Exception:
            # Deliberate best-effort: any failure in the third-party accessor
            # just means we fall through to reading ``content`` directly.
            pass

        content = getattr(msg, "content", None)
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts: list[str] = []
            for block in content:
                # Blocks may be mapping-style (``{"type": "text", "text": ...}``)
                # or attribute-style objects; support both.
                if isinstance(block, dict):
                    piece = block.get("text")
                else:
                    piece = getattr(block, "text", None)
                if isinstance(piece, str) and piece.strip():
                    parts.append(piece)
            return "\n".join(parts)
        return str(msg)

    @staticmethod
    def _extract_structured(metadata: dict[str, Any] | None) -> dict[str, Any] | None:
        """Pick the structured-output dict out of a reply's metadata.

        A non-empty dict stored under ``structured_output``, ``result``,
        ``output`` or ``json`` (checked in that order) is preferred; otherwise
        the metadata dict itself is returned. Previously the metadata dict was
        always returned first, which made the wrapper keys unreachable.

        Args:
            metadata: The reply's ``metadata`` attribute, if any.

        Returns:
            The structured payload, or ``None`` when ``metadata`` is not a dict.
        """
        if not isinstance(metadata, dict):
            return None
        for key in ("structured_output", "result", "output", "json"):
            nested = metadata.get(key)
            if isinstance(nested, dict) and nested:
                return nested
        return metadata
class RouterAgent(_AgentRuntime):
    """Agent that classifies a conversation into one of the specialist routes."""

    def __init__(self) -> None:
        """Create the router with its fixed name and system prompt."""
        super().__init__(
            "RouterAgent",
            "你是客服路由Agent。只输出路由不回复客户。必须先调用工具读取意图/风险/订单后再路由。",
        )

    async def route(self, context: dict[str, Any]) -> tuple[str, str]:
        """Ask the model which specialist should handle ``context``.

        Args:
            context: JSON-serialisable conversation context embedded in the prompt.

        Returns:
            ``(route, reason)`` where ``route`` is one of ``pre_sales``,
            ``quote``, ``after_sales`` or ``risk``. Anything else, including a
            missing value, falls back to ``pre_sales``. ``reason`` is the
            model's explanation, or an empty string.
        """
        prompt = f"按上下文路由到 pre_sales/quote/after_sales/risk。\n上下文:\n{json.dumps(context, ensure_ascii=False)}"
        res = await self.agent(self.Msg("user", prompt, "user"), structured_model=RouteModel)
        # Prefer structured metadata; fall back to JSON embedded in the reply text.
        obj = self._extract_structured(getattr(res, "metadata", None)) or self._extract_json(self._msg_to_text(res)) or {}
        # Normalise case and whitespace before validating, matching how
        # _decide_with_model treats ``action``; otherwise "Quote" or " risk "
        # would be silently downgraded to the default route.
        route = str(obj.get("route", "pre_sales") or "pre_sales").strip().lower()
        if route not in {"pre_sales", "quote", "after_sales", "risk"}:
            route = "pre_sales"
        return route, str(obj.get("reason", "") or "").strip()
class QuoteAgent(_AgentRuntime):
    """Specialist agent for quotation decisions."""

    def __init__(self) -> None:
        """Create the agent with the shared rules prompt plus the quoting role."""
        role_prompt = rules_prompt() + "\n你是报价专家Agent。必须结合图片数量、尺寸和订单状态给出报价动作。"
        super().__init__("QuoteAgent", role_prompt)

    async def decide(self, context: dict[str, Any]) -> Decision:
        """Return the model's decision for a quotation-related ``context``.

        Args:
            context: JSON-serialisable conversation context embedded in the prompt.
        """
        serialized = json.dumps(context, ensure_ascii=False)
        return await _decide_with_model(self, f"你负责报价相关决策。\n上下文:\n{serialized}")
class AfterSalesAgent(_AgentRuntime):
    """Specialist agent for after-sales decisions."""

    def __init__(self) -> None:
        """Create the agent with the shared rules prompt plus the after-sales role."""
        role_prompt = rules_prompt() + "\n你是售后专家Agent。优先维护售后状态并给出下一步动作。"
        super().__init__("AfterSalesAgent", role_prompt)

    async def decide(self, context: dict[str, Any]) -> Decision:
        """Return the model's decision for an after-sales ``context``.

        Args:
            context: JSON-serialisable conversation context embedded in the prompt.
        """
        serialized = json.dumps(context, ensure_ascii=False)
        return await _decide_with_model(self, f"你负责售后相关决策。\n上下文:\n{serialized}")
class RiskAgent(_AgentRuntime):
    """Specialist agent for risk-control decisions."""

    def __init__(self) -> None:
        """Create the agent with its fixed risk-control system prompt."""
        role_prompt = "你是风控Agent。遇到地图政治/黄暴/外联高风险优先给 transfer 或拒绝性 reply。"
        super().__init__("RiskAgent", role_prompt)

    async def decide(self, context: dict[str, Any]) -> Decision:
        """Return the model's decision for a risk-related ``context``.

        Args:
            context: JSON-serialisable conversation context embedded in the prompt.
        """
        serialized = json.dumps(context, ensure_ascii=False)
        return await _decide_with_model(self, f"你负责风控相关决策。\n上下文:\n{serialized}")
class PreSalesAgent(_AgentRuntime):
    """Specialist agent for pre-sales decisions."""

    def __init__(self) -> None:
        """Create the agent with the shared rules prompt plus the pre-sales role."""
        role_prompt = rules_prompt() + "\n你是售前专家Agent。处理打招呼、询价前引导、收图承接。"
        super().__init__("PreSalesAgent", role_prompt)

    async def decide(self, context: dict[str, Any]) -> Decision:
        """Return the model's decision for a pre-sales ``context``.

        Args:
            context: JSON-serialisable conversation context embedded in the prompt.
        """
        serialized = json.dumps(context, ensure_ascii=False)
        return await _decide_with_model(self, f"你负责售前相关决策。\n上下文:\n{serialized}")
async def _decide_with_model(rt: _AgentRuntime, prompt: str) -> Decision:
    """Run ``prompt`` through ``rt``'s agent and coerce the reply into a ``Decision``.

    The payload is taken from the reply's structured metadata when that is
    non-empty, otherwise from JSON embedded in the reply text, otherwise it is
    treated as empty. An ``action`` outside the known set becomes ``"reply"``;
    text fields default to ``""`` and ``state_patch`` defaults to ``{}``.

    Args:
        rt: Runtime whose agent and parsing helpers are used.
        prompt: User-role prompt sent to the agent.

    Returns:
        The normalised decision.
    """
    response = await rt.agent(rt.Msg("user", prompt, "user"), structured_model=DecisionModel)

    payload = rt._extract_structured(getattr(response, "metadata", None))
    if not payload:
        # The text reply is only parsed when metadata yielded nothing usable.
        payload = rt._extract_json(rt._msg_to_text(response))
    if not payload:
        payload = {}

    def _clean(key: str) -> str:
        # Missing, None and other falsy values all collapse to an empty string.
        return str(payload.get(key, "") or "").strip()

    chosen = str(payload.get("action", "reply") or "reply").strip().lower()
    if chosen not in {"reply", "quote", "transfer", "noop", "update_state"}:
        chosen = "reply"

    patch = payload.get("state_patch")
    if not isinstance(patch, dict):
        patch = {}

    return Decision(
        action=chosen,
        reply=_clean("reply"),
        transfer_msg=_clean("transfer_msg"),
        quote_mode=_clean("quote_mode"),
        state_patch=patch,
        reason=_clean("reason"),
    )