"""Offline evaluation cycle over recent customer chat logs.

Reads recent conversations from SQLite or MySQL, pairs each inbound customer
message with the next outbound reply, flags problematic replies with keyword
heuristics, turns the findings into improvement proposals, checks a publish
gate, and writes the results as JSON artifacts under ``ARTIFACT_DIR``.
"""

from __future__ import annotations

import json
import os
import sqlite3
from collections import Counter
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

ROOT = Path(__file__).resolve().parent.parent
ARTIFACT_DIR = ROOT / "evolution" / "artifacts"
DEFAULT_POLICY_PATH = ROOT / "config" / "evolution_policy.json"
DEFAULT_CANDIDATE_PATH = ROOT / "config" / "evolution_candidate.json"

# All keyword tables below are matched as plain substrings of message text.
# Inbound wording that marks an exchange as high-risk: refund, return goods,
# complaint, bad review, report, deceive, "cheating", dissatisfied, angry,
# court, sue.
RISK_KEYWORDS = (
    "退款",
    "退货",
    "投诉",
    "差评",
    "举报",
    "欺骗",
    "骗人",
    "不满意",
    "生气",
    "法院",
    "起诉",
)
# Reply wording that counts as handing the customer over to a human agent.
TRANSFER_HINTS = ("转人工", "人工", "为您转接", "专员", "稍后联系")
# Reply wording that signals a low-confidence / stalling answer.
# NOTE(review): "稍后" also matches the transfer hint "稍后联系", so a transfer
# reply using that phrase is additionally reported as weak_reply.
WEAK_REPLY_HINTS = ("不清楚", "不知道", "稍后", "晚点", "我再看下", "等会")
# Reply wording that counts as an empathetic / soothing tone.
EMPATHY_HINTS = ("抱歉", "不好意思", "理解", "辛苦", "感谢反馈")

# Text layout of chat-log timestamps (also used for the SQL cutoff string).
_TS_FORMAT = "%Y-%m-%d %H:%M:%S"

# Default latency (seconds) above which a reply is reported as slow_reply.
SLOW_REPLY_THRESHOLD_SEC = 600

# Built-in publish-gate thresholds: returned by load_policy when no policy file
# exists, and used per key when a policy file omits a threshold.
_DEFAULT_PUBLISH_GATE: Dict[str, Any] = {
    "min_sample_count": 30,
    "max_high_findings_rate": 0.08,
    "max_ai_fail_rate": 5.0,
    "max_transfer_rate": 45.0,
}

# Proposal emitted for each finding kind that occurred at least once, in the
# order they are reported. "evidence_count" is filled in from the finding count.
_FINDING_PROPOSALS: Tuple[Tuple[str, Dict[str, str]], ...] = (
    (
        "risk_not_transferred",
        {
            "id": "policy-risk-transfer",
            "priority": "p0",
            "module": "policy/prompt",
            "title": "风险关键词触发后强制转人工",
            "suggestion": "在风险路由的系统提示词中增加硬规则:遇到退款/投诉/法律威胁类诉求必须调用 transfer_to_human。",
        },
    ),
    (
        "risk_no_empathy",
        {
            "id": "tone-empathy-pack",
            "priority": "p1",
            "module": "policy/prompt",
            "title": "高风险场景补充安抚模板",
            "suggestion": "为投诉类回复追加一段安抚模板,降低激化概率。",
        },
    ),
    (
        "weak_reply",
        {
            "id": "fallback-reduction",
            "priority": "p1",
            "module": "intent/router",
            "title": "减少低置信度兜底话术",
            "suggestion": "出现“不清楚/稍后”等兜底词时,优先触发澄清问题或转人工而非直接结束。",
        },
    ),
    (
        "slow_reply",
        {
            "id": "slow-path-timeout",
            "priority": "p2",
            "module": "tools/workflow",
            "title": "慢链路超时与短回复兜底",
            "suggestion": "当工具调用超过阈值时先发短确认回复,避免长时间无响应。",
        },
    ),
)


class _SourceUnavailable(RuntimeError):
    """A chat-log backend could not be reached at all (driver missing / connect failed)."""


@dataclass
class Sample:
    """One inbound customer message paired with the reply that followed it."""

    customer_id: str
    acc_id: str
    in_ts: str  # inbound timestamp text as read from the chat log
    in_text: str
    out_ts: str  # reply timestamp text as read from the chat log
    out_text: str
    latency_sec: int  # whole seconds from in_ts to out_ts; 0 if either is unparseable


