fix: reduce mysql connection pressure
This commit is contained in:
@@ -20,9 +20,11 @@ _MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "")
|
||||
_MYSQL_DATABASE = os.getenv("MYSQL_DATABASE", "ai_cs")
|
||||
|
||||
# ========== MySQL 连接池 ==========
|
||||
_POOL_SIZE = int(os.getenv("MYSQL_POOL_SIZE", "50"))
|
||||
_POOL_SIZE = int(os.getenv("MYSQL_POOL_SIZE", "10"))
|
||||
_POOL_WAIT_TIMEOUT = float(os.getenv("MYSQL_POOL_WAIT_TIMEOUT", "10"))
|
||||
_mysql_pool: Optional[Queue] = None
|
||||
_pool_lock = threading.Lock()
|
||||
_mysql_conn_count = 0
|
||||
|
||||
|
||||
def _create_mysql_conn():
|
||||
@@ -44,58 +46,65 @@ def _create_mysql_conn():
|
||||
|
||||
|
||||
def _init_mysql_pool():
|
||||
"""初始化 MySQL 连接池"""
|
||||
"""初始化 MySQL 连接池(懒创建,不在启动时预建满)"""
|
||||
global _mysql_pool
|
||||
with _pool_lock:
|
||||
if _mysql_pool is None:
|
||||
_mysql_pool = Queue(maxsize=_POOL_SIZE)
|
||||
for _ in range(_POOL_SIZE):
|
||||
try:
|
||||
conn = _create_mysql_conn()
|
||||
_mysql_pool.put(conn)
|
||||
except Exception:
|
||||
pass # 启动时连接失败不阻塞,后续会重建
|
||||
|
||||
|
||||
def _discard_conn(conn):
|
||||
"""丢弃失效连接并维护计数"""
|
||||
global _mysql_conn_count
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
with _pool_lock:
|
||||
if _mysql_conn_count > 0:
|
||||
_mysql_conn_count -= 1
|
||||
|
||||
|
||||
def _get_pooled_conn(timeout: float = 5.0):
|
||||
"""从连接池获取连接"""
|
||||
global _mysql_pool
|
||||
"""从连接池获取连接,达到上限后阻塞等待,不再额外扩容。"""
|
||||
global _mysql_pool, _mysql_conn_count
|
||||
if _mysql_pool is None:
|
||||
_init_mysql_pool()
|
||||
|
||||
|
||||
with _pool_lock:
|
||||
if _mysql_conn_count < _POOL_SIZE:
|
||||
conn = _create_mysql_conn()
|
||||
_mysql_conn_count += 1
|
||||
return conn
|
||||
|
||||
try:
|
||||
conn = _mysql_pool.get(timeout=timeout)
|
||||
# 检查连接是否有效
|
||||
try:
|
||||
conn.ping(reconnect=True)
|
||||
except Exception:
|
||||
# 连接失效,创建新连接
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
conn = _create_mysql_conn()
|
||||
_discard_conn(conn)
|
||||
with _pool_lock:
|
||||
if _mysql_conn_count < _POOL_SIZE:
|
||||
conn = _create_mysql_conn()
|
||||
_mysql_conn_count += 1
|
||||
return conn
|
||||
conn = _mysql_pool.get(timeout=timeout)
|
||||
conn.ping(reconnect=True)
|
||||
return conn
|
||||
except Empty:
|
||||
# 池空了,创建新连接(不放回池)
|
||||
return _create_mysql_conn()
|
||||
raise TimeoutError(f"MySQL连接池已耗尽(pool_size={_POOL_SIZE}, wait_timeout={timeout}s)")
|
||||
|
||||
|
||||
def _return_conn(conn):
|
||||
"""归还连接到池"""
|
||||
"""归还连接到池,失效连接直接丢弃。"""
|
||||
global _mysql_pool
|
||||
if _mysql_pool is None:
|
||||
return
|
||||
try:
|
||||
if _mysql_pool.qsize() < _POOL_SIZE:
|
||||
_mysql_pool.put_nowait(conn)
|
||||
else:
|
||||
conn.close()
|
||||
conn.ping(reconnect=False)
|
||||
_mysql_pool.put_nowait(conn)
|
||||
except Exception:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
_discard_conn(conn)
|
||||
|
||||
|
||||
class _CompatResult:
|
||||
@@ -164,7 +173,7 @@ def _get_conn(max_retries: int = 3, retry_delay: float = 0.5) -> sqlite3.Connect
|
||||
last_error = None
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
conn = _get_pooled_conn(timeout=5.0)
|
||||
conn = _get_pooled_conn(timeout=_POOL_WAIT_TIMEOUT)
|
||||
return _PyMySQLCompatConn(conn, use_pool=True)
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
|
||||
@@ -10,6 +10,7 @@ from typing import Optional, Dict, List
|
||||
from pathlib import Path
|
||||
from enum import Enum
|
||||
import os
|
||||
from db.chat_log_db import _get_pooled_conn, _return_conn
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
_DB_TYPE = os.getenv("DB_TYPE", "sqlite").lower()
|
||||
@@ -45,6 +46,19 @@ class TaskPriority(Enum):
|
||||
HIGH = "high"
|
||||
URGENT = "urgent"
|
||||
|
||||
|
||||
class _PooledMySQLConn:
|
||||
"""包装 pymysql 连接,close 时归还到共享连接池。"""
|
||||
|
||||
def __init__(self, conn):
|
||||
self._conn = conn
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._conn, name)
|
||||
|
||||
def close(self):
|
||||
_return_conn(self._conn)
|
||||
|
||||
class TaskManager:
|
||||
"""任务管理器 - SQLite 存储"""
|
||||
|
||||
@@ -139,17 +153,7 @@ class TaskManager:
|
||||
def _get_conn(self):
|
||||
"""获取数据库连接"""
|
||||
if _is_mysql():
|
||||
import pymysql
|
||||
return pymysql.connect(
|
||||
host=_MYSQL_HOST,
|
||||
port=_MYSQL_PORT,
|
||||
user=_MYSQL_USER,
|
||||
password=_MYSQL_PASSWORD,
|
||||
database=_MYSQL_DATABASE,
|
||||
charset="utf8mb4",
|
||||
cursorclass=pymysql.cursors.DictCursor,
|
||||
autocommit=False,
|
||||
)
|
||||
return _PooledMySQLConn(_get_pooled_conn())
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
Reference in New Issue
Block a user