2402 lines
107 KiB
Python
Executable File
2402 lines
107 KiB
Python
Executable File
import asyncio
|
||
import websockets
|
||
import json
|
||
import re
|
||
import logging
|
||
import random
|
||
import secrets
|
||
import time
|
||
import hashlib
|
||
from collections import deque
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Optional, Dict, Any, List
|
||
from utils.observability import emit_activity, build_trace_id
|
||
|
||
# ========== 转接分组映射 ==========
|
||
def _get_transfer_group(acc_id: str) -> str:
|
||
"""根据店铺 acc_id 获取转接分组 ID。不同店铺对应不同客服分组。"""
|
||
from config.config import CONFIG_DIR
|
||
config_path = CONFIG_DIR / "transfer_groups.json"
|
||
default_group = "20252916034"
|
||
try:
|
||
if config_path.exists():
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
cfg = json.load(f)
|
||
return cfg.get(acc_id, cfg.get("default", default_group))
|
||
except Exception:
|
||
pass
|
||
return default_group
|
||
|
||
# ========== 日志配置(轮转:按大小 10MB,保留 7 份)==========
|
||
class _AnsiColorFormatter(logging.Formatter):
|
||
RESET = "\033[0m"
|
||
MESSAGE_TEXT_REPLACEMENTS = (
|
||
("[PROMPT->AI 前置提示词]", "[AI提示词]"),
|
||
("[PROMPT->AI", "[AI提示词"),
|
||
("[THINK/TOOL_CALL]", "[AI思考-工具调用]"),
|
||
("[THINK/TOOL_RETURN]", "[AI思考-工具返回]"),
|
||
("[THINK/RAW_OUTPUT]", "[AI思考-原始输出]"),
|
||
("[REPLY->CUSTOMER]", "[AI回复客户]"),
|
||
("[ACTIVITY]", "[活动日志]"),
|
||
("[AI质检]", "[AI质检]"),
|
||
)
|
||
# 业务消息类型颜色(优先于 level)
|
||
MESSAGE_COLOR_RULES = (
|
||
("[PROMPT->AI", "\033[94m"), # bright blue
|
||
("[THINK/", "\033[96m"), # bright cyan
|
||
("[REPLY->CUSTOMER]", "\033[92m"), # bright green
|
||
("Agent 回复", "\033[92m"), # bright green
|
||
("[ACTIVITY]", "\033[95m"), # bright magenta
|
||
("[AI质检]", "\033[93m"), # bright yellow
|
||
("收到新消息", "\033[36m"), # cyan
|
||
("发送成功", "\033[32m"), # green
|
||
("防抖等待", "\033[93m"), # bright yellow
|
||
)
|
||
COLORS = {
|
||
logging.DEBUG: "\033[36m", # cyan
|
||
logging.INFO: "\033[32m", # green
|
||
logging.WARNING: "\033[33m", # yellow
|
||
logging.ERROR: "\033[31m", # red
|
||
logging.CRITICAL: "\033[35m", # magenta
|
||
}
|
||
|
||
def __init__(self, fmt: str, datefmt: str | None = None, use_color: bool = True):
|
||
super().__init__(fmt=fmt, datefmt=datefmt)
|
||
self.use_color = use_color
|
||
|
||
def format(self, record: logging.LogRecord) -> str:
|
||
msg = super().format(record)
|
||
if not self.use_color:
|
||
for old, new in self.MESSAGE_TEXT_REPLACEMENTS:
|
||
msg = msg.replace(old, new)
|
||
return msg
|
||
raw_msg = record.getMessage()
|
||
for old, new in self.MESSAGE_TEXT_REPLACEMENTS:
|
||
msg = msg.replace(old, new)
|
||
for key, color in self.MESSAGE_COLOR_RULES:
|
||
if key in raw_msg:
|
||
return f"{color}{msg}{self.RESET}"
|
||
color = self.COLORS.get(record.levelno, "")
|
||
if not color:
|
||
return msg
|
||
return f"{color}{msg}{self.RESET}"
|
||
|
||
|
||
def setup_logger():
|
||
from logging.handlers import RotatingFileHandler
|
||
from config.config import LOG_DIR, LOG_MAX_BYTES, LOG_BACKUP_COUNT
|
||
logger = logging.getLogger("cs_agent")
|
||
if getattr(logger, "_cs_logger_configured", False):
|
||
return logger
|
||
logger.setLevel(logging.INFO)
|
||
# 避免同一日志既被 cs_agent 打印,又向 root logger 传播再打一遍。
|
||
logger.propagate = False
|
||
fmt = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S")
|
||
use_color = (os.getenv("LOG_COLOR", "1").lower() in ("1", "true", "yes")) and not bool(os.getenv("NO_COLOR"))
|
||
ch = logging.StreamHandler()
|
||
ch.setFormatter(_AnsiColorFormatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S", use_color=use_color))
|
||
logger.addHandler(ch)
|
||
LOG_DIR.mkdir(exist_ok=True)
|
||
today = datetime.now().strftime("%Y-%m-%d")
|
||
fh = RotatingFileHandler(
|
||
LOG_DIR / f"chat_{today}.log",
|
||
maxBytes=LOG_MAX_BYTES,
|
||
backupCount=LOG_BACKUP_COUNT,
|
||
encoding="utf-8",
|
||
)
|
||
fh.setFormatter(fmt)
|
||
logger.addHandler(fh)
|
||
logger._cs_logger_configured = True
|
||
return logger
|
||
|
||
import os
|
||
logger = setup_logger()
|
||
|
||
from db.chat_log_db import log_message as _chat_log
|
||
from utils.metrics_tracker import emit as metrics_emit
|
||
|
||
# 导入 Agent 模块
|
||
try:
|
||
from core.pydantic_ai_agent import CustomerServiceAgent, CustomerMessage, AgentDeps, _get_shop_type
|
||
from db.customer_db import db
|
||
from core.workflow import workflow
|
||
AGENT_AVAILABLE = True
|
||
except Exception as e:
|
||
AGENT_AVAILABLE = False
|
||
workflow = None
|
||
AgentDeps = None
|
||
_get_shop_type = lambda acc_id, goods_name: "find_image"
|
||
import traceback
|
||
logger.info(f"警告: Agent 模块导入失败: {e}")
|
||
traceback.print_exc()
|
||
logger.info("将使用基础回复功能")
|
||
|
||
|
||
class QingjianAPIClient:
|
||
"""轻简API WebSocket客户端"""
|
||
|
||
def __init__(self, uri=None, enable_agent: bool = True):
|
||
from config.config import QINGJIAN_WS_URI
|
||
from config.config import IMAGE_MODULE_ENABLED
|
||
from config.config import MESSAGE_DEBOUNCE_SECONDS
|
||
self.uri = uri or QINGJIAN_WS_URI
|
||
self.websocket = None
|
||
self.running = True
|
||
self.reply_id = "tb001" # 回复时使用的from_id
|
||
self.last_msg = None # 保存最后一条消息
|
||
self.enable_agent = enable_agent and AGENT_AVAILABLE
|
||
self.agent = None
|
||
self._replied_msg_ids: deque = deque(maxlen=200) # 已回复消息ID,FIFO去重
|
||
|
||
# 消息防抖:同一客户连续发消息时,等待 N 秒后合并处理
|
||
self._DEBOUNCE_SECONDS = MESSAGE_DEBOUNCE_SECONDS if isinstance(MESSAGE_DEBOUNCE_SECONDS, int) else 8
|
||
self._adaptive_debounce_enabled = os.getenv("ADAPTIVE_DEBOUNCE_ENABLED", "true").lower() in ("1", "true", "yes")
|
||
self._debounce_tasks: dict = {} # customer_key -> asyncio.Task
|
||
self._pending_msgs: dict = {} # customer_key -> list[data]
|
||
self._image_enabled = IMAGE_MODULE_ENABLED
|
||
|
||
# 同客户消息串行:保证「发图→这个高清」等顺序,避免误判
|
||
self._customer_locks: dict = {} # customer_key -> asyncio.Lock
|
||
# agent_reply 并发上限,防止 API 打满
|
||
self._agent_semaphore = asyncio.Semaphore(8)
|
||
self._pending_images: dict = {}
|
||
self._pending_image_tasks: dict = {}
|
||
self._auto_quote_tasks: dict = {} # customer_key -> asyncio.Task
|
||
self._auto_quote_done_sig: dict = {} # customer_key -> signature(同一批内容仅自动触发一次)
|
||
# 旧版“看图即报价”快速链路(默认关闭,避免与 Agent 批量收集逻辑并发打架)
|
||
self._legacy_fast_quote_enabled = os.getenv("LEGACY_FAST_IMAGE_QUOTE", "false").lower() in ("1", "true", "yes")
|
||
self._system_inquiry_rules = self._load_system_inquiry_rules()
|
||
self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts
|
||
self._inbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入)
|
||
self._outbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入)
|
||
self._tianwang_callback_url = (
|
||
os.getenv("TIANWANG_CALLBACK_URL", "").strip()
|
||
or "http://139.199.3.75:18789/api/callback"
|
||
)
|
||
self._tianwang_agent_name = os.getenv("TIANWANG_AGENT_NAME", "终结者").strip() or "终结者"
|
||
self._reply_guard_enabled = os.getenv("AI_REPLY_GUARD_ENABLED", "true").lower() in ("1", "true", "yes")
|
||
self._reply_guard_verbose = os.getenv("AI_REPLY_GUARD_VERBOSE", "false").lower() in ("1", "true", "yes")
|
||
|
||
# 延迟加载任务模块(避免循环导入)
|
||
self.task_scheduler = None
|
||
self.task_manager = None
|
||
self.trigger_engine = None
|
||
|
||
# 多进程分片支持
|
||
self.shard_keys: set = set() # 本进程负责的客户 key 集合
|
||
self.worker_id = int(os.getenv('AI_CS_WORKER_ID', '0'))
|
||
self.worker_count = max(1, int(os.getenv('AI_CS_WORKER_COUNT', '1')))
|
||
|
||
# 初始化 Agent
|
||
if self.enable_agent:
|
||
try:
|
||
self.agent = CustomerServiceAgent()
|
||
logger.info(f"[{self.get_time()}] Agent 初始化成功")
|
||
except Exception as e:
|
||
logger.info(f"[{self.get_time()}] Agent 初始化失败: {e}")
|
||
self.enable_agent = False
|
||
|
||
# 注册 workflow 消息发送回调(供图片AI完成后推送消息用)
|
||
if workflow:
|
||
workflow.register_send_callback(self._workflow_send)
|
||
workflow.register_agent_notify_callback(self._workflow_agent_notify)
|
||
|
||
def _activity_log(self, event: str, **kwargs):
|
||
"""统一活动日志,便于按 event 检索完整链路。"""
|
||
emit_activity(
|
||
logger,
|
||
event=event,
|
||
trace_id=str(kwargs.pop("trace_id", "")),
|
||
customer_id=str(kwargs.pop("customer_id", "")),
|
||
result=str(kwargs.pop("result", "ok")),
|
||
**kwargs,
|
||
)
|
||
|
||
async def _post_tianwang_callback(self, event: str, data: dict, extra: Optional[Dict[str, Any]] = None):
|
||
"""将消息处理事件回调给天网。"""
|
||
if not self._tianwang_callback_url:
|
||
return
|
||
try:
|
||
import httpx
|
||
|
||
trust_env = os.getenv("TIANWANG_CALLBACK_TRUST_ENV", "false").lower() in ("1", "true", "yes")
|
||
payload = {
|
||
"event": event,
|
||
"timestamp": datetime.now().isoformat(),
|
||
"agent_name": self._tianwang_agent_name,
|
||
"acc_id": str(data.get("acc_id", "") or ""),
|
||
"customer_id": str(data.get("from_id", "") or ""),
|
||
"customer_name": self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
|
||
"msg_id": str(data.get("msg_id", "") or ""),
|
||
"msg_type": int(data.get("msg_type", 0) or 0),
|
||
"msg": self.to_chinese(data.get("msg", "") or ""),
|
||
"goods_name": self.to_chinese(data.get("goods_name", "") or ""),
|
||
"goods_order": self.to_chinese(data.get("goods_order", "") or ""),
|
||
}
|
||
if extra:
|
||
payload.update(extra)
|
||
async with httpx.AsyncClient(timeout=6, trust_env=trust_env) as client:
|
||
resp = await client.post(self._tianwang_callback_url, json=payload)
|
||
ok = 200 <= resp.status_code < 300
|
||
self._activity_log(
|
||
"tianwang_callback",
|
||
result="ok" if ok else "http_error",
|
||
event_name=event,
|
||
status_code=resp.status_code,
|
||
acc_id=payload["acc_id"],
|
||
customer_id=payload["customer_id"],
|
||
)
|
||
except Exception as e:
|
||
self._activity_log(
|
||
"tianwang_callback",
|
||
result="error",
|
||
event_name=event,
|
||
acc_id=str(data.get("acc_id", "") or ""),
|
||
customer_id=str(data.get("from_id", "") or ""),
|
||
error=str(e),
|
||
)
|
||
|
||
|
||
async def connect(self):
|
||
"""连接WebSocket服务器"""
|
||
while self.running:
|
||
try:
|
||
logger.info(f"[{self.get_time()}] 正在连接轻简API {self.uri}...")
|
||
async with websockets.connect(self.uri) as websocket:
|
||
self.websocket = websocket
|
||
from utils.health_check import set_qingjian_connected
|
||
set_qingjian_connected(True)
|
||
logger.info(f"[{self.get_time()}] 连接成功!")
|
||
if self.enable_agent:
|
||
logger.info(f"[{self.get_time()}] AI Agent 已启用,将自动处理消息")
|
||
logger.info(f"[{self.get_time()}] 等待接收消息...")
|
||
|
||
# 持续接收消息
|
||
await self.receive_messages()
|
||
|
||
except ConnectionRefusedError:
|
||
from utils.health_check import set_qingjian_connected
|
||
set_qingjian_connected(False)
|
||
logger.info(f"[{self.get_time()}] 连接被拒绝,请检查轻简软件是否已启动")
|
||
except websockets.exceptions.InvalidURI:
|
||
from utils.health_check import set_qingjian_connected
|
||
set_qingjian_connected(False)
|
||
logger.info(f"[{self.get_time()}] URI格式错误")
|
||
except Exception as e:
|
||
from utils.health_check import set_qingjian_connected
|
||
set_qingjian_connected(False)
|
||
logger.info(f"[{self.get_time()}] 连接错误: {e}")
|
||
|
||
# 等待5秒后重连
|
||
if self.running:
|
||
logger.info(f"[{self.get_time()}] 5秒后尝试重连...")
|
||
await asyncio.sleep(5)
|
||
|
||
def _customer_key(self, data: dict) -> str:
|
||
"""同一店铺+客户 = 同一会话"""
|
||
return f"{data.get('acc_id','')}:{data.get('from_id','')}"
|
||
|
||
def _get_customer_lock(self, key: str) -> asyncio.Lock:
|
||
if key not in self._customer_locks:
|
||
self._customer_locks[key] = asyncio.Lock()
|
||
return self._customer_locks[key]
|
||
|
||
def _is_owned_by_this_worker(self, customer_key: str) -> bool:
|
||
"""
|
||
多进程兜底路由:
|
||
- 若显式分片存在,用显式分片;
|
||
- 否则按 customer_key 哈希到固定 worker,避免多进程重复处理同一消息。
|
||
"""
|
||
if self.shard_keys:
|
||
return customer_key in self.shard_keys
|
||
if self.worker_count <= 1:
|
||
return True
|
||
try:
|
||
h = int(hashlib.md5(customer_key.encode("utf-8")).hexdigest()[:8], 16)
|
||
return (h % self.worker_count) == self.worker_id
|
||
except Exception:
|
||
return self.worker_id == 0
|
||
|
||
async def _agent_reply_serialized(self, data: dict):
|
||
"""同客户串行 + 全局并发限制,再执行 agent_reply"""
|
||
key = self._customer_key(data)
|
||
async with self._get_customer_lock(key):
|
||
async with self._agent_semaphore:
|
||
await self.agent_reply(data)
|
||
|
||
def _fire_and_forget(self, coro):
|
||
"""后台执行协程,不阻塞接收循环;异常会记录到日志"""
|
||
task = asyncio.create_task(coro)
|
||
|
||
def _done(t):
|
||
if t.cancelled():
|
||
return
|
||
exc = t.exception()
|
||
if exc:
|
||
logger.exception(f"后台任务异常: {exc}")
|
||
|
||
task.add_done_callback(_done)
|
||
|
||
@staticmethod
|
||
def _prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0):
|
||
if len(seen) <= 2000:
|
||
return
|
||
stale = [k for k, t in seen.items() if (now_mono - t) > ttl_sec]
|
||
for k in stale:
|
||
seen.pop(k, None)
|
||
|
||
def _log_inbound_once(self, data: dict):
|
||
"""统一记录入站消息,短窗口去重,避免多分支重复写库。"""
|
||
try:
|
||
cid = data.get("from_id", "")
|
||
if not cid:
|
||
return
|
||
msg = self.to_chinese(data.get("msg", "") or "")
|
||
acc_id = data.get("acc_id", "")
|
||
mtype = int(data.get("msg_type", 0) or 0)
|
||
now_mono = time.monotonic()
|
||
sig = f"{acc_id}|{cid}|{mtype}|{msg}"
|
||
last = self._inbound_log_seen.get(sig, 0.0)
|
||
if (now_mono - last) < 2.0:
|
||
return
|
||
self._inbound_log_seen[sig] = now_mono
|
||
self._prune_seen(self._inbound_log_seen, now_mono, ttl_sec=8.0)
|
||
_chat_log(
|
||
cid,
|
||
msg,
|
||
"in",
|
||
customer_name=self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
|
||
acc_id=acc_id,
|
||
platform=data.get("acc_type", ""),
|
||
msg_type=mtype,
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
def _log_outbound_once(self, original_msg: dict, reply_content: str):
|
||
"""统一记录出站消息,短窗口去重,避免重复写库。"""
|
||
try:
|
||
cid = original_msg.get("from_id", "")
|
||
if not cid or not reply_content:
|
||
return
|
||
acc_id = original_msg.get("acc_id", "")
|
||
now_mono = time.monotonic()
|
||
sig = f"{acc_id}|{cid}|{reply_content}"
|
||
last = self._outbound_log_seen.get(sig, 0.0)
|
||
if (now_mono - last) < 2.0:
|
||
return
|
||
self._outbound_log_seen[sig] = now_mono
|
||
self._prune_seen(self._outbound_log_seen, now_mono, ttl_sec=8.0)
|
||
_chat_log(
|
||
cid,
|
||
reply_content,
|
||
"out",
|
||
customer_name=self.to_chinese(original_msg.get("from_name", "") or original_msg.get("cy_name", "")),
|
||
acc_id=acc_id,
|
||
platform=original_msg.get("acc_type", ""),
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
async def receive_messages(self):
|
||
"""持续接收消息"""
|
||
try:
|
||
async for message in self.websocket:
|
||
await self.handle_message(message)
|
||
|
||
except websockets.exceptions.ConnectionClosed:
|
||
from utils.health_check import set_qingjian_connected
|
||
set_qingjian_connected(False)
|
||
logger.info(f"[{self.get_time()}] 连接已关闭")
|
||
except Exception as e:
|
||
from utils.health_check import set_qingjian_connected
|
||
set_qingjian_connected(False)
|
||
logger.info(f"[{self.get_time()}] 接收消息错误: {e}")
|
||
|
||
async def handle_message(self, message):
|
||
"""处理接收到的消息"""
|
||
try:
|
||
data = json.loads(message)
|
||
|
||
# 多进程分片检查:确保同一客户只由一个 worker 处理
|
||
customer_key = self._customer_key(data)
|
||
if not self._is_owned_by_this_worker(customer_key):
|
||
return
|
||
|
||
timestamp = self.get_time()
|
||
|
||
# 保存最后一条消息用于回复
|
||
self.last_msg = data
|
||
|
||
# 打印格式化的消息
|
||
logger.info(f"\n{'='*50}")
|
||
logger.info(f"[{timestamp}] 收到新消息:")
|
||
logger.info(f"{'='*50}")
|
||
logger.info(f" 消息ID: {data.get('msg_id', 'N/A')}")
|
||
logger.info(f" 账号ID: {self.to_chinese(data.get('acc_id', 'N/A'))}")
|
||
logger.info(f" 发送者ID: {self.to_chinese(data.get('from_id', 'N/A'))}")
|
||
logger.info(f" 发送者名称: {self.to_chinese(data.get('from_name', 'N/A'))}")
|
||
logger.info(f" 会话ID: {self.to_chinese(data.get('cy_id', 'N/A'))}")
|
||
logger.info(f" 平台类型: {data.get('acc_type', 'N/A')}")
|
||
logger.info(f" 消息类型: {self.get_msg_type_name(data.get('msg_type', 0))}")
|
||
logger.info(f" 消息内容: {self.to_chinese(data.get('msg', 'N/A'))}")
|
||
|
||
# 显示商品信息(如果有)
|
||
if data.get('goods_name'):
|
||
logger.info(f" 商品名称: {self.to_chinese(data.get('goods_name', ''))}")
|
||
if data.get('goods_order'):
|
||
logger.info(f" 订单信息: {self.to_chinese(data.get('goods_order', ''))}")
|
||
|
||
logger.info(f"{'='*50}\n")
|
||
|
||
# 消息去重:同一条消息不重复处理
|
||
msg_id = data.get('msg_id', '')
|
||
if msg_id and msg_id in self._replied_msg_ids:
|
||
logger.info(f"重复消息,跳过: {msg_id}")
|
||
return
|
||
if msg_id:
|
||
self._replied_msg_ids.append(msg_id) # deque 自动淘汰最旧的
|
||
|
||
# 空消息/无效消息过滤(N/A 或关键字段全为空)
|
||
from_id = data.get('from_id', '')
|
||
acc_id = data.get('acc_id', '')
|
||
msg_body = data.get('msg', '')
|
||
if not from_id or from_id == 'N/A' or not acc_id or acc_id == 'N/A':
|
||
logger.info(f"[{self.get_time()}] 空消息跳过(from_id={from_id!r} acc_id={acc_id!r})")
|
||
return
|
||
self._log_inbound_once(data)
|
||
self._fire_and_forget(self._post_tianwang_callback("message_received", data))
|
||
|
||
# Gemini 店铺:不回复,直接跳过
|
||
goods_name = self.to_chinese(data.get('goods_name', '') or '')
|
||
if _get_shop_type(acc_id, goods_name) == "gemini_api":
|
||
logger.info(f"[{self.get_time()}] Gemini 店铺消息,跳过")
|
||
try:
|
||
from utils.wechat_chat_log import push_chat_to_wechat
|
||
asyncio.create_task(push_chat_to_wechat(
|
||
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
|
||
customer_id=data.get('from_id', ''),
|
||
acc_id=data.get('acc_id', ''),
|
||
customer_msg=self.to_chinese(data.get('msg', '')),
|
||
reply_msg="",
|
||
goods_name=goods_name,
|
||
))
|
||
except Exception:
|
||
pass
|
||
return
|
||
|
||
# 使用 Agent 自动回复(仅处理文本消息)
|
||
if self.enable_agent:
|
||
msg_type = data.get('msg_type', 0)
|
||
if msg_type == 0:
|
||
if self._is_transfer_msg(data):
|
||
# 会话转交 → 主动打招呼
|
||
logger.info(f"[{self.get_time()}] 收到转交消息,发送问候")
|
||
greeting = self._pick_transfer_greeting()
|
||
await self.send_reply(data, greeting)
|
||
try:
|
||
from utils.wechat_chat_log import push_chat_to_wechat
|
||
asyncio.create_task(push_chat_to_wechat(
|
||
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
|
||
customer_id=data.get('from_id', ''),
|
||
acc_id=data.get('acc_id', ''),
|
||
customer_msg=self.to_chinese(data.get('msg', '')),
|
||
reply_msg=greeting,
|
||
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
|
||
))
|
||
except Exception:
|
||
pass
|
||
elif self._is_shop_card(data):
|
||
# 进店卡片:有历史对话就不回复,没有才打招呼(Gemini 已在上面统一跳过)
|
||
cid = data.get('from_id', '')
|
||
acc_id = data.get('acc_id', '')
|
||
residual_text = self._extract_customer_text_from_shop_card_msg(data.get('msg', ''))
|
||
if residual_text:
|
||
logger.info(f"[{self.get_time()}] 进店卡片携带客户文本,转普通消息处理: {residual_text}")
|
||
patched = dict(data)
|
||
patched['msg'] = residual_text
|
||
await self._debounce_agent_reply(patched)
|
||
elif self._has_chat_history(cid, acc_id=acc_id):
|
||
logger.info(f"[{self.get_time()}] 进店卡片(已有记录),跳过")
|
||
else:
|
||
logger.info(f"[{self.get_time()}] 进店卡片(新客户),发送问候")
|
||
greeting = "在呢,发图来我看看"
|
||
await self.send_reply(data, greeting)
|
||
try:
|
||
from utils.wechat_chat_log import push_chat_to_wechat
|
||
asyncio.create_task(push_chat_to_wechat(
|
||
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
|
||
customer_id=data.get('from_id', ''),
|
||
acc_id=data.get('acc_id', ''),
|
||
customer_msg=self.to_chinese(data.get('msg', '')),
|
||
reply_msg=greeting,
|
||
goods_name=goods_name,
|
||
))
|
||
except Exception:
|
||
pass
|
||
elif await self._handle_system_inquiry(data):
|
||
logger.info(f"[{self.get_time()}] 系统客服询单消息,已按规则处理")
|
||
elif self._should_ignore(data):
|
||
logger.info(f"[{self.get_time()}] 系统通知,跳过回复")
|
||
else:
|
||
await self._debounce_agent_reply(data)
|
||
elif msg_type == 1:
|
||
# 图片消息直接处理,不走防抖(图片不会连续多发)
|
||
await self.handle_image_message(data)
|
||
|
||
except json.JSONDecodeError:
|
||
logger.info(f"[{timestamp}] 收到非JSON消息: {message}")
|
||
|
||
async def _debounce_agent_reply(self, data: dict):
|
||
"""
|
||
消息防抖:同一客户在 _DEBOUNCE_SECONDS 内的连续消息合并后再处理。
|
||
订单通知、付款相关消息不走防抖,立即处理。
|
||
"""
|
||
msg_body = data.get('msg', '')
|
||
key = f"{data.get('acc_id','')}:{data.get('from_id','')}"
|
||
self._cancel_auto_quote_task(key, reason="new_inbound")
|
||
# 以下情况跳过防抖,立即处理(后台执行,不阻塞接收循环)
|
||
immediate_keywords = ["买家已付款", "已付款", "[系统订单信息]"]
|
||
if any(kw in msg_body for kw in immediate_keywords):
|
||
self._activity_log(
|
||
"debounce_bypass_immediate",
|
||
acc_id=data.get("acc_id", ""),
|
||
customer_id=data.get("from_id", ""),
|
||
reason="payment_or_order",
|
||
msg=msg_body,
|
||
)
|
||
self._fire_and_forget(self._agent_reply_serialized(data))
|
||
return
|
||
|
||
# 积攒消息
|
||
if key not in self._pending_msgs:
|
||
self._pending_msgs[key] = []
|
||
self._pending_msgs[key].append(msg_body)
|
||
self._activity_log(
|
||
"debounce_enqueue",
|
||
key=key,
|
||
queue_size=len(self._pending_msgs[key]),
|
||
msg=msg_body,
|
||
)
|
||
|
||
# 取消上一个等待任务(如果有)
|
||
old_task = self._debounce_tasks.get(key)
|
||
if old_task and not old_task.done():
|
||
old_task.cancel()
|
||
|
||
debounce_seconds = self._pick_debounce_seconds(data, msg_body)
|
||
|
||
# 创建新的延迟处理任务
|
||
async def _delayed(capture_key, capture_data, wait_s: float):
|
||
await asyncio.sleep(wait_s)
|
||
msgs = self._pending_msgs.pop(capture_key, [])
|
||
if not msgs:
|
||
return
|
||
if len(msgs) == 1:
|
||
merged_msg = msgs[0]
|
||
else:
|
||
merged_msg = "、".join(m for m in msgs if m.strip())
|
||
logger.info(f"[{self.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}")
|
||
self._activity_log(
|
||
"debounce_flush",
|
||
key=capture_key,
|
||
merged_count=len(msgs),
|
||
merged_msg=merged_msg,
|
||
)
|
||
merged_data = dict(capture_data)
|
||
merged_data['msg'] = merged_msg
|
||
await self._agent_reply_serialized(merged_data)
|
||
|
||
task = asyncio.create_task(_delayed(key, data, debounce_seconds))
|
||
self._debounce_tasks[key] = task
|
||
|
||
@staticmethod
|
||
def _rand_between(low: float, high: float) -> float:
|
||
if high <= low:
|
||
return float(low)
|
||
# 使用 secrets 增强随机性,避免固定周期导致机械感
|
||
span = high - low
|
||
return round(low + span * (secrets.randbelow(1000) / 1000.0), 2)
|
||
|
||
def _guess_intent_for_debounce(self, msg: str) -> str:
|
||
text = (msg or "").strip()
|
||
if not text:
|
||
return "unknown"
|
||
if self._msg_has_image_url(text):
|
||
return "image"
|
||
try:
|
||
from utils.intent_analyzer import detect_intent
|
||
decision = detect_intent(text)
|
||
intent = decision.intent
|
||
if intent:
|
||
self._activity_log(
|
||
"debounce_intent_detected",
|
||
intent=intent,
|
||
source=decision.source,
|
||
score=round(float(decision.score or 0.0), 4),
|
||
msg=text[:120],
|
||
)
|
||
except Exception:
|
||
intent = ""
|
||
if intent:
|
||
return intent
|
||
lower = text.lower()
|
||
if any(k in lower for k in ["报价", "多少钱", "价格", "贵", "优惠", "收费", "怎么收费", "咋收费"]):
|
||
return "询价"
|
||
if any(k in lower for k in ["做一下", "改一下", "需求", "门头", "上面的字", "处理"]):
|
||
return "修改"
|
||
if any(k in lower for k in ["在吗", "你好", "有人"]):
|
||
return "打招呼"
|
||
return "unknown"
|
||
|
||
@staticmethod
|
||
def _looks_like_requirement_text(msg: str) -> bool:
|
||
text = (msg or "").strip().lower()
|
||
if not text:
|
||
return False
|
||
req_kw = (
|
||
"做一下", "改一下", "处理一下", "这个字", "上面的字", "门头", "去背景", "抠图",
|
||
"换色", "调色", "清晰", "高清", "尺寸", "比例", "横版", "竖版", "排版", "改字",
|
||
"按这个做", "照这个做", "就这张", "看看做", "弄一下",
|
||
)
|
||
return any(k in text for k in req_kw)
|
||
|
||
def _pick_debounce_seconds(self, data: dict, msg: str) -> float:
|
||
"""意图驱动防抖:不同意图不同等待区间,并引入轻微随机。"""
|
||
base = max(1.0, float(self._DEBOUNCE_SECONDS))
|
||
if not self._adaptive_debounce_enabled:
|
||
return base
|
||
|
||
intent = self._guess_intent_for_debounce(msg)
|
||
is_req = self._looks_like_requirement_text(msg)
|
||
has_img = self._msg_has_image_url(msg)
|
||
# 区间策略:越明确、越短消息,等待越短;需求描述类稍长
|
||
if intent == "打招呼":
|
||
low, high = 1.0, min(3.0, base)
|
||
elif intent in ("询价", "砍价"):
|
||
# 询价先略等一会,给客户补发图片/需求的窗口,减少机械两连回
|
||
low, high = 4.0, min(7.0, max(base, 7.0))
|
||
elif intent == "image":
|
||
# 文本里直接贴图链接:短等合并上下文,避免和上一条询价并发
|
||
low, high = 2.2, 4.2
|
||
elif intent in ("修改", "批量"):
|
||
low, high = max(3.0, base * 0.65), min(18.0, base + 2.0)
|
||
elif intent == "转接":
|
||
low, high = 1.0, 2.5
|
||
else:
|
||
low, high = max(2.0, base * 0.5), base
|
||
|
||
# 发图后的需求描述,优先“多等一点”收集完整需求,减少半句回复
|
||
# 约束到 12-14s,避免等待过长。
|
||
if is_req and not has_img:
|
||
low = max(low, 12.0)
|
||
high = min(14.0, max(high, 12.6))
|
||
|
||
# 短句更快,长句稍慢,避免把连续半句拆开
|
||
text_len = len((msg or "").strip())
|
||
if text_len <= 4:
|
||
high = min(high, max(low + 0.2, 2.5))
|
||
elif text_len >= 18:
|
||
low = min(high, low + 0.6)
|
||
|
||
wait_s = self._rand_between(low, high)
|
||
logger.info(f"防抖等待 {wait_s}s | intent={intent} | len={text_len}")
|
||
return wait_s
|
||
|
||
def _msg_has_image_url(self, msg: str) -> bool:
|
||
"""判断文本消息里是否包含图片URL(客户粘贴了图片链接,可能带前缀文字如 有吗#*#https://...)"""
|
||
if not msg:
|
||
return False
|
||
lower = msg.lower()
|
||
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
|
||
image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com")
|
||
if "http://" in lower or "https://" in lower:
|
||
if any(ext in lower for ext in image_exts) or any(h in lower for h in image_hosts):
|
||
return True
|
||
return False
|
||
|
||
def _msg_refers_images(self, msg: str) -> bool:
|
||
"""判断文本是否指代之前的图片(图一/图二/这张/那张/上面那张等)"""
|
||
if not msg:
|
||
return False
|
||
refs = (
|
||
"图一", "图二", "第一张", "第二张",
|
||
"这张", "那张", "这图", "那个图",
|
||
"这个", "这个呢",
|
||
"上面那张", "下面那张", "刚才那张", "上一张", "下一张",
|
||
)
|
||
return any(r in msg for r in refs)
|
||
|
||
def _extract_image_urls(self, msg: str) -> list:
|
||
if not msg:
|
||
return []
|
||
parts = [p.strip() for p in msg.split("#*#") if p.strip()]
|
||
urls = []
|
||
for p in parts:
|
||
if p.startswith("http://") or p.startswith("https://"):
|
||
urls.append(p)
|
||
if not urls and ("http://" in msg or "https://" in msg):
|
||
tokens = re.findall(r'(https?://\S+)', msg)
|
||
for t in tokens:
|
||
if any(ext in t.lower() for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]):
|
||
urls.append(t)
|
||
return urls[:8]
|
||
|
||
def _collect_recent_image_urls(self, customer_id: str, acc_id: str, max_count: int = 6) -> list:
|
||
"""从最近对话中回溯收集图片URL(优先买家消息),用于慢发或引用图片的场景"""
|
||
urls, seen = [], set()
|
||
try:
|
||
from db.chat_log_db import get_recent_conversation
|
||
recent = get_recent_conversation(customer_id=customer_id, acc_id=acc_id, limit=20)
|
||
# 从最近到更早遍历,收集买家(in)消息中的图片链接
|
||
for m in reversed(recent):
|
||
if m.get("direction") != "in":
|
||
continue
|
||
ms = m.get("message") or ""
|
||
us = self._extract_image_urls(ms)
|
||
for u in us:
|
||
if u not in seen:
|
||
seen.add(u)
|
||
urls.append(u)
|
||
if len(urls) >= max_count:
|
||
return urls
|
||
except Exception:
|
||
pass
|
||
return urls
|
||
|
||
def _msg_is_requirement(self, msg: str) -> bool:
|
||
if not msg:
|
||
return False
|
||
kws = (
|
||
"要", "抓到", "放到", "合成", "替换", "抠", "修", "高清", "尺寸", "横", "竖", "颜色", "去背景", "排版", "一样", "类似", "同款",
|
||
"能不能做", "能做吗", "可以做吗", "做不做", "这个能做吗", "这个能不能做",
|
||
)
|
||
return any(k in msg for k in kws)
|
||
|
||
def _add_pending_images(self, key: str, urls: list, limit: int = 12):
|
||
if not urls:
|
||
return
|
||
cur = self._pending_images.get(key) or []
|
||
for u in urls:
|
||
if u not in cur:
|
||
cur.append(u)
|
||
if len(cur) >= limit:
|
||
break
|
||
self._pending_images[key] = cur
|
||
|
||
async def _flush_pending_images(self, key: str, data: dict):
|
||
urls = self._pending_images.get(key) or []
|
||
if not urls:
|
||
return
|
||
self._pending_images[key] = []
|
||
if len(urls) == 1:
|
||
await self._analyze_single_and_reply(data, urls[0])
|
||
else:
|
||
await self._analyze_multi_and_reply(data, urls)
|
||
|
||
def _msg_is_price_inquiry(self, msg: str) -> bool:
|
||
"""判断是否是价格询问"""
|
||
if not msg:
|
||
return False
|
||
patterns = ("多少钱", "多少一张", "一张多少钱", "画图多少", "报价", "给个价", "几块", "多少钱")
|
||
return any(p in msg for p in patterns)
|
||
|
||
def _detect_order_status(self, msg: str) -> str:
|
||
if not msg:
|
||
return ""
|
||
s = msg
|
||
if "买家已付款" in s or "已付款" in s:
|
||
return "paid"
|
||
if "[系统订单信息]" in s:
|
||
if "等待买家付款" in s or "未付款" in s:
|
||
return "waiting"
|
||
return "order"
|
||
return ""
|
||
|
||
async def _analyze_single_and_reply(self, data: dict, url: str):
|
||
try:
|
||
from image.image_analyzer import image_analyzer
|
||
r = await image_analyzer.analyze(url)
|
||
if isinstance(r, dict) and r.get("success", False):
|
||
if r.get("feasibility") == "no" or r.get("risk") == "high":
|
||
note = str(r.get("note", "") or "")
|
||
if "文字内容过于密集" in note or "密集文字" in note:
|
||
reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。"
|
||
else:
|
||
reply = "这张处理风险比较高,我这边先不直接接,建议转人工评估更稳。"
|
||
await self.send_reply(data, reply)
|
||
return
|
||
from config.config import MIN_PRICE_FLOOR
|
||
p = r.get("price_suggest", 20)
|
||
floor_dyn = r.get("price_min", MIN_PRICE_FLOOR)
|
||
floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR)
|
||
p = max(floor, round(p / 5) * 5)
|
||
try:
|
||
from db.customer_db import db as _db
|
||
_db.update_last_min_price(data.get('from_id',''), floor)
|
||
except Exception:
|
||
pass
|
||
reply = f"这张按{p}元,满意再拍"
|
||
else:
|
||
# 识别失败时不做兜底报价,避免把未识别图片误判为可做
|
||
reply = "这张我这边暂时识别不稳定,先不乱报价。你可以换一张更清晰的,我再给你准报价。"
|
||
await self.send_reply(data, reply)
|
||
except Exception:
|
||
pass
|
||
|
||
async def agent_reply(self, data: dict):
|
||
"""使用 Agent 处理消息并回复"""
|
||
try:
|
||
msg_text = self.to_chinese(data.get('msg', ''))
|
||
_cid = data.get('from_id', '')
|
||
trace_id = build_trace_id(data.get("acc_id", ""), _cid, data.get("msg_id", ""), msg_text[:64])
|
||
data["_trace_id"] = trace_id
|
||
_name = self.to_chinese(data.get('from_name', '') or data.get('cy_name', ''))
|
||
_plat = data.get('acc_type', '')
|
||
_shop_type = _get_shop_type(data.get('acc_id', ''), self.to_chinese(data.get('goods_name', '') or ''))
|
||
|
||
# 超大尺寸(米制)直接拒单,避免进入报价/处理流程
|
||
oversize_reply = self._oversize_reply_if_needed(msg_text)
|
||
if oversize_reply:
|
||
await self.send_reply(data, oversize_reply)
|
||
return
|
||
|
||
# 找图/修图店铺:统一走 Agent 的“收集需求后统一报价”流程,避免按单图快速报价
|
||
if self._legacy_fast_quote_enabled and _shop_type != "find_image":
|
||
# 消息含图片URL:累积到待处理列表,先询问要求
|
||
if self._msg_has_image_url(msg_text):
|
||
urls = self._extract_image_urls(msg_text)
|
||
key = self._customer_key(data)
|
||
self._add_pending_images(key, urls)
|
||
await self.send_reply(data, "收到,我看看哈")
|
||
old = self._pending_image_tasks.get(key)
|
||
if old and not old.done():
|
||
old.cancel()
|
||
async def _delay_flush(capture_key, capture_data):
|
||
await asyncio.sleep(self._DEBOUNCE_SECONDS + 4)
|
||
# 与同客户 agent_reply 串行,避免“延迟报价”和“当前追问”并发打架
|
||
async with self._get_customer_lock(capture_key):
|
||
await self._flush_pending_images(capture_key, capture_data)
|
||
task = asyncio.create_task(_delay_flush(key, data))
|
||
self._pending_image_tasks[key] = task
|
||
return
|
||
elif self._msg_refers_images(msg_text):
|
||
urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6)
|
||
if urls:
|
||
key = self._customer_key(data)
|
||
self._add_pending_images(key, urls)
|
||
await self.send_reply(data, "稍等,我找找刚才那几张")
|
||
await self._flush_pending_images(key, data)
|
||
return
|
||
else:
|
||
status = self._detect_order_status(msg_text)
|
||
if status == "paid":
|
||
ack = "收到付款,我马上安排处理,有需要第一时间联系您"
|
||
await self.send_reply(data, ack)
|
||
return
|
||
elif status in ("waiting", "order"):
|
||
ack = "订单我看到了哈,方便的话请完成付款,我好安排处理"
|
||
await self.send_reply(data, ack)
|
||
return
|
||
else:
|
||
urls = self._extract_image_urls(msg_text)
|
||
if len(urls) == 1:
|
||
key = self._customer_key(data)
|
||
self._add_pending_images(key, urls)
|
||
await self.send_reply(data, "收到,我看看哈")
|
||
return
|
||
else:
|
||
if self._msg_requests_external_contact(msg_text):
|
||
reply = "这里沟通就可以哦,其他联系方式不方便"
|
||
await self.send_reply(data, reply)
|
||
try:
|
||
from utils.wechat_chat_log import push_chat_to_wechat
|
||
asyncio.create_task(push_chat_to_wechat(
|
||
customer_name=_name,
|
||
customer_id=_cid,
|
||
acc_id=data.get('acc_id', ''),
|
||
customer_msg=msg_text,
|
||
reply_msg=reply,
|
||
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
|
||
))
|
||
except Exception:
|
||
pass
|
||
return
|
||
if self._msg_is_requirement(msg_text) or self._msg_is_price_inquiry(msg_text):
|
||
key = self._customer_key(data)
|
||
if self._pending_images.get(key):
|
||
old = self._pending_image_tasks.get(key)
|
||
if old and not old.done():
|
||
old.cancel()
|
||
await self.send_reply(data, "稍等,我把刚才那几张一起看下")
|
||
await self._flush_pending_images(key, data)
|
||
return
|
||
if self._msg_is_price_inquiry(msg_text):
|
||
recent_urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6)
|
||
if recent_urls:
|
||
await self.send_reply(data, "稍等,我刚才那几张一起看下")
|
||
if len(recent_urls) == 1:
|
||
asyncio.create_task(self._analyze_single_and_reply(data, recent_urls[0]))
|
||
else:
|
||
asyncio.create_task(self._analyze_multi_and_reply(data, recent_urls))
|
||
return
|
||
status = self._detect_order_status(msg_text)
|
||
if status == "paid":
|
||
ack = "收到付款,我马上安排处理,有需要第一时间联系您"
|
||
await self.send_reply(data, ack)
|
||
return
|
||
elif status in ("waiting", "order"):
|
||
ack = "订单我看到了哈,方便的话请完成付款,我好安排处理"
|
||
await self.send_reply(data, ack)
|
||
return
|
||
|
||
# 构建 CustomerMessage
|
||
customer_msg = CustomerMessage(
|
||
msg_id=data.get('msg_id', ''),
|
||
acc_id=data.get('acc_id', ''),
|
||
msg=self.to_chinese(data.get('msg', '')),
|
||
from_id=data.get('from_id', ''),
|
||
from_name=self.to_chinese(data.get('from_name', '')),
|
||
cy_id=data.get('cy_id', ''),
|
||
acc_type=data.get('acc_type', ''),
|
||
msg_type=data.get('msg_type', 0),
|
||
cy_name=self.to_chinese(data.get('cy_name', '')),
|
||
goods_name=self.to_chinese(data.get('goods_name', '')) if data.get('goods_name') else None,
|
||
goods_order=self.to_chinese(data.get('goods_order', '')) if data.get('goods_order') else None
|
||
)
|
||
|
||
# 先检查是否是 workflow 等待确认中的回复(如邮箱、确认/不满意)
|
||
if workflow:
|
||
workflow_reply = await workflow.handle_customer_reply(
|
||
customer_id=data.get('from_id', ''),
|
||
message=self.to_chinese(data.get('msg', '')),
|
||
acc_id=data.get('acc_id', ''),
|
||
acc_type=data.get('acc_type', 'AliWorkbench')
|
||
)
|
||
if workflow_reply:
|
||
logger.info(f"Workflow 回复: {workflow_reply}")
|
||
await self.send_reply(data, workflow_reply)
|
||
# 推送到企微:客户消息+回复成对
|
||
try:
|
||
from utils.wechat_chat_log import push_chat_to_wechat
|
||
asyncio.create_task(push_chat_to_wechat(
|
||
customer_name=_name,
|
||
customer_id=_cid,
|
||
acc_id=data.get('acc_id', ''),
|
||
customer_msg=msg_text,
|
||
reply_msg=workflow_reply,
|
||
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
|
||
))
|
||
except Exception:
|
||
pass
|
||
return
|
||
|
||
logger.info("Agent 正在处理消息...")
|
||
self._activity_log(
|
||
"agent_process_start",
|
||
trace_id=trace_id,
|
||
acc_id=data.get("acc_id", ""),
|
||
customer_id=data.get("from_id", ""),
|
||
msg=msg_text,
|
||
)
|
||
|
||
# 调用 Agent
|
||
_t0 = time.monotonic()
|
||
response = await self.agent.process_message(customer_msg)
|
||
self._activity_log(
|
||
"agent_process_done",
|
||
trace_id=trace_id,
|
||
acc_id=data.get("acc_id", ""),
|
||
customer_id=data.get("from_id", ""),
|
||
result="ok",
|
||
latency_ms=int((time.monotonic() - _t0) * 1000),
|
||
should_reply=bool(response.should_reply),
|
||
need_transfer=bool(response.need_transfer),
|
||
)
|
||
|
||
# 检查是否需要转接人工
|
||
if response.need_transfer:
|
||
logger.info("Agent 决定转接人工")
|
||
self._activity_log(
|
||
"agent_transfer",
|
||
trace_id=trace_id,
|
||
acc_id=data.get("acc_id", ""),
|
||
customer_id=data.get("from_id", ""),
|
||
transfer_msg=response.transfer_msg,
|
||
)
|
||
self._fire_and_forget(self._post_tianwang_callback(
|
||
"message_processed",
|
||
data,
|
||
extra={
|
||
"should_reply": bool(response.should_reply),
|
||
"need_transfer": True,
|
||
"agent_reply": response.reply or "",
|
||
"transfer_msg": response.transfer_msg or "",
|
||
},
|
||
))
|
||
await self.transfer_to_human(data, response.transfer_msg)
|
||
# 推送到企微:客户消息+转接回复成对
|
||
try:
|
||
from utils.wechat_chat_log import push_chat_to_wechat
|
||
asyncio.create_task(push_chat_to_wechat(
|
||
customer_name=_name,
|
||
customer_id=_cid,
|
||
acc_id=data.get('acc_id', ''),
|
||
customer_msg=msg_text,
|
||
reply_msg=response.transfer_msg or "转接",
|
||
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
|
||
))
|
||
except Exception:
|
||
pass
|
||
|
||
# 联系方式提取已由 Agent 的 update_contact_info 工具负责
|
||
# 此处仅做兜底:更新最后联系时间
|
||
customer_id = data.get('from_id', '')
|
||
if customer_id:
|
||
try:
|
||
profile = db.get_customer(customer_id)
|
||
profile.last_contact = datetime.now().isoformat()
|
||
db.save_customer(profile)
|
||
except Exception:
|
||
pass
|
||
|
||
# 保存对话摘要(异步,不阻塞回复)
|
||
if response.should_reply and response.reply and customer_id:
|
||
asyncio.create_task(self._save_conversation_summary(
|
||
customer_id=customer_id,
|
||
buyer_msg=self.to_chinese(data.get('msg', '')),
|
||
agent_reply=response.reply,
|
||
))
|
||
|
||
# 正常回复
|
||
if response.should_reply and response.reply:
|
||
# 过滤 AI 误输出的"无需回复"类废话,避免发给客户
|
||
nonsense_patterns = [
|
||
"无需", "流程已完成", "不需要回复", "无需额外", "已完成",
|
||
"无需回复", "不需要额外", "已经完成", "无需再", "操作已完成",
|
||
"任务完成", "流程完成", "记录完成", "报价已",
|
||
]
|
||
matched = [p for p in nonsense_patterns if p in response.reply]
|
||
if matched:
|
||
logger.warning(f"Agent 回复含无效内容,已拦截: {response.reply} ← 命中pattern: {matched}")
|
||
else:
|
||
# 模拟真人打字延迟,避免瞬间回复太机械
|
||
await asyncio.sleep(0.8)
|
||
logger.info(f"Agent 回复: {response.reply}")
|
||
self._activity_log(
|
||
"agent_reply",
|
||
trace_id=trace_id,
|
||
acc_id=data.get("acc_id", ""),
|
||
customer_id=data.get("from_id", ""),
|
||
reply=response.reply,
|
||
)
|
||
await self.send_reply(data, response.reply)
|
||
await self._maybe_schedule_auto_quote(data)
|
||
self._fire_and_forget(self._post_tianwang_callback(
|
||
"message_processed",
|
||
data,
|
||
extra={
|
||
"should_reply": True,
|
||
"need_transfer": bool(response.need_transfer),
|
||
"agent_reply": response.reply,
|
||
},
|
||
))
|
||
# 推送到企微:客户消息+AI回复成对
|
||
try:
|
||
from utils.wechat_chat_log import push_chat_to_wechat
|
||
asyncio.create_task(push_chat_to_wechat(
|
||
customer_name=_name,
|
||
customer_id=_cid,
|
||
acc_id=data.get('acc_id', ''),
|
||
customer_msg=msg_text,
|
||
reply_msg=response.reply,
|
||
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
|
||
))
|
||
except Exception:
|
||
pass
|
||
elif not response.need_transfer:
|
||
logger.info("Agent 决定不回复此消息")
|
||
self._activity_log(
|
||
"agent_no_reply",
|
||
trace_id=trace_id,
|
||
acc_id=data.get("acc_id", ""),
|
||
customer_id=data.get("from_id", ""),
|
||
)
|
||
self._fire_and_forget(self._post_tianwang_callback(
|
||
"message_processed",
|
||
data,
|
||
extra={
|
||
"should_reply": False,
|
||
"need_transfer": False,
|
||
"agent_reply": "",
|
||
},
|
||
))
|
||
|
||
except Exception as e:
|
||
logger.error(f"Agent 处理失败: {e}")
|
||
self._activity_log(
|
||
"agent_process_error",
|
||
trace_id=data.get("_trace_id", ""),
|
||
acc_id=data.get("acc_id", ""),
|
||
customer_id=data.get("from_id", ""),
|
||
error=str(e),
|
||
)
|
||
|
||
def _cancel_auto_quote_task(self, key: str, reason: str = ""):
|
||
task = self._auto_quote_tasks.get(key)
|
||
if task and not task.done():
|
||
task.cancel()
|
||
self._activity_log("auto_quote_cancel", key=key, reason=reason or "unknown")
|
||
|
||
@staticmethod
|
||
def _build_auto_quote_signature(state: Any) -> str:
|
||
"""为待报价内容生成稳定签名,用于避免同一批内容反复自动触发。"""
|
||
urls = list(getattr(state, "pending_image_urls", []) or [])
|
||
reqs = list(getattr(state, "pending_requirements", []) or [])
|
||
req_tail = reqs[-6:] if len(reqs) > 6 else reqs
|
||
return "||".join(urls) + "##" + "||".join(req_tail)
|
||
|
||
async def _maybe_schedule_auto_quote(self, data: dict):
|
||
"""
|
||
智能兜底:客户发图后若长时间不再补充消息,自动触发一次报价,避免会话卡住。
|
||
"""
|
||
if not self.enable_agent or not self.agent:
|
||
return
|
||
try:
|
||
shop_type = _get_shop_type(data.get('acc_id', ''), self.to_chinese(data.get('goods_name', '') or ''))
|
||
if shop_type != "find_image":
|
||
return
|
||
cid = data.get('from_id', '')
|
||
key = self._customer_key(data)
|
||
state = self.agent._get_conversation_state(cid)
|
||
if not state or not getattr(state, "pending_image_urls", None):
|
||
self._cancel_auto_quote_task(key, reason="no_pending_images")
|
||
self._auto_quote_done_sig.pop(key, None)
|
||
return
|
||
if state.quote_phase not in {"collecting", "waiting_result"}:
|
||
return
|
||
current_sig = self._build_auto_quote_signature(state)
|
||
if current_sig and self._auto_quote_done_sig.get(key) == current_sig:
|
||
self._activity_log(
|
||
"auto_quote_skip_duplicate",
|
||
key=key,
|
||
pending_count=len(state.pending_image_urls),
|
||
)
|
||
return
|
||
try:
|
||
idle_seconds = max(8, int(os.getenv("AUTO_QUOTE_IDLE_SECONDS", "18")))
|
||
except Exception:
|
||
idle_seconds = 18
|
||
|
||
self._cancel_auto_quote_task(key, reason="reschedule")
|
||
|
||
async def _delayed_auto_quote(capture_key: str, capture_data: dict, wait_s: int, capture_sig: str):
|
||
await asyncio.sleep(wait_s)
|
||
async with self._get_customer_lock(capture_key):
|
||
capture_cid = capture_data.get('from_id', '')
|
||
st = self.agent._get_conversation_state(capture_cid)
|
||
if not st or not st.pending_image_urls:
|
||
self._auto_quote_done_sig.pop(capture_key, None)
|
||
return
|
||
# 内容变化时,放弃旧触发(会在新一轮消息后重新调度)。
|
||
if self._build_auto_quote_signature(st) != capture_sig:
|
||
return
|
||
# 标记本批次已自动触发,避免同内容循环“马上报价”。
|
||
self._auto_quote_done_sig[capture_key] = capture_sig
|
||
# 直接置为可报价,然后走“发完了,报价吧”触发既有报价链路
|
||
self.agent._mark_quote_ready(st)
|
||
self.agent._sync_pending_quote_state(capture_cid, st)
|
||
self._activity_log(
|
||
"auto_quote_trigger",
|
||
key=capture_key,
|
||
pending_count=len(st.pending_image_urls),
|
||
wait_s=wait_s,
|
||
)
|
||
notify_msg = CustomerMessage(
|
||
msg_id="auto_quote_idle_trigger",
|
||
acc_id=capture_data.get('acc_id', ''),
|
||
msg="发完了,报价吧",
|
||
from_id=capture_cid,
|
||
from_name=self.to_chinese(capture_data.get('from_name', '') or capture_data.get('cy_name', '')),
|
||
cy_id=capture_data.get('cy_id', ''),
|
||
acc_type=capture_data.get('acc_type', ''),
|
||
msg_type=0,
|
||
cy_name=self.to_chinese(capture_data.get('cy_name', '') or capture_data.get('from_name', '')),
|
||
goods_name=self.to_chinese(capture_data.get('goods_name', '')) if capture_data.get('goods_name') else None,
|
||
goods_order=self.to_chinese(capture_data.get('goods_order', '')) if capture_data.get('goods_order') else None,
|
||
)
|
||
response = await self.agent.process_message(notify_msg)
|
||
if response.should_reply and response.reply and not response.need_transfer:
|
||
await self.send_reply(capture_data, response.reply)
|
||
self._activity_log(
|
||
"auto_quote_sent",
|
||
key=capture_key,
|
||
reply=response.reply,
|
||
)
|
||
|
||
task = asyncio.create_task(_delayed_auto_quote(key, dict(data), idle_seconds, current_sig))
|
||
self._auto_quote_tasks[key] = task
|
||
self._activity_log(
|
||
"auto_quote_scheduled",
|
||
key=key,
|
||
pending_count=len(state.pending_image_urls),
|
||
phase=state.quote_phase,
|
||
wait_s=idle_seconds,
|
||
)
|
||
except Exception as e:
|
||
self._activity_log("auto_quote_schedule_error", error=str(e), key=self._customer_key(data))
|
||
|
||
async def _analyze_multi_and_reply(self, data: dict, urls: list):
|
||
try:
|
||
from image.image_analyzer import image_analyzer
|
||
def _detect_composite_request() -> bool:
|
||
try:
|
||
from db.chat_log_db import get_recent_conversation
|
||
recent = get_recent_conversation(
|
||
customer_id=data.get('from_id', ''),
|
||
acc_id=data.get('acc_id', ''),
|
||
limit=8
|
||
)
|
||
kw = ("抓到", "放到", "合成", "融合", "嵌到", "换到", "替换", "P到", "抠出来放到")
|
||
for m in recent:
|
||
msg = (m.get("message") or "")
|
||
if any(k in msg for k in kw):
|
||
return True
|
||
except Exception:
|
||
pass
|
||
return False
|
||
|
||
tasks = [image_analyzer.analyze(u) for u in urls]
|
||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||
# 先做风险分流:多图中只要出现不可做/高风险,不进入报价
|
||
unsafe = []
|
||
dense_text_reject = []
|
||
for i, r in enumerate(results, 1):
|
||
if isinstance(r, dict) and r.get("success", False):
|
||
if r.get("feasibility") == "no" or r.get("risk") == "high":
|
||
unsafe.append(f"图{i}")
|
||
note = str(r.get("note", "") or "")
|
||
if "文字内容过于密集" in note or "密集文字" in note:
|
||
dense_text_reject.append(f"图{i}")
|
||
|
||
if unsafe:
|
||
if dense_text_reject and len(dense_text_reject) == len(unsafe):
|
||
reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。"
|
||
else:
|
||
reply = f"这批里{'、'.join(unsafe)}处理风险较高,我这边先不直接接,建议转人工评估更稳。"
|
||
await self.send_reply(data, reply)
|
||
return
|
||
|
||
pairs = []
|
||
for u, r in zip(urls, results):
|
||
if isinstance(r, dict) and r.get("success", False):
|
||
from config.config import MIN_PRICE_FLOOR
|
||
floor_dyn = r.get("price_min", MIN_PRICE_FLOOR)
|
||
floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR)
|
||
ps = max(floor, round(r.get("price_suggest", 20) / 5) * 5)
|
||
pairs.append((u, ps, r.get("category", ""), r.get("megapixels", 0.0)))
|
||
try:
|
||
if pairs:
|
||
floors = []
|
||
for u, r in zip(urls, results):
|
||
if isinstance(r, dict) and r.get("success", False):
|
||
floor_dyn = r.get("price_min", MIN_PRICE_FLOOR)
|
||
floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR)
|
||
floors.append(floor)
|
||
if floors:
|
||
from db.customer_db import db as _db
|
||
_db.update_last_min_price(data.get('from_id',''), min(floors))
|
||
except Exception:
|
||
pass
|
||
if not pairs:
|
||
await self.send_reply(data, "这组图我这边暂时识别不稳定,先不乱报价。你可以换清晰图再发我。")
|
||
return
|
||
composite = _detect_composite_request()
|
||
composite_fee = 5 if composite else 0
|
||
avg_raw = sum(p for _, p, _, _ in pairs) / len(pairs)
|
||
from config.config import MIN_PRICE_FLOOR
|
||
avg_price = max(MIN_PRICE_FLOOR, round((avg_raw + composite_fee) / 5) * 5)
|
||
top_price = max(MIN_PRICE_FLOOR, max(pairs, key=lambda x: x[1])[1] + composite_fee)
|
||
count = len(pairs)
|
||
if composite:
|
||
reply = f"这组{count}张我看了,按{avg_price}元一张;合成那张{top_price}元,满意再拍"
|
||
else:
|
||
reply = f"这组{count}张我看了,按{avg_price}元一张;复杂那张{top_price}元,满意再拍"
|
||
await self.send_reply(data, reply)
|
||
except Exception as e:
|
||
logger.error(f"多图分析失败: {e}")
|
||
try:
|
||
await self.send_reply(data, "这组图我这边暂时识别异常,先不乱报价。你可以稍后再发我。")
|
||
except Exception:
|
||
pass
|
||
def _msg_requests_external_contact(self, msg: str) -> bool:
|
||
if not msg:
|
||
return False
|
||
lower = msg.lower()
|
||
kws = ("加qq", "qq号", "vx", "微信", "加v", "联系方式", "私聊", "加一下", "加个", "手机号", "电话", "加群", "q q", "v 信")
|
||
return any(k in lower for k in kws)
|
||
|
||
@staticmethod
|
||
def _extract_size_pairs_m(msg: str) -> list[tuple[float, float]]:
|
||
"""提取消息中的米制尺寸对,如 15*6.4米 / 15米*6.4 / 15x6.4m。"""
|
||
if not msg:
|
||
return []
|
||
s = (msg or "").lower().replace("×", "*").replace("x", "*")
|
||
pairs = []
|
||
patterns = [
|
||
r'(\d+(?:\.\d+)?)\s*\*\s*(\d+(?:\.\d+)?)\s*(?:米|m)\b',
|
||
r'(\d+(?:\.\d+)?)\s*(?:米|m)\s*\*\s*(\d+(?:\.\d+)?)\b',
|
||
]
|
||
for p in patterns:
|
||
for m in re.findall(p, s):
|
||
try:
|
||
a = float(m[0])
|
||
b = float(m[1])
|
||
if a > 0 and b > 0:
|
||
pairs.append((a, b))
|
||
except Exception:
|
||
continue
|
||
return pairs
|
||
|
||
def _oversize_reply_if_needed(self, msg: str) -> str:
|
||
"""
|
||
检测超大尺寸需求并返回拒绝话术;未命中返回空字符串。
|
||
规则:最长边 > 阈值 或 面积 > 阈值。
|
||
"""
|
||
try:
|
||
from config.config import MAX_SERVICE_SIZE_LONGEST_METERS, MAX_SERVICE_SIZE_AREA_SQM
|
||
longest_limit = float(MAX_SERVICE_SIZE_LONGEST_METERS)
|
||
area_limit = float(MAX_SERVICE_SIZE_AREA_SQM)
|
||
except Exception:
|
||
longest_limit = 10.0
|
||
area_limit = 20.0
|
||
|
||
pairs = self._extract_size_pairs_m(msg)
|
||
for w, h in pairs:
|
||
longest = max(w, h)
|
||
area = w * h
|
||
if longest > longest_limit or area > area_limit:
|
||
return (
|
||
f"{w:g}米*{h:g}米这个尺寸太大了,我们这边做不了。"
|
||
"如果要做可以拆成几段小尺寸,我再给你按段评估。"
|
||
)
|
||
return ""
|
||
def _is_transfer_msg(self, data: dict) -> bool:
|
||
"""判断是否是会话转交消息(需要主动打招呼)"""
|
||
msg = self.to_chinese(data.get('msg', ''))
|
||
return '转交给' in msg or '转接给' in msg
|
||
|
||
def _pick_transfer_greeting(self) -> str:
|
||
"""转接后问候话术:简短自然,随机避免机械感。"""
|
||
choices = [
|
||
"在的亲,发图我看下",
|
||
"在呢亲,有需求直接说",
|
||
"我在的,您把要求发我",
|
||
"在的哈,你说我这边看着处理",
|
||
"在呢,图和需求发来我看看",
|
||
]
|
||
return random.choice(choices)
|
||
|
||
def _is_shop_card(self, data: dict) -> bool:
|
||
"""判断是否是进店卡片消息"""
|
||
msg = self.to_chinese(data.get('msg', ''))
|
||
return msg.startswith('[进店卡片]') or '我想咨询你们店的这个商品' in msg
|
||
|
||
def _extract_customer_text_from_shop_card_msg(self, msg: str) -> str:
|
||
"""从“进店卡片+文本”混合消息里提取客户真实文本。"""
|
||
text = self.to_chinese(msg or "").strip()
|
||
if not text:
|
||
return ""
|
||
parts = [p.strip() for p in text.split("#*#") if p and p.strip()]
|
||
kept = []
|
||
for part in parts:
|
||
if part.startswith("[进店卡片]") or "我想咨询你们店的这个商品" in part:
|
||
continue
|
||
kept.append(part)
|
||
if kept:
|
||
return " ".join(kept).strip()
|
||
stripped = re.sub(r"\[进店卡片\][^\n\r]*", "", text).strip()
|
||
stripped = stripped.replace("我想咨询你们店的这个商品", "").strip(",。,#* ")
|
||
return stripped
|
||
|
||
def _has_chat_history(self, customer_id: str, acc_id: str = "") -> bool:
|
||
"""判断该客户在当前店铺是否已有聊天记录。"""
|
||
if not customer_id:
|
||
return False
|
||
# 按店铺+客户查数据库,避免跨店串历史导致错误跳过。
|
||
try:
|
||
from db.chat_log_db import get_recent_conversation
|
||
msgs = get_recent_conversation(customer_id, acc_id=acc_id, limit=1)
|
||
return len(msgs) > 0
|
||
except Exception:
|
||
return False
|
||
|
||
def _load_system_inquiry_rules(self) -> Dict[str, Any]:
|
||
"""加载系统客服询单规则(全局 + 店铺覆盖)。"""
|
||
from config.config import (
|
||
SYSTEM_INQUIRY_ENABLED,
|
||
SYSTEM_INQUIRY_DEFAULT_ACTION,
|
||
SYSTEM_INQUIRY_DEFAULT_REPLY,
|
||
SYSTEM_INQUIRY_RULES_FILE,
|
||
)
|
||
enabled_env = os.getenv("SYSTEM_INQUIRY_ENABLED")
|
||
enabled = (
|
||
enabled_env.lower() in ("1", "true", "yes")
|
||
if isinstance(enabled_env, str)
|
||
else bool(SYSTEM_INQUIRY_ENABLED)
|
||
)
|
||
action = (os.getenv("SYSTEM_INQUIRY_DEFAULT_ACTION") or SYSTEM_INQUIRY_DEFAULT_ACTION or "silent").strip().lower()
|
||
reply = os.getenv("SYSTEM_INQUIRY_DEFAULT_REPLY") or SYSTEM_INQUIRY_DEFAULT_REPLY or ""
|
||
rules_file = os.getenv("SYSTEM_INQUIRY_RULES_FILE") or str(SYSTEM_INQUIRY_RULES_FILE)
|
||
defaults: Dict[str, Any] = {
|
||
"enabled": bool(enabled),
|
||
"default_action": action,
|
||
"default_reply": reply,
|
||
"sender_keywords": ["系统客服", "官方客服", "平台客服", "机器人客服", "商家客服系统"],
|
||
"message_keywords": ["系统询单", "代客咨询", "平台代问", "系统代发", "客服询单"],
|
||
"shops": {},
|
||
}
|
||
try:
|
||
p = Path(rules_file)
|
||
if p.exists():
|
||
with p.open("r", encoding="utf-8") as f:
|
||
loaded = json.load(f)
|
||
if isinstance(loaded, dict):
|
||
defaults.update(loaded)
|
||
except Exception as e:
|
||
logger.warning(f"系统询单规则加载失败,使用默认规则: {e}")
|
||
return defaults
|
||
|
||
@staticmethod
|
||
def _normalize_kw_list(v: Any) -> List[str]:
|
||
if not isinstance(v, list):
|
||
return []
|
||
return [str(x).strip().lower() for x in v if str(x).strip()]
|
||
|
||
def _resolve_system_inquiry_policy(self, acc_id: str) -> Dict[str, Any]:
|
||
"""根据店铺合并系统询单策略。"""
|
||
from config.config import SYSTEM_INQUIRY_SHOPS
|
||
|
||
rules = self._system_inquiry_rules or {}
|
||
if not bool(rules.get("enabled", True)):
|
||
return {"enabled": False}
|
||
|
||
shops_env = os.getenv("SYSTEM_INQUIRY_SHOPS", SYSTEM_INQUIRY_SHOPS or "")
|
||
shop_whitelist = [s.strip() for s in shops_env.split(",") if s.strip()]
|
||
if shop_whitelist and (acc_id or "") not in shop_whitelist:
|
||
return {"enabled": False}
|
||
|
||
policy: Dict[str, Any] = {
|
||
"enabled": True,
|
||
"action": str(rules.get("default_action", "silent")).strip().lower(),
|
||
"reply": str(rules.get("default_reply", "")).strip(),
|
||
"sender_keywords": self._normalize_kw_list(rules.get("sender_keywords")),
|
||
"message_keywords": self._normalize_kw_list(rules.get("message_keywords")),
|
||
}
|
||
shop_cfg = (rules.get("shops") or {}).get(acc_id or "", {})
|
||
if isinstance(shop_cfg, dict):
|
||
if "enabled" in shop_cfg and not bool(shop_cfg.get("enabled", True)):
|
||
return {"enabled": False}
|
||
if shop_cfg.get("action"):
|
||
policy["action"] = str(shop_cfg.get("action")).strip().lower()
|
||
if shop_cfg.get("reply"):
|
||
policy["reply"] = str(shop_cfg.get("reply")).strip()
|
||
if isinstance(shop_cfg.get("sender_keywords"), list):
|
||
policy["sender_keywords"] = self._normalize_kw_list(shop_cfg.get("sender_keywords"))
|
||
if isinstance(shop_cfg.get("message_keywords"), list):
|
||
policy["message_keywords"] = self._normalize_kw_list(shop_cfg.get("message_keywords"))
|
||
if policy["action"] not in ("silent", "reply", "transfer"):
|
||
policy["action"] = "silent"
|
||
return policy
|
||
|
||
def _match_system_inquiry(self, data: dict, policy: Dict[str, Any]) -> bool:
|
||
"""识别是否为系统客服询单消息。"""
|
||
if not policy.get("enabled", False):
|
||
return False
|
||
|
||
from_name = self.to_chinese(data.get("from_name", "") or "").lower()
|
||
from_id = str(data.get("from_id", "") or "").lower()
|
||
msg = self.to_chinese(data.get("msg", "") or "").lower()
|
||
|
||
sender_hits = 0
|
||
for kw in policy.get("sender_keywords", []):
|
||
if kw and (kw in from_name or kw in from_id):
|
||
sender_hits += 1
|
||
message_hits = 0
|
||
for kw in policy.get("message_keywords", []):
|
||
if kw and kw in msg:
|
||
message_hits += 1
|
||
|
||
# 优先看发送者特征;纯文本命中时至少要求两个关键词,降低误判风险
|
||
return sender_hits > 0 or message_hits >= 2
|
||
|
||
async def _handle_system_inquiry(self, data: dict) -> bool:
|
||
"""命中系统询单后按策略处理。"""
|
||
acc_id = data.get("acc_id", "")
|
||
policy = self._resolve_system_inquiry_policy(acc_id)
|
||
if not self._match_system_inquiry(data, policy):
|
||
return False
|
||
|
||
customer_id = data.get("from_id", "")
|
||
metrics_emit("system_inquiry_detected", customer_id=customer_id, acc_id=acc_id)
|
||
action = policy.get("action", "silent")
|
||
logger.info(f"系统询单命中 | 店铺:{acc_id} | 客户:{customer_id} | action:{action}")
|
||
|
||
if action == "reply":
|
||
reply = policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。"
|
||
await self.send_reply(data, reply)
|
||
metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id)
|
||
return True
|
||
if action == "transfer":
|
||
await self.transfer_to_human(data, "系统询单转人工")
|
||
metrics_emit("system_inquiry_transfer", customer_id=customer_id, acc_id=acc_id)
|
||
return True
|
||
|
||
metrics_emit("system_inquiry_ignored", customer_id=customer_id, acc_id=acc_id)
|
||
return True
|
||
|
||
def _should_ignore(self, data: dict) -> bool:
|
||
"""判断是否应该忽略该消息(不回复)"""
|
||
msg = self.to_chinese(data.get('msg', ''))
|
||
|
||
# 会话转交由 _is_transfer_msg 单独处理,这里不再忽略
|
||
ignore_patterns = [
|
||
'已转接',
|
||
'接入会话',
|
||
'结束会话',
|
||
'会话已',
|
||
'[系统消息]',
|
||
'[系统通知]',
|
||
]
|
||
for pattern in ignore_patterns:
|
||
if pattern in msg:
|
||
return True
|
||
|
||
# 发送者是自己(店铺账号),避免回复自己发的消息
|
||
acc_id = data.get('acc_id', '')
|
||
from_id = data.get('from_id', '')
|
||
if acc_id and from_id and acc_id == from_id:
|
||
return True
|
||
|
||
return False
|
||
|
||
def get_msg_type_name(self, msg_type):
|
||
"""获取消息类型名称"""
|
||
types = {
|
||
0: "文本",
|
||
1: "图片",
|
||
2: "视频",
|
||
3: "文件"
|
||
}
|
||
return types.get(msg_type, f"未知({msg_type})")
|
||
|
||
def _extract_and_save_customer_info(self, message: str, customer_id: str):
|
||
"""从消息中提取客户信息并保存"""
|
||
if not message or not customer_id:
|
||
return
|
||
|
||
# 提取邮箱
|
||
email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+'
|
||
email_match = re.search(email_pattern, message)
|
||
if email_match:
|
||
db.update_email(customer_id, email_match.group())
|
||
|
||
# 提取手机号
|
||
phone_pattern = r'1[3-9]\d{9}'
|
||
phone_match = re.search(phone_pattern, message)
|
||
if phone_match:
|
||
db.update_phone(customer_id, phone_match.group())
|
||
|
||
# 提取微信号
|
||
wechat_pattern = r'[Vv微信]+号[::]?\s*([\w-]+)'
|
||
wechat_match = re.search(wechat_pattern, message)
|
||
if wechat_match:
|
||
db.update_wechat(customer_id, wechat_match.group(1))
|
||
|
||
# 提取预算关键词
|
||
budget_keywords = ['预算', '不超过', '最多', '便宜点', '便宜']
|
||
for keyword in budget_keywords:
|
||
if keyword in message:
|
||
db.add_personality_tag(customer_id, "关注价格")
|
||
break
|
||
|
||
# 提取性格关键词
|
||
personality_keywords = {
|
||
'爽快': '爽快',
|
||
'干脆': '爽快',
|
||
'纠结': '纠结',
|
||
'墨迹': '纠结',
|
||
'砍价': '砍价',
|
||
'贵': '砍价'
|
||
}
|
||
for keyword, tag in personality_keywords.items():
|
||
if keyword in message:
|
||
db.add_personality_tag(customer_id, tag)
|
||
|
||
# 更新最后联系时间
|
||
profile = db.get_customer(customer_id)
|
||
profile.last_contact = datetime.now().isoformat()
|
||
db.save_customer(profile)
|
||
|
||
def to_chinese(self, text):
|
||
"""处理文本,安全地转换 unicode 转义"""
|
||
if not isinstance(text, str):
|
||
return text
|
||
if '\\u' not in text:
|
||
return text
|
||
try:
|
||
return json.loads(f'"{text}"')
|
||
except Exception:
|
||
return text
|
||
|
||
async def handle_image_message(self, data: dict):
|
||
"""
|
||
处理图片消息。
|
||
先回复"我找找",然后把图片URL作为消息内容交给 Agent(后台执行)。
|
||
Agent 会自主调用 analyze_image() 工具分析复杂度,再报价。
|
||
整个过程由 Agent 自主协调,无需外部干预。
|
||
不阻塞接收循环,可同时接收其他客户消息。
|
||
"""
|
||
# 立刻回复,让客户感觉真人在操作
|
||
await self.send_reply(data, "我找找")
|
||
|
||
# 把图片URL当作消息内容,交给 Agent 后台处理(图片分析约 12 秒,不阻塞新消息接收)
|
||
image_data = dict(data)
|
||
image_data['msg'] = f"[客户发来图片] {data.get('msg', '')}"
|
||
image_data['msg_type'] = 0 # 转为文本消息,让 agent_reply 处理
|
||
self._fire_and_forget(self._agent_reply_serialized(image_data))
|
||
|
||
async def _dispatch_assign_once(self) -> Dict[str, Any]:
|
||
"""
|
||
调用新的一键派单接口:
|
||
GET {DISPATCH_BASE_URL}/assign
|
||
Header: X-API-Key
|
||
"""
|
||
base_url = os.getenv("DISPATCH_BASE_URL", "http://1.12.50.92:8006").strip().rstrip("/")
|
||
api_key = os.getenv("DISPATCH_API_KEY", "tuhui_dispatch_key_2026").strip()
|
||
timeout_s = float(os.getenv("DISPATCH_TIMEOUT_SECONDS", "5"))
|
||
if not base_url or not api_key:
|
||
return {"success": False, "reason": "dispatch config missing"}
|
||
try:
|
||
import httpx
|
||
async with httpx.AsyncClient(timeout=timeout_s) as client:
|
||
resp = await client.get(
|
||
f"{base_url}/assign",
|
||
headers={"X-API-Key": api_key},
|
||
)
|
||
if resp.status_code != 200:
|
||
return {"success": False, "reason": f"http {resp.status_code}"}
|
||
data = resp.json() if resp.content else {}
|
||
ok = bool((data or {}).get("success", False))
|
||
return {
|
||
"success": ok,
|
||
"task_id": str((data or {}).get("task_id", "") or ""),
|
||
"assigned_to": str((data or {}).get("assigned_to", "") or ""),
|
||
"online_count": int((data or {}).get("online_count", 0) or 0),
|
||
"notification_sent": bool((data or {}).get("notification_sent", False)),
|
||
"raw": data,
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "reason": str(e)}
|
||
|
||
async def transfer_to_human(self, data: dict, transfer_msg: str = ""):
|
||
"""
|
||
转接人工客服。
|
||
1. 优先调用 dispatch 服务 GET /assign 一键派单
|
||
2. 派单失败时,回退旧版 designer_roster 派单
|
||
3. 无人在线或未配置时,回退到 config/transfer_groups.json
|
||
设计师在线状态:仅在转人工时按需查询,不轮询。
|
||
"""
|
||
if not self.websocket:
|
||
logger.info(f"[{self.get_time()}] 错误: 未连接到服务器")
|
||
return
|
||
|
||
acc_id = data.get("acc_id", "")
|
||
group_id = None
|
||
assigned_to = ""
|
||
dispatch_res = await self._dispatch_assign_once()
|
||
if dispatch_res.get("success"):
|
||
assigned_to = str(dispatch_res.get("assigned_to", "") or "").strip()
|
||
logger.info(
|
||
f"一键派单成功 | task_id={dispatch_res.get('task_id','')} | assigned_to={assigned_to or '未知'} | online_count={dispatch_res.get('online_count',0)}"
|
||
)
|
||
metrics_emit(
|
||
"dispatch_assign_success",
|
||
acc_id=acc_id,
|
||
assigned_to=assigned_to,
|
||
online_count=dispatch_res.get("online_count", 0),
|
||
)
|
||
else:
|
||
logger.warning(f"一键派单失败,回退旧派单逻辑: {dispatch_res.get('reason', 'unknown')}")
|
||
metrics_emit("dispatch_assign_failed", acc_id=acc_id)
|
||
|
||
# 2. 派单失败时,回退旧版 designer_roster
|
||
if not dispatch_res.get("success"):
|
||
try:
|
||
from utils.designer_roster import poll_and_update_roster
|
||
from db.designer_roster_db import get_transfer_group_for_shop
|
||
await poll_and_update_roster()
|
||
group_id = get_transfer_group_for_shop(acc_id)
|
||
except Exception as e:
|
||
logger.debug(f"设计师派单未启用或异常: {e}")
|
||
|
||
# 3. 无人在线时企微提醒(新旧两套都没拿到在线结果时)
|
||
online_count = int(dispatch_res.get("online_count", 0) or 0)
|
||
if online_count <= 0 and not group_id:
|
||
try:
|
||
from config.config import WECHAT_WEBHOOK
|
||
if WECHAT_WEBHOOK:
|
||
import httpx
|
||
async with httpx.AsyncClient(timeout=5) as client:
|
||
resp = await client.post(WECHAT_WEBHOOK, json={
|
||
"msgtype": "text",
|
||
"text": {"content": "谁在线啊"}
|
||
})
|
||
if resp.status_code != 200:
|
||
logger.warning(f"企微提醒发送失败: {resp.status_code} {resp.text}")
|
||
else:
|
||
logger.debug("未配置 WECHAT_WEBHOOK,跳过企微提醒")
|
||
except Exception as e:
|
||
logger.warning(f"企微提醒发送异常: {e}")
|
||
|
||
# 4. 构造转接命令:有 assigned_to 用人名,否则回退分组
|
||
if assigned_to:
|
||
cmd = f"正在为你转接人工|[转移会话],{assigned_to},无原因"
|
||
await self.send_reply(data, cmd)
|
||
logger.info(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 设计师:{assigned_to})")
|
||
return
|
||
|
||
if not group_id:
|
||
group_id = _get_transfer_group(acc_id)
|
||
cmd = f"话术|[转移会话],分组{group_id},无原因"
|
||
await self.send_reply(data, cmd)
|
||
logger.info(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 分组:{group_id})")
|
||
|
||
async def _save_conversation_summary(self, customer_id: str, buyer_msg: str, agent_reply: str):
|
||
"""用 AI 生成一句话对话摘要并持久化"""
|
||
try:
|
||
from db.customer_db import db
|
||
from openai import AsyncOpenAI
|
||
client = AsyncOpenAI(
|
||
api_key=self.agent.api_key if self.agent else None,
|
||
base_url=self.agent.base_url if self.agent else None,
|
||
)
|
||
resp = await client.chat.completions.create(
|
||
model=self.agent.model_name if self.agent else "gpt-4o-mini",
|
||
messages=[
|
||
{"role": "system", "content": "用一句话(15字以内)总结这段对话的核心内容,只输出摘要文字。"},
|
||
{"role": "user", "content": f"买家:{buyer_msg}\n客服:{agent_reply}"},
|
||
],
|
||
max_tokens=30,
|
||
temperature=0.3,
|
||
)
|
||
summary = resp.choices[0].message.content.strip()
|
||
db.save_conversation_summary(customer_id, summary)
|
||
except Exception:
|
||
pass # 摘要失败不影响主流程
|
||
|
||
async def _workflow_agent_notify(
|
||
self,
|
||
customer_id: str,
|
||
acc_id: str,
|
||
acc_type: str,
|
||
system_hint: str,
|
||
):
|
||
"""图片处理完成后,让客服 AI 生成自然话术发给客户"""
|
||
if not self.enable_agent or not self.agent:
|
||
return
|
||
try:
|
||
from core.pydantic_ai_agent import CustomerMessage
|
||
notify_msg = CustomerMessage(
|
||
msg_id="workflow_notify",
|
||
acc_id=acc_id,
|
||
msg=system_hint,
|
||
from_id=customer_id,
|
||
from_name="",
|
||
cy_id=customer_id,
|
||
acc_type=acc_type,
|
||
msg_type=0,
|
||
cy_name="",
|
||
)
|
||
response = await self.agent.process_message(notify_msg)
|
||
if response.should_reply and response.reply:
|
||
nonsense_patterns = [
|
||
"无需", "流程已完成", "不需要回复", "无需额外", "已完成",
|
||
"无需回复", "不需要额外", "已经完成", "无需再", "操作已完成",
|
||
"任务完成", "流程完成", "记录完成", "报价已",
|
||
]
|
||
if not any(p in response.reply for p in nonsense_patterns):
|
||
# 构造一个虚拟原始消息用于 send_reply
|
||
fake_data = {
|
||
"acc_id": acc_id,
|
||
"from_id": customer_id,
|
||
"from_name": "",
|
||
"cy_id": customer_id,
|
||
"acc_type": acc_type,
|
||
}
|
||
await asyncio.sleep(0.5)
|
||
await self.send_reply(fake_data, response.reply)
|
||
logger.info(f"[Workflow] AI 通知已发送: {response.reply}")
|
||
except Exception as e:
|
||
logger.error(f"[Workflow] AI 通知生成失败: {e}")
|
||
|
||
async def _workflow_send(
|
||
self,
|
||
customer_id: str,
|
||
acc_id: str,
|
||
acc_type: str,
|
||
content: str,
|
||
msg_type: int = 0
|
||
):
|
||
"""workflow 回调:图片AI完成后用此方法推送消息给客户"""
|
||
msg = {
|
||
"msg_id": "",
|
||
"acc_id": acc_id,
|
||
"msg": content,
|
||
"from_id": customer_id,
|
||
"from_name": customer_id,
|
||
"cy_id": customer_id,
|
||
"acc_type": acc_type,
|
||
"msg_type": msg_type,
|
||
"cy_name": customer_id
|
||
}
|
||
await self.send_message(msg)
|
||
|
||
async def send_reply(self, original_msg, reply_content):
|
||
"""
|
||
发送回复消息
|
||
|
||
Args:
|
||
original_msg: 收到的原始消息字典
|
||
reply_content: 回复内容(文本或本地文件路径/http地址)
|
||
"""
|
||
trace_id = original_msg.get("_trace_id", "")
|
||
if not self.websocket:
|
||
logger.info(f"[{self.get_time()}] 错误: 未连接到服务器")
|
||
self._activity_log(
|
||
"send_reply_skipped",
|
||
trace_id=trace_id,
|
||
reason="websocket_not_connected",
|
||
acc_id=original_msg.get("acc_id", ""),
|
||
customer_id=original_msg.get("from_id", ""),
|
||
)
|
||
return
|
||
|
||
reply_content = self._colloquialize_outbound_reply(reply_content)
|
||
|
||
# 同一客户外发限流:N 秒内最多 1 条
|
||
try:
|
||
from config.config import OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS
|
||
cooldown = max(0, int(OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS))
|
||
except Exception:
|
||
cooldown = 5
|
||
if cooldown > 0:
|
||
ckey = f"{original_msg.get('acc_id', '')}:{original_msg.get('from_id', '')}"
|
||
now_mono = time.monotonic()
|
||
last = self._last_reply_sent_at.get(ckey, 0.0)
|
||
if (now_mono - last) < cooldown:
|
||
logger.info(
|
||
f"外发限流命中,跳过发送 | 客户:{ckey} | cooldown:{cooldown}s | msg:{str(reply_content)[:40]}"
|
||
)
|
||
self._activity_log(
|
||
"send_reply_throttled",
|
||
trace_id=trace_id,
|
||
key=ckey,
|
||
cooldown_s=cooldown,
|
||
msg=str(reply_content),
|
||
)
|
||
return
|
||
self._last_reply_sent_at[ckey] = now_mono
|
||
|
||
shop_id = original_msg.get("acc_id", "")
|
||
|
||
# 根据轻简API文档:
|
||
# from_id = 客户ID(收消息方)
|
||
# cy_id = 非群聊时与 from_id 相同
|
||
customer_id = original_msg.get("from_id", "")
|
||
customer_name = original_msg.get("from_name", "")
|
||
|
||
allow_send, checked_reply, guard_reason = await self._ai_guard_outbound_reply(
|
||
original_msg=original_msg,
|
||
reply_content=str(reply_content),
|
||
)
|
||
self._activity_log(
|
||
"reply_guard_decision",
|
||
trace_id=trace_id,
|
||
acc_id=shop_id,
|
||
customer_id=customer_id,
|
||
result="ok" if allow_send else "blocked",
|
||
reason=guard_reason,
|
||
original_reply=str(reply_content),
|
||
final_reply=str(checked_reply or ""),
|
||
)
|
||
if not allow_send:
|
||
logger.info(f"回复被AI质检拦截: {guard_reason}")
|
||
return
|
||
reply_content = checked_reply or str(reply_content)
|
||
|
||
reply = {
|
||
"msg_id": "",
|
||
"acc_id": shop_id,
|
||
"msg": reply_content,
|
||
"from_id": customer_id,
|
||
"from_name": customer_name,
|
||
"cy_id": customer_id,
|
||
"acc_type": original_msg.get("acc_type", ""),
|
||
"msg_type": 0,
|
||
"cy_name": customer_name
|
||
}
|
||
self._log_outbound_once(original_msg, str(reply_content))
|
||
self._activity_log(
|
||
"send_reply_attempt",
|
||
trace_id=trace_id,
|
||
acc_id=shop_id,
|
||
customer_id=customer_id,
|
||
msg=str(reply_content),
|
||
)
|
||
reply["_trace_id"] = trace_id
|
||
await self.send_message(reply)
|
||
|
||
def _colloquialize_outbound_reply(self, text: Any) -> Any:
|
||
"""统一外发口语化处理,避免机械话术。"""
|
||
if not isinstance(text, str):
|
||
return text
|
||
raw = text.strip()
|
||
if not raw:
|
||
return text
|
||
# 控制指令/转接命令不得改写
|
||
if raw.startswith("话术|") or "[转移会话]" in raw:
|
||
return text
|
||
# 纯链接不改
|
||
if re.fullmatch(r"https?://\S+", raw):
|
||
return text
|
||
|
||
out = raw
|
||
replacements = {
|
||
"我这边": "我这边",
|
||
"请您": "你",
|
||
"您好": "你好",
|
||
"稍后": "一会儿",
|
||
"可以的话": "可以的话",
|
||
"请稍等": "稍等哈",
|
||
"先不乱报价": "先不急着给你乱报",
|
||
"建议转人工评估更稳": "建议转人工看会更稳",
|
||
"统一报价": "一起报价",
|
||
"马上安排": "马上给你安排",
|
||
"确认我就安排": "你点头我就开做",
|
||
"收到,我看看哈": "收到,我先看下",
|
||
"收到,我找找刚才那几张": "收到,我把刚才那几张一起看下",
|
||
"这组图我这边暂时识别不稳定": "这组图我这边识别得不太稳",
|
||
"这组图我这边暂时识别异常": "这组图我这边刚才识别有点异常",
|
||
"你可以换一张更清晰的,我再给你准报价。": "你换张更清晰的发我,我再给你报准点。",
|
||
"你可以换清晰图再发我。": "你换张清晰点的再发我哈。",
|
||
"你可以稍后再发我。": "你晚点再发我也行。",
|
||
"收到付款,我马上安排处理,有需要第一时间联系您": "收到付款啦,我马上安排处理,有进展第一时间告诉你",
|
||
"亲,正在为您转接人工客服,请稍等~": "我这就给你转人工,稍等哈~",
|
||
}
|
||
for k, v in replacements.items():
|
||
out = out.replace(k, v)
|
||
|
||
# 收尾语气柔化
|
||
out = out.replace("。", "。")
|
||
return out
|
||
|
||
async def _ai_guard_outbound_reply(self, original_msg: dict, reply_content: str) -> tuple[bool, str, str]:
|
||
"""
|
||
专用AI质检:发送前判断“这句是否该发”,可拦截或改写。
|
||
读取当前客户在当前店铺的完整对话上下文。
|
||
"""
|
||
text = (reply_content or "").strip()
|
||
if not text:
|
||
return False, "", "empty_reply"
|
||
if text.startswith("话术|") or "[转移会话]" in text:
|
||
return True, text, "command_bypass"
|
||
if not self._reply_guard_enabled or not self.enable_agent or not self.agent or not AgentDeps:
|
||
return True, text, "guard_disabled"
|
||
try:
|
||
from db.chat_log_db import get_conversation
|
||
|
||
acc_id = str(original_msg.get("acc_id", "") or "")
|
||
customer_id = str(original_msg.get("from_id", "") or "")
|
||
if not customer_id:
|
||
return True, text, "no_customer_id"
|
||
|
||
# 默认读取较大窗口,尽量覆盖完整上下文;可用环境变量继续放大。
|
||
try:
|
||
max_rows = max(50, int(os.getenv("AI_REPLY_GUARD_CONTEXT_ROWS", "500")))
|
||
except Exception:
|
||
max_rows = 500
|
||
rows = get_conversation(customer_id=customer_id, limit=max_rows) or []
|
||
shop_rows = [r for r in rows if str(r.get("acc_id", "") or "") == acc_id] if acc_id else rows
|
||
|
||
context_lines = []
|
||
for r in shop_rows:
|
||
role = "客" if (r.get("direction") == "in") else "服"
|
||
msg = self.to_chinese((r.get("message") or "").strip())
|
||
if msg:
|
||
context_lines.append(f"{role}:{msg}")
|
||
context_text = "\n".join(context_lines) if context_lines else "无历史"
|
||
if self._reply_guard_verbose:
|
||
logger.info(
|
||
"[AI质检] 启动 | customer=%s | acc=%s | context_rows=%s | candidate=%s",
|
||
customer_id,
|
||
acc_id,
|
||
len(shop_rows),
|
||
text[:120],
|
||
)
|
||
|
||
deps = AgentDeps(
|
||
msg_id=str(original_msg.get("msg_id", "") or "reply_guard"),
|
||
acc_id=acc_id,
|
||
from_id=customer_id,
|
||
platform=str(original_msg.get("acc_type", "") or ""),
|
||
)
|
||
prompt = (
|
||
"你是淘宝客服回复质检器。目标:判断候选回复是否和上下文一致,是否会造成重复触发式答复。\n"
|
||
"必须检查:\n"
|
||
"1) 是否答非所问;\n"
|
||
"2) 是否重复说“马上报价/继续发图”但当前上下文不需要;\n"
|
||
"3) 是否与历史状态冲突;\n"
|
||
"4) 语气是否自然可直接发给客户。\n"
|
||
"若不合适,给可直接发送的一句改写。\n"
|
||
"只输出 JSON:{\"allow\":true/false,\"rewrite\":\"...\",\"reason\":\"...\"}\n\n"
|
||
f"完整上下文(当前店铺):\n{context_text}\n\n"
|
||
f"客户当前消息:{self.to_chinese(original_msg.get('msg', '') or '')}\n"
|
||
f"候选回复:{text}\n"
|
||
)
|
||
result = await self.agent.agent_natural_reply.run(prompt, deps=deps, message_history=[])
|
||
raw = str(getattr(result, "output", "") or "").strip()
|
||
if not raw:
|
||
return True, text, "guard_empty_output"
|
||
import json as _json
|
||
import re as _re
|
||
|
||
m = _re.search(r"\{[\s\S]*\}", raw)
|
||
if not m:
|
||
return True, text, "guard_non_json"
|
||
obj = _json.loads(m.group(0))
|
||
allow = bool(obj.get("allow", True))
|
||
rewrite = str(obj.get("rewrite", "") or "").strip()
|
||
reason = str(obj.get("reason", "") or "").strip() or "guard_decision"
|
||
if self._reply_guard_verbose:
|
||
logger.info(
|
||
"[AI质检] 结果 | allow=%s | reason=%s | rewrite=%s",
|
||
allow,
|
||
reason,
|
||
(rewrite or "")[:160],
|
||
)
|
||
if allow:
|
||
return True, (rewrite or text), reason
|
||
if rewrite:
|
||
return True, rewrite, reason
|
||
return False, "", reason
|
||
except Exception as e:
|
||
logger.warning("[AI质检] 异常,降级放行: %s", e)
|
||
return True, text, f"guard_error:{e}"
|
||
|
||
async def send_text(self, cy_id, acc_type, content):
|
||
"""
|
||
主动发送文本消息
|
||
|
||
Args:
|
||
cy_id: 会话ID(对方ID)
|
||
acc_type: 平台类型
|
||
content: 消息内容
|
||
"""
|
||
message = {
|
||
"msg_id": "",
|
||
"acc_id": "",
|
||
"msg": content,
|
||
"from_id": self.reply_id,
|
||
"from_name": self.reply_id,
|
||
"cy_id": cy_id,
|
||
"acc_type": acc_type,
|
||
"msg_type": 0,
|
||
"cy_name": ""
|
||
}
|
||
await self.send_message(message)
|
||
|
||
async def send_image(self, cy_id, acc_type, image_path):
|
||
"""
|
||
主动发送图片消息
|
||
|
||
Args:
|
||
cy_id: 会话ID(对方ID)
|
||
acc_type: 平台类型
|
||
image_path: 图片本地路径或http地址
|
||
"""
|
||
message = {
|
||
"msg_id": "",
|
||
"acc_id": "",
|
||
"msg": image_path,
|
||
"from_id": self.reply_id,
|
||
"from_name": self.reply_id,
|
||
"cy_id": cy_id,
|
||
"acc_type": acc_type,
|
||
"msg_type": 1,
|
||
"cy_name": ""
|
||
}
|
||
await self.send_message(message)
|
||
|
||
async def send_message(self, message):
|
||
"""发送消息到服务器"""
|
||
if self.websocket and self.websocket.state == websockets.protocol.State.OPEN:
|
||
try:
|
||
msg_json = json.dumps(message, ensure_ascii=False)
|
||
await self.websocket.send(msg_json)
|
||
pretty = json.dumps(message, ensure_ascii=False, indent=2)
|
||
logger.info(f"[{self.get_time()}] 发送成功:\n{pretty}")
|
||
self._activity_log(
|
||
"send_message_success",
|
||
trace_id=message.get("_trace_id", ""),
|
||
acc_id=message.get("acc_id", ""),
|
||
customer_id=message.get("from_id", ""),
|
||
msg_type=message.get("msg_type", 0),
|
||
msg=message.get("msg", ""),
|
||
)
|
||
except Exception as e:
|
||
logger.info(f"[{self.get_time()}] 发送失败: {e}")
|
||
self._activity_log(
|
||
"send_message_error",
|
||
trace_id=message.get("_trace_id", ""),
|
||
acc_id=message.get("acc_id", ""),
|
||
customer_id=message.get("from_id", ""),
|
||
error=str(e),
|
||
)
|
||
else:
|
||
logger.info(f"[{self.get_time()}] 错误: 连接未打开")
|
||
self._activity_log(
|
||
"send_message_skipped",
|
||
trace_id=message.get("_trace_id", ""),
|
||
reason="socket_not_open",
|
||
acc_id=message.get("acc_id", ""),
|
||
customer_id=message.get("from_id", ""),
|
||
)
|
||
|
||
async def auto_reply(self, data):
|
||
"""自动回复示例(已弃用,使用 agent_reply 替代)"""
|
||
pass
|
||
|
||
async def command_handler(self):
|
||
"""命令行交互"""
|
||
logger.info("\n命令帮助:")
|
||
logger.info(" reply <内容> - 回复最后一条消息")
|
||
logger.info(" text <id> <平台> <内容> - 发送文本消息")
|
||
logger.info(" img <id> <平台> <路径> - 发送图片")
|
||
logger.info(" setid <id> - 设置回复ID")
|
||
logger.info(" agent on/off - 开启/关闭 Agent")
|
||
logger.info(" exit/quit - 退出\n")
|
||
|
||
while self.running:
|
||
try:
|
||
loop = asyncio.get_running_loop()
|
||
user_input = await loop.run_in_executor(None, input, "")
|
||
|
||
parts = user_input.strip().split(maxsplit=1)
|
||
if not parts:
|
||
continue
|
||
|
||
cmd = parts[0].lower()
|
||
|
||
if cmd in ["exit", "quit", "q"]:
|
||
logger.info(f"[{self.get_time()}] 正在关闭...")
|
||
self.running = False
|
||
if self.websocket:
|
||
await self.websocket.close()
|
||
break
|
||
|
||
elif cmd == "setid" and len(parts) > 1:
|
||
self.reply_id = parts[1]
|
||
logger.info(f"[{self.get_time()}] 回复ID已设置为: {self.reply_id}")
|
||
|
||
elif cmd == "agent" and len(parts) > 1:
|
||
if parts[1].lower() == "on":
|
||
self.enable_agent = True
|
||
logger.info(f"[{self.get_time()}] Agent 已开启")
|
||
elif parts[1].lower() == "off":
|
||
self.enable_agent = False
|
||
logger.info(f"[{self.get_time()}] Agent 已关闭")
|
||
|
||
elif cmd == "reply" and len(parts) > 1:
|
||
if self.last_msg:
|
||
await self.send_reply(self.last_msg, parts[1])
|
||
else:
|
||
logger.info(f"[{self.get_time()}] 错误: 还没有收到任何消息")
|
||
|
||
elif cmd == "text" and len(parts) > 1:
|
||
# text cy_id acc_type content
|
||
args = parts[1].split(maxsplit=2)
|
||
if len(args) >= 3:
|
||
await self.send_text(args[0], args[1], args[2])
|
||
else:
|
||
logger.info(f"[{self.get_time()}] 格式: text <cy_id> <acc_type> <内容>")
|
||
|
||
elif cmd == "img" and len(parts) > 1:
|
||
# img cy_id acc_type image_path
|
||
args = parts[1].split(maxsplit=2)
|
||
if len(args) >= 3:
|
||
await self.send_image(args[0], args[1], args[2])
|
||
else:
|
||
logger.info(f"[{self.get_time()}] 格式: img <cy_id> <acc_type> <图片路径>")
|
||
|
||
else:
|
||
logger.info(f"[{self.get_time()}] 未知命令: {cmd}")
|
||
|
||
except Exception as e:
|
||
logger.info(f"[{self.get_time()}] 命令错误: {e}")
|
||
|
||
def get_time(self):
|
||
"""获取当前时间字符串"""
|
||
return datetime.now().strftime("%H:%M:%S")
|
||
|
||
async def run(self):
|
||
"""运行客户端"""
|
||
tasks = [self.connect(), self.command_handler()]
|
||
|
||
# 启动邮件接收后台任务
|
||
try:
|
||
from mail.email_receiver import email_receiver
|
||
if email_receiver.username:
|
||
logger.info(f"[{self.get_time()}] 邮件接收已启动,监控: {email_receiver.username}")
|
||
tasks.append(email_receiver.start())
|
||
else:
|
||
logger.info(f"[{self.get_time()}] 未配置邮件账号,跳过邮件接收")
|
||
except Exception as e:
|
||
logger.info(f"[{self.get_time()}] 邮件接收模块加载失败: {e}")
|
||
|
||
# 启动每日汇总定时任务
|
||
try:
|
||
from utils.daily_summary import scheduler as daily_scheduler
|
||
tasks.append(daily_scheduler())
|
||
logger.info(f"[{self.get_time()}] 每日日报定时任务已启动")
|
||
except Exception as e:
|
||
logger.info(f"[{self.get_time()}] 日报模块加载失败: {e}")
|
||
|
||
# 设计师在线状态:转人工时按需查询,不再轮询
|
||
|
||
# 启动健康检查(轻简/企微断线告警)
|
||
try:
|
||
from utils.health_check import health_check_loop
|
||
def _qingjian_ok():
|
||
return self.websocket is not None and not getattr(self.websocket, "closed", True)
|
||
tasks.append(health_check_loop(_qingjian_ok))
|
||
logger.info(f"[{self.get_time()}] 健康检查已启动")
|
||
except Exception as e:
|
||
logger.info(f"[{self.get_time()}] 健康检查模块加载失败: {e}")
|
||
|
||
# 每天早上8点发送启动消息到企微群
|
||
try:
|
||
from utils.wechat_chat_log import morning_startup_scheduler
|
||
tasks.append(morning_startup_scheduler())
|
||
logger.info(f"[{self.get_time()}] 早8点企微启动消息已启动")
|
||
except Exception as e:
|
||
logger.info(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}")
|
||
|
||
await asyncio.gather(*tasks)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
|
||
# 检查是否有 --no-agent 参数
|
||
enable_agent = "--no-agent" not in sys.argv
|
||
|
||
client = QingjianAPIClient(enable_agent=enable_agent)
|
||
try:
|
||
asyncio.run(client.run())
|
||
except KeyboardInterrupt:
|
||
logger.info("\n已停止")
|
||
|
||
|
||
async def _load_task_modules(self):
|
||
"""延迟加载任务模块,避免循环导入"""
|
||
from core.task_scheduler import get_task_scheduler
|
||
from core.task_trigger import get_trigger_engine
|
||
from db.task_db.task_model import get_task_manager
|
||
self.trigger_engine = get_trigger_engine()
|
||
|
||
async def check_and_trigger_tasks(self, data: dict):
|
||
"""检查并触发匹配的任务"""
|
||
try:
|
||
customer_key = self._customer_key(data)
|
||
customer_id = data.get('from_id')
|
||
message = data.get('content', '')
|
||
|
||
# 获取该客户的待触发任务
|
||
pending_tasks = self.task_manager.get_pending_tasks(customer_id)
|
||
|
||
for task in pending_tasks:
|
||
trigger = {
|
||
'type': task['trigger_type'],
|
||
'keyword': task['trigger_keyword'],
|
||
'keywords': task['trigger_keywords']
|
||
}
|
||
|
||
# 检查是否匹配触发条件
|
||
if self.task_scheduler.check_trigger_match(message, trigger):
|
||
logger.info(f"任务触发条件匹配:{task['task_id']}")
|
||
|
||
# 异步执行任务
|
||
asyncio.create_task(self.task_scheduler.execute_task(task))
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查任务触发失败:{e}")
|
||
|
||
|
||
async def _load_task_modules(self):
|
||
"""延迟加载任务模块,避免循环导入"""
|
||
from core.task_scheduler import get_task_scheduler
|
||
from core.task_trigger import get_trigger_engine
|
||
from db.task_db.task_model import get_task_manager
|
||
self.trigger_engine = get_trigger_engine()
|
||
|
||
async def _load_task_modules(self):
|
||
"""延迟加载任务模块"""
|
||
if self.task_scheduler is None:
|
||
from core.task_scheduler import get_task_scheduler
|
||
from core.task_trigger import get_trigger_engine
|
||
from db.task_db.task_model import get_task_manager
|
||
self.trigger_engine = get_trigger_engine()
|
||
|
||
async def check_and_trigger_tasks_v2(self, data: dict):
|
||
"""增强版:检查并触发匹配的任务(支持指定客户)"""
|
||
# 确保任务模块已加载
|
||
await self._load_task_modules()
|
||
try:
|
||
customer_key = self._customer_key(data)
|
||
customer_id = data.get('from_id')
|
||
customer_name = data.get('from_name')
|
||
message = data.get('content', '')
|
||
|
||
# 准备上下文
|
||
context = {
|
||
'customer_id': customer_id,
|
||
'customer_name': customer_name,
|
||
'acc_id': data.get('acc_id')
|
||
}
|
||
|
||
# 获取该客户的待触发任务
|
||
pending_tasks = self.task_manager.get_pending_tasks(customer_id)
|
||
|
||
for task in pending_tasks:
|
||
trigger = {
|
||
'type': task['trigger_type'],
|
||
'keyword': task['trigger_keyword'],
|
||
'keywords': task['trigger_keywords'],
|
||
# 指定客户相关字段
|
||
'customer_id': task.get('specified_customer_id'),
|
||
'customer_name': task.get('specified_customer_name')
|
||
}
|
||
|
||
# 使用触发引擎检查是否匹配
|
||
if self.trigger_engine.check_trigger(message, trigger, context):
|
||
logger.info(f"任务触发条件匹配:{task['task_id']} (客户:{customer_name}/{customer_id})")
|
||
|
||
# 异步执行任务
|
||
asyncio.create_task(self.task_scheduler.execute_task(task))
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查任务触发失败:{e}")
|
||
|