Compare commits

..

2 Commits

25 changed files with 2391 additions and 2429 deletions

View File

@@ -2,11 +2,28 @@
"shops": {
"tb2801080146": {
"type": "gemini_api",
"hint": "【店铺类型】Gemini API 账号。客户问账号/pro/续费/没pro时按API客服回复续费/充值/套餐说明。"
"hint": "【店铺类型】Gemini API 账号。客户问账号/pro/续费/没pro时按API客服回复续费/充值/套餐说明。",
"persona": "技术型客服,表达清晰,回答直接,少废话,优先给可执行步骤和结论。"
},
"小威哥1216": {
"type": "find_image",
"hint": "【店铺类型】找原图/修图。"
"hint": "【店铺类型】找原图/修图。",
"persona": "修图老店主语气,接地气,像微信聊天,先承接再推进成交,不端着。"
},
"小威哥1216:媚媚": {
"type": "find_image",
"hint": "【店铺类型】找原图/修图。",
"persona": "女性店主口吻,亲切利落,话短不硬,先确认需求再自然推进下单。"
},
"tb7518056865:小林": {
"type": "find_image",
"hint": "【店铺类型】找原图/修图。",
"persona": "效率型客服,回复简短干脆,先给结论再补一句说明,避免重复。"
},
"tb637530900564:小威威": {
"type": "find_image",
"hint": "【店铺类型】找原图/修图。",
"persona": "熟客店主口吻,轻松自然,像真人在店里接单聊天,避免模板句。"
}
},
"goods_keywords": {
@@ -23,5 +40,10 @@
"gemini_api": "【店铺类型】Gemini API 账号。客户问账号/pro/续费/没pro时按API客服回复续费/充值/套餐说明,自然回复。",
"find_image": "【店铺类型】找原图/修图。"
},
"_comment": "新增店铺:在 shops 加 acc_id。新增商品类型在 goods_keywords 加关键词→类型。"
"type_personas": {
"gemini_api": "技术支持型客服,回答准确直给,先结论后解释,避免口水话。",
"find_image": "淘宝修图店主语气,口语自然,有温度但不啰嗦,先承接再推进成交。"
},
"default_persona": "淘宝老店主,说话自然,像真人微信聊天,不官腔、不背模板。",
"_comment": "新增店铺:在 shops 加 acc_id。可选 persona 设置店铺人设;未设置则走 type_personas/default_persona。"
}

View File

