Files
tw/utils/service_base.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

106 lines
3.1 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
服务基类 - 供 VectorizerService 等异步服务使用
"""
import logging
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, TypeVar, Optional
import aiohttp
T = TypeVar("T")
@dataclass
class RetryConfig:
max_retries: int = 3
base_delay: float = 2.0
max_delay: float = 30.0
@dataclass
class TimeoutConfig:
connection_timeout: float = 60.0
read_timeout: float = 240.0
total_timeout: float = 1200.0
class ServiceError(Exception):
"""服务异常基类"""
pass
class ServiceTimeoutError(ServiceError):
"""超时异常"""
pass
class ServiceNetworkError(ServiceError):
"""网络异常"""
pass
class PollingMixin:
"""轮询混入 - 子类需实现 _is_task_complete, _is_task_failed, _get_error_message"""
def _is_task_complete(self, result: Any) -> bool:
raise NotImplementedError
def _is_task_failed(self, result: Any) -> bool:
raise NotImplementedError
def _get_error_message(self, result: Any) -> str:
raise NotImplementedError
class BaseService:
"""异步服务基类"""
def __init__(
self,
name: str = "BaseService",
base_url: str = "",
retry_config: Optional[RetryConfig] = None,
timeout_config: Optional[TimeoutConfig] = None,
):
self.name = name
self.base_url = base_url.rstrip("/")
self.retry_config = retry_config or RetryConfig()
self.timeout_config = timeout_config or TimeoutConfig()
self.logger = logging.getLogger(name)
self._session: Optional[aiohttp.ClientSession] = None
async def create_http_session(self) -> aiohttp.ClientSession:
"""创建 aiohttp 会话(不验证 SSL"""
connector = aiohttp.TCPConnector(ssl=False)
timeout = aiohttp.ClientTimeout(
connect=self.timeout_config.connection_timeout,
total=self.timeout_config.total_timeout,
)
return aiohttp.ClientSession(connector=connector, timeout=timeout)
async def execute_with_retry(
self,
func: Callable[..., Any],
*args,
error_context: str = "",
**kwargs,
) -> Any:
"""带重试的执行"""
last_error = None
for attempt in range(self.retry_config.max_retries + 1):
try:
return await func(*args, **kwargs)
except (ServiceTimeoutError, ServiceNetworkError):
raise
except Exception as e:
last_error = e
self.logger.warning(f"{attempt + 1}次尝试失败{error_context}: {e}")
if attempt < self.retry_config.max_retries:
delay = min(
self.retry_config.base_delay * (2 ** attempt),
self.retry_config.max_delay,
)
await asyncio.sleep(delay)
raise last_error or ServiceError("未知错误")
async def cleanup_failed_task(self, task_id: str) -> None:
"""清理失败任务(子类可覆盖)"""
self.logger.debug(f"清理任务: {task_id}")