@dataclass
class Finding:
    """A quality issue detected on one Sample."""

    kind: str  # empty_reply | slow_reply | risk_not_transferred | risk_no_empathy | weak_reply
    severity: str  # "high" or "medium"
    customer_id: str
    acc_id: str
    in_ts: str
    in_text: str
    out_text: str
    detail: str  # human-readable explanation


@dataclass
class ChatSourceConfig:
    """Where build_samples reads chat logs from.

    The MySQL fields default to the ``MYSQL_*`` environment variables. They are
    read each time an instance is created rather than once at import time, so
    environment changes made after import are honoured.
    """

    source: str = "auto"  # auto | sqlite | mysql
    sqlite_path: str = str(ROOT / "db" / "chat_log_db" / "chats.db")
    mysql_host: str = field(default_factory=lambda: os.getenv("MYSQL_HOST", "127.0.0.1"))
    mysql_port: int = field(default_factory=lambda: int(os.getenv("MYSQL_PORT", "3306")))
    mysql_user: str = field(default_factory=lambda: os.getenv("MYSQL_USER", "root"))
    mysql_password: str = field(default_factory=lambda: os.getenv("MYSQL_PASSWORD", ""))
    mysql_database: str = field(default_factory=lambda: os.getenv("MYSQL_DATABASE", "ai_cs"))


def _parse_ts(ts_text: str) -> Optional[datetime]:
    """Parse a chat-log timestamp; return None when it is empty or unparseable.

    The canonical ``YYYY-MM-DD HH:MM:SS`` layout is tried first. ISO-8601
    variants (fractional seconds, ``T`` separator, UTC offset) are accepted as a
    fallback so such rows do not silently get a latency of 0.
    """
    if not ts_text:
        return None
    try:
        return datetime.strptime(ts_text, _TS_FORMAT)
    except ValueError:
        pass
    try:
        return datetime.fromisoformat(ts_text)
    except ValueError:
        return None


def _to_ts_text(value: Any) -> str:
    """Render a DB timestamp value (datetime, None or other) as text."""
    if isinstance(value, datetime):
        return value.strftime(_TS_FORMAT)
    if value is None:
        return ""
    return str(value)


def _latency_sec(in_ts: Optional[datetime], out_ts: Optional[datetime]) -> int:
    """Return non-negative whole seconds from in_ts to out_ts, or 0 if not computable."""
    if in_ts is None or out_ts is None:
        return 0
    # Subtracting a timezone-aware datetime from a naive one raises TypeError.
    if (in_ts.tzinfo is None) != (out_ts.tzinfo is None):
        return 0
    return max(0, int((out_ts - in_ts).total_seconds()))


def _iter_recent_conversations_sqlite(
    cfg: ChatSourceConfig,
    hours: int,
    max_customers: int,
    max_messages_per_customer: int,
) -> Iterable[Tuple[str, List[Dict[str, Any]]]]:
    """Yield ``(customer_id, rows)`` for recently active customers from SQLite.

    Customers with a message in the last *hours* are visited most recently
    active first, at most *max_customers* of them. For each customer, up to
    *max_messages_per_customer* messages from the same window are returned in
    chronological order. The LIMIT keeps the oldest messages of the window.

    The database is opened read-only. Nothing is yielded when the file does
    not exist.
    """
    # Compared as text in SQL; assumes timestamps are stored in _TS_FORMAT
    # layout (TODO confirm against the chat_logs schema).
    cutoff_text = (datetime.now() - timedelta(hours=hours)).strftime(_TS_FORMAT)
    db_path = Path(cfg.sqlite_path)
    if not db_path.exists():
        return
    # as_uri() percent-encodes the path, so '?', '#' or '%' in a directory name
    # cannot corrupt the read-only connection URI.
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    try:
        customers = [
            dict(r)
            for r in conn.execute(
                """
                SELECT customer_id, MAX(timestamp) AS last_ts
                FROM chat_logs
                WHERE timestamp >= ?
                GROUP BY customer_id
                ORDER BY last_ts DESC
                LIMIT ?
                """,
                (cutoff_text, max_customers),
            ).fetchall()
        ]
        for customer in customers:
            customer_id = str(customer.get("customer_id") or "").strip()
            if not customer_id:
                continue
            rows = [
                dict(r)
                for r in conn.execute(
                    """
                    SELECT direction, message, timestamp, acc_id
                    FROM chat_logs
                    WHERE customer_id = ? AND timestamp >= ?
                    ORDER BY timestamp ASC, id ASC
                    LIMIT ?
                    """,
                    (customer_id, cutoff_text, max_messages_per_customer),
                ).fetchall()
            ]
            if rows:
                yield customer_id, rows
    finally:
        conn.close()