@@ -25,6 +25,7 @@ def build_prompt(
state: Any,
extract_image_url: Callable[[str], str],
shop_type_resolver: Callable[[str, str], str],
shop_persona_resolver: Callable[[str, str], str],
parse_order_info: Callable[[str], dict[str, str]],
build_order_instruction: Callable[[str, str], str],
) -> str:
@@ -58,6 +59,7 @@ def build_prompt(
stage_info += f"\n【客户压价次数】{state.discount_count}"
shop_type = shop_type_resolver(message.acc_id or "", message.goods_name or "")
shop_persona = shop_persona_resolver(message.acc_id or "", message.goods_name or "")
shop_hint = ""
try:
from config.config import CONFIG_DIR
@@ -84,6 +86,8 @@ def build_prompt(
prompt += f"商品名称: {message.goods_name}\n"
if shop_hint:
prompt += f"\n{shop_hint}\n"
if shop_persona:
prompt += f"\n【店铺人设】{shop_persona}\n"
order_paid = False
order_unpaid = False

View File

@@ -234,6 +234,37 @@ def _get_shop_type(acc_id: str = "", goods_name: str = "") -> str:
return "find_image"
def _get_shop_persona(acc_id: str = "", goods_name: str = "") -> str:
"""按店铺返回人设描述优先级shops.persona > type_personas > default_persona。"""
default_persona = "淘宝老店主,说话自然,像真人微信聊天,不官腔、不背模板。"
try:
from config.config import CONFIG_DIR
import json
cfg_path = CONFIG_DIR / "shop_prompts.json"
if not cfg_path.exists():
return default_persona
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
shops = cfg.get("shops", {})
if acc_id and acc_id in shops:
persona = str(shops[acc_id].get("persona", "")).strip()
if persona:
return persona
shop_type = _get_shop_type(acc_id, goods_name)
type_personas = cfg.get("type_personas", {})
persona = str(type_personas.get(shop_type, "")).strip()
if persona:
return persona
cfg_default = str(cfg.get("default_persona", "")).strip()
return cfg_default or default_persona
except Exception:
return default_persona
def load_skill_map(skills_dir: str = "skills") -> Dict[str, str]:
"""按技能目录名加载 SKILL.md返回 {skill_name: content}。"""
skill_map: Dict[str, str] = {}
@@ -504,9 +535,11 @@ class CustomerServiceAgent:
)
history = self.message_histories.get(message.from_id, [])
pending_req = "".join((state.pending_requirements or [])[-4:]) or ""
persona = _get_shop_persona(message.acc_id or "", message.goods_name or "")
user_prompt = (
"请按下面意图生成给客户的自然回复。\n"
f"场景: {scene}\n"
f"店铺人设: {persona}\n"
f"回复意图: {intent_hint}\n"
f"客户原话: {message.msg}\n"
f"当前已收图片数: {len(state.pending_image_urls)}\n"
@@ -551,9 +584,11 @@ class CustomerServiceAgent:
)
history = self.message_histories.get(message.from_id, [])
pending_req = "".join((state.pending_requirements or [])[-4:]) or ""
persona = _get_shop_persona(message.acc_id or "", message.goods_name or "")
prompt = (
"请把下面这句客服回复润色成更自然的微信聊天口吻,语义必须保持一致。\n"
f"场景: {scene}\n"
f"店铺人设: {persona}\n"
f"客户原话: {message.msg}\n"
f"当前已收图: {len(state.pending_image_urls)}\n"
f"当前需求摘要: {pending_req}\n"
@@ -650,6 +685,7 @@ class CustomerServiceAgent:
_is_political_inquiry = staticmethod(is_political_inquiry)
_is_map_inquiry = staticmethod(is_map_inquiry)
_get_shop_type = staticmethod(_get_shop_type)
_get_shop_persona = staticmethod(_get_shop_persona)
_notify_wechat = staticmethod(_notify_wechat)
_notify_wechat_overdue = staticmethod(_notify_wechat_overdue)
@@ -989,6 +1025,7 @@ class CustomerServiceAgent:
state=state,
extract_image_url=self._extract_image_url,
shop_type_resolver=_get_shop_type,
shop_persona_resolver=_get_shop_persona,
parse_order_info=parse_order_info,
build_order_instruction=build_order_instruction,
)

View File

@@ -0,0 +1,53 @@
import logging
from utils.observability import build_trace_id
from core.websocket_brain_flow import decide_brain_action, execute_brain_action
logger = logging.getLogger("cs_agent")
async def handle_agent_reply_flow(client, data: dict, *, workflow, shop_type_resolver):
"""处理单条消息:统一走 Brain 决策 + 执行。"""
try:
msg_text = client.to_chinese(data.get("msg", ""))
customer_id = data.get("from_id", "")
trace_id = build_trace_id(data.get("acc_id", ""), customer_id, data.get("msg_id", ""), msg_text[:64])
data["_trace_id"] = trace_id
shop_type = shop_type_resolver(data.get("acc_id", ""), client.to_chinese(data.get("goods_name", "") or ""))
customer_msg = client._build_customer_message(data)
decision = await decide_brain_action(
client,
data,
customer_msg,
trace_id=trace_id,
msg_text=msg_text,
shop_type=shop_type,
)
client._activity_log(
"brain_decision",
trace_id=trace_id,
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
action=decision.action,
source=decision.source,
should_reply=bool(decision.should_reply),
need_transfer=bool(decision.need_transfer),
)
await execute_brain_action(
client,
data,
decision=decision,
trace_id=trace_id,
msg_text=msg_text,
)
except Exception as e:
logger.error("Agent 处理失败: %s", e)
client._activity_log(
"agent_process_error",
trace_id=data.get("_trace_id", ""),
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
error=str(e),
)

View File

@@ -0,0 +1,100 @@
import asyncio
import os
from typing import Any
def cancel_auto_quote_task(client, key: str, reason: str = ""):
task = client._auto_quote_tasks.get(key)
if task and not task.done():
task.cancel()
client._activity_log("auto_quote_cancel", key=key, reason=reason or "unknown")
def build_auto_quote_signature(state: Any) -> str:
"""为待报价内容生成稳定签名,用于避免同一批内容反复自动触发。"""
urls = list(getattr(state, "pending_image_urls", []) or [])
reqs = list(getattr(state, "pending_requirements", []) or [])
req_tail = reqs[-6:] if len(reqs) > 6 else reqs
return "||".join(urls) + "##" + "||".join(req_tail)
async def schedule_auto_quote(client, data: dict, *, shop_type_resolver):
"""
智能兜底:客户发图后若长时间不再补充消息,自动触发一次报价,避免会话卡住。
"""
if not client.enable_agent or not client.agent:
return
try:
shop_type = shop_type_resolver(data.get('acc_id', ''), client.to_chinese(data.get('goods_name', '') or ''))
if shop_type != "find_image":
return
cid = data.get('from_id', '')
key = client._customer_key(data)
state = client.agent._get_conversation_state(cid)
if not state or not getattr(state, "pending_image_urls", None):
cancel_auto_quote_task(client, key, reason="no_pending_images")
client._auto_quote_done_sig.pop(key, None)
return
if state.quote_phase not in {"collecting", "waiting_result"}:
return
current_sig = build_auto_quote_signature(state)
if current_sig and client._auto_quote_done_sig.get(key) == current_sig:
client._activity_log(
"auto_quote_skip_duplicate",
key=key,
pending_count=len(state.pending_image_urls),
)
return
try:
idle_seconds = max(8, int(os.getenv("AUTO_QUOTE_IDLE_SECONDS", "18")))
except Exception:
idle_seconds = 18
cancel_auto_quote_task(client, key, reason="reschedule")
async def _delayed_auto_quote(capture_key: str, capture_data: dict, wait_s: int, capture_sig: str):
await asyncio.sleep(wait_s)
async with client._get_customer_lock(capture_key):
capture_cid = capture_data.get('from_id', '')
st = client.agent._get_conversation_state(capture_cid)
if not st or not st.pending_image_urls:
client._auto_quote_done_sig.pop(capture_key, None)
return
# 内容变化时,放弃旧触发(会在新一轮消息后重新调度)。
if build_auto_quote_signature(st) != capture_sig:
return
# 标记本批次已自动触发,避免同内容循环“马上报价”。
client._auto_quote_done_sig[capture_key] = capture_sig
# 直接置为可报价,走内部自动报价入口(不伪造客户语句)。
client.agent._mark_quote_ready(st)
client.agent._sync_pending_quote_state(capture_cid, st)
client._activity_log(
"auto_quote_trigger",
key=capture_key,
pending_count=len(st.pending_image_urls),
wait_s=wait_s,
)
notify_data = dict(capture_data)
notify_data["msg_id"] = "auto_quote_idle_trigger"
notify_data["msg"] = "__AUTO_QUOTE_INTERNAL_TRIGGER__"
notify_msg = client._build_customer_message(notify_data)
response = await client.agent.build_auto_quote_reply(st, notify_msg)
if response.should_reply and response.reply and not response.need_transfer:
await client.send_reply(capture_data, response.reply)
client._activity_log(
"auto_quote_sent",
key=capture_key,
reply=response.reply,
)
task = asyncio.create_task(_delayed_auto_quote(key, dict(data), idle_seconds, current_sig))
client._auto_quote_tasks[key] = task
client._activity_log(
"auto_quote_scheduled",
key=key,
pending_count=len(state.pending_image_urls),
phase=state.quote_phase,
wait_s=idle_seconds,
)
except Exception as e:
client._activity_log("auto_quote_schedule_error", error=str(e), key=client._customer_key(data))

View File

@@ -0,0 +1,311 @@
from __future__ import annotations
import asyncio
import json
import logging
import re
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger("cs_agent")
@dataclass
class BrainDecision:
action: str # reply | quote | transfer | noop
source: str
reply: str = ""
transfer_msg: str = ""
should_reply: bool = False
need_transfer: bool = False
payload: dict[str, Any] | None = None
def _extract_json_obj(text: str) -> dict[str, Any] | None:
if not text:
return None
m = re.search(r"\{[\s\S]*\}", text)
if not m:
return None
try:
return json.loads(m.group(0))
except Exception:
return None
async def _ai_policy_brain_decide(client, data: dict, *, msg_text: str, shop_type: str) -> BrainDecision | None:
if not client.enable_agent or not client.agent or not client.AgentDeps:
return None
acc_id = str(data.get("acc_id", "") or "")
customer_id = str(data.get("from_id", "") or "")
current_urls = client._extract_image_urls(msg_text)
recent_urls = client._collect_recent_image_urls(customer_id, acc_id, max_count=6)
key = client._customer_key(data)
pending_urls = client._pending_images.get(key) or []
try:
order_status = client._detect_order_status(msg_text)
has_image_url = client._msg_has_image_url(msg_text)
refers_images = client._msg_refers_images(msg_text)
is_price = client._msg_is_price_inquiry(msg_text)
is_req = client._msg_is_requirement(msg_text)
ext_contact = client._msg_requests_external_contact(msg_text)
except Exception:
order_status, has_image_url, refers_images, is_price, is_req, ext_contact = "", False, False, False, False, False
deps = client.AgentDeps(
msg_id=str(data.get("msg_id", "") or "brain_policy"),
acc_id=acc_id,
from_id=customer_id,
platform=str(data.get("acc_type", "") or "AliWorkbench"),
)
prompt = (
"你是淘宝客服系统的主决策Brain只做决策不要解释。\n"
"你必须根据历史规则和当前上下文,输出唯一动作。\n"
"可选动作 action: reply / quote / transfer / noop。\n"
"历史规则(完整继承):\n"
"1) 客户发图/补图:先自然承接,再根据上下文决定继续收集或报价;\n"
"2) 客户询价且有可用图片(当前或最近)时,优先 action=quote\n"
"3) 若有 pending 图片且客户催报价/补充需求,优先 quote_mode=flush_pending\n"
"4) 仅打招呼/短无意义文本:可 action=reply 简短承接,不要机械模板;\n"
"5) 索要外部联系方式(微信/QQ/手机号)时,不外呼,站内引导;\n"
"6) 订单已付款:可回执安排处理;未付款/待付款:提醒完成付款;\n"
"7) 地图/政治/高风险内容:谨慎,必要时 transfer 或拒绝性 reply\n"
"8) 尺寸超限/不可做场景:给明确边界,不要胡乱承诺;\n"
"9) 客户没发图却问价:先承接,再引导发图;\n"
"10) 避免重复外发,避免同一句话反复说。\n"
"\n"
"quote_mode 可选: flush_pending / analyze_current_or_recent / collect_only\n"
"只输出 JSON\n"
'{"action":"reply|quote|transfer|noop","reply":"","transfer_msg":"","quote_mode":"","reason":""}\n\n'
f"店铺类型: {shop_type}\n"
f"legacy_fast_quote_enabled: {str(bool(client._legacy_fast_quote_enabled)).lower()}\n"
f"客户原话: {msg_text}\n"
f"has_image_url: {has_image_url}\n"
f"current_image_urls_count: {len(current_urls)}\n"
f"recent_image_urls_count: {len(recent_urls)}\n"
f"pending_image_urls_count: {len(pending_urls)}\n"
f"refers_images: {refers_images}\n"
f"is_price_inquiry: {is_price}\n"
f"is_requirement: {is_req}\n"
f"requests_external_contact: {ext_contact}\n"
f"order_status: {order_status or 'none'}\n"
)
try:
result = await client.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[])
raw = str(getattr(result, "output", "") or "").strip()
obj = _extract_json_obj(raw)
if not obj:
client._activity_log(
"brain_policy_parse_error",
acc_id=acc_id,
customer_id=customer_id,
raw=raw[:300],
)
return None
action = str(obj.get("action", "") or "").strip().lower()
reply = str(obj.get("reply", "") or "").strip()
transfer_msg = str(obj.get("transfer_msg", "") or "").strip()
quote_mode = str(obj.get("quote_mode", "") or "").strip().lower()
reason = str(obj.get("reason", "") or "").strip()
payload: dict[str, Any] | None = None
if action == "quote":
mode = quote_mode or "analyze_current_or_recent"
if mode == "flush_pending":
payload = {"mode": "flush_pending", "key": key, "pre_reply": reply}
elif mode == "collect_only":
payload = {"mode": "collect_only", "pre_reply": reply}
else:
urls = current_urls or recent_urls
payload = {"mode": "analyze_urls", "urls": urls, "pre_reply": reply}
decision = BrainDecision(
action=action if action in {"reply", "quote", "transfer", "noop"} else "noop",
source="brain_ai_policy",
reply=reply,
transfer_msg=transfer_msg,
should_reply=bool(reply),
need_transfer=(action == "transfer"),
payload=payload,
)
client._activity_log(
"brain_policy_raw",
acc_id=acc_id,
customer_id=customer_id,
action=decision.action,
quote_mode=quote_mode,
reason=reason,
)
return decision
except Exception as e:
client._activity_log(
"brain_policy_error",
acc_id=acc_id,
customer_id=customer_id,
error=str(e),
)
return None
async def decide_brain_action(client, data: dict, customer_msg, *, trace_id: str, msg_text: str, shop_type: str) -> BrainDecision:
"""统一主决策层:优先由 Brain AI 决策;失败时回退 Agent 默认决策。"""
ai_decision = await _ai_policy_brain_decide(client, data, msg_text=msg_text, shop_type=shop_type)
if ai_decision is not None:
return ai_decision
# 回退:保持可用性
logger.info("Agent 正在处理消息...")
client._activity_log(
"agent_process_start",
trace_id=trace_id,
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
msg=msg_text,
)
response = await client.agent.process_message(customer_msg)
client._activity_log(
"agent_process_done",
trace_id=trace_id,
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
result="ok",
should_reply=bool(response.should_reply),
need_transfer=bool(response.need_transfer),
)
if response.need_transfer:
return BrainDecision(
action="transfer",
source="fallback_agent",
reply=response.reply or "",
transfer_msg=response.transfer_msg or "",
should_reply=bool(response.should_reply),
need_transfer=True,
)
if response.should_reply and response.reply:
return BrainDecision(
action="reply",
source="fallback_agent",
reply=response.reply,
should_reply=True,
need_transfer=False,
)
return BrainDecision(action="noop", source="fallback_agent", should_reply=False, need_transfer=False)
async def execute_brain_action(client, data: dict, *, decision: BrainDecision, trace_id: str, msg_text: str):
"""统一执行层:只执行标准动作。"""
customer_id = data.get("from_id", "")
if customer_id:
client._touch_customer_last_contact(customer_id)
if decision.action == "transfer":
logger.info("Agent 决定转接人工")
client._activity_log(
"agent_transfer",
trace_id=trace_id,
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
transfer_msg=decision.transfer_msg,
)
client._fire_and_forget(
client._post_tianwang_callback(
"message_processed",
data,
extra={
"should_reply": bool(decision.should_reply),
"need_transfer": True,
"agent_reply": decision.reply or "",
"transfer_msg": decision.transfer_msg or "",
},
)
)
await client.transfer_to_human(data, decision.transfer_msg)
client._push_chat_to_wechat_safe(
data=data,
customer_msg=msg_text,
reply_msg=decision.transfer_msg or "转接",
tag="转人工",
)
return
if decision.action == "reply":
text = (decision.reply or "").strip()
if not text:
return
await asyncio.sleep(0.6)
client._activity_log(
"agent_reply",
trace_id=trace_id,
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
reply=text,
)
await client.send_reply(data, text)
await client._maybe_schedule_auto_quote(data)
client._fire_and_forget(
client._post_tianwang_callback(
"message_processed",
data,
extra={
"should_reply": True,
"need_transfer": False,
"agent_reply": text,
},
)
)
client._push_chat_to_wechat_safe(
data=data,
customer_msg=msg_text,
reply_msg=text,
tag="正常AI回复",
)
return
if decision.action == "quote":
payload = decision.payload or {}
pre_reply = str(payload.get("pre_reply", "") or "").strip()
if pre_reply:
await client.send_reply(data, pre_reply)
mode = str(payload.get("mode", "") or "")
if mode == "flush_pending":
key = str(payload.get("key", "") or "")
if key:
await client._flush_pending_images(key, data)
elif mode == "analyze_urls":
urls = payload.get("urls") or []
if isinstance(urls, list) and urls:
if len(urls) == 1:
asyncio.create_task(client._analyze_single_and_reply(data, urls[0]))
else:
asyncio.create_task(client._analyze_multi_and_reply(data, urls))
else:
await client.send_reply(data, "你把要处理的图再发我一下,我马上给你看。")
else:
if not pre_reply:
await client.send_reply(data, "收到,我先看一下哈,稍等哈。")
return
# noop
client._activity_log(
"agent_no_reply",
trace_id=trace_id,
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
)
client._fire_and_forget(
client._post_tianwang_callback(
"message_processed",
data,
extra={
"should_reply": False,
"need_transfer": False,
"agent_reply": "",
},
)
)

