# -*- coding: utf-8 -*- """ 健康检查 - 定时检测轻简/企微连接,断线时告警 """ import asyncio import logging import time from typing import Callable, Optional logger = logging.getLogger(__name__) # 状态 _qingjian_connected = False _wechat_ok = True _last_alert_at: dict[str, float] = {} _ALERT_COOLDOWN = 300 _start_ts = time.time() def set_qingjian_connected(ok: bool): """设置轻简连接状态(由 websocket_client 在连接/断开时调用)""" global _qingjian_connected _qingjian_connected = ok def set_wechat_ok(ok: bool): """设置企微可达状态""" global _wechat_ok _wechat_ok = ok async def _check_wechat() -> bool: """检测企微 Webhook 是否可达""" import httpx from config.config import WECHAT_WEBHOOK, HEALTH_CHECK_WECHAT_PING if not WECHAT_WEBHOOK: return True if not HEALTH_CHECK_WECHAT_PING: return True # 不主动 ping,避免刷屏 try: async with httpx.AsyncClient(timeout=5) as client: resp = await client.post(WECHAT_WEBHOOK, json={"msgtype": "text", "text": {"content": "ok"}}) return resp.status_code == 200 except Exception as e: logger.warning(f"企微健康检查失败: {e}") return False async def _send_alert(title: str, content: str): """发送告警到企微""" from config.config import WECHAT_WEBHOOK import time global _last_alert_at now = time.time() if now - _last_alert_at.get(title, 0) < _ALERT_COOLDOWN: return _last_alert_at[title] = now if not WECHAT_WEBHOOK: logger.warning(f"[健康检查] {title}: {content}") return try: import httpx async with httpx.AsyncClient(timeout=10) as client: await client.post(WECHAT_WEBHOOK, json={ "msgtype": "markdown", "markdown": {"content": f"⚠️ **{title}**\n{content}"} }) logger.info(f"[健康检查] 已发送告警: {title}") except Exception as e: logger.warning(f"[健康检查] 告警发送失败: {e}") async def run_health_check(get_qingjian_status: Optional[Callable[[], bool]] = None): """ 执行一次健康检查。 get_qingjian_status: 返回轻简是否已连接的函数 """ from config.config import HEALTH_CHECK_INTERVAL, WECHAT_WEBHOOK, HEALTH_CHECK_STARTUP_GRACE, HEALTH_CHECK_QINGJIAN_ALERTS_ENABLED global _qingjian_connected, _wechat_ok # 轻简 if get_qingjian_status: qj_ok = get_qingjian_status() if not qj_ok: if time.time() - _start_ts >= HEALTH_CHECK_STARTUP_GRACE and HEALTH_CHECK_QINGJIAN_ALERTS_ENABLED: if _qingjian_connected: await _send_alert("轻简连接断开", "WebSocket 已断开,请检查轻简软件是否运行。") else: await _send_alert("轻简未连接", "无法连接轻简 API,请确认轻简软件已启动在 ws://127.0.0.1:9528") _qingjian_connected = qj_ok # 企微 wechat_ok = await _check_wechat() if not wechat_ok and _wechat_ok and WECHAT_WEBHOOK: await _send_alert("企微不可达", "企业微信 Webhook 无法访问,告警将无法送达。") _wechat_ok = wechat_ok # API 成本预算告警 try: from utils.api_cost_tracker import check_budget_alert await check_budget_alert() except Exception as e: logger.debug(f"[健康检查] 成本告警跳过: {e}") async def health_check_loop(get_qingjian_status: Optional[Callable[[], bool]] = None): """健康检查循环""" from config.config import HEALTH_CHECK_INTERVAL while True: try: await run_health_check(get_qingjian_status) except Exception as e: logger.warning(f"[健康检查] 异常: {e}") await asyncio.sleep(HEALTH_CHECK_INTERVAL)