def _iter_recent_conversations_mysql(
    cfg: ChatSourceConfig,
    hours: int,
    max_customers: int,
    max_messages_per_customer: int,
) -> Iterable[Tuple[str, List[Dict[str, Any]]]]:
    """Yield ``(customer_id, rows)`` for recently active customers from MySQL.

    Uses the same selection rules as the SQLite reader. Timestamps are
    normalised to ``_TS_FORMAT`` text.

    Raises:
        _SourceUnavailable: pymysql is not installed or the connection failed.
            It is raised before anything is yielded.
    """
    try:
        import pymysql
    except ImportError as exc:
        raise _SourceUnavailable(f"pymysql is not installed: {exc}") from exc

    cutoff_dt = datetime.now() - timedelta(hours=hours)
    try:
        conn = pymysql.connect(
            host=cfg.mysql_host,
            port=cfg.mysql_port,
            user=cfg.mysql_user,
            password=cfg.mysql_password,
            database=cfg.mysql_database,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=True,
        )
    except Exception as exc:
        # The password is deliberately left out of the message.
        raise _SourceUnavailable(
            f"cannot connect to MySQL at {cfg.mysql_host}:{cfg.mysql_port}: {exc}"
        ) from exc

    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT customer_id, MAX(timestamp) AS last_ts
                FROM chat_logs
                WHERE timestamp >= %s
                GROUP BY customer_id
                ORDER BY last_ts DESC
                LIMIT %s
                """,
                (cutoff_dt, max_customers),
            )
            customers = cur.fetchall() or []
        for customer in customers:
            customer_id = str(customer.get("customer_id") or "").strip()
            if not customer_id:
                continue
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT direction, message, timestamp, acc_id
                    FROM chat_logs
                    WHERE customer_id = %s AND timestamp >= %s
                    ORDER BY timestamp ASC, id ASC
                    LIMIT %s
                    """,
                    (customer_id, cutoff_dt, max_messages_per_customer),
                )
                rows = cur.fetchall() or []
            normalized = [
                {
                    "direction": r.get("direction"),
                    "message": r.get("message"),
                    "timestamp": _to_ts_text(r.get("timestamp")),
                    "acc_id": r.get("acc_id"),
                }
                for r in rows
            ]
            if normalized:
                yield customer_id, normalized
    finally:
        conn.close()


def _iter_recent_conversations(
    cfg: ChatSourceConfig,
    hours: int,
    max_customers: int,
    max_messages_per_customer: int,
) -> Iterable[Tuple[str, List[Dict[str, Any]]]]:
    """Dispatch to the chat-log backend selected by ``cfg.source``.

    ``"sqlite"`` and ``"mysql"`` use that backend only. A MySQL connection
    failure then propagates as ``_SourceUnavailable``.

    Any other value behaves as ``"auto"``:
    - When the ``DB_TYPE`` env var is mysql/mariadb, MySQL is tried first.
    - SQLite is used if MySQL is unreachable or returned nothing.
    - Otherwise SQLite is used directly.
    """
    args = (cfg, hours, max_customers, max_messages_per_customer)
    source = (cfg.source or "auto").strip().lower()
    if source == "sqlite":
        yield from _iter_recent_conversations_sqlite(*args)
        return
    if source == "mysql":
        yield from _iter_recent_conversations_mysql(*args)
        return

    db_type = os.getenv("DB_TYPE", "").strip().lower()
    if db_type in ("mysql", "mariadb"):
        got_any = False
        try:
            for item in _iter_recent_conversations_mysql(*args):
                got_any = True
                yield item
        except _SourceUnavailable:
            # Deliberate best-effort: an unreachable MySQL falls back to SQLite.
            # Only raised before the first yield, so nothing gets mixed.
            pass
        if got_any:
            return
    yield from _iter_recent_conversations_sqlite(*args)


