Files
DP/temp_backup/AdminTool_redundant/test_db_connection.py
zuowei1216 12395d8eca newrun
2025-12-30 14:46:22 +08:00

128 lines
4.2 KiB
Python

# -*- coding: utf-8 -*-
"""
测试数据库连接和 app_deployments 表(通过 SSH 隧道)
"""
import json
import pymysql
from datetime import datetime
import sys
import io
from sshtunnel import SSHTunnelForwarder
# 设置 stdout 为 UTF-8
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
def load_config():
with open('deploy_config.json', 'r', encoding='utf-8') as f:
return json.load(f)
def test_connection():
print("="*60)
print("测试 MySQL 数据库连接(通过 SSH 隧道)")
print("="*60)
config = load_config()
mysql_config = config['mysql']
print(f"\n📋 SSH 服务器:")
print(f" 地址: {config['host']}")
print(f" 端口: {config['port']}")
print(f" 用户: {config['username']}")
print(f"\n📋 MySQL 配置:")
print(f" 主机: {mysql_config['host']}")
print(f" 端口: {mysql_config['port']}")
print(f" 用户: {mysql_config['username']}")
print(f" 数据库: {mysql_config['database']}")
tunnel = None
try:
print("\n🔌 创建 SSH 隧道...")
tunnel = SSHTunnelForwarder(
(config['host'], int(config.get('port', 22))),
ssh_username=config['username'],
ssh_password=config['password'],
remote_bind_address=('127.0.0.1', int(mysql_config.get('port', 3306)))
)
tunnel.start()
print(f"✅ SSH 隧道已建立,本地端口: {tunnel.local_bind_port}")
print("\n🔌 连接 MySQL...")
conn = pymysql.connect(
host='127.0.0.1',
port=tunnel.local_bind_port,
user=mysql_config['username'],
password=mysql_config['password'],
database=mysql_config['database'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
print("✅ MySQL 连接成功!")
# 测试创建表
print("\n📝 创建/检查 app_deployments 表...")
with conn.cursor() as cursor:
sql = """
CREATE TABLE IF NOT EXISTS app_deployments (
id INT PRIMARY KEY AUTO_INCREMENT,
version VARCHAR(50) UNIQUE NOT NULL,
deployed_at DATETIME NOT NULL,
is_current BOOLEAN DEFAULT FALSE,
file_size_mb DECIMAL(10, 2),
comment VARCHAR(500),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(sql)
conn.commit()
print("✅ 表已就绪")
# 查看表结构
print("\n📊 表结构:")
with conn.cursor() as cursor:
cursor.execute("DESCRIBE app_deployments")
for row in cursor.fetchall():
print(f" {row['Field']}: {row['Type']} {'(主键)' if row['Key'] == 'PRI' else ''}")
# 查看现有数据
print("\n📚 现有部署记录:")
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM app_deployments ORDER BY deployed_at DESC")
records = cursor.fetchall()
if records:
for i, rec in enumerate(records, 1):
status = "✅ 当前" if rec['is_current'] else ""
print(f"\n [{i}] {rec['version']} {status}")
print(f" 部署时间: {rec['deployed_at']}")
print(f" 大小: {rec['file_size_mb']} MB")
print(f" 备注: {rec['comment'] or ''}")
else:
print(" (暂无记录)")
conn.close()
print("\n" + "="*60)
print("✅ 测试完成!数据库一切正常")
print("="*60)
except Exception as e:
print(f"\n❌ 测试失败: {e}")
print("\n请检查:")
print(" 1. SSH 服务器是否可访问")
print(" 2. deploy_config.json 配置是否正确")
print(" 3. 服务器上 MySQL 服务是否运行")
print(" 4. 数据库用户权限是否足够")
return False
finally:
if tunnel:
tunnel.stop()
print("\n🔌 SSH 隧道已关闭")
return True
if __name__ == "__main__":
test_connection()