This commit is contained in:
2026-03-06 12:44:57 +08:00
parent fa61b11b02
commit 006b035de4
132 changed files with 1361 additions and 17400 deletions

View File

@@ -26,7 +26,8 @@ class QianniuAdapter(BaseAdapter):
with open(config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
return cfg.get(acc_id, self._default_group_id)
except Exception: pass
except Exception as e:
logger.warning(f"[QianniuAdapter] 读取转接分组配置失败: {e}")
return self._default_group_id
async def translate_inbound(self, raw: dict) -> Tuple[StandardMessage, str]:
@@ -81,6 +82,9 @@ class QianniuAdapter(BaseAdapter):
content = res.reply_content
try:
logger.info(
f"[REPLY->CUSTOMER] user={user_id} acc={acc_id} type={res.msg_type}\n{content}"
)
await self.ws_client.send(customer_id=user_id, acc_id=acc_id, acc_type=acc_type, content=content, msg_type=res.msg_type)
except Exception as e:
logger.error(f"[QianniuAdapter] 发送失败: {e}")

View File

@@ -19,7 +19,7 @@ async def transfer_to_human_tool(ctx: RunContext[Any], reason: str = Field(descr
designer_name = await dispatch_service.assign_designer()
if designer_name:
# 2. 有设计师在线:生成标准转接指令
# 2. 有设计师在线:生成标准转接指令 (必须包含 [转移会话] 且格式正确)
magic_cmd = f"正在为您转接|[转移会话],{designer_name},无原因"
logger.info(f"[Tool] 成功呼叫设计师: {designer_name}")
return magic_cmd

View File

@@ -2,6 +2,7 @@ import logging
import asyncio
import re
import time
import json
from typing import Optional, List, Any, Dict
from collections import deque
from core.schema import StandardMessage, StandardResponse
@@ -12,6 +13,11 @@ from core.repository import repo
logger = logging.getLogger("cs_agent")
# 配置常量
MSG_DEDUP_CAPACITY = 200 # 消息 ID 去重缓存容量
TRANSFER_COOLDOWN_SEC = 60 # 转接冷却时间(秒)
DEBOUNCE_SECONDS = 2.0 # 消息防抖延迟(秒)
class SystemOrchestrator:
"""
全系统总编排:具备转接冷却、防抖合并、多消息去重、以及精准日志。
@@ -22,19 +28,27 @@ class SystemOrchestrator:
self.brain = CustomerServiceBrain()
# 1. 消息 ID 去重
self._processed_msg_ids = deque(maxlen=200)
self._processed_msg_ids = deque(maxlen=MSG_DEDUP_CAPACITY)
# 2. 转接冷却存储 (customer_id -> last_transfer_time)
self._last_transfer_time: Dict[str, float] = {}
# 3. 防抖配置
self._debounce_seconds = 5.0
self._debounce_seconds = DEBOUNCE_SECONDS
self._debounce_tasks: Dict[str, asyncio.Task] = {}
self._pending_messages: Dict[str, List[StandardMessage]] = {}
self._user_locks: Dict[str, asyncio.Lock] = {}
bus.subscribe("MESSAGE_OUTBOUND", self.handle_outbound_event)
@staticmethod
def _has_transfer_intent(text: str) -> bool:
if not text:
return False
t = str(text)
keywords = ("转人工", "转接", "人工客服", "人工", "设计师", "叫人", "找人")
return any(k in t for k in keywords)
def _get_user_lock(self, user_id: str) -> asyncio.Lock:
if user_id not in self._user_locks:
self._user_locks[user_id] = asyncio.Lock()
@@ -47,18 +61,34 @@ class SystemOrchestrator:
std_msg, direction = await self.qianniu_adapter.translate_inbound(raw_data)
# 关键修复:确保 user_id 绝不为空
user_id = std_msg.user_id or str(raw_data.get("cy_id") or raw_data.get("from_id") or "unknown")
std_msg.user_id = user_id
# 店铺隔离:同一客户在不同店铺的对话独立处理
session_key = f"{user_id}@{std_msg.acc_id}"
# 订单消息处理:静默入库并更新状态,但不触发 AI 回复
if "[系统订单信息]" in (std_msg.content or ""):
await self._handle_order_packet(platform, std_msg)
logger.info(f"[订单消息] user={user_id} acc={std_msg.acc_id} 已入库更新状态")
await repo.save_chat(platform, user_id, std_msg.content, "in", acc_id=std_msg.acc_id)
return
preview = (std_msg.content or "").replace("\n", "\\n")
if len(preview) > 120:
preview = preview[:120] + "..."
logger.info(
f"[监听消息] dir={direction} user={user_id} acc={std_msg.acc_id} "
f"type={std_msg.msg_type} images={len(std_msg.image_urls)} content={preview}"
)
# 过滤心跳
if not std_msg.content.strip() and not std_msg.image_urls: return
# 如果是商家人工回复,静默入库
if direction == "out":
await repo.save_chat(platform, std_msg.user_id, std_msg.content, "out", acc_id=std_msg.acc_id)
return
# 订单消息处理:静默记录
if "[系统订单信息]" in std_msg.content:
await self._handle_order_packet(platform, std_msg)
await repo.save_chat(platform, std_msg.user_id, std_msg.content, "in", acc_id=std_msg.acc_id)
await repo.save_chat(platform, user_id, std_msg.content, "out", acc_id=std_msg.acc_id)
return
# ID 去重
@@ -66,13 +96,12 @@ class SystemOrchestrator:
if std_msg.msg_id in self._processed_msg_ids: return
self._processed_msg_ids.append(std_msg.msg_id)
# 进入防抖
user_id = std_msg.user_id
if user_id in self._debounce_tasks: self._debounce_tasks[user_id].cancel()
if user_id not in self._pending_messages: self._pending_messages[user_id] = []
self._pending_messages[user_id].append(std_msg)
# 进入防抖(使用 session_key 隔离不同店铺)
if session_key in self._debounce_tasks: self._debounce_tasks[session_key].cancel()
if session_key not in self._pending_messages: self._pending_messages[session_key] = []
self._pending_messages[session_key].append(std_msg)
self._debounce_tasks[user_id] = asyncio.create_task(self._debounced_process(user_id, platform))
self._debounce_tasks[session_key] = asyncio.create_task(self._debounced_process(session_key, user_id, platform))
except Exception as e:
logger.error(f"[Orchestrator] 处理失败: {e}")
@@ -81,15 +110,74 @@ class SystemOrchestrator:
try:
price_match = re.search(r"订单金额:金额:\s*([\d\.]+)元", msg.content)
if price_match: await repo.update_task_price(platform, msg.user_id, float(price_match.group(1)))
if "买家已付款" in msg.content: await repo.update_task_outcome(platform, msg.user_id, "deal_success")
elif any(k in msg.content for k in ["退款", "已关闭", "已取消"]): await repo.update_task_outcome(platform, msg.user_id, "refunded")
except Exception: pass
# 判定成交结果(扩大范围:已付款 或 已发货 都视为成功,用于后期 AI 话术微调)
if any(k in msg.content for k in ["买家已付款", "卖家已发货"]):
await repo.update_task_outcome(platform, msg.user_id, "deal_success")
elif any(k in msg.content for k in ["退款", "已关闭", "已取消"]):
await repo.update_task_outcome(platform, msg.user_id, "refunded")
except Exception as e:
logger.warning(f"[Orchestrator] 订单消息处理异常: {e}")
async def _debounced_process(self, user_id: str, platform: str):
async def _analyze_images_background(self, session_key: str, image_urls: List[str]):
"""后台静默分析图片,存入用户数据库用于数据标定"""
try:
from services.service_image_analyzer import image_analyzer_service
from db.customer_db import CustomerDatabase
db = CustomerDatabase()
profile = db.get_customer(session_key)
for url in image_urls:
try:
result = await image_analyzer_service.analyze(url)
result_json = json.dumps(result, ensure_ascii=False)
# 更新最近一次分析
profile.last_image_analysis = result_json
profile.last_image_url = url
profile.last_gemini_prompt = result.get("gemini_prompt", "")
profile.last_aspect_ratio = result.get("aspect_ratio", "1:1")
profile.last_perspective = result.get("perspective", "no")
# 追加到历史记录保留最近20条
if profile.image_analysis_history is None:
profile.image_analysis_history = []
profile.image_analysis_history.append(result_json)
if len(profile.image_analysis_history) > 20:
profile.image_analysis_history = profile.image_analysis_history[-20:]
# 更新复杂度历史
complexity = result.get("complexity", "normal")
if profile.complexity_history is None:
profile.complexity_history = []
profile.complexity_history.append(complexity)
if len(profile.complexity_history) > 10:
profile.complexity_history = profile.complexity_history[-10:]
# 更新图片类型历史
proc_type = result.get("proc_type", "")
if proc_type and profile.image_type_history is not None:
if proc_type not in profile.image_type_history:
profile.image_type_history.append(proc_type)
logger.debug(f"[ImageAnalysis] session={session_key} 分析完成: {result.get('subject', '?')} | {complexity}")
except Exception as e:
logger.warning(f"[ImageAnalysis] 单张图片分析失败: {e}")
continue
# 保存更新
db.save_customer(profile)
logger.info(f"[ImageAnalysis] session={session_key} 分析结果已保存到数据库")
except Exception as e:
logger.warning(f"[ImageAnalysis] 后台分析失败: {e}")
async def _debounced_process(self, session_key: str, user_id: str, platform: str):
try:
await asyncio.sleep(self._debounce_seconds)
async with self._get_user_lock(user_id):
messages = self._pending_messages.pop(user_id, [])
async with self._get_user_lock(session_key):
messages = self._pending_messages.pop(session_key, [])
if not messages: return
# A. 合并与元数据修复
@@ -108,6 +196,7 @@ class SystemOrchestrator:
msg_id=merged_msg_id,
user_id=user_id,
content=combined_content,
msg_type=messages[-1].msg_type,
image_urls=all_image_urls,
acc_id=acc_id,
acc_type=acc_type
@@ -116,17 +205,22 @@ class SystemOrchestrator:
# B. 持久化
db_content = combined_content
if all_image_urls: db_content = f"【系统:已收到{len(all_image_urls)}张图】\n{combined_content}"
await repo.save_chat(platform, user_id, db_content, "in", acc_id=acc_id)
await repo.save_chat(platform, user_id, db_content, "in", acc_id=acc_id, image_urls=all_image_urls)
# B2. 后台图片分析(不阻塞主流程,用于数据标定)
if all_image_urls:
asyncio.create_task(self._analyze_images_background(session_key, all_image_urls))
# C. 冷却检查:如果 60秒内发过转接,告诉大脑已处于转接中
is_in_cooldown = (time.time() - self._last_transfer_time.get(user_id, 0)) < 60
# C. 冷却检查:如果转接冷却期内发过转接,告诉大脑"已处于转接中"
is_in_cooldown = (time.time() - self._last_transfer_time.get(session_key, 0)) < TRANSFER_COOLDOWN_SEC
# D. 思考
history = await repo.get_chat_history(user_id, limit=10)
history = await repo.get_chat_history(user_id, limit=10, acc_id=acc_id)
if history and history[-1]['content'] == db_content: history = history[:-1]
# 如果在冷却中,在当前消息里注入“当前已在转接中”的信息
if is_in_cooldown:
# 只在“明确又要转接”时注入冷却提示,普通问候/新需求不注入
transfer_intent = self._has_transfer_intent(combined_content)
if is_in_cooldown and transfer_intent:
final_msg.content = f"【系统:当前已向设计师发出转接请求,请勿再次调用转接工具】\n{final_msg.content}"
std_res = await self.brain.think_and_reply(final_msg, history=history)
@@ -139,7 +233,7 @@ class SystemOrchestrator:
await repo.save_chat(platform, user_id, std_res.reply_content, "out", acc_id=acc_id)
if "[转移会话]" in std_res.reply_content:
self._last_transfer_time[user_id] = time.time()
self._last_transfer_time[session_key] = time.time()
except asyncio.CancelledError: pass
except Exception as e: logger.exception(f"[Orchestrator] 处理失败: {e}")

View File

@@ -11,6 +11,25 @@ logger = logging.getLogger("cs_agent")
from core.skill_manager import skill_manager
def _clip(text: str, limit: int = 1200) -> str:
if text is None:
return ""
text = str(text)
if len(text) <= limit:
return text
return f"{text[:limit]}...(截断, 共{len(text)}字)"
def _fmt_time(ts: Any) -> str:
s = str(ts or "").strip()
if not s:
return "--:--:--"
if " " in s:
return s.split(" ", 1)[1]
return s
class CustomerServiceBrain:
"""
重构后的单一 Agent 大脑:
@@ -27,27 +46,38 @@ class CustomerServiceBrain:
provider=OpenAIProvider(api_key=self.api_key, base_url=self.base_url)
)
all_skills = skill_manager.get_all_skills_text()
exclude_names = os.getenv("SKILL_EXCLUDE_FROM_PROMPT", "pricing-skill")
excluded_skills = [s.strip().lower() for s in exclude_names.split(",") if s.strip()]
all_skills = skill_manager.get_all_skills_text(exclude=excluded_skills)
logger.info(f"[SkillManager] 已从提示词排除技能: {excluded_skills}")
# --- 统一口径后的 System Prompt ---
system_prompt = (
"你是一位专注【高清修复】和【找原图】的专业店主。性格干脆,说话高端、专业。\n\n"
"你是一位专注【高清修复】和【找原图】的专业店主。性格干脆,说话自然、专业。\n\n"
"【统一称呼规范】\n"
"1. 严禁使用'师傅''客服''专员'等词汇!\n"
"2. 必须统一称呼为【设计师】。比如:'设计师看下''设计师马上来''等设计师核价'\n\n"
"1. 严禁使用'师傅''客服''专员'等词汇!必须统一称为【设计师】。\n"
"2. 未转接前,用第一人称(我/我这边)。例如:'我叫设计师看下'\n\n"
"【核心逻辑】\n"
"1. 业务:只聊高清修复和找原图。引导发图 -> 问需求 -> 找设计师。\n"
"2. 下线安抚:如果工具返回 'ERROR_NO_DESIGNER_ONLINE',说明设计师们【下班/下线】了。回:'亲亲,设计师现在下班啦,需求我先记下,明天第一时间回您哈!'\n"
"3. 正在转接中:如果看到系统提示已在转接,回:'设计师正在赶来,我再帮你催下哈!'\n"
"4. 没转接时:引导发图 -> 问需求 -> 调工具转人工。\n\n"
"5. 语气:淘宝亲切风,多用'亲亲''铁子'。每句回复【严禁超过15字】\n\n"
"【必杀令】\n"
"1. 每句回复严禁超过15个字\n"
"1. 业务:只聊高清修复和找原图。核心链路:引导发图 -> 问需求 -> 找设计师。\n"
"2. **主动引导(关键)**:如果客户【没发图】就问能不能做、问收费,你必须回:'亲亲先发图我看下哈'\n"
"3. **非业务问题**:如果客户问招聘、合作、闲聊等与做图无关的话题,礼貌拒绝:'亲亲咱这边只做图哦,暂不招人哈'\n"
"4. **客户说没有参考图**:如果客户明确说'没有图''找不到''想让你们帮找',直接转人工:'好的,我这就叫设计师帮您找哈'\n"
"5. **客户问尺寸/能否打印/退款**:这类问题需要设计师判断,直接转人工:'这个设计师帮您看下哈'\n"
"6. 转接时机:收到图片并明确需求后,立即调用转人工工具,并告知:'收到,正在呼叫设计师核价,稍等哈'\n"
"7. **下线安抚(重要)**:只有当【本次】工具返回 'ERROR_NO_DESIGNER_ONLINE' 时才能说下班。不能根据历史对话或自己猜测说下班!\n"
"8. 正在转接中:如果系统提示已在转接,回:'设计师正在赶来,我再帮你催下哈!'\n"
"9. **每次转接必须调用工具**:不要根据之前的结果猜测,每次需要转接都必须重新调用工具检查设计师是否在线。\n\n"
"【必杀令 - 严格遵守】\n"
"1. 每句回复严禁超过15个字语气淘宝亲切风多用''''\n"
"2. 严禁报价,严禁复读图片已收到的情况。\n"
"3. 必须原样输出工具返回的'正在为您转接|'指令。\n\n"
"3. 必须原样输出工具返回的'正在为您转接|'指令。\n"
"4. **严禁**说'在呢铁子'!只能说'在呢''在呢亲'\n"
"5. **严禁**重复发送相同内容!如果刚说过的话,换一种说法。\n"
"6. **严禁**输出任何代码、标记、括号等乱码!只输出自然语言。\n"
"7. **严禁**自己臆造'下班'只有工具返回ERROR才能说下班。\n\n"
f"业务参考:\n{all_skills}"
)
@@ -57,26 +87,70 @@ class CustomerServiceBrain:
async def think_and_reply(self, msg: StandardMessage, history: List[dict] = []) -> StandardResponse:
try:
# 构造增强上下文(强灌输)
# 构造增强上下文
user_content = msg.content
if msg.image_urls:
user_content = f"【系统通知:收到客户 {len(msg.image_urls)} 张图】\n{user_content}"
recent_context = ""
if history:
lines = [f"{('客户' if h['role']=='user' else '')}{h['content']}" for h in history[-6:]]
lines = [
f"[{_fmt_time(h.get('timestamp'))}] {('客户' if h['role']=='user' else '')}{h['content']}"
for h in history[-6:]
]
recent_context = "【近期对话回顾】\n" + "\n".join(lines) + "\n----------------\n"
full_input = f"{recent_context}现在的对话:{user_content}"
logger.info(
f"[PROMPT->AI] user={msg.user_id} acc={msg.acc_id} images={len(msg.image_urls)}\n"
f"{_clip(full_input)}"
)
result = await self.agent.run(full_input, message_history=history)
if hasattr(result, 'data') and isinstance(result.data, str):
reply_text = result.data
elif hasattr(result, 'output') and isinstance(result.output, str):
reply_text = result.output
else:
reply_text = str(result.data) if hasattr(result, 'data') else "在呢铁子。"
# --- 终极修复:强制截获工具返回的转接指令 ---
reply_text = ""
# pydantic-ai 1.x 使用 result.output旧版 0.x 使用 result.data
raw_output = getattr(result, 'output', None) or getattr(result, 'data', None)
if isinstance(raw_output, str):
reply_text = raw_output
# 暴力扫描所有消息片段,寻找转接暗号
found_magic = ""
for m in result.all_messages():
if hasattr(m, 'parts'):
for part in m.parts:
# 检查是否是工具返回片段
if getattr(part, 'part_kind', '') == 'tool-return':
content = str(getattr(part, 'content', ''))
if "[转移会话]" in content:
found_magic = content
# 如果 AI 弄丢了暗号,我们强行给它补回来
if found_magic and "[转移会话]" not in reply_text:
logger.info(f"[Brain] 检测到 AI 弄丢了转接暗号,正在强制恢复: {found_magic[:30]}...")
reply_text = found_magic
# ----------------------------------------
# 清理可能的乱码/代码标记
import re
reply_text = re.sub(r'\[\]<\|[^|]+\|>', '', reply_text) # 清理 []<|xxx|>
reply_text = re.sub(r'<\|[^|]+\|>', '', reply_text) # 清理 <|xxx|>
reply_text = re.sub(r'\[Function[^\]]*\]', '', reply_text) # 清理 [FunctionXxx]
reply_text = re.sub(r'<think[^>]*>.*', '', reply_text, flags=re.DOTALL) # 清理 <think_xxx>内部思考泄漏
reply_text = re.sub(r'</?think[^>]*>', '', reply_text) # 清理 think 标签
reply_text = re.sub(r'```[^`]*```', '', reply_text) # 清理代码块
reply_text = re.sub(r'\{["\'][^}]+\}', '', reply_text) # 清理 JSON
reply_text = reply_text.strip()
# 过滤"在呢铁子"
if "在呢铁子" in reply_text:
reply_text = reply_text.replace("在呢铁子", "在呢亲")
if not reply_text:
reply_text = "稍等我看看。"
logger.info(f"[THINK/RAW_OUTPUT] user={msg.user_id}\n{_clip(reply_text)}")
need_transfer = "[转移会话]" in reply_text

View File

@@ -18,24 +18,33 @@ class DataRepository:
# --- 聊天记录 (异步化) ---
async def save_chat(self, platform: str, user_id: str, content: str, direction: str, acc_id: str = ""):
async def save_chat(self, platform: str, user_id: str, content: str, direction: str, acc_id: str = "", image_urls: list = None):
"""异步持久化存储聊天记录"""
# 将图片URL列表转为\n分隔的字符串
urls_str = "\n".join(image_urls) if image_urls else ""
return await asyncio.to_thread(
log_message,
customer_id=user_id,
message=content,
direction=direction,
platform=platform,
acc_id=acc_id
acc_id=acc_id,
image_urls=urls_str
)
async def get_chat_history(self, user_id: str, limit: int = 10) -> List[dict]:
async def get_chat_history(self, user_id: str, limit: int = 10, acc_id: str = "") -> List[dict]:
"""异步获取历史记录"""
rows = await asyncio.to_thread(get_conversation, user_id, limit=limit)
rows = await asyncio.to_thread(get_conversation, user_id, limit=limit, acc_id=acc_id)
history = []
for r in rows:
role = "user" if r["direction"] == "in" else "assistant"
history.append({"role": role, "content": r["message"]})
history.append(
{
"role": role,
"content": r["message"],
"timestamp": r.get("timestamp", ""),
}
)
return history
# --- 客户相关 (异步化) ---

View File

@@ -9,6 +9,7 @@ class StandardMessage(BaseModel):
user_id: str # 发送者唯一ID
user_name: str = "" # 发送者昵称
content: str # 消息文本内容
msg_type: int = 0 # 消息类型0 文本, 1 图片, 2 语音等
image_urls: List[str] = [] # 提取出来的图片链接
acc_id: str = "" # 商家/店铺账号ID
acc_type: str = "" # 平台类型标识

View File

@@ -48,9 +48,11 @@ class SkillManager:
parts.append(f"### 技能:{name}\n{content}")
return "\n\n".join(parts)
def get_all_skills_text(self) -> str:
def get_all_skills_text(self, exclude: Optional[List[str]] = None) -> str:
"""获取所有技能的合集(用于全能大脑模式)"""
return self.compose_skills(list(self._skill_cache.keys()))
exclude_set = {n.lower() for n in (exclude or [])}
names = [n for n in self._skill_cache.keys() if n not in exclude_set]
return self.compose_skills(names)
# 全局单例
skill_manager = SkillManager()

View File

@@ -7,11 +7,16 @@ import asyncio
import logging
from typing import Optional, Dict
from datetime import datetime
from .websocket_client import QingjianAPIClient
from .websocket_client_v2 import QingjianAPIClient
from db.task_db.task_model import get_task_manager, TaskStatus, TaskPriority
logger = logging.getLogger(__name__)
# 配置常量
TIMEOUT_CHECK_INTERVAL_SEC = 300 # 超时检查间隔5分钟
ERROR_RETRY_DELAY_SEC = 60 # 错误后重试延迟1分钟
QUEUE_POLL_INTERVAL_SEC = 1 # 队列轮询间隔(秒)
class TaskScheduler:
"""任务调度器"""
@@ -54,14 +59,14 @@ class TaskScheduler:
# 通知天网任务超时
await self._notify_tianwang(task['task_id'], 'timeout')
# 每 5 分钟检查一次
await asyncio.sleep(300)
# 每隔固定时间检查一次
await asyncio.sleep(TIMEOUT_CHECK_INTERVAL_SEC)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"超时检查失败:{e}")
await asyncio.sleep(60)
await asyncio.sleep(ERROR_RETRY_DELAY_SEC)
async def _process_task_queue(self):
"""处理任务队列"""
@@ -69,8 +74,8 @@ class TaskScheduler:
while self.running:
try:
# 这里实际应该从队列获取任务
# 简化处理:每秒检查一次待触发任务
await asyncio.sleep(1)
# 简化处理:定期检查待触发任务
await asyncio.sleep(QUEUE_POLL_INTERVAL_SEC)
except Exception as e:
logger.error(f"任务队列处理失败:{e}")

View File

@@ -15,7 +15,7 @@ class QingjianAPIClient:
重构后的轻简API客户端 (协议全复刻版)
"""
def __init__(self, uri=None, enable_agent: bool = True):
def __init__(self, uri=None, enable_agent: bool = True, worker_id: int = -1, worker_count: int = 1):
from config.config import QINGJIAN_WS_URI
self.uri = uri or QINGJIAN_WS_URI
self.websocket = None
@@ -23,6 +23,12 @@ class QingjianAPIClient:
self.logger = logger
self.enable_agent = enable_agent
# 多进程分片逻辑
self.worker_id = worker_id
self.worker_count = worker_count
if self.worker_id >= 0:
logger.info(f"[WebSocket] 启用分片模式: Worker {self.worker_id}/{self.worker_count}")
# 初始化新架构总指挥部
self.orchestrator = init_orchestrator(ws_client=self)
logger.info("[WebSocket] 新架构 Orchestrator 已就绪。")
@@ -36,13 +42,35 @@ class QingjianAPIClient:
async def receive_messages(self):
await receive_messages_flow(self)
def _should_handle(self, customer_id: str) -> bool:
"""分片判定:这个客户归我管吗?"""
if self.worker_id < 0 or self.worker_count <= 1:
return True
# 如果没有 customer_id为了安全起见只让 Worker 0 处理
if not customer_id:
return self.worker_id == 0
import hashlib
# 使用稳定的哈希算法分配客户
hash_val = int(hashlib.md5(str(customer_id).encode("utf-8")).hexdigest(), 16)
return (hash_val % self.worker_count) == self.worker_id
async def handle_message(self, message):
"""收到消息处理"""
try:
data = json.loads(message)
# 预提取客户ID用于分片判定
customer_id = str(data.get("cy_id") or data.get("from_id") or "")
if not self._should_handle(customer_id):
return
await self.orchestrator.on_raw_message_received(platform="qianniu", raw_data=data)
except Exception as e:
logger.error(f"[WebSocket] 处理消息异常: {e}")
raw_preview = str(message).replace("\n", "\\n")
if len(raw_preview) > 300:
raw_preview = raw_preview[:300] + "..."
logger.error(f"[WebSocket] 处理消息异常: {e} raw={raw_preview}")
async def send(self, customer_id: str, acc_id: str, acc_type: str, content: str, msg_type: int = 0):
"""