# -*- coding: utf-8 -*- """ 轻量指标采集器 - 事件落盘到 JSONL - 提供近 N 小时聚合,给 /api/metrics 使用 """ from __future__ import annotations import json from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any ROOT = Path(__file__).resolve().parent.parent METRICS_FILE = ROOT / "config" / ".runtime_metrics.jsonl" def _now_iso() -> str: return datetime.now().isoformat(timespec="seconds") def emit(event: str, **fields: Any): """记录一条事件,失败不抛异常。""" try: METRICS_FILE.parent.mkdir(parents=True, exist_ok=True) payload = {"ts": _now_iso(), "event": event} payload.update(fields or {}) with METRICS_FILE.open("a", encoding="utf-8") as f: f.write(json.dumps(payload, ensure_ascii=False) + "\n") except Exception: pass def _iter_recent(hours: int = 24): if not METRICS_FILE.exists(): return [] cutoff = datetime.now() - timedelta(hours=hours) items = [] try: with METRICS_FILE.open("r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) ts = datetime.fromisoformat(obj.get("ts", "")) except Exception: continue if ts >= cutoff: items.append(obj) except Exception: return [] return items def get_runtime_summary(hours: int = 24) -> Dict[str, Any]: """近 N 小时运行指标。""" rows = _iter_recent(hours=hours) counts: Dict[str, int] = {} for r in rows: e = r.get("event", "unknown") counts[e] = counts.get(e, 0) + 1 inbound = counts.get("inbound_msg", 0) transfer = counts.get("transfer_to_human", 0) quote = counts.get("quote_generated", 0) ai_fail = counts.get("ai_call_failed", 0) gemini_req = counts.get("gemini_request", 0) no_image = counts.get("gemini_no_image", 0) return { "window_hours": hours, "counts": counts, "rates": { "transfer_rate": round((transfer / inbound) * 100, 2) if inbound else 0.0, "quote_rate": round((quote / inbound) * 100, 2) if inbound else 0.0, "ai_fail_rate": round((ai_fail / inbound) * 100, 2) if inbound else 0.0, "no_image_rate": round((no_image / gemini_req) * 100, 2) if gemini_req else 0.0, }, } def get_dashboard(hours: int = 24) -> Dict[str, Any]: """业务+运行看板聚合。""" runtime = get_runtime_summary(hours=hours) try: from db.deal_outcome_db import get_daily_summary daily = get_daily_summary() deal_ok = int(daily.get("成交数", 0)) deal_fail = int(daily.get("未成交数", 0)) total = deal_ok + deal_fail conversion = round((deal_ok / total) * 100, 2) if total else 0.0 except Exception: deal_ok = deal_fail = 0 conversion = 0.0 return { "runtime": runtime, "business": { "deal_success": deal_ok, "deal_fail": deal_fail, "conversion_rate": conversion, }, }