def build_samples(
    hours: int = 24,
    max_customers: int = 200,
    max_messages_per_customer: int = 80,
    chat_source: Optional[ChatSourceConfig] = None,
) -> List[Sample]:
    """Pair inbound messages with the next outbound reply for recent conversations.

    Within each conversation, an "out" row is paired with the most recent
    unanswered "in" row. A newer "in" replaces an older unanswered one. "out"
    rows with no pending "in" are skipped, as are pairs whose inbound text is
    blank.

    Args:
        hours: Look-back window.
        max_customers: Maximum customers to read.
        max_messages_per_customer: Row limit per customer.
        chat_source: Backend selection; defaults to ``ChatSourceConfig()``.

    Raises:
        Whatever the selected backend raises (e.g. ``sqlite3.Error``, or
        ``_SourceUnavailable`` when ``source="mysql"`` cannot connect).
    """
    cfg = chat_source or ChatSourceConfig()
    samples: List[Sample] = []
    for customer_id, rows in _iter_recent_conversations(
        cfg=cfg,
        hours=hours,
        max_customers=max_customers,
        max_messages_per_customer=max_messages_per_customer,
    ):
        pending_in: Optional[Dict[str, Any]] = None
        for row in rows:
            direction = str(row.get("direction") or "")
            if direction == "in":
                pending_in = row
                continue
            if direction != "out" or pending_in is None:
                continue

            in_text = str(pending_in.get("message") or "").strip()
            out_text = str(row.get("message") or "").strip()
            if not in_text:
                pending_in = None
                continue

            in_ts_text = str(pending_in.get("timestamp") or "")
            out_ts_text = str(row.get("timestamp") or "")
            samples.append(
                Sample(
                    customer_id=customer_id,
                    acc_id=str(row.get("acc_id") or pending_in.get("acc_id") or ""),
                    in_ts=in_ts_text,
                    in_text=in_text,
                    out_ts=out_ts_text,
                    out_text=out_text,
                    latency_sec=_latency_sec(_parse_ts(in_ts_text), _parse_ts(out_ts_text)),
                )
            )
            pending_in = None
    return samples


def _finding(sample: Sample, kind: str, severity: str, detail: str) -> Finding:
    """Build a Finding for *sample*, copying its identifying fields."""
    return Finding(
        kind=kind,
        severity=severity,
        customer_id=sample.customer_id,
        acc_id=sample.acc_id,
        in_ts=sample.in_ts,
        in_text=sample.in_text,
        out_text=sample.out_text,
        detail=detail,
    )


def evaluate_samples(
    samples: List[Sample],
    *,
    slow_reply_threshold_sec: int = SLOW_REPLY_THRESHOLD_SEC,
) -> List[Finding]:
    """Apply the keyword/latency heuristics to every sample.

    Per sample, in this order:
    - An empty reply yields only ``empty_reply`` (high).
    - Latency above *slow_reply_threshold_sec* yields ``slow_reply`` (medium).
    - A risky inbound message whose reply has no transfer hint yields
      ``risk_not_transferred`` (high).
    - A risky inbound message whose reply has no empathy hint yields
      ``risk_no_empathy`` (medium).
    - A reply containing a weak-reply hint yields ``weak_reply`` (medium).
    """
    findings: List[Finding] = []
    for s in samples:
        out_text = s.out_text
        if not out_text:
            findings.append(_finding(s, "empty_reply", "high", "收到消息但回复为空"))
            continue

        if s.latency_sec > slow_reply_threshold_sec:
            findings.append(
                _finding(
                    s,
                    "slow_reply",
                    "medium",
                    f"回复耗时 {s.latency_sec}s (>{slow_reply_threshold_sec}s)",
                )
            )

        if any(k in s.in_text for k in RISK_KEYWORDS):
            if not any(k in out_text for k in TRANSFER_HINTS):
                findings.append(
                    _finding(s, "risk_not_transferred", "high", "高风险诉求未出现转人工提示")
                )
            if not any(k in out_text for k in EMPATHY_HINTS):
                findings.append(
                    _finding(s, "risk_no_empathy", "medium", "高风险诉求回复缺少安抚语气")
                )

        if any(k in out_text for k in WEAK_REPLY_HINTS):
            findings.append(_finding(s, "weak_reply", "medium", "回复存在低置信度兜底话术"))
    return findings


