feat: alert wecom when no designer is available

This commit is contained in:
2026-03-13 10:42:14 +08:00
parent 5a38fa9e6c
commit 5b36693c2e
4 changed files with 137 additions and 1 deletions

View File

@@ -8,7 +8,7 @@ import sqlite3
import os import os
import threading import threading
from queue import Queue, Empty from queue import Queue, Empty
from datetime import datetime from datetime import datetime, timedelta
from typing import List, Dict, Optional from typing import List, Dict, Optional
_DB_PATH = os.path.join(os.path.dirname(__file__), "chat_log_db", "chats.db") _DB_PATH = os.path.join(os.path.dirname(__file__), "chat_log_db", "chats.db")
@@ -527,6 +527,49 @@ def get_latest_messages(limit: int = 20) -> List[Dict]:
return [dict(r) for r in rows] return [dict(r) for r in rows]
def get_waiting_customer_pool(window_minutes: int = 30) -> Dict:
"""统计最近窗口内、最后一条消息仍来自客户的待接待客户池。"""
cutoff = (datetime.now() - timedelta(minutes=max(window_minutes, 1))).strftime("%Y-%m-%d %H:%M:%S")
with _get_conn() as conn:
rows = conn.execute(_sql("""
SELECT id, customer_id, acc_id, direction, timestamp
FROM chat_logs
WHERE timestamp >= ?
AND customer_id <> ''
AND customer_id <> 'unknown'
AND acc_id <> ''
ORDER BY id DESC
"""), (cutoff,)).fetchall()
latest_by_session = {}
for row in rows:
item = dict(row)
key = (str(item.get("customer_id") or ""), str(item.get("acc_id") or ""))
if key not in latest_by_session:
latest_by_session[key] = item
per_shop: Dict[str, int] = {}
waiting_sessions = 0
for item in latest_by_session.values():
if str(item.get("direction") or "") != "in":
continue
acc_id = str(item.get("acc_id") or "")
if not acc_id:
continue
per_shop[acc_id] = per_shop.get(acc_id, 0) + 1
waiting_sessions += 1
shops = [
{"acc_id": acc_id, "waiting_customers": count}
for acc_id, count in sorted(per_shop.items(), key=lambda kv: (-kv[1], kv[0]))
]
return {
"total_waiting_customers": waiting_sessions,
"shops": shops,
"window_minutes": window_minutes,
}
# ========== 订单相关 ========== # ========== 订单相关 ==========
@_retry_db_operation @_retry_db_operation

View File

@@ -160,3 +160,15 @@ def retry_pending_transfer(row_id: int, delay_seconds: int = 60, error: str = ""
(next_s, now_s, error[:500], row_id), (next_s, now_s, error[:500], row_id),
) )
conn.commit() conn.commit()
def count_open_pending_transfers() -> int:
with _get_conn() as conn:
row = conn.execute(
"""
SELECT COUNT(*) AS cnt
FROM pending_transfers
WHERE status IN ('pending', 'processing')
"""
).fetchone()
return int(row["cnt"] or 0) if row else 0

View File

@@ -3,6 +3,7 @@ import logging
import httpx import httpx
import asyncio import asyncio
from typing import Optional from typing import Optional
from services.service_designer_alert import designer_alert_service
logger = logging.getLogger("cs_agent") logger = logging.getLogger("cs_agent")
@@ -53,6 +54,10 @@ class DispatchService:
return designer return designer
logger.warning(f"[Dispatch]{u_tag} 派单被拒: {data.get('reason')} body={body}") logger.warning(f"[Dispatch]{u_tag} 派单被拒: {data.get('reason')} body={body}")
await designer_alert_service.notify_if_needed(
trigger=f"dispatch_rejected:{data.get('reason') or 'unknown'}",
customer_id=user_id,
)
return None return None
if response.status_code == 401: if response.status_code == 401:

View File

@@ -0,0 +1,76 @@
import os
import time
import logging
from datetime import datetime
from typing import Dict
from db.chat_log_db import get_waiting_customer_pool
from db.pending_transfer_db import count_open_pending_transfers
from services.service_wecom_bot import wecom_bot_service
logger = logging.getLogger("cs_agent")
DESIGNER_ALERT_START_HOUR = int(os.getenv("DESIGNER_ALERT_START_HOUR", "8"))
DESIGNER_ALERT_END_HOUR = int(os.getenv("DESIGNER_ALERT_END_HOUR", "24"))
DESIGNER_ALERT_COOLDOWN_SECONDS = int(os.getenv("DESIGNER_ALERT_COOLDOWN_SECONDS", "300"))
DESIGNER_ALERT_POOL_WINDOW_MINUTES = int(os.getenv("DESIGNER_ALERT_POOL_WINDOW_MINUTES", "30"))
class DesignerAlertService:
def __init__(self):
self._last_alert_at = 0.0
@staticmethod
def _in_active_window(now: datetime) -> bool:
hour = now.hour
return DESIGNER_ALERT_START_HOUR <= hour < DESIGNER_ALERT_END_HOUR
@staticmethod
def _render_shop_lines(pool: Dict) -> str:
shops = pool.get("shops") or []
if not shops:
return "- 暂无店铺明细"
lines = []
for item in shops[:10]:
acc_id = str(item.get("acc_id") or "")
waiting = int(item.get("waiting_customers") or 0)
lines.append(f"- {acc_id}{waiting}")
return "\n".join(lines)
async def notify_if_needed(self, *, trigger: str = "", customer_id: str = "", acc_id: str = "") -> bool:
now = datetime.now()
if not self._in_active_window(now):
return False
now_ts = time.time()
if now_ts - self._last_alert_at < max(DESIGNER_ALERT_COOLDOWN_SECONDS, 30):
return False
pending_count = count_open_pending_transfers()
pool = get_waiting_customer_pool(DESIGNER_ALERT_POOL_WINDOW_MINUTES)
waiting_total = int(pool.get("total_waiting_customers") or 0)
if pending_count <= 0 and waiting_total <= 0:
return False
content = (
"【设计师在线提醒】\n"
f"当前时间:{now.strftime('%Y-%m-%d %H:%M:%S')}\n"
"8点到24点内检测到暂无设计师接单有客户待转接。\n"
f"触发来源:{trigger or '-'}\n"
f"当前会话:{customer_id or '-'} / {acc_id or '-'}\n"
f"待转接池:{pending_count}\n"
f"当前客户池:{waiting_total}人(近{pool.get('window_minutes') or DESIGNER_ALERT_POOL_WINDOW_MINUTES}分钟最后一条仍是客户消息)\n"
"店铺分布:\n"
f"{self._render_shop_lines(pool)}\n"
"群里如果有设计师在线,麻烦看一下。"
)
ok = await wecom_bot_service.send_text(content)
if ok:
self._last_alert_at = now_ts
logger.info(
f"[DesignerAlert] 已发送企微提醒 trigger={trigger} pending={pending_count} waiting={waiting_total}"
)
return ok
designer_alert_service = DesignerAlertService()