Files
tw/utils/daily_summary.py
2026-03-06 12:44:57 +08:00

290 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Daily data summary - sent automatically to WeCom (企业微信) every day at 23:59.
"""
import os
import asyncio
import logging
from datetime import datetime, time as dtime
from collections import defaultdict
import httpx
from dotenv import load_dotenv
# Load variables from a .env file into the process environment so the
# os.getenv() lookups in this module can see them.
load_dotenv()
# Logger named "cs_agent"; messages from this module carry a
# "[DailySummary]" prefix.
logger = logging.getLogger("cs_agent")
def _scan_customer_messages(msgs):
    """Reduce one customer's messages to ``(name, need, flags)``.

    ``name`` is the last non-empty ``customer_name`` seen, ``need`` is the
    first requirement keyword found in an inbound (``direction == 'in'``)
    message, and ``flags`` records which order/transfer markers appeared in
    any message of the conversation.
    """
    name = ''
    need = ''
    flags = dict.fromkeys(('transfer', 'paid', 'shipped', 'closed', 'new'), False)
    for m in msgs:
        if m['customer_name']:
            name = m['customer_name']
        text = m['message'] or ''
        flags['transfer'] |= '[转移会话]' in text
        flags['paid'] |= '买家已付款' in text
        flags['shipped'] |= '卖家已发货' in text
        flags['closed'] |= '订单关闭' in text or '已取消' in text
        flags['new'] |= '新订单' in text
        # Only the first inbound message that matches a keyword sets the need.
        if m['direction'] == 'in' and not need:
            if '原图' in text:  # also covers '找原图'
                need = '找原图'
            elif '修复' in text or '高清' in text:
                need = '高清修复'
            elif '找图' in text:
                need = '找图'
    return name, need, flags


def _status_label(flags):
    """Return the single status label for a customer (most advanced state wins)."""
    if flags['shipped']:
        return '✅已发货'
    if flags['paid']:
        return '💰已付款'
    if flags['closed']:
        return '❌已关闭'
    if flags['transfer']:
        return '→已转接'
    return '💬对话中'


def _format_section(title, entries, limit, show_need=True):
    """Render one per-status customer list, capped at ``limit`` entries.

    Returns an empty string when there are no entries. When the list is
    truncated, a trailing line reports the full count so the cut is visible.
    """
    if not entries:
        return ''
    lines = [f'\n{title}\n']
    for name, need, _ in entries[:limit]:
        lines.append(f'{name} ({need})\n' if show_need else f'{name}\n')
    if len(entries) > limit:
        lines.append(f' ... 等共 {len(entries)}\n')
    return ''.join(lines)


def _render_report(today, total_customers, total_msgs, dir_stats, counts, details):
    """Assemble the report text from the aggregated numbers and customer details."""
    inbound = dir_stats.get('in', 0)
    outbound = dir_stats.get('out', 0)
    header_lines = [
        f'📊 【{today} 客服数据日报】',
        '📈 基础数据',
        f'• 独立客户数: {total_customers}',
        f'• 总消息数: {total_msgs}',
        f'- 客户发来: {inbound}',
        f'- AI/人工回复: {outbound}',
        '💼 业务数据',
        f"• 新订单: {counts['new']}",
        f"• 已付款: {counts['paid']}",
        f"• 已发货: {counts['shipped']}",
        f"• 订单关闭: {counts['closed']}",
        f"• 转接设计师: {counts['transfer']}",
        f'📝 客户明细 ({len(details)} 条)',
    ]
    parts = ['\n'.join(header_lines) + '\n']
    # (section title, status label to select, max entries shown, show need?)
    sections = (
        ('✅ 已发货:', '✅已发货', 15, True),
        ('💰 已付款待发货:', '💰已付款', 10, True),
        ('→ 已转接设计师:', '→已转接', 15, True),
        ('❌ 订单关闭:', '❌已关闭', 5, False),
    )
    for title, status, limit, show_need in sections:
        entries = [d for d in details if d[2] == status]
        parts.append(_format_section(title, entries, limit, show_need))
    return ''.join(parts)


def _generate_daily_report_sync() -> str:
    """Blocking implementation of the daily report (runs in a worker thread).

    Reads today's rows from ``chat_logs`` and returns the report text. On a
    connection or query failure it logs the error and returns an error
    message string instead of raising.
    """
    import pymysql

    # Database settings come from environment variables.
    mysql_host = os.getenv("MYSQL_HOST", "127.0.0.1")
    mysql_port = int(os.getenv("MYSQL_PORT", "3306"))
    mysql_user = os.getenv("MYSQL_USER", "root")
    mysql_password = os.getenv("MYSQL_PASSWORD", "")
    mysql_database = os.getenv("MYSQL_DATABASE", "ai_cs")

    try:
        conn = pymysql.connect(
            host=mysql_host,
            port=mysql_port,
            user=mysql_user,
            password=mysql_password,
            database=mysql_database,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
    except Exception as e:
        logger.error(f"[DailySummary] 数据库连接失败: {e}")
        return f"❌ 数据库连接失败: {e}"

    try:
        today = datetime.now().strftime('%Y-%m-%d')
        params = (today,)
        with conn.cursor() as cur:
            # 1. Basic statistics (values are bound, never interpolated).
            cur.execute(
                "SELECT COUNT(DISTINCT customer_id) as cnt FROM chat_logs "
                "WHERE DATE(timestamp) = %s",
                params,
            )
            total_customers = cur.fetchone()['cnt']
            cur.execute(
                "SELECT COUNT(*) as cnt FROM chat_logs WHERE DATE(timestamp) = %s",
                params,
            )
            total_msgs = cur.fetchone()['cnt']
            cur.execute(
                "SELECT direction, COUNT(*) as cnt FROM chat_logs "
                "WHERE DATE(timestamp) = %s GROUP BY direction",
                params,
            )
            dir_stats = {r['direction']: r['cnt'] for r in cur.fetchall()}

            # 2. All of today's messages, ordered so each customer's
            #    conversation is chronological.
            cur.execute(
                "SELECT customer_id, customer_name, message, direction, timestamp "
                "FROM chat_logs "
                "WHERE DATE(timestamp) = %s "
                "ORDER BY customer_id, timestamp",
                params,
            )
            rows = cur.fetchall()

        # Group messages by customer; rows without an id share one bucket.
        customers = defaultdict(list)
        for r in rows:
            customers[r['customer_id'] or 'unknown'].append(r)

        counts = dict.fromkeys(('transfer', 'paid', 'shipped', 'closed', 'new'), 0)
        details = []
        for cid, msgs in customers.items():
            name, need, flags = _scan_customer_messages(msgs)
            for key, hit in flags.items():
                if hit:
                    counts[key] += 1
            # str() keeps the fallback working when customer_id is not a string.
            display_name = name if name and name != 'unknown' else str(cid)[:12]
            # Unidentified customers count toward the totals but are not listed.
            if display_name != 'unknown' and cid != 'unknown':
                details.append((display_name, need or '-', _status_label(flags)))

        return _render_report(today, total_customers, total_msgs, dir_stats, counts, details)
    except Exception as e:
        logger.exception(f"[DailySummary] 生成报告失败: {e}")
        return f"❌ 生成报告失败: {e}"
    finally:
        conn.close()


async def generate_daily_report() -> str:
    """Generate today's data report.

    The database work uses the synchronous pymysql driver, so it is run in
    the event loop's default executor to avoid blocking other coroutines.

    Returns:
        The report text, or an error message string (prefixed with ❌) when
        the database connection or the report generation fails.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _generate_daily_report_sync)
