feat: 开版订单 AI 解析服务

- 豆包 AI 解析企微消息 → 开版订单字段
- 字典枚举缓存(每日刷新,11 个分组 + 关联流程)
- 布料精确匹配,找不到自动新增
- 客户精确匹配,未找到返回建议
- 企微消息代理(解决前端 CORS)
- 支持 tags 写入(记录解析次数)
- 单条消息图片获取接口
This commit is contained in:
zuowei1216
2026-05-25 19:35:22 +08:00
commit eaed17c362
5 changed files with 646 additions and 0 deletions

15
.env.example Normal file
View File

@@ -0,0 +1,15 @@
# 豆包 AI 配置
AI_API_KEY=your-doubao-api-key
AI_BASE_URL=https://ark.cn-beijing.volces.com/api/v3
AI_MODEL=doubao-seed-2-0-mini-260428
# ERP 后端配置(用于查询字典/客户/布料)
ERP_BASE_URL=https://yuwenerp.yuwen.cloud
ERP_USERNAME=your-username
ERP_PASSWORD=your-password
# 服务端口
PORT=5050
# 调试模式(生产环境改为 false
DEBUG=true

22
.gitignore vendored Normal file
View File

@@ -0,0 +1,22 @@
# 环境变量(含密钥,不提交)
.env
.env.local
.env.production
.env.test
# Python
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.egg-info/
dist/
build/
venv/
.venv/
# 测试/调试文件
bench.py
debug_api.py
test_*.py

597
app.py Normal file
View File

