fix: block dense-text table jobs and prevent duplicate quote races
This commit is contained in:
@@ -100,6 +100,8 @@ class QingjianAPIClient:
|
||||
self._pending_image_tasks: dict = {}
|
||||
self._system_inquiry_rules = self._load_system_inquiry_rules()
|
||||
self._last_reply_sent_at: dict = {} # customer_key -> monotonic ts
|
||||
self._inbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入)
|
||||
self._outbound_log_seen: dict = {} # signature -> monotonic ts(防重复写入)
|
||||
|
||||
# 延迟加载任务模块(避免循环导入)
|
||||
self.task_scheduler = None
|
||||
@@ -206,6 +208,67 @@ class QingjianAPIClient:
|
||||
|
||||
task.add_done_callback(_done)
|
||||
|
||||
@staticmethod
|
||||
def _prune_seen(seen: dict, now_mono: float, ttl_sec: float = 8.0):
|
||||
if len(seen) <= 2000:
|
||||
return
|
||||
stale = [k for k, t in seen.items() if (now_mono - t) > ttl_sec]
|
||||
for k in stale:
|
||||
seen.pop(k, None)
|
||||
|
||||
def _log_inbound_once(self, data: dict):
|
||||
"""统一记录入站消息,短窗口去重,避免多分支重复写库。"""
|
||||
try:
|
||||
cid = data.get("from_id", "")
|
||||
if not cid:
|
||||
return
|
||||
msg = self.to_chinese(data.get("msg", "") or "")
|
||||
acc_id = data.get("acc_id", "")
|
||||
mtype = int(data.get("msg_type", 0) or 0)
|
||||
now_mono = time.monotonic()
|
||||
sig = f"{acc_id}|{cid}|{mtype}|{msg}"
|
||||
last = self._inbound_log_seen.get(sig, 0.0)
|
||||
if (now_mono - last) < 2.0:
|
||||
return
|
||||
self._inbound_log_seen[sig] = now_mono
|
||||
self._prune_seen(self._inbound_log_seen, now_mono, ttl_sec=8.0)
|
||||
_chat_log(
|
||||
cid,
|
||||
msg,
|
||||
"in",
|
||||
customer_name=self.to_chinese(data.get("from_name", "") or data.get("cy_name", "")),
|
||||
acc_id=acc_id,
|
||||
platform=data.get("acc_type", ""),
|
||||
msg_type=mtype,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _log_outbound_once(self, original_msg: dict, reply_content: str):
|
||||
"""统一记录出站消息,短窗口去重,避免重复写库。"""
|
||||
try:
|
||||
cid = original_msg.get("from_id", "")
|
||||
if not cid or not reply_content:
|
||||
return
|
||||
acc_id = original_msg.get("acc_id", "")
|
||||
now_mono = time.monotonic()
|
||||
sig = f"{acc_id}|{cid}|{reply_content}"
|
||||
last = self._outbound_log_seen.get(sig, 0.0)
|
||||
if (now_mono - last) < 2.0:
|
||||
return
|
||||
self._outbound_log_seen[sig] = now_mono
|
||||
self._prune_seen(self._outbound_log_seen, now_mono, ttl_sec=8.0)
|
||||
_chat_log(
|
||||
cid,
|
||||
reply_content,
|
||||
"out",
|
||||
customer_name=self.to_chinese(original_msg.get("from_name", "") or original_msg.get("cy_name", "")),
|
||||
acc_id=acc_id,
|
||||
platform=original_msg.get("acc_type", ""),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def receive_messages(self):
|
||||
"""持续接收消息"""
|
||||
try:
|
||||
@@ -272,23 +335,12 @@ class QingjianAPIClient:
|
||||
if not from_id or from_id == 'N/A' or not acc_id or acc_id == 'N/A':
|
||||
print(f"[{self.get_time()}] 空消息跳过(from_id={from_id!r} acc_id={acc_id!r})")
|
||||
return
|
||||
self._log_inbound_once(data)
|
||||
|
||||
# Gemini 店铺:不回复,直接跳过
|
||||
goods_name = self.to_chinese(data.get('goods_name', '') or '')
|
||||
if _get_shop_type(acc_id, goods_name) == "gemini_api":
|
||||
print(f"[{self.get_time()}] Gemini 店铺消息,跳过")
|
||||
try:
|
||||
_chat_log(
|
||||
data.get('from_id', ''),
|
||||
self.to_chinese(data.get('msg', '')),
|
||||
"in",
|
||||
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
|
||||
acc_id=data.get('acc_id', ''),
|
||||
platform=data.get('acc_type', ''),
|
||||
msg_type=data.get('msg_type', 0),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
from utils.wechat_chat_log import push_chat_to_wechat
|
||||
asyncio.create_task(push_chat_to_wechat(
|
||||
@@ -459,7 +511,10 @@ class QingjianAPIClient:
|
||||
def _msg_is_requirement(self, msg: str) -> bool:
|
||||
if not msg:
|
||||
return False
|
||||
kws = ("要", "抓到", "放到", "合成", "替换", "抠", "修", "高清", "尺寸", "横", "竖", "颜色", "去背景", "排版", "一样", "类似", "同款")
|
||||
kws = (
|
||||
"要", "抓到", "放到", "合成", "替换", "抠", "修", "高清", "尺寸", "横", "竖", "颜色", "去背景", "排版", "一样", "类似", "同款",
|
||||
"能不能做", "能做吗", "可以做吗", "做不做", "这个能做吗", "这个能不能做",
|
||||
)
|
||||
return any(k in msg for k in kws)
|
||||
|
||||
def _add_pending_images(self, key: str, urls: list, limit: int = 12):
|
||||
@@ -507,6 +562,14 @@ class QingjianAPIClient:
|
||||
from image.image_analyzer import image_analyzer
|
||||
r = await image_analyzer.analyze(url)
|
||||
if isinstance(r, dict) and r.get("success", False):
|
||||
if r.get("feasibility") == "no" or r.get("risk") == "high":
|
||||
note = str(r.get("note", "") or "")
|
||||
if "文字内容过于密集" in note or "密集文字" in note:
|
||||
reply = "这类文字太密的图我们这边不接单,抱歉哈。你要是简化后再发我可以继续看。"
|
||||
else:
|
||||
reply = "这张处理风险比较高,我这边先不直接接,建议转人工评估更稳。"
|
||||
await self.send_reply(data, reply)
|
||||
return
|
||||
from config.config import MIN_PRICE_FLOOR
|
||||
p = r.get("price_suggest", 20)
|
||||
floor_dyn = r.get("price_min", MIN_PRICE_FLOOR)
|
||||
@@ -521,17 +584,6 @@ class QingjianAPIClient:
|
||||
else:
|
||||
reply = "这张我看了,先按20元给你做"
|
||||
await self.send_reply(data, reply)
|
||||
try:
|
||||
_chat_log(
|
||||
data.get('from_id', ''),
|
||||
reply,
|
||||
"out",
|
||||
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
|
||||
acc_id=data.get('acc_id', ''),
|
||||
platform=data.get('acc_type', '')
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -543,15 +595,6 @@ class QingjianAPIClient:
|
||||
_name = self.to_chinese(data.get('from_name', '') or data.get('cy_name', ''))
|
||||
_plat = data.get('acc_type', '')
|
||||
|
||||
# 记录客户来消息
|
||||
if _cid and msg_text:
|
||||
try:
|
||||
_chat_log(_cid, msg_text, "in", customer_name=_name,
|
||||
acc_id=data.get('acc_id', ''),
|
||||
platform=_plat, msg_type=data.get('msg_type', 0))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 超大尺寸(米制)直接拒单,避免进入报价/处理流程
|
||||
oversize_reply = self._oversize_reply_if_needed(msg_text)
|
||||
if oversize_reply:
|
||||
@@ -618,11 +661,11 @@ class QingjianAPIClient:
|
||||
if self._msg_is_requirement(msg_text) or self._msg_is_price_inquiry(msg_text):
|
||||
key = self._customer_key(data)
|
||||
if self._pending_images.get(key):
|
||||
await self.send_reply(data, "稍等,我把刚才那几张一起看下")
|
||||
await self._flush_pending_images(key, data)
|
||||
old = self._pending_image_tasks.get(key)
|
||||
if old and not old.done():
|
||||
old.cancel()
|
||||
await self.send_reply(data, "稍等,我把刚才那几张一起看下")
|
||||
await self._flush_pending_images(key, data)
|
||||
return
|
||||
if self._msg_is_price_inquiry(msg_text):
|
||||
recent_urls = self._collect_recent_image_urls(_cid, data.get('acc_id', ''), max_count=6)
|
||||
@@ -826,17 +869,6 @@ class QingjianAPIClient:
|
||||
else:
|
||||
reply = f"这组{count}张我看了,按{avg_price}元一张;复杂那张{top_price}元,满意再拍"
|
||||
await self.send_reply(data, reply)
|
||||
try:
|
||||
_chat_log(
|
||||
data.get('from_id', ''),
|
||||
reply,
|
||||
"out",
|
||||
customer_name=self.to_chinese(data.get('from_name', '') or data.get('cy_name', '')),
|
||||
acc_id=data.get('acc_id', ''),
|
||||
platform=data.get('acc_type', '')
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"多图分析失败: {e}")
|
||||
try:
|
||||
@@ -1361,7 +1393,7 @@ class QingjianAPIClient:
|
||||
"msg_type": 0,
|
||||
"cy_name": customer_name
|
||||
}
|
||||
|
||||
self._log_outbound_once(original_msg, str(reply_content))
|
||||
await self.send_message(reply)
|
||||
|
||||
async def send_text(self, cy_id, acc_type, content):
|
||||
|
||||
@@ -241,6 +241,7 @@ class ImageAnalyzer:
|
||||
|
||||
DENSE_TEXT_SUBJECT_KEYWORDS = (
|
||||
"宣传栏", "公告栏", "展板", "海报墙", "通知栏", "知识栏", "制度牌", "公示栏", "墙报", "密密麻麻",
|
||||
"表格", "检索表", "配伍表", "药物配伍", "课程表", "流程表", "说明表", "数据表",
|
||||
"word wall", "poster wall", "bulletin board",
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user