Files
tw/tests/test_transfer_flow.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

54 lines
1.7 KiB
Python
Executable File

# -*- coding: utf-8 -*-
"""测试转人工流程:设计师在线查询 + 派单 + 无人在线时企微提醒"""
import asyncio
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
# 测试用 API 地址(文档中的)
os.environ.setdefault("DESIGNER_ROSTER_API", "http://huichang.online:8001/online")
async def main():
from utils.designer_roster import poll_and_update_roster
from db.designer_roster_db import list_designers, get_transfer_group_for_shop
print("1. 查询并同步设计师在线状态...")
await poll_and_update_roster()
print(" OK")
print("\n2. 当前设计师状态:")
for d in list_designers():
status = "在线" if d["is_online"] else "离线"
print(f" - {d['name']} ({d['wechat_user_id']}): {status}")
print("\n3. 派单测试 (店铺: 小威哥1216):")
shop_id = "小威哥1216"
group_id = get_transfer_group_for_shop(shop_id)
if group_id:
print(f" 派单成功 -> group_id={group_id}")
else:
print(" 无人在线,将回退到静态配置")
print(" (转人工时会发企微「谁在线啊」)")
print("\n4. 静态回退:")
from config.config import CONFIG_DIR
import json
cfg_path = CONFIG_DIR / "transfer_groups.json"
default = "20252916034"
if cfg_path.exists():
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
fallback = cfg.get(shop_id, cfg.get("default", default))
else:
fallback = default
print(f" 回退分组: {fallback}")
print("\n测试完成")
if __name__ == "__main__":
asyncio.run(main())