@@ -0,0 +1,597 @@
"""
开版订单 AI 解析服务
Flask + 豆包 API/api/v3/responses 格式)
功能:
1. 代理企微消息接口(解决 CORS
2. AI 解析消息内容 → 开版订单字段
3. 字典枚举值缓存(启动时拉取,每日刷新)
4. 布料/客户智能匹配:搜索 → 不存在则自动新增
启动:
python app.py
"""
import os
import json
import logging
import threading
import time
import requests
from datetime import datetime, timedelta
from flask import Flask, request, jsonify
from flask_cors import CORS
from dotenv import load_dotenv
load_dotenv()
# ─── 日志 ─────────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
# ─── Flask ────────────────────────────────────────────────────────────────────
app = Flask(__name__)
CORS(app, origins="*")
# ─── 豆包 AI 配置 ─────────────────────────────────────────────────────────────
AI_API_KEY = os.getenv("AI_API_KEY", "")
AI_BASE_URL = os.getenv("AI_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3")
AI_MODEL = os.getenv("AI_MODEL", "doubao-seed-2-0-mini-260428")
DOUBAO_URL = f"{AI_BASE_URL}/responses"
DOUBAO_HEADERS = {
"Authorization": f"Bearer {AI_API_KEY}",
"Content-Type": "application/json",
}
# ─── 企微消息服务配置 ─────────────────────────────────────────────────────────
WECHAT_API_BASE = "https://test.ruicaiyinhua.online"
WECHAT_AUTH = "hophopkk"
WORK_ORDER_ROOM = "wrW_mXZwAAaGwXBWvzbiAfEjy3ZM7ong"
# ─── ERP 后端配置 ─────────────────────────────────────────────────────────────
ERP_BASE_URL = os.getenv("ERP_BASE_URL", "https://yuwenerp.yuwen.cloud")
ERP_USERNAME = os.getenv("ERP_USERNAME", "jimi")
ERP_PASSWORD = os.getenv("ERP_PASSWORD", "zuowei1216")
# 需要全量缓存的小字典分组(用于 AI prompt 约束)
SMALL_DICT_GROUPS = [
"起版情况", "紧急程度", "做货方式", "开版方式",
"地区", "画图评级", "调色评级", "套样评级",
"难度评级", "布源", "幅宽",
]
# ─── ERP Token 管理 ──────────────────────────────────────────────────────────
class ErpAuth:
def __init__(self):
self.access_token = None
self.refresh_token = None
self.expires_at = 0
self._lock = threading.Lock()
def login(self):
with self._lock:
try:
r = requests.post(
f"{ERP_BASE_URL}/api/auth/login/",
json={"username": ERP_USERNAME, "password": ERP_PASSWORD},
timeout=15,
)
r.raise_for_status()
data = r.json()
self.access_token = data["access"]
self.refresh_token = data.get("refresh")
# JWT 默认 7 天,提前 1 小时刷新
self.expires_at = time.time() + 6 * 24 * 3600
logger.info("ERP 登录成功")
return self.access_token
except Exception as e:
logger.error(f"ERP 登录失败: {e}")
raise
def get_token(self):
if not self.access_token or time.time() >= self.expires_at:
return self.login()
return self.access_token
def headers(self):
return {"Authorization": f"Bearer {self.get_token()}"}
erp_auth = ErpAuth()
def erp_get(path: str, params: dict = None):
"""GET ERP 接口,自动处理 401"""
url = f"{ERP_BASE_URL}{path}"
r = requests.get(url, headers=erp_auth.headers(), params=params or {}, timeout=30)
if r.status_code == 401:
erp_auth.login()
r = requests.get(url, headers=erp_auth.headers(), params=params or {}, timeout=30)
r.raise_for_status()
return r.json()
def erp_post(path: str, data: dict):
"""POST ERP 接口"""
url = f"{ERP_BASE_URL}{path}"
r = requests.post(url, headers={**erp_auth.headers(), "Content-Type": "application/json"},
json=data, timeout=30)
if r.status_code == 401:
erp_auth.login()
r = requests.post(url, headers={**erp_auth.headers(), "Content-Type": "application/json"},
json=data, timeout=30)
r.raise_for_status()
return r.json()
# ─── 字典缓存 ─────────────────────────────────────────────────────────────────
class DictCache:
"""小字典分组的内存缓存,启动时加载,每日刷新"""
def __init__(self):
self.cache = {} # {group: [name1, name2, ...]}
self.processes = [] # [{id, name, description}] 关联流程
self.last_loaded = 0
self.ttl = 24 * 3600 # 24 小时
self._lock = threading.Lock()
def load(self):
with self._lock:
logger.info("加载字典缓存...")
new_cache = {}
for group in SMALL_DICT_GROUPS:
try:
data = erp_get("/api/backend/quick-inputs/", {"group": group, "limit": 200})
items = data.get("results", [])
new_cache[group] = [it["name"] for it in items if it.get("name")]
logger.info(f" [{group}] {len(new_cache[group])}")
except Exception as e:
logger.warning(f" [{group}] 加载失败: {e}")
new_cache[group] = self.cache.get(group, [])
self.cache = new_cache
# 加载关联流程
try:
data = erp_get("/api/v1/stateflow/processes/", {"limit": 100})
self.processes = [
{"id": p["id"], "name": p["name"], "description": p.get("description", "")}
for p in data.get("results", [])
]
logger.info(f" [关联流程] {len(self.processes)}")
except Exception as e:
logger.warning(f" [关联流程] 加载失败: {e}")
self.last_loaded = time.time()
logger.info("字典缓存加载完成")
def get(self, group: str):
if time.time() - self.last_loaded > self.ttl:
try:
self.load()
except Exception:
pass
return self.cache.get(group, [])
def get_processes(self):
return self.processes
def find_process_id(self, name: str):
"""根据名称查找流程 ID精确 + 模糊)"""
if not name:
return None
for p in self.processes:
if p["name"] == name:
return p["id"]
# 模糊匹配
for p in self.processes:
if name in p["name"] or p["name"] in name:
return p["id"]
return None
def all(self):
return dict(self.cache)
dict_cache = DictCache()
def background_init():
"""后台线程:登录 ERP + 加载字典缓存(不阻塞 Flask 启动)"""
try:
erp_auth.login()
dict_cache.load()
except Exception as e:
logger.error(f"初始化失败: {e}")
# ─── 布料/客户 智能匹配 ───────────────────────────────────────────────────────
def find_or_create_fabric(name: str) -> dict:
"""
精确匹配优先;找不到则新增(不做模糊匹配,避免错配)
返回 {id, name, exists, created}
"""
if not name:
return None
name = name.strip()
try:
# 精确搜索(按 name 参数过滤)
data = erp_get("/api/backend/quick-inputs/", {"group": "布料", "name": name, "limit": 20})
items = data.get("results", [])
# 只接受精确匹配
exact = next((it for it in items if it.get("name") == name), None)
if exact:
return {"id": exact["id"], "name": exact["name"], "exists": True, "created": False}
# 找不到精确 → 新增
new_item = erp_post("/api/backend/quick-inputs/", {
"group": "布料",
"name": name,
"value": name,
})
logger.info(f"新增布料: {name} (id={new_item.get('id')})")
return {"id": new_item.get("id"), "name": name, "exists": False, "created": True}
except Exception as e:
logger.error(f"匹配/新增布料失败 [{name}]: {e}")
return None
def find_customer(name: str) -> dict:
"""
搜索客户:精确匹配优先;模糊匹配只在精确未命中时使用,并标记 fuzzy
返回 {id, name, exists, ...}
"""
if not name:
return None
name = name.strip()
try:
data = erp_get("/api/backend/customers/", {"name": name, "limit": 10})
items = data.get("results", []) if isinstance(data, dict) else data
if not items:
return {"id": None, "name": name, "exists": False}
# 精确匹配
exact = next((it for it in items if it.get("name") == name), None)
if exact:
return {
"id": exact["id"], "name": exact["name"], "exists": True,
"area": exact.get("area"),
"visible_employees": exact.get("visible_employees"),
}
# 没有精确匹配 → 返回未找到(让前端决定是否模糊建议)
return {
"id": None, "name": name, "exists": False,
"suggestions": [{"id": it["id"], "name": it["name"]} for it in items[:5]],
}
except Exception as e:
logger.error(f"搜索客户失败 [{name}]: {e}")
return None
# ─── AI 解析(带枚举约束)────────────────────────────────────────────────────
def build_system_prompt() -> str:
"""根据缓存的字典动态构建 prompt让 AI 严格选枚举"""
def opts(group):
vals = dict_cache.get(group)
return "/".join(vals) if vals else "(待加载)"
# 流程选项(带描述)
process_lines = []
for p in dict_cache.get_processes():
desc = f"({p['description']})" if p.get("description") else ""
process_lines.append(f" - {p['name']}{desc}")
process_text = "\n".join(process_lines) if process_lines else "(待加载)"
return f"""你是一个印染厂开版订单信息提取助手。
用户会发给你一条企业微信群聊消息,消息内容是跟单员发给设计师的工单指令。
你需要从消息中提取以下字段,返回 JSON 格式。
【字段说明(**必须严格使用以下枚举值**,不在列表中的返回 null
- customer_name: 客户名称(自由文本,消息开头通常是客户名,如"華鑫""林庆福""糖糖"
- plate_type: 起版情况,**只能选**{opts('起版情况')}
映射规则:图片起版/样衣起版/图片开版/样衣开版/找图开发/起版 → 首版
修改/加色/调色/调整/改版 → 修改单
套大货/套纸样/套样/重新套 → 复版/开版
- urgency_level: 紧急程度,**只能选**{opts('紧急程度')}
映射规则:消息有"加急"→ 加急;有"特急"→ 特急;"加急已下单"等 → 紧急已下单;否则 → 正常
- production_method: 做货方式,**只能选**{opts('做货方式')}
- plate_method: 开版方式,**只能选**{opts('开版方式')}
映射规则:消息有"图片起版/图片开版"→ 图片开版
消息有"样衣起版/样板起版/样衣开版"→ 样衣开版
消息有"文件开版"→ 文件开版
其他原始词如"开发""开板"→ 开发
- area: 客户区域,**只能选**{opts('地区')}(消息里很少提及,多数返回 null
- fabric: 面料名称(自由文本,完整保留克重和布料类型,如"140g四面弹""120克本白四面弹""180克牛奶丝""泡泡皱"
- width: 面料幅宽,**只能选**{opts('幅宽')}(如"幅宽151"匹配"1.51""120"匹配"120",找不到则 null
- fabric_source: 面料来源,**只能选**{opts('布源')}
- drawing_rating: 画图评级,**只能选**{opts('画图评级')}(消息里几乎不会有,多返 null
- color_matching_rating: 调色评级,**只能选**{opts('调色评级')}(多返 null
- sample_rating: 套样评级,**只能选**{opts('套样评级')}(多返 null
- difficulty_rating: 难度评级,**只能选**{opts('难度评级')}(多返 null
- process_name: 关联流程,**只能选**以下名称之一(直接返回流程名字符串,不要带括号注释):
{process_text}
选择规则:根据 plate_type 和操作内容综合判断
- 套大货/套纸样 → "开版(套米样)""开版(套大货)"
- 加色/调色 → "开版(配色)""开版(绘图调色)"
- P图/抠图 → "开版(P图)"
- 描图/找图开发 → "开版(描图开版)""开版(绘图调色套样)"
- 不确定时返回 null让用户手动选
- plate_notes: 打版备注(自由文本,把消息中的具体要求、注意事项原文保留:克重、颜色要求、特殊工艺等)
- style_name: 款号名称(如"NSY1090""G67单600-20款""6576改"等款号,没有则 null
- original_design_code: 原订单ID如果消息提到要"套大货/套纸样/修改"等,且消息中有 5-7 位的纯数字编号,那就是原订单号;否则 null
- merchandiser_name: 跟单员(直接返回 sender 字段的原值)
【重要规则】
1. 枚举字段plate_type、urgency_level、production_method、plate_method、area、width、fabric_source、各评级、process_name**必须**从给定选项里选,不能自创;判断不出就返回 null
2. 自由文本字段customer_name、fabric、plate_notes、style_name、original_design_code保持原文
3. 一律返回 JSON 对象,不要任何其它文字"""
def call_doubao(user_text: str) -> dict:
"""调用豆包 /v3/responses返回解析后的 dict"""
payload = {
"model": AI_MODEL,
"input": [
{"role": "system", "content": [{"type": "input_text", "text": build_system_prompt()}]},
{"role": "user", "content": [{"type": "input_text", "text": user_text}]},
],
"temperature": 0.1,
"max_output_tokens": 4096,
}
r = requests.post(DOUBAO_URL, headers=DOUBAO_HEADERS, json=payload, timeout=60)
r.raise_for_status()
data = r.json()
raw_text = ""
for item in data.get("output", []):
if item.get("type") == "message":
for c in item.get("content", []):
if c.get("type") == "output_text":
raw_text = c.get("text", "").strip()
break
break
if not raw_text:
raise ValueError("豆包返回内容为空")
start = raw_text.find("{")
end = raw_text.rfind("}") + 1
if start == -1 or end == 0:
raise ValueError(f"响应中未找到 JSON{raw_text}")
return json.loads(raw_text[start:end])
# ─── 路由 ─────────────────────────────────────────────────────────────────────
@app.route("/health", methods=["GET"])
def health():
return jsonify({
"status": "ok",
"model": AI_MODEL,
"dict_cache_loaded": bool(dict_cache.cache),
"dict_groups": list(dict_cache.cache.keys()),
"erp_logged_in": bool(erp_auth.access_token),
})
@app.route("/api/v1/dict-cache/", methods=["GET"])
def get_dict_cache():
"""查看当前缓存(调试用)"""
return jsonify(dict_cache.all())
@app.route("/api/v1/dict-cache/refresh/", methods=["POST"])
def refresh_dict_cache():
"""手动刷新字典缓存"""
try:
dict_cache.load()
return jsonify({"status": "ok", "groups": list(dict_cache.cache.keys())})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ─── 企微消息代理 ─────────────────────────────────────────────────────────────
@app.route("/api/v1/wechat/messages/", methods=["GET"])
def proxy_wechat_messages():
limit = request.args.get("limit", "30")
offset = request.args.get("offset", "0")
room_id = request.args.get("room_id", WORK_ORDER_ROOM)
with_image = request.args.get("with_image", "false")
try:
r = requests.get(
f"{WECHAT_API_BASE}/messages",
headers={"Authorization": WECHAT_AUTH},
params={"room_id": room_id, "limit": limit, "offset": offset, "with_image": with_image},
timeout=60,
)
r.raise_for_status()
return jsonify(r.json())
except requests.Timeout:
return jsonify({"error": "企微接口超时"}), 504
except Exception as e:
logger.error(f"代理请求失败: {e}")
return jsonify({"error": str(e)}), 500
@app.route("/api/v1/wechat/messages/<path:msg_id>/with-image", methods=["GET"])
def proxy_get_message_with_image(msg_id):
"""
代理 GET /messages/:msg_id?with_image=true
企微服务直接支持按 msg_id 查单条消息,速度极快(< 0.1s
"""
try:
r = requests.get(
f"{WECHAT_API_BASE}/messages/{msg_id}",
headers={"Authorization": WECHAT_AUTH},
params={"with_image": "true"},
timeout=30,
)
r.raise_for_status()
return jsonify(r.json())
except requests.Timeout:
return jsonify({"error": "企微接口超时"}), 504
except requests.HTTPError as e:
return jsonify({"error": f"企微接口错误 {e.response.status_code}"}), e.response.status_code
except Exception as e:
logger.error(f"获取消息图片失败 msg_id={msg_id}: {e}")
return jsonify({"error": str(e)}), 500
@app.route("/api/v1/wechat/messages/<path:msg_id>/tags", methods=["PUT"])
def proxy_update_tags(msg_id):
try:
body = request.get_data()
r = requests.put(
f"{WECHAT_API_BASE}/messages/{msg_id}/tags",
headers={"Authorization": WECHAT_AUTH, "Content-Type": "application/json"},
data=body,
timeout=10,
)
return jsonify(r.json()), r.status_code
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/v1/wechat/messages/<path:msg_id>/parse-record/", methods=["POST"])
def record_parse(msg_id):
try:
body = request.get_json(silent=True) or {}
old_tags = body.get("old_tags") or {}
new_count = int(old_tags.get("parse_count", 0)) + 1
new_tags = {
**old_tags,
"parse_count": new_count,
"last_parsed_at": datetime.utcnow().isoformat() + "Z",
"last_parsed_by": body.get("user", "anonymous"),
}
r = requests.put(
f"{WECHAT_API_BASE}/messages/{msg_id}/tags",
headers={"Authorization": WECHAT_AUTH, "Content-Type": "application/json"},
json=new_tags,
timeout=10,
)
r.raise_for_status()
return jsonify(r.json())
except Exception as e:
logger.error(f"记录解析失败 msg_id={msg_id}: {e}")
return jsonify({"error": str(e)}), 500
# ─── 主接口:解析开版订单 ────────────────────────────────────────────────────
@app.route("/api/v1/ai/parse-plate-order/", methods=["POST"])
def parse_plate_order():
"""
解析企业微信消息 → 开版订单字段
自动匹配/新增布料、搜索客户
请求体:
{
"text": "消息文字",
"sender": "发送者",
"has_images": true,
"image_count": 2
}
"""
data = request.get_json(silent=True) or {}
text = data.get("text", "").strip()
sender = data.get("sender", "")
has_images = data.get("has_images", False)
image_count = data.get("image_count", 0)
if not text and not has_images:
return jsonify({"error": "消息内容为空"}), 400
user_content = f"发送者:{sender}\n"
if has_images:
user_content += f"(消息含 {image_count} 张图片)\n"
user_content += f"消息内容:{text or '(纯图片消息,无文字)'}"
logger.info(f"解析请求 | sender={sender} | text={text[:80]}")
try:
result = call_doubao(user_content)
except Exception as e:
logger.error(f"AI 解析失败: {e}")
return jsonify({"error": f"AI 解析失败: {e}"}), 500
# 清理空值
cleaned = {k: v for k, v in result.items() if v not in (None, "", "null")}
# 跟单员兜底
if "merchandiser_name" not in cleaned and sender:
cleaned["merchandiser_name"] = sender
# 校验枚举字段(不在缓存里就剔除)
enum_fields = {
"plate_type": "起版情况",
"urgency_level": "紧急程度",
"production_method": "做货方式",
"plate_method": "开版方式",
"area": "地区",
"width": "幅宽",
"fabric_source": "布源",
"drawing_rating": "画图评级",
"color_matching_rating": "调色评级",
"sample_rating": "套样评级",
"difficulty_rating": "难度评级",
}
for field, group in enum_fields.items():
if field in cleaned:
valid = dict_cache.get(group)
if valid and cleaned[field] not in valid:
logger.warning(f"AI 返回 {field}={cleaned[field]} 不在枚举中,剔除")
cleaned.pop(field, None)
# 处理布料:搜索 → 不存在则新增
if cleaned.get("fabric"):
match = find_or_create_fabric(cleaned["fabric"])
if match:
cleaned["fabric"] = match["name"] # 用规范化的名称
cleaned["fabric_id"] = match["id"]
cleaned["fabric_created"] = match.get("created", False)
# 处理客户:精确匹配优先;找不到时附带模糊建议供前端选择
if cleaned.get("customer_name"):
match = find_customer(cleaned["customer_name"])
if match and match.get("id"):
cleaned["customer_id"] = match["id"]
cleaned["customer_name"] = match["name"]
if match.get("area"):
cleaned["customer_area"] = match["area"]
if match.get("visible_employees"):
cleaned["customer_visible_employees"] = match["visible_employees"]
else:
cleaned["customer_not_found"] = True
if match and match.get("suggestions"):
cleaned["customer_suggestions"] = match["suggestions"]
# 处理关联流程:根据 process_name 查找 ID
if cleaned.get("process_name"):
pid = dict_cache.find_process_id(cleaned["process_name"])
if pid:
cleaned["process_id"] = pid
else:
logger.warning(f"未找到流程:{cleaned['process_name']}")
cleaned.pop("process_name", None)
logger.info(f"解析结果:{cleaned}")
return jsonify(cleaned)
# ─── 入口 ─────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
port = int(os.getenv("PORT", 5050))
debug = os.getenv("DEBUG", "false").lower() == "true"
# 后台线程初始化(不阻塞 Flask 启动)
threading.Thread(target=background_init, daemon=True).start()
logger.info(f"启动 plate-ai-service | port={port} | model={AI_MODEL}")
app.run(host="0.0.0.0", port=port, debug=debug, use_reloader=False)

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
flask==3.0.3
flask-cors==4.0.1
requests==2.32.3
python-dotenv==1.0.1

8
start.bat Normal file
View File

@@ -0,0 +1,8 @@
@echo off
cd /d %~dp0
echo 安装依赖...
pip install -r requirements.txt
echo.
echo 启动 plate-ai-service...
python app.py
pause