View File

@@ -0,0 +1,48 @@
import os
from datetime import datetime
from typing import Any, Dict, Optional
async def post_tianwang_callback_flow(client, event: str, data: dict, extra: Optional[Dict[str, Any]] = None):
"""将消息处理事件回调给天网。"""
if not client._tianwang_callback_url:
return
try:
import httpx
trust_env = os.getenv("TIANWANG_CALLBACK_TRUST_ENV", "false").lower() in ("1", "true", "yes")
payload = {
"event": event,
"timestamp": datetime.now().isoformat(),
"agent_name": client._tianwang_agent_name,
"acc_id": str(data.get("acc_id", "") or ""),
"customer_id": str(data.get("from_id", "") or ""),
"customer_name": client.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
"msg_id": str(data.get("msg_id", "") or ""),
"msg_type": int(data.get("msg_type", 0) or 0),
"msg": client.to_chinese(data.get("msg", "") or ""),
"goods_name": client.to_chinese(data.get("goods_name", "") or ""),
"goods_order": client.to_chinese(data.get("goods_order", "") or ""),
}
if extra:
payload.update(extra)
async with httpx.AsyncClient(timeout=6, trust_env=trust_env) as http_client:
resp = await http_client.post(client._tianwang_callback_url, json=payload)
ok = 200 <= resp.status_code < 300
client._activity_log(
"tianwang_callback",
result="ok" if ok else "http_error",
event_name=event,
status_code=resp.status_code,
acc_id=payload["acc_id"],
customer_id=payload["customer_id"],
)
except Exception as e:
client._activity_log(
"tianwang_callback",
result="error",
event_name=event,
acc_id=str(data.get("acc_id", "") or ""),
customer_id=str(data.get("from_id", "") or ""),
error=str(e),
)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,57 @@
import asyncio
import websockets
async def connect_flow(client):
"""连接 WebSocket 服务器并自动重连。"""
while client.running:
try:
client.logger.info(f"[{client.get_time()}] 正在连接轻简API {client.uri}...")
async with websockets.connect(client.uri) as websocket:
client.websocket = websocket
from utils.health_check import set_qingjian_connected
set_qingjian_connected(True)
client.logger.info(f"[{client.get_time()}] 连接成功!")
if client.enable_agent:
client.logger.info(f"[{client.get_time()}] AI Agent 已启用,将自动处理消息")
client.logger.info(f"[{client.get_time()}] 等待接收消息...")
await client.receive_messages()
except ConnectionRefusedError:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
client.logger.info(f"[{client.get_time()}] 连接被拒绝,请检查轻简软件是否已启动")
except websockets.exceptions.InvalidURI:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
client.logger.info(f"[{client.get_time()}] URI格式错误")
except Exception as e:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
client.logger.info(f"[{client.get_time()}] 连接错误: {e}")
if client.running:
client.logger.info(f"[{client.get_time()}] 5秒后尝试重连...")
await asyncio.sleep(5)
async def receive_messages_flow(client):
"""持续接收消息。"""
try:
async for message in client.websocket:
await client.handle_message(message)
except websockets.exceptions.ConnectionClosed:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
client.logger.info(f"[{client.get_time()}] 连接已关闭")
except Exception as e:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
client.logger.info(f"[{client.get_time()}] 接收消息错误: {e}")
async def handle_message_flow(client, message, *, shop_type_resolver):
from core.websocket_inbound_flow import handle_incoming_message
await handle_incoming_message(client, message, shop_type_resolver=shop_type_resolver)

View File

@@ -0,0 +1,45 @@
import re
from datetime import datetime
def extract_and_save_customer_info_flow(client, message: str, customer_id: str, db):
"""从消息中提取客户信息并保存。"""
if not message or not customer_id:
return
email_pattern = r"[\w\.-]+@[\w\.-]+\.\w+"
email_match = re.search(email_pattern, message)
if email_match:
db.update_email(customer_id, email_match.group())
phone_pattern = r"1[3-9]\d{9}"
phone_match = re.search(phone_pattern, message)
if phone_match:
db.update_phone(customer_id, phone_match.group())
wechat_pattern = r"[Vv微信]+号[:]?\s*([\w-]+)"
wechat_match = re.search(wechat_pattern, message)
if wechat_match:
db.update_wechat(customer_id, wechat_match.group(1))
budget_keywords = ["预算", "不超过", "最多", "便宜点", "便宜"]
for keyword in budget_keywords:
if keyword in message:
db.add_personality_tag(customer_id, "关注价格")
break
personality_keywords = {
"爽快": "爽快",
"干脆": "爽快",
"纠结": "纠结",
"墨迹": "纠结",
"砍价": "砍价",
"": "砍价",
}
for keyword, tag in personality_keywords.items():
if keyword in message:
db.add_personality_tag(customer_id, tag)
profile = db.get_customer(customer_id)
profile.last_contact = datetime.now().isoformat()
db.save_customer(profile)

View File

