commit eaed17c36281cb31eb4f35c59d346ce6fd6430ed
Author: zuowei1216 <12206728+zuowei1216@user.noreply.gitee.com>
Date:   Mon May 25 19:35:22 2026 +0800

    feat: 开版订单 AI 解析服务

    - 豆包 AI 解析企微消息 → 开版订单字段
    - 字典枚举缓存(每日刷新,11 个分组 + 关联流程)
    - 布料精确匹配,找不到自动新增
    - 客户精确匹配,未找到返回建议
    - 企微消息代理(解决前端 CORS)
    - 支持 tags 写入(记录解析次数)
    - 单条消息图片获取接口

diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..0a2b4cc
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,15 @@
+# Doubao AI configuration
+AI_API_KEY=your-doubao-api-key
+AI_BASE_URL=https://ark.cn-beijing.volces.com/api/v3
+AI_MODEL=doubao-seed-2-0-mini-260428
+
+# ERP backend configuration (used to query dictionaries/customers/fabrics)
+ERP_BASE_URL=https://yuwenerp.yuwen.cloud
+ERP_USERNAME=your-username
+ERP_PASSWORD=your-password
+
+# Service port
+PORT=5050
+
+# Debug mode (set to false in production)
+DEBUG=true
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b730881
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,22 @@
+# Environment variables (contain secrets, do not commit)
+.env
+.env.local
+.env.production
+.env.test
+
+# Python
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.Python
+*.egg-info/
+dist/
+build/
+venv/
+.venv/
+
+# Test/debug files
+bench.py
+debug_api.py
+test_*.py
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..3f4a45d
--- /dev/null
+++ b/app.py
@@ -0,0 +1,609 @@
+"""
+Plate-making order AI parsing service
+Flask + Doubao API (/api/v3/responses format)
+
+Features:
+1. Proxy the WeCom message API (works around CORS)
+2. AI-parse message content → plate-making order fields
+3. Cache dictionary enum values (fetched at startup, refreshed daily)
+4. Smart fabric/customer matching: search → auto-create when missing
+
+Run:
+    python app.py
+"""
+
+import os
+import json
+import logging
+import threading
+import time
+import requests
+from datetime import datetime, timedelta, timezone
+from flask import Flask, request, jsonify
+from flask_cors import CORS
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# ─── Logging ─────────────────────────────────────────────────────────────────────
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+# ─── Flask ────────────────────────────────────────────────────────────────────
+app = Flask(__name__)
+CORS(app, origins="*")
+
+# ─── Doubao AI config ─────────────────────────────────────────────────────────────
+AI_API_KEY = os.getenv("AI_API_KEY", "")
+AI_BASE_URL = os.getenv("AI_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3")
+AI_MODEL = os.getenv("AI_MODEL", "doubao-seed-2-0-mini-260428")
+DOUBAO_URL = f"{AI_BASE_URL}/responses"
+DOUBAO_HEADERS = {
+    "Authorization": f"Bearer {AI_API_KEY}",
+    "Content-Type": "application/json",
+}
+
+# ─── WeCom message service config ─────────────────────────────────────────────────────────
+WECHAT_API_BASE = os.getenv("WECHAT_API_BASE", "https://test.ruicaiyinhua.online")
+WECHAT_AUTH = os.getenv("WECHAT_AUTH", "hophopkk")  # NOTE(security): secret default kept for compatibility; set via env and rotate
+WORK_ORDER_ROOM = os.getenv("WORK_ORDER_ROOM", "wrW_mXZwAAaGwXBWvzbiAfEjy3ZM7ong")
+
+# ─── ERP backend config ─────────────────────────────────────────────────────────────
+ERP_BASE_URL = os.getenv("ERP_BASE_URL", "https://yuwenerp.yuwen.cloud")
+ERP_USERNAME = os.getenv("ERP_USERNAME", "jimi")  # NOTE(security): looks like a real account as default — confirm, move to .env
+ERP_PASSWORD = os.getenv("ERP_PASSWORD", "zuowei1216")  # NOTE(security): looks like a real password as default — move to .env and rotate
+
+# Small dictionary groups cached in full (used to constrain the AI prompt)
+SMALL_DICT_GROUPS = [
+    "起版情况", "紧急程度", "做货方式", "开版方式",
+    "地区", "画图评级", "调色评级", "套样评级",
+    "难度评级", "布源", "幅宽",
+]
+
+# ─── ERP token management ──────────────────────────────────────────────────────────
+class ErpAuth:
+    """Holds the ERP access token and logs in again when it is missing or past its local expiry."""
+    def __init__(self):
+        self.access_token = None
+        self.refresh_token = None
+        self.expires_at = 0
+        self._lock = threading.Lock()
+
+    def login(self):
+        """Log in with username/password, store the tokens and return the access token; re-raises on failure."""
+        with self._lock:
+            try:
+                r = requests.post(
+                    f"{ERP_BASE_URL}/api/auth/login/",
+                    json={"username": ERP_USERNAME, "password": ERP_PASSWORD},
+                    timeout=15,
+                )
+                r.raise_for_status()
+                data = r.json()
+                self.access_token = data["access"]
+                self.refresh_token = data.get("refresh")
+                # Token is assumed valid for 7 days (TODO confirm with ERP); re-login one day early
+                self.expires_at = time.time() + 6 * 24 * 3600
+                logger.info("ERP 登录成功")
+                return self.access_token
+            except Exception as e:
+                logger.error(f"ERP 登录失败: {e}")
+                raise
+
+    def get_token(self):
+        """Return the current access token, logging in first when none is held or it has expired."""
+        if not self.access_token or time.time() >= self.expires_at:
+            return self.login()
+        return self.access_token
+
+    def headers(self):
+        """Return the Authorization header dict carrying the bearer token."""
+        return {"Authorization": f"Bearer {self.get_token()}"}
+
+erp_auth = ErpAuth()
+
+
+def erp_get(path: str, params: dict = None):
+    """GET an ERP endpoint and return the decoded JSON; on HTTP 401 log in again and retry once"""
+    url = f"{ERP_BASE_URL}{path}"
+    r = requests.get(url, headers=erp_auth.headers(), params=params or {}, timeout=30)
+    if r.status_code == 401:
+        erp_auth.login()
+        r = requests.get(url, headers=erp_auth.headers(), params=params or {}, timeout=30)
+    r.raise_for_status()
+    return r.json()
+
+
+def erp_post(path: str, data: dict):
+    """POST JSON to an ERP endpoint and return the decoded JSON; on HTTP 401 log in again and retry once"""
+    url = f"{ERP_BASE_URL}{path}"
+    r = requests.post(url, headers={**erp_auth.headers(), "Content-Type": "application/json"},
+                      json=data, timeout=30)
+    if r.status_code == 401:
+        erp_auth.login()
+        r = requests.post(url, headers={**erp_auth.headers(), "Content-Type": "application/json"},
+                          json=data, timeout=30)
+    r.raise_for_status()
+    return r.json()
+
+
+# ─── Dictionary cache ─────────────────────────────────────────────────────────────────
+class DictCache:
+    """In-memory cache of the small dictionary groups; loaded at startup and refreshed daily"""
+
+    def __init__(self):
+        self.cache = {}  # {group: [name1, name2, ...]}
+        self.processes = []  # [{id, name, description}] related processes
+        self.last_loaded = 0
+        self.ttl = 24 * 3600  # 24 hours
+        self._lock = threading.Lock()
+
+    def load(self):
+        """Reload every group and the process list from the ERP; a group that fails keeps its previous values."""
+        with self._lock:
+            logger.info("加载字典缓存...")
+            new_cache = {}
+            for group in SMALL_DICT_GROUPS:
+                try:
+                    data = erp_get("/api/backend/quick-inputs/", {"group": group, "limit": 200})
+                    items = data.get("results", [])
+                    new_cache[group] = [it["name"] for it in items if it.get("name")]
+                    logger.info(f" [{group}] {len(new_cache[group])} 项")
+                except Exception as e:
+                    logger.warning(f" [{group}] 加载失败: {e}")
+                    new_cache[group] = self.cache.get(group, [])
+            self.cache = new_cache
+
+            # Load related processes
+            try:
+                data = erp_get("/api/v1/stateflow/processes/", {"limit": 100})
+                self.processes = [
+                    {"id": p["id"], "name": p["name"], "description": p.get("description", "")}
+                    for p in data.get("results", [])
+                ]
+                logger.info(f" [关联流程] {len(self.processes)} 项")
+            except Exception as e:
+                logger.warning(f" [关联流程] 加载失败: {e}")
+
+            self.last_loaded = time.time()
+            logger.info("字典缓存加载完成")
+
+    def get(self, group: str):
+        """Return the cached names for `group`, reloading first when the cache is older than `ttl`."""
+        if time.time() - self.last_loaded > self.ttl:
+            try:
+                self.load()
+            except Exception as e:
+                logger.warning(f"字典缓存刷新失败: {e}")
+        return self.cache.get(group, [])
+
+    def get_processes(self):
+        """Return the cached list of process dicts (id, name, description)."""
+        return self.processes
+
+    def find_process_id(self, name: str):
+        """Look up a process ID by name (exact match first, then substring match in either direction)"""
+        if not name:
+            return None
+        for p in self.processes:
+            if p["name"] == name:
+                return p["id"]
+        # Fuzzy (substring) match
+        for p in self.processes:
+            if name in p["name"] or p["name"] in name:
+                return p["id"]
+        return None
+
+    def all(self):
+        """Return a shallow copy of the group → names mapping."""
+        return dict(self.cache)
+
+
+dict_cache = DictCache()
+
+
+def background_init():
+    """Background thread: log in to the ERP and load the dictionary cache (does not block Flask startup)"""
+    try:
+        erp_auth.login()
+        dict_cache.load()
+    except Exception as e:
+        logger.error(f"初始化失败: {e}")
+
+
+# ─── Fabric/customer smart matching ───────────────────────────────────────────────────────
+
+def find_or_create_fabric(name: str) -> dict:
+    """
+    Exact match first; create the fabric when none is found (no fuzzy matching, to avoid mismatches)
+    Returns {id, name, exists, created}, or None when name is empty or the ERP call fails
+    """
+    if not name:
+        return None
+    name = name.strip()
+    try:
+        # Exact search (filter by the name parameter)
+        data = erp_get("/api/backend/quick-inputs/", {"group": "布料", "name": name, "limit": 20})
+        items = data.get("results", [])
+        # Accept exact matches only
+        exact = next((it for it in items if it.get("name") == name), None)
+        if exact:
+            return {"id": exact["id"], "name": exact["name"], "exists": True, "created": False}
+        # No exact match → create
+        new_item = erp_post("/api/backend/quick-inputs/", {
+            "group": "布料",
+            "name": name,
+            "value": name,
+        })
+        logger.info(f"新增布料: {name} (id={new_item.get('id')})")
+        return {"id": new_item.get("id"), "name": name, "exists": False, "created": True}
+    except Exception as e:
+        logger.error(f"匹配/新增布料失败 [{name}]: {e}")
+        return None
+
+
+def find_customer(name: str) -> dict:
+    """
+    Search customers: exact name match only; otherwise up to five candidates come back as suggestions
+    Returns {id, name, exists, ...}, or None when name is empty or the ERP call fails
+    """
+    if not name:
+        return None
+    name = name.strip()
+    try:
+        data = erp_get("/api/backend/customers/", {"name": name, "limit": 10})
+        items = data.get("results", []) if isinstance(data, dict) else data
+        if not items:
+            return {"id": None, "name": name, "exists": False}
+        # Exact match
+        exact = next((it for it in items if it.get("name") == name), None)
+        if exact:
+            return {
+                "id": exact["id"], "name": exact["name"], "exists": True,
+                "area": exact.get("area"),
+                "visible_employees": exact.get("visible_employees"),
+            }
+        # No exact match → report not found (the frontend decides whether to offer the suggestions)
+        return {
+            "id": None, "name": name, "exists": False,
+            "suggestions": [{"id": it["id"], "name": it["name"]} for it in items[:5]],
+        }
+    except Exception as e:
+        logger.error(f"搜索客户失败 [{name}]: {e}")
+        return None
+
+
+# ─── AI parsing (with enum constraints) ────────────────────────────────────────────────────
+
+def build_system_prompt() -> str:
+    """Build the prompt from the cached dictionaries so the AI only picks listed enum values"""
+
+    def opts(group):
+        vals = dict_cache.get(group)
+        return "/".join(vals) if vals else "(待加载)"
+
+    # Process options (with description)
+    process_lines = []
+    for p in dict_cache.get_processes():
+        desc = f"({p['description']})" if p.get("description") else ""
+        process_lines.append(f" - {p['name']}{desc}")
+    process_text = "\n".join(process_lines) if process_lines else "(待加载)"
+
+    return f"""你是一个印染厂开版订单信息提取助手。
+
+用户会发给你一条企业微信群聊消息,消息内容是跟单员发给设计师的工单指令。
+你需要从消息中提取以下字段,返回 JSON 格式。
+
+【字段说明(**必须严格使用以下枚举值**,不在列表中的返回 null)】
+
+- customer_name: 客户名称(自由文本,消息开头通常是客户名,如"華鑫"、"林庆福"、"糖糖")
+- plate_type: 起版情况,**只能选**:{opts('起版情况')}
+  映射规则:图片起版/样衣起版/图片开版/样衣开版/找图开发/起版 → 首版
+            修改/加色/调色/调整/改版 → 修改单
+            套大货/套纸样/套样/重新套 → 复版/开版
+- urgency_level: 紧急程度,**只能选**:{opts('紧急程度')}
+  映射规则:消息有"加急"→ 加急;有"特急"→ 特急;"加急已下单"等 → 紧急已下单;否则 → 正常
+- production_method: 做货方式,**只能选**:{opts('做货方式')}
+- plate_method: 开版方式,**只能选**:{opts('开版方式')}
+  映射规则:消息有"图片起版/图片开版"→ 图片开版
+            消息有"样衣起版/样板起版/样衣开版"→ 样衣开版
+            消息有"文件开版"→ 文件开版
+            其他原始词如"开发"、"开板"→ 开发
+- area: 客户区域,**只能选**:{opts('地区')}(消息里很少提及,多数返回 null)
+- fabric: 面料名称(自由文本,完整保留克重和布料类型,如"140g四面弹"、"120克本白四面弹"、"180克牛奶丝"、"泡泡皱")
+- width: 面料幅宽,**只能选**:{opts('幅宽')}(如"幅宽151"匹配"1.51","120"匹配"120",找不到则 null)
+- fabric_source: 面料来源,**只能选**:{opts('布源')}
+- drawing_rating: 画图评级,**只能选**:{opts('画图评级')}(消息里几乎不会有,多返 null)
+- color_matching_rating: 调色评级,**只能选**:{opts('调色评级')}(多返 null)
+- sample_rating: 套样评级,**只能选**:{opts('套样评级')}(多返 null)
+- difficulty_rating: 难度评级,**只能选**:{opts('难度评级')}(多返 null)
+- process_name: 关联流程,**只能选**以下名称之一(直接返回流程名字符串,不要带括号注释):
+{process_text}
+  选择规则:根据 plate_type 和操作内容综合判断
+  - 套大货/套纸样 → "开版(套米样)" 或 "开版(套大货)"
+  - 加色/调色 → "开版(配色)" 或 "开版(绘图调色)"
+  - P图/抠图 → "开版(P图)"
+  - 描图/找图开发 → "开版(描图开版)" 或 "开版(绘图调色套样)"
+  - 不确定时返回 null(让用户手动选)
+- plate_notes: 打版备注(自由文本,把消息中的具体要求、注意事项原文保留:克重、颜色要求、特殊工艺等)
+- style_name: 款号名称(如"NSY1090"、"G67单600-20款"、"6576改"等款号,没有则 null)
+- original_design_code: 原订单ID(如果消息提到要"套大货/套纸样/修改"等,且消息中有 5-7 位的纯数字编号,那就是原订单号;否则 null)
+- merchandiser_name: 跟单员(直接返回 sender 字段的原值)
+
+【重要规则】
+1. 枚举字段(plate_type、urgency_level、production_method、plate_method、area、width、fabric_source、各评级、process_name)**必须**从给定选项里选,不能自创;判断不出就返回 null
+2. 自由文本字段(customer_name、fabric、plate_notes、style_name、original_design_code)保持原文
+3. 一律返回 JSON 对象,不要任何其它文字"""
+
+
+def call_doubao(user_text: str) -> dict:
+    """Call Doubao /v3/responses and return the parsed dict; raises ValueError when no JSON object is found"""
+    payload = {
+        "model": AI_MODEL,
+        "input": [
+            {"role": "system", "content": [{"type": "input_text", "text": build_system_prompt()}]},
+            {"role": "user", "content": [{"type": "input_text", "text": user_text}]},
+        ],
+        "temperature": 0.1,
+        "max_output_tokens": 4096,
+    }
+    r = requests.post(DOUBAO_URL, headers=DOUBAO_HEADERS, json=payload, timeout=60)
+    r.raise_for_status()
+    data = r.json()
+
+    raw_text = ""
+    for item in data.get("output", []):
+        if item.get("type") == "message":
+            for c in item.get("content", []):
+                if c.get("type") == "output_text":
+                    raw_text = c.get("text", "").strip()
+                    break
+            break
+    if not raw_text:
+        raise ValueError("豆包返回内容为空")
+
+    start = raw_text.find("{")
+    end = raw_text.rfind("}") + 1
+    if start == -1 or end == 0:
+        raise ValueError(f"响应中未找到 JSON:{raw_text}")
+    return json.loads(raw_text[start:end])
+
+
+# ─── Routes ─────────────────────────────────────────────────────────────────────
+
+@app.route("/health", methods=["GET"])
+def health():
+    """Report service status: model name, dictionary cache state and ERP login state."""
+    return jsonify({
+        "status": "ok",
+        "model": AI_MODEL,
+        "dict_cache_loaded": bool(dict_cache.cache),
+        "dict_groups": list(dict_cache.cache.keys()),
+        "erp_logged_in": bool(erp_auth.access_token),
+    })
+
+
+@app.route("/api/v1/dict-cache/", methods=["GET"])
+def get_dict_cache():
+    """Show the current cache (for debugging)"""
+    return jsonify(dict_cache.all())
+
+
+@app.route("/api/v1/dict-cache/refresh/", methods=["POST"])
+def refresh_dict_cache():
+    """Manually refresh the dictionary cache"""
+    try:
+        dict_cache.load()
+        return jsonify({"status": "ok", "groups": list(dict_cache.cache.keys())})
+    except Exception as e:
+        return jsonify({"error": str(e)}), 500
+
+
+# ─── WeCom message proxy ─────────────────────────────────────────────────────────────
+
+@app.route("/api/v1/wechat/messages/", methods=["GET"])
+def proxy_wechat_messages():
+    """Proxy GET /messages on the WeCom service, forwarding room_id/limit/offset/with_image."""
+    limit = request.args.get("limit", "30")
+    offset = request.args.get("offset", "0")
+    room_id = request.args.get("room_id", WORK_ORDER_ROOM)
+    with_image = request.args.get("with_image", "false")
+
+    try:
+        r = requests.get(
+            f"{WECHAT_API_BASE}/messages",
+            headers={"Authorization": WECHAT_AUTH},
+            params={"room_id": room_id, "limit": limit, "offset": offset, "with_image": with_image},
+            timeout=60,
+        )
+        r.raise_for_status()
+        return jsonify(r.json())
+    except requests.Timeout:
+        return jsonify({"error": "企微接口超时"}), 504
+    except Exception as e:
+        logger.error(f"代理请求失败: {e}")
+        return jsonify({"error": str(e)}), 500
+
+
+@app.route("/api/v1/wechat/messages/<msg_id>/with-image", methods=["GET"])
+def proxy_get_message_with_image(msg_id):
+    """
+    Proxy GET /messages/:msg_id?with_image=true
+    The WeCom service can look up a single message by msg_id (original author notes: very fast, < 0.1s)
+    """
+    try:
+        r = requests.get(
+            f"{WECHAT_API_BASE}/messages/{msg_id}",
+            headers={"Authorization": WECHAT_AUTH},
+            params={"with_image": "true"},
+            timeout=30,
+        )
+        r.raise_for_status()
+        return jsonify(r.json())
+    except requests.Timeout:
+        return jsonify({"error": "企微接口超时"}), 504
+    except requests.HTTPError as e:
+        return jsonify({"error": f"企微接口错误 {e.response.status_code}"}), e.response.status_code
+    except Exception as e:
+        logger.error(f"获取消息图片失败 msg_id={msg_id}: {e}")
+        return jsonify({"error": str(e)}), 500
+
+
+@app.route("/api/v1/wechat/messages/<msg_id>/tags", methods=["PUT"])
+def proxy_update_tags(msg_id):
+    """Proxy PUT /messages/:msg_id/tags, passing the raw request body and the upstream status code through."""
+    try:
+        body = request.get_data()
+        r = requests.put(
+            f"{WECHAT_API_BASE}/messages/{msg_id}/tags",
+            headers={"Authorization": WECHAT_AUTH, "Content-Type": "application/json"},
+            data=body,
+            timeout=10,
+        )
+        return jsonify(r.json()), r.status_code
+    except Exception as e:
+        return jsonify({"error": str(e)}), 500
+
+
+@app.route("/api/v1/wechat/messages/<msg_id>/parse-record/", methods=["POST"])
+def record_parse(msg_id):
+    """Increment parse_count in the message tags and stamp last_parsed_at / last_parsed_by."""
+    try:
+        body = request.get_json(silent=True) or {}
+        old_tags = body.get("old_tags") or {}
+        new_count = int(old_tags.get("parse_count") or 0) + 1
+        new_tags = {
+            **old_tags,
+            "parse_count": new_count,
+            "last_parsed_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
+            "last_parsed_by": body.get("user", "anonymous"),
+        }
+        r = requests.put(
+            f"{WECHAT_API_BASE}/messages/{msg_id}/tags",
+            headers={"Authorization": WECHAT_AUTH, "Content-Type": "application/json"},
+            json=new_tags,
+            timeout=10,
+        )
+        r.raise_for_status()
+        return jsonify(r.json())
+    except Exception as e:
+        logger.error(f"记录解析失败 msg_id={msg_id}: {e}")
+        return jsonify({"error": str(e)}), 500
+
+
+# ─── Main endpoint: parse plate-making order ────────────────────────────────────────────────────
+
+@app.route("/api/v1/ai/parse-plate-order/", methods=["POST"])
+def parse_plate_order():
+    """
+    Parse a WeCom message → plate-making order fields
+    Automatically matches/creates the fabric and searches for the customer
+
+    Request body:
+    {
+        "text": "message text",
+        "sender": "sender name",
+        "has_images": true,
+        "image_count": 2
+    }
+    """
+    data = request.get_json(silent=True) or {}
+    text = (data.get("text") or "").strip()
+    sender = data.get("sender", "")
+    has_images = data.get("has_images", False)
+    image_count = data.get("image_count", 0)
+
+    if not text and not has_images:
+        return jsonify({"error": "消息内容为空"}), 400
+
+    user_content = f"发送者:{sender}\n"
+    if has_images:
+        user_content += f"(消息含 {image_count} 张图片)\n"
+    user_content += f"消息内容:{text or '(纯图片消息,无文字)'}"
+
+    logger.info(f"解析请求 | sender={sender} | text={text[:80]}")
+
+    try:
+        result = call_doubao(user_content)
+    except Exception as e:
+        logger.error(f"AI 解析失败: {e}")
+        return jsonify({"error": f"AI 解析失败: {e}"}), 500
+
+    # Drop empty values
+    cleaned = {k: v for k, v in result.items() if v not in (None, "", "null")}
+
+    # Fall back to the sender as merchandiser
+    if "merchandiser_name" not in cleaned and sender:
+        cleaned["merchandiser_name"] = sender
+
+    # Validate enum fields (drop values that are not in the cache)
+    enum_fields = {
+        "plate_type": "起版情况",
+        "urgency_level": "紧急程度",
+        "production_method": "做货方式",
+        "plate_method": "开版方式",
+        "area": "地区",
+        "width": "幅宽",
+        "fabric_source": "布源",
+        "drawing_rating": "画图评级",
+        "color_matching_rating": "调色评级",
+        "sample_rating": "套样评级",
+        "difficulty_rating": "难度评级",
+    }
+    for field, group in enum_fields.items():
+        if field in cleaned:
+            valid = dict_cache.get(group)
+            if valid and cleaned[field] not in valid:
+                logger.warning(f"AI 返回 {field}={cleaned[field]} 不在枚举中,剔除")
+                cleaned.pop(field, None)
+
+    # Fabric: search → create when missing
+    if cleaned.get("fabric"):
+        match = find_or_create_fabric(cleaned["fabric"])
+        if match:
+            cleaned["fabric"] = match["name"]  # use the normalized name
+            cleaned["fabric_id"] = match["id"]
+            cleaned["fabric_created"] = match.get("created", False)
+
+    # Customer: exact match first; attach suggestions for the frontend when not found
+    if cleaned.get("customer_name"):
+        match = find_customer(cleaned["customer_name"])
+        if match and match.get("id"):
+            cleaned["customer_id"] = match["id"]
+            cleaned["customer_name"] = match["name"]
+            if match.get("area"):
+                cleaned["customer_area"] = match["area"]
+            if match.get("visible_employees"):
+                cleaned["customer_visible_employees"] = match["visible_employees"]
+        else:
+            cleaned["customer_not_found"] = True
+            if match and match.get("suggestions"):
+                cleaned["customer_suggestions"] = match["suggestions"]
+
+    # Related process: look up the ID from process_name
+    if cleaned.get("process_name"):
+        pid = dict_cache.find_process_id(cleaned["process_name"])
+        if pid:
+            cleaned["process_id"] = pid
+        else:
+            logger.warning(f"未找到流程:{cleaned['process_name']}")
+            cleaned.pop("process_name", None)
+
+    logger.info(f"解析结果:{cleaned}")
+    return jsonify(cleaned)
+
+
+# ─── Entry point ─────────────────────────────────────────────────────────────────────
+
+if __name__ == "__main__":
+    port = int(os.getenv("PORT", 5050))
+    debug = os.getenv("DEBUG", "false").lower() == "true"
+
+    # Initialize in a background thread (does not block Flask startup)
+    threading.Thread(target=background_init, daemon=True).start()
+
+    logger.info(f"启动 plate-ai-service | port={port} | model={AI_MODEL}")
+    app.run(host="0.0.0.0", port=port, debug=debug, use_reloader=False)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..3fe7025
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+flask==3.0.3
+flask-cors==4.0.1
+requests==2.32.3
+python-dotenv==1.0.1
diff --git a/start.bat b/start.bat
new file mode 100644
index 0000000..12e9782
--- /dev/null
+++ b/start.bat
@@ -0,0 +1,8 @@
+@echo off
+cd /d %~dp0
+echo 安装依赖...
+pip install -r requirements.txt
+echo.
+echo 启动 plate-ai-service...
+python app.py
+pause