async def send_to_wechat(content: str) -> bool:
    """Send a text message to the configured WeCom (企业微信) webhook.

    Args:
        content: Message text to post.

    Returns:
        True when the webhook accepted the message; False when no webhook is
        configured, the HTTP status is not 200, the response body reports a
        non-zero ``errcode``, or the request raised.
    """
    from config.config import WECHAT_WEBHOOK

    if not WECHAT_WEBHOOK:
        logger.warning("[DailySummary] 未配置企业微信 Webhook")
        return False

    payload = {'msgtype': 'text', 'text': {'content': content}}
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(WECHAT_WEBHOOK, json=payload, timeout=10)

        if resp.status_code != 200:
            logger.error(f"[DailySummary] 发送失败: {resp.status_code}")
            return False

        # The WeCom webhook answers HTTP 200 even for rejected messages and
        # signals the failure through a non-zero "errcode" in the JSON body.
        # A body that is not JSON keeps the previous status-only behaviour.
        try:
            body = resp.json()
        except ValueError:
            body = None
        if isinstance(body, dict) and body.get('errcode', 0) != 0:
            logger.error(
                f"[DailySummary] 发送失败: errcode={body.get('errcode')} "
                f"errmsg={body.get('errmsg')}"
            )
            return False

        logger.info("[DailySummary] 日报已发送到企业微信")
        return True
    except Exception as e:
        logger.error(f"[DailySummary] 发送异常: {e}")
        return False
async def run_daily_summary():
    """Run the daily report job: build the report, then push it to WeCom."""
    logger.info("[DailySummary] 开始生成每日日报...")
    # Whatever text the generator returns is forwarded as-is.
    await send_to_wechat(await generate_daily_report())
def _seconds_until(target_hour: int, target_minute: int) -> float:
"""计算距离目标时间的秒数"""
now = datetime.now()
target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
# 如果今天的目标时间已过,则计算到明天
if now >= target:
target = target.replace(day=target.day + 1)
# 处理月末
try:
target = target
except ValueError:
# 下个月第一天
if target.month == 12:
target = target.replace(year=target.year + 1, month=1, day=1)
else:
target = target.replace(month=target.month + 1, day=1)
return (target - now).total_seconds()
async def scheduler():
    """Scheduling loop: run the daily summary every day at 23:59.

    Loops until the task is cancelled. Any other exception is logged and
    followed by a five-minute pause before the next wait is computed.
    """
    logger.info("[DailySummary] 每日日报定时任务已启动,将在每天 23:59 发送")
    while True:
        try:
            delay = _seconds_until(23, 59)

            # Break the delay down into hours and minutes for the log line.
            whole_hours, remainder = divmod(delay, 3600)
            hours = int(whole_hours)
            minutes = int(remainder // 60)
            logger.info(f"[DailySummary] 下次日报将在 {hours}小时{minutes}分钟 后发送")

            await asyncio.sleep(delay)
            await run_daily_summary()

            # Pause one minute so the same 23:59 slot is not picked again.
            await asyncio.sleep(60)
        except asyncio.CancelledError:
            logger.info("[DailySummary] 定时任务已取消")
            break
        except Exception as e:
            logger.error(f"[DailySummary] 定时任务异常: {e}")
            # Back off for five minutes before computing the next wait.
            await asyncio.sleep(300)
# Manual trigger (for testing)
async def manual_trigger():
    """Run the daily summary once, right now, bypassing the scheduler (for testing)."""
    return await run_daily_summary()