Files
tw/utils/health_check.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

115 lines
3.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
健康检查 - 定时检测轻简/企微连接,断线时告警
"""
import asyncio
import logging
import time
from typing import Callable, Optional
logger = logging.getLogger(__name__)
# 状态
_qingjian_connected = False
_wechat_ok = True
_last_alert_at: dict[str, float] = {}
_ALERT_COOLDOWN = 300
_start_ts = time.time()
def set_qingjian_connected(ok: bool):
"""设置轻简连接状态(由 websocket_client 在连接/断开时调用)"""
global _qingjian_connected
_qingjian_connected = ok
def set_wechat_ok(ok: bool):
"""设置企微可达状态"""
global _wechat_ok
_wechat_ok = ok
async def _check_wechat() -> bool:
"""检测企微 Webhook 是否可达"""
import httpx
from config.config import WECHAT_WEBHOOK, HEALTH_CHECK_WECHAT_PING
if not WECHAT_WEBHOOK:
return True
if not HEALTH_CHECK_WECHAT_PING:
return True # 不主动 ping避免刷屏
try:
async with httpx.AsyncClient(timeout=5) as client:
resp = await client.post(WECHAT_WEBHOOK, json={"msgtype": "text", "text": {"content": "ok"}})
return resp.status_code == 200
except Exception as e:
logger.warning(f"企微健康检查失败: {e}")
return False
async def _send_alert(title: str, content: str):
"""发送告警到企微"""
from config.config import WECHAT_WEBHOOK
import time
global _last_alert_at
now = time.time()
if now - _last_alert_at.get(title, 0) < _ALERT_COOLDOWN:
return
_last_alert_at[title] = now
if not WECHAT_WEBHOOK:
logger.warning(f"[健康检查] {title}: {content}")
return
try:
import httpx
async with httpx.AsyncClient(timeout=10) as client:
await client.post(WECHAT_WEBHOOK, json={
"msgtype": "markdown",
"markdown": {"content": f"⚠️ **{title}**\n{content}"}
})
logger.info(f"[健康检查] 已发送告警: {title}")
except Exception as e:
logger.warning(f"[健康检查] 告警发送失败: {e}")
async def run_health_check(get_qingjian_status: Optional[Callable[[], bool]] = None):
"""
执行一次健康检查。
get_qingjian_status: 返回轻简是否已连接的函数
"""
from config.config import HEALTH_CHECK_INTERVAL, WECHAT_WEBHOOK, HEALTH_CHECK_STARTUP_GRACE, HEALTH_CHECK_QINGJIAN_ALERTS_ENABLED
global _qingjian_connected, _wechat_ok
# 轻简
if get_qingjian_status:
qj_ok = get_qingjian_status()
if not qj_ok:
if time.time() - _start_ts >= HEALTH_CHECK_STARTUP_GRACE and HEALTH_CHECK_QINGJIAN_ALERTS_ENABLED:
if _qingjian_connected:
await _send_alert("轻简连接断开", "WebSocket 已断开,请检查轻简软件是否运行。")
else:
await _send_alert("轻简未连接", "无法连接轻简 API请确认轻简软件已启动在 ws://127.0.0.1:9528")
_qingjian_connected = qj_ok
# 企微
wechat_ok = await _check_wechat()
if not wechat_ok and _wechat_ok and WECHAT_WEBHOOK:
await _send_alert("企微不可达", "企业微信 Webhook 无法访问,告警将无法送达。")
_wechat_ok = wechat_ok
# API 成本预算告警
try:
from utils.api_cost_tracker import check_budget_alert
await check_budget_alert()
except Exception as e:
logger.debug(f"[健康检查] 成本告警跳过: {e}")
async def health_check_loop(get_qingjian_status: Optional[Callable[[], bool]] = None):
"""健康检查循环"""
from config.config import HEALTH_CHECK_INTERVAL
while True:
try:
await run_health_check(get_qingjian_status)
except Exception as e:
logger.warning(f"[健康检查] 异常: {e}")
await asyncio.sleep(HEALTH_CHECK_INTERVAL)