@@ -0,0 +1,265 @@
import asyncio
import logging
import re
import secrets
logger = logging.getLogger("cs_agent")
async def debounce_agent_reply(client, data: dict):
"""
消息防抖:同一客户在 _DEBOUNCE_SECONDS 内的连续消息合并后再处理。
订单通知、付款相关消息不走防抖,立即处理。
"""
msg_body = data.get("msg", "")
key = f"{data.get('acc_id','')}:{data.get('from_id','')}"
client._cancel_auto_quote_task(key, reason="new_inbound")
# 以下情况跳过防抖,立即处理(后台执行,不阻塞接收循环)
immediate_keywords = ["买家已付款", "已付款", "[系统订单信息]"]
if any(kw in msg_body for kw in immediate_keywords):
client._activity_log(
"debounce_bypass_immediate",
acc_id=data.get("acc_id", ""),
customer_id=data.get("from_id", ""),
reason="payment_or_order",
msg=msg_body,
)
client._fire_and_forget(client._agent_reply_serialized(data))
return
# 积攒消息
if key not in client._pending_msgs:
client._pending_msgs[key] = []
client._pending_msgs[key].append(msg_body)
client._activity_log(
"debounce_enqueue",
key=key,
queue_size=len(client._pending_msgs[key]),
msg=msg_body,
)
# 取消上一个等待任务(如果有)
old_task = client._debounce_tasks.get(key)
if old_task and not old_task.done():
old_task.cancel()
debounce_seconds = pick_debounce_seconds(client, data, msg_body)
# 创建新的延迟处理任务
async def _delayed(capture_key, capture_data, wait_s: float):
await asyncio.sleep(wait_s)
msgs = client._pending_msgs.pop(capture_key, [])
if not msgs:
return
if len(msgs) == 1:
merged_msg = msgs[0]
else:
merged_msg = "".join(m for m in msgs if m.strip())
logger.info(f"[{client.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}")
client._activity_log(
"debounce_flush",
key=capture_key,
merged_count=len(msgs),
merged_msg=merged_msg,
)
merged_data = dict(capture_data)
merged_data["msg"] = merged_msg
await client._agent_reply_serialized(merged_data)
task = asyncio.create_task(_delayed(key, data, debounce_seconds))
client._debounce_tasks[key] = task
def rand_between(low: float, high: float) -> float:
if high <= low:
return float(low)
# 使用 secrets 增强随机性,避免固定周期导致机械感
span = high - low
return round(low + span * (secrets.randbelow(1000) / 1000.0), 2)
def guess_intent_for_debounce(client, msg: str) -> str:
text = (msg or "").strip()
if not text:
return "unknown"
if msg_has_image_url(text):
return "image"
try:
from utils.intent_analyzer import detect_intent
decision = detect_intent(text)
intent = decision.intent
if intent:
client._activity_log(
"debounce_intent_detected",
intent=intent,
source=decision.source,
score=round(float(decision.score or 0.0), 4),
msg=text[:120],
)
except Exception:
intent = ""
if intent:
return intent
lower = text.lower()
if any(k in lower for k in ["报价", "多少钱", "价格", "", "优惠", "收费", "怎么收费", "咋收费"]):
return "询价"
if any(k in lower for k in ["做一下", "改一下", "需求", "门头", "上面的字", "处理"]):
return "修改"
if any(k in lower for k in ["在吗", "你好", "有人"]):
return "打招呼"
return "unknown"
def looks_like_requirement_text(msg: str) -> bool:
text = (msg or "").strip().lower()
if not text:
return False
req_kw = (
"做一下",
"改一下",
"处理一下",
"这个字",
"上面的字",
"门头",
"去背景",
"抠图",
"换色",
"调色",
"清晰",
"高清",
"尺寸",
"比例",
"横版",
"竖版",
"排版",
"改字",
"按这个做",
"照这个做",
"就这张",
"看看做",
"弄一下",
)
return any(k in text for k in req_kw)
def pick_debounce_seconds(client, data: dict, msg: str) -> float:
"""意图驱动防抖:不同意图不同等待区间,并引入轻微随机。"""
base = max(1.0, float(client._DEBOUNCE_SECONDS))
if not client._adaptive_debounce_enabled:
return base
intent = guess_intent_for_debounce(client, msg)
is_req = looks_like_requirement_text(msg)
has_img = msg_has_image_url(msg)
# 区间策略:越明确、越短消息,等待越短;需求描述类稍长
if intent == "打招呼":
low, high = 1.0, min(3.0, base)
elif intent in ("询价", "砍价"):
# 询价先略等一会,给客户补发图片/需求的窗口,减少机械两连回
low, high = 4.0, min(7.0, max(base, 7.0))
elif intent == "image":
# 文本里直接贴图链接:短等合并上下文,避免和上一条询价并发
low, high = 2.2, 4.2
elif intent in ("修改", "批量"):
low, high = max(3.0, base * 0.65), min(18.0, base + 2.0)
elif intent == "转接":
low, high = 1.0, 2.5
else:
low, high = max(2.0, base * 0.5), base
# 发图后的需求描述,优先“多等一点”收集完整需求,减少半句回复
# 约束到 12-14s避免等待过长。
if is_req and not has_img:
low = max(low, 12.0)
high = min(14.0, max(high, 12.6))
# 短句更快,长句稍慢,避免把连续半句拆开
text_len = len((msg or "").strip())
if text_len <= 4:
high = min(high, max(low + 0.2, 2.5))
elif text_len >= 18:
low = min(high, low + 0.6)
wait_s = rand_between(low, high)
logger.info(f"防抖等待 {wait_s}s | intent={intent} | len={text_len}")
return wait_s
def msg_has_image_url(msg: str) -> bool:
"""判断文本消息里是否包含图片URL客户粘贴了图片链接可能带前缀文字如 有吗#*#https://..."""
if not msg:
return False
lower = msg.lower()
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com")
if "http://" in lower or "https://" in lower:
if any(ext in lower for ext in image_exts) or any(h in lower for h in image_hosts):
return True
return False
def msg_refers_images(msg: str) -> bool:
"""判断文本是否指代之前的图片(图一/图二/这张/那张/上面那张等)"""
if not msg:
return False
refs = (
"图一",
"图二",
"第一张",
"第二张",
"这张",
"那张",
"这图",
"那个图",
"这个",
"这个呢",
"上面那张",
"下面那张",
"刚才那张",
"上一张",
"下一张",
)
return any(r in msg for r in refs)
def extract_image_urls(msg: str) -> list:
if not msg:
return []
parts = [p.strip() for p in msg.split("#*#") if p.strip()]
urls = []
for p in parts:
if p.startswith("http://") or p.startswith("https://"):
urls.append(p)
if not urls and ("http://" in msg or "https://" in msg):
tokens = re.findall(r"(https?://\S+)", msg)
for t in tokens:
if any(ext in t.lower() for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]):
urls.append(t)
return urls[:8]
def collect_recent_image_urls(client, customer_id: str, acc_id: str, max_count: int = 6) -> list:
"""从最近对话中回溯收集图片URL优先买家消息用于慢发或引用图片的场景"""
urls, seen = [], set()
try:
from db.chat_log_db import get_recent_conversation
recent = get_recent_conversation(customer_id=customer_id, acc_id=acc_id, limit=20)
# 从最近到更早遍历,收集买家(in)消息中的图片链接
for item in reversed(recent):
if item.get("direction") != "in":
continue
message = item.get("message") or ""
found = extract_image_urls(message)
for u in found:
if u not in seen:
seen.add(u)
urls.append(u)
if len(urls) >= max_count:
return urls
except Exception:
logger.debug("收集近期图片URL失败", exc_info=True)
return urls

View File

@@ -0,0 +1,36 @@
import os
async def dispatch_assign_once_flow(client):
"""
调用新的一键派单接口:
GET {DISPATCH_BASE_URL}/assign
Header: X-API-Key
"""
base_url = os.getenv("DISPATCH_BASE_URL", "http://1.12.50.92:8006").strip().rstrip("/")
api_key = os.getenv("DISPATCH_API_KEY", "tuhui_dispatch_key_2026").strip()
timeout_s = float(os.getenv("DISPATCH_TIMEOUT_SECONDS", "5"))
if not base_url or not api_key:
return {"success": False, "reason": "dispatch config missing"}
try:
import httpx
async with httpx.AsyncClient(timeout=timeout_s) as http_client:
resp = await http_client.get(
f"{base_url}/assign",
headers={"X-API-Key": api_key},
)
if resp.status_code != 200:
return {"success": False, "reason": f"http {resp.status_code}"}
data = resp.json() if resp.content else {}
ok = bool((data or {}).get("success", False))
return {
"success": ok,
"task_id": str((data or {}).get("task_id", "") or ""),
"assigned_to": str((data or {}).get("assigned_to", "") or ""),
"online_count": int((data or {}).get("online_count", 0) or 0),
"notification_sent": bool((data or {}).get("notification_sent", False)),
"raw": data,
}
except Exception as e:
return {"success": False, "reason": str(e)}

View File

@@ -0,0 +1,181 @@
import asyncio
import os
import time
from datetime import datetime, timedelta
import logging
logger = logging.getLogger("cs_agent")
async def unreplied_followup_loop(client):
"""定时补偿:对“最后一条是客户消息且长时间未回复”的会话,补发一次自然跟进。"""
if not client.enable_agent or not client.agent:
return
while client.running:
try:
await asyncio.sleep(max(30, int(os.getenv("UNREPLIED_FOLLOWUP_SCAN_SECONDS", "90"))))
await scan_and_send_unreplied_followups(client)
except asyncio.CancelledError:
break
except Exception as e:
client._activity_log("unreplied_followup_loop_error", error=str(e))
async def scan_and_send_unreplied_followups(client):
from db import chat_log_db as cdb
try:
idle_minutes = max(5, int(os.getenv("UNREPLIED_FOLLOWUP_IDLE_MINUTES", "12")))
max_age_minutes = max(idle_minutes, int(os.getenv("UNREPLIED_FOLLOWUP_MAX_AGE_MINUTES", "180")))
followup_cd = max(300, int(os.getenv("UNREPLIED_FOLLOWUP_COOLDOWN_SECONDS", "3600")))
limit = max(10, int(os.getenv("UNREPLIED_FOLLOWUP_LIMIT", "40")))
except Exception:
idle_minutes, max_age_minutes, followup_cd, limit = 12, 180, 3600, 40
now = datetime.now()
window_start = (now - timedelta(minutes=max_age_minutes)).strftime("%Y-%m-%d %H:%M:%S")
conn = None
try:
conn = cdb._get_conn()
rows = conn.execute(
cdb._sql(
"""
SELECT acc_id, customer_id, MAX(id) AS last_id
FROM chat_logs
WHERE timestamp >= ?
GROUP BY acc_id, customer_id
ORDER BY MAX(id) DESC
LIMIT ?
"""
),
(window_start, limit * 6),
).fetchall()
sessions = [dict(r) for r in rows]
sent = 0
for s in sessions:
if sent >= limit:
break
acc_id = str(s.get("acc_id", "") or "")
cid = str(s.get("customer_id", "") or "")
if not acc_id or not cid:
continue
ckey = f"{acc_id}:{cid}"
if not client._is_owned_by_this_worker(ckey):
continue
last = conn.execute(
cdb._sql(
"""
SELECT id, direction, message, timestamp, customer_name, acc_id, platform
FROM chat_logs
WHERE acc_id = ? AND customer_id = ?
ORDER BY id DESC
LIMIT 1
"""
),
(acc_id, cid),
).fetchone()
if not last:
continue
last = dict(last)
if str(last.get("direction", "")) != "in":
continue
last_ts = last.get("timestamp")
if isinstance(last_ts, datetime):
last_dt = last_ts
else:
last_dt = datetime.strptime(str(last_ts)[:19], "%Y-%m-%d %H:%M:%S")
idle_s = (now - last_dt).total_seconds()
if idle_s < idle_minutes * 60 or idle_s > max_age_minutes * 60:
continue
now_mono = time.monotonic()
if (now_mono - client._unreplied_followup_sent.get(ckey, 0.0)) < followup_cd:
continue
last_msg = str(last.get("message", "") or "").strip().lower()
if last_msg in {"好的", "", "ok", "收到", "", ""}:
continue
followup = await compose_ai_scene_reply(
client,
original_msg={
"acc_id": acc_id,
"from_id": cid,
"from_name": client.to_chinese(last.get("customer_name", "") or cid),
"acc_type": str(last.get("platform", "") or "AliWorkbench"),
"msg": str(last.get("message", "") or ""),
},
scene="unreplied_followup",
intent_hint="客户上一条消息还没接上,先自然承接并请对方补一句当前要处理的图或要求。",
fallback="刚看到你消息了,我在的。你把要处理的图或要求再发我一下,我马上接着看。",
)
fake = {
"acc_id": acc_id,
"from_id": cid,
"from_name": client.to_chinese(last.get("customer_name", "") or cid),
"cy_id": cid,
"cy_name": client.to_chinese(last.get("customer_name", "") or cid),
"acc_type": str(last.get("platform", "") or "AliWorkbench"),
"msg": str(last.get("message", "") or ""),
"msg_type": 0,
}
await client.send_reply(fake, followup)
client._unreplied_followup_sent[ckey] = now_mono
sent += 1
client._activity_log(
"unreplied_followup_sent",
acc_id=acc_id,
customer_id=cid,
idle_seconds=int(idle_s),
last_msg=str(last.get("message", "") or "")[:120],
reply=followup,
)
finally:
try:
if conn:
conn.close()
except Exception:
logger.debug("关闭数据库连接失败", exc_info=True)
async def compose_ai_scene_reply(client, *, original_msg: dict, scene: str, intent_hint: str, fallback: str) -> str:
"""场景化 AI 直接生成回复(不依赖固定模板)。"""
if not client.enable_agent or not client.agent or not client.AgentDeps:
return fallback
try:
deps = client.AgentDeps(
msg_id=str(original_msg.get("msg_id", "") or f"{scene}_gen"),
acc_id=str(original_msg.get("acc_id", "") or ""),
from_id=str(original_msg.get("from_id", "") or ""),
platform=str(original_msg.get("acc_type", "") or ""),
)
customer_msg = client.to_chinese(str(original_msg.get("msg", "") or ""))
prompt = (
"你是淘宝客服,直接生成一条发给客户的话。\n"
f"场景: {scene}\n"
f"意图: {intent_hint}\n"
f"客户原话: {customer_msg}\n"
"要求: 1-2句自然口语不要模板腔不要新增价格/承诺;只输出最终回复。\n"
)
result = await client.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[])
out = str(getattr(result, "output", "") or "").strip()
if not out:
return fallback
if out.startswith("话术|") or "[转移会话]" in out or "TRANSFER_REQUESTED" in out:
return fallback
client._activity_log(
"ai_scene_reply_generated",
acc_id=str(original_msg.get("acc_id", "") or ""),
customer_id=str(original_msg.get("from_id", "") or ""),
scene=scene,
generated=out[:160],
)
return out
except Exception as e:
client._activity_log(
"ai_scene_reply_error",
acc_id=str(original_msg.get("acc_id", "") or ""),
customer_id=str(original_msg.get("from_id", "") or ""),
scene=scene,
error=str(e),
)
return fallback

