From abe5886b5d25da98b0875b47cc5612de9ff27b81 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 13:50:20 +0800 Subject: [PATCH] feat: add mysql-backed customer risk tools and manual do-not-serve gate --- core/pydantic_ai_agent.py | 119 +++++++++++++ db/customer_risk_db.py | 336 +++++++++++++++++++++++++++++++++++++ skills/risk-skill/SKILL.md | 11 ++ 3 files changed, 466 insertions(+) create mode 100644 db/customer_risk_db.py diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 41ac84e..878c0ca 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -28,6 +28,7 @@ load_dotenv() from services.service_tuhui_upload import upload_to_tuhui from core.workflow_router import get_workflow_router from core.workflow_router import get_workflow_router +from db.customer_risk_db import risk_db # ========== 企业微信通知 ========== _WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "") @@ -673,6 +674,69 @@ class CustomerServiceAgent: """ return "TRANSFER_REQUESTED" + @self.agent.tool + async def get_customer_risk_profile(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: + """查询客户风控画像:退款/不付款/差评/人工黑名单等。""" + cid = customer_id or ctx.deps.from_id + try: + info = risk_db.evaluate_customer(cid) + return ( + f"客户:{cid}\n" + f"不接单:{'是' if info.get('do_not_serve') else '否'}\n" + f"风险等级:{info.get('computed_level','low')} 分数:{info.get('computed_score',0)}\n" + f"近30天退款:{info.get('refund_30d',0)}\n" + f"近7天未付款下单:{info.get('unpaid_7d',0)}\n" + f"近90天差评:{info.get('bad_review_90d',0)}\n" + f"备注:{info.get('note','') or '无'}" + ) + except Exception as e: + return f"查询风控画像失败: {e}" + + @self.agent.tool + async def mark_customer_risk( + ctx: RunContext[AgentDeps], + customer_id: str, + do_not_serve: bool = False, + risk_level: str = "low", + risk_score: int = 0, + note: str = "", + tag: str = "", + ) -> str: + """人工标记客户风控画像(不接单/高风险/备注标签)。""" + try: + tags = [tag] if tag else [] + risk_db.set_profile( + customer_id=customer_id, + do_not_serve=do_not_serve, + risk_level=risk_level, + risk_score=risk_score, + note=note, + tags=tags, + ) + return "风控画像已更新" + except Exception as e: + return f"更新风控画像失败: {e}" + + @self.agent.tool + async def record_customer_risk_event( + ctx: RunContext[AgentDeps], + customer_id: str, + event_type: str, + event_count: int = 1, + note: str = "", + ) -> str: + """记录风控事件:refund/unpaid_order/bad_review/blacklist_hit 等。""" + try: + risk_db.record_event( + customer_id=customer_id, + event_type=event_type, + event_count=event_count, + note=note, + ) + return "风控事件已记录" + except Exception as e: + return f"记录风控事件失败: {e}" + @self.agent.tool async def save_customer_note( ctx: RunContext[AgentDeps], @@ -842,6 +906,46 @@ class CustomerServiceAgent: async def risk_filter(ctx: RunContext[AgentDeps], text: str = "") -> str: return "这类不做哈,政治/敏感内容都不接。" + @self.agent_risk.tool + async def get_customer_risk_profile_risk(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: + return await get_customer_risk_profile(ctx, customer_id) + + @self.agent_risk.tool + async def mark_customer_risk_risk( + ctx: RunContext[AgentDeps], + customer_id: str, + do_not_serve: bool = False, + risk_level: str = "low", + risk_score: int = 0, + note: str = "", + tag: str = "", + ) -> str: + return await mark_customer_risk( + ctx=ctx, + customer_id=customer_id, + do_not_serve=do_not_serve, + risk_level=risk_level, + risk_score=risk_score, + note=note, + tag=tag, + ) + + @self.agent_risk.tool + async def record_customer_risk_event_risk( + ctx: RunContext[AgentDeps], + customer_id: str, + event_type: str, + event_count: int = 1, + note: str = "", + ) -> str: + return await record_customer_risk_event( + ctx=ctx, + customer_id=customer_id, + event_type=event_type, + event_count=event_count, + note=note, + ) + @self.agent.tool async def remove_background(ctx: RunContext[AgentDeps], image_url: str) -> str: try: @@ -1687,6 +1791,21 @@ class CustomerServiceAgent: # 前置风控:客户文本一旦命中政治/敏感询问,直接拒绝,避免“发图我看看”类答非所问 try: + # 人工风控:标记为不接单的客户直接转人工 + manual_risk = risk_db.evaluate_customer(message.from_id) + if bool(manual_risk.get("do_not_serve")): + self._activity_log( + "agent_manual_risk_reject", + customer_id=message.from_id, + risk=manual_risk, + ) + return AgentResponse( + reply="这边无法继续为你处理该类需求,给你转人工专员对接。", + should_reply=True, + need_transfer=True, + transfer_msg=TRANSFER_MESSAGE, + ) + from utils.content_filter import should_block_customer_smart risk_hit, risk_category, _risk_reason = await should_block_customer_smart(message.msg) map_hit = self._is_map_inquiry(message.msg) or (risk_category == "map") diff --git a/db/customer_risk_db.py b/db/customer_risk_db.py new file mode 100644 index 0000000..b65034d --- /dev/null +++ b/db/customer_risk_db.py @@ -0,0 +1,336 @@ +"""客户风控数据库(MySQL 优先,SQLite 兜底)""" +import os +import sqlite3 +import json +from datetime import datetime +from pathlib import Path +from typing import Dict, Any +from dotenv import load_dotenv + +load_dotenv() + +_DB_TYPE = os.getenv("DB_TYPE", "sqlite").lower() +_MYSQL_HOST = os.getenv("MYSQL_HOST", "127.0.0.1") +_MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306")) +_MYSQL_USER = os.getenv("MYSQL_USER", "root") +_MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "") +_MYSQL_DATABASE = os.getenv("MYSQL_DATABASE", "ai_cs") + + +def _is_mysql() -> bool: + return _DB_TYPE in ("mysql", "mariadb") + + +class CustomerRiskDB: + def __init__(self, sqlite_path: str = "db/customer_risk_db/risk.db"): + self.sqlite_path = Path(sqlite_path) + self.backend = "mysql" if _is_mysql() else "sqlite" + self._sqlite_in_memory = False + try: + self._ensure_db() + except Exception: + # MySQL 不可用时自动回退,避免主流程被数据库连接拖垮 + self.backend = "sqlite" + try: + self._ensure_sqlite_db() + except Exception: + # 最后兜底:内存 SQLite,保证模块可导入 + self._sqlite_in_memory = True + self._ensure_sqlite_db() + + def _get_mysql_conn(self): + import pymysql + return pymysql.connect( + host=_MYSQL_HOST, + port=_MYSQL_PORT, + user=_MYSQL_USER, + password=_MYSQL_PASSWORD, + database=_MYSQL_DATABASE, + charset="utf8mb4", + cursorclass=pymysql.cursors.DictCursor, + autocommit=False, + ) + + def _get_sqlite_conn(self): + if self._sqlite_in_memory: + conn = sqlite3.connect(":memory:") + else: + self.sqlite_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(self.sqlite_path)) + conn.row_factory = sqlite3.Row + return conn + + def _ensure_db(self): + if self.backend == "mysql": + with self._get_mysql_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS customer_risk_profile ( + customer_id VARCHAR(128) PRIMARY KEY, + do_not_serve TINYINT(1) NOT NULL DEFAULT 0, + risk_level VARCHAR(16) NOT NULL DEFAULT 'low', + risk_score INT NOT NULL DEFAULT 0, + note TEXT, + tags_json TEXT, + updated_at DATETIME NOT NULL + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS customer_risk_event ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + customer_id VARCHAR(128) NOT NULL, + event_type VARCHAR(32) NOT NULL, + event_count INT NOT NULL DEFAULT 1, + note TEXT, + created_at DATETIME NOT NULL, + INDEX idx_customer_time (customer_id, created_at), + INDEX idx_event_type (event_type) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + ) + conn.commit() + return + self._ensure_sqlite_db() + + def _ensure_sqlite_db(self): + with self._get_sqlite_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS customer_risk_profile ( + customer_id TEXT PRIMARY KEY, + do_not_serve INTEGER NOT NULL DEFAULT 0, + risk_level TEXT NOT NULL DEFAULT 'low', + risk_score INTEGER NOT NULL DEFAULT 0, + note TEXT, + tags_json TEXT, + updated_at TEXT NOT NULL + ) + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS customer_risk_event ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + customer_id TEXT NOT NULL, + event_type TEXT NOT NULL, + event_count INTEGER NOT NULL DEFAULT 1, + note TEXT, + created_at TEXT NOT NULL + ) + """ + ) + cur.execute("CREATE INDEX IF NOT EXISTS idx_customer_time ON customer_risk_event(customer_id, created_at)") + cur.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON customer_risk_event(event_type)") + conn.commit() + + def record_event(self, customer_id: str, event_type: str, event_count: int = 1, note: str = ""): + if not customer_id or not event_type: + return + now = datetime.now() + if self.backend == "mysql": + with self._get_mysql_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO customer_risk_event (customer_id, event_type, event_count, note, created_at) + VALUES (%s, %s, %s, %s, %s) + """, + (customer_id, event_type, int(max(1, event_count)), note, now.strftime("%Y-%m-%d %H:%M:%S")), + ) + conn.commit() + return + with self._get_sqlite_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO customer_risk_event (customer_id, event_type, event_count, note, created_at) + VALUES (?, ?, ?, ?, ?) + """, + (customer_id, event_type, int(max(1, event_count)), note, now.isoformat()), + ) + conn.commit() + + def set_profile( + self, + customer_id: str, + *, + do_not_serve: bool = False, + risk_level: str = "low", + risk_score: int = 0, + note: str = "", + tags: list | None = None, + ): + if not customer_id: + return + tags_json = json.dumps(tags or [], ensure_ascii=False) + now = datetime.now() + if self.backend == "mysql": + with self._get_mysql_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + REPLACE INTO customer_risk_profile + (customer_id, do_not_serve, risk_level, risk_score, note, tags_json, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """, + ( + customer_id, + 1 if do_not_serve else 0, + risk_level, + int(max(0, risk_score)), + note, + tags_json, + now.strftime("%Y-%m-%d %H:%M:%S"), + ), + ) + conn.commit() + return + with self._get_sqlite_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO customer_risk_profile + (customer_id, do_not_serve, risk_level, risk_score, note, tags_json, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(customer_id) DO UPDATE SET + do_not_serve=excluded.do_not_serve, + risk_level=excluded.risk_level, + risk_score=excluded.risk_score, + note=excluded.note, + tags_json=excluded.tags_json, + updated_at=excluded.updated_at + """, + ( + customer_id, + 1 if do_not_serve else 0, + risk_level, + int(max(0, risk_score)), + note, + tags_json, + now.isoformat(), + ), + ) + conn.commit() + + def _sum_events(self, customer_id: str, event_type: str, days: int) -> int: + if self.backend == "mysql": + with self._get_mysql_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT COALESCE(SUM(event_count), 0) AS total + FROM customer_risk_event + WHERE customer_id=%s + AND event_type=%s + AND created_at >= (NOW() - INTERVAL %s DAY) + """, + (customer_id, event_type, int(max(1, days))), + ) + row = cur.fetchone() or {} + return int(row.get("total") or 0) + with self._get_sqlite_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT COALESCE(SUM(event_count), 0) AS total + FROM customer_risk_event + WHERE customer_id=? + AND event_type=? + AND created_at >= datetime('now', ?) + """, + (customer_id, event_type, f"-{int(max(1, days))} day"), + ) + row = cur.fetchone() + return int((row["total"] if row else 0) or 0) + + def get_profile(self, customer_id: str) -> Dict[str, Any]: + out = { + "customer_id": customer_id, + "do_not_serve": False, + "risk_level": "low", + "risk_score": 0, + "note": "", + "tags": [], + } + if self.backend == "mysql": + with self._get_mysql_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT customer_id, do_not_serve, risk_level, risk_score, note, tags_json + FROM customer_risk_profile + WHERE customer_id=%s + LIMIT 1 + """, + (customer_id,), + ) + row = cur.fetchone() + if not row: + return out + out.update( + { + "do_not_serve": bool(row.get("do_not_serve")), + "risk_level": str(row.get("risk_level") or "low"), + "risk_score": int(row.get("risk_score") or 0), + "note": str(row.get("note") or ""), + "tags": json.loads(row.get("tags_json") or "[]"), + } + ) + return out + with self._get_sqlite_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT customer_id, do_not_serve, risk_level, risk_score, note, tags_json + FROM customer_risk_profile + WHERE customer_id=? + LIMIT 1 + """, + (customer_id,), + ) + row = cur.fetchone() + if not row: + return out + out.update( + { + "do_not_serve": bool(row["do_not_serve"]), + "risk_level": str(row["risk_level"] or "low"), + "risk_score": int(row["risk_score"] or 0), + "note": str(row["note"] or ""), + "tags": json.loads(row["tags_json"] or "[]"), + } + ) + return out + + def evaluate_customer(self, customer_id: str) -> Dict[str, Any]: + profile = self.get_profile(customer_id) + refund_30d = self._sum_events(customer_id, "refund", 30) + unpaid_7d = self._sum_events(customer_id, "unpaid_order", 7) + bad_review_90d = self._sum_events(customer_id, "bad_review", 90) + + score = int(profile.get("risk_score") or 0) + score += refund_30d * 20 + score += unpaid_7d * 8 + score += bad_review_90d * 15 + + level = "low" + if score >= 70: + level = "high" + elif score >= 35: + level = "medium" + + return { + **profile, + "refund_30d": refund_30d, + "unpaid_7d": unpaid_7d, + "bad_review_90d": bad_review_90d, + "computed_score": score, + "computed_level": level, + } + + +risk_db = CustomerRiskDB() diff --git a/skills/risk-skill/SKILL.md b/skills/risk-skill/SKILL.md index 198d56c..e805170 100644 --- a/skills/risk-skill/SKILL.md +++ b/skills/risk-skill/SKILL.md @@ -16,3 +16,14 @@ description: 风控拒绝技能,覆盖敏感内容拦截、拒绝边界和安 - 拒绝后若客户追问「能做吗/有吗」,保持一致,不反复改口。 - 不输出技术解释,不展开争论。 - 句子短、边界清晰、语气克制。 + +## 工具化风控(必须优先用工具) + +- 先调用 `get_customer_risk_profile` 看客户历史风控画像(退款/未付款/差评/是否不接单)。 +- 若人工确认该客户不接单,调用 `mark_customer_risk` 写入:`do_not_serve=true` + 备注原因。 +- 每次发生风险行为时调用 `record_customer_risk_event`: + - `refund`:客户退款/退款成功 + - `unpaid_order`:短时间多次下单不付款 + - `bad_review`:差评/威胁差评 + - `blacklist_hit`:命中黑名单场景 +- 规则:先写事件,再做回复或转人工,保证后续可追溯。