def summarize_findings(findings: List[Finding]) -> Dict[str, Any]:
    """Count findings overall, per kind and per severity (first-seen key order)."""
    return {
        "total": len(findings),
        "by_kind": dict(Counter(f.kind for f in findings)),
        "by_severity": dict(Counter(f.severity for f in findings)),
    }


def make_proposals(findings: List[Finding], sample_count: int) -> List[Dict[str, Any]]:
    """Turn findings into improvement proposals.

    Returns one proposal per finding kind that occurred, in the order of
    ``_FINDING_PROPOSALS``, followed by the regression-gate proposal, which is
    always present.
    """
    by_kind = summarize_findings(findings)["by_kind"]
    proposals: List[Dict[str, Any]] = [
        {**template, "evidence_count": by_kind[kind]}
        for kind, template in _FINDING_PROPOSALS
        if by_kind.get(kind, 0) > 0
    ]
    proposals.append(
        {
            "id": "ops-regression-gate",
            "priority": "p0",
            "module": "eval/pipeline",
            "title": "上线前回归门禁",
            "suggestion": "新增候选策略必须在离线评测集上通过,再灰度 5% 流量后扩大。",
            "evidence_count": sample_count,
        }
    )
    return proposals


def load_policy(path: Path = DEFAULT_POLICY_PATH) -> Dict[str, Any]:
    """Load the evolution policy JSON, or the built-in publish gate if *path* is missing.

    Raises:
        json.JSONDecodeError: the file exists but is not valid JSON.
    """
    if not path.exists():
        # Fresh copy so callers mutating the result cannot alter the defaults.
        return {"publish_gate": dict(_DEFAULT_PUBLISH_GATE)}
    return json.loads(path.read_text(encoding="utf-8"))


def can_publish_candidate(
    samples: List[Sample],
    findings: List[Finding],
    runtime_hours: int,
    policy: Dict[str, Any],
) -> Tuple[bool, Dict[str, Any]]:
    """Check the publish gate and return ``(ok, report)``.

    Fails when any of these holds:
    - there are too few samples;
    - high-severity findings per sample exceed the limit;
    - the runtime AI-failure rate exceeds the limit;
    - the runtime transfer rate exceeds the limit.

    Runtime rates come from ``utils.metrics_tracker.get_runtime_summary``. When
    that import fails, a stub reporting 0.0 rates is used.
    """
    try:
        from utils.metrics_tracker import get_runtime_summary
    except Exception:

        def get_runtime_summary(hours: int = 24) -> Dict[str, Any]:
            return {
                "window_hours": hours,
                "counts": {},
                "rates": {"ai_fail_rate": 0.0, "transfer_rate": 0.0},
            }

    gate = (policy or {}).get("publish_gate") or {}
    min_sample_count = int(gate.get("min_sample_count", _DEFAULT_PUBLISH_GATE["min_sample_count"]))
    max_high_rate = float(
        gate.get("max_high_findings_rate", _DEFAULT_PUBLISH_GATE["max_high_findings_rate"])
    )
    max_ai_fail_rate = float(gate.get("max_ai_fail_rate", _DEFAULT_PUBLISH_GATE["max_ai_fail_rate"]))
    max_transfer_rate = float(
        gate.get("max_transfer_rate", _DEFAULT_PUBLISH_GATE["max_transfer_rate"])
    )

    high_cnt = sum(1 for f in findings if f.severity == "high")
    # max(1, ...) avoids division by zero; an empty sample set already fails
    # the min_sample_count check.
    high_rate = high_cnt / max(1, len(samples))

    runtime = get_runtime_summary(hours=runtime_hours)
    rates = runtime.get("rates") or {}
    ai_fail_rate = float(rates.get("ai_fail_rate") or 0.0)
    transfer_rate = float(rates.get("transfer_rate") or 0.0)

    reasons: List[str] = []
    if len(samples) < min_sample_count:
        reasons.append(f"样本不足: {len(samples)} < {min_sample_count}")
    if high_rate > max_high_rate:
        reasons.append(f"高危发现占比过高: {high_rate:.2%} > {max_high_rate:.2%}")
    if ai_fail_rate > max_ai_fail_rate:
        reasons.append(f"AI失败率过高: {ai_fail_rate:.2f}% > {max_ai_fail_rate:.2f}%")
    if transfer_rate > max_transfer_rate:
        reasons.append(f"转人工率过高: {transfer_rate:.2f}% > {max_transfer_rate:.2f}%")

    return not reasons, {
        "sample_count": len(samples),
        "high_findings": high_cnt,
        "high_findings_rate": round(high_rate, 4),
        "runtime": runtime,
        "policy_gate": gate,
        "reasons": reasons,
    }


