Compare commits
2 Commits
684409686d
...
b5153048c4
| Author | SHA1 | Date | |
|---|---|---|---|
| b5153048c4 | |||
| 4022ed8f7a |
@@ -2,11 +2,28 @@
|
||||
"shops": {
|
||||
"tb2801080146": {
|
||||
"type": "gemini_api",
|
||||
"hint": "【店铺类型】Gemini API 账号。客户问账号/pro/续费/没pro时,按API客服回复:续费/充值/套餐说明。"
|
||||
"hint": "【店铺类型】Gemini API 账号。客户问账号/pro/续费/没pro时,按API客服回复:续费/充值/套餐说明。",
|
||||
"persona": "技术型客服,表达清晰,回答直接,少废话,优先给可执行步骤和结论。"
|
||||
},
|
||||
"小威哥1216": {
|
||||
"type": "find_image",
|
||||
"hint": "【店铺类型】找原图/修图。"
|
||||
"hint": "【店铺类型】找原图/修图。",
|
||||
"persona": "修图老店主语气,接地气,像微信聊天,先承接再推进成交,不端着。"
|
||||
},
|
||||
"小威哥1216:媚媚": {
|
||||
"type": "find_image",
|
||||
"hint": "【店铺类型】找原图/修图。",
|
||||
"persona": "女性店主口吻,亲切利落,话短不硬,先确认需求再自然推进下单。"
|
||||
},
|
||||
"tb7518056865:小林": {
|
||||
"type": "find_image",
|
||||
"hint": "【店铺类型】找原图/修图。",
|
||||
"persona": "效率型客服,回复简短干脆,先给结论再补一句说明,避免重复。"
|
||||
},
|
||||
"tb637530900564:小威威": {
|
||||
"type": "find_image",
|
||||
"hint": "【店铺类型】找原图/修图。",
|
||||
"persona": "熟客店主口吻,轻松自然,像真人在店里接单聊天,避免模板句。"
|
||||
}
|
||||
},
|
||||
"goods_keywords": {
|
||||
@@ -23,5 +40,10 @@
|
||||
"gemini_api": "【店铺类型】Gemini API 账号。客户问账号/pro/续费/没pro时,按API客服回复:续费/充值/套餐说明,自然回复。",
|
||||
"find_image": "【店铺类型】找原图/修图。"
|
||||
},
|
||||
"_comment": "新增店铺:在 shops 加 acc_id。新增商品类型:在 goods_keywords 加关键词→类型。"
|
||||
"type_personas": {
|
||||
"gemini_api": "技术支持型客服,回答准确直给,先结论后解释,避免口水话。",
|
||||
"find_image": "淘宝修图店主语气,口语自然,有温度但不啰嗦,先承接再推进成交。"
|
||||
},
|
||||
"default_persona": "淘宝老店主,说话自然,像真人微信聊天,不官腔、不背模板。",
|
||||
"_comment": "新增店铺:在 shops 加 acc_id。可选 persona 设置店铺人设;未设置则走 type_personas/default_persona。"
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ def build_prompt(
|
||||
state: Any,
|
||||
extract_image_url: Callable[[str], str],
|
||||
shop_type_resolver: Callable[[str, str], str],
|
||||
shop_persona_resolver: Callable[[str, str], str],
|
||||
parse_order_info: Callable[[str], dict[str, str]],
|
||||
build_order_instruction: Callable[[str, str], str],
|
||||
) -> str:
|
||||
@@ -58,6 +59,7 @@ def build_prompt(
|
||||
stage_info += f"\n【客户压价次数】{state.discount_count}"
|
||||
|
||||
shop_type = shop_type_resolver(message.acc_id or "", message.goods_name or "")
|
||||
shop_persona = shop_persona_resolver(message.acc_id or "", message.goods_name or "")
|
||||
shop_hint = ""
|
||||
try:
|
||||
from config.config import CONFIG_DIR
|
||||
@@ -84,6 +86,8 @@ def build_prompt(
|
||||
prompt += f"商品名称: {message.goods_name}\n"
|
||||
if shop_hint:
|
||||
prompt += f"\n{shop_hint}\n"
|
||||
if shop_persona:
|
||||
prompt += f"\n【店铺人设】{shop_persona}\n"
|
||||
|
||||
order_paid = False
|
||||
order_unpaid = False
|
||||
|
||||
@@ -234,6 +234,37 @@ def _get_shop_type(acc_id: str = "", goods_name: str = "") -> str:
|
||||
return "find_image"
|
||||
|
||||
|
||||
def _get_shop_persona(acc_id: str = "", goods_name: str = "") -> str:
|
||||
"""按店铺返回人设描述,优先级:shops.persona > type_personas > default_persona。"""
|
||||
default_persona = "淘宝老店主,说话自然,像真人微信聊天,不官腔、不背模板。"
|
||||
try:
|
||||
from config.config import CONFIG_DIR
|
||||
import json
|
||||
|
||||
cfg_path = CONFIG_DIR / "shop_prompts.json"
|
||||
if not cfg_path.exists():
|
||||
return default_persona
|
||||
with open(cfg_path, "r", encoding="utf-8") as f:
|
||||
cfg = json.load(f)
|
||||
|
||||
shops = cfg.get("shops", {})
|
||||
if acc_id and acc_id in shops:
|
||||
persona = str(shops[acc_id].get("persona", "")).strip()
|
||||
if persona:
|
||||
return persona
|
||||
|
||||
shop_type = _get_shop_type(acc_id, goods_name)
|
||||
type_personas = cfg.get("type_personas", {})
|
||||
persona = str(type_personas.get(shop_type, "")).strip()
|
||||
if persona:
|
||||
return persona
|
||||
|
||||
cfg_default = str(cfg.get("default_persona", "")).strip()
|
||||
return cfg_default or default_persona
|
||||
except Exception:
|
||||
return default_persona
|
||||
|
||||
|
||||
def load_skill_map(skills_dir: str = "skills") -> Dict[str, str]:
|
||||
"""按技能目录名加载 SKILL.md,返回 {skill_name: content}。"""
|
||||
skill_map: Dict[str, str] = {}
|
||||
@@ -504,9 +535,11 @@ class CustomerServiceAgent:
|
||||
)
|
||||
history = self.message_histories.get(message.from_id, [])
|
||||
pending_req = ";".join((state.pending_requirements or [])[-4:]) or "无"
|
||||
persona = _get_shop_persona(message.acc_id or "", message.goods_name or "")
|
||||
user_prompt = (
|
||||
"请按下面意图生成给客户的自然回复。\n"
|
||||
f"场景: {scene}\n"
|
||||
f"店铺人设: {persona}\n"
|
||||
f"回复意图: {intent_hint}\n"
|
||||
f"客户原话: {message.msg}\n"
|
||||
f"当前已收图片数: {len(state.pending_image_urls)}\n"
|
||||
@@ -551,9 +584,11 @@ class CustomerServiceAgent:
|
||||
)
|
||||
history = self.message_histories.get(message.from_id, [])
|
||||
pending_req = ";".join((state.pending_requirements or [])[-4:]) or "无"
|
||||
persona = _get_shop_persona(message.acc_id or "", message.goods_name or "")
|
||||
prompt = (
|
||||
"请把下面这句客服回复润色成更自然的微信聊天口吻,语义必须保持一致。\n"
|
||||
f"场景: {scene}\n"
|
||||
f"店铺人设: {persona}\n"
|
||||
f"客户原话: {message.msg}\n"
|
||||
f"当前已收图: {len(state.pending_image_urls)}张\n"
|
||||
f"当前需求摘要: {pending_req}\n"
|
||||
@@ -650,6 +685,7 @@ class CustomerServiceAgent:
|
||||
_is_political_inquiry = staticmethod(is_political_inquiry)
|
||||
_is_map_inquiry = staticmethod(is_map_inquiry)
|
||||
_get_shop_type = staticmethod(_get_shop_type)
|
||||
_get_shop_persona = staticmethod(_get_shop_persona)
|
||||
_notify_wechat = staticmethod(_notify_wechat)
|
||||
_notify_wechat_overdue = staticmethod(_notify_wechat_overdue)
|
||||
|
||||
@@ -989,6 +1025,7 @@ class CustomerServiceAgent:
|
||||
state=state,
|
||||
extract_image_url=self._extract_image_url,
|
||||
shop_type_resolver=_get_shop_type,
|
||||
shop_persona_resolver=_get_shop_persona,
|
||||
parse_order_info=parse_order_info,
|
||||
build_order_instruction=build_order_instruction,
|
||||
)
|
||||
|
||||
53
core/websocket_agent_reply_flow.py
Normal file
53
core/websocket_agent_reply_flow.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import logging
|
||||
|
||||
from utils.observability import build_trace_id
|
||||
from core.websocket_brain_flow import decide_brain_action, execute_brain_action
|
||||
|
||||
logger = logging.getLogger("cs_agent")
|
||||
|
||||
|
||||
async def handle_agent_reply_flow(client, data: dict, *, workflow, shop_type_resolver):
|
||||
"""处理单条消息:统一走 Brain 决策 + 执行。"""
|
||||
try:
|
||||
msg_text = client.to_chinese(data.get("msg", ""))
|
||||
customer_id = data.get("from_id", "")
|
||||
trace_id = build_trace_id(data.get("acc_id", ""), customer_id, data.get("msg_id", ""), msg_text[:64])
|
||||
data["_trace_id"] = trace_id
|
||||
shop_type = shop_type_resolver(data.get("acc_id", ""), client.to_chinese(data.get("goods_name", "") or ""))
|
||||
|
||||
customer_msg = client._build_customer_message(data)
|
||||
decision = await decide_brain_action(
|
||||
client,
|
||||
data,
|
||||
customer_msg,
|
||||
trace_id=trace_id,
|
||||
msg_text=msg_text,
|
||||
shop_type=shop_type,
|
||||
)
|
||||
client._activity_log(
|
||||
"brain_decision",
|
||||
trace_id=trace_id,
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
action=decision.action,
|
||||
source=decision.source,
|
||||
should_reply=bool(decision.should_reply),
|
||||
need_transfer=bool(decision.need_transfer),
|
||||
)
|
||||
await execute_brain_action(
|
||||
client,
|
||||
data,
|
||||
decision=decision,
|
||||
trace_id=trace_id,
|
||||
msg_text=msg_text,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Agent 处理失败: %s", e)
|
||||
client._activity_log(
|
||||
"agent_process_error",
|
||||
trace_id=data.get("_trace_id", ""),
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
error=str(e),
|
||||
)
|
||||
100
core/websocket_auto_quote_flow.py
Normal file
100
core/websocket_auto_quote_flow.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
|
||||
def cancel_auto_quote_task(client, key: str, reason: str = ""):
|
||||
task = client._auto_quote_tasks.get(key)
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
client._activity_log("auto_quote_cancel", key=key, reason=reason or "unknown")
|
||||
|
||||
|
||||
def build_auto_quote_signature(state: Any) -> str:
|
||||
"""为待报价内容生成稳定签名,用于避免同一批内容反复自动触发。"""
|
||||
urls = list(getattr(state, "pending_image_urls", []) or [])
|
||||
reqs = list(getattr(state, "pending_requirements", []) or [])
|
||||
req_tail = reqs[-6:] if len(reqs) > 6 else reqs
|
||||
return "||".join(urls) + "##" + "||".join(req_tail)
|
||||
|
||||
|
||||
async def schedule_auto_quote(client, data: dict, *, shop_type_resolver):
|
||||
"""
|
||||
智能兜底:客户发图后若长时间不再补充消息,自动触发一次报价,避免会话卡住。
|
||||
"""
|
||||
if not client.enable_agent or not client.agent:
|
||||
return
|
||||
try:
|
||||
shop_type = shop_type_resolver(data.get('acc_id', ''), client.to_chinese(data.get('goods_name', '') or ''))
|
||||
if shop_type != "find_image":
|
||||
return
|
||||
cid = data.get('from_id', '')
|
||||
key = client._customer_key(data)
|
||||
state = client.agent._get_conversation_state(cid)
|
||||
if not state or not getattr(state, "pending_image_urls", None):
|
||||
cancel_auto_quote_task(client, key, reason="no_pending_images")
|
||||
client._auto_quote_done_sig.pop(key, None)
|
||||
return
|
||||
if state.quote_phase not in {"collecting", "waiting_result"}:
|
||||
return
|
||||
current_sig = build_auto_quote_signature(state)
|
||||
if current_sig and client._auto_quote_done_sig.get(key) == current_sig:
|
||||
client._activity_log(
|
||||
"auto_quote_skip_duplicate",
|
||||
key=key,
|
||||
pending_count=len(state.pending_image_urls),
|
||||
)
|
||||
return
|
||||
try:
|
||||
idle_seconds = max(8, int(os.getenv("AUTO_QUOTE_IDLE_SECONDS", "18")))
|
||||
except Exception:
|
||||
idle_seconds = 18
|
||||
|
||||
cancel_auto_quote_task(client, key, reason="reschedule")
|
||||
|
||||
async def _delayed_auto_quote(capture_key: str, capture_data: dict, wait_s: int, capture_sig: str):
|
||||
await asyncio.sleep(wait_s)
|
||||
async with client._get_customer_lock(capture_key):
|
||||
capture_cid = capture_data.get('from_id', '')
|
||||
st = client.agent._get_conversation_state(capture_cid)
|
||||
if not st or not st.pending_image_urls:
|
||||
client._auto_quote_done_sig.pop(capture_key, None)
|
||||
return
|
||||
# 内容变化时,放弃旧触发(会在新一轮消息后重新调度)。
|
||||
if build_auto_quote_signature(st) != capture_sig:
|
||||
return
|
||||
# 标记本批次已自动触发,避免同内容循环“马上报价”。
|
||||
client._auto_quote_done_sig[capture_key] = capture_sig
|
||||
# 直接置为可报价,走内部自动报价入口(不伪造客户语句)。
|
||||
client.agent._mark_quote_ready(st)
|
||||
client.agent._sync_pending_quote_state(capture_cid, st)
|
||||
client._activity_log(
|
||||
"auto_quote_trigger",
|
||||
key=capture_key,
|
||||
pending_count=len(st.pending_image_urls),
|
||||
wait_s=wait_s,
|
||||
)
|
||||
notify_data = dict(capture_data)
|
||||
notify_data["msg_id"] = "auto_quote_idle_trigger"
|
||||
notify_data["msg"] = "__AUTO_QUOTE_INTERNAL_TRIGGER__"
|
||||
notify_msg = client._build_customer_message(notify_data)
|
||||
response = await client.agent.build_auto_quote_reply(st, notify_msg)
|
||||
if response.should_reply and response.reply and not response.need_transfer:
|
||||
await client.send_reply(capture_data, response.reply)
|
||||
client._activity_log(
|
||||
"auto_quote_sent",
|
||||
key=capture_key,
|
||||
reply=response.reply,
|
||||
)
|
||||
|
||||
task = asyncio.create_task(_delayed_auto_quote(key, dict(data), idle_seconds, current_sig))
|
||||
client._auto_quote_tasks[key] = task
|
||||
client._activity_log(
|
||||
"auto_quote_scheduled",
|
||||
key=key,
|
||||
pending_count=len(state.pending_image_urls),
|
||||
phase=state.quote_phase,
|
||||
wait_s=idle_seconds,
|
||||
)
|
||||
except Exception as e:
|
||||
client._activity_log("auto_quote_schedule_error", error=str(e), key=client._customer_key(data))
|
||||
311
core/websocket_brain_flow.py
Normal file
311
core/websocket_brain_flow.py
Normal file
@@ -0,0 +1,311 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger("cs_agent")
|
||||
|
||||
|
||||
@dataclass
|
||||
class BrainDecision:
|
||||
action: str # reply | quote | transfer | noop
|
||||
source: str
|
||||
reply: str = ""
|
||||
transfer_msg: str = ""
|
||||
should_reply: bool = False
|
||||
need_transfer: bool = False
|
||||
payload: dict[str, Any] | None = None
|
||||
|
||||
|
||||
def _extract_json_obj(text: str) -> dict[str, Any] | None:
|
||||
if not text:
|
||||
return None
|
||||
m = re.search(r"\{[\s\S]*\}", text)
|
||||
if not m:
|
||||
return None
|
||||
try:
|
||||
return json.loads(m.group(0))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
async def _ai_policy_brain_decide(client, data: dict, *, msg_text: str, shop_type: str) -> BrainDecision | None:
|
||||
if not client.enable_agent or not client.agent or not client.AgentDeps:
|
||||
return None
|
||||
|
||||
acc_id = str(data.get("acc_id", "") or "")
|
||||
customer_id = str(data.get("from_id", "") or "")
|
||||
current_urls = client._extract_image_urls(msg_text)
|
||||
recent_urls = client._collect_recent_image_urls(customer_id, acc_id, max_count=6)
|
||||
key = client._customer_key(data)
|
||||
pending_urls = client._pending_images.get(key) or []
|
||||
|
||||
try:
|
||||
order_status = client._detect_order_status(msg_text)
|
||||
has_image_url = client._msg_has_image_url(msg_text)
|
||||
refers_images = client._msg_refers_images(msg_text)
|
||||
is_price = client._msg_is_price_inquiry(msg_text)
|
||||
is_req = client._msg_is_requirement(msg_text)
|
||||
ext_contact = client._msg_requests_external_contact(msg_text)
|
||||
except Exception:
|
||||
order_status, has_image_url, refers_images, is_price, is_req, ext_contact = "", False, False, False, False, False
|
||||
|
||||
deps = client.AgentDeps(
|
||||
msg_id=str(data.get("msg_id", "") or "brain_policy"),
|
||||
acc_id=acc_id,
|
||||
from_id=customer_id,
|
||||
platform=str(data.get("acc_type", "") or "AliWorkbench"),
|
||||
)
|
||||
|
||||
prompt = (
|
||||
"你是淘宝客服系统的主决策Brain,只做决策,不要解释。\n"
|
||||
"你必须根据历史规则和当前上下文,输出唯一动作。\n"
|
||||
"可选动作 action: reply / quote / transfer / noop。\n"
|
||||
"历史规则(完整继承):\n"
|
||||
"1) 客户发图/补图:先自然承接,再根据上下文决定继续收集或报价;\n"
|
||||
"2) 客户询价且有可用图片(当前或最近)时,优先 action=quote;\n"
|
||||
"3) 若有 pending 图片且客户催报价/补充需求,优先 quote_mode=flush_pending;\n"
|
||||
"4) 仅打招呼/短无意义文本:可 action=reply 简短承接,不要机械模板;\n"
|
||||
"5) 索要外部联系方式(微信/QQ/手机号)时,不外呼,站内引导;\n"
|
||||
"6) 订单已付款:可回执安排处理;未付款/待付款:提醒完成付款;\n"
|
||||
"7) 地图/政治/高风险内容:谨慎,必要时 transfer 或拒绝性 reply;\n"
|
||||
"8) 尺寸超限/不可做场景:给明确边界,不要胡乱承诺;\n"
|
||||
"9) 客户没发图却问价:先承接,再引导发图;\n"
|
||||
"10) 避免重复外发,避免同一句话反复说。\n"
|
||||
"\n"
|
||||
"quote_mode 可选: flush_pending / analyze_current_or_recent / collect_only\n"
|
||||
"只输出 JSON:\n"
|
||||
'{"action":"reply|quote|transfer|noop","reply":"","transfer_msg":"","quote_mode":"","reason":""}\n\n'
|
||||
f"店铺类型: {shop_type}\n"
|
||||
f"legacy_fast_quote_enabled: {str(bool(client._legacy_fast_quote_enabled)).lower()}\n"
|
||||
f"客户原话: {msg_text}\n"
|
||||
f"has_image_url: {has_image_url}\n"
|
||||
f"current_image_urls_count: {len(current_urls)}\n"
|
||||
f"recent_image_urls_count: {len(recent_urls)}\n"
|
||||
f"pending_image_urls_count: {len(pending_urls)}\n"
|
||||
f"refers_images: {refers_images}\n"
|
||||
f"is_price_inquiry: {is_price}\n"
|
||||
f"is_requirement: {is_req}\n"
|
||||
f"requests_external_contact: {ext_contact}\n"
|
||||
f"order_status: {order_status or 'none'}\n"
|
||||
)
|
||||
|
||||
try:
|
||||
result = await client.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[])
|
||||
raw = str(getattr(result, "output", "") or "").strip()
|
||||
obj = _extract_json_obj(raw)
|
||||
if not obj:
|
||||
client._activity_log(
|
||||
"brain_policy_parse_error",
|
||||
acc_id=acc_id,
|
||||
customer_id=customer_id,
|
||||
raw=raw[:300],
|
||||
)
|
||||
return None
|
||||
|
||||
action = str(obj.get("action", "") or "").strip().lower()
|
||||
reply = str(obj.get("reply", "") or "").strip()
|
||||
transfer_msg = str(obj.get("transfer_msg", "") or "").strip()
|
||||
quote_mode = str(obj.get("quote_mode", "") or "").strip().lower()
|
||||
reason = str(obj.get("reason", "") or "").strip()
|
||||
|
||||
payload: dict[str, Any] | None = None
|
||||
if action == "quote":
|
||||
mode = quote_mode or "analyze_current_or_recent"
|
||||
if mode == "flush_pending":
|
||||
payload = {"mode": "flush_pending", "key": key, "pre_reply": reply}
|
||||
elif mode == "collect_only":
|
||||
payload = {"mode": "collect_only", "pre_reply": reply}
|
||||
else:
|
||||
urls = current_urls or recent_urls
|
||||
payload = {"mode": "analyze_urls", "urls": urls, "pre_reply": reply}
|
||||
|
||||
decision = BrainDecision(
|
||||
action=action if action in {"reply", "quote", "transfer", "noop"} else "noop",
|
||||
source="brain_ai_policy",
|
||||
reply=reply,
|
||||
transfer_msg=transfer_msg,
|
||||
should_reply=bool(reply),
|
||||
need_transfer=(action == "transfer"),
|
||||
payload=payload,
|
||||
)
|
||||
client._activity_log(
|
||||
"brain_policy_raw",
|
||||
acc_id=acc_id,
|
||||
customer_id=customer_id,
|
||||
action=decision.action,
|
||||
quote_mode=quote_mode,
|
||||
reason=reason,
|
||||
)
|
||||
return decision
|
||||
except Exception as e:
|
||||
client._activity_log(
|
||||
"brain_policy_error",
|
||||
acc_id=acc_id,
|
||||
customer_id=customer_id,
|
||||
error=str(e),
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
async def decide_brain_action(client, data: dict, customer_msg, *, trace_id: str, msg_text: str, shop_type: str) -> BrainDecision:
|
||||
"""统一主决策层:优先由 Brain AI 决策;失败时回退 Agent 默认决策。"""
|
||||
ai_decision = await _ai_policy_brain_decide(client, data, msg_text=msg_text, shop_type=shop_type)
|
||||
if ai_decision is not None:
|
||||
return ai_decision
|
||||
|
||||
# 回退:保持可用性
|
||||
logger.info("Agent 正在处理消息...")
|
||||
client._activity_log(
|
||||
"agent_process_start",
|
||||
trace_id=trace_id,
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
msg=msg_text,
|
||||
)
|
||||
response = await client.agent.process_message(customer_msg)
|
||||
client._activity_log(
|
||||
"agent_process_done",
|
||||
trace_id=trace_id,
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
result="ok",
|
||||
should_reply=bool(response.should_reply),
|
||||
need_transfer=bool(response.need_transfer),
|
||||
)
|
||||
if response.need_transfer:
|
||||
return BrainDecision(
|
||||
action="transfer",
|
||||
source="fallback_agent",
|
||||
reply=response.reply or "",
|
||||
transfer_msg=response.transfer_msg or "",
|
||||
should_reply=bool(response.should_reply),
|
||||
need_transfer=True,
|
||||
)
|
||||
if response.should_reply and response.reply:
|
||||
return BrainDecision(
|
||||
action="reply",
|
||||
source="fallback_agent",
|
||||
reply=response.reply,
|
||||
should_reply=True,
|
||||
need_transfer=False,
|
||||
)
|
||||
return BrainDecision(action="noop", source="fallback_agent", should_reply=False, need_transfer=False)
|
||||
|
||||
|
||||
async def execute_brain_action(client, data: dict, *, decision: BrainDecision, trace_id: str, msg_text: str):
|
||||
"""统一执行层:只执行标准动作。"""
|
||||
customer_id = data.get("from_id", "")
|
||||
|
||||
if customer_id:
|
||||
client._touch_customer_last_contact(customer_id)
|
||||
|
||||
if decision.action == "transfer":
|
||||
logger.info("Agent 决定转接人工")
|
||||
client._activity_log(
|
||||
"agent_transfer",
|
||||
trace_id=trace_id,
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
transfer_msg=decision.transfer_msg,
|
||||
)
|
||||
client._fire_and_forget(
|
||||
client._post_tianwang_callback(
|
||||
"message_processed",
|
||||
data,
|
||||
extra={
|
||||
"should_reply": bool(decision.should_reply),
|
||||
"need_transfer": True,
|
||||
"agent_reply": decision.reply or "",
|
||||
"transfer_msg": decision.transfer_msg or "",
|
||||
},
|
||||
)
|
||||
)
|
||||
await client.transfer_to_human(data, decision.transfer_msg)
|
||||
client._push_chat_to_wechat_safe(
|
||||
data=data,
|
||||
customer_msg=msg_text,
|
||||
reply_msg=decision.transfer_msg or "转接",
|
||||
tag="转人工",
|
||||
)
|
||||
return
|
||||
|
||||
if decision.action == "reply":
|
||||
text = (decision.reply or "").strip()
|
||||
if not text:
|
||||
return
|
||||
await asyncio.sleep(0.6)
|
||||
client._activity_log(
|
||||
"agent_reply",
|
||||
trace_id=trace_id,
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
reply=text,
|
||||
)
|
||||
await client.send_reply(data, text)
|
||||
await client._maybe_schedule_auto_quote(data)
|
||||
client._fire_and_forget(
|
||||
client._post_tianwang_callback(
|
||||
"message_processed",
|
||||
data,
|
||||
extra={
|
||||
"should_reply": True,
|
||||
"need_transfer": False,
|
||||
"agent_reply": text,
|
||||
},
|
||||
)
|
||||
)
|
||||
client._push_chat_to_wechat_safe(
|
||||
data=data,
|
||||
customer_msg=msg_text,
|
||||
reply_msg=text,
|
||||
tag="正常AI回复",
|
||||
)
|
||||
return
|
||||
|
||||
if decision.action == "quote":
|
||||
payload = decision.payload or {}
|
||||
pre_reply = str(payload.get("pre_reply", "") or "").strip()
|
||||
if pre_reply:
|
||||
await client.send_reply(data, pre_reply)
|
||||
mode = str(payload.get("mode", "") or "")
|
||||
if mode == "flush_pending":
|
||||
key = str(payload.get("key", "") or "")
|
||||
if key:
|
||||
await client._flush_pending_images(key, data)
|
||||
elif mode == "analyze_urls":
|
||||
urls = payload.get("urls") or []
|
||||
if isinstance(urls, list) and urls:
|
||||
if len(urls) == 1:
|
||||
asyncio.create_task(client._analyze_single_and_reply(data, urls[0]))
|
||||
else:
|
||||
asyncio.create_task(client._analyze_multi_and_reply(data, urls))
|
||||
else:
|
||||
await client.send_reply(data, "你把要处理的图再发我一下,我马上给你看。")
|
||||
else:
|
||||
if not pre_reply:
|
||||
await client.send_reply(data, "收到,我先看一下哈,稍等哈。")
|
||||
return
|
||||
|
||||
# noop
|
||||
client._activity_log(
|
||||
"agent_no_reply",
|
||||
trace_id=trace_id,
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
)
|
||||
client._fire_and_forget(
|
||||
client._post_tianwang_callback(
|
||||
"message_processed",
|
||||
data,
|
||||
extra={
|
||||
"should_reply": False,
|
||||
"need_transfer": False,
|
||||
"agent_reply": "",
|
||||
},
|
||||
)
|
||||
)
|
||||
48
core/websocket_callback_flow.py
Normal file
48
core/websocket_callback_flow.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
|
||||
async def post_tianwang_callback_flow(client, event: str, data: dict, extra: Optional[Dict[str, Any]] = None):
|
||||
"""将消息处理事件回调给天网。"""
|
||||
if not client._tianwang_callback_url:
|
||||
return
|
||||
try:
|
||||
import httpx
|
||||
|
||||
trust_env = os.getenv("TIANWANG_CALLBACK_TRUST_ENV", "false").lower() in ("1", "true", "yes")
|
||||
payload = {
|
||||
"event": event,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"agent_name": client._tianwang_agent_name,
|
||||
"acc_id": str(data.get("acc_id", "") or ""),
|
||||
"customer_id": str(data.get("from_id", "") or ""),
|
||||
"customer_name": client.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
|
||||
"msg_id": str(data.get("msg_id", "") or ""),
|
||||
"msg_type": int(data.get("msg_type", 0) or 0),
|
||||
"msg": client.to_chinese(data.get("msg", "") or ""),
|
||||
"goods_name": client.to_chinese(data.get("goods_name", "") or ""),
|
||||
"goods_order": client.to_chinese(data.get("goods_order", "") or ""),
|
||||
}
|
||||
if extra:
|
||||
payload.update(extra)
|
||||
async with httpx.AsyncClient(timeout=6, trust_env=trust_env) as http_client:
|
||||
resp = await http_client.post(client._tianwang_callback_url, json=payload)
|
||||
ok = 200 <= resp.status_code < 300
|
||||
client._activity_log(
|
||||
"tianwang_callback",
|
||||
result="ok" if ok else "http_error",
|
||||
event_name=event,
|
||||
status_code=resp.status_code,
|
||||
acc_id=payload["acc_id"],
|
||||
customer_id=payload["customer_id"],
|
||||
)
|
||||
except Exception as e:
|
||||
client._activity_log(
|
||||
"tianwang_callback",
|
||||
result="error",
|
||||
event_name=event,
|
||||
acc_id=str(data.get("acc_id", "") or ""),
|
||||
customer_id=str(data.get("from_id", "") or ""),
|
||||
error=str(e),
|
||||
)
|
||||
File diff suppressed because it is too large
Load Diff
57
core/websocket_connection_flow.py
Normal file
57
core/websocket_connection_flow.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import asyncio
|
||||
import websockets
|
||||
|
||||
|
||||
async def connect_flow(client):
|
||||
"""连接 WebSocket 服务器并自动重连。"""
|
||||
while client.running:
|
||||
try:
|
||||
client.logger.info(f"[{client.get_time()}] 正在连接轻简API {client.uri}...")
|
||||
async with websockets.connect(client.uri) as websocket:
|
||||
client.websocket = websocket
|
||||
from utils.health_check import set_qingjian_connected
|
||||
set_qingjian_connected(True)
|
||||
client.logger.info(f"[{client.get_time()}] 连接成功!")
|
||||
if client.enable_agent:
|
||||
client.logger.info(f"[{client.get_time()}] AI Agent 已启用,将自动处理消息")
|
||||
client.logger.info(f"[{client.get_time()}] 等待接收消息...")
|
||||
|
||||
await client.receive_messages()
|
||||
|
||||
except ConnectionRefusedError:
|
||||
from utils.health_check import set_qingjian_connected
|
||||
set_qingjian_connected(False)
|
||||
client.logger.info(f"[{client.get_time()}] 连接被拒绝,请检查轻简软件是否已启动")
|
||||
except websockets.exceptions.InvalidURI:
|
||||
from utils.health_check import set_qingjian_connected
|
||||
set_qingjian_connected(False)
|
||||
client.logger.info(f"[{client.get_time()}] URI格式错误")
|
||||
except Exception as e:
|
||||
from utils.health_check import set_qingjian_connected
|
||||
set_qingjian_connected(False)
|
||||
client.logger.info(f"[{client.get_time()}] 连接错误: {e}")
|
||||
|
||||
if client.running:
|
||||
client.logger.info(f"[{client.get_time()}] 5秒后尝试重连...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
async def receive_messages_flow(client):
|
||||
"""持续接收消息。"""
|
||||
try:
|
||||
async for message in client.websocket:
|
||||
await client.handle_message(message)
|
||||
except websockets.exceptions.ConnectionClosed:
|
||||
from utils.health_check import set_qingjian_connected
|
||||
set_qingjian_connected(False)
|
||||
client.logger.info(f"[{client.get_time()}] 连接已关闭")
|
||||
except Exception as e:
|
||||
from utils.health_check import set_qingjian_connected
|
||||
set_qingjian_connected(False)
|
||||
client.logger.info(f"[{client.get_time()}] 接收消息错误: {e}")
|
||||
|
||||
|
||||
async def handle_message_flow(client, message, *, shop_type_resolver):
|
||||
from core.websocket_inbound_flow import handle_incoming_message
|
||||
|
||||
await handle_incoming_message(client, message, shop_type_resolver=shop_type_resolver)
|
||||
45
core/websocket_customer_profile_flow.py
Normal file
45
core/websocket_customer_profile_flow.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def extract_and_save_customer_info_flow(client, message: str, customer_id: str, db):
|
||||
"""从消息中提取客户信息并保存。"""
|
||||
if not message or not customer_id:
|
||||
return
|
||||
|
||||
email_pattern = r"[\w\.-]+@[\w\.-]+\.\w+"
|
||||
email_match = re.search(email_pattern, message)
|
||||
if email_match:
|
||||
db.update_email(customer_id, email_match.group())
|
||||
|
||||
phone_pattern = r"1[3-9]\d{9}"
|
||||
phone_match = re.search(phone_pattern, message)
|
||||
if phone_match:
|
||||
db.update_phone(customer_id, phone_match.group())
|
||||
|
||||
wechat_pattern = r"[Vv微信]+号[::]?\s*([\w-]+)"
|
||||
wechat_match = re.search(wechat_pattern, message)
|
||||
if wechat_match:
|
||||
db.update_wechat(customer_id, wechat_match.group(1))
|
||||
|
||||
budget_keywords = ["预算", "不超过", "最多", "便宜点", "便宜"]
|
||||
for keyword in budget_keywords:
|
||||
if keyword in message:
|
||||
db.add_personality_tag(customer_id, "关注价格")
|
||||
break
|
||||
|
||||
personality_keywords = {
|
||||
"爽快": "爽快",
|
||||
"干脆": "爽快",
|
||||
"纠结": "纠结",
|
||||
"墨迹": "纠结",
|
||||
"砍价": "砍价",
|
||||
"贵": "砍价",
|
||||
}
|
||||
for keyword, tag in personality_keywords.items():
|
||||
if keyword in message:
|
||||
db.add_personality_tag(customer_id, tag)
|
||||
|
||||
profile = db.get_customer(customer_id)
|
||||
profile.last_contact = datetime.now().isoformat()
|
||||
db.save_customer(profile)
|
||||
265
core/websocket_debounce_flow.py
Normal file
265
core/websocket_debounce_flow.py
Normal file
@@ -0,0 +1,265 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import secrets
|
||||
|
||||
logger = logging.getLogger("cs_agent")
|
||||
|
||||
|
||||
async def debounce_agent_reply(client, data: dict):
|
||||
"""
|
||||
消息防抖:同一客户在 _DEBOUNCE_SECONDS 内的连续消息合并后再处理。
|
||||
订单通知、付款相关消息不走防抖,立即处理。
|
||||
"""
|
||||
msg_body = data.get("msg", "")
|
||||
key = f"{data.get('acc_id','')}:{data.get('from_id','')}"
|
||||
client._cancel_auto_quote_task(key, reason="new_inbound")
|
||||
|
||||
# 以下情况跳过防抖,立即处理(后台执行,不阻塞接收循环)
|
||||
immediate_keywords = ["买家已付款", "已付款", "[系统订单信息]"]
|
||||
if any(kw in msg_body for kw in immediate_keywords):
|
||||
client._activity_log(
|
||||
"debounce_bypass_immediate",
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_id=data.get("from_id", ""),
|
||||
reason="payment_or_order",
|
||||
msg=msg_body,
|
||||
)
|
||||
client._fire_and_forget(client._agent_reply_serialized(data))
|
||||
return
|
||||
|
||||
# 积攒消息
|
||||
if key not in client._pending_msgs:
|
||||
client._pending_msgs[key] = []
|
||||
client._pending_msgs[key].append(msg_body)
|
||||
client._activity_log(
|
||||
"debounce_enqueue",
|
||||
key=key,
|
||||
queue_size=len(client._pending_msgs[key]),
|
||||
msg=msg_body,
|
||||
)
|
||||
|
||||
# 取消上一个等待任务(如果有)
|
||||
old_task = client._debounce_tasks.get(key)
|
||||
if old_task and not old_task.done():
|
||||
old_task.cancel()
|
||||
|
||||
debounce_seconds = pick_debounce_seconds(client, data, msg_body)
|
||||
|
||||
# 创建新的延迟处理任务
|
||||
async def _delayed(capture_key, capture_data, wait_s: float):
|
||||
await asyncio.sleep(wait_s)
|
||||
msgs = client._pending_msgs.pop(capture_key, [])
|
||||
if not msgs:
|
||||
return
|
||||
if len(msgs) == 1:
|
||||
merged_msg = msgs[0]
|
||||
else:
|
||||
merged_msg = "、".join(m for m in msgs if m.strip())
|
||||
logger.info(f"[{client.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}")
|
||||
client._activity_log(
|
||||
"debounce_flush",
|
||||
key=capture_key,
|
||||
merged_count=len(msgs),
|
||||
merged_msg=merged_msg,
|
||||
)
|
||||
merged_data = dict(capture_data)
|
||||
merged_data["msg"] = merged_msg
|
||||
await client._agent_reply_serialized(merged_data)
|
||||
|
||||
task = asyncio.create_task(_delayed(key, data, debounce_seconds))
|
||||
client._debounce_tasks[key] = task
|
||||
|
||||
|
||||
def rand_between(low: float, high: float) -> float:
|
||||
if high <= low:
|
||||
return float(low)
|
||||
# 使用 secrets 增强随机性,避免固定周期导致机械感
|
||||
span = high - low
|
||||
return round(low + span * (secrets.randbelow(1000) / 1000.0), 2)
|
||||
|
||||
|
||||
def guess_intent_for_debounce(client, msg: str) -> str:
|
||||
text = (msg or "").strip()
|
||||
if not text:
|
||||
return "unknown"
|
||||
if msg_has_image_url(text):
|
||||
return "image"
|
||||
try:
|
||||
from utils.intent_analyzer import detect_intent
|
||||
|
||||
decision = detect_intent(text)
|
||||
intent = decision.intent
|
||||
if intent:
|
||||
client._activity_log(
|
||||
"debounce_intent_detected",
|
||||
intent=intent,
|
||||
source=decision.source,
|
||||
score=round(float(decision.score or 0.0), 4),
|
||||
msg=text[:120],
|
||||
)
|
||||
except Exception:
|
||||
intent = ""
|
||||
if intent:
|
||||
return intent
|
||||
lower = text.lower()
|
||||
if any(k in lower for k in ["报价", "多少钱", "价格", "贵", "优惠", "收费", "怎么收费", "咋收费"]):
|
||||
return "询价"
|
||||
if any(k in lower for k in ["做一下", "改一下", "需求", "门头", "上面的字", "处理"]):
|
||||
return "修改"
|
||||
if any(k in lower for k in ["在吗", "你好", "有人"]):
|
||||
return "打招呼"
|
||||
return "unknown"
|
||||
|
||||
|
||||
def looks_like_requirement_text(msg: str) -> bool:
|
||||
text = (msg or "").strip().lower()
|
||||
if not text:
|
||||
return False
|
||||
req_kw = (
|
||||
"做一下",
|
||||
"改一下",
|
||||
"处理一下",
|
||||
"这个字",
|
||||
"上面的字",
|
||||
"门头",
|
||||
"去背景",
|
||||
"抠图",
|
||||
"换色",
|
||||
"调色",
|
||||
"清晰",
|
||||
"高清",
|
||||
"尺寸",
|
||||
"比例",
|
||||
"横版",
|
||||
"竖版",
|
||||
"排版",
|
||||
"改字",
|
||||
"按这个做",
|
||||
"照这个做",
|
||||
"就这张",
|
||||
"看看做",
|
||||
"弄一下",
|
||||
)
|
||||
return any(k in text for k in req_kw)
|
||||
|
||||
|
||||
def pick_debounce_seconds(client, data: dict, msg: str) -> float:
|
||||
"""意图驱动防抖:不同意图不同等待区间,并引入轻微随机。"""
|
||||
base = max(1.0, float(client._DEBOUNCE_SECONDS))
|
||||
if not client._adaptive_debounce_enabled:
|
||||
return base
|
||||
|
||||
intent = guess_intent_for_debounce(client, msg)
|
||||
is_req = looks_like_requirement_text(msg)
|
||||
has_img = msg_has_image_url(msg)
|
||||
|
||||
# 区间策略:越明确、越短消息,等待越短;需求描述类稍长
|
||||
if intent == "打招呼":
|
||||
low, high = 1.0, min(3.0, base)
|
||||
elif intent in ("询价", "砍价"):
|
||||
# 询价先略等一会,给客户补发图片/需求的窗口,减少机械两连回
|
||||
low, high = 4.0, min(7.0, max(base, 7.0))
|
||||
elif intent == "image":
|
||||
# 文本里直接贴图链接:短等合并上下文,避免和上一条询价并发
|
||||
low, high = 2.2, 4.2
|
||||
elif intent in ("修改", "批量"):
|
||||
low, high = max(3.0, base * 0.65), min(18.0, base + 2.0)
|
||||
elif intent == "转接":
|
||||
low, high = 1.0, 2.5
|
||||
else:
|
||||
low, high = max(2.0, base * 0.5), base
|
||||
|
||||
# 发图后的需求描述,优先“多等一点”收集完整需求,减少半句回复
|
||||
# 约束到 12-14s,避免等待过长。
|
||||
if is_req and not has_img:
|
||||
low = max(low, 12.0)
|
||||
high = min(14.0, max(high, 12.6))
|
||||
|
||||
# 短句更快,长句稍慢,避免把连续半句拆开
|
||||
text_len = len((msg or "").strip())
|
||||
if text_len <= 4:
|
||||
high = min(high, max(low + 0.2, 2.5))
|
||||
elif text_len >= 18:
|
||||
low = min(high, low + 0.6)
|
||||
|
||||
wait_s = rand_between(low, high)
|
||||
logger.info(f"防抖等待 {wait_s}s | intent={intent} | len={text_len}")
|
||||
return wait_s
|
||||
|
||||
|
||||
def msg_has_image_url(msg: str) -> bool:
|
||||
"""判断文本消息里是否包含图片URL(客户粘贴了图片链接,可能带前缀文字如 有吗#*#https://...)"""
|
||||
if not msg:
|
||||
return False
|
||||
lower = msg.lower()
|
||||
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
|
||||
image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com")
|
||||
if "http://" in lower or "https://" in lower:
|
||||
if any(ext in lower for ext in image_exts) or any(h in lower for h in image_hosts):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def msg_refers_images(msg: str) -> bool:
|
||||
"""判断文本是否指代之前的图片(图一/图二/这张/那张/上面那张等)"""
|
||||
if not msg:
|
||||
return False
|
||||
refs = (
|
||||
"图一",
|
||||
"图二",
|
||||
"第一张",
|
||||
"第二张",
|
||||
"这张",
|
||||
"那张",
|
||||
"这图",
|
||||
"那个图",
|
||||
"这个",
|
||||
"这个呢",
|
||||
"上面那张",
|
||||
"下面那张",
|
||||
"刚才那张",
|
||||
"上一张",
|
||||
"下一张",
|
||||
)
|
||||
return any(r in msg for r in refs)
|
||||
|
||||
|
||||
def extract_image_urls(msg: str) -> list:
|
||||
if not msg:
|
||||
return []
|
||||
parts = [p.strip() for p in msg.split("#*#") if p.strip()]
|
||||
urls = []
|
||||
for p in parts:
|
||||
if p.startswith("http://") or p.startswith("https://"):
|
||||
urls.append(p)
|
||||
if not urls and ("http://" in msg or "https://" in msg):
|
||||
tokens = re.findall(r"(https?://\S+)", msg)
|
||||
for t in tokens:
|
||||
if any(ext in t.lower() for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]):
|
||||
urls.append(t)
|
||||
return urls[:8]
|
||||
|
||||
|
||||
def collect_recent_image_urls(client, customer_id: str, acc_id: str, max_count: int = 6) -> list:
|
||||
"""从最近对话中回溯收集图片URL(优先买家消息),用于慢发或引用图片的场景"""
|
||||
urls, seen = [], set()
|
||||
try:
|
||||
from db.chat_log_db import get_recent_conversation
|
||||
|
||||
recent = get_recent_conversation(customer_id=customer_id, acc_id=acc_id, limit=20)
|
||||
# 从最近到更早遍历,收集买家(in)消息中的图片链接
|
||||
for item in reversed(recent):
|
||||
if item.get("direction") != "in":
|
||||
continue
|
||||
message = item.get("message") or ""
|
||||
found = extract_image_urls(message)
|
||||
for u in found:
|
||||
if u not in seen:
|
||||
seen.add(u)
|
||||
urls.append(u)
|
||||
if len(urls) >= max_count:
|
||||
return urls
|
||||
except Exception:
|
||||
logger.debug("收集近期图片URL失败", exc_info=True)
|
||||
return urls
|
||||
36
core/websocket_dispatch_flow.py
Normal file
36
core/websocket_dispatch_flow.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
|
||||
|
||||
async def dispatch_assign_once_flow(client):
|
||||
"""
|
||||
调用新的一键派单接口:
|
||||
GET {DISPATCH_BASE_URL}/assign
|
||||
Header: X-API-Key
|
||||
"""
|
||||
base_url = os.getenv("DISPATCH_BASE_URL", "http://1.12.50.92:8006").strip().rstrip("/")
|
||||
api_key = os.getenv("DISPATCH_API_KEY", "tuhui_dispatch_key_2026").strip()
|
||||
timeout_s = float(os.getenv("DISPATCH_TIMEOUT_SECONDS", "5"))
|
||||
if not base_url or not api_key:
|
||||
return {"success": False, "reason": "dispatch config missing"}
|
||||
try:
|
||||
import httpx
|
||||
|
||||
async with httpx.AsyncClient(timeout=timeout_s) as http_client:
|
||||
resp = await http_client.get(
|
||||
f"{base_url}/assign",
|
||||
headers={"X-API-Key": api_key},
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return {"success": False, "reason": f"http {resp.status_code}"}
|
||||
data = resp.json() if resp.content else {}
|
||||
ok = bool((data or {}).get("success", False))
|
||||
return {
|
||||
"success": ok,
|
||||
"task_id": str((data or {}).get("task_id", "") or ""),
|
||||
"assigned_to": str((data or {}).get("assigned_to", "") or ""),
|
||||
"online_count": int((data or {}).get("online_count", 0) or 0),
|
||||
"notification_sent": bool((data or {}).get("notification_sent", False)),
|
||||
"raw": data,
|
||||
}
|
||||
except Exception as e:
|
||||
return {"success": False, "reason": str(e)}
|
||||
181
core/websocket_followup_flow.py
Normal file
181
core/websocket_followup_flow.py
Normal file
@@ -0,0 +1,181 @@
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger("cs_agent")
|
||||
|
||||
|
||||
async def unreplied_followup_loop(client):
|
||||
"""定时补偿:对“最后一条是客户消息且长时间未回复”的会话,补发一次自然跟进。"""
|
||||
if not client.enable_agent or not client.agent:
|
||||
return
|
||||
while client.running:
|
||||
try:
|
||||
await asyncio.sleep(max(30, int(os.getenv("UNREPLIED_FOLLOWUP_SCAN_SECONDS", "90"))))
|
||||
await scan_and_send_unreplied_followups(client)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
client._activity_log("unreplied_followup_loop_error", error=str(e))
|
||||
|
||||
|
||||
async def scan_and_send_unreplied_followups(client):
|
||||
from db import chat_log_db as cdb
|
||||
|
||||
try:
|
||||
idle_minutes = max(5, int(os.getenv("UNREPLIED_FOLLOWUP_IDLE_MINUTES", "12")))
|
||||
max_age_minutes = max(idle_minutes, int(os.getenv("UNREPLIED_FOLLOWUP_MAX_AGE_MINUTES", "180")))
|
||||
followup_cd = max(300, int(os.getenv("UNREPLIED_FOLLOWUP_COOLDOWN_SECONDS", "3600")))
|
||||
limit = max(10, int(os.getenv("UNREPLIED_FOLLOWUP_LIMIT", "40")))
|
||||
except Exception:
|
||||
idle_minutes, max_age_minutes, followup_cd, limit = 12, 180, 3600, 40
|
||||
|
||||
now = datetime.now()
|
||||
window_start = (now - timedelta(minutes=max_age_minutes)).strftime("%Y-%m-%d %H:%M:%S")
|
||||
conn = None
|
||||
try:
|
||||
conn = cdb._get_conn()
|
||||
rows = conn.execute(
|
||||
cdb._sql(
|
||||
"""
|
||||
SELECT acc_id, customer_id, MAX(id) AS last_id
|
||||
FROM chat_logs
|
||||
WHERE timestamp >= ?
|
||||
GROUP BY acc_id, customer_id
|
||||
ORDER BY MAX(id) DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
),
|
||||
(window_start, limit * 6),
|
||||
).fetchall()
|
||||
sessions = [dict(r) for r in rows]
|
||||
sent = 0
|
||||
for s in sessions:
|
||||
if sent >= limit:
|
||||
break
|
||||
acc_id = str(s.get("acc_id", "") or "")
|
||||
cid = str(s.get("customer_id", "") or "")
|
||||
if not acc_id or not cid:
|
||||
continue
|
||||
ckey = f"{acc_id}:{cid}"
|
||||
if not client._is_owned_by_this_worker(ckey):
|
||||
continue
|
||||
last = conn.execute(
|
||||
cdb._sql(
|
||||
"""
|
||||
SELECT id, direction, message, timestamp, customer_name, acc_id, platform
|
||||
FROM chat_logs
|
||||
WHERE acc_id = ? AND customer_id = ?
|
||||
ORDER BY id DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
),
|
||||
(acc_id, cid),
|
||||
).fetchone()
|
||||
if not last:
|
||||
continue
|
||||
last = dict(last)
|
||||
if str(last.get("direction", "")) != "in":
|
||||
continue
|
||||
last_ts = last.get("timestamp")
|
||||
if isinstance(last_ts, datetime):
|
||||
last_dt = last_ts
|
||||
else:
|
||||
last_dt = datetime.strptime(str(last_ts)[:19], "%Y-%m-%d %H:%M:%S")
|
||||
idle_s = (now - last_dt).total_seconds()
|
||||
if idle_s < idle_minutes * 60 or idle_s > max_age_minutes * 60:
|
||||
continue
|
||||
now_mono = time.monotonic()
|
||||
if (now_mono - client._unreplied_followup_sent.get(ckey, 0.0)) < followup_cd:
|
||||
continue
|
||||
|
||||
last_msg = str(last.get("message", "") or "").strip().lower()
|
||||
if last_msg in {"好的", "好", "ok", "收到", "嗯", "哦"}:
|
||||
continue
|
||||
|
||||
followup = await compose_ai_scene_reply(
|
||||
client,
|
||||
original_msg={
|
||||
"acc_id": acc_id,
|
||||
"from_id": cid,
|
||||
"from_name": client.to_chinese(last.get("customer_name", "") or cid),
|
||||
"acc_type": str(last.get("platform", "") or "AliWorkbench"),
|
||||
"msg": str(last.get("message", "") or ""),
|
||||
},
|
||||
scene="unreplied_followup",
|
||||
intent_hint="客户上一条消息还没接上,先自然承接并请对方补一句当前要处理的图或要求。",
|
||||
fallback="刚看到你消息了,我在的。你把要处理的图或要求再发我一下,我马上接着看。",
|
||||
)
|
||||
fake = {
|
||||
"acc_id": acc_id,
|
||||
"from_id": cid,
|
||||
"from_name": client.to_chinese(last.get("customer_name", "") or cid),
|
||||
"cy_id": cid,
|
||||
"cy_name": client.to_chinese(last.get("customer_name", "") or cid),
|
||||
"acc_type": str(last.get("platform", "") or "AliWorkbench"),
|
||||
"msg": str(last.get("message", "") or ""),
|
||||
"msg_type": 0,
|
||||
}
|
||||
await client.send_reply(fake, followup)
|
||||
client._unreplied_followup_sent[ckey] = now_mono
|
||||
sent += 1
|
||||
client._activity_log(
|
||||
"unreplied_followup_sent",
|
||||
acc_id=acc_id,
|
||||
customer_id=cid,
|
||||
idle_seconds=int(idle_s),
|
||||
last_msg=str(last.get("message", "") or "")[:120],
|
||||
reply=followup,
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
if conn:
|
||||
conn.close()
|
||||
except Exception:
|
||||
logger.debug("关闭数据库连接失败", exc_info=True)
|
||||
|
||||
|
||||
async def compose_ai_scene_reply(client, *, original_msg: dict, scene: str, intent_hint: str, fallback: str) -> str:
|
||||
"""场景化 AI 直接生成回复(不依赖固定模板)。"""
|
||||
if not client.enable_agent or not client.agent or not client.AgentDeps:
|
||||
return fallback
|
||||
try:
|
||||
deps = client.AgentDeps(
|
||||
msg_id=str(original_msg.get("msg_id", "") or f"{scene}_gen"),
|
||||
acc_id=str(original_msg.get("acc_id", "") or ""),
|
||||
from_id=str(original_msg.get("from_id", "") or ""),
|
||||
platform=str(original_msg.get("acc_type", "") or ""),
|
||||
)
|
||||
customer_msg = client.to_chinese(str(original_msg.get("msg", "") or ""))
|
||||
prompt = (
|
||||
"你是淘宝客服,直接生成一条发给客户的话。\n"
|
||||
f"场景: {scene}\n"
|
||||
f"意图: {intent_hint}\n"
|
||||
f"客户原话: {customer_msg}\n"
|
||||
"要求: 1-2句,自然口语,不要模板腔,不要新增价格/承诺;只输出最终回复。\n"
|
||||
)
|
||||
result = await client.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[])
|
||||
out = str(getattr(result, "output", "") or "").strip()
|
||||
if not out:
|
||||
return fallback
|
||||
if out.startswith("话术|") or "[转移会话]" in out or "TRANSFER_REQUESTED" in out:
|
||||
return fallback
|
||||
client._activity_log(
|
||||
"ai_scene_reply_generated",
|
||||
acc_id=str(original_msg.get("acc_id", "") or ""),
|
||||
customer_id=str(original_msg.get("from_id", "") or ""),
|
||||
scene=scene,
|
||||
generated=out[:160],
|
||||
)
|
||||
return out
|
||||
except Exception as e:
|
||||
client._activity_log(
|
||||
"ai_scene_reply_error",
|
||||
acc_id=str(original_msg.get("acc_id", "") or ""),
|
||||
customer_id=str(original_msg.get("from_id", "") or ""),
|
||||
scene=scene,
|
||||
error=str(e),
|
||||
)
|
||||
return fallback
|
||||
128
core/websocket_helpers_flow.py
Normal file
128
core/websocket_helpers_flow.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def fire_and_forget(client, coro):
|
||||
"""后台执行协程,不阻塞接收循环;异常会记录到日志。"""
|
||||
task = asyncio.create_task(coro)
|
||||
|
||||
def _done(t):
|
||||
if t.cancelled():
|
||||
return
|
||||
exc = t.exception()
|
||||
if exc:
|
||||
client.logger.exception(f"后台任务异常: {exc}")
|
||||
|
||||
task.add_done_callback(_done)
|
||||
|
||||
|
||||
def prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0):
|
||||
if len(seen) <= 2000:
|
||||
return
|
||||
stale = [k for k, t in seen.items() if (now_mono - t) > ttl_sec]
|
||||
for k in stale:
|
||||
seen.pop(k, None)
|
||||
|
||||
|
||||
def log_inbound_once(client, data: dict, chat_log_fn):
|
||||
"""统一记录入站消息,短窗口去重,避免多分支重复写库。"""
|
||||
try:
|
||||
cid = data.get("from_id", "")
|
||||
if not cid:
|
||||
return
|
||||
msg = client.to_chinese(data.get("msg", "") or "")
|
||||
acc_id = data.get("acc_id", "")
|
||||
mtype = int(data.get("msg_type", 0) or 0)
|
||||
now_mono = time.monotonic()
|
||||
sig = f"{acc_id}|{cid}|{mtype}|{msg}"
|
||||
last = client._inbound_log_seen.get(sig, 0.0)
|
||||
if (now_mono - last) < 2.0:
|
||||
return
|
||||
client._inbound_log_seen[sig] = now_mono
|
||||
prune_seen(client._inbound_log_seen, now_mono, ttl_sec=8.0)
|
||||
chat_log_fn(
|
||||
cid,
|
||||
msg,
|
||||
"in",
|
||||
customer_name=client.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
|
||||
acc_id=acc_id,
|
||||
platform=data.get("acc_type", ""),
|
||||
msg_type=mtype,
|
||||
)
|
||||
except Exception:
|
||||
client.logger.debug("入站消息写库失败", exc_info=True)
|
||||
|
||||
|
||||
def log_outbound_once(client, original_msg: dict, reply_content: str, chat_log_fn):
|
||||
"""统一记录出站消息,短窗口去重,避免重复写库。"""
|
||||
try:
|
||||
cid = original_msg.get("from_id", "")
|
||||
if not cid:
|
||||
return
|
||||
msg = reply_content or ""
|
||||
acc_id = original_msg.get("acc_id", "")
|
||||
now_mono = time.monotonic()
|
||||
sig = f"{acc_id}|{cid}|0|{msg}"
|
||||
last = client._outbound_log_seen.get(sig, 0.0)
|
||||
if (now_mono - last) < 2.0:
|
||||
return
|
||||
client._outbound_log_seen[sig] = now_mono
|
||||
prune_seen(client._outbound_log_seen, now_mono, ttl_sec=8.0)
|
||||
chat_log_fn(
|
||||
cid,
|
||||
msg,
|
||||
"out",
|
||||
customer_name=client.to_chinese(original_msg.get("from_name", "") or original_msg.get("cy_name", "")),
|
||||
acc_id=acc_id,
|
||||
platform=original_msg.get("acc_type", ""),
|
||||
msg_type=0,
|
||||
)
|
||||
except Exception:
|
||||
client.logger.debug("出站消息写库失败", exc_info=True)
|
||||
|
||||
|
||||
def build_customer_message(client, data: dict, customer_message_cls):
|
||||
"""把原始消息字典转换为 Agent 输入模型。"""
|
||||
return customer_message_cls(
|
||||
msg_id=data.get("msg_id", ""),
|
||||
acc_id=data.get("acc_id", ""),
|
||||
msg=client.to_chinese(data.get("msg", "")),
|
||||
from_id=data.get("from_id", ""),
|
||||
from_name=client.to_chinese(data.get("from_name", "")),
|
||||
cy_id=data.get("cy_id", ""),
|
||||
acc_type=data.get("acc_type", ""),
|
||||
msg_type=data.get("msg_type", 0),
|
||||
cy_name=client.to_chinese(data.get("cy_name", "")),
|
||||
goods_name=client.to_chinese(data.get("goods_name", "")) if data.get("goods_name") else None,
|
||||
goods_order=client.to_chinese(data.get("goods_order", "")) if data.get("goods_order") else None,
|
||||
)
|
||||
|
||||
|
||||
def touch_customer_last_contact(client, customer_id: str, db):
|
||||
"""兜底更新客户最后联系时间。"""
|
||||
if not customer_id:
|
||||
return
|
||||
try:
|
||||
profile = db.get_customer(customer_id)
|
||||
profile.last_contact = datetime.now().isoformat()
|
||||
db.save_customer(profile)
|
||||
except Exception:
|
||||
client.logger.debug("更新客户最后联系时间失败: customer_id=%s", customer_id, exc_info=True)
|
||||
|
||||
|
||||
def push_chat_to_wechat_safe(client, *, data: dict, customer_msg: str, reply_msg: str, tag: str, goods_name: str = ""):
|
||||
"""异步推送企微聊天日志,失败不影响主流程。"""
|
||||
try:
|
||||
from utils.wechat_chat_log import push_chat_to_wechat
|
||||
|
||||
asyncio.create_task(push_chat_to_wechat(
|
||||
customer_name=client.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
|
||||
customer_id=data.get("from_id", ""),
|
||||
acc_id=data.get("acc_id", ""),
|
||||
customer_msg=client.to_chinese(customer_msg or ""),
|
||||
reply_msg=reply_msg or "",
|
||||
goods_name=goods_name or client.to_chinese(data.get("goods_name", "") or ""),
|
||||
))
|
||||
except Exception:
|
||||
client.logger.debug("推送企微聊天日志失败(%s)", tag, exc_info=True)
|
||||
11
core/websocket_image_entry_flow.py
Normal file
11
core/websocket_image_entry_flow.py
Normal file
@@ -0,0 +1,11 @@
|
||||
async def handle_image_message_flow(client, data: dict):
|
||||
"""
|
||||
处理图片消息。
|
||||
先回复"我找找",然后把图片URL作为消息内容交给 Agent(后台执行)。
|
||||
"""
|
||||
await client.send_reply(data, "我找找")
|
||||
|
||||
image_data = dict(data)
|
||||
image_data["msg"] = f"[客户发来图片] {data.get('msg', '')}"
|
||||
image_data["msg_type"] = 0
|
||||
client._fire_and_forget(client._agent_reply_serialized(image_data))
|
||||
86
core/websocket_logger_setup.py
Normal file
86
core/websocket_logger_setup.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class _AnsiColorFormatter(logging.Formatter):
|
||||
RESET = "\033[0m"
|
||||
MESSAGE_TEXT_REPLACEMENTS = (
|
||||
("[PROMPT->AI 前置提示词]", "[AI提示词]"),
|
||||
("[PROMPT->AI", "[AI提示词"),
|
||||
("[THINK/TOOL_CALL]", "[AI思考-工具调用]"),
|
||||
("[THINK/TOOL_RETURN]", "[AI思考-工具返回]"),
|
||||
("[THINK/RAW_OUTPUT]", "[AI思考-原始输出]"),
|
||||
("[REPLY->CUSTOMER]", "[AI回复客户]"),
|
||||
("[ACTIVITY]", "[活动日志]"),
|
||||
("[AI质检]", "[AI质检]"),
|
||||
)
|
||||
MESSAGE_COLOR_RULES = (
|
||||
("[PROMPT->AI", "\033[94m"),
|
||||
("[THINK/", "\033[96m"),
|
||||
("[REPLY->CUSTOMER]", "\033[92m"),
|
||||
("Agent 回复", "\033[92m"),
|
||||
("[ACTIVITY]", "\033[95m"),
|
||||
("[AI质检]", "\033[97m"),
|
||||
("收到新消息", "\033[36m"),
|
||||
("发送成功", "\033[32m"),
|
||||
("防抖等待", "\033[93m"),
|
||||
)
|
||||
COLORS = {
|
||||
logging.DEBUG: "\033[36m",
|
||||
logging.INFO: "\033[32m",
|
||||
logging.WARNING: "\033[33m",
|
||||
logging.ERROR: "\033[31m",
|
||||
logging.CRITICAL: "\033[35m",
|
||||
}
|
||||
|
||||
def __init__(self, fmt: str, datefmt: str | None = None, use_color: bool = True):
|
||||
super().__init__(fmt=fmt, datefmt=datefmt)
|
||||
self.use_color = use_color
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
msg = super().format(record)
|
||||
if not self.use_color:
|
||||
for old, new in self.MESSAGE_TEXT_REPLACEMENTS:
|
||||
msg = msg.replace(old, new)
|
||||
return msg
|
||||
raw_msg = record.getMessage()
|
||||
for old, new in self.MESSAGE_TEXT_REPLACEMENTS:
|
||||
msg = msg.replace(old, new)
|
||||
for key, color in self.MESSAGE_COLOR_RULES:
|
||||
if key in raw_msg:
|
||||
return f"{color}{msg}{self.RESET}"
|
||||
color = self.COLORS.get(record.levelno, "")
|
||||
if not color:
|
||||
return msg
|
||||
return f"{color}{msg}{self.RESET}"
|
||||
|
||||
|
||||
def setup_logger():
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from config.config import LOG_DIR, LOG_MAX_BYTES, LOG_BACKUP_COUNT
|
||||
|
||||
logger = logging.getLogger("cs_agent")
|
||||
if getattr(logger, "_cs_logger_configured", False):
|
||||
return logger
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.propagate = False
|
||||
fmt = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S")
|
||||
use_color = (os.getenv("LOG_COLOR", "1").lower() in ("1", "true", "yes")) and not bool(os.getenv("NO_COLOR"))
|
||||
|
||||
ch = logging.StreamHandler()
|
||||
ch.setFormatter(_AnsiColorFormatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S", use_color=use_color))
|
||||
logger.addHandler(ch)
|
||||
|
||||
LOG_DIR.mkdir(exist_ok=True)
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
fh = RotatingFileHandler(
|
||||
LOG_DIR / f"chat_{today}.log",
|
||||
maxBytes=LOG_MAX_BYTES,
|
||||
backupCount=LOG_BACKUP_COUNT,
|
||||
encoding="utf-8",
|
||||
)
|
||||
fh.setFormatter(fmt)
|
||||
logger.addHandler(fh)
|
||||
logger._cs_logger_configured = True
|
||||
return logger
|
||||
98
core/websocket_message_utils_flow.py
Normal file
98
core/websocket_message_utils_flow.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import json
|
||||
import random
|
||||
import re
|
||||
|
||||
|
||||
def to_chinese_text(text):
|
||||
"""处理文本,安全地转换 unicode 转义。"""
|
||||
if not isinstance(text, str):
|
||||
return text
|
||||
if "\\u" not in text:
|
||||
return text
|
||||
try:
|
||||
return json.loads(f'"{text}"')
|
||||
except Exception:
|
||||
return text
|
||||
|
||||
|
||||
def is_transfer_msg(client, data: dict) -> bool:
|
||||
msg = to_chinese_text(data.get("msg", ""))
|
||||
return "转交给" in msg or "转接给" in msg
|
||||
|
||||
|
||||
def pick_transfer_greeting() -> str:
|
||||
choices = [
|
||||
"在的亲,发图我看下",
|
||||
"在呢亲,有需求直接说",
|
||||
"我在的,您把要求发我",
|
||||
"在的哈,你说我这边看着处理",
|
||||
"在呢,图和需求发来我看看",
|
||||
]
|
||||
return random.choice(choices)
|
||||
|
||||
|
||||
def is_shop_card(client, data: dict) -> bool:
|
||||
msg = to_chinese_text(data.get("msg", ""))
|
||||
return msg.startswith("[进店卡片]") or "我想咨询你们店的这个商品" in msg
|
||||
|
||||
|
||||
def extract_customer_text_from_shop_card_msg(client, msg: str) -> str:
|
||||
text = to_chinese_text(msg or "").strip()
|
||||
if not text:
|
||||
return ""
|
||||
parts = [p.strip() for p in text.split("#*#") if p and p.strip()]
|
||||
kept = []
|
||||
for part in parts:
|
||||
if part.startswith("[进店卡片]") or "我想咨询你们店的这个商品" in part:
|
||||
continue
|
||||
kept.append(part)
|
||||
if kept:
|
||||
return " ".join(kept).strip()
|
||||
stripped = re.sub(r"\[进店卡片\][^\n\r]*", "", text).strip()
|
||||
stripped = stripped.replace("我想咨询你们店的这个商品", "").strip(",。,#* ")
|
||||
return stripped
|
||||
|
||||
|
||||
def has_chat_history(customer_id: str, acc_id: str = "") -> bool:
|
||||
if not customer_id:
|
||||
return False
|
||||
try:
|
||||
from db.chat_log_db import get_recent_conversation
|
||||
|
||||
msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=1)
|
||||
return len(msgs) > 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def should_ignore(client, data: dict) -> bool:
|
||||
msg = to_chinese_text(data.get("msg", ""))
|
||||
|
||||
ignore_patterns = [
|
||||
"已转接",
|
||||
"接入会话",
|
||||
"结束会话",
|
||||
"会话已",
|
||||
"[系统消息]",
|
||||
"[系统通知]",
|
||||
]
|
||||
for pattern in ignore_patterns:
|
||||
if pattern in msg:
|
||||
return True
|
||||
|
||||
acc_id = data.get("acc_id", "")
|
||||
from_id = data.get("from_id", "")
|
||||
if acc_id and from_id and acc_id == from_id:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_msg_type_name(msg_type):
|
||||
types = {
|
||||
0: "文本",
|
||||
1: "图片",
|
||||
2: "视频",
|
||||
3: "文件",
|
||||
}
|
||||
return types.get(msg_type, f"未知({msg_type})")
|
||||
84
core/websocket_misc_rules_flow.py
Normal file
84
core/websocket_misc_rules_flow.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
|
||||
def msg_is_price_inquiry(msg: str) -> bool:
|
||||
if not msg:
|
||||
return False
|
||||
patterns = ("多少钱", "多少一张", "一张多少钱", "画图多少", "报价", "给个价", "几块", "多少钱")
|
||||
return any(p in msg for p in patterns)
|
||||
|
||||
|
||||
def detect_order_status(msg: str) -> str:
|
||||
if not msg:
|
||||
return ""
|
||||
s = msg
|
||||
if "买家已付款" in s or "已付款" in s:
|
||||
return "paid"
|
||||
if "[系统订单信息]" in s:
|
||||
if "等待买家付款" in s or "未付款" in s:
|
||||
return "waiting"
|
||||
return "order"
|
||||
return ""
|
||||
|
||||
|
||||
def msg_requests_external_contact(msg: str) -> bool:
|
||||
if not msg:
|
||||
return False
|
||||
lower = msg.lower()
|
||||
kws = ("加qq", "qq号", "vx", "微信", "加v", "联系方式", "私聊", "加一下", "加个", "手机号", "电话", "加群", "q q", "v 信")
|
||||
return any(k in lower for k in kws)
|
||||
|
||||
|
||||
def extract_size_pairs_m(msg: str) -> list[tuple[float, float]]:
|
||||
"""提取消息中的米制尺寸对,如 15*6.4米 / 15米*6.4 / 15x6.4m。"""
|
||||
if not msg:
|
||||
return []
|
||||
s = (msg or "").lower().replace("×", "*").replace("x", "*")
|
||||
pairs = []
|
||||
patterns = [
|
||||
r"(\d+(?:\.\d+)?)\s*\*\s*(\d+(?:\.\d+)?)\s*(?:米|m)\b",
|
||||
r"(\d+(?:\.\d+)?)\s*(?:米|m)\s*\*\s*(\d+(?:\.\d+)?)\b",
|
||||
]
|
||||
for p in patterns:
|
||||
for m in re.findall(p, s):
|
||||
try:
|
||||
a = float(m[0])
|
||||
b = float(m[1])
|
||||
if a > 0 and b > 0:
|
||||
pairs.append((a, b))
|
||||
except Exception:
|
||||
continue
|
||||
return pairs
|
||||
|
||||
|
||||
def oversize_reply_if_needed(msg: str) -> str:
|
||||
"""
|
||||
检测超大尺寸需求并返回拒绝话术;未命中返回空字符串。
|
||||
规则:最长边 > 阈值 或 面积 > 阈值。
|
||||
"""
|
||||
try:
|
||||
from config.config import MAX_SERVICE_SIZE_LONGEST_METERS, MAX_SERVICE_SIZE_AREA_SQM
|
||||
|
||||
longest_limit = float(MAX_SERVICE_SIZE_LONGEST_METERS)
|
||||
area_limit = float(MAX_SERVICE_SIZE_AREA_SQM)
|
||||
except Exception:
|
||||
longest_limit = 10.0
|
||||
area_limit = 20.0
|
||||
|
||||
pairs = extract_size_pairs_m(msg)
|
||||
for w, h in pairs:
|
||||
longest = max(w, h)
|
||||
area = w * h
|
||||
if longest > longest_limit or area > area_limit:
|
||||
return (
|
||||
f"{w:g}米*{h:g}米这个尺寸太大了,我们这边做不了。"
|
||||
"如果要做可以拆成几段小尺寸,我再给你按段评估。"
|
||||
)
|
||||
return ""
|
||||
|
||||
|
||||
def build_auto_quote_signature(state: Any) -> str:
|
||||
from core.websocket_auto_quote_flow import build_auto_quote_signature as _build
|
||||
|
||||
return _build(state)
|
||||
130
core/websocket_outbound_arbiter_flow.py
Normal file
130
core/websocket_outbound_arbiter_flow.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
|
||||
|
||||
def normalize_reply_semantic_key(text: str) -> str:
|
||||
s = (text or "").strip().lower()
|
||||
if not s:
|
||||
return ""
|
||||
for w in ("哈", "呀", "哦", "呢", "啦", "咯", "亲"):
|
||||
s = s.replace(w, "")
|
||||
s = re.sub(r"[,。!?、,.!?::;\s~\-—_]+", "", s)
|
||||
return s[:200]
|
||||
|
||||
|
||||
def classify_outbound_reply(text: str) -> str:
|
||||
s = (text or "").strip()
|
||||
if not s:
|
||||
return "empty"
|
||||
if any(k in s for k in ("报价", "总价", "多少钱", "多少", "马上给你报价", "先给你报")):
|
||||
return "quote"
|
||||
if any(k in s for k in ("继续发图", "发完", "发图", "把图发", "先看图")):
|
||||
return "collect"
|
||||
if any(k in s for k in ("在吗", "你好", "在的", "在呢")):
|
||||
return "greeting"
|
||||
if any(k in s for k in ("转人工", "转接", "转给")):
|
||||
return "transfer"
|
||||
if any(k in s for k in ("稍等", "我先看", "看一下", "看下")):
|
||||
return "ack"
|
||||
return "general"
|
||||
|
||||
|
||||
def template_family(reply: str) -> str:
|
||||
s = (reply or "").strip()
|
||||
if not s:
|
||||
return ""
|
||||
if "需求我记上了" in s and "继续发图" in s:
|
||||
return "collect_remind"
|
||||
if ("这批图过一遍" in s or "收齐了" in s or "收好了" in s) and ("总价" in s or "报价" in s):
|
||||
return "quote_defer"
|
||||
if "图片收到了" in s and "继续发" in s:
|
||||
return "collect_ack"
|
||||
if "好嘞,你稍等下,我这边看一下" in s:
|
||||
return "fallback_ack"
|
||||
return ""
|
||||
|
||||
|
||||
def outbound_arbiter(client, original_msg: dict, reply_content: str, trace_id: str) -> tuple[bool, str]:
|
||||
"""
|
||||
统一出站裁决层:
|
||||
1) 语义去重(相同语义短窗口不重复);
|
||||
2) 同类回复节流(同类话术短窗口不重复)。
|
||||
"""
|
||||
key = f"{original_msg.get('acc_id', '')}:{original_msg.get('from_id', '')}"
|
||||
now_mono = time.monotonic()
|
||||
sem_key = normalize_reply_semantic_key(reply_content)
|
||||
reply_class = classify_outbound_reply(reply_content)
|
||||
try:
|
||||
sem_window = max(30, int(os.getenv("AI_OUTBOUND_SEMANTIC_DEDUPE_SECONDS", "180")))
|
||||
except Exception:
|
||||
sem_window = 180
|
||||
try:
|
||||
class_window = max(20, int(os.getenv("AI_OUTBOUND_CLASS_DEDUPE_SECONDS", "90")))
|
||||
except Exception:
|
||||
class_window = 90
|
||||
try:
|
||||
template_window = max(120, int(os.getenv("AI_OUTBOUND_TEMPLATE_FATIGUE_SECONDS", "600")))
|
||||
except Exception:
|
||||
template_window = 600
|
||||
|
||||
sem_bucket = client._outbound_semantic_seen.setdefault(key, {})
|
||||
cls_bucket = client._outbound_class_seen.setdefault(key, {})
|
||||
tpl_bucket = client._outbound_template_seen.setdefault(key, {})
|
||||
client._prune_seen(sem_bucket, now_mono, ttl_sec=max(sem_window * 2, 240))
|
||||
client._prune_seen(cls_bucket, now_mono, ttl_sec=max(class_window * 2, 180))
|
||||
client._prune_seen(tpl_bucket, now_mono, ttl_sec=max(template_window * 2, 1200))
|
||||
|
||||
if sem_key and (now_mono - sem_bucket.get(sem_key, 0.0)) < sem_window:
|
||||
client._activity_log(
|
||||
"outbound_arbiter_block",
|
||||
trace_id=trace_id,
|
||||
acc_id=original_msg.get("acc_id", ""),
|
||||
customer_id=original_msg.get("from_id", ""),
|
||||
reason="semantic_duplicate",
|
||||
semantic_key=sem_key[:80],
|
||||
reply_class=reply_class,
|
||||
msg=reply_content,
|
||||
)
|
||||
return False, "semantic_duplicate"
|
||||
|
||||
family = template_family(reply_content)
|
||||
if family and (now_mono - tpl_bucket.get(family, 0.0)) < template_window:
|
||||
client._activity_log(
|
||||
"outbound_arbiter_block",
|
||||
trace_id=trace_id,
|
||||
acc_id=original_msg.get("acc_id", ""),
|
||||
customer_id=original_msg.get("from_id", ""),
|
||||
reason="template_fatigue",
|
||||
template_family=family,
|
||||
msg=reply_content,
|
||||
)
|
||||
return False, "template_fatigue"
|
||||
|
||||
if reply_class in {"quote", "collect", "ack"} and (now_mono - cls_bucket.get(reply_class, 0.0)) < class_window:
|
||||
client._activity_log(
|
||||
"outbound_arbiter_block",
|
||||
trace_id=trace_id,
|
||||
acc_id=original_msg.get("acc_id", ""),
|
||||
customer_id=original_msg.get("from_id", ""),
|
||||
reason="class_duplicate",
|
||||
reply_class=reply_class,
|
||||
msg=reply_content,
|
||||
)
|
||||
return False, "class_duplicate"
|
||||
|
||||
if sem_key:
|
||||
sem_bucket[sem_key] = now_mono
|
||||
cls_bucket[reply_class] = now_mono
|
||||
if family:
|
||||
tpl_bucket[family] = now_mono
|
||||
client._activity_log(
|
||||
"outbound_arbiter_pass",
|
||||
trace_id=trace_id,
|
||||
acc_id=original_msg.get("acc_id", ""),
|
||||
customer_id=original_msg.get("from_id", ""),
|
||||
reply_class=reply_class,
|
||||
template_family=family,
|
||||
semantic_key=sem_key[:80] if sem_key else "",
|
||||
)
|
||||
return True, "pass"
|
||||
119
core/websocket_runtime_flow.py
Normal file
119
core/websocket_runtime_flow.py
Normal file
@@ -0,0 +1,119 @@
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
|
||||
async def command_handler_flow(client):
|
||||
"""命令行交互。"""
|
||||
client.logger.info("\n命令帮助:")
|
||||
client.logger.info(" reply <内容> - 回复最后一条消息")
|
||||
client.logger.info(" text <id> <平台> <内容> - 发送文本消息")
|
||||
client.logger.info(" img <id> <平台> <路径> - 发送图片")
|
||||
client.logger.info(" setid <id> - 设置回复ID")
|
||||
client.logger.info(" agent on/off - 开启/关闭 Agent")
|
||||
client.logger.info(" exit/quit - 退出\n")
|
||||
|
||||
while client.running:
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
user_input = await loop.run_in_executor(None, input, "")
|
||||
|
||||
parts = user_input.strip().split(maxsplit=1)
|
||||
if not parts:
|
||||
continue
|
||||
|
||||
cmd = parts[0].lower()
|
||||
|
||||
if cmd in ["exit", "quit", "q"]:
|
||||
client.logger.info(f"[{client.get_time()}] 正在关闭...")
|
||||
client.running = False
|
||||
if client.websocket:
|
||||
await client.websocket.close()
|
||||
break
|
||||
|
||||
if cmd == "setid" and len(parts) > 1:
|
||||
client.reply_id = parts[1]
|
||||
client.logger.info(f"[{client.get_time()}] 回复ID已设置为: {client.reply_id}")
|
||||
continue
|
||||
|
||||
if cmd == "agent" and len(parts) > 1:
|
||||
if parts[1].lower() == "on":
|
||||
client.enable_agent = True
|
||||
client.logger.info(f"[{client.get_time()}] Agent 已开启")
|
||||
elif parts[1].lower() == "off":
|
||||
client.enable_agent = False
|
||||
client.logger.info(f"[{client.get_time()}] Agent 已关闭")
|
||||
continue
|
||||
|
||||
if cmd == "reply" and len(parts) > 1:
|
||||
if client.last_msg:
|
||||
await client.send_reply(client.last_msg, parts[1])
|
||||
else:
|
||||
client.logger.info(f"[{client.get_time()}] 错误: 还没有收到任何消息")
|
||||
continue
|
||||
|
||||
if cmd == "text" and len(parts) > 1:
|
||||
args = parts[1].split(maxsplit=2)
|
||||
if len(args) >= 3:
|
||||
await client.send_text(args[0], args[1], args[2])
|
||||
else:
|
||||
client.logger.info(f"[{client.get_time()}] 格式: text <cy_id> <acc_type> <内容>")
|
||||
continue
|
||||
|
||||
if cmd == "img" and len(parts) > 1:
|
||||
args = parts[1].split(maxsplit=2)
|
||||
if len(args) >= 3:
|
||||
await client.send_image(args[0], args[1], args[2])
|
||||
else:
|
||||
client.logger.info(f"[{client.get_time()}] 格式: img <cy_id> <acc_type> <图片路径>")
|
||||
continue
|
||||
|
||||
client.logger.info(f"[{client.get_time()}] 未知命令: {cmd}")
|
||||
|
||||
except Exception as e:
|
||||
client.logger.info(f"[{client.get_time()}] 命令错误: {e}")
|
||||
|
||||
|
||||
async def run_client_flow(client):
|
||||
"""运行客户端。"""
|
||||
tasks = [client.connect(), client.command_handler()]
|
||||
|
||||
try:
|
||||
from mail.email_receiver import email_receiver
|
||||
if email_receiver.username:
|
||||
client.logger.info(f"[{client.get_time()}] 邮件接收已启动,监控: {email_receiver.username}")
|
||||
tasks.append(email_receiver.start())
|
||||
else:
|
||||
client.logger.info(f"[{client.get_time()}] 未配置邮件账号,跳过邮件接收")
|
||||
except Exception as e:
|
||||
client.logger.info(f"[{client.get_time()}] 邮件接收模块加载失败: {e}")
|
||||
|
||||
try:
|
||||
from utils.daily_summary import scheduler as daily_scheduler
|
||||
tasks.append(daily_scheduler())
|
||||
client.logger.info(f"[{client.get_time()}] 每日日报定时任务已启动")
|
||||
except Exception as e:
|
||||
client.logger.info(f"[{client.get_time()}] 日报模块加载失败: {e}")
|
||||
|
||||
try:
|
||||
from utils.health_check import health_check_loop
|
||||
|
||||
def _qingjian_ok():
|
||||
return client.websocket is not None and not getattr(client.websocket, "closed", True)
|
||||
|
||||
tasks.append(health_check_loop(_qingjian_ok))
|
||||
client.logger.info(f"[{client.get_time()}] 健康检查已启动")
|
||||
except Exception as e:
|
||||
client.logger.info(f"[{client.get_time()}] 健康检查模块加载失败: {e}")
|
||||
|
||||
try:
|
||||
from utils.wechat_chat_log import morning_startup_scheduler
|
||||
tasks.append(morning_startup_scheduler())
|
||||
client.logger.info(f"[{client.get_time()}] 早8点企微启动消息已启动")
|
||||
except Exception as e:
|
||||
client.logger.info(f"[{client.get_time()}] 企微启动消息模块加载失败: {e}")
|
||||
|
||||
if os.getenv("UNREPLIED_FOLLOWUP_ENABLED", "true").lower() in ("1", "true", "yes"):
|
||||
tasks.append(client._unreplied_followup_loop())
|
||||
client.logger.info(f"[{client.get_time()}] 未回复会话补偿任务已启动")
|
||||
|
||||
await asyncio.gather(*tasks)
|
||||
70
core/websocket_send_flow.py
Normal file
70
core/websocket_send_flow.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import json
|
||||
import websockets
|
||||
|
||||
|
||||
async def send_text_flow(client, cy_id, acc_type, content):
|
||||
"""主动发送文本消息。"""
|
||||
message = {
|
||||
"msg_id": "",
|
||||
"acc_id": "",
|
||||
"msg": content,
|
||||
"from_id": client.reply_id,
|
||||
"from_name": client.reply_id,
|
||||
"cy_id": cy_id,
|
||||
"acc_type": acc_type,
|
||||
"msg_type": 0,
|
||||
"cy_name": "",
|
||||
}
|
||||
await client.send_message(message)
|
||||
|
||||
|
||||
async def send_image_flow(client, cy_id, acc_type, image_path):
|
||||
"""主动发送图片消息。"""
|
||||
message = {
|
||||
"msg_id": "",
|
||||
"acc_id": "",
|
||||
"msg": image_path,
|
||||
"from_id": client.reply_id,
|
||||
"from_name": client.reply_id,
|
||||
"cy_id": cy_id,
|
||||
"acc_type": acc_type,
|
||||
"msg_type": 1,
|
||||
"cy_name": "",
|
||||
}
|
||||
await client.send_message(message)
|
||||
|
||||
|
||||
async def send_message_flow(client, message):
|
||||
"""发送消息到服务器。"""
|
||||
if client.websocket and client.websocket.state == websockets.protocol.State.OPEN:
|
||||
try:
|
||||
msg_json = json.dumps(message, ensure_ascii=False)
|
||||
await client.websocket.send(msg_json)
|
||||
pretty = json.dumps(message, ensure_ascii=False, indent=2)
|
||||
client.logger.info(f"[{client.get_time()}] 发送成功:\n{pretty}")
|
||||
client._activity_log(
|
||||
"send_message_success",
|
||||
trace_id=message.get("_trace_id", ""),
|
||||
acc_id=message.get("acc_id", ""),
|
||||
customer_id=message.get("from_id", ""),
|
||||
msg_type=message.get("msg_type", 0),
|
||||
msg=message.get("msg", ""),
|
||||
)
|
||||
except Exception as e:
|
||||
client.logger.info(f"[{client.get_time()}] 发送失败: {e}")
|
||||
client._activity_log(
|
||||
"send_message_error",
|
||||
trace_id=message.get("_trace_id", ""),
|
||||
acc_id=message.get("acc_id", ""),
|
||||
customer_id=message.get("from_id", ""),
|
||||
error=str(e),
|
||||
)
|
||||
else:
|
||||
client.logger.info(f"[{client.get_time()}] 错误: 连接未打开")
|
||||
client._activity_log(
|
||||
"send_message_skipped",
|
||||
trace_id=message.get("_trace_id", ""),
|
||||
reason="socket_not_open",
|
||||
acc_id=message.get("acc_id", ""),
|
||||
customer_id=message.get("from_id", ""),
|
||||
)
|
||||
23
core/websocket_summary_flow.py
Normal file
23
core/websocket_summary_flow.py
Normal file
@@ -0,0 +1,23 @@
|
||||
async def save_conversation_summary_flow(client, customer_id: str, buyer_msg: str, agent_reply: str):
|
||||
"""用 AI 生成一句话对话摘要并持久化。"""
|
||||
try:
|
||||
from db.customer_db import db
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
api_client = AsyncOpenAI(
|
||||
api_key=client.agent.api_key if client.agent else None,
|
||||
base_url=client.agent.base_url if client.agent else None,
|
||||
)
|
||||
resp = await api_client.chat.completions.create(
|
||||
model=client.agent.model_name if client.agent else "gpt-4o-mini",
|
||||
messages=[
|
||||
{"role": "system", "content": "用一句话(15字以内)总结这段对话的核心内容,只输出摘要文字。"},
|
||||
{"role": "user", "content": f"买家:{buyer_msg}\n客服:{agent_reply}"},
|
||||
],
|
||||
max_tokens=30,
|
||||
temperature=0.3,
|
||||
)
|
||||
summary = resp.choices[0].message.content.strip()
|
||||
db.save_conversation_summary(customer_id, summary)
|
||||
except Exception:
|
||||
client.logger.debug("保存对话摘要失败(不影响主流程)", exc_info=True)
|
||||
143
core/websocket_system_inquiry_flow.py
Normal file
143
core/websocket_system_inquiry_flow.py
Normal file
@@ -0,0 +1,143 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from utils.metrics_tracker import emit as metrics_emit
|
||||
|
||||
logger = logging.getLogger("cs_agent")
|
||||
|
||||
|
||||
def load_system_inquiry_rules() -> Dict[str, Any]:
|
||||
"""加载系统客服询单规则(全局 + 店铺覆盖)。"""
|
||||
from config.config import (
|
||||
SYSTEM_INQUIRY_ENABLED,
|
||||
SYSTEM_INQUIRY_DEFAULT_ACTION,
|
||||
SYSTEM_INQUIRY_DEFAULT_REPLY,
|
||||
SYSTEM_INQUIRY_RULES_FILE,
|
||||
)
|
||||
|
||||
enabled_env = os.getenv("SYSTEM_INQUIRY_ENABLED")
|
||||
enabled = (
|
||||
enabled_env.lower() in ("1", "true", "yes")
|
||||
if isinstance(enabled_env, str)
|
||||
else bool(SYSTEM_INQUIRY_ENABLED)
|
||||
)
|
||||
action = (os.getenv("SYSTEM_INQUIRY_DEFAULT_ACTION") or SYSTEM_INQUIRY_DEFAULT_ACTION or "silent").strip().lower()
|
||||
reply = os.getenv("SYSTEM_INQUIRY_DEFAULT_REPLY") or SYSTEM_INQUIRY_DEFAULT_REPLY or ""
|
||||
rules_file = os.getenv("SYSTEM_INQUIRY_RULES_FILE") or str(SYSTEM_INQUIRY_RULES_FILE)
|
||||
defaults: Dict[str, Any] = {
|
||||
"enabled": bool(enabled),
|
||||
"default_action": action,
|
||||
"default_reply": reply,
|
||||
"sender_keywords": ["系统客服", "官方客服", "平台客服", "机器人客服", "商家客服系统"],
|
||||
"message_keywords": ["系统询单", "代客咨询", "平台代问", "系统代发", "客服询单"],
|
||||
"shops": {},
|
||||
}
|
||||
try:
|
||||
p = Path(rules_file)
|
||||
if p.exists():
|
||||
with p.open("r", encoding="utf-8") as f:
|
||||
loaded = json.load(f)
|
||||
if isinstance(loaded, dict):
|
||||
defaults.update(loaded)
|
||||
except Exception as e:
|
||||
logger.warning("系统询单规则加载失败,使用默认规则: %s", e)
|
||||
return defaults
|
||||
|
||||
|
||||
def normalize_kw_list(v: Any) -> List[str]:
|
||||
if not isinstance(v, list):
|
||||
return []
|
||||
return [str(x).strip().lower() for x in v if str(x).strip()]
|
||||
|
||||
|
||||
def resolve_system_inquiry_policy(client, acc_id: str) -> Dict[str, Any]:
|
||||
"""根据店铺合并系统询单策略。"""
|
||||
from config.config import SYSTEM_INQUIRY_SHOPS
|
||||
|
||||
rules = client._system_inquiry_rules or {}
|
||||
if not bool(rules.get("enabled", True)):
|
||||
return {"enabled": False}
|
||||
|
||||
shops_env = os.getenv("SYSTEM_INQUIRY_SHOPS", SYSTEM_INQUIRY_SHOPS or "")
|
||||
shop_whitelist = [s.strip() for s in shops_env.split(",") if s.strip()]
|
||||
if shop_whitelist and (acc_id or "") not in shop_whitelist:
|
||||
return {"enabled": False}
|
||||
|
||||
policy: Dict[str, Any] = {
|
||||
"enabled": True,
|
||||
"action": str(rules.get("default_action", "silent")).strip().lower(),
|
||||
"reply": str(rules.get("default_reply", "")).strip(),
|
||||
"sender_keywords": normalize_kw_list(rules.get("sender_keywords")),
|
||||
"message_keywords": normalize_kw_list(rules.get("message_keywords")),
|
||||
}
|
||||
shop_cfg = (rules.get("shops") or {}).get(acc_id or "", {})
|
||||
if isinstance(shop_cfg, dict):
|
||||
if "enabled" in shop_cfg and not bool(shop_cfg.get("enabled", True)):
|
||||
return {"enabled": False}
|
||||
if shop_cfg.get("action"):
|
||||
policy["action"] = str(shop_cfg.get("action")).strip().lower()
|
||||
if shop_cfg.get("reply"):
|
||||
policy["reply"] = str(shop_cfg.get("reply")).strip()
|
||||
if isinstance(shop_cfg.get("sender_keywords"), list):
|
||||
policy["sender_keywords"] = normalize_kw_list(shop_cfg.get("sender_keywords"))
|
||||
if isinstance(shop_cfg.get("message_keywords"), list):
|
||||
policy["message_keywords"] = normalize_kw_list(shop_cfg.get("message_keywords"))
|
||||
if policy["action"] not in ("silent", "reply", "transfer"):
|
||||
policy["action"] = "silent"
|
||||
return policy
|
||||
|
||||
|
||||
def match_system_inquiry(client, data: dict, policy: Dict[str, Any]) -> bool:
|
||||
"""识别是否为系统客服询单消息。"""
|
||||
if not policy.get("enabled", False):
|
||||
return False
|
||||
|
||||
from_name = client.to_chinese(data.get("from_name", "") or "").lower()
|
||||
from_id = str(data.get("from_id", "") or "").lower()
|
||||
msg = client.to_chinese(data.get("msg", "") or "").lower()
|
||||
|
||||
sender_hits = 0
|
||||
for kw in policy.get("sender_keywords", []):
|
||||
if kw and (kw in from_name or kw in from_id):
|
||||
sender_hits += 1
|
||||
message_hits = 0
|
||||
for kw in policy.get("message_keywords", []):
|
||||
if kw and kw in msg:
|
||||
message_hits += 1
|
||||
|
||||
# 优先看发送者特征;纯文本命中时至少要求两个关键词,降低误判风险
|
||||
return sender_hits > 0 or message_hits >= 2
|
||||
|
||||
|
||||
async def handle_system_inquiry(client, data: dict) -> bool:
|
||||
"""命中系统询单后按策略处理。"""
|
||||
acc_id = data.get("acc_id", "")
|
||||
policy = resolve_system_inquiry_policy(client, acc_id)
|
||||
if not match_system_inquiry(client, data, policy):
|
||||
return False
|
||||
|
||||
customer_id = data.get("from_id", "")
|
||||
metrics_emit("system_inquiry_detected", customer_id=customer_id, acc_id=acc_id)
|
||||
action = policy.get("action", "silent")
|
||||
logger.info("系统询单命中 | 店铺:%s | 客户:%s | action:%s", acc_id, customer_id, action)
|
||||
|
||||
if action == "reply":
|
||||
reply = await client._compose_ai_scene_reply(
|
||||
original_msg=data,
|
||||
scene="system_inquiry_reply",
|
||||
intent_hint="这是系统客服询单消息,简短确认已收到并说明会跟进即可。",
|
||||
fallback=(policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。"),
|
||||
)
|
||||
await client.send_reply(data, reply)
|
||||
metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id)
|
||||
return True
|
||||
if action == "transfer":
|
||||
await client.transfer_to_human(data, "系统询单转人工")
|
||||
metrics_emit("system_inquiry_transfer", customer_id=customer_id, acc_id=acc_id)
|
||||
return True
|
||||
|
||||
metrics_emit("system_inquiry_ignored", customer_id=customer_id, acc_id=acc_id)
|
||||
return True
|
||||
83
core/websocket_transfer_flow.py
Normal file
83
core/websocket_transfer_flow.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import logging
|
||||
|
||||
from utils.metrics_tracker import emit as metrics_emit
|
||||
|
||||
logger = logging.getLogger("cs_agent")
|
||||
|
||||
|
||||
async def transfer_to_human_flow(client, data: dict, transfer_msg: str = "", *, transfer_group_resolver=None):
|
||||
"""
|
||||
转接人工客服。
|
||||
1. 优先调用 dispatch 服务 GET /assign 一键派单
|
||||
2. 派单失败时,回退旧版 designer_roster 派单
|
||||
3. 无人在线或未配置时,回退到 config/transfer_groups.json
|
||||
设计师在线状态:仅在转人工时按需查询,不轮询。
|
||||
"""
|
||||
if not client.websocket:
|
||||
logger.info("[%s] 错误: 未连接到服务器", client.get_time())
|
||||
return
|
||||
|
||||
acc_id = data.get("acc_id", "")
|
||||
group_id = None
|
||||
assigned_to = ""
|
||||
dispatch_res = await client._dispatch_assign_once()
|
||||
if dispatch_res.get("success"):
|
||||
assigned_to = str(dispatch_res.get("assigned_to", "") or "").strip()
|
||||
logger.info(
|
||||
"一键派单成功 | task_id=%s | assigned_to=%s | online_count=%s",
|
||||
dispatch_res.get("task_id", ""),
|
||||
assigned_to or "未知",
|
||||
dispatch_res.get("online_count", 0),
|
||||
)
|
||||
metrics_emit(
|
||||
"dispatch_assign_success",
|
||||
acc_id=acc_id,
|
||||
assigned_to=assigned_to,
|
||||
online_count=dispatch_res.get("online_count", 0),
|
||||
)
|
||||
else:
|
||||
logger.warning("一键派单失败,回退旧派单逻辑: %s", dispatch_res.get("reason", "unknown"))
|
||||
metrics_emit("dispatch_assign_failed", acc_id=acc_id)
|
||||
|
||||
# 2. 派单失败时,回退旧版 designer_roster
|
||||
if not dispatch_res.get("success"):
|
||||
try:
|
||||
from utils.designer_roster import poll_and_update_roster
|
||||
from db.designer_roster_db import get_transfer_group_for_shop
|
||||
await poll_and_update_roster()
|
||||
group_id = get_transfer_group_for_shop(acc_id)
|
||||
except Exception as e:
|
||||
logger.debug("设计师派单未启用或异常: %s", e)
|
||||
|
||||
# 3. 无人在线时企微提醒(新旧两套都没拿到在线结果时)
|
||||
online_count = int(dispatch_res.get("online_count", 0) or 0)
|
||||
if online_count <= 0 and not group_id:
|
||||
try:
|
||||
from config.config import WECHAT_WEBHOOK
|
||||
if WECHAT_WEBHOOK:
|
||||
import httpx
|
||||
|
||||
async with httpx.AsyncClient(timeout=5) as c:
|
||||
resp = await c.post(WECHAT_WEBHOOK, json={
|
||||
"msgtype": "text",
|
||||
"text": {"content": "谁在线啊"},
|
||||
})
|
||||
if resp.status_code != 200:
|
||||
logger.warning("企微提醒发送失败: %s %s", resp.status_code, resp.text)
|
||||
else:
|
||||
logger.debug("未配置 WECHAT_WEBHOOK,跳过企微提醒")
|
||||
except Exception as e:
|
||||
logger.warning("企微提醒发送异常: %s", e)
|
||||
|
||||
# 4. 构造转接命令:有 assigned_to 用人名,否则回退分组
|
||||
if assigned_to:
|
||||
cmd = f"正在为你转接人工|[转移会话],{assigned_to},无原因"
|
||||
await client.send_reply(data, cmd)
|
||||
logger.info("[%s] 已发送转接请求 (店铺:%s -> 设计师:%s)", client.get_time(), acc_id or "未知", assigned_to)
|
||||
return
|
||||
|
||||
if not group_id:
|
||||
group_id = transfer_group_resolver(acc_id) if transfer_group_resolver else "20252916034"
|
||||
cmd = f"话术|[转移会话],分组{group_id},无原因"
|
||||
await client.send_reply(data, cmd)
|
||||
logger.info("[%s] 已发送转接请求 (店铺:%s -> 分组:%s)", client.get_time(), acc_id or "未知", group_id)
|
||||
64
core/websocket_workflow_flow.py
Normal file
64
core/websocket_workflow_flow.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import asyncio
|
||||
|
||||
|
||||
async def workflow_agent_notify_flow(client, customer_id: str, acc_id: str, acc_type: str, system_hint: str):
|
||||
"""图片处理完成后,让客服 AI 生成自然话术发给客户。"""
|
||||
if not client.enable_agent or not client.agent:
|
||||
return
|
||||
try:
|
||||
from core.pydantic_ai_agent import CustomerMessage
|
||||
|
||||
notify_msg = CustomerMessage(
|
||||
msg_id="workflow_notify",
|
||||
acc_id=acc_id,
|
||||
msg=system_hint,
|
||||
from_id=customer_id,
|
||||
from_name="",
|
||||
cy_id=customer_id,
|
||||
acc_type=acc_type,
|
||||
msg_type=0,
|
||||
cy_name="",
|
||||
)
|
||||
response = await client.agent.process_message(notify_msg)
|
||||
if response.should_reply and response.reply:
|
||||
nonsense_patterns = [
|
||||
"无需", "流程已完成", "不需要回复", "无需额外", "已完成",
|
||||
"无需回复", "不需要额外", "已经完成", "无需再", "操作已完成",
|
||||
"任务完成", "流程完成", "记录完成", "报价已",
|
||||
]
|
||||
if not any(p in response.reply for p in nonsense_patterns):
|
||||
fake_data = {
|
||||
"acc_id": acc_id,
|
||||
"from_id": customer_id,
|
||||
"from_name": "",
|
||||
"cy_id": customer_id,
|
||||
"acc_type": acc_type,
|
||||
}
|
||||
await asyncio.sleep(0.5)
|
||||
await client.send_reply(fake_data, response.reply)
|
||||
client.logger.info(f"[Workflow] AI 通知已发送: {response.reply}")
|
||||
except Exception as e:
|
||||
client.logger.error(f"[Workflow] AI 通知生成失败: {e}")
|
||||
|
||||
|
||||
async def workflow_send_flow(
|
||||
client,
|
||||
customer_id: str,
|
||||
acc_id: str,
|
||||
acc_type: str,
|
||||
content: str,
|
||||
msg_type: int = 0,
|
||||
):
|
||||
"""workflow 回调:图片AI完成后用此方法推送消息给客户。"""
|
||||
msg = {
|
||||
"msg_id": "",
|
||||
"acc_id": acc_id,
|
||||
"msg": content,
|
||||
"from_id": customer_id,
|
||||
"from_name": customer_id,
|
||||
"cy_id": customer_id,
|
||||
"acc_type": acc_type,
|
||||
"msg_type": msg_type,
|
||||
"cy_name": customer_id,
|
||||
}
|
||||
await client.send_message(msg)
|
||||
Reference in New Issue
Block a user