View File

@@ -0,0 +1,128 @@
import asyncio
import time
from datetime import datetime
def fire_and_forget(client, coro):
"""后台执行协程,不阻塞接收循环;异常会记录到日志。"""
task = asyncio.create_task(coro)
def _done(t):
if t.cancelled():
return
exc = t.exception()
if exc:
client.logger.exception(f"后台任务异常: {exc}")
task.add_done_callback(_done)
def prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0):
if len(seen) <= 2000:
return
stale = [k for k, t in seen.items() if (now_mono - t) > ttl_sec]
for k in stale:
seen.pop(k, None)
def log_inbound_once(client, data: dict, chat_log_fn):
"""统一记录入站消息,短窗口去重,避免多分支重复写库。"""
try:
cid = data.get("from_id", "")
if not cid:
return
msg = client.to_chinese(data.get("msg", "") or "")
acc_id = data.get("acc_id", "")
mtype = int(data.get("msg_type", 0) or 0)
now_mono = time.monotonic()
sig = f"{acc_id}|{cid}|{mtype}|{msg}"
last = client._inbound_log_seen.get(sig, 0.0)
if (now_mono - last) < 2.0:
return
client._inbound_log_seen[sig] = now_mono
prune_seen(client._inbound_log_seen, now_mono, ttl_sec=8.0)
chat_log_fn(
cid,
msg,
"in",
customer_name=client.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
acc_id=acc_id,
platform=data.get("acc_type", ""),
msg_type=mtype,
)
except Exception:
client.logger.debug("入站消息写库失败", exc_info=True)
def log_outbound_once(client, original_msg: dict, reply_content: str, chat_log_fn):
"""统一记录出站消息,短窗口去重,避免重复写库。"""
try:
cid = original_msg.get("from_id", "")
if not cid:
return
msg = reply_content or ""
acc_id = original_msg.get("acc_id", "")
now_mono = time.monotonic()
sig = f"{acc_id}|{cid}|0|{msg}"
last = client._outbound_log_seen.get(sig, 0.0)
if (now_mono - last) < 2.0:
return
client._outbound_log_seen[sig] = now_mono
prune_seen(client._outbound_log_seen, now_mono, ttl_sec=8.0)
chat_log_fn(
cid,
msg,
"out",
customer_name=client.to_chinese(original_msg.get("from_name", "") or original_msg.get("cy_name", "")),
acc_id=acc_id,
platform=original_msg.get("acc_type", ""),
msg_type=0,
)
except Exception:
client.logger.debug("出站消息写库失败", exc_info=True)
def build_customer_message(client, data: dict, customer_message_cls):
"""把原始消息字典转换为 Agent 输入模型。"""
return customer_message_cls(
msg_id=data.get("msg_id", ""),
acc_id=data.get("acc_id", ""),
msg=client.to_chinese(data.get("msg", "")),
from_id=data.get("from_id", ""),
from_name=client.to_chinese(data.get("from_name", "")),
cy_id=data.get("cy_id", ""),
acc_type=data.get("acc_type", ""),
msg_type=data.get("msg_type", 0),
cy_name=client.to_chinese(data.get("cy_name", "")),
goods_name=client.to_chinese(data.get("goods_name", "")) if data.get("goods_name") else None,
goods_order=client.to_chinese(data.get("goods_order", "")) if data.get("goods_order") else None,
)
def touch_customer_last_contact(client, customer_id: str, db):
"""兜底更新客户最后联系时间。"""
if not customer_id:
return
try:
profile = db.get_customer(customer_id)
profile.last_contact = datetime.now().isoformat()
db.save_customer(profile)
except Exception:
client.logger.debug("更新客户最后联系时间失败: customer_id=%s", customer_id, exc_info=True)
def push_chat_to_wechat_safe(client, *, data: dict, customer_msg: str, reply_msg: str, tag: str, goods_name: str = ""):
"""异步推送企微聊天日志,失败不影响主流程。"""
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=client.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
customer_id=data.get("from_id", ""),
acc_id=data.get("acc_id", ""),
customer_msg=client.to_chinese(customer_msg or ""),
reply_msg=reply_msg or "",
goods_name=goods_name or client.to_chinese(data.get("goods_name", "") or ""),
))
except Exception:
client.logger.debug("推送企微聊天日志失败(%s)", tag, exc_info=True)

View File

@@ -0,0 +1,11 @@
async def handle_image_message_flow(client, data: dict):
"""
处理图片消息。
先回复"我找找"然后把图片URL作为消息内容交给 Agent后台执行
"""
await client.send_reply(data, "我找找")
image_data = dict(data)
image_data["msg"] = f"[客户发来图片] {data.get('msg', '')}"
image_data["msg_type"] = 0
client._fire_and_forget(client._agent_reply_serialized(image_data))

View File

