Files
tw/utils/api_cost_tracker.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

96 lines
2.9 KiB
Python
Executable File

# -*- coding: utf-8 -*-
"""
API 成本统计 - 记录调用成本,超预算告警
"""
import os
import json
import logging
from pathlib import Path
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)
ROOT = Path(__file__).resolve().parent.parent
COST_FILE = ROOT / "config" / ".api_cost.json"
BUDGET_DAILY = float(os.getenv("API_COST_BUDGET_DAILY", "0")) # 0=不限制
BUDGET_MONTHLY = float(os.getenv("API_COST_BUDGET_MONTHLY", "0"))
# 单次调用预估成本(元,可按实际调整)
COST_PER_CALL = {
"gemini_extract": 0.02,
"gemini_qa": 0.01,
"gemini_vision": 0.015,
"openai_chat": 0.02,
"openai_embedding": 0.001,
"qwen_enhance": 0.05,
}
def _load() -> dict:
if not COST_FILE.exists():
return {"daily": {}, "monthly": {}}
try:
with open(COST_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {"daily": {}, "monthly": {}}
def _save(data: dict):
COST_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(COST_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def record(service: str, count: int = 1, cost: Optional[float] = None):
"""记录一次 API 调用"""
c = cost if cost is not None else COST_PER_CALL.get(service, 0.01) * count
today = datetime.now().strftime("%Y-%m-%d")
month = datetime.now().strftime("%Y-%m")
data = _load()
data["daily"][today] = data["daily"].get(today, 0) + c
data["monthly"][month] = data["monthly"].get(month, 0) + c
_save(data)
return c
def get_today_cost() -> float:
today = datetime.now().strftime("%Y-%m-%d")
data = _load()
return data["daily"].get(today, 0)
def get_month_cost() -> float:
month = datetime.now().strftime("%Y-%m")
data = _load()
return data["monthly"].get(month, 0)
async def check_budget_alert():
"""检查是否超预算,超则企微告警"""
if BUDGET_DAILY <= 0 and BUDGET_MONTHLY <= 0:
return
try:
from config.config import WECHAT_WEBHOOK
if not WECHAT_WEBHOOK:
return
import httpx
today = get_today_cost()
month = get_month_cost()
msg_parts = []
if BUDGET_DAILY > 0 and today >= BUDGET_DAILY:
msg_parts.append(f"⚠️ 今日 API 成本 {today:.2f} 元 ≥ 预算 {BUDGET_DAILY}")
if BUDGET_MONTHLY > 0 and month >= BUDGET_MONTHLY:
msg_parts.append(f"⚠️ 本月 API 成本 {month:.2f} 元 ≥ 预算 {BUDGET_MONTHLY}")
if not msg_parts:
return
async with httpx.AsyncClient(timeout=10) as client:
await client.post(WECHAT_WEBHOOK, json={
"msgtype": "markdown",
"markdown": {"content": "\n".join(msg_parts)}
})
logger.info(f"[成本] 告警已发送: {msg_parts}")
except Exception as e:
logger.warning(f"[成本] 告警失败: {e}")