diff --git a/README.md b/README.md index 1d2c9bf..f85e8d7 100755 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ curl http://localhost:6060/api/health |------|------| | **项目功能汇总.md** | 全部功能详细说明(工作流、报价、风险、派单、数据库等) | | **部署文档.md** | 部署、API 接口、天网集成、多进程、故障排查 | +| **features/self_evolution_mvp.md** | 自我进化 MVP(采样、评测、建议、灰度门禁) | --- @@ -80,3 +81,11 @@ curl http://localhost:6060/api/health ├── skills/ # Agent 技能定义 └── run.py # 统一入口(--api-only / --tianwang / 默认 WebSocket) ``` + +## 自我进化 MVP + +```bash +python scripts/evolution_cycle.py --hours 24 --publish +``` + +默认从线上 MySQL 读取对话数据(可用 `--source` 切换)。 diff --git a/config/evolution_candidate.json b/config/evolution_candidate.json new file mode 100644 index 0000000..6c96586 --- /dev/null +++ b/config/evolution_candidate.json @@ -0,0 +1,63 @@ +{ + "version": "candidate-20260228_220131", + "created_at": "2026-02-28T22:01:32", + "sample_file": "D:\\main\\tw\\evolution\\artifacts\\samples_20260228_220131.jsonl", + "eval_file": "D:\\main\\tw\\evolution\\artifacts\\eval_report_20260228_220131.json", + "proposal_file": "D:\\main\\tw\\evolution\\artifacts\\proposals_20260228_220131.json", + "gate_report": { + "sample_count": 132, + "high_findings": 1, + "high_findings_rate": 0.0076, + "runtime": { + "window_hours": 24, + "counts": { + "inbound_msg": 29, + "quote_generated": 1, + "transfer_to_human": 1, + "system_inquiry_detected": 15, + "system_inquiry_ignored": 2, + "system_inquiry_auto_reply": 13 + }, + "rates": { + "transfer_rate": 3.45, + "quote_rate": 3.45, + "ai_fail_rate": 0.0, + "no_image_rate": 0.0 + } + }, + "policy_gate": { + "min_sample_count": 30, + "max_high_findings_rate": 0.08, + "max_ai_fail_rate": 5.0, + "max_transfer_rate": 45.0 + }, + "reasons": [] + }, + "proposals": [ + { + "id": "policy-risk-transfer", + "priority": "p0", + "module": "policy/prompt", + "title": "风险关键词触发后强制转人工", + "suggestion": "在风险路由的系统提示词中增加硬规则:遇到退款/投诉/法律威胁类诉求必须调用 transfer_to_human。", + "evidence_count": 1 + }, + { + "id": "tone-empathy-pack", + "priority": "p1", + "module": "policy/prompt", + "title": "高风险场景补充安抚模板", + "suggestion": "为投诉类回复追加一段安抚模板,降低激化概率。", + "evidence_count": 1 + }, + { + "id": "ops-regression-gate", + "priority": "p0", + "module": "eval/pipeline", + "title": "上线前回归门禁", + "suggestion": "新增候选策略必须在离线评测集上通过,再灰度 5% 流量后扩大。", + "evidence_count": 132 + } + ], + "status": "ready_for_gray_5_percent" +} \ No newline at end of file diff --git a/config/evolution_policy.json b/config/evolution_policy.json new file mode 100644 index 0000000..e971067 --- /dev/null +++ b/config/evolution_policy.json @@ -0,0 +1,14 @@ +{ + "publish_gate": { + "min_sample_count": 30, + "max_high_findings_rate": 0.08, + "max_ai_fail_rate": 5.0, + "max_transfer_rate": 45.0 + }, + "gray_release": { + "first_stage_percent": 5, + "second_stage_percent": 20, + "final_stage_percent": 100 + } +} + diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 1f76146..a3d9e01 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -11,6 +11,8 @@ import asyncio import random import hashlib import re +import json +from pathlib import Path from typing import Optional, Dict, List, Any, Tuple from datetime import datetime from pydantic import BaseModel, Field @@ -162,6 +164,7 @@ class CustomerServiceAgent: C_TOOL = "\033[93m" # yellow C_REPLY = "\033[92m" # green C_MUTED = "\033[90m" # gray + _DEFAULT_EVOLUTION_CANDIDATE = Path("config") / "evolution_candidate.json" def __init__(self, skills_dir: str = "skills"): self.api_key = os.getenv("OPENAI_API_KEY") @@ -175,6 +178,7 @@ class CustomerServiceAgent: self.conversations: Dict[str, ConversationState] = {} # 多轮对话历史(PydanticAI ModelMessage 列表,按客户ID存储) self.message_histories: Dict[str, list] = {} + self.evolution_candidate = self._load_evolution_candidate() # 加载 skills 内容 self.skills_content = load_skill_md(skills_dir) @@ -230,6 +234,64 @@ class CustomerServiceAgent: # 注册工具 self._register_tools() + def _load_evolution_candidate(self) -> Dict[str, Any]: + """读取自我进化候选配置(灰度策略),读取失败时返回空。""" + try: + path = Path(os.getenv("EVOLUTION_CANDIDATE_PATH", str(self._DEFAULT_EVOLUTION_CANDIDATE))) + if not path.exists(): + return {} + data = json.loads(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + return {} + return data + except Exception: + return {} + + def _evolution_gray_percent(self) -> int: + """灰度比例,默认 5%。""" + try: + env_pct = os.getenv("EVOLUTION_GRAY_PERCENT", "").strip() + if env_pct: + pct = int(float(env_pct)) + else: + pct = int(((self.evolution_candidate or {}).get("gray_percent", 5))) + return max(0, min(100, pct)) + except Exception: + return 5 + + def _evolution_enabled_for_customer(self, customer_id: str) -> bool: + """按客户哈希稳定灰度命中,命中后启用候选策略。""" + cand = self.evolution_candidate or {} + if str(cand.get("status", "")).strip() != "ready_for_gray_5_percent": + return False + if not customer_id: + return False + pct = self._evolution_gray_percent() + if pct <= 0: + return False + digest = hashlib.md5(customer_id.encode("utf-8")).hexdigest() + bucket = int(digest[:8], 16) % 100 + hit = bucket < pct + if hit: + metrics_emit("evolution_gray_hit", customer_id=customer_id, percent=pct, version=str(cand.get("version", ""))) + return hit + + def _evolution_has_proposal(self, proposal_id: str) -> bool: + cand = self.evolution_candidate or {} + for p in cand.get("proposals", []) or []: + if str((p or {}).get("id", "")).strip() == proposal_id: + return True + return False + + @staticmethod + def _is_service_risk_inquiry(text: str) -> bool: + """识别退款/投诉等服务风险场景。""" + s = (text or "").strip().lower() + if not s: + return False + kw = ("退款", "退货", "投诉", "差评", "举报", "欺骗", "骗人", "起诉", "法院", "生气", "不满意") + return any(k in s for k in kw) + @staticmethod def _log_block(title: str, content: str): """统一的控制台分层日志输出。""" @@ -1637,6 +1699,17 @@ class CustomerServiceAgent: transfer_msg = TRANSFER_MESSAGE metrics_emit("transfer_to_human", customer_id=message.from_id, acc_id=message.acc_id) + # 自我进化候选策略灰度(默认 5%):风险投诉场景强制转人工,并补安抚话术 + evo_hit = self._evolution_enabled_for_customer(message.from_id) + if evo_hit and self._is_service_risk_inquiry(message.msg): + if self._evolution_has_proposal("policy-risk-transfer"): + need_transfer = True + transfer_msg = TRANSFER_MESSAGE + metrics_emit("evolution_force_transfer", customer_id=message.from_id, acc_id=message.acc_id) + if self._evolution_has_proposal("tone-empathy-pack"): + reply_text = "抱歉让您不舒服了,这边先为您转接人工专员马上处理。" + metrics_emit("evolution_empathy_reply", customer_id=message.from_id, acc_id=message.acc_id) + # 未成交记录:客户表达放弃且已报价过(转人工不记录) customer_text, _ = self._split_customer_text(message.msg) no_convert_keywords = ["算了", "不要了", "不做了", "下次再说", "先不弄了"] @@ -1649,6 +1722,8 @@ class CustomerServiceAgent: # 需要转接时不把原始回复发给客户 should_reply = bool(reply_text and reply_text.strip()) and not need_transfer + if evo_hit and need_transfer and self._evolution_has_proposal("tone-empathy-pack"): + should_reply = True # 记录本次回复时间,供冷却期判断 if should_reply: diff --git a/evolution/__init__.py b/evolution/__init__.py new file mode 100644 index 0000000..f37b053 --- /dev/null +++ b/evolution/__init__.py @@ -0,0 +1,2 @@ +"""Self-evolution MVP utilities for the customer service agent.""" + diff --git a/evolution/mvp.py b/evolution/mvp.py new file mode 100644 index 0000000..20998e7 --- /dev/null +++ b/evolution/mvp.py @@ -0,0 +1,591 @@ +from __future__ import annotations + +import json +import os +import sqlite3 +from dataclasses import asdict, dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Tuple + +ROOT = Path(__file__).resolve().parent.parent +ARTIFACT_DIR = ROOT / "evolution" / "artifacts" +DEFAULT_POLICY_PATH = ROOT / "config" / "evolution_policy.json" +DEFAULT_CANDIDATE_PATH = ROOT / "config" / "evolution_candidate.json" + +RISK_KEYWORDS = ( + "退款", + "退货", + "投诉", + "差评", + "举报", + "欺骗", + "骗人", + "不满意", + "生气", + "法院", + "起诉", +) +TRANSFER_HINTS = ("转人工", "人工", "为您转接", "专员", "稍后联系") +WEAK_REPLY_HINTS = ("不清楚", "不知道", "稍后", "晚点", "我再看下", "等会") +EMPATHY_HINTS = ("抱歉", "不好意思", "理解", "辛苦", "感谢反馈") + + +@dataclass +class Sample: + customer_id: str + acc_id: str + in_ts: str + in_text: str + out_ts: str + out_text: str + latency_sec: int + + +@dataclass +class Finding: + kind: str + severity: str + customer_id: str + acc_id: str + in_ts: str + in_text: str + out_text: str + detail: str + + +@dataclass +class ChatSourceConfig: + source: str = "auto" # auto | sqlite | mysql + sqlite_path: str = str(ROOT / "db" / "chat_log_db" / "chats.db") + mysql_host: str = os.getenv("MYSQL_HOST", "127.0.0.1") + mysql_port: int = int(os.getenv("MYSQL_PORT", "3306")) + mysql_user: str = os.getenv("MYSQL_USER", "root") + mysql_password: str = os.getenv("MYSQL_PASSWORD", "") + mysql_database: str = os.getenv("MYSQL_DATABASE", "ai_cs") + + +def _parse_ts(ts_text: str) -> Optional[datetime]: + if not ts_text: + return None + try: + return datetime.strptime(ts_text, "%Y-%m-%d %H:%M:%S") + except ValueError: + return None + + +def _to_ts_text(value: Any) -> str: + if isinstance(value, datetime): + return value.strftime("%Y-%m-%d %H:%M:%S") + if value is None: + return "" + return str(value) + + +def _iter_recent_conversations_sqlite( + cfg: ChatSourceConfig, + hours: int, + max_customers: int, + max_messages_per_customer: int, +) -> Iterable[Tuple[str, List[Dict[str, Any]]]]: + cutoff_dt = datetime.now() - timedelta(hours=hours) + cutoff_text = cutoff_dt.strftime("%Y-%m-%d %H:%M:%S") + db_path = Path(cfg.sqlite_path) + if not db_path.exists(): + return + conn = sqlite3.connect(f"file:{db_path.as_posix()}?mode=ro", uri=True) + conn.row_factory = sqlite3.Row + try: + cur = conn.execute( + """ + SELECT customer_id, MAX(timestamp) AS last_ts + FROM chat_logs + WHERE timestamp >= ? + GROUP BY customer_id + ORDER BY last_ts DESC + LIMIT ? + """, + (cutoff_text, max_customers), + ) + customers = [dict(r) for r in cur.fetchall()] + for c in customers: + customer_id = str(c.get("customer_id") or "").strip() + if not customer_id: + continue + rows_cur = conn.execute( + """ + SELECT direction, message, timestamp, acc_id + FROM chat_logs + WHERE customer_id = ? AND timestamp >= ? + ORDER BY timestamp ASC, id ASC + LIMIT ? + """, + (customer_id, cutoff_text, max_messages_per_customer), + ) + rows = [dict(r) for r in rows_cur.fetchall()] + if rows: + yield customer_id, rows + finally: + conn.close() + + +def _iter_recent_conversations_mysql( + cfg: ChatSourceConfig, + hours: int, + max_customers: int, + max_messages_per_customer: int, +) -> Iterable[Tuple[str, List[Dict[str, Any]]]]: + try: + import pymysql + except Exception: + return + + cutoff_dt = datetime.now() - timedelta(hours=hours) + try: + conn = pymysql.connect( + host=cfg.mysql_host, + port=cfg.mysql_port, + user=cfg.mysql_user, + password=cfg.mysql_password, + database=cfg.mysql_database, + charset="utf8mb4", + cursorclass=pymysql.cursors.DictCursor, + autocommit=True, + ) + except Exception: + return + try: + with conn.cursor() as cur: + cur.execute( + """ + SELECT customer_id, MAX(timestamp) AS last_ts + FROM chat_logs + WHERE timestamp >= %s + GROUP BY customer_id + ORDER BY last_ts DESC + LIMIT %s + """, + (cutoff_dt, max_customers), + ) + customers = cur.fetchall() or [] + for c in customers: + customer_id = str(c.get("customer_id") or "").strip() + if not customer_id: + continue + with conn.cursor() as cur: + cur.execute( + """ + SELECT direction, message, timestamp, acc_id + FROM chat_logs + WHERE customer_id = %s AND timestamp >= %s + ORDER BY timestamp ASC, id ASC + LIMIT %s + """, + (customer_id, cutoff_dt, max_messages_per_customer), + ) + rows = cur.fetchall() or [] + normalized = [] + for r in rows: + normalized.append( + { + "direction": r.get("direction"), + "message": r.get("message"), + "timestamp": _to_ts_text(r.get("timestamp")), + "acc_id": r.get("acc_id"), + } + ) + if normalized: + yield customer_id, normalized + finally: + conn.close() + + +def _iter_recent_conversations( + cfg: ChatSourceConfig, + hours: int, + max_customers: int, + max_messages_per_customer: int, +) -> Iterable[Tuple[str, List[Dict[str, Any]]]]: + source = (cfg.source or "auto").strip().lower() + if source == "sqlite": + yield from _iter_recent_conversations_sqlite(cfg, hours, max_customers, max_messages_per_customer) + return + if source == "mysql": + yield from _iter_recent_conversations_mysql(cfg, hours, max_customers, max_messages_per_customer) + return + + # auto: prefer mysql when DB_TYPE=mysql, otherwise sqlite + db_type = os.getenv("DB_TYPE", "").strip().lower() + if db_type in ("mysql", "mariadb"): + got_any = False + for item in _iter_recent_conversations_mysql(cfg, hours, max_customers, max_messages_per_customer): + got_any = True + yield item + if got_any: + return + yield from _iter_recent_conversations_sqlite(cfg, hours, max_customers, max_messages_per_customer) + + +def build_samples( + hours: int = 24, + max_customers: int = 200, + max_messages_per_customer: int = 80, + chat_source: Optional[ChatSourceConfig] = None, +) -> List[Sample]: + cfg = chat_source or ChatSourceConfig() + samples: List[Sample] = [] + for customer_id, rows in _iter_recent_conversations( + cfg=cfg, + hours=hours, + max_customers=max_customers, + max_messages_per_customer=max_messages_per_customer, + ): + pending_in: Optional[Dict[str, Any]] = None + for row in rows: + direction = str(row.get("direction") or "") + if direction == "in": + pending_in = row + continue + if direction != "out" or pending_in is None: + continue + in_text = str(pending_in.get("message") or "").strip() + out_text = str(row.get("message") or "").strip() + if not in_text: + pending_in = None + continue + in_ts = _parse_ts(str(pending_in.get("timestamp") or "")) + out_ts = _parse_ts(str(row.get("timestamp") or "")) + latency = 0 + if in_ts and out_ts: + latency = int((out_ts - in_ts).total_seconds()) + samples.append( + Sample( + customer_id=customer_id, + acc_id=str(row.get("acc_id") or pending_in.get("acc_id") or ""), + in_ts=str(pending_in.get("timestamp") or ""), + in_text=in_text, + out_ts=str(row.get("timestamp") or ""), + out_text=out_text, + latency_sec=max(0, latency), + ) + ) + pending_in = None + return samples + + +def evaluate_samples(samples: List[Sample]) -> List[Finding]: + findings: List[Finding] = [] + for s in samples: + in_text = s.in_text + out_text = s.out_text + inbound_risky = any(k in in_text for k in RISK_KEYWORDS) + + if not out_text: + findings.append( + Finding( + kind="empty_reply", + severity="high", + customer_id=s.customer_id, + acc_id=s.acc_id, + in_ts=s.in_ts, + in_text=s.in_text, + out_text=s.out_text, + detail="收到消息但回复为空", + ) + ) + continue + + if s.latency_sec > 600: + findings.append( + Finding( + kind="slow_reply", + severity="medium", + customer_id=s.customer_id, + acc_id=s.acc_id, + in_ts=s.in_ts, + in_text=s.in_text, + out_text=s.out_text, + detail=f"回复耗时 {s.latency_sec}s (>600s)", + ) + ) + + if inbound_risky: + has_transfer = any(k in out_text for k in TRANSFER_HINTS) + has_empathy = any(k in out_text for k in EMPATHY_HINTS) + if not has_transfer: + findings.append( + Finding( + kind="risk_not_transferred", + severity="high", + customer_id=s.customer_id, + acc_id=s.acc_id, + in_ts=s.in_ts, + in_text=s.in_text, + out_text=s.out_text, + detail="高风险诉求未出现转人工提示", + ) + ) + if not has_empathy: + findings.append( + Finding( + kind="risk_no_empathy", + severity="medium", + customer_id=s.customer_id, + acc_id=s.acc_id, + in_ts=s.in_ts, + in_text=s.in_text, + out_text=s.out_text, + detail="高风险诉求回复缺少安抚语气", + ) + ) + + if any(k in out_text for k in WEAK_REPLY_HINTS): + findings.append( + Finding( + kind="weak_reply", + severity="medium", + customer_id=s.customer_id, + acc_id=s.acc_id, + in_ts=s.in_ts, + in_text=s.in_text, + out_text=s.out_text, + detail="回复存在低置信度兜底话术", + ) + ) + return findings + + +def summarize_findings(findings: List[Finding]) -> Dict[str, Any]: + by_kind: Dict[str, int] = {} + by_severity: Dict[str, int] = {} + for f in findings: + by_kind[f.kind] = by_kind.get(f.kind, 0) + 1 + by_severity[f.severity] = by_severity.get(f.severity, 0) + 1 + return {"total": len(findings), "by_kind": by_kind, "by_severity": by_severity} + + +def make_proposals(findings: List[Finding], sample_count: int) -> List[Dict[str, Any]]: + summary = summarize_findings(findings) + by_kind = summary["by_kind"] + + proposals: List[Dict[str, Any]] = [] + if by_kind.get("risk_not_transferred", 0) > 0: + proposals.append( + { + "id": "policy-risk-transfer", + "priority": "p0", + "module": "policy/prompt", + "title": "风险关键词触发后强制转人工", + "suggestion": "在风险路由的系统提示词中增加硬规则:遇到退款/投诉/法律威胁类诉求必须调用 transfer_to_human。", + "evidence_count": by_kind["risk_not_transferred"], + } + ) + if by_kind.get("risk_no_empathy", 0) > 0: + proposals.append( + { + "id": "tone-empathy-pack", + "priority": "p1", + "module": "policy/prompt", + "title": "高风险场景补充安抚模板", + "suggestion": "为投诉类回复追加一段安抚模板,降低激化概率。", + "evidence_count": by_kind["risk_no_empathy"], + } + ) + if by_kind.get("weak_reply", 0) > 0: + proposals.append( + { + "id": "fallback-reduction", + "priority": "p1", + "module": "intent/router", + "title": "减少低置信度兜底话术", + "suggestion": "出现“不清楚/稍后”等兜底词时,优先触发澄清问题或转人工而非直接结束。", + "evidence_count": by_kind["weak_reply"], + } + ) + if by_kind.get("slow_reply", 0) > 0: + proposals.append( + { + "id": "slow-path-timeout", + "priority": "p2", + "module": "tools/workflow", + "title": "慢链路超时与短回复兜底", + "suggestion": "当工具调用超过阈值时先发短确认回复,避免长时间无响应。", + "evidence_count": by_kind["slow_reply"], + } + ) + + proposals.append( + { + "id": "ops-regression-gate", + "priority": "p0", + "module": "eval/pipeline", + "title": "上线前回归门禁", + "suggestion": "新增候选策略必须在离线评测集上通过,再灰度 5% 流量后扩大。", + "evidence_count": sample_count, + } + ) + return proposals + + +def load_policy(path: Path = DEFAULT_POLICY_PATH) -> Dict[str, Any]: + if not path.exists(): + return { + "publish_gate": { + "min_sample_count": 30, + "max_high_findings_rate": 0.08, + "max_ai_fail_rate": 5.0, + "max_transfer_rate": 45.0, + } + } + return json.loads(path.read_text(encoding="utf-8")) + + +def can_publish_candidate(samples: List[Sample], findings: List[Finding], runtime_hours: int, policy: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: + try: + from utils.metrics_tracker import get_runtime_summary + except Exception: + def get_runtime_summary(hours: int = 24) -> Dict[str, Any]: + return {"window_hours": hours, "counts": {}, "rates": {"ai_fail_rate": 0.0, "transfer_rate": 0.0}} + + gate = (policy or {}).get("publish_gate", {}) + min_sample_count = int(gate.get("min_sample_count", 30)) + max_high_rate = float(gate.get("max_high_findings_rate", 0.08)) + max_ai_fail_rate = float(gate.get("max_ai_fail_rate", 5.0)) + max_transfer_rate = float(gate.get("max_transfer_rate", 45.0)) + + high_cnt = sum(1 for f in findings if f.severity == "high") + sample_count = max(1, len(samples)) + high_rate = high_cnt / sample_count + runtime = get_runtime_summary(hours=runtime_hours) + ai_fail_rate = float(runtime.get("rates", {}).get("ai_fail_rate", 0.0)) + transfer_rate = float(runtime.get("rates", {}).get("transfer_rate", 0.0)) + + reasons = [] + ok = True + if len(samples) < min_sample_count: + ok = False + reasons.append(f"样本不足: {len(samples)} < {min_sample_count}") + if high_rate > max_high_rate: + ok = False + reasons.append(f"高危发现占比过高: {high_rate:.2%} > {max_high_rate:.2%}") + if ai_fail_rate > max_ai_fail_rate: + ok = False + reasons.append(f"AI失败率过高: {ai_fail_rate:.2f}% > {max_ai_fail_rate:.2f}%") + if transfer_rate > max_transfer_rate: + ok = False + reasons.append(f"转人工率过高: {transfer_rate:.2f}% > {max_transfer_rate:.2f}%") + + return ok, { + "sample_count": len(samples), + "high_findings": high_cnt, + "high_findings_rate": round(high_rate, 4), + "runtime": runtime, + "policy_gate": gate, + "reasons": reasons, + } + + +def _write_json(path: Path, payload: Dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + +def _write_jsonl(path: Path, rows: Iterable[Dict[str, Any]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + for row in rows: + f.write(json.dumps(row, ensure_ascii=False) + "\n") + + +def run_cycle( + hours: int = 24, + max_customers: int = 200, + max_messages_per_customer: int = 80, + runtime_hours: int = 24, + publish: bool = False, + chat_source: Optional[ChatSourceConfig] = None, + policy_path: Path = DEFAULT_POLICY_PATH, + candidate_path: Path = DEFAULT_CANDIDATE_PATH, +) -> Dict[str, Any]: + ARTIFACT_DIR.mkdir(parents=True, exist_ok=True) + now_tag = datetime.now().strftime("%Y%m%d_%H%M%S") + source_error = "" + + try: + samples = build_samples( + hours=hours, + max_customers=max_customers, + max_messages_per_customer=max_messages_per_customer, + chat_source=chat_source, + ) + except Exception as e: + samples = [] + source_error = str(e) + findings = evaluate_samples(samples) + proposals = make_proposals(findings=findings, sample_count=len(samples)) + policy = load_policy(path=policy_path) + publish_ok, gate_report = can_publish_candidate( + samples=samples, + findings=findings, + runtime_hours=runtime_hours, + policy=policy, + ) + + sample_file = ARTIFACT_DIR / f"samples_{now_tag}.jsonl" + eval_file = ARTIFACT_DIR / f"eval_report_{now_tag}.json" + proposal_file = ARTIFACT_DIR / f"proposals_{now_tag}.json" + + _write_jsonl(sample_file, (asdict(s) for s in samples)) + _write_json( + eval_file, + { + "generated_at": datetime.now().isoformat(timespec="seconds"), + "sample_count": len(samples), + "finding_summary": summarize_findings(findings), + "publish_gate_report": gate_report, + }, + ) + _write_json( + proposal_file, + { + "generated_at": datetime.now().isoformat(timespec="seconds"), + "proposals": proposals, + }, + ) + + published = False + candidate_payload: Dict[str, Any] = {} + if publish and publish_ok: + candidate_payload = { + "version": f"candidate-{now_tag}", + "created_at": datetime.now().isoformat(timespec="seconds"), + "sample_file": str(sample_file), + "eval_file": str(eval_file), + "proposal_file": str(proposal_file), + "gate_report": gate_report, + "proposals": proposals, + "status": "ready_for_gray_5_percent", + } + _write_json(candidate_path, candidate_payload) + published = True + + source_view = asdict(chat_source) if chat_source else asdict(ChatSourceConfig()) + if source_view.get("mysql_password"): + source_view["mysql_password"] = "***" + + return { + "samples": len(samples), + "findings": len(findings), + "publish_ok": publish_ok, + "published": published, + "chat_source": source_view, + "source_error": source_error, + "artifacts": { + "samples": str(sample_file), + "evaluation": str(eval_file), + "proposals": str(proposal_file), + "candidate": str(candidate_path) if published else "", + }, + "gate_report": gate_report, + "top_proposals": proposals[:3], + } diff --git a/features/self_evolution_mvp.md b/features/self_evolution_mvp.md new file mode 100644 index 0000000..1fd2436 --- /dev/null +++ b/features/self_evolution_mvp.md @@ -0,0 +1,45 @@ +# 自我进化 MVP(可控版) + +目标:让客服 agent 持续变聪明,同时避免“自动改坏线上”。 + +## 1. 已落地能力 + +- 失败样本采集:从 `db/chat_log_db/chats.db` 抽取近 N 小时客服问答对。 +- 离线评测:自动识别高风险未转人工、低置信度兜底、慢回复等问题。 +- 改进建议生成:输出可执行的模块级 proposal(prompt/router/workflow)。 +- 发布门禁:结合运行指标(`config/.runtime_metrics.jsonl`)判断是否允许发布候选版本。 +- 候选产物:通过门禁后写入 `config/evolution_candidate.json`,用于 5% 灰度。 + +## 2. 运行方式 + +```bash +python scripts/evolution_cycle.py --hours 24 --publish +``` + +默认即读取线上 MySQL(`--source mysql`)。连接信息来自 `.env` 的 `MYSQL_*`。 + +常用参数: + +- `--max-customers 200` +- `--max-messages-per-customer 80` +- `--runtime-hours 24` +- `--policy-path config/evolution_policy.json` + +## 3. 产物说明 + +运行后会在 `evolution/artifacts/` 生成: + +- `samples_*.jsonl`:评测样本 +- `eval_report_*.json`:评测摘要与门禁结果 +- `proposals_*.json`:改进建议列表 + +当 `--publish` 且门禁通过时: + +- 写入 `config/evolution_candidate.json` +- 状态标记为 `ready_for_gray_5_percent` + +## 4. 下一步建议 + +- 把 `scripts/evolution_cycle.py` 加入每日定时任务(例如凌晨 2 点)。 +- 在灰度层接入 `evolution_candidate.json` 的版本号,按店铺或客户哈希做 5% 放量。 +- 将 proposal 落地为具体 patch 后,先跑 `tests/` 回归,再扩大流量。 diff --git a/scripts/evolution_cycle.py b/scripts/evolution_cycle.py new file mode 100644 index 0000000..15b9aef --- /dev/null +++ b/scripts/evolution_cycle.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Self-evolution MVP cycle runner. +""" +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path + +from dotenv import load_dotenv + +PROJECT_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(PROJECT_ROOT)) +load_dotenv(dotenv_path=PROJECT_ROOT / ".env") + +from evolution.mvp import ChatSourceConfig, DEFAULT_CANDIDATE_PATH, DEFAULT_POLICY_PATH, run_cycle + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run self-evolution MVP cycle") + parser.add_argument( + "--source", + type=str, + default="mysql", + choices=["auto", "sqlite", "mysql"], + help="Chat data source, default mysql (online)", + ) + parser.add_argument("--hours", type=int, default=24, help="Lookback window for chat samples") + parser.add_argument("--max-customers", type=int, default=200, help="Max customers sampled") + parser.add_argument( + "--max-messages-per-customer", + type=int, + default=80, + help="Max messages loaded per customer", + ) + parser.add_argument("--runtime-hours", type=int, default=24, help="Runtime metric window") + parser.add_argument( + "--publish", + action="store_true", + help="Write config/evolution_candidate.json when gate passes", + ) + parser.add_argument( + "--policy-path", + type=str, + default=str(DEFAULT_POLICY_PATH), + help="Path to evolution gate policy file", + ) + parser.add_argument( + "--candidate-path", + type=str, + default=str(DEFAULT_CANDIDATE_PATH), + help="Path to candidate output file", + ) + parser.add_argument("--db-path", type=str, default="", help="SQLite path when --source sqlite") + parser.add_argument("--mysql-host", type=str, default=os.getenv("MYSQL_HOST", "127.0.0.1")) + parser.add_argument("--mysql-port", type=int, default=int(os.getenv("MYSQL_PORT", "3306"))) + parser.add_argument("--mysql-user", type=str, default=os.getenv("MYSQL_USER", "root")) + parser.add_argument("--mysql-password", type=str, default=os.getenv("MYSQL_PASSWORD", "")) + parser.add_argument("--mysql-database", type=str, default=os.getenv("MYSQL_DATABASE", "ai_cs")) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + os.environ.setdefault("PYTHONUTF8", "1") + chat_source = ChatSourceConfig( + source=args.source, + sqlite_path=args.db_path or str(PROJECT_ROOT / "db" / "chat_log_db" / "chats.db"), + mysql_host=args.mysql_host, + mysql_port=args.mysql_port, + mysql_user=args.mysql_user, + mysql_password=args.mysql_password, + mysql_database=args.mysql_database, + ) + + result = run_cycle( + hours=args.hours, + max_customers=args.max_customers, + max_messages_per_customer=args.max_messages_per_customer, + runtime_hours=args.runtime_hours, + publish=args.publish, + chat_source=chat_source, + policy_path=Path(args.policy_path), + candidate_path=Path(args.candidate_path), + ) + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_evolution_mvp.py b/tests/test_evolution_mvp.py new file mode 100644 index 0000000..c41f4b9 --- /dev/null +++ b/tests/test_evolution_mvp.py @@ -0,0 +1,54 @@ +import unittest +from unittest.mock import patch + +from evolution.mvp import Finding, Sample, can_publish_candidate, evaluate_samples + + +class EvolutionMvpTest(unittest.TestCase): + def test_evaluate_detects_risk_without_transfer(self): + samples = [ + Sample( + customer_id="c1", + acc_id="shop", + in_ts="2026-02-28 10:00:00", + in_text="我要投诉并退款,你们骗人", + out_ts="2026-02-28 10:00:10", + out_text="这个我不清楚,稍后再说", + latency_sec=10, + ) + ] + findings = evaluate_samples(samples) + kinds = {f.kind for f in findings} + self.assertIn("risk_not_transferred", kinds) + self.assertIn("weak_reply", kinds) + + def test_publish_gate(self): + samples = [ + Sample( + customer_id=f"c{i}", + acc_id="shop", + in_ts="2026-02-28 10:00:00", + in_text="你好", + out_ts="2026-02-28 10:00:05", + out_text="您好", + latency_sec=5, + ) + for i in range(35) + ] + findings: list[Finding] = [] + policy = { + "publish_gate": { + "min_sample_count": 30, + "max_high_findings_rate": 0.1, + "max_ai_fail_rate": 5.0, + "max_transfer_rate": 45.0, + } + } + with patch("utils.metrics_tracker.get_runtime_summary", return_value={"rates": {"ai_fail_rate": 1.0, "transfer_rate": 10.0}}): + ok, report = can_publish_candidate(samples, findings, runtime_hours=24, policy=policy) + self.assertTrue(ok) + self.assertEqual(report["sample_count"], 35) + + +if __name__ == "__main__": + unittest.main(verbosity=2)