@@ -0,0 +1,86 @@
import logging
import os
from datetime import datetime
class _AnsiColorFormatter(logging.Formatter):
RESET = "\033[0m"
MESSAGE_TEXT_REPLACEMENTS = (
("[PROMPT->AI 前置提示词]", "[AI提示词]"),
("[PROMPT->AI", "[AI提示词"),
("[THINK/TOOL_CALL]", "[AI思考-工具调用]"),
("[THINK/TOOL_RETURN]", "[AI思考-工具返回]"),
("[THINK/RAW_OUTPUT]", "[AI思考-原始输出]"),
("[REPLY->CUSTOMER]", "[AI回复客户]"),
("[ACTIVITY]", "[活动日志]"),
("[AI质检]", "[AI质检]"),
)
MESSAGE_COLOR_RULES = (
("[PROMPT->AI", "\033[94m"),
("[THINK/", "\033[96m"),
("[REPLY->CUSTOMER]", "\033[92m"),
("Agent 回复", "\033[92m"),
("[ACTIVITY]", "\033[95m"),
("[AI质检]", "\033[97m"),
("收到新消息", "\033[36m"),
("发送成功", "\033[32m"),
("防抖等待", "\033[93m"),
)
COLORS = {
logging.DEBUG: "\033[36m",
logging.INFO: "\033[32m",
logging.WARNING: "\033[33m",
logging.ERROR: "\033[31m",
logging.CRITICAL: "\033[35m",
}
def __init__(self, fmt: str, datefmt: str | None = None, use_color: bool = True):
super().__init__(fmt=fmt, datefmt=datefmt)
self.use_color = use_color
def format(self, record: logging.LogRecord) -> str:
msg = super().format(record)
if not self.use_color:
for old, new in self.MESSAGE_TEXT_REPLACEMENTS:
msg = msg.replace(old, new)
return msg
raw_msg = record.getMessage()
for old, new in self.MESSAGE_TEXT_REPLACEMENTS:
msg = msg.replace(old, new)
for key, color in self.MESSAGE_COLOR_RULES:
if key in raw_msg:
return f"{color}{msg}{self.RESET}"
color = self.COLORS.get(record.levelno, "")
if not color:
return msg
return f"{color}{msg}{self.RESET}"
def setup_logger():
from logging.handlers import RotatingFileHandler
from config.config import LOG_DIR, LOG_MAX_BYTES, LOG_BACKUP_COUNT
logger = logging.getLogger("cs_agent")
if getattr(logger, "_cs_logger_configured", False):
return logger
logger.setLevel(logging.INFO)
logger.propagate = False
fmt = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S")
use_color = (os.getenv("LOG_COLOR", "1").lower() in ("1", "true", "yes")) and not bool(os.getenv("NO_COLOR"))
ch = logging.StreamHandler()
ch.setFormatter(_AnsiColorFormatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S", use_color=use_color))
logger.addHandler(ch)
LOG_DIR.mkdir(exist_ok=True)
today = datetime.now().strftime("%Y-%m-%d")
fh = RotatingFileHandler(
LOG_DIR / f"chat_{today}.log",
maxBytes=LOG_MAX_BYTES,
backupCount=LOG_BACKUP_COUNT,
encoding="utf-8",
)
fh.setFormatter(fmt)
logger.addHandler(fh)
logger._cs_logger_configured = True
return logger

View File

@@ -0,0 +1,98 @@
import json
import random
import re
def to_chinese_text(text):
"""处理文本,安全地转换 unicode 转义。"""
if not isinstance(text, str):
return text
if "\\u" not in text:
return text
try:
return json.loads(f'"{text}"')
except Exception:
return text
def is_transfer_msg(client, data: dict) -> bool:
msg = to_chinese_text(data.get("msg", ""))
return "转交给" in msg or "转接给" in msg
def pick_transfer_greeting() -> str:
choices = [
"在的亲,发图我看下",
"在呢亲,有需求直接说",
"我在的,您把要求发我",
"在的哈,你说我这边看着处理",
"在呢,图和需求发来我看看",
]
return random.choice(choices)
def is_shop_card(client, data: dict) -> bool:
msg = to_chinese_text(data.get("msg", ""))
return msg.startswith("[进店卡片]") or "我想咨询你们店的这个商品" in msg
def extract_customer_text_from_shop_card_msg(client, msg: str) -> str:
text = to_chinese_text(msg or "").strip()
if not text:
return ""
parts = [p.strip() for p in text.split("#*#") if p and p.strip()]
kept = []
for part in parts:
if part.startswith("[进店卡片]") or "我想咨询你们店的这个商品" in part:
continue
kept.append(part)
if kept:
return " ".join(kept).strip()
stripped = re.sub(r"\[进店卡片\][^\n\r]*", "", text).strip()
stripped = stripped.replace("我想咨询你们店的这个商品", "").strip(",。,#* ")
return stripped
def has_chat_history(customer_id: str, acc_id: str = "") -> bool:
if not customer_id:
return False
try:
from db.chat_log_db import get_recent_conversation
msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=1)
return len(msgs) > 0
except Exception:
return False
def should_ignore(client, data: dict) -> bool:
msg = to_chinese_text(data.get("msg", ""))
ignore_patterns = [
"已转接",
"接入会话",
"结束会话",
"会话已",
"[系统消息]",
"[系统通知]",
]
for pattern in ignore_patterns:
if pattern in msg:
return True
acc_id = data.get("acc_id", "")
from_id = data.get("from_id", "")
if acc_id and from_id and acc_id == from_id:
return True
return False
def get_msg_type_name(msg_type):
types = {
0: "文本",
1: "图片",
2: "视频",
3: "文件",
}
return types.get(msg_type, f"未知({msg_type})")

View File

@@ -0,0 +1,84 @@
import re
from typing import Any
def msg_is_price_inquiry(msg: str) -> bool:
if not msg:
return False
patterns = ("多少钱", "多少一张", "一张多少钱", "画图多少", "报价", "给个价", "几块", "多少钱")
return any(p in msg for p in patterns)
def detect_order_status(msg: str) -> str:
if not msg:
return ""
s = msg
if "买家已付款" in s or "已付款" in s:
return "paid"
if "[系统订单信息]" in s:
if "等待买家付款" in s or "未付款" in s:
return "waiting"
return "order"
return ""
def msg_requests_external_contact(msg: str) -> bool:
if not msg:
return False
lower = msg.lower()
kws = ("加qq", "qq号", "vx", "微信", "加v", "联系方式", "私聊", "加一下", "加个", "手机号", "电话", "加群", "q q", "v 信")
return any(k in lower for k in kws)
def extract_size_pairs_m(msg: str) -> list[tuple[float, float]]:
"""提取消息中的米制尺寸对,如 15*6.4米 / 15米*6.4 / 15x6.4m。"""
if not msg:
return []
s = (msg or "").lower().replace("×", "*").replace("x", "*")
pairs = []
patterns = [
r"(\d+(?:\.\d+)?)\s*\*\s*(\d+(?:\.\d+)?)\s*(?:米|m)\b",
r"(\d+(?:\.\d+)?)\s*(?:米|m)\s*\*\s*(\d+(?:\.\d+)?)\b",
]
for p in patterns:
for m in re.findall(p, s):
try:
a = float(m[0])
b = float(m[1])
if a > 0 and b > 0:
pairs.append((a, b))
except Exception:
continue
return pairs
def oversize_reply_if_needed(msg: str) -> str:
"""
检测超大尺寸需求并返回拒绝话术;未命中返回空字符串。
规则:最长边 > 阈值 或 面积 > 阈值。
"""
try:
from config.config import MAX_SERVICE_SIZE_LONGEST_METERS, MAX_SERVICE_SIZE_AREA_SQM
longest_limit = float(MAX_SERVICE_SIZE_LONGEST_METERS)
area_limit = float(MAX_SERVICE_SIZE_AREA_SQM)
except Exception:
longest_limit = 10.0
area_limit = 20.0
pairs = extract_size_pairs_m(msg)
for w, h in pairs:
longest = max(w, h)
area = w * h
if longest > longest_limit or area > area_limit:
return (
f"{w:g}米*{h:g}米这个尺寸太大了,我们这边做不了。"
"如果要做可以拆成几段小尺寸,我再给你按段评估。"
)
return ""
def build_auto_quote_signature(state: Any) -> str:
from core.websocket_auto_quote_flow import build_auto_quote_signature as _build
return _build(state)

View File

@@ -0,0 +1,130 @@
import os
import re
import time
def normalize_reply_semantic_key(text: str) -> str:
s = (text or "").strip().lower()
if not s:
return ""
for w in ("", "", "", "", "", "", ""):
s = s.replace(w, "")
s = re.sub(r"[,。!?、,.!?:\s~\-—_]+", "", s)
return s[:200]
def classify_outbound_reply(text: str) -> str:
s = (text or "").strip()
if not s:
return "empty"
if any(k in s for k in ("报价", "总价", "多少钱", "多少", "马上给你报价", "先给你报")):
return "quote"
if any(k in s for k in ("继续发图", "发完", "发图", "把图发", "先看图")):
return "collect"
if any(k in s for k in ("在吗", "你好", "在的", "在呢")):
return "greeting"
if any(k in s for k in ("转人工", "转接", "转给")):
return "transfer"
if any(k in s for k in ("稍等", "我先看", "看一下", "看下")):
return "ack"
return "general"
def template_family(reply: str) -> str:
s = (reply or "").strip()
if not s:
return ""
if "需求我记上了" in s and "继续发图" in s:
return "collect_remind"
if ("这批图过一遍" in s or "收齐了" in s or "收好了" in s) and ("总价" in s or "报价" in s):
return "quote_defer"
if "图片收到了" in s and "继续发" in s:
return "collect_ack"
if "好嘞,你稍等下,我这边看一下" in s:
return "fallback_ack"
return ""
def outbound_arbiter(client, original_msg: dict, reply_content: str, trace_id: str) -> tuple[bool, str]:
"""
统一出站裁决层:
1) 语义去重(相同语义短窗口不重复);
2) 同类回复节流(同类话术短窗口不重复)。
"""
key = f"{original_msg.get('acc_id', '')}:{original_msg.get('from_id', '')}"
now_mono = time.monotonic()
sem_key = normalize_reply_semantic_key(reply_content)
reply_class = classify_outbound_reply(reply_content)
try:
sem_window = max(30, int(os.getenv("AI_OUTBOUND_SEMANTIC_DEDUPE_SECONDS", "180")))
except Exception:
sem_window = 180
try:
class_window = max(20, int(os.getenv("AI_OUTBOUND_CLASS_DEDUPE_SECONDS", "90")))
except Exception:
class_window = 90
try:
template_window = max(120, int(os.getenv("AI_OUTBOUND_TEMPLATE_FATIGUE_SECONDS", "600")))
except Exception:
template_window = 600
sem_bucket = client._outbound_semantic_seen.setdefault(key, {})
cls_bucket = client._outbound_class_seen.setdefault(key, {})
tpl_bucket = client._outbound_template_seen.setdefault(key, {})
client._prune_seen(sem_bucket, now_mono, ttl_sec=max(sem_window * 2, 240))
client._prune_seen(cls_bucket, now_mono, ttl_sec=max(class_window * 2, 180))
client._prune_seen(tpl_bucket, now_mono, ttl_sec=max(template_window * 2, 1200))
if sem_key and (now_mono - sem_bucket.get(sem_key, 0.0)) < sem_window:
client._activity_log(
"outbound_arbiter_block",
trace_id=trace_id,
acc_id=original_msg.get("acc_id", ""),
customer_id=original_msg.get("from_id", ""),
reason="semantic_duplicate",
semantic_key=sem_key[:80],
reply_class=reply_class,
msg=reply_content,
)
return False, "semantic_duplicate"
family = template_family(reply_content)
if family and (now_mono - tpl_bucket.get(family, 0.0)) < template_window:
client._activity_log(
"outbound_arbiter_block",
trace_id=trace_id,
acc_id=original_msg.get("acc_id", ""),
customer_id=original_msg.get("from_id", ""),
reason="template_fatigue",
template_family=family,
msg=reply_content,
)
return False, "template_fatigue"
if reply_class in {"quote", "collect", "ack"} and (now_mono - cls_bucket.get(reply_class, 0.0)) < class_window:
client._activity_log(
"outbound_arbiter_block",
trace_id=trace_id,
acc_id=original_msg.get("acc_id", ""),
customer_id=original_msg.get("from_id", ""),
reason="class_duplicate",
reply_class=reply_class,
msg=reply_content,
)
return False, "class_duplicate"
if sem_key:
sem_bucket[sem_key] = now_mono
cls_bucket[reply_class] = now_mono
if family:
tpl_bucket[family] = now_mono
client._activity_log(
"outbound_arbiter_pass",
trace_id=trace_id,
acc_id=original_msg.get("acc_id", ""),
customer_id=original_msg.get("from_id", ""),
reply_class=reply_class,
template_family=family,
semantic_key=sem_key[:80] if sem_key else "",
)
return True, "pass"

