""" 每日数据汇总 - 每天 23:59 自动发送到企业微信 """ import os import asyncio import logging from datetime import datetime, time as dtime from collections import defaultdict import httpx from dotenv import load_dotenv load_dotenv() logger = logging.getLogger("cs_agent") async def generate_daily_report() -> str: """生成今日数据报告""" import pymysql # 从环境变量读取数据库配置 MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1") MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306")) MYSQL_USER = os.getenv("MYSQL_USER", "root") MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "") MYSQL_DATABASE = os.getenv("MYSQL_DATABASE", "ai_cs") try: conn = pymysql.connect( host=MYSQL_HOST, port=MYSQL_PORT, user=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) except Exception as e: logger.error(f"[DailySummary] 数据库连接失败: {e}") return f"❌ 数据库连接失败: {e}" try: cur = conn.cursor() today = datetime.now().strftime('%Y-%m-%d') # 1. 基础统计 cur.execute(f"SELECT COUNT(DISTINCT customer_id) as cnt FROM chat_logs WHERE DATE(timestamp) = '{today}'") total_customers = cur.fetchone()['cnt'] cur.execute(f"SELECT COUNT(*) as cnt FROM chat_logs WHERE DATE(timestamp) = '{today}'") total_msgs = cur.fetchone()['cnt'] cur.execute(f"SELECT direction, COUNT(*) as cnt FROM chat_logs WHERE DATE(timestamp) = '{today}' GROUP BY direction") dir_stats = {r['direction']: r['cnt'] for r in cur.fetchall()} # 2. 获取所有对话 cur.execute(f''' SELECT customer_id, customer_name, message, direction, timestamp FROM chat_logs WHERE DATE(timestamp) = '{today}' ORDER BY customer_id, timestamp ''') rows = cur.fetchall() # 按客户分组 customers = defaultdict(list) for r in rows: cid = r['customer_id'] or 'unknown' customers[cid].append(r) # 统计 transferred = 0 paid = 0 shipped = 0 closed = 0 new_orders = 0 customer_details = [] for cid, msgs in customers.items(): cname = '' has_transfer = False has_paid = False has_shipped = False has_closed = False has_new = False need = '' for m in msgs: if m['customer_name']: cname = m['customer_name'] msg = m['message'] or '' if '[转移会话]' in msg: has_transfer = True if '买家已付款' in msg: has_paid = True if '卖家已发货' in msg: has_shipped = True if '订单关闭' in msg or '已取消' in msg: has_closed = True if '新订单' in msg: has_new = True # 识别需求 if m['direction'] == 'in' and not need: if '找原图' in msg or '原图' in msg: need = '找原图' elif '修复' in msg or '高清' in msg: need = '高清修复' elif '找图' in msg: need = '找图' if has_transfer: transferred += 1 if has_paid: paid += 1 if has_shipped: shipped += 1 if has_closed: closed += 1 if has_new: new_orders += 1 # 状态 if has_shipped: status = '✅已发货' elif has_paid: status = '💰已付款' elif has_closed: status = '❌已关闭' elif has_transfer: status = '→已转接' else: status = '💬对话中' display_name = cname if cname and cname != 'unknown' else cid[:12] if display_name != 'unknown' and cid != 'unknown': customer_details.append((display_name, need or '-', status)) # 构建报告 report = f'''📊 【{today} 客服数据日报】 📈 基础数据 • 独立客户数: {total_customers} • 总消息数: {total_msgs} - 客户发来: {dir_stats.get('in', 0)} 条 - AI/人工回复: {dir_stats.get('out', 0)} 条 💼 业务数据 • 新订单: {new_orders} 单 • 已付款: {paid} 单 • 已发货: {shipped} 单 • 订单关闭: {closed} 单 • 转接设计师: {transferred} 次 📝 客户明细 ({len(customer_details)} 条) ''' # 按状态分组显示 shipped_list = [c for c in customer_details if '已发货' in c[2]] paid_list = [c for c in customer_details if '已付款' in c[2]] transfer_list = [c for c in customer_details if '已转接' in c[2]] closed_list = [c for c in customer_details if '已关闭' in c[2]] if shipped_list: report += '\n✅ 已发货:\n' for name, need, _ in shipped_list[:15]: report += f' • {name} ({need})\n' if len(shipped_list) > 15: report += f' ... 等共 {len(shipped_list)} 单\n' if paid_list: report += '\n💰 已付款待发货:\n' for name, need, _ in paid_list[:10]: report += f' • {name} ({need})\n' if transfer_list: report += '\n→ 已转接设计师:\n' for name, need, _ in transfer_list[:15]: report += f' • {name} ({need})\n' if len(transfer_list) > 15: report += f' ... 等共 {len(transfer_list)} 人\n' if closed_list: report += '\n❌ 订单关闭:\n' for name, need, _ in closed_list[:5]: report += f' • {name}\n' return report except Exception as e: logger.error(f"[DailySummary] 生成报告失败: {e}") return f"❌ 生成报告失败: {e}" finally: if 'conn' in locals(): conn.close() async def send_to_wechat(content: str) -> bool: """发送到企业微信""" from config.config import WECHAT_WEBHOOK if not WECHAT_WEBHOOK: logger.warning("[DailySummary] 未配置企业微信 Webhook") return False try: async with httpx.AsyncClient() as client: resp = await client.post( WECHAT_WEBHOOK, json={'msgtype': 'text', 'text': {'content': content}}, timeout=10 ) if resp.status_code == 200: logger.info("[DailySummary] 日报已发送到企业微信") return True else: logger.error(f"[DailySummary] 发送失败: {resp.status_code}") return False except Exception as e: logger.error(f"[DailySummary] 发送异常: {e}") return False async def run_daily_summary(): """执行日报任务""" logger.info("[DailySummary] 开始生成每日日报...") report = await generate_daily_report() await send_to_wechat(report) def _seconds_until(target_hour: int, target_minute: int) -> float: """计算距离目标时间的秒数""" now = datetime.now() target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) # 如果今天的目标时间已过,则计算到明天 if now >= target: target = target.replace(day=target.day + 1) # 处理月末 try: target = target except ValueError: # 下个月第一天 if target.month == 12: target = target.replace(year=target.year + 1, month=1, day=1) else: target = target.replace(month=target.month + 1, day=1) return (target - now).total_seconds() async def scheduler(): """ 定时任务调度器 - 每天 23:59 执行日报 """ logger.info("[DailySummary] 每日日报定时任务已启动,将在每天 23:59 发送") while True: try: # 计算距离 23:59 的秒数 wait_seconds = _seconds_until(23, 59) hours = int(wait_seconds // 3600) minutes = int((wait_seconds % 3600) // 60) logger.info(f"[DailySummary] 下次日报将在 {hours}小时{minutes}分钟 后发送") # 等待到目标时间 await asyncio.sleep(wait_seconds) # 执行日报 await run_daily_summary() # 等待1分钟,避免重复执行 await asyncio.sleep(60) except asyncio.CancelledError: logger.info("[DailySummary] 定时任务已取消") break except Exception as e: logger.error(f"[DailySummary] 定时任务异常: {e}") # 出错后等待5分钟再重试 await asyncio.sleep(300) # 手动触发(用于测试) async def manual_trigger(): """手动触发日报(测试用)""" await run_daily_summary()