feat: 添加 AI Agent 对话测试工具 + 代码优化
主要变更: - 新增 tests/test_ai_chat.py: AI Agent 对话测试工具 - 优化 core/pydantic_ai_agent.py 和 db/chat_log_db.py - 清理归档文件,更新文档 Made-with: Cursor
This commit is contained in:
313
tests/test_ai_chat.py
Normal file
313
tests/test_ai_chat.py
Normal file
@@ -0,0 +1,313 @@
|
||||
"""
|
||||
AI Agent 对话测试脚本
|
||||
从数据库加载聊天记录,测试 AI 回复效果
|
||||
"""
|
||||
import sqlite3
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
|
||||
# 颜色代码
|
||||
COLORS = {
|
||||
'header': '\033[95m\033[1m',
|
||||
'customer': '\033[94m',
|
||||
'agent': '\033[92m',
|
||||
'system': '\033[90m',
|
||||
'price': '\033[93m',
|
||||
'error': '\033[91m',
|
||||
'cyan': '\033[96m',
|
||||
'reset': '\033[0m',
|
||||
}
|
||||
|
||||
def cprint(text, color='reset'):
|
||||
print(f"{COLORS.get(color, '')}{text}{COLORS['reset']}")
|
||||
|
||||
def check_database():
|
||||
"""检查数据库内容"""
|
||||
db_path = 'db/chat_log_db/chats.db'
|
||||
try:
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.execute("SELECT COUNT(*) FROM chat_logs")
|
||||
count = cursor.fetchone()[0]
|
||||
|
||||
if count == 0:
|
||||
cprint(f"\n✗ 数据库为空,没有聊天记录", 'error')
|
||||
cprint("提示:需要先有一些聊天记录才能测试", 'system')
|
||||
conn.close()
|
||||
return None
|
||||
|
||||
cprint(f"\n✓ 数据库连接成功!共 {count} 条聊天记录", 'system')
|
||||
|
||||
# 获取客户列表
|
||||
cursor = conn.execute("""
|
||||
SELECT customer_id, customer_name, COUNT(*) as cnt, MAX(timestamp) as last
|
||||
FROM chat_logs
|
||||
GROUP BY customer_id
|
||||
ORDER BY cnt DESC
|
||||
LIMIT 20
|
||||
""")
|
||||
customers = cursor.fetchall()
|
||||
|
||||
cprint(f"\n找到 {len(customers)} 个客户:", 'cyan')
|
||||
for i, (cid, name, cnt, last) in enumerate(customers, 1):
|
||||
cprint(f" {i:2d}. {name or cid:30s} | {cnt:4d}条 | 最后:{last}", 'customer')
|
||||
|
||||
conn.close()
|
||||
return customers
|
||||
|
||||
except Exception as e:
|
||||
cprint(f"\n✗ 数据库检查失败:{e}", 'error')
|
||||
return None
|
||||
|
||||
async def test_customer_conversation(customer_id, customer_name, limit=5):
|
||||
"""测试某个客户的对话"""
|
||||
cprint(f"\n{'='*70}", 'cyan')
|
||||
cprint(f"测试客户:{customer_name or customer_id}", 'header')
|
||||
cprint(f"{'='*70}\n", 'cyan')
|
||||
|
||||
# 获取对话记录
|
||||
conn = sqlite3.connect('db/chat_log_db/chats.db')
|
||||
cursor = conn.execute("""
|
||||
SELECT direction, message, timestamp
|
||||
FROM chat_logs
|
||||
WHERE customer_id = ?
|
||||
ORDER BY timestamp ASC
|
||||
LIMIT ?
|
||||
""", (customer_id, limit))
|
||||
conversations = cursor.fetchall()
|
||||
conn.close()
|
||||
|
||||
if not conversations:
|
||||
cprint(" 该客户没有对话记录", 'system')
|
||||
return
|
||||
|
||||
# 初始化 AI Agent
|
||||
try:
|
||||
from core.pydantic_ai_agent import CustomerServiceAgent, CustomerMessage
|
||||
agent = CustomerServiceAgent(skills_dir="skills")
|
||||
cprint("✓ AI Agent 已加载", 'system')
|
||||
except Exception as e:
|
||||
cprint(f"✗ AI Agent 加载失败:{e}", 'error')
|
||||
return
|
||||
|
||||
# 模拟对话
|
||||
for i, (direction, message, timestamp) in enumerate(conversations, 1):
|
||||
if direction == 'in':
|
||||
# 客户消息
|
||||
cprint(f"\n【消息 {i}/{len(conversations)}】{timestamp}", 'system')
|
||||
cprint(f"客户:{message}", 'customer')
|
||||
|
||||
# 创建测试消息
|
||||
test_msg = CustomerMessage(
|
||||
msg_id=f"test_{i}",
|
||||
acc_id="test_shop",
|
||||
msg=message,
|
||||
from_id=customer_id,
|
||||
from_name=customer_name or "测试",
|
||||
cy_id=customer_id,
|
||||
acc_type="AliWorkbench",
|
||||
msg_type=0,
|
||||
cy_name=customer_name or "测试",
|
||||
goods_name="专业找图",
|
||||
goods_order=""
|
||||
)
|
||||
|
||||
# 获取 AI 回复
|
||||
start = datetime.now()
|
||||
try:
|
||||
response = await agent.process_message(test_msg)
|
||||
elapsed = (datetime.now() - start).total_seconds() * 1000
|
||||
|
||||
if response.should_reply:
|
||||
cprint(f"AI [{elapsed:.0f}ms]: {response.reply}", 'agent')
|
||||
|
||||
# 检测特殊内容
|
||||
if any(kw in response.reply for kw in ['元', '块', '价格']):
|
||||
cprint(" ↳ [价格信息]", 'price')
|
||||
if response.need_transfer:
|
||||
cprint(" ↳ [转人工]", 'error')
|
||||
else:
|
||||
cprint("[AI 静默]", 'system')
|
||||
|
||||
except Exception as e:
|
||||
cprint(f"✗ AI 回复失败:{e}", 'error')
|
||||
|
||||
elif direction == 'out':
|
||||
cprint(f"\n[历史回复] {timestamp}", 'system')
|
||||
cprint(f"客服:{message}", 'system')
|
||||
|
||||
cprint(f"\n{'='*70}", 'cyan')
|
||||
|
||||
async def test_all_customers(customers, limit_per_customer=5):
|
||||
"""批量测试所有客户"""
|
||||
cprint(f"\n{'='*70}", 'header')
|
||||
cprint(f" 开始批量测试 {len(customers)} 个客户", 'header')
|
||||
cprint(f" 每个客户测试前 {limit_per_customer} 条消息", 'header')
|
||||
cprint(f"{'='*70}\n", 'header')
|
||||
|
||||
total_msgs = 0
|
||||
total_replies = 0
|
||||
|
||||
for i, (cid, name, cnt, _) in enumerate(customers, 1):
|
||||
cprint(f"\n\n{'='*70}", 'cyan')
|
||||
cprint(f"进度:{i}/{len(customers)} - {name or cid} ({cnt}条消息)", 'cyan')
|
||||
cprint(f"{'='*70}", 'cyan')
|
||||
|
||||
if cnt == 0:
|
||||
cprint(" 跳过(无消息记录)", 'system')
|
||||
continue
|
||||
|
||||
# 获取对话记录
|
||||
conn = sqlite3.connect('db/chat_log_db/chats.db')
|
||||
cursor = conn.execute("""
|
||||
SELECT direction, message, timestamp
|
||||
FROM chat_logs
|
||||
WHERE customer_id = ?
|
||||
ORDER BY timestamp ASC
|
||||
LIMIT ?
|
||||
""", (cid, limit_per_customer))
|
||||
conversations = cursor.fetchall()
|
||||
conn.close()
|
||||
|
||||
# 初始化 AI Agent(只初始化一次)
|
||||
try:
|
||||
from core.pydantic_ai_agent import CustomerServiceAgent, CustomerMessage
|
||||
if i == 1: # 第一个客户时初始化
|
||||
agent = CustomerServiceAgent(skills_dir="skills")
|
||||
cprint("✓ AI Agent 已加载", 'system')
|
||||
except Exception as e:
|
||||
cprint(f"✗ AI Agent 加载失败:{e}", 'error')
|
||||
return
|
||||
|
||||
# 模拟对话
|
||||
for j, (direction, message, timestamp) in enumerate(conversations, 1):
|
||||
if direction == 'in':
|
||||
total_msgs += 1
|
||||
|
||||
# 创建测试消息
|
||||
test_msg = CustomerMessage(
|
||||
msg_id=f"test_{i}_{j}",
|
||||
acc_id="test_shop",
|
||||
msg=message,
|
||||
from_id=cid,
|
||||
from_name=name or "测试",
|
||||
cy_id=cid,
|
||||
acc_type="AliWorkbench",
|
||||
msg_type=0,
|
||||
cy_name=name or "测试",
|
||||
goods_name="专业找图",
|
||||
goods_order=""
|
||||
)
|
||||
|
||||
# 获取 AI 回复
|
||||
start = datetime.now()
|
||||
try:
|
||||
response = await agent.process_message(test_msg)
|
||||
elapsed = (datetime.now() - start).total_seconds() * 1000
|
||||
|
||||
if response.should_reply:
|
||||
total_replies += 1
|
||||
cprint(f"\n[{i}/{len(customers)}] {name or cid} - 消息 {j}", 'system')
|
||||
cprint(f"客户:{message}", 'customer')
|
||||
cprint(f"AI [{elapsed:.0f}ms]: {response.reply}", 'agent')
|
||||
|
||||
# 检测特殊内容
|
||||
if any(kw in response.reply for kw in ['元', '块', '价格']):
|
||||
cprint(" ↳ [价格信息]", 'price')
|
||||
if response.need_transfer:
|
||||
cprint(" ↳ [转人工]", 'error')
|
||||
else:
|
||||
cprint(f"\n[{i}/{len(customers)}] [AI 静默]", 'system')
|
||||
|
||||
except Exception as e:
|
||||
cprint(f"✗ AI 回复失败:{e}", 'error')
|
||||
|
||||
# 每个客户之间休息一下
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# 统计结果
|
||||
cprint(f"\n\n{'='*70}", 'header')
|
||||
cprint(f" 批量测试完成!", 'header')
|
||||
cprint(f"{'='*70}", 'header')
|
||||
cprint(f"\n统计:", 'system')
|
||||
cprint(f" 测试客户数:{len(customers)}", 'cyan')
|
||||
cprint(f" 处理消息数:{total_msgs}", 'cyan')
|
||||
cprint(f" AI 回复数:{total_replies}", 'cyan')
|
||||
if total_msgs > 0:
|
||||
reply_rate = (total_replies / total_msgs) * 100
|
||||
cprint(f" 回复率:{reply_rate:.1f}%", 'cyan')
|
||||
|
||||
async def main():
|
||||
cprint("="*70, 'header')
|
||||
cprint(" AI Agent 对话测试", 'header')
|
||||
cprint(" 从数据库加载聊天记录,测试 AI 回复效果", 'header')
|
||||
cprint("="*70, 'header')
|
||||
|
||||
# 检查数据库
|
||||
customers = check_database()
|
||||
if not customers:
|
||||
return
|
||||
|
||||
# 选择测试模式
|
||||
cprint(f"\n请选择测试模式:", 'cyan')
|
||||
cprint(f" 1. 交互式测试 (手动选择客户)", 'customer')
|
||||
cprint(f" 2. 批量测试所有客户 (自动)", 'agent')
|
||||
cprint(f" 3. 快速测试前 5 个客户", 'price')
|
||||
cprint(f" q. 退出", 'system')
|
||||
|
||||
mode = input("\n选择:").strip().lower()
|
||||
|
||||
if mode == 'q':
|
||||
cprint("\n测试结束!", 'system')
|
||||
return
|
||||
|
||||
try:
|
||||
if mode == '1':
|
||||
# 交互式测试
|
||||
cprint(f"\n请输入客户编号 (1-{len(customers)}) 进行测试:", 'cyan')
|
||||
|
||||
while True:
|
||||
try:
|
||||
choice = input("\n选择:").strip()
|
||||
|
||||
if choice.lower() == 'q':
|
||||
cprint("\n测试结束!", 'system')
|
||||
return
|
||||
|
||||
choice_num = int(choice)
|
||||
if 1 <= choice_num <= len(customers):
|
||||
cid, name, cnt, _ = customers[choice_num - 1]
|
||||
await test_customer_conversation(cid, name or cid, limit=min(cnt, 10))
|
||||
else:
|
||||
cprint(f"请输入 1-{len(customers)} 之间的数字", 'error')
|
||||
|
||||
except ValueError:
|
||||
cprint("请输入有效数字或 q 退出", 'error')
|
||||
except KeyboardInterrupt:
|
||||
cprint("\n\n测试中断", 'error')
|
||||
return
|
||||
except Exception as e:
|
||||
cprint(f"错误:{e}", 'error')
|
||||
|
||||
elif mode == '2':
|
||||
# 批量测试所有客户
|
||||
await test_all_customers(customers, limit_per_customer=5)
|
||||
|
||||
elif mode == '3':
|
||||
# 快速测试前 5 个客户
|
||||
top_5 = customers[:5]
|
||||
cprint(f"\n快速测试前 5 个客户...", 'cyan')
|
||||
await test_all_customers(top_5, limit_per_customer=5)
|
||||
|
||||
else:
|
||||
cprint("无效的选择", 'error')
|
||||
|
||||
except KeyboardInterrupt:
|
||||
cprint("\n\n测试中断", 'error')
|
||||
except Exception as e:
|
||||
cprint(f"错误:{e}", 'error')
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except Exception as e:
|
||||
cprint(f"\n程序异常:{e}", 'error')
|
||||
@@ -1,33 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""配置中心测试"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
def test_config_paths():
|
||||
from config.config import ROOT, LOG_DIR, RESULTS_DIR, CONFIG_DIR
|
||||
assert ROOT.exists()
|
||||
assert ROOT.is_dir()
|
||||
assert (ROOT / "config").samefile(CONFIG_DIR)
|
||||
assert LOG_DIR == ROOT / "logs"
|
||||
print("config paths OK")
|
||||
|
||||
|
||||
def test_config_values():
|
||||
from config.config import (
|
||||
IMAGE_QUEUE_MAX_CONCURRENT,
|
||||
IMAGE_QUEUE_MAX_SIZE,
|
||||
LOG_MAX_BYTES,
|
||||
LOG_BACKUP_COUNT,
|
||||
)
|
||||
assert IMAGE_QUEUE_MAX_CONCURRENT >= 1
|
||||
assert IMAGE_QUEUE_MAX_SIZE >= 1
|
||||
assert LOG_MAX_BYTES > 0
|
||||
assert LOG_BACKUP_COUNT >= 1
|
||||
print("config values OK")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_config_paths()
|
||||
test_config_values()
|
||||
print("All config tests passed")
|
||||
@@ -1,19 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""设计师派单数据库测试"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
|
||||
def test_list_designers():
|
||||
"""测试列出设计师(不修改数据)"""
|
||||
from db.designer_roster_db import list_designers
|
||||
designers = list_designers()
|
||||
assert isinstance(designers, list)
|
||||
print(f"designer roster: {len(designers)} designers")
|
||||
print("designer roster OK")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_list_designers()
|
||||
print("All designer roster tests passed")
|
||||
@@ -1,28 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""健康检查测试"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
|
||||
def test_set_status():
|
||||
"""测试状态设置"""
|
||||
from utils.health_check import set_qingjian_connected, set_wechat_ok
|
||||
set_qingjian_connected(True)
|
||||
set_qingjian_connected(False)
|
||||
set_wechat_ok(True)
|
||||
print("health check status OK")
|
||||
|
||||
|
||||
async def test_run_check():
|
||||
"""测试执行健康检查(不实际发告警)"""
|
||||
from utils.health_check import run_health_check
|
||||
await run_health_check(lambda: True)
|
||||
print("health check run OK")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
test_set_status()
|
||||
asyncio.run(test_run_check())
|
||||
print("All health check tests passed")
|
||||
@@ -1,36 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""图片队列测试"""
|
||||
import sys
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
|
||||
async def test_queue_semaphore():
|
||||
"""测试队列并发限制"""
|
||||
from utils.image_queue import init, run_with_queue, release
|
||||
init(max_concurrent=2, max_queue=5)
|
||||
running = 0
|
||||
max_running = 0
|
||||
|
||||
async def fake_task():
|
||||
nonlocal running, max_running
|
||||
running += 1
|
||||
max_running = max(max_running, running)
|
||||
await asyncio.sleep(0.1)
|
||||
running -= 1
|
||||
return "ok"
|
||||
|
||||
results = await asyncio.gather(
|
||||
run_with_queue(fake_task()),
|
||||
run_with_queue(fake_task()),
|
||||
run_with_queue(fake_task()),
|
||||
)
|
||||
assert all(r == "ok" for r in results)
|
||||
assert max_running <= 2
|
||||
print("image queue OK")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(test_queue_semaphore())
|
||||
print("All image queue tests passed")
|
||||
@@ -1,140 +0,0 @@
|
||||
"""
|
||||
端到端测试:模拟客户付款后的自动图片处理流程
|
||||
运行:python test_process.py
|
||||
"""
|
||||
import sys
|
||||
import io
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# ── 测试参数 ────────────────────────────────────────────────
|
||||
TEST_CUSTOMER_ID = "test_user_001"
|
||||
TEST_ACC_ID = "小威哥1216"
|
||||
TEST_IMAGE_URL = (
|
||||
"https://img.alicdn.com/imgextra/i3/O1CN01tqQst21qIEOdcUOCQ"
|
||||
"_!!4611686018427380880-0-amp.jpg"
|
||||
)
|
||||
# ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def step1_analyze():
|
||||
"""Step 1: 图片分析(模拟 analyze_image 工具调用)"""
|
||||
print("\n" + "="*60)
|
||||
print("Step 1: 图片分析")
|
||||
print("="*60)
|
||||
from image.image_analyzer import image_analyzer
|
||||
result = await image_analyzer.analyze(TEST_IMAGE_URL)
|
||||
print(f"分析结果: {result}")
|
||||
return result
|
||||
|
||||
|
||||
async def step2_create_task(analysis: dict):
|
||||
"""Step 2: 创建 Workflow 任务(模拟 image_analysis_result)"""
|
||||
print("\n" + "="*60)
|
||||
print("Step 2: 创建 Workflow 任务")
|
||||
print("="*60)
|
||||
from core.workflow import workflow
|
||||
await workflow.image_analysis_result(
|
||||
customer_id = TEST_CUSTOMER_ID,
|
||||
image_url = TEST_IMAGE_URL,
|
||||
complexity = analysis.get("complexity", "normal"),
|
||||
acc_id = TEST_ACC_ID,
|
||||
acc_type = "AliWorkbench",
|
||||
gemini_prompt= analysis.get("gemini_prompt", ""),
|
||||
aspect_ratio = analysis.get("aspect_ratio", "1:1"),
|
||||
perspective = analysis.get("perspective", "no"),
|
||||
proc_type = analysis.get("proc_type", ""),
|
||||
subject = analysis.get("subject", ""),
|
||||
quality = analysis.get("quality", ""),
|
||||
)
|
||||
|
||||
task_id = workflow.customer_active_task.get(TEST_CUSTOMER_ID)
|
||||
if task_id:
|
||||
task = workflow.tasks[task_id]
|
||||
print(f"任务已创建: {task_id[:8]}...")
|
||||
print(f" requirements: {task.requirements}")
|
||||
print(f" image: {task.original_image[:80]}...")
|
||||
else:
|
||||
print("⚠️ 任务创建失败!")
|
||||
sys.exit(1)
|
||||
return task_id
|
||||
|
||||
|
||||
async def step3_trigger_payment():
|
||||
"""Step 3: 模拟付款触发处理(等待 Gemini 完成)"""
|
||||
print("\n" + "="*60)
|
||||
print("Step 3: 模拟付款,触发 Gemini 处理(同步等待完成)")
|
||||
print("="*60)
|
||||
from core.workflow import workflow
|
||||
|
||||
# 注入发送函数(避免真实发消息,只打印)
|
||||
async def fake_send(**kw):
|
||||
cid = kw.get("customer_id", kw.get("from_id", "?"))
|
||||
content = kw.get("content", kw.get("msg", ""))
|
||||
print(f"[FAKE SEND -> {cid}] {str(content)[:120]}")
|
||||
|
||||
workflow._send_message = fake_send
|
||||
|
||||
# 直接调用 _auto_process 同步等待,而非后台 task
|
||||
task_id = workflow.customer_active_task.get(TEST_CUSTOMER_ID)
|
||||
if not task_id:
|
||||
print(" 找不到待处理任务!")
|
||||
return
|
||||
|
||||
print(f" 开始处理任务: {task_id[:8]}...")
|
||||
await workflow._auto_process(task_id, acc_id=TEST_ACC_ID, acc_type="AliWorkbench")
|
||||
|
||||
|
||||
async def step4_check_result():
|
||||
"""Step 4: 检查结果"""
|
||||
print("\n" + "="*60)
|
||||
print("Step 4: 检查处理结果")
|
||||
print("="*60)
|
||||
result_dir = os.getenv("RESULT_IMAGE_DIR", "results")
|
||||
if not os.path.exists(result_dir):
|
||||
print(f"结果目录不存在: {result_dir}")
|
||||
return
|
||||
|
||||
files = sorted(
|
||||
[f for f in os.listdir(result_dir) if f.startswith("result_")],
|
||||
key=lambda f: os.path.getmtime(os.path.join(result_dir, f)),
|
||||
reverse=True,
|
||||
)
|
||||
if files:
|
||||
latest = files[0]
|
||||
path = os.path.join(result_dir, latest)
|
||||
size = os.path.getsize(path)
|
||||
print(f"最新结果文件: {latest}")
|
||||
print(f" 大小: {size:,} bytes ({size/1024:.1f} KB)")
|
||||
else:
|
||||
print("结果目录为空,可能处理失败")
|
||||
|
||||
|
||||
async def main():
|
||||
print("=" * 60)
|
||||
print(" 图片处理流程端到端测试")
|
||||
print("=" * 60)
|
||||
print(f"测试图片: {TEST_IMAGE_URL[:80]}...")
|
||||
|
||||
try:
|
||||
analysis = await step1_analyze()
|
||||
await step2_create_task(analysis)
|
||||
await step3_trigger_payment()
|
||||
await step4_check_result()
|
||||
print("\n[OK] 测试完成")
|
||||
except KeyboardInterrupt:
|
||||
print("\n测试被中断")
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f"\n[FAIL] 测试失败: {e}")
|
||||
traceback.print_exc()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,53 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""测试转人工流程:设计师在线查询 + 派单 + 无人在线时企微提醒"""
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
# 测试用 API 地址(文档中的)
|
||||
os.environ.setdefault("DESIGNER_ROSTER_API", "http://huichang.online:8001/online")
|
||||
|
||||
|
||||
async def main():
|
||||
from utils.designer_roster import poll_and_update_roster
|
||||
from db.designer_roster_db import list_designers, get_transfer_group_for_shop
|
||||
|
||||
print("1. 查询并同步设计师在线状态...")
|
||||
await poll_and_update_roster()
|
||||
print(" OK")
|
||||
|
||||
print("\n2. 当前设计师状态:")
|
||||
for d in list_designers():
|
||||
status = "在线" if d["is_online"] else "离线"
|
||||
print(f" - {d['name']} ({d['wechat_user_id']}): {status}")
|
||||
|
||||
print("\n3. 派单测试 (店铺: 小威哥1216):")
|
||||
shop_id = "小威哥1216"
|
||||
group_id = get_transfer_group_for_shop(shop_id)
|
||||
if group_id:
|
||||
print(f" 派单成功 -> group_id={group_id}")
|
||||
else:
|
||||
print(" 无人在线,将回退到静态配置")
|
||||
print(" (转人工时会发企微「谁在线啊」)")
|
||||
|
||||
print("\n4. 静态回退:")
|
||||
from config.config import CONFIG_DIR
|
||||
import json
|
||||
cfg_path = CONFIG_DIR / "transfer_groups.json"
|
||||
default = "20252916034"
|
||||
if cfg_path.exists():
|
||||
with open(cfg_path, "r", encoding="utf-8") as f:
|
||||
cfg = json.load(f)
|
||||
fallback = cfg.get(shop_id, cfg.get("default", default))
|
||||
else:
|
||||
fallback = default
|
||||
print(f" 回退分组: {fallback}")
|
||||
|
||||
print("\n测试完成")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,27 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""转接分组测试"""
|
||||
import sys
|
||||
import json
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
|
||||
def test_get_transfer_group():
|
||||
"""测试转接分组查找逻辑"""
|
||||
from config.config import CONFIG_DIR
|
||||
config_path = CONFIG_DIR / "transfer_groups.json"
|
||||
default = "20252916034"
|
||||
if not config_path.exists():
|
||||
print("transfer_groups.json 不存在,跳过")
|
||||
return
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
cfg = json.load(f)
|
||||
got = cfg.get("default", default)
|
||||
assert got
|
||||
print(f"default group: {got}")
|
||||
print("transfer groups OK")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_get_transfer_group()
|
||||
print("All transfer tests passed")
|
||||
@@ -1,37 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""测试企微「谁在线啊」消息发送"""
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
|
||||
async def main():
|
||||
from config.config import WECHAT_WEBHOOK
|
||||
import httpx
|
||||
|
||||
if not WECHAT_WEBHOOK:
|
||||
print("未配置 WECHAT_WEBHOOK,无法测试")
|
||||
return
|
||||
|
||||
print(f"发送测试消息到企微...")
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(WECHAT_WEBHOOK, json={
|
||||
"msgtype": "text",
|
||||
"text": {"content": "谁在线啊"}
|
||||
})
|
||||
print(f"状态码: {resp.status_code}")
|
||||
print(f"响应: {resp.text}")
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
if data.get("errcode") == 0:
|
||||
print("发送成功,请检查企微群是否收到")
|
||||
else:
|
||||
print(f"企微返回错误: {data}")
|
||||
else:
|
||||
print("发送失败")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user