View File

@@ -0,0 +1,119 @@
import asyncio
import os
async def command_handler_flow(client):
"""命令行交互。"""
client.logger.info("\n命令帮助:")
client.logger.info(" reply <内容> - 回复最后一条消息")
client.logger.info(" text <id> <平台> <内容> - 发送文本消息")
client.logger.info(" img <id> <平台> <路径> - 发送图片")
client.logger.info(" setid <id> - 设置回复ID")
client.logger.info(" agent on/off - 开启/关闭 Agent")
client.logger.info(" exit/quit - 退出\n")
while client.running:
try:
loop = asyncio.get_running_loop()
user_input = await loop.run_in_executor(None, input, "")
parts = user_input.strip().split(maxsplit=1)
if not parts:
continue
cmd = parts[0].lower()
if cmd in ["exit", "quit", "q"]:
client.logger.info(f"[{client.get_time()}] 正在关闭...")
client.running = False
if client.websocket:
await client.websocket.close()
break
if cmd == "setid" and len(parts) > 1:
client.reply_id = parts[1]
client.logger.info(f"[{client.get_time()}] 回复ID已设置为: {client.reply_id}")
continue
if cmd == "agent" and len(parts) > 1:
if parts[1].lower() == "on":
client.enable_agent = True
client.logger.info(f"[{client.get_time()}] Agent 已开启")
elif parts[1].lower() == "off":
client.enable_agent = False
client.logger.info(f"[{client.get_time()}] Agent 已关闭")
continue
if cmd == "reply" and len(parts) > 1:
if client.last_msg:
await client.send_reply(client.last_msg, parts[1])
else:
client.logger.info(f"[{client.get_time()}] 错误: 还没有收到任何消息")
continue
if cmd == "text" and len(parts) > 1:
args = parts[1].split(maxsplit=2)
if len(args) >= 3:
await client.send_text(args[0], args[1], args[2])
else:
client.logger.info(f"[{client.get_time()}] 格式: text <cy_id> <acc_type> <内容>")
continue
if cmd == "img" and len(parts) > 1:
args = parts[1].split(maxsplit=2)
if len(args) >= 3:
await client.send_image(args[0], args[1], args[2])
else:
client.logger.info(f"[{client.get_time()}] 格式: img <cy_id> <acc_type> <图片路径>")
continue
client.logger.info(f"[{client.get_time()}] 未知命令: {cmd}")
except Exception as e:
client.logger.info(f"[{client.get_time()}] 命令错误: {e}")
async def run_client_flow(client):
"""运行客户端。"""
tasks = [client.connect(), client.command_handler()]
try:
from mail.email_receiver import email_receiver
if email_receiver.username:
client.logger.info(f"[{client.get_time()}] 邮件接收已启动,监控: {email_receiver.username}")
tasks.append(email_receiver.start())
else:
client.logger.info(f"[{client.get_time()}] 未配置邮件账号,跳过邮件接收")
except Exception as e:
client.logger.info(f"[{client.get_time()}] 邮件接收模块加载失败: {e}")
try:
from utils.daily_summary import scheduler as daily_scheduler
tasks.append(daily_scheduler())
client.logger.info(f"[{client.get_time()}] 每日日报定时任务已启动")
except Exception as e:
client.logger.info(f"[{client.get_time()}] 日报模块加载失败: {e}")
try:
from utils.health_check import health_check_loop
def _qingjian_ok():
return client.websocket is not None and not getattr(client.websocket, "closed", True)
tasks.append(health_check_loop(_qingjian_ok))
client.logger.info(f"[{client.get_time()}] 健康检查已启动")
except Exception as e:
client.logger.info(f"[{client.get_time()}] 健康检查模块加载失败: {e}")
try:
from utils.wechat_chat_log import morning_startup_scheduler
tasks.append(morning_startup_scheduler())
client.logger.info(f"[{client.get_time()}] 早8点企微启动消息已启动")
except Exception as e:
client.logger.info(f"[{client.get_time()}] 企微启动消息模块加载失败: {e}")
if os.getenv("UNREPLIED_FOLLOWUP_ENABLED", "true").lower() in ("1", "true", "yes"):
tasks.append(client._unreplied_followup_loop())
client.logger.info(f"[{client.get_time()}] 未回复会话补偿任务已启动")
await asyncio.gather(*tasks)

View File

@@ -0,0 +1,70 @@
import json
import websockets
async def send_text_flow(client, cy_id, acc_type, content):
"""主动发送文本消息。"""
message = {
"msg_id": "",
"acc_id": "",
"msg": content,
"from_id": client.reply_id,
"from_name": client.reply_id,
"cy_id": cy_id,
"acc_type": acc_type,
"msg_type": 0,
"cy_name": "",
}
await client.send_message(message)
async def send_image_flow(client, cy_id, acc_type, image_path):
"""主动发送图片消息。"""
message = {
"msg_id": "",
"acc_id": "",
"msg": image_path,
"from_id": client.reply_id,
"from_name": client.reply_id,
"cy_id": cy_id,
"acc_type": acc_type,
"msg_type": 1,
"cy_name": "",
}
await client.send_message(message)
async def send_message_flow(client, message):
"""发送消息到服务器。"""
if client.websocket and client.websocket.state == websockets.protocol.State.OPEN:
try:
msg_json = json.dumps(message, ensure_ascii=False)
await client.websocket.send(msg_json)
pretty = json.dumps(message, ensure_ascii=False, indent=2)
client.logger.info(f"[{client.get_time()}] 发送成功:\n{pretty}")
client._activity_log(
"send_message_success",
trace_id=message.get("_trace_id", ""),
acc_id=message.get("acc_id", ""),
customer_id=message.get("from_id", ""),
msg_type=message.get("msg_type", 0),
msg=message.get("msg", ""),
)
except Exception as e:
client.logger.info(f"[{client.get_time()}] 发送失败: {e}")
client._activity_log(
"send_message_error",
trace_id=message.get("_trace_id", ""),
acc_id=message.get("acc_id", ""),
customer_id=message.get("from_id", ""),
error=str(e),
)
else:
client.logger.info(f"[{client.get_time()}] 错误: 连接未打开")
client._activity_log(
"send_message_skipped",
trace_id=message.get("_trace_id", ""),
reason="socket_not_open",
acc_id=message.get("acc_id", ""),
customer_id=message.get("from_id", ""),
)

View File

@@ -0,0 +1,23 @@
async def save_conversation_summary_flow(client, customer_id: str, buyer_msg: str, agent_reply: str):
"""用 AI 生成一句话对话摘要并持久化。"""
try:
from db.customer_db import db
from openai import AsyncOpenAI
api_client = AsyncOpenAI(
api_key=client.agent.api_key if client.agent else None,
base_url=client.agent.base_url if client.agent else None,
)
resp = await api_client.chat.completions.create(
model=client.agent.model_name if client.agent else "gpt-4o-mini",
messages=[
{"role": "system", "content": "用一句话15字以内总结这段对话的核心内容只输出摘要文字。"},
{"role": "user", "content": f"买家:{buyer_msg}\n客服:{agent_reply}"},
],
max_tokens=30,
temperature=0.3,
)
summary = resp.choices[0].message.content.strip()
db.save_conversation_summary(customer_id, summary)
except Exception:
client.logger.debug("保存对话摘要失败(不影响主流程)", exc_info=True)

View File

