Files
tw/core/websocket_client.py

1884 lines
83 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import websockets
import json
import re
import logging
import random
import secrets
import time
import hashlib
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
# ========== 转接分组映射 ==========
def _get_transfer_group(acc_id: str) -> str:
"""根据店铺 acc_id 获取转接分组 ID。不同店铺对应不同客服分组。"""
from config.config import CONFIG_DIR
config_path = CONFIG_DIR / "transfer_groups.json"
default_group = "20252916034"
try:
if config_path.exists():
with open(config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
return cfg.get(acc_id, cfg.get("default", default_group))
except Exception:
pass
return default_group
# ========== 日志配置(轮转:按大小 10MB保留 7 份)==========
def setup_logger():
from logging.handlers import RotatingFileHandler
from config.config import LOG_DIR, LOG_MAX_BYTES, LOG_BACKUP_COUNT
logger = logging.getLogger("cs_agent")
logger.setLevel(logging.INFO)
fmt = logging.Formatter("[%(asctime)s] %(message)s", datefmt="%H:%M:%S")
ch = logging.StreamHandler()
ch.setFormatter(fmt)
logger.addHandler(ch)
LOG_DIR.mkdir(exist_ok=True)
today = datetime.now().strftime("%Y-%m-%d")
fh = RotatingFileHandler(
LOG_DIR / f"chat_{today}.log",
maxBytes=LOG_MAX_BYTES,
backupCount=LOG_BACKUP_COUNT,
encoding="utf-8",
)
fh.setFormatter(fmt)
logger.addHandler(fh)
return logger
import os
logger = setup_logger()
from db.chat_log_db import log_message as _chat_log
from utils.metrics_tracker import emit as metrics_emit
# 导入 Agent 模块
try:
from core.pydantic_ai_agent import CustomerServiceAgent, CustomerMessage, _get_shop_type
from db.customer_db import db
from core.workflow import workflow
AGENT_AVAILABLE = True
except Exception as e:
AGENT_AVAILABLE = False
workflow = None
_get_shop_type = lambda acc_id, goods_name: "find_image"
import traceback
print(f"警告: Agent 模块导入失败: {e}")
traceback.print_exc()
print("将使用基础回复功能")
class QingjianAPIClient:
"""轻简API WebSocket客户端"""
def __init__(self, uri=None, enable_agent: bool = True):
from config.config import QINGJIAN_WS_URI
from config.config import IMAGE_MODULE_ENABLED
from config.config import MESSAGE_DEBOUNCE_SECONDS
self.uri = uri or QINGJIAN_WS_URI
self.websocket = None
self.running = True
self.reply_id = "tb001" # 回复时使用的from_id
self.last_msg = None # 保存最后一条消息
self.enable_agent = enable_agent and AGENT_AVAILABLE
self.agent = None
self._replied_msg_ids: deque = deque(maxlen=200) # 已回复消息IDFIFO去重
# 消息防抖:同一客户连续发消息时,等待 N 秒后合并处理
self._DEBOUNCE_SECONDS = MESSAGE_DEBOUNCE_SECONDS if isinstance(MESSAGE_DEBOUNCE_SECONDS, int) else 8
self._adaptive_debounce_enabled = os.getenv("ADAPTIVE_DEBOUNCE_ENABLED", "true").lower() in ("1", "true", "yes")
self._debounce_tasks: dict = {} # customer_key -> asyncio.Task
self._pending_msgs: dict = {} # customer_key -> list[data]
self._image_enabled = IMAGE_MODULE_ENABLED
# 同客户消息串行:保证「发图→这个高清」等顺序,避免误判
self._customer_locks: dict = {} # customer_key -> asyncio.Lock
# agent_reply 并发上限,防止 API 打满
self._agent_semaphore = asyncio.Semaphore(8)
self._pending_images: dict = {}
self._pending_image_tasks: dict = {}
# 旧版“看图即报价”快速链路(默认关闭,避免与 Agent 批量收集逻辑并发打架)
self._legacy_fast_quote_enabled = os.getenv("LEGACY_FAST_IMAGE_QUOTE", "false").lower() in ("1", "true", "yes")
self._system_inquiry_rules = self._load_system_inquiry_rules()
self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts
self._inbound_log_seen: dict = {} # signature -> monotonic ts防重复写入
self._outbound_log_seen: dict = {} # signature -> monotonic ts防重复写入
# 延迟加载任务模块(避免循环导入)
self.task_scheduler = None
self.task_manager = None
self.trigger_engine = None
# 多进程分片支持
self.shard_keys: set = set() # 本进程负责的客户 key 集合
self.worker_id = int(os.getenv('AI_CS_WORKER_ID', '0'))
self.worker_count = max(1, int(os.getenv('AI_CS_WORKER_COUNT', '1')))
# 初始化 Agent
if self.enable_agent:
try:
self.agent = CustomerServiceAgent()
print(f"[{self.get_time()}] Agent 初始化成功")
except Exception as e:
print(f"[{self.get_time()}] Agent 初始化失败: {e}")
self.enable_agent = False
# 注册 workflow 消息发送回调供图片AI完成后推送消息用
if workflow:
workflow.register_send_callback(self._workflow_send)
workflow.register_agent_notify_callback(self._workflow_agent_notify)
async def connect(self):
"""连接WebSocket服务器"""
while self.running:
try:
print(f"[{self.get_time()}] 正在连接轻简API {self.uri}...")
async with websockets.connect(self.uri) as websocket:
self.websocket = websocket
from utils.health_check import set_qingjian_connected
set_qingjian_connected(True)
print(f"[{self.get_time()}] 连接成功!")
if self.enable_agent:
print(f"[{self.get_time()}] AI Agent 已启用,将自动处理消息")
print(f"[{self.get_time()}] 等待接收消息...")
# 持续接收消息
await self.receive_messages()
except ConnectionRefusedError:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 连接被拒绝,请检查轻简软件是否已启动")
except websockets.exceptions.InvalidURI:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] URI格式错误")
except Exception as e:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 连接错误: {e}")
# 等待5秒后重连
if self.running:
print(f"[{self.get_time()}] 5秒后尝试重连...")
await asyncio.sleep(5)
def _customer_key(self, data: dict) -> str:
"""同一店铺+客户 = 同一会话"""
return f"{data.get('acc_id','')}:{data.get('from_id','')}"
def _get_customer_lock(self, key: str) -> asyncio.Lock:
if key not in self._customer_locks:
self._customer_locks[key] = asyncio.Lock()
return self._customer_locks[key]
def _is_owned_by_this_worker(self, customer_key: str) -> bool:
"""
多进程兜底路由:
- 若显式分片存在,用显式分片;
- 否则按 customer_key 哈希到固定 worker避免多进程重复处理同一消息。
"""
if self.shard_keys:
return customer_key in self.shard_keys
if self.worker_count <= 1:
return True
try:
h = int(hashlib.md5(customer_key.encode("utf-8")).hexdigest()[:8], 16)
return (h % self.worker_count) == self.worker_id
except Exception:
return self.worker_id == 0
async def _agent_reply_serialized(self, data: dict):
"""同客户串行 + 全局并发限制,再执行 agent_reply"""
key = self._customer_key(data)
async with self._get_customer_lock(key):
async with self._agent_semaphore:
await self.agent_reply(data)
def _fire_and_forget(self, coro):
"""后台执行协程,不阻塞接收循环;异常会记录到日志"""
task = asyncio.create_task(coro)
def _done(t):
if t.cancelled():
return
exc = t.exception()
if exc:
logger.exception(f"后台任务异常: {exc}")
task.add_done_callback(_done)
@staticmethod
def _prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0):
if len(seen) <= 2000:
return
stale = [k for k, t in seen.items() if (now_mono - t) > ttl_sec]
for k in stale:
seen.pop(k, None)
def _log_inbound_once(self, data: dict):
"""统一记录入站消息,短窗口去重,避免多分支重复写库。"""
try:
cid = data.get("from_id", "")
if not cid:
return
msg = self.to_chinese(data.get("msg", "") or "")
acc_id = data.get("acc_id", "")
mtype = int(data.get("msg_type", 0) or 0)
now_mono = time.monotonic()
sig = f"{acc_id}|{cid}|{mtype}|{msg}"
last = self._inbound_log_seen.get(sig, 0.0)
if (now_mono - last) < 2.0:
return
self._inbound_log_seen[sig] = now_mono
self._prune_seen(self._inbound_log_seen, now_mono, ttl_sec=8.0)
_chat_log(
cid,
msg,
"in",
customer_name=self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
acc_id=acc_id,
platform=data.get("acc_type", ""),
msg_type=mtype,
)
except Exception:
pass
def _log_outbound_once(self, original_msg: dict, reply_content: str):
"""统一记录出站消息,短窗口去重,避免重复写库。"""
try:
cid = original_msg.get("from_id", "")
if not cid or not reply_content:
return
acc_id = original_msg.get("acc_id", "")
now_mono = time.monotonic()
sig = f"{acc_id}|{cid}|{reply_content}"
last = self._outbound_log_seen.get(sig, 0.0)
if (now_mono - last) < 2.0:
return
self._outbound_log_seen[sig] = now_mono
self._prune_seen(self._outbound_log_seen, now_mono, ttl_sec=8.0)
_chat_log(
cid,
reply_content,
"out",
customer_name=self.to_chinese(original_msg.get("from_name", "") or original_msg.get("cy_name", "")),
acc_id=acc_id,
platform=original_msg.get("acc_type", ""),
)
except Exception:
pass
async def receive_messages(self):
"""持续接收消息"""
try:
async for message in self.websocket:
await self.handle_message(message)
except websockets.exceptions.ConnectionClosed:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 连接已关闭")
except Exception as e:
from utils.health_check import set_qingjian_connected
set_qingjian_connected(False)
print(f"[{self.get_time()}] 接收消息错误: {e}")
async def handle_message(self, message):
"""处理接收到的消息"""
try:
data = json.loads(message)
# 多进程分片检查:确保同一客户只由一个 worker 处理
customer_key = self._customer_key(data)
if not self._is_owned_by_this_worker(customer_key):
return
timestamp = self.get_time()
# 保存最后一条消息用于回复
self.last_msg = data
# 打印格式化的消息
print(f"\n{'='*50}")
print(f"[{timestamp}] 收到新消息:")
print(f"{'='*50}")
print(f" 消息ID: {data.get('msg_id', 'N/A')}")
print(f" 账号ID: {self.to_chinese(data.get('acc_id', 'N/A'))}")
print(f" 发送者ID: {self.to_chinese(data.get('from_id', 'N/A'))}")
print(f" 发送者名称: {self.to_chinese(data.get('from_name', 'N/A'))}")
print(f" 会话ID: {self.to_chinese(data.get('cy_id', 'N/A'))}")
print(f" 平台类型: {data.get('acc_type', 'N/A')}")
print(f" 消息类型: {self.get_msg_type_name(data.get('msg_type', 0))}")
print(f" 消息内容: {self.to_chinese(data.get('msg', 'N/A'))}")
# 显示商品信息(如果有)
if data.get('goods_name'):
print(f" 商品名称: {self.to_chinese(data.get('goods_name', ''))}")
if data.get('goods_order'):
print(f" 订单信息: {self.to_chinese(data.get('goods_order', ''))}")
print(f"{'='*50}\n")
# 消息去重:同一条消息不重复处理
msg_id = data.get('msg_id', '')
if msg_id and msg_id in self._replied_msg_ids:
logger.info(f"重复消息,跳过: {msg_id}")
return
if msg_id:
self._replied_msg_ids.append(msg_id) # deque 自动淘汰最旧的
# 空消息/无效消息过滤N/A 或关键字段全为空)
from_id = data.get('from_id', '')
acc_id = data.get('acc_id', '')
msg_body = data.get('msg', '')
if not from_id or from_id == 'N/A' or not acc_id or acc_id == 'N/A':
print(f"[{self.get_time()}] 空消息跳过from_id={from_id!r} acc_id={acc_id!r}")
return
self._log_inbound_once(data)
# Gemini 店铺:不回复,直接跳过
goods_name = self.to_chinese(data.get('goods_name', '') or '')
if _get_shop_type(acc_id, goods_name) == "gemini_api":
print(f"[{self.get_time()}] Gemini 店铺消息,跳过")
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
customer_id=data.get('from_id', ''),
acc_id=data.get('acc_id', ''),
customer_msg=self.to_chinese(data.get('msg', '')),
reply_msg="",
goods_name=goods_name,
))
except Exception:
pass
return
# 使用 Agent 自动回复(仅处理文本消息)
if self.enable_agent:
msg_type = data.get('msg_type', 0)
if msg_type == 0:
if self._is_transfer_msg(data):
# 会话转交 → 主动打招呼
print(f"[{self.get_time()}] 收到转交消息,发送问候")
greeting = self._pick_transfer_greeting()
await self.send_reply(data, greeting)
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
customer_id=data.get('from_id', ''),
acc_id=data.get('acc_id', ''),
customer_msg=self.to_chinese(data.get('msg', '')),
reply_msg=greeting,
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
))
except Exception:
pass
elif self._is_shop_card(data):
# 进店卡片有历史对话就不回复没有才打招呼Gemini 已在上面统一跳过)
cid = data.get('from_id', '')
if self._has_chat_history(cid):
print(f"[{self.get_time()}] 进店卡片(已有记录),跳过")
else:
print(f"[{self.get_time()}] 进店卡片(新客户),发送问候")
greeting = "在呢,发图来我看看"
await self.send_reply(data, greeting)
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
customer_id=data.get('from_id', ''),
acc_id=data.get('acc_id', ''),
customer_msg=self.to_chinese(data.get('msg', '')),
reply_msg=greeting,
goods_name=goods_name,
))
except Exception:
pass
elif await self._handle_system_inquiry(data):
print(f"[{self.get_time()}] 系统客服询单消息,已按规则处理")
elif self._should_ignore(data):
print(f"[{self.get_time()}] 系统通知,跳过回复")
else:
await self._debounce_agent_reply(data)
elif msg_type == 1:
# 图片消息直接处理,不走防抖(图片不会连续多发)
await self.handle_image_message(data)
except json.JSONDecodeError:
print(f"[{timestamp}] 收到非JSON消息: {message}")
async def _debounce_agent_reply(self, data: dict):
"""
消息防抖:同一客户在 _DEBOUNCE_SECONDS 内的连续消息合并后再处理。
订单通知、图片URL、付款相关消息不走防抖立即处理。
"""
msg_body = data.get('msg', '')
# 以下情况跳过防抖,立即处理(后台执行,不阻塞接收循环)
immediate_keywords = ["买家已付款", "已付款", "[系统订单信息]"]
if any(kw in msg_body for kw in immediate_keywords) or self._msg_has_image_url(msg_body):
self._fire_and_forget(self._agent_reply_serialized(data))
return
key = f"{data.get('acc_id','')}:{data.get('from_id','')}"
# 积攒消息
if key not in self._pending_msgs:
self._pending_msgs[key] = []
self._pending_msgs[key].append(msg_body)
# 取消上一个等待任务(如果有)
old_task = self._debounce_tasks.get(key)
if old_task and not old_task.done():
old_task.cancel()
debounce_seconds = self._pick_debounce_seconds(data, msg_body)
# 创建新的延迟处理任务
async def _delayed(capture_key, capture_data, wait_s: float):
await asyncio.sleep(wait_s)
msgs = self._pending_msgs.pop(capture_key, [])
if not msgs:
return
if len(msgs) == 1:
merged_msg = msgs[0]
else:
merged_msg = "".join(m for m in msgs if m.strip())
print(f"[{self.get_time()}] 防抖合并 {len(msgs)} 条消息: {merged_msg[:60]}")
merged_data = dict(capture_data)
merged_data['msg'] = merged_msg
await self._agent_reply_serialized(merged_data)
task = asyncio.create_task(_delayed(key, data, debounce_seconds))
self._debounce_tasks[key] = task
@staticmethod
def _rand_between(low: float, high: float) -> float:
if high <= low:
return float(low)
# 使用 secrets 增强随机性,避免固定周期导致机械感
span = high - low
return round(low + span * (secrets.randbelow(1000) / 1000.0), 2)
def _guess_intent_for_debounce(self, msg: str) -> str:
text = (msg or "").strip()
if not text:
return "unknown"
if self._msg_has_image_url(text):
return "image"
try:
from utils.intent_analyzer import detect_intent_keywords
intent = detect_intent_keywords(text)
except Exception:
intent = ""
if intent:
return intent
lower = text.lower()
if any(k in lower for k in ["报价", "多少钱", "价格", "", "优惠"]):
return "询价"
if any(k in lower for k in ["做一下", "改一下", "需求", "门头", "上面的字", "处理"]):
return "修改"
if any(k in lower for k in ["在吗", "你好", "有人"]):
return "打招呼"
return "unknown"
@staticmethod
def _looks_like_requirement_text(msg: str) -> bool:
text = (msg or "").strip().lower()
if not text:
return False
req_kw = (
"做一下", "改一下", "处理一下", "这个字", "上面的字", "门头", "去背景", "抠图",
"换色", "调色", "清晰", "高清", "尺寸", "比例", "横版", "竖版", "排版", "改字",
"按这个做", "照这个做", "就这张", "看看做", "弄一下",
)
return any(k in text for k in req_kw)
def _pick_debounce_seconds(self, data: dict, msg: str) -> float:
"""意图驱动防抖:不同意图不同等待区间,并引入轻微随机。"""
base = max(1.0, float(self._DEBOUNCE_SECONDS))
if not self._adaptive_debounce_enabled:
return base
intent = self._guess_intent_for_debounce(msg)
is_req = self._looks_like_requirement_text(msg)
has_img = self._msg_has_image_url(msg)
# 区间策略:越明确、越短消息,等待越短;需求描述类稍长
if intent == "打招呼":
low, high = 1.0, min(3.0, base)
elif intent in ("询价", "砍价"):
low, high = 2.0, min(5.0, base)
elif intent in ("修改", "批量"):
low, high = max(3.0, base * 0.65), min(18.0, base + 2.0)
elif intent == "转接":
low, high = 1.0, 2.5
else:
low, high = max(2.0, base * 0.5), base
# 发图后的需求描述,优先“多等一点”收集完整需求,减少半句回复
# 约束到 12-14s避免等待过长。
if is_req and not has_img:
low = max(low, 12.0)
high = min(14.0, max(high, 12.6))
# 短句更快,长句稍慢,避免把连续半句拆开
text_len = len((msg or "").strip())
if text_len <= 4:
high = min(high, max(low + 0.2, 2.5))
elif text_len >= 18:
low = min(high, low + 0.6)
wait_s = self._rand_between(low, high)
logger.info(f"防抖等待 {wait_s}s | intent={intent} | len={text_len}")
return wait_s
def _msg_has_image_url(self, msg: str) -> bool:
"""判断文本消息里是否包含图片URL客户粘贴了图片链接可能带前缀文字如 有吗#*#https://..."""
if not msg:
return False
lower = msg.lower()
image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")
image_hosts = ("alicdn.com", "imgextra", "taobao.com", "jd.com", "pinduoduo.com")
if "http://" in lower or "https://" in lower:
if any(ext in lower for ext in image_exts) or any(h in lower for h in image_hosts):
return True
return False
def _msg_refers_images(self, msg: str) -> bool:
"""判断文本是否指代之前的图片(图一/图二/这张/那张/上面那张等)"""
if not msg:
return False
refs = (
"图一", "图二", "第一张", "第二张",
"这张", "那张", "这图", "那个图",
"这个", "这个呢",
"上面那张", "下面那张", "刚才那张", "上一张", "下一张",
)
return any(r in msg for r in refs)
def _extract_image_urls(self, msg: str) -> list:
if not msg:
return []
parts = [p.strip() for p in msg.split("#*#") if p.strip()]
urls = []
for p in parts:
if p.startswith("http://") or p.startswith("https://"):
urls.append(p)
if not urls and ("http://" in msg or "https://" in msg):
tokens = re.findall(r'(https?://\S+)', msg)
for t in tokens:
if any(ext in t.lower() for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]):
urls.append(t)
return urls[:8]
def _collect_recent_image_urls(self, customer_id: str, acc_id: str, max_count: int = 6) -> list:
"""从最近对话中回溯收集图片URL优先买家消息用于慢发或引用图片的场景"""
urls, seen = [], set()
try:
from db.chat_log_db import get_recent_conversation
recent = get_recent_conversation(customer_id=customer_id, acc_id=acc_id, limit=20)
# 从最近到更早遍历,收集买家(in)消息中的图片链接
for m in reversed(recent):
if m.get("direction") != "in":
continue
ms = m.get("message") or ""
us = self._extract_image_urls(ms)
for u in us:
if u not in seen:
seen.add(u)
urls.append(u)
if len(urls) >= max_count:
return urls
except Exception:
pass
return urls
def _msg_is_requirement(self, msg: str) -> bool:
if not msg:
return False
kws = (
"", "抓到", "放到", "合成", "替换", "", "", "高清", "尺寸", "", "", "颜色", "去背景", "排版", "一样", "类似", "同款",
"能不能做", "能做吗", "可以做吗", "做不做", "这个能做吗", "这个能不能做",
)
return any(k in msg for k in kws)
def _add_pending_images(self, key: str, urls: list, limit: int = 12):
if not urls:
return
cur = self._pending_images.get(key) or []
for u in urls:
if u not in cur:
cur.append(u)
if len(cur) >= limit:
break
self._pending_images[key] = cur
async def _flush_pending_images(self, key: str, data: dict):
urls = self._pending_images.get(key) or []
if not urls:
return
self._pending_images[key] = []
if len(urls) == 1:
await self._analyze_single_and_reply(data, urls[0])
else:
await self._analyze_multi_and_reply(data, urls)
def _msg_is_price_inquiry(self, msg: str) -> bool:
"""判断是否是价格询问"""
if not msg:
return False
patterns = ("多少钱", "多少一张", "一张多少钱", "画图多少", "报价", "给个价", "几块", "多少钱")
return any(p in msg for p in patterns)
def _detect_order_status(self, msg: str) -> str:
if not msg:
return ""
s = msg
if "买家已付款" in s or "已付款" in s:
return "paid"
if "[系统订单信息]" in s:
if "等待买家付款" in s or "未付款" in s:
return "waiting"
return "order"
return ""
async def _analyze_single_and_reply(self, data: dict, url: str):
try:
from image.image_analyzer import image_analyzer
r = await image_analyzer.analyze(url)
if isinstance(r, dict) and r.get("success", False):
if r.get("feasibility") == "no" or r.get("risk") == "high":
note = str(r.get("note", "") or "")
if "文字内容过于密集" in note or "密集文字" in note:
reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。"
else:
reply = "这张处理风险比较高,我这边先不直接接,建议转人工评估更稳。"
await self.send_reply(data, reply)
return
from config.config import MIN_PRICE_FLOOR
p = r.get("price_suggest", 20)
floor_dyn = r.get("price_min", MIN_PRICE_FLOOR)
floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR)
p = max(floor, round(p / 5) * 5)
try:
from db.customer_db import db as _db
_db.update_last_min_price(data.get('from_id',''), floor)
except Exception:
pass
reply = f"这张按{p}元,满意再拍"
else:
# 识别失败时不做兜底报价,避免把未识别图片误判为可做
reply = "这张我这边暂时识别不稳定,先不乱报价。你可以换一张更清晰的,我再给你准报价。"
await self.send_reply(data, reply)
except Exception:
pass
async def agent_reply(self, data: dict):
"""使用 Agent 处理消息并回复"""
try:
msg_text = self.to_chinese(data.get('msg', ''))
_cid = data.get('from_id', '')
_name = self.to_chinese(data.get('from_name', '') or data.get('cy_name', ''))
_plat = data.get('acc_type', '')
_shop_type = _get_shop_type(data.get('acc_id', ''), self.to_chinese(data.get('goods_name', '') or ''))
# 超大尺寸(米制)直接拒单,避免进入报价/处理流程
oversize_reply = self._oversize_reply_if_needed(msg_text)
if oversize_reply:
await self.send_reply(data, oversize_reply)
return
# 找图/修图店铺:统一走 Agent 的“收集需求后统一报价”流程,避免按单图快速报价
if self._legacy_fast_quote_enabled and _shop_type != "find_image":
# 消息含图片URL累积到待处理列表先询问要求
if self._msg_has_image_url(msg_text):
urls = self._extract_image_urls(msg_text)
key = self._customer_key(data)
self._add_pending_images(key, urls)
await self.send_reply(data, "收到,我看看哈")
old = self._pending_image_tasks.get(key)
if old and not old.done():
old.cancel()
async def _delay_flush(capture_key, capture_data):
await asyncio.sleep(self._DEBOUNCE_SECONDS + 4)
# 与同客户 agent_reply 串行,避免“延迟报价”和“当前追问”并发打架
async with self._get_customer_lock(capture_key):
await self._flush_pending_images(capture_key, capture_data)
task = asyncio.create_task(_delay_flush(key, data))
self._pending_image_tasks[key] = task
return
elif self._msg_refers_images(msg_text):
urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6)
if urls:
key = self._customer_key(data)
self._add_pending_images(key, urls)
await self.send_reply(data, "稍等,我找找刚才那几张")
await self._flush_pending_images(key, data)
return
else:
status = self._detect_order_status(msg_text)
if status == "paid":
ack = "收到付款,我马上安排处理,有需要第一时间联系您"
await self.send_reply(data, ack)
return
elif status in ("waiting", "order"):
ack = "订单我看到了哈,方便的话请完成付款,我好安排处理"
await self.send_reply(data, ack)
return
else:
urls = self._extract_image_urls(msg_text)
if len(urls) == 1:
key = self._customer_key(data)
self._add_pending_images(key, urls)
await self.send_reply(data, "收到,我看看哈")
return
else:
if self._msg_requests_external_contact(msg_text):
reply = "这里沟通就可以哦,其他联系方式不方便"
await self.send_reply(data, reply)
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=_name,
customer_id=_cid,
acc_id=data.get('acc_id', ''),
customer_msg=msg_text,
reply_msg=reply,
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
))
except Exception:
pass
return
if self._msg_is_requirement(msg_text) or self._msg_is_price_inquiry(msg_text):
key = self._customer_key(data)
if self._pending_images.get(key):
old = self._pending_image_tasks.get(key)
if old and not old.done():
old.cancel()
await self.send_reply(data, "稍等,我把刚才那几张一起看下")
await self._flush_pending_images(key, data)
return
if self._msg_is_price_inquiry(msg_text):
recent_urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6)
if recent_urls:
await self.send_reply(data, "稍等,我刚才那几张一起看下")
if len(recent_urls) == 1:
asyncio.create_task(self._analyze_single_and_reply(data, recent_urls[0]))
else:
asyncio.create_task(self._analyze_multi_and_reply(data, recent_urls))
return
status = self._detect_order_status(msg_text)
if status == "paid":
ack = "收到付款,我马上安排处理,有需要第一时间联系您"
await self.send_reply(data, ack)
return
elif status in ("waiting", "order"):
ack = "订单我看到了哈,方便的话请完成付款,我好安排处理"
await self.send_reply(data, ack)
return
# 构建 CustomerMessage
customer_msg = CustomerMessage(
msg_id=data.get('msg_id', ''),
acc_id=data.get('acc_id', ''),
msg=self.to_chinese(data.get('msg', '')),
from_id=data.get('from_id', ''),
from_name=self.to_chinese(data.get('from_name', '')),
cy_id=data.get('cy_id', ''),
acc_type=data.get('acc_type', ''),
msg_type=data.get('msg_type', 0),
cy_name=self.to_chinese(data.get('cy_name', '')),
goods_name=self.to_chinese(data.get('goods_name', '')) if data.get('goods_name') else None,
goods_order=self.to_chinese(data.get('goods_order', '')) if data.get('goods_order') else None
)
# 先检查是否是 workflow 等待确认中的回复(如邮箱、确认/不满意)
if workflow:
workflow_reply = await workflow.handle_customer_reply(
customer_id=data.get('from_id', ''),
message=self.to_chinese(data.get('msg', '')),
acc_id=data.get('acc_id', ''),
acc_type=data.get('acc_type', 'AliWorkbench')
)
if workflow_reply:
logger.info(f"Workflow 回复: {workflow_reply}")
await self.send_reply(data, workflow_reply)
# 推送到企微:客户消息+回复成对
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=_name,
customer_id=_cid,
acc_id=data.get('acc_id', ''),
customer_msg=msg_text,
reply_msg=workflow_reply,
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
))
except Exception:
pass
return
logger.info("Agent 正在处理消息...")
# 调用 Agent
response = await self.agent.process_message(customer_msg)
# 检查是否需要转接人工
if response.need_transfer:
logger.info("Agent 决定转接人工")
await self.transfer_to_human(data, response.transfer_msg)
# 推送到企微:客户消息+转接回复成对
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=_name,
customer_id=_cid,
acc_id=data.get('acc_id', ''),
customer_msg=msg_text,
reply_msg=response.transfer_msg or "转接",
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
))
except Exception:
pass
# 联系方式提取已由 Agent 的 update_contact_info 工具负责
# 此处仅做兜底:更新最后联系时间
customer_id = data.get('from_id', '')
if customer_id:
try:
profile = db.get_customer(customer_id)
profile.last_contact = datetime.now().isoformat()
db.save_customer(profile)
except Exception:
pass
# 保存对话摘要(异步,不阻塞回复)
if response.should_reply and response.reply and customer_id:
asyncio.create_task(self._save_conversation_summary(
customer_id=customer_id,
buyer_msg=self.to_chinese(data.get('msg', '')),
agent_reply=response.reply,
))
# 正常回复
if response.should_reply and response.reply:
# 过滤 AI 误输出的"无需回复"类废话,避免发给客户
nonsense_patterns = [
"无需", "流程已完成", "不需要回复", "无需额外", "已完成",
"无需回复", "不需要额外", "已经完成", "无需再", "操作已完成",
"任务完成", "流程完成", "记录完成", "报价已",
]
matched = [p for p in nonsense_patterns if p in response.reply]
if matched:
logger.warning(f"Agent 回复含无效内容,已拦截: {response.reply} ← 命中pattern: {matched}")
else:
# 模拟真人打字延迟,避免瞬间回复太机械
await asyncio.sleep(0.8)
logger.info(f"Agent 回复: {response.reply}")
await self.send_reply(data, response.reply)
# 推送到企微:客户消息+AI回复成对
try:
from utils.wechat_chat_log import push_chat_to_wechat
asyncio.create_task(push_chat_to_wechat(
customer_name=_name,
customer_id=_cid,
acc_id=data.get('acc_id', ''),
customer_msg=msg_text,
reply_msg=response.reply,
goods_name=self.to_chinese(data.get('goods_name', '') or ''),
))
except Exception:
pass
elif not response.need_transfer:
logger.info("Agent 决定不回复此消息")
except Exception as e:
logger.error(f"Agent 处理失败: {e}")
async def _analyze_multi_and_reply(self, data: dict, urls: list):
try:
from image.image_analyzer import image_analyzer
def _detect_composite_request() -> bool:
try:
from db.chat_log_db import get_recent_conversation
recent = get_recent_conversation(
customer_id=data.get('from_id', ''),
acc_id=data.get('acc_id', ''),
limit=8
)
kw = ("抓到", "放到", "合成", "融合", "嵌到", "换到", "替换", "P到", "抠出来放到")
for m in recent:
msg = (m.get("message") or "")
if any(k in msg for k in kw):
return True
except Exception:
pass
return False
tasks = [image_analyzer.analyze(u) for u in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 先做风险分流:多图中只要出现不可做/高风险,不进入报价
unsafe = []
dense_text_reject = []
for i, r in enumerate(results, 1):
if isinstance(r, dict) and r.get("success", False):
if r.get("feasibility") == "no" or r.get("risk") == "high":
unsafe.append(f"{i}")
note = str(r.get("note", "") or "")
if "文字内容过于密集" in note or "密集文字" in note:
dense_text_reject.append(f"{i}")
if unsafe:
if dense_text_reject and len(dense_text_reject) == len(unsafe):
reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。"
else:
reply = f"这批里{''.join(unsafe)}处理风险较高,我这边先不直接接,建议转人工评估更稳。"
await self.send_reply(data, reply)
return
pairs = []
for u, r in zip(urls, results):
if isinstance(r, dict) and r.get("success", False):
from config.config import MIN_PRICE_FLOOR
floor_dyn = r.get("price_min", MIN_PRICE_FLOOR)
floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR)
ps = max(floor, round(r.get("price_suggest", 20) / 5) * 5)
pairs.append((u, ps, r.get("category", ""), r.get("megapixels", 0.0)))
try:
if pairs:
floors = []
for u, r in zip(urls, results):
if isinstance(r, dict) and r.get("success", False):
floor_dyn = r.get("price_min", MIN_PRICE_FLOOR)
floor = max(MIN_PRICE_FLOOR, int(floor_dyn) if isinstance(floor_dyn, (int, float)) else MIN_PRICE_FLOOR)
floors.append(floor)
if floors:
from db.customer_db import db as _db
_db.update_last_min_price(data.get('from_id',''), min(floors))
except Exception:
pass
if not pairs:
await self.send_reply(data, "这组图我这边暂时识别不稳定,先不乱报价。你可以换清晰图再发我。")
return
composite = _detect_composite_request()
composite_fee = 5 if composite else 0
avg_raw = sum(p for _, p, _, _ in pairs) / len(pairs)
from config.config import MIN_PRICE_FLOOR
avg_price = max(MIN_PRICE_FLOOR, round((avg_raw + composite_fee) / 5) * 5)
top_price = max(MIN_PRICE_FLOOR, max(pairs, key=lambda x: x[1])[1] + composite_fee)
count = len(pairs)
if composite:
reply = f"这组{count}张我看了,按{avg_price}元一张;合成那张{top_price}元,满意再拍"
else:
reply = f"这组{count}张我看了,按{avg_price}元一张;复杂那张{top_price}元,满意再拍"
await self.send_reply(data, reply)
except Exception as e:
logger.error(f"多图分析失败: {e}")
try:
await self.send_reply(data, "这组图我这边暂时识别异常,先不乱报价。你可以稍后再发我。")
except Exception:
pass
def _msg_requests_external_contact(self, msg: str) -> bool:
if not msg:
return False
lower = msg.lower()
kws = ("加qq", "qq号", "vx", "微信", "加v", "联系方式", "私聊", "加一下", "加个", "手机号", "电话", "加群", "q q", "v 信")
return any(k in lower for k in kws)
@staticmethod
def _extract_size_pairs_m(msg: str) -> list[tuple[float, float]]:
"""提取消息中的米制尺寸对,如 15*6.4米 / 15米*6.4 / 15x6.4m。"""
if not msg:
return []
s = (msg or "").lower().replace("×", "*").replace("x", "*")
pairs = []
patterns = [
r'(\d+(?:\.\d+)?)\s*\*\s*(\d+(?:\.\d+)?)\s*(?:米|m)\b',
r'(\d+(?:\.\d+)?)\s*(?:米|m)\s*\*\s*(\d+(?:\.\d+)?)\b',
]
for p in patterns:
for m in re.findall(p, s):
try:
a = float(m[0])
b = float(m[1])
if a > 0 and b > 0:
pairs.append((a, b))
except Exception:
continue
return pairs
def _oversize_reply_if_needed(self, msg: str) -> str:
"""
检测超大尺寸需求并返回拒绝话术;未命中返回空字符串。
规则:最长边 > 阈值 或 面积 > 阈值。
"""
try:
from config.config import MAX_SERVICE_SIZE_LONGEST_METERS, MAX_SERVICE_SIZE_AREA_SQM
longest_limit = float(MAX_SERVICE_SIZE_LONGEST_METERS)
area_limit = float(MAX_SERVICE_SIZE_AREA_SQM)
except Exception:
longest_limit = 10.0
area_limit = 20.0
pairs = self._extract_size_pairs_m(msg)
for w, h in pairs:
longest = max(w, h)
area = w * h
if longest > longest_limit or area > area_limit:
return (
f"{w:g}米*{h:g}米这个尺寸太大了,我们这边做不了。"
"如果要做可以拆成几段小尺寸,我再给你按段评估。"
)
return ""
def _is_transfer_msg(self, data: dict) -> bool:
"""判断是否是会话转交消息(需要主动打招呼)"""
msg = self.to_chinese(data.get('msg', ''))
return '转交给' in msg or '转接给' in msg
def _pick_transfer_greeting(self) -> str:
"""转接后问候话术:简短自然,随机避免机械感。"""
choices = [
"在的亲,发图我看下",
"在呢亲,有需求直接说",
"我在的,您把要求发我",
"在的哈,你说我这边看着处理",
"在呢,图和需求发来我看看",
]
return random.choice(choices)
def _is_shop_card(self, data: dict) -> bool:
"""判断是否是进店卡片消息"""
msg = self.to_chinese(data.get('msg', ''))
return msg.startswith('[进店卡片]') or '我想咨询你们店的这个商品' in msg
def _has_chat_history(self, customer_id: str) -> bool:
"""判断该客户是否已有聊天记录(内存历史或数据库均可)"""
if not customer_id:
return False
# 先查内存对话历史(最快)
if customer_id in self.agent.message_histories and self.agent.message_histories[customer_id]:
return True
# 再查数据库(重启后仍有记录)
try:
from db.chat_log_db import get_conversation
msgs = get_conversation(customer_id, limit=1)
return len(msgs) > 0
except Exception:
return False
def _load_system_inquiry_rules(self) -> Dict[str, Any]:
"""加载系统客服询单规则(全局 + 店铺覆盖)。"""
from config.config import (
SYSTEM_INQUIRY_ENABLED,
SYSTEM_INQUIRY_DEFAULT_ACTION,
SYSTEM_INQUIRY_DEFAULT_REPLY,
SYSTEM_INQUIRY_RULES_FILE,
)
enabled_env = os.getenv("SYSTEM_INQUIRY_ENABLED")
enabled = (
enabled_env.lower() in ("1", "true", "yes")
if isinstance(enabled_env, str)
else bool(SYSTEM_INQUIRY_ENABLED)
)
action = (os.getenv("SYSTEM_INQUIRY_DEFAULT_ACTION") or SYSTEM_INQUIRY_DEFAULT_ACTION or "silent").strip().lower()
reply = os.getenv("SYSTEM_INQUIRY_DEFAULT_REPLY") or SYSTEM_INQUIRY_DEFAULT_REPLY or ""
rules_file = os.getenv("SYSTEM_INQUIRY_RULES_FILE") or str(SYSTEM_INQUIRY_RULES_FILE)
defaults: Dict[str, Any] = {
"enabled": bool(enabled),
"default_action": action,
"default_reply": reply,
"sender_keywords": ["系统客服", "官方客服", "平台客服", "机器人客服", "商家客服系统"],
"message_keywords": ["系统询单", "代客咨询", "平台代问", "系统代发", "客服询单"],
"shops": {},
}
try:
p = Path(rules_file)
if p.exists():
with p.open("r", encoding="utf-8") as f:
loaded = json.load(f)
if isinstance(loaded, dict):
defaults.update(loaded)
except Exception as e:
logger.warning(f"系统询单规则加载失败,使用默认规则: {e}")
return defaults
@staticmethod
def _normalize_kw_list(v: Any) -> List[str]:
if not isinstance(v, list):
return []
return [str(x).strip().lower() for x in v if str(x).strip()]
def _resolve_system_inquiry_policy(self, acc_id: str) -> Dict[str, Any]:
"""根据店铺合并系统询单策略。"""
from config.config import SYSTEM_INQUIRY_SHOPS
rules = self._system_inquiry_rules or {}
if not bool(rules.get("enabled", True)):
return {"enabled": False}
shops_env = os.getenv("SYSTEM_INQUIRY_SHOPS", SYSTEM_INQUIRY_SHOPS or "")
shop_whitelist = [s.strip() for s in shops_env.split(",") if s.strip()]
if shop_whitelist and (acc_id or "") not in shop_whitelist:
return {"enabled": False}
policy: Dict[str, Any] = {
"enabled": True,
"action": str(rules.get("default_action", "silent")).strip().lower(),
"reply": str(rules.get("default_reply", "")).strip(),
"sender_keywords": self._normalize_kw_list(rules.get("sender_keywords")),
"message_keywords": self._normalize_kw_list(rules.get("message_keywords")),
}
shop_cfg = (rules.get("shops") or {}).get(acc_id or "", {})
if isinstance(shop_cfg, dict):
if "enabled" in shop_cfg and not bool(shop_cfg.get("enabled", True)):
return {"enabled": False}
if shop_cfg.get("action"):
policy["action"] = str(shop_cfg.get("action")).strip().lower()
if shop_cfg.get("reply"):
policy["reply"] = str(shop_cfg.get("reply")).strip()
if isinstance(shop_cfg.get("sender_keywords"), list):
policy["sender_keywords"] = self._normalize_kw_list(shop_cfg.get("sender_keywords"))
if isinstance(shop_cfg.get("message_keywords"), list):
policy["message_keywords"] = self._normalize_kw_list(shop_cfg.get("message_keywords"))
if policy["action"] not in ("silent", "reply", "transfer"):
policy["action"] = "silent"
return policy
def _match_system_inquiry(self, data: dict, policy: Dict[str, Any]) -> bool:
"""识别是否为系统客服询单消息。"""
if not policy.get("enabled", False):
return False
from_name = self.to_chinese(data.get("from_name", "") or "").lower()
from_id = str(data.get("from_id", "") or "").lower()
msg = self.to_chinese(data.get("msg", "") or "").lower()
sender_hits = 0
for kw in policy.get("sender_keywords", []):
if kw and (kw in from_name or kw in from_id):
sender_hits += 1
message_hits = 0
for kw in policy.get("message_keywords", []):
if kw and kw in msg:
message_hits += 1
# 优先看发送者特征;纯文本命中时至少要求两个关键词,降低误判风险
return sender_hits > 0 or message_hits >= 2
async def _handle_system_inquiry(self, data: dict) -> bool:
"""命中系统询单后按策略处理。"""
acc_id = data.get("acc_id", "")
policy = self._resolve_system_inquiry_policy(acc_id)
if not self._match_system_inquiry(data, policy):
return False
customer_id = data.get("from_id", "")
metrics_emit("system_inquiry_detected", customer_id=customer_id, acc_id=acc_id)
action = policy.get("action", "silent")
logger.info(f"系统询单命中 | 店铺:{acc_id} | 客户:{customer_id} | action:{action}")
if action == "reply":
reply = policy.get("reply") or "您好,这边已收到询单消息,稍后由人工客服跟进处理。"
await self.send_reply(data, reply)
metrics_emit("system_inquiry_auto_reply", customer_id=customer_id, acc_id=acc_id)
return True
if action == "transfer":
await self.transfer_to_human(data, "系统询单转人工")
metrics_emit("system_inquiry_transfer", customer_id=customer_id, acc_id=acc_id)
return True
metrics_emit("system_inquiry_ignored", customer_id=customer_id, acc_id=acc_id)
return True
def _should_ignore(self, data: dict) -> bool:
"""判断是否应该忽略该消息(不回复)"""
msg = self.to_chinese(data.get('msg', ''))
# 会话转交由 _is_transfer_msg 单独处理,这里不再忽略
ignore_patterns = [
'已转接',
'接入会话',
'结束会话',
'会话已',
'[系统消息]',
'[系统通知]',
]
for pattern in ignore_patterns:
if pattern in msg:
return True
# 发送者是自己(店铺账号),避免回复自己发的消息
acc_id = data.get('acc_id', '')
from_id = data.get('from_id', '')
if acc_id and from_id and acc_id == from_id:
return True
return False
def get_msg_type_name(self, msg_type):
"""获取消息类型名称"""
types = {
0: "文本",
1: "图片",
2: "视频",
3: "文件"
}
return types.get(msg_type, f"未知({msg_type})")
def _extract_and_save_customer_info(self, message: str, customer_id: str):
"""从消息中提取客户信息并保存"""
if not message or not customer_id:
return
# 提取邮箱
email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+'
email_match = re.search(email_pattern, message)
if email_match:
db.update_email(customer_id, email_match.group())
# 提取手机号
phone_pattern = r'1[3-9]\d{9}'
phone_match = re.search(phone_pattern, message)
if phone_match:
db.update_phone(customer_id, phone_match.group())
# 提取微信号
wechat_pattern = r'[Vv微信]+号[:]?\s*([\w-]+)'
wechat_match = re.search(wechat_pattern, message)
if wechat_match:
db.update_wechat(customer_id, wechat_match.group(1))
# 提取预算关键词
budget_keywords = ['预算', '不超过', '最多', '便宜点', '便宜']
for keyword in budget_keywords:
if keyword in message:
db.add_personality_tag(customer_id, "关注价格")
break
# 提取性格关键词
personality_keywords = {
'爽快': '爽快',
'干脆': '爽快',
'纠结': '纠结',
'墨迹': '纠结',
'砍价': '砍价',
'': '砍价'
}
for keyword, tag in personality_keywords.items():
if keyword in message:
db.add_personality_tag(customer_id, tag)
# 更新最后联系时间
profile = db.get_customer(customer_id)
profile.last_contact = datetime.now().isoformat()
db.save_customer(profile)
def to_chinese(self, text):
"""处理文本,安全地转换 unicode 转义"""
if not isinstance(text, str):
return text
if '\\u' not in text:
return text
try:
return json.loads(f'"{text}"')
except Exception:
return text
async def handle_image_message(self, data: dict):
"""
处理图片消息。
先回复"我找找"然后把图片URL作为消息内容交给 Agent后台执行
Agent 会自主调用 analyze_image() 工具分析复杂度,再报价。
整个过程由 Agent 自主协调,无需外部干预。
不阻塞接收循环,可同时接收其他客户消息。
"""
# 立刻回复,让客户感觉真人在操作
await self.send_reply(data, "我找找")
# 把图片URL当作消息内容交给 Agent 后台处理(图片分析约 12 秒,不阻塞新消息接收)
image_data = dict(data)
image_data['msg'] = f"[客户发来图片] {data.get('msg', '')}"
image_data['msg_type'] = 0 # 转为文本消息,让 agent_reply 处理
self._fire_and_forget(self._agent_reply_serialized(image_data))
async def _dispatch_assign_once(self) -> Dict[str, Any]:
"""
调用新的一键派单接口:
GET {DISPATCH_BASE_URL}/assign
Header: X-API-Key
"""
base_url = os.getenv("DISPATCH_BASE_URL", "http://1.12.50.92:8006").strip().rstrip("/")
api_key = os.getenv("DISPATCH_API_KEY", "tuhui_dispatch_key_2026").strip()
timeout_s = float(os.getenv("DISPATCH_TIMEOUT_SECONDS", "5"))
if not base_url or not api_key:
return {"success": False, "reason": "dispatch config missing"}
try:
import httpx
async with httpx.AsyncClient(timeout=timeout_s) as client:
resp = await client.get(
f"{base_url}/assign",
headers={"X-API-Key": api_key},
)
if resp.status_code != 200:
return {"success": False, "reason": f"http {resp.status_code}"}
data = resp.json() if resp.content else {}
ok = bool((data or {}).get("success", False))
return {
"success": ok,
"task_id": str((data or {}).get("task_id", "") or ""),
"assigned_to": str((data or {}).get("assigned_to", "") or ""),
"online_count": int((data or {}).get("online_count", 0) or 0),
"notification_sent": bool((data or {}).get("notification_sent", False)),
"raw": data,
}
except Exception as e:
return {"success": False, "reason": str(e)}
async def transfer_to_human(self, data: dict, transfer_msg: str = ""):
"""
转接人工客服。
1. 优先调用 dispatch 服务 GET /assign 一键派单
2. 派单失败时,回退旧版 designer_roster 派单
3. 无人在线或未配置时,回退到 config/transfer_groups.json
设计师在线状态:仅在转人工时按需查询,不轮询。
"""
if not self.websocket:
print(f"[{self.get_time()}] 错误: 未连接到服务器")
return
acc_id = data.get("acc_id", "")
group_id = None
assigned_to = ""
dispatch_res = await self._dispatch_assign_once()
if dispatch_res.get("success"):
assigned_to = str(dispatch_res.get("assigned_to", "") or "").strip()
logger.info(
f"一键派单成功 | task_id={dispatch_res.get('task_id','')} | assigned_to={assigned_to or '未知'} | online_count={dispatch_res.get('online_count',0)}"
)
metrics_emit(
"dispatch_assign_success",
acc_id=acc_id,
assigned_to=assigned_to,
online_count=dispatch_res.get("online_count", 0),
)
else:
logger.warning(f"一键派单失败,回退旧派单逻辑: {dispatch_res.get('reason', 'unknown')}")
metrics_emit("dispatch_assign_failed", acc_id=acc_id)
# 2. 派单失败时,回退旧版 designer_roster
if not dispatch_res.get("success"):
try:
from utils.designer_roster import poll_and_update_roster
from db.designer_roster_db import get_transfer_group_for_shop
await poll_and_update_roster()
group_id = get_transfer_group_for_shop(acc_id)
except Exception as e:
logger.debug(f"设计师派单未启用或异常: {e}")
# 3. 无人在线时企微提醒(新旧两套都没拿到在线结果时)
online_count = int(dispatch_res.get("online_count", 0) or 0)
if online_count <= 0 and not group_id:
try:
from config.config import WECHAT_WEBHOOK
if WECHAT_WEBHOOK:
import httpx
async with httpx.AsyncClient(timeout=5) as client:
resp = await client.post(WECHAT_WEBHOOK, json={
"msgtype": "text",
"text": {"content": "谁在线啊"}
})
if resp.status_code != 200:
logger.warning(f"企微提醒发送失败: {resp.status_code} {resp.text}")
else:
logger.debug("未配置 WECHAT_WEBHOOK跳过企微提醒")
except Exception as e:
logger.warning(f"企微提醒发送异常: {e}")
# 4. 构造转接命令:有 assigned_to 用人名,否则回退分组
if assigned_to:
cmd = f"正在为你转接人工|[转移会话],{assigned_to},无原因"
await self.send_reply(data, cmd)
print(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 设计师:{assigned_to})")
return
if not group_id:
group_id = _get_transfer_group(acc_id)
cmd = f"话术|[转移会话],分组{group_id},无原因"
await self.send_reply(data, cmd)
print(f"[{self.get_time()}] 已发送转接请求 (店铺:{acc_id or '未知'} -> 分组:{group_id})")
async def _save_conversation_summary(self, customer_id: str, buyer_msg: str, agent_reply: str):
"""用 AI 生成一句话对话摘要并持久化"""
try:
from db.customer_db import db
from openai import AsyncOpenAI
client = AsyncOpenAI(
api_key=self.agent.api_key if self.agent else None,
base_url=self.agent.base_url if self.agent else None,
)
resp = await client.chat.completions.create(
model=self.agent.model_name if self.agent else "gpt-4o-mini",
messages=[
{"role": "system", "content": "用一句话15字以内总结这段对话的核心内容只输出摘要文字。"},
{"role": "user", "content": f"买家:{buyer_msg}\n客服:{agent_reply}"},
],
max_tokens=30,
temperature=0.3,
)
summary = resp.choices[0].message.content.strip()
db.save_conversation_summary(customer_id, summary)
except Exception:
pass # 摘要失败不影响主流程
async def _workflow_agent_notify(
self,
customer_id: str,
acc_id: str,
acc_type: str,
system_hint: str,
):
"""图片处理完成后,让客服 AI 生成自然话术发给客户"""
if not self.enable_agent or not self.agent:
return
try:
from core.pydantic_ai_agent import CustomerMessage
notify_msg = CustomerMessage(
msg_id="workflow_notify",
acc_id=acc_id,
msg=system_hint,
from_id=customer_id,
from_name="",
cy_id=customer_id,
acc_type=acc_type,
msg_type=0,
cy_name="",
)
response = await self.agent.process_message(notify_msg)
if response.should_reply and response.reply:
nonsense_patterns = [
"无需", "流程已完成", "不需要回复", "无需额外", "已完成",
"无需回复", "不需要额外", "已经完成", "无需再", "操作已完成",
"任务完成", "流程完成", "记录完成", "报价已",
]
if not any(p in response.reply for p in nonsense_patterns):
# 构造一个虚拟原始消息用于 send_reply
fake_data = {
"acc_id": acc_id,
"from_id": customer_id,
"from_name": "",
"cy_id": customer_id,
"acc_type": acc_type,
}
await asyncio.sleep(0.5)
await self.send_reply(fake_data, response.reply)
logger.info(f"[Workflow] AI 通知已发送: {response.reply}")
except Exception as e:
logger.error(f"[Workflow] AI 通知生成失败: {e}")
async def _workflow_send(
self,
customer_id: str,
acc_id: str,
acc_type: str,
content: str,
msg_type: int = 0
):
"""workflow 回调图片AI完成后用此方法推送消息给客户"""
msg = {
"msg_id": "",
"acc_id": acc_id,
"msg": content,
"from_id": customer_id,
"from_name": customer_id,
"cy_id": customer_id,
"acc_type": acc_type,
"msg_type": msg_type,
"cy_name": customer_id
}
await self.send_message(msg)
async def send_reply(self, original_msg, reply_content):
"""
发送回复消息
Args:
original_msg: 收到的原始消息字典
reply_content: 回复内容(文本或本地文件路径/http地址
"""
if not self.websocket:
print(f"[{self.get_time()}] 错误: 未连接到服务器")
return
reply_content = self._colloquialize_outbound_reply(reply_content)
# 同一客户外发限流N 秒内最多 1 条
try:
from config.config import OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS
cooldown = max(0, int(OUTBOUND_PER_CUSTOMER_COOLDOWN_SECONDS))
except Exception:
cooldown = 5
if cooldown > 0:
ckey = f"{original_msg.get('acc_id', '')}:{original_msg.get('from_id', '')}"
now_mono = time.monotonic()
last = self._last_reply_sent_at.get(ckey, 0.0)
if (now_mono - last) < cooldown:
logger.info(
f"外发限流命中,跳过发送 | 客户:{ckey} | cooldown:{cooldown}s | msg:{str(reply_content)[:40]}"
)
return
self._last_reply_sent_at[ckey] = now_mono
shop_id = original_msg.get("acc_id", "")
# 根据轻简API文档
# from_id = 客户ID收消息方
# cy_id = 非群聊时与 from_id 相同
customer_id = original_msg.get("from_id", "")
customer_name = original_msg.get("from_name", "")
reply = {
"msg_id": "",
"acc_id": shop_id,
"msg": reply_content,
"from_id": customer_id,
"from_name": customer_name,
"cy_id": customer_id,
"acc_type": original_msg.get("acc_type", ""),
"msg_type": 0,
"cy_name": customer_name
}
self._log_outbound_once(original_msg, str(reply_content))
await self.send_message(reply)
def _colloquialize_outbound_reply(self, text: Any) -> Any:
"""统一外发口语化处理,避免机械话术。"""
if not isinstance(text, str):
return text
raw = text.strip()
if not raw:
return text
# 控制指令/转接命令不得改写
if raw.startswith("话术|") or "[转移会话]" in raw:
return text
# 纯链接不改
if re.fullmatch(r"https?://\S+", raw):
return text
out = raw
replacements = {
"我这边": "我这边",
"请您": "",
"您好": "你好",
"稍后": "一会儿",
"可以的话": "可以的话",
"请稍等": "稍等哈",
"先不乱报价": "先不急着给你乱报",
"建议转人工评估更稳": "建议转人工看会更稳",
"统一报价": "一起报价",
"马上安排": "马上给你安排",
"确认我就安排": "你点头我就开做",
"收到,我看看哈": "收到,我先看下",
"收到,我找找刚才那几张": "收到,我把刚才那几张一起看下",
"这组图我这边暂时识别不稳定": "这组图我这边识别得不太稳",
"这组图我这边暂时识别异常": "这组图我这边刚才识别有点异常",
"你可以换一张更清晰的,我再给你准报价。": "你换张更清晰的发我,我再给你报准点。",
"你可以换清晰图再发我。": "你换张清晰点的再发我哈。",
"你可以稍后再发我。": "你晚点再发我也行。",
"收到付款,我马上安排处理,有需要第一时间联系您": "收到付款啦,我马上安排处理,有进展第一时间告诉你",
"亲,正在为您转接人工客服,请稍等~": "我这就给你转人工,稍等哈~",
}
for k, v in replacements.items():
out = out.replace(k, v)
# 收尾语气柔化
out = out.replace("", "")
return out
async def send_text(self, cy_id, acc_type, content):
"""
主动发送文本消息
Args:
cy_id: 会话ID对方ID
acc_type: 平台类型
content: 消息内容
"""
message = {
"msg_id": "",
"acc_id": "",
"msg": content,
"from_id": self.reply_id,
"from_name": self.reply_id,
"cy_id": cy_id,
"acc_type": acc_type,
"msg_type": 0,
"cy_name": ""
}
await self.send_message(message)
async def send_image(self, cy_id, acc_type, image_path):
"""
主动发送图片消息
Args:
cy_id: 会话ID对方ID
acc_type: 平台类型
image_path: 图片本地路径或http地址
"""
message = {
"msg_id": "",
"acc_id": "",
"msg": image_path,
"from_id": self.reply_id,
"from_name": self.reply_id,
"cy_id": cy_id,
"acc_type": acc_type,
"msg_type": 1,
"cy_name": ""
}
await self.send_message(message)
async def send_message(self, message):
"""发送消息到服务器"""
if self.websocket and self.websocket.state == websockets.protocol.State.OPEN:
try:
msg_json = json.dumps(message, ensure_ascii=False)
await self.websocket.send(msg_json)
pretty = json.dumps(message, ensure_ascii=False, indent=2)
print(f"[{self.get_time()}] 发送成功:\n{pretty}")
except Exception as e:
print(f"[{self.get_time()}] 发送失败: {e}")
else:
print(f"[{self.get_time()}] 错误: 连接未打开")
async def auto_reply(self, data):
"""自动回复示例(已弃用,使用 agent_reply 替代)"""
pass
async def command_handler(self):
"""命令行交互"""
print("\n命令帮助:")
print(" reply <内容> - 回复最后一条消息")
print(" text <id> <平台> <内容> - 发送文本消息")
print(" img <id> <平台> <路径> - 发送图片")
print(" setid <id> - 设置回复ID")
print(" agent on/off - 开启/关闭 Agent")
print(" exit/quit - 退出\n")
while self.running:
try:
loop = asyncio.get_running_loop()
user_input = await loop.run_in_executor(None, input, "")
parts = user_input.strip().split(maxsplit=1)
if not parts:
continue
cmd = parts[0].lower()
if cmd in ["exit", "quit", "q"]:
print(f"[{self.get_time()}] 正在关闭...")
self.running = False
if self.websocket:
await self.websocket.close()
break
elif cmd == "setid" and len(parts) > 1:
self.reply_id = parts[1]
print(f"[{self.get_time()}] 回复ID已设置为: {self.reply_id}")
elif cmd == "agent" and len(parts) > 1:
if parts[1].lower() == "on":
self.enable_agent = True
print(f"[{self.get_time()}] Agent 已开启")
elif parts[1].lower() == "off":
self.enable_agent = False
print(f"[{self.get_time()}] Agent 已关闭")
elif cmd == "reply" and len(parts) > 1:
if self.last_msg:
await self.send_reply(self.last_msg, parts[1])
else:
print(f"[{self.get_time()}] 错误: 还没有收到任何消息")
elif cmd == "text" and len(parts) > 1:
# text cy_id acc_type content
args = parts[1].split(maxsplit=2)
if len(args) >= 3:
await self.send_text(args[0], args[1], args[2])
else:
print(f"[{self.get_time()}] 格式: text <cy_id> <acc_type> <内容>")
elif cmd == "img" and len(parts) > 1:
# img cy_id acc_type image_path
args = parts[1].split(maxsplit=2)
if len(args) >= 3:
await self.send_image(args[0], args[1], args[2])
else:
print(f"[{self.get_time()}] 格式: img <cy_id> <acc_type> <图片路径>")
else:
print(f"[{self.get_time()}] 未知命令: {cmd}")
except Exception as e:
print(f"[{self.get_time()}] 命令错误: {e}")
def get_time(self):
"""获取当前时间字符串"""
return datetime.now().strftime("%H:%M:%S")
async def run(self):
"""运行客户端"""
tasks = [self.connect(), self.command_handler()]
# 启动邮件接收后台任务
try:
from mail.email_receiver import email_receiver
if email_receiver.username:
print(f"[{self.get_time()}] 邮件接收已启动,监控: {email_receiver.username}")
tasks.append(email_receiver.start())
else:
print(f"[{self.get_time()}] 未配置邮件账号,跳过邮件接收")
except Exception as e:
print(f"[{self.get_time()}] 邮件接收模块加载失败: {e}")
# 启动每日汇总定时任务
try:
from utils.daily_summary import scheduler as daily_scheduler
tasks.append(daily_scheduler())
print(f"[{self.get_time()}] 每日日报定时任务已启动")
except Exception as e:
print(f"[{self.get_time()}] 日报模块加载失败: {e}")
# 设计师在线状态:转人工时按需查询,不再轮询
# 启动健康检查(轻简/企微断线告警)
try:
from utils.health_check import health_check_loop
def _qingjian_ok():
return self.websocket is not None and not getattr(self.websocket, "closed", True)
tasks.append(health_check_loop(_qingjian_ok))
print(f"[{self.get_time()}] 健康检查已启动")
except Exception as e:
print(f"[{self.get_time()}] 健康检查模块加载失败: {e}")
# 每天早上8点发送启动消息到企微群
try:
from utils.wechat_chat_log import morning_startup_scheduler
tasks.append(morning_startup_scheduler())
print(f"[{self.get_time()}] 早8点企微启动消息已启动")
except Exception as e:
print(f"[{self.get_time()}] 企微启动消息模块加载失败: {e}")
await asyncio.gather(*tasks)
if __name__ == "__main__":
import sys
# 检查是否有 --no-agent 参数
enable_agent = "--no-agent" not in sys.argv
client = QingjianAPIClient(enable_agent=enable_agent)
try:
asyncio.run(client.run())
except KeyboardInterrupt:
print("\n已停止")
async def _load_task_modules(self):
"""延迟加载任务模块,避免循环导入"""
from core.task_scheduler import get_task_scheduler
from core.task_trigger import get_trigger_engine
from db.task_db.task_model import get_task_manager
self.trigger_engine = get_trigger_engine()
async def check_and_trigger_tasks(self, data: dict):
"""检查并触发匹配的任务"""
try:
customer_key = self._customer_key(data)
customer_id = data.get('from_id')
message = data.get('content', '')
# 获取该客户的待触发任务
pending_tasks = self.task_manager.get_pending_tasks(customer_id)
for task in pending_tasks:
trigger = {
'type': task['trigger_type'],
'keyword': task['trigger_keyword'],
'keywords': task['trigger_keywords']
}
# 检查是否匹配触发条件
if self.task_scheduler.check_trigger_match(message, trigger):
logger.info(f"任务触发条件匹配:{task['task_id']}")
# 异步执行任务
asyncio.create_task(self.task_scheduler.execute_task(task))
except Exception as e:
logger.error(f"检查任务触发失败:{e}")
async def _load_task_modules(self):
"""延迟加载任务模块,避免循环导入"""
from core.task_scheduler import get_task_scheduler
from core.task_trigger import get_trigger_engine
from db.task_db.task_model import get_task_manager
self.trigger_engine = get_trigger_engine()
async def _load_task_modules(self):
"""延迟加载任务模块"""
if self.task_scheduler is None:
from core.task_scheduler import get_task_scheduler
from core.task_trigger import get_trigger_engine
from db.task_db.task_model import get_task_manager
self.trigger_engine = get_trigger_engine()
async def check_and_trigger_tasks_v2(self, data: dict):
"""增强版:检查并触发匹配的任务(支持指定客户)"""
# 确保任务模块已加载
await self._load_task_modules()
try:
customer_key = self._customer_key(data)
customer_id = data.get('from_id')
customer_name = data.get('from_name')
message = data.get('content', '')
# 准备上下文
context = {
'customer_id': customer_id,
'customer_name': customer_name,
'acc_id': data.get('acc_id')
}
# 获取该客户的待触发任务
pending_tasks = self.task_manager.get_pending_tasks(customer_id)
for task in pending_tasks:
trigger = {
'type': task['trigger_type'],
'keyword': task['trigger_keyword'],
'keywords': task['trigger_keywords'],
# 指定客户相关字段
'customer_id': task.get('specified_customer_id'),
'customer_name': task.get('specified_customer_name')
}
# 使用触发引擎检查是否匹配
if self.trigger_engine.check_trigger(message, trigger, context):
logger.info(f"任务触发条件匹配:{task['task_id']} (客户:{customer_name}/{customer_id})")
# 异步执行任务
asyncio.create_task(self.task_scheduler.execute_task(task))
except Exception as e:
logger.error(f"检查任务触发失败:{e}")