def _write_json(path: Path, payload: Dict[str, Any]) -> None:
    """Write *payload* as pretty-printed UTF-8 JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def _write_jsonl(path: Path, rows: Iterable[Dict[str, Any]]) -> None:
    """Write *rows* as UTF-8 JSON Lines (one object per line), creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")


def run_cycle(
    hours: int = 24,
    max_customers: int = 200,
    max_messages_per_customer: int = 80,
    runtime_hours: int = 24,
    publish: bool = False,
    chat_source: Optional[ChatSourceConfig] = None,
    policy_path: Path = DEFAULT_POLICY_PATH,
    candidate_path: Path = DEFAULT_CANDIDATE_PATH,
) -> Dict[str, Any]:
    """Run one evaluation cycle and write its artifacts.

    Steps:
    1. Build samples. A data-source failure is recorded in ``source_error``
       and the cycle continues with no samples.
    2. Evaluate the samples and derive proposals.
    3. Check the publish gate.
    4. Write samples / evaluation / proposals files under ``ARTIFACT_DIR``.
    5. When *publish* is set and the gate passes, write the candidate file to
       *candidate_path*.

    Returns a summary dict. The MySQL password is masked in it.
    """
    ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
    # One timestamp for the whole cycle, so file tags and generated_at agree.
    cycle_time = datetime.now()
    now_tag = cycle_time.strftime("%Y%m%d_%H%M%S")
    stamp = cycle_time.isoformat(timespec="seconds")

    source_error = ""
    cfg = chat_source
    try:
        if cfg is None:
            # Built inside the try: a malformed MYSQL_PORT env var raises here.
            cfg = ChatSourceConfig()
        samples = build_samples(
            hours=hours,
            max_customers=max_customers,
            max_messages_per_customer=max_messages_per_customer,
            chat_source=cfg,
        )
    except Exception as e:
        samples = []
        # Some exceptions have an empty message; keep the error visible anyway.
        source_error = str(e) or type(e).__name__

    findings = evaluate_samples(samples)
    proposals = make_proposals(findings=findings, sample_count=len(samples))
    policy = load_policy(path=policy_path)
    publish_ok, gate_report = can_publish_candidate(
        samples=samples,
        findings=findings,
        runtime_hours=runtime_hours,
        policy=policy,
    )

    sample_file = ARTIFACT_DIR / f"samples_{now_tag}.jsonl"
    eval_file = ARTIFACT_DIR / f"eval_report_{now_tag}.json"
    proposal_file = ARTIFACT_DIR / f"proposals_{now_tag}.json"
    _write_jsonl(sample_file, (asdict(s) for s in samples))
    _write_json(
        eval_file,
        {
            "generated_at": stamp,
            "sample_count": len(samples),
            "finding_summary": summarize_findings(findings),
            "publish_gate_report": gate_report,
        },
    )
    _write_json(proposal_file, {"generated_at": stamp, "proposals": proposals})

    published = False
    if publish and publish_ok:
        _write_json(
            candidate_path,
            {
                "version": f"candidate-{now_tag}",
                "created_at": stamp,
                "sample_file": str(sample_file),
                "eval_file": str(eval_file),
                "proposal_file": str(proposal_file),
                "gate_report": gate_report,
                "proposals": proposals,
                "status": "ready_for_gray_5_percent",
            },
        )
        published = True

    # Report the exact config used for sampling; never echo the password.
    source_view = asdict(cfg) if cfg is not None else {}
    if source_view.get("mysql_password"):
        source_view["mysql_password"] = "***"

    return {
        "samples": len(samples),
        "findings": len(findings),
        "publish_ok": publish_ok,
        "published": published,
        "chat_source": source_view,
        "source_error": source_error,
        "artifacts": {
            "samples": str(sample_file),
            "evaluation": str(eval_file),
            "proposals": str(proposal_file),
            "candidate": str(candidate_path) if published else "",
        },
        "gate_report": gate_report,
        "top_proposals": proposals[:3],
    }