@@ -0,0 +1,143 @@
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List
from utils.metrics_tracker import emit as metrics_emit
logger = logging.getLogger("cs_agent")
def load_system_inquiry_rules() -> Dict[str, Any]:
"""加载系统客服询单规则(全局 + 店铺覆盖)。"""
from config.config import (
SYSTEM_INQUIRY_ENABLED,
SYSTEM_INQUIRY_DEFAULT_ACTION,
SYSTEM_INQUIRY_DEFAULT_REPLY,
SYSTEM_INQUIRY_RULES_FILE,
)
enabled_env = os.getenv("SYSTEM_INQUIRY_ENABLED")
enabled = (
enabled_env.lower() in ("1", "true", "yes")
if isinstance(enabled_env, str)
else bool(SYSTEM_INQUIRY_ENABLED)
)
action = (os.getenv("SYSTEM_INQUIRY_DEFAULT_ACTION") or SYSTEM_INQUIRY_DEFAULT_ACTION or "silent").strip().lower()
reply = os.getenv("SYSTEM_INQUIRY_DEFAULT_REPLY") or SYSTEM_INQUIRY_DEFAULT_REPLY or ""
rules_file = os.getenv("SYSTEM_INQUIRY_RULES_FILE") or str(SYSTEM_INQUIRY_RULES_FILE)
defaults: Dict[str, Any] = {
"enabled": bool(enabled),
"default_action": action,
"default_reply": reply,
"sender_keywords": ["系统客服", "官方客服", "平台客服", "机器人客服", "商家客服系统"],
"message_keywords": ["系统询单", "代客咨询", "平台代问", "系统代发", "客服询单"],
"shops": {},
}
try:
p = Path(rules_file)
if p.exists():
with p.open("r", encoding="utf-8") as f:
loaded = json.load(f)
if isinstance(loaded, dict):
defaults.update(loaded)
except Exception as e:
logger.warning("系统询单规则加载失败,使用默认规则: %s", e)
return defaults
def normalize_kw_list(v: Any) -> List[str]:
if not isinstance(v, list):
return []
return [str(x).strip().lower() for x in v if str(x).strip()]
def resolve_system_inquiry_policy(client, acc_id: str) -> Dict[str, Any]:
"""根据店铺合并系统询单策略。"""
from config.config import SYSTEM_INQUIRY_SHOPS
rules = client._system_inquiry_rules or {}
if not bool(rules.get("enabled", True)):
return {"enabled": False}
shops_env = os.getenv("SYSTEM_INQUIRY_SHOPS", SYSTEM_INQUIRY_SHOPS or "")
shop_whitelist = [s.strip() for s in shops_env.split(",") if s.strip()]
if shop_whitelist and (acc_id or "") not in shop_whitelist:
return {"enabled": False}
policy: Dict[str, Any] = {
"enabled": True,
"action": str(rules.get("default_action", "silent")).strip().lower(),
"reply": str(rules.get("default_reply", "")).strip(),
"sender_keywords": normalize_kw_list(rules.get("sender_keywords")),
"message_keywords": normalize_kw_list(rules.get("message_keywords")),
}
shop_cfg = (rules.get("shops") or {}).get(acc_id or "", {})
if isinstance(shop_cfg, dict):
if "enabled" in shop_cfg and not bool(shop_cfg.get("enabled", True)):
return {"enabled": False}
if shop_cfg.get("action"):
policy["action"] = str(shop_cfg.get("action")).strip().lower()
if shop_cfg.get("reply"):
policy["reply"] = str(shop_cfg.get("reply")).strip()
if isinstance(shop_cfg.get("sender_keywords"), list):
policy["sender_keywords"] = normalize_kw_list(shop_cfg.get("sender_keywords"))
if isinstance(shop_cfg.get("message_keywords"), list):
policy["message_keywords"] = normalize_kw_list(shop_cfg.get("message_keywords"))
if policy["action"] not in ("silent", "reply", "transfer"):
policy["action"] = "silent"
return policy
def match_system_inquiry(client, data: dict, policy: Dict[str, Any]) -> bool:
"""识别是否为系统客服询单消息。"""
if not policy.get("enabled", False):
return False
from_name = client.to_chinese(data.get("from_name", "") or "").lower()
from_id = str(data.get("from_id", "") or "").lower()
msg = client.to_chinese(data.get("msg", "") or "").lower()
sender_hits = 0
for kw in policy.get("sender_keywords", []):
if kw and (kw in from_name or kw in from_id):
sender_hits += 1
message_hits = 0
for kw in policy.get("message_keywords", []):
if kw and kw in msg:
message_hits += 1
# 优先看发送者特征;纯文本命中时至少要求两个关键词,降低误判风险
return sender_hits > 0 or message_hits >= 2
async def handle_system_inquiry(client, data: dict) -> bool:
"""命中系统询单后按策略处理。"""
acc_id = data.get("acc_id", "")
policy = resolve_system_inquiry_policy(client, acc_id)
if not match_system_inquiry(client, data, policy):
return False
customer_id = data.get("from_id", "")
metrics_emit("system_inquiry_detected", customer_id=customer_id, acc_id=acc_id)
action = policy.get("action", "silent")
logger.info("系统询单命中 | 店铺:%s | 客户:%s | action:%s", acc_id, customer_id, action)
if action == "reply":
reply = await client._compose_ai_scene_reply(
original_msg=data,
scene="system_inquiry_reply",
intent_hint="这是系统客服询单消息,简短确认已收到并说明会跟进即可。",
fallback=(policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。"),
)
await client.send_reply(data, reply)
metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id)
return True
if action == "transfer":
await client.transfer_to_human(data, "系统询单转人工")
metrics_emit("system_inquiry_transfer", customer_id=customer_id, acc_id=acc_id)
return True
metrics_emit("system_inquiry_ignored", customer_id=customer_id, acc_id=acc_id)
return True

View File

@@ -0,0 +1,83 @@
import logging
from utils.metrics_tracker import emit as metrics_emit
logger = logging.getLogger("cs_agent")
async def transfer_to_human_flow(client, data: dict, transfer_msg: str = "", *, transfer_group_resolver=None):
"""
转接人工客服。
1. 优先调用 dispatch 服务 GET /assign 一键派单
2. 派单失败时,回退旧版 designer_roster 派单
3. 无人在线或未配置时,回退到 config/transfer_groups.json
设计师在线状态:仅在转人工时按需查询,不轮询。
"""
if not client.websocket:
logger.info("[%s] 错误: 未连接到服务器", client.get_time())
return
acc_id = data.get("acc_id", "")
group_id = None
assigned_to = ""
dispatch_res = await client._dispatch_assign_once()
if dispatch_res.get("success"):
assigned_to = str(dispatch_res.get("assigned_to", "") or "").strip()
logger.info(
"一键派单成功 | task_id=%s | assigned_to=%s | online_count=%s",
dispatch_res.get("task_id", ""),
assigned_to or "未知",
dispatch_res.get("online_count", 0),
)
metrics_emit(
"dispatch_assign_success",
acc_id=acc_id,
assigned_to=assigned_to,
online_count=dispatch_res.get("online_count", 0),
)
else:
logger.warning("一键派单失败,回退旧派单逻辑: %s", dispatch_res.get("reason", "unknown"))
metrics_emit("dispatch_assign_failed", acc_id=acc_id)
# 2. 派单失败时,回退旧版 designer_roster
if not dispatch_res.get("success"):
try:
from utils.designer_roster import poll_and_update_roster
from db.designer_roster_db import get_transfer_group_for_shop
await poll_and_update_roster()
group_id = get_transfer_group_for_shop(acc_id)
except Exception as e:
logger.debug("设计师派单未启用或异常: %s", e)
# 3. 无人在线时企微提醒(新旧两套都没拿到在线结果时)
online_count = int(dispatch_res.get("online_count", 0) or 0)
if online_count <= 0 and not group_id:
try:
from config.config import WECHAT_WEBHOOK
if WECHAT_WEBHOOK:
import httpx
async with httpx.AsyncClient(timeout=5) as c:
resp = await c.post(WECHAT_WEBHOOK, json={
"msgtype": "text",
"text": {"content": "谁在线啊"},
})
if resp.status_code != 200:
logger.warning("企微提醒发送失败: %s %s", resp.status_code, resp.text)
else:
logger.debug("未配置 WECHAT_WEBHOOK跳过企微提醒")
except Exception as e:
logger.warning("企微提醒发送异常: %s", e)
# 4. 构造转接命令:有 assigned_to 用人名,否则回退分组
if assigned_to:
cmd = f"正在为你转接人工|[转移会话],{assigned_to},无原因"
await client.send_reply(data, cmd)
logger.info("[%s] 已发送转接请求 (店铺:%s -> 设计师:%s)", client.get_time(), acc_id or "未知", assigned_to)
return
if not group_id:
group_id = transfer_group_resolver(acc_id) if transfer_group_resolver else "20252916034"
cmd = f"话术|[转移会话],分组{group_id},无原因"
await client.send_reply(data, cmd)
logger.info("[%s] 已发送转接请求 (店铺:%s -> 分组:%s)", client.get_time(), acc_id or "未知", group_id)

View File

@@ -0,0 +1,64 @@
import asyncio
async def workflow_agent_notify_flow(client, customer_id: str, acc_id: str, acc_type: str, system_hint: str):
"""图片处理完成后,让客服 AI 生成自然话术发给客户。"""
if not client.enable_agent or not client.agent:
return
try:
from core.pydantic_ai_agent import CustomerMessage
notify_msg = CustomerMessage(
msg_id="workflow_notify",
acc_id=acc_id,
msg=system_hint,
from_id=customer_id,
from_name="",
cy_id=customer_id,
acc_type=acc_type,
msg_type=0,
cy_name="",
)
response = await client.agent.process_message(notify_msg)
if response.should_reply and response.reply:
nonsense_patterns = [
"无需", "流程已完成", "不需要回复", "无需额外", "已完成",
"无需回复", "不需要额外", "已经完成", "无需再", "操作已完成",
"任务完成", "流程完成", "记录完成", "报价已",
]
if not any(p in response.reply for p in nonsense_patterns):
fake_data = {
"acc_id": acc_id,
"from_id": customer_id,
"from_name": "",
"cy_id": customer_id,
"acc_type": acc_type,
}
await asyncio.sleep(0.5)
await client.send_reply(fake_data, response.reply)
client.logger.info(f"[Workflow] AI 通知已发送: {response.reply}")
except Exception as e:
client.logger.error(f"[Workflow] AI 通知生成失败: {e}")
async def workflow_send_flow(
client,
customer_id: str,
acc_id: str,
acc_type: str,
content: str,
msg_type: int = 0,
):
"""workflow 回调图片AI完成后用此方法推送消息给客户。"""
msg = {
"msg_id": "",
"acc_id": acc_id,
"msg": content,
"from_id": customer_id,
"from_name": customer_id,
"cy_id": customer_id,
"acc_type": acc_type,
"msg_type": msg_type,
"cy_name": customer_id,
}
await client.send_message(msg)