feat: add online evolution loop and 5% gray risk-policy rollout

This commit is contained in:
2026-02-28 22:03:30 +08:00
parent fec5aaf8f3
commit d497e8d42a
9 changed files with 948 additions and 0 deletions

View File

@@ -64,6 +64,7 @@ curl http://localhost:6060/api/health
|------|------|
| **项目功能汇总.md** | 全部功能详细说明(工作流、报价、风险、派单、数据库等) |
| **部署文档.md** | 部署、API 接口、天网集成、多进程、故障排查 |
| **features/self_evolution_mvp.md** | 自我进化 MVP采样、评测、建议、灰度门禁 |
---
@@ -80,3 +81,11 @@ curl http://localhost:6060/api/health
├── skills/ # Agent 技能定义
└── run.py # 统一入口(--api-only / --tianwang / 默认 WebSocket
```
## 自我进化 MVP
```bash
python scripts/evolution_cycle.py --hours 24 --publish
```
默认从线上 MySQL 读取对话数据(可用 `--source` 切换)。

View File

@@ -0,0 +1,63 @@
{
"version": "candidate-20260228_220131",
"created_at": "2026-02-28T22:01:32",
"sample_file": "D:\\main\\tw\\evolution\\artifacts\\samples_20260228_220131.jsonl",
"eval_file": "D:\\main\\tw\\evolution\\artifacts\\eval_report_20260228_220131.json",
"proposal_file": "D:\\main\\tw\\evolution\\artifacts\\proposals_20260228_220131.json",
"gate_report": {
"sample_count": 132,
"high_findings": 1,
"high_findings_rate": 0.0076,
"runtime": {
"window_hours": 24,
"counts": {
"inbound_msg": 29,
"quote_generated": 1,
"transfer_to_human": 1,
"system_inquiry_detected": 15,
"system_inquiry_ignored": 2,
"system_inquiry_auto_reply": 13
},
"rates": {
"transfer_rate": 3.45,
"quote_rate": 3.45,
"ai_fail_rate": 0.0,
"no_image_rate": 0.0
}
},
"policy_gate": {
"min_sample_count": 30,
"max_high_findings_rate": 0.08,
"max_ai_fail_rate": 5.0,
"max_transfer_rate": 45.0
},
"reasons": []
},
"proposals": [
{
"id": "policy-risk-transfer",
"priority": "p0",
"module": "policy/prompt",
"title": "风险关键词触发后强制转人工",
"suggestion": "在风险路由的系统提示词中增加硬规则:遇到退款/投诉/法律威胁类诉求必须调用 transfer_to_human。",
"evidence_count": 1
},
{
"id": "tone-empathy-pack",
"priority": "p1",
"module": "policy/prompt",
"title": "高风险场景补充安抚模板",
"suggestion": "为投诉类回复追加一段安抚模板,降低激化概率。",
"evidence_count": 1
},
{
"id": "ops-regression-gate",
"priority": "p0",
"module": "eval/pipeline",
"title": "上线前回归门禁",
"suggestion": "新增候选策略必须在离线评测集上通过,再灰度 5% 流量后扩大。",
"evidence_count": 132
}
],
"status": "ready_for_gray_5_percent"
}

View File

@@ -0,0 +1,14 @@
{
"publish_gate": {
"min_sample_count": 30,
"max_high_findings_rate": 0.08,
"max_ai_fail_rate": 5.0,
"max_transfer_rate": 45.0
},
"gray_release": {
"first_stage_percent": 5,
"second_stage_percent": 20,
"final_stage_percent": 100
}
}

View File

@@ -11,6 +11,8 @@ import asyncio
import random
import hashlib
import re
import json
from pathlib import Path
from typing import Optional, Dict, List, Any, Tuple
from datetime import datetime
from pydantic import BaseModel, Field
@@ -162,6 +164,7 @@ class CustomerServiceAgent:
C_TOOL = "\033[93m" # yellow
C_REPLY = "\033[92m" # green
C_MUTED = "\033[90m" # gray
_DEFAULT_EVOLUTION_CANDIDATE = Path("config") / "evolution_candidate.json"
def __init__(self, skills_dir: str = "skills"):
self.api_key = os.getenv("OPENAI_API_KEY")
@@ -175,6 +178,7 @@ class CustomerServiceAgent:
self.conversations: Dict[str, ConversationState] = {}
# 多轮对话历史PydanticAI ModelMessage 列表按客户ID存储
self.message_histories: Dict[str, list] = {}
self.evolution_candidate = self._load_evolution_candidate()
# 加载 skills 内容
self.skills_content = load_skill_md(skills_dir)
@@ -230,6 +234,64 @@ class CustomerServiceAgent:
# 注册工具
self._register_tools()
def _load_evolution_candidate(self) -> Dict[str, Any]:
"""读取自我进化候选配置(灰度策略),读取失败时返回空。"""
try:
path = Path(os.getenv("EVOLUTION_CANDIDATE_PATH", str(self._DEFAULT_EVOLUTION_CANDIDATE)))
if not path.exists():
return {}
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return {}
return data
except Exception:
return {}
def _evolution_gray_percent(self) -> int:
"""灰度比例,默认 5%"""
try:
env_pct = os.getenv("EVOLUTION_GRAY_PERCENT", "").strip()
if env_pct:
pct = int(float(env_pct))
else:
pct = int(((self.evolution_candidate or {}).get("gray_percent", 5)))
return max(0, min(100, pct))
except Exception:
return 5
def _evolution_enabled_for_customer(self, customer_id: str) -> bool:
"""按客户哈希稳定灰度命中,命中后启用候选策略。"""
cand = self.evolution_candidate or {}
if str(cand.get("status", "")).strip() != "ready_for_gray_5_percent":
return False
if not customer_id:
return False
pct = self._evolution_gray_percent()
if pct <= 0:
return False
digest = hashlib.md5(customer_id.encode("utf-8")).hexdigest()
bucket = int(digest[:8], 16) % 100
hit = bucket < pct
if hit:
metrics_emit("evolution_gray_hit", customer_id=customer_id, percent=pct, version=str(cand.get("version", "")))
return hit
def _evolution_has_proposal(self, proposal_id: str) -> bool:
cand = self.evolution_candidate or {}
for p in cand.get("proposals", []) or []:
if str((p or {}).get("id", "")).strip() == proposal_id:
return True
return False
@staticmethod
def _is_service_risk_inquiry(text: str) -> bool:
"""识别退款/投诉等服务风险场景。"""
s = (text or "").strip().lower()
if not s:
return False
kw = ("退款", "退货", "投诉", "差评", "举报", "欺骗", "骗人", "起诉", "法院", "生气", "不满意")
return any(k in s for k in kw)
@staticmethod
def _log_block(title: str, content: str):
"""统一的控制台分层日志输出。"""
@@ -1637,6 +1699,17 @@ class CustomerServiceAgent:
transfer_msg = TRANSFER_MESSAGE
metrics_emit("transfer_to_human", customer_id=message.from_id, acc_id=message.acc_id)
# 自我进化候选策略灰度(默认 5%):风险投诉场景强制转人工,并补安抚话术
evo_hit = self._evolution_enabled_for_customer(message.from_id)
if evo_hit and self._is_service_risk_inquiry(message.msg):
if self._evolution_has_proposal("policy-risk-transfer"):
need_transfer = True
transfer_msg = TRANSFER_MESSAGE
metrics_emit("evolution_force_transfer", customer_id=message.from_id, acc_id=message.acc_id)
if self._evolution_has_proposal("tone-empathy-pack"):
reply_text = "抱歉让您不舒服了,这边先为您转接人工专员马上处理。"
metrics_emit("evolution_empathy_reply", customer_id=message.from_id, acc_id=message.acc_id)
# 未成交记录:客户表达放弃且已报价过(转人工不记录)
customer_text, _ = self._split_customer_text(message.msg)
no_convert_keywords = ["算了", "不要了", "不做了", "下次再说", "先不弄了"]
@@ -1649,6 +1722,8 @@ class CustomerServiceAgent:
# 需要转接时不把原始回复发给客户
should_reply = bool(reply_text and reply_text.strip()) and not need_transfer
if evo_hit and need_transfer and self._evolution_has_proposal("tone-empathy-pack"):
should_reply = True
# 记录本次回复时间,供冷却期判断
if should_reply:

2
evolution/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Self-evolution MVP utilities for the customer service agent."""

591
evolution/mvp.py Normal file
View File

@@ -0,0 +1,591 @@
from __future__ import annotations
import json
import os
import sqlite3
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
ROOT = Path(__file__).resolve().parent.parent
ARTIFACT_DIR = ROOT / "evolution" / "artifacts"
DEFAULT_POLICY_PATH = ROOT / "config" / "evolution_policy.json"
DEFAULT_CANDIDATE_PATH = ROOT / "config" / "evolution_candidate.json"
RISK_KEYWORDS = (
"退款",
"退货",
"投诉",
"差评",
"举报",
"欺骗",
"骗人",
"不满意",
"生气",
"法院",
"起诉",
)
TRANSFER_HINTS = ("转人工", "人工", "为您转接", "专员", "稍后联系")
WEAK_REPLY_HINTS = ("不清楚", "不知道", "稍后", "晚点", "我再看下", "等会")
EMPATHY_HINTS = ("抱歉", "不好意思", "理解", "辛苦", "感谢反馈")
@dataclass
class Sample:
customer_id: str
acc_id: str
in_ts: str
in_text: str
out_ts: str
out_text: str
latency_sec: int
@dataclass
class Finding:
kind: str
severity: str
customer_id: str
acc_id: str
in_ts: str
in_text: str
out_text: str
detail: str
@dataclass
class ChatSourceConfig:
source: str = "auto" # auto | sqlite | mysql
sqlite_path: str = str(ROOT / "db" / "chat_log_db" / "chats.db")
mysql_host: str = os.getenv("MYSQL_HOST", "127.0.0.1")
mysql_port: int = int(os.getenv("MYSQL_PORT", "3306"))
mysql_user: str = os.getenv("MYSQL_USER", "root")
mysql_password: str = os.getenv("MYSQL_PASSWORD", "")
mysql_database: str = os.getenv("MYSQL_DATABASE", "ai_cs")
def _parse_ts(ts_text: str) -> Optional[datetime]:
if not ts_text:
return None
try:
return datetime.strptime(ts_text, "%Y-%m-%d %H:%M:%S")
except ValueError:
return None
def _to_ts_text(value: Any) -> str:
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %H:%M:%S")
if value is None:
return ""
return str(value)
def _iter_recent_conversations_sqlite(
cfg: ChatSourceConfig,
hours: int,
max_customers: int,
max_messages_per_customer: int,
) -> Iterable[Tuple[str, List[Dict[str, Any]]]]:
cutoff_dt = datetime.now() - timedelta(hours=hours)
cutoff_text = cutoff_dt.strftime("%Y-%m-%d %H:%M:%S")
db_path = Path(cfg.sqlite_path)
if not db_path.exists():
return
conn = sqlite3.connect(f"file:{db_path.as_posix()}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
try:
cur = conn.execute(
"""
SELECT customer_id, MAX(timestamp) AS last_ts
FROM chat_logs
WHERE timestamp >= ?
GROUP BY customer_id
ORDER BY last_ts DESC
LIMIT ?
""",
(cutoff_text, max_customers),
)
customers = [dict(r) for r in cur.fetchall()]
for c in customers:
customer_id = str(c.get("customer_id") or "").strip()
if not customer_id:
continue
rows_cur = conn.execute(
"""
SELECT direction, message, timestamp, acc_id
FROM chat_logs
WHERE customer_id = ? AND timestamp >= ?
ORDER BY timestamp ASC, id ASC
LIMIT ?
""",
(customer_id, cutoff_text, max_messages_per_customer),
)
rows = [dict(r) for r in rows_cur.fetchall()]
if rows:
yield customer_id, rows
finally:
conn.close()
def _iter_recent_conversations_mysql(
cfg: ChatSourceConfig,
hours: int,
max_customers: int,
max_messages_per_customer: int,
) -> Iterable[Tuple[str, List[Dict[str, Any]]]]:
try:
import pymysql
except Exception:
return
cutoff_dt = datetime.now() - timedelta(hours=hours)
try:
conn = pymysql.connect(
host=cfg.mysql_host,
port=cfg.mysql_port,
user=cfg.mysql_user,
password=cfg.mysql_password,
database=cfg.mysql_database,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
except Exception:
return
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT customer_id, MAX(timestamp) AS last_ts
FROM chat_logs
WHERE timestamp >= %s
GROUP BY customer_id
ORDER BY last_ts DESC
LIMIT %s
""",
(cutoff_dt, max_customers),
)
customers = cur.fetchall() or []
for c in customers:
customer_id = str(c.get("customer_id") or "").strip()
if not customer_id:
continue
with conn.cursor() as cur:
cur.execute(
"""
SELECT direction, message, timestamp, acc_id
FROM chat_logs
WHERE customer_id = %s AND timestamp >= %s
ORDER BY timestamp ASC, id ASC
LIMIT %s
""",
(customer_id, cutoff_dt, max_messages_per_customer),
)
rows = cur.fetchall() or []
normalized = []
for r in rows:
normalized.append(
{
"direction": r.get("direction"),
"message": r.get("message"),
"timestamp": _to_ts_text(r.get("timestamp")),
"acc_id": r.get("acc_id"),
}
)
if normalized:
yield customer_id, normalized
finally:
conn.close()
def _iter_recent_conversations(
cfg: ChatSourceConfig,
hours: int,
max_customers: int,
max_messages_per_customer: int,
) -> Iterable[Tuple[str, List[Dict[str, Any]]]]:
source = (cfg.source or "auto").strip().lower()
if source == "sqlite":
yield from _iter_recent_conversations_sqlite(cfg, hours, max_customers, max_messages_per_customer)
return
if source == "mysql":
yield from _iter_recent_conversations_mysql(cfg, hours, max_customers, max_messages_per_customer)
return
# auto: prefer mysql when DB_TYPE=mysql, otherwise sqlite
db_type = os.getenv("DB_TYPE", "").strip().lower()
if db_type in ("mysql", "mariadb"):
got_any = False
for item in _iter_recent_conversations_mysql(cfg, hours, max_customers, max_messages_per_customer):
got_any = True
yield item
if got_any:
return
yield from _iter_recent_conversations_sqlite(cfg, hours, max_customers, max_messages_per_customer)
def build_samples(
hours: int = 24,
max_customers: int = 200,
max_messages_per_customer: int = 80,
chat_source: Optional[ChatSourceConfig] = None,
) -> List[Sample]:
cfg = chat_source or ChatSourceConfig()
samples: List[Sample] = []
for customer_id, rows in _iter_recent_conversations(
cfg=cfg,
hours=hours,
max_customers=max_customers,
max_messages_per_customer=max_messages_per_customer,
):
pending_in: Optional[Dict[str, Any]] = None
for row in rows:
direction = str(row.get("direction") or "")
if direction == "in":
pending_in = row
continue
if direction != "out" or pending_in is None:
continue
in_text = str(pending_in.get("message") or "").strip()
out_text = str(row.get("message") or "").strip()
if not in_text:
pending_in = None
continue
in_ts = _parse_ts(str(pending_in.get("timestamp") or ""))
out_ts = _parse_ts(str(row.get("timestamp") or ""))
latency = 0
if in_ts and out_ts:
latency = int((out_ts - in_ts).total_seconds())
samples.append(
Sample(
customer_id=customer_id,
acc_id=str(row.get("acc_id") or pending_in.get("acc_id") or ""),
in_ts=str(pending_in.get("timestamp") or ""),
in_text=in_text,
out_ts=str(row.get("timestamp") or ""),
out_text=out_text,
latency_sec=max(0, latency),
)
)
pending_in = None
return samples
def evaluate_samples(samples: List[Sample]) -> List[Finding]:
findings: List[Finding] = []
for s in samples:
in_text = s.in_text
out_text = s.out_text
inbound_risky = any(k in in_text for k in RISK_KEYWORDS)
if not out_text:
findings.append(
Finding(
kind="empty_reply",
severity="high",
customer_id=s.customer_id,
acc_id=s.acc_id,
in_ts=s.in_ts,
in_text=s.in_text,
out_text=s.out_text,
detail="收到消息但回复为空",
)
)
continue
if s.latency_sec > 600:
findings.append(
Finding(
kind="slow_reply",
severity="medium",
customer_id=s.customer_id,
acc_id=s.acc_id,
in_ts=s.in_ts,
in_text=s.in_text,
out_text=s.out_text,
detail=f"回复耗时 {s.latency_sec}s (>600s)",
)
)
if inbound_risky:
has_transfer = any(k in out_text for k in TRANSFER_HINTS)
has_empathy = any(k in out_text for k in EMPATHY_HINTS)
if not has_transfer:
findings.append(
Finding(
kind="risk_not_transferred",
severity="high",
customer_id=s.customer_id,
acc_id=s.acc_id,
in_ts=s.in_ts,
in_text=s.in_text,
out_text=s.out_text,
detail="高风险诉求未出现转人工提示",
)
)
if not has_empathy:
findings.append(
Finding(
kind="risk_no_empathy",
severity="medium",
customer_id=s.customer_id,
acc_id=s.acc_id,
in_ts=s.in_ts,
in_text=s.in_text,
out_text=s.out_text,
detail="高风险诉求回复缺少安抚语气",
)
)
if any(k in out_text for k in WEAK_REPLY_HINTS):
findings.append(
Finding(
kind="weak_reply",
severity="medium",
customer_id=s.customer_id,
acc_id=s.acc_id,
in_ts=s.in_ts,
in_text=s.in_text,
out_text=s.out_text,
detail="回复存在低置信度兜底话术",
)
)
return findings
def summarize_findings(findings: List[Finding]) -> Dict[str, Any]:
by_kind: Dict[str, int] = {}
by_severity: Dict[str, int] = {}
for f in findings:
by_kind[f.kind] = by_kind.get(f.kind, 0) + 1
by_severity[f.severity] = by_severity.get(f.severity, 0) + 1
return {"total": len(findings), "by_kind": by_kind, "by_severity": by_severity}
def make_proposals(findings: List[Finding], sample_count: int) -> List[Dict[str, Any]]:
summary = summarize_findings(findings)
by_kind = summary["by_kind"]
proposals: List[Dict[str, Any]] = []
if by_kind.get("risk_not_transferred", 0) > 0:
proposals.append(
{
"id": "policy-risk-transfer",
"priority": "p0",
"module": "policy/prompt",
"title": "风险关键词触发后强制转人工",
"suggestion": "在风险路由的系统提示词中增加硬规则:遇到退款/投诉/法律威胁类诉求必须调用 transfer_to_human。",
"evidence_count": by_kind["risk_not_transferred"],
}
)
if by_kind.get("risk_no_empathy", 0) > 0:
proposals.append(
{
"id": "tone-empathy-pack",
"priority": "p1",
"module": "policy/prompt",
"title": "高风险场景补充安抚模板",
"suggestion": "为投诉类回复追加一段安抚模板,降低激化概率。",
"evidence_count": by_kind["risk_no_empathy"],
}
)
if by_kind.get("weak_reply", 0) > 0:
proposals.append(
{
"id": "fallback-reduction",
"priority": "p1",
"module": "intent/router",
"title": "减少低置信度兜底话术",
"suggestion": "出现“不清楚/稍后”等兜底词时,优先触发澄清问题或转人工而非直接结束。",
"evidence_count": by_kind["weak_reply"],
}
)
if by_kind.get("slow_reply", 0) > 0:
proposals.append(
{
"id": "slow-path-timeout",
"priority": "p2",
"module": "tools/workflow",
"title": "慢链路超时与短回复兜底",
"suggestion": "当工具调用超过阈值时先发短确认回复,避免长时间无响应。",
"evidence_count": by_kind["slow_reply"],
}
)
proposals.append(
{
"id": "ops-regression-gate",
"priority": "p0",
"module": "eval/pipeline",
"title": "上线前回归门禁",
"suggestion": "新增候选策略必须在离线评测集上通过,再灰度 5% 流量后扩大。",
"evidence_count": sample_count,
}
)
return proposals
def load_policy(path: Path = DEFAULT_POLICY_PATH) -> Dict[str, Any]:
if not path.exists():
return {
"publish_gate": {
"min_sample_count": 30,
"max_high_findings_rate": 0.08,
"max_ai_fail_rate": 5.0,
"max_transfer_rate": 45.0,
}
}
return json.loads(path.read_text(encoding="utf-8"))
def can_publish_candidate(samples: List[Sample], findings: List[Finding], runtime_hours: int, policy: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]:
try:
from utils.metrics_tracker import get_runtime_summary
except Exception:
def get_runtime_summary(hours: int = 24) -> Dict[str, Any]:
return {"window_hours": hours, "counts": {}, "rates": {"ai_fail_rate": 0.0, "transfer_rate": 0.0}}
gate = (policy or {}).get("publish_gate", {})
min_sample_count = int(gate.get("min_sample_count", 30))
max_high_rate = float(gate.get("max_high_findings_rate", 0.08))
max_ai_fail_rate = float(gate.get("max_ai_fail_rate", 5.0))
max_transfer_rate = float(gate.get("max_transfer_rate", 45.0))
high_cnt = sum(1 for f in findings if f.severity == "high")
sample_count = max(1, len(samples))
high_rate = high_cnt / sample_count
runtime = get_runtime_summary(hours=runtime_hours)
ai_fail_rate = float(runtime.get("rates", {}).get("ai_fail_rate", 0.0))
transfer_rate = float(runtime.get("rates", {}).get("transfer_rate", 0.0))
reasons = []
ok = True
if len(samples) < min_sample_count:
ok = False
reasons.append(f"样本不足: {len(samples)} < {min_sample_count}")
if high_rate > max_high_rate:
ok = False
reasons.append(f"高危发现占比过高: {high_rate:.2%} > {max_high_rate:.2%}")
if ai_fail_rate > max_ai_fail_rate:
ok = False
reasons.append(f"AI失败率过高: {ai_fail_rate:.2f}% > {max_ai_fail_rate:.2f}%")
if transfer_rate > max_transfer_rate:
ok = False
reasons.append(f"转人工率过高: {transfer_rate:.2f}% > {max_transfer_rate:.2f}%")
return ok, {
"sample_count": len(samples),
"high_findings": high_cnt,
"high_findings_rate": round(high_rate, 4),
"runtime": runtime,
"policy_gate": gate,
"reasons": reasons,
}
def _write_json(path: Path, payload: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def _write_jsonl(path: Path, rows: Iterable[Dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
for row in rows:
f.write(json.dumps(row, ensure_ascii=False) + "\n")
def run_cycle(
hours: int = 24,
max_customers: int = 200,
max_messages_per_customer: int = 80,
runtime_hours: int = 24,
publish: bool = False,
chat_source: Optional[ChatSourceConfig] = None,
policy_path: Path = DEFAULT_POLICY_PATH,
candidate_path: Path = DEFAULT_CANDIDATE_PATH,
) -> Dict[str, Any]:
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
now_tag = datetime.now().strftime("%Y%m%d_%H%M%S")
source_error = ""
try:
samples = build_samples(
hours=hours,
max_customers=max_customers,
max_messages_per_customer=max_messages_per_customer,
chat_source=chat_source,
)
except Exception as e:
samples = []
source_error = str(e)
findings = evaluate_samples(samples)
proposals = make_proposals(findings=findings, sample_count=len(samples))
policy = load_policy(path=policy_path)
publish_ok, gate_report = can_publish_candidate(
samples=samples,
findings=findings,
runtime_hours=runtime_hours,
policy=policy,
)
sample_file = ARTIFACT_DIR / f"samples_{now_tag}.jsonl"
eval_file = ARTIFACT_DIR / f"eval_report_{now_tag}.json"
proposal_file = ARTIFACT_DIR / f"proposals_{now_tag}.json"
_write_jsonl(sample_file, (asdict(s) for s in samples))
_write_json(
eval_file,
{
"generated_at": datetime.now().isoformat(timespec="seconds"),
"sample_count": len(samples),
"finding_summary": summarize_findings(findings),
"publish_gate_report": gate_report,
},
)
_write_json(
proposal_file,
{
"generated_at": datetime.now().isoformat(timespec="seconds"),
"proposals": proposals,
},
)
published = False
candidate_payload: Dict[str, Any] = {}
if publish and publish_ok:
candidate_payload = {
"version": f"candidate-{now_tag}",
"created_at": datetime.now().isoformat(timespec="seconds"),
"sample_file": str(sample_file),
"eval_file": str(eval_file),
"proposal_file": str(proposal_file),
"gate_report": gate_report,
"proposals": proposals,
"status": "ready_for_gray_5_percent",
}
_write_json(candidate_path, candidate_payload)
published = True
source_view = asdict(chat_source) if chat_source else asdict(ChatSourceConfig())
if source_view.get("mysql_password"):
source_view["mysql_password"] = "***"
return {
"samples": len(samples),
"findings": len(findings),
"publish_ok": publish_ok,
"published": published,
"chat_source": source_view,
"source_error": source_error,
"artifacts": {
"samples": str(sample_file),
"evaluation": str(eval_file),
"proposals": str(proposal_file),
"candidate": str(candidate_path) if published else "",
},
"gate_report": gate_report,
"top_proposals": proposals[:3],
}

View File

@@ -0,0 +1,45 @@
# 自我进化 MVP可控版
目标:让客服 agent 持续变聪明,同时避免“自动改坏线上”。
## 1. 已落地能力
- 失败样本采集:从 `db/chat_log_db/chats.db` 抽取近 N 小时客服问答对。
- 离线评测:自动识别高风险未转人工、低置信度兜底、慢回复等问题。
- 改进建议生成:输出可执行的模块级 proposalprompt/router/workflow
- 发布门禁:结合运行指标(`config/.runtime_metrics.jsonl`)判断是否允许发布候选版本。
- 候选产物:通过门禁后写入 `config/evolution_candidate.json`,用于 5% 灰度。
## 2. 运行方式
```bash
python scripts/evolution_cycle.py --hours 24 --publish
```
默认即读取线上 MySQL`--source mysql`)。连接信息来自 `.env``MYSQL_*`
常用参数:
- `--max-customers 200`
- `--max-messages-per-customer 80`
- `--runtime-hours 24`
- `--policy-path config/evolution_policy.json`
## 3. 产物说明
运行后会在 `evolution/artifacts/` 生成:
- `samples_*.jsonl`:评测样本
- `eval_report_*.json`:评测摘要与门禁结果
- `proposals_*.json`:改进建议列表
`--publish` 且门禁通过时:
- 写入 `config/evolution_candidate.json`
- 状态标记为 `ready_for_gray_5_percent`
## 4. 下一步建议
-`scripts/evolution_cycle.py` 加入每日定时任务(例如凌晨 2 点)。
- 在灰度层接入 `evolution_candidate.json` 的版本号,按店铺或客户哈希做 5% 放量。
- 将 proposal 落地为具体 patch 后,先跑 `tests/` 回归,再扩大流量。

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Self-evolution MVP cycle runner.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
load_dotenv(dotenv_path=PROJECT_ROOT / ".env")
from evolution.mvp import ChatSourceConfig, DEFAULT_CANDIDATE_PATH, DEFAULT_POLICY_PATH, run_cycle
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run self-evolution MVP cycle")
parser.add_argument(
"--source",
type=str,
default="mysql",
choices=["auto", "sqlite", "mysql"],
help="Chat data source, default mysql (online)",
)
parser.add_argument("--hours", type=int, default=24, help="Lookback window for chat samples")
parser.add_argument("--max-customers", type=int, default=200, help="Max customers sampled")
parser.add_argument(
"--max-messages-per-customer",
type=int,
default=80,
help="Max messages loaded per customer",
)
parser.add_argument("--runtime-hours", type=int, default=24, help="Runtime metric window")
parser.add_argument(
"--publish",
action="store_true",
help="Write config/evolution_candidate.json when gate passes",
)
parser.add_argument(
"--policy-path",
type=str,
default=str(DEFAULT_POLICY_PATH),
help="Path to evolution gate policy file",
)
parser.add_argument(
"--candidate-path",
type=str,
default=str(DEFAULT_CANDIDATE_PATH),
help="Path to candidate output file",
)
parser.add_argument("--db-path", type=str, default="", help="SQLite path when --source sqlite")
parser.add_argument("--mysql-host", type=str, default=os.getenv("MYSQL_HOST", "127.0.0.1"))
parser.add_argument("--mysql-port", type=int, default=int(os.getenv("MYSQL_PORT", "3306")))
parser.add_argument("--mysql-user", type=str, default=os.getenv("MYSQL_USER", "root"))
parser.add_argument("--mysql-password", type=str, default=os.getenv("MYSQL_PASSWORD", ""))
parser.add_argument("--mysql-database", type=str, default=os.getenv("MYSQL_DATABASE", "ai_cs"))
return parser.parse_args()
def main() -> int:
args = parse_args()
os.environ.setdefault("PYTHONUTF8", "1")
chat_source = ChatSourceConfig(
source=args.source,
sqlite_path=args.db_path or str(PROJECT_ROOT / "db" / "chat_log_db" / "chats.db"),
mysql_host=args.mysql_host,
mysql_port=args.mysql_port,
mysql_user=args.mysql_user,
mysql_password=args.mysql_password,
mysql_database=args.mysql_database,
)
result = run_cycle(
hours=args.hours,
max_customers=args.max_customers,
max_messages_per_customer=args.max_messages_per_customer,
runtime_hours=args.runtime_hours,
publish=args.publish,
chat_source=chat_source,
policy_path=Path(args.policy_path),
candidate_path=Path(args.candidate_path),
)
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,54 @@
import unittest
from unittest.mock import patch
from evolution.mvp import Finding, Sample, can_publish_candidate, evaluate_samples
class EvolutionMvpTest(unittest.TestCase):
def test_evaluate_detects_risk_without_transfer(self):
samples = [
Sample(
customer_id="c1",
acc_id="shop",
in_ts="2026-02-28 10:00:00",
in_text="我要投诉并退款,你们骗人",
out_ts="2026-02-28 10:00:10",
out_text="这个我不清楚,稍后再说",
latency_sec=10,
)
]
findings = evaluate_samples(samples)
kinds = {f.kind for f in findings}
self.assertIn("risk_not_transferred", kinds)
self.assertIn("weak_reply", kinds)
def test_publish_gate(self):
samples = [
Sample(
customer_id=f"c{i}",
acc_id="shop",
in_ts="2026-02-28 10:00:00",
in_text="你好",
out_ts="2026-02-28 10:00:05",
out_text="您好",
latency_sec=5,
)
for i in range(35)
]
findings: list[Finding] = []
policy = {
"publish_gate": {
"min_sample_count": 30,
"max_high_findings_rate": 0.1,
"max_ai_fail_rate": 5.0,
"max_transfer_rate": 45.0,
}
}
with patch("utils.metrics_tracker.get_runtime_summary", return_value={"rates": {"ai_fail_rate": 1.0, "transfer_rate": 10.0}}):
ok, report = can_publish_candidate(samples, findings, runtime_hours=24, policy=policy)
self.assertTrue(ok)
self.assertEqual(report["sample_count"], 35)
if __name__ == "__main__":
unittest.main(verbosity=2)