From 484f1f6be47be85172e9c0371a869832ab22d41d Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Tue, 3 Mar 2026 14:10:00 +0800 Subject: [PATCH] feat: integrate quality-gated draw flow with online dispatch transfer --- qingjian_cs/app/auto_draw.py | 34 +++++++++ qingjian_cs/app/client.py | 21 +++++- qingjian_cs/app/image_quote_analyzer.py | 79 +++++++++++++++++++++ qingjian_cs/app/transfer_flow.py | 91 +++++++++++++++++++++++++ 4 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 qingjian_cs/app/transfer_flow.py diff --git a/qingjian_cs/app/auto_draw.py b/qingjian_cs/app/auto_draw.py index fca0ab8..e446fbc 100644 --- a/qingjian_cs/app/auto_draw.py +++ b/qingjian_cs/app/auto_draw.py @@ -28,6 +28,7 @@ async def auto_draw_preview( logger.info("[作图] 开始 customer=%s image=%s", customer_id, image_url) from services.service_gemini_stable import GeminiExtractStableService # type: ignore from services.service_tuhui_upload import upload_to_tuhui # type: ignore + from .image_quote_analyzer import analyze_image_for_quote, evaluate_generated_image except Exception as e: logger.error("[作图] 依赖加载失败: %s", e) return {"ok": False, "error": f"依赖加载失败:{e}"} @@ -37,6 +38,25 @@ async def auto_draw_preview( output_path = os.path.join(tempfile.gettempdir(), f"qjcs_out_{uuid.uuid4().hex}.jpg") try: + logger.info("[作图] 识图评估中") + analysis = await analyze_image_for_quote( + image_url=image_url, + customer_text=requirement, + goods_name="", + ) + if analysis.get("ok"): + business_related = str(analysis.get("business_related", "yes")).lower() + can_do = str(analysis.get("can_do", "partial")).lower() + if business_related == "no": + logger.info("[作图] 终止: 非印花/印刷相关") + return {"ok": False, "error": "非印花/印刷相关,退出"} + if can_do == "no": + logger.info("[作图] 终止: 识图判定不可做") + return {"ok": False, "error": "识图判定不可做,退出"} + if str(analysis.get("gemini_prompt", "")).strip(): + prompt = str(analysis.get("gemini_prompt", "")).strip() + logger.info("[作图] 使用识图提示词: %s", prompt[:80]) + logger.info("[作图] 下载原图中") headers = { "User-Agent": ( @@ -71,6 +91,20 @@ async def auto_draw_preview( return {"ok": False, "error": "生成失败:未产出文件"} logger.info("[作图] Gemini 生成完成") + logger.info("[作图] 结果评估中") + review = await evaluate_generated_image( + original_image_url=image_url, + generated_image_path=output_path, + requirement=requirement, + ) + if not review.get("pass", False): + logger.info("[作图] 终止: 评估不通过 reason=%s", review.get("reason", "")) + return { + "ok": False, + "need_transfer": True, + "error": f"评估不通过:{review.get('reason', 'unknown')}", + } + logger.info("[作图] 上传图绘中") ok_upload, link, _ = await upload_to_tuhui( output_path, diff --git a/qingjian_cs/app/client.py b/qingjian_cs/app/client.py index 8e477e3..3240818 100644 --- a/qingjian_cs/app/client.py +++ b/qingjian_cs/app/client.py @@ -25,6 +25,7 @@ from .orchestrator import Orchestrator from .rules import extract_image_urls, prefilter_message from .runtime_switch import is_listen_only from .store import ConversationStore +from .transfer_flow import transfer_to_human_flow class QingjianClient: @@ -383,8 +384,11 @@ class QingjianClient: text = (decision.transfer_msg or "").strip() if self._is_invalid_ai_reply(text): text = self._fallback_reply("transfer") - await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) - self.last_reply_key[key] = text + ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=text, trace_id=trace_id) + if not ok_transfer: + self.logger.error("[转人工] 指令失败: %s", reason) + await self.send_reply(data, text, trace_id=trace_id, turn_version=turn_version) + self.last_reply_key[key] = text await post_tianwang_callback("message_processed", data, extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": text}) return @@ -433,6 +437,19 @@ class QingjianClient: customer_id=context["customer_id"], error=str(draw_res.get("error", "unknown")), ) + if bool(draw_res.get("need_transfer")): + tmsg = "这个我转人工给你看下" + ok_transfer, reason = await transfer_to_human_flow(self, data, transfer_msg=tmsg, trace_id=trace_id) + if not ok_transfer: + self.logger.error("[转人工] 指令失败: %s", reason) + await self.send_reply(data, tmsg, trace_id=trace_id, turn_version=turn_version) + self.last_reply_key[key] = tmsg + await post_tianwang_callback( + "message_processed", + data, + extra={"trace_id": trace_id, "route": route, "action": "transfer", "reply": tmsg}, + ) + return self.logger.error("[作图] 失败 customer=%s error=%s", context["customer_id"], draw_res.get("error", "unknown")) elif AUTO_DRAW_ENABLED and (not draw_allowed): self.logger.info("[作图] 已跳过: 识图结果不可做 can_do=%s customer=%s", can_do, context["customer_id"]) diff --git a/qingjian_cs/app/image_quote_analyzer.py b/qingjian_cs/app/image_quote_analyzer.py index 3031cd0..4ee7971 100644 --- a/qingjian_cs/app/image_quote_analyzer.py +++ b/qingjian_cs/app/image_quote_analyzer.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import base64 import logging import os import re @@ -191,10 +192,20 @@ def _extract_line(text: str, key: str) -> str: def _parse_result(text: str) -> dict[str, Any]: intent = _extract_line(text, "诉求类型") + if not intent: + intent = _extract_line(text, "类型") can_do = _extract_line(text, "可做").lower() + business_related = _extract_line(text, "业务相关").lower() complexity = _extract_line(text, "复杂度").lower() price_text = _extract_line(text, "建议报价") + if not price_text: + price_text = _extract_line(text, "建议价格") + gemini_prompt = _extract_line(text, "提示词") + aspect_ratio = _extract_line(text, "比例") + risk = _extract_line(text, "风险").lower() note = _extract_line(text, "说明") + if not note: + note = _extract_line(text, "备注") if intent not in {"找图", "高清修复", "其他"}: intent = "其他" @@ -202,6 +213,12 @@ def _parse_result(text: str) -> dict[str, Any]: can_do = "partial" if complexity not in {"simple", "normal", "complex", "hard"}: complexity = "normal" + if business_related not in {"yes", "no"}: + business_related = "yes" + if aspect_ratio not in {"1:1", "9:16", "16:9", "3:4", "4:3", "3:2", "2:3", "5:4", "4:5"}: + aspect_ratio = "1:1" + if risk not in {"none", "low", "high"}: + risk = "none" price = 0 m = re.search(r"\d+", price_text or "") @@ -211,8 +228,12 @@ def _parse_result(text: str) -> dict[str, Any]: return { "intent_type": intent, "can_do": can_do, + "business_related": business_related, "complexity": complexity, "price_suggest": price, + "gemini_prompt": gemini_prompt or "", + "aspect_ratio": aspect_ratio, + "risk": risk, "note": note or "已看图", } @@ -263,3 +284,61 @@ async def analyze_image_for_quote(image_url: str, customer_text: str = "", goods logger.error("[识图报价] 调用失败: %s", e) return {"ok": False, "error": str(e)} + +def _image_file_to_data_url(path: str) -> str: + with open(path, "rb") as f: + b64 = base64.b64encode(f.read()).decode("utf-8") + return f"data:image/jpeg;base64,{b64}" + + +async def evaluate_generated_image( + original_image_url: str, + generated_image_path: str, + requirement: str = "", +) -> dict[str, Any]: + """ + 生成后质量评估:不通过则建议直接退出,不发送给客户。 + """ + if not OPENAI_API_KEY: + return {"ok": False, "pass": True, "reason": "no_api_key_skip"} + if not os.path.exists(generated_image_path): + return {"ok": False, "pass": False, "reason": "generated_file_missing"} + client = AsyncOpenAI(base_url=OPENAI_BASE_URL, api_key=OPENAI_API_KEY) + review_prompt = ( + "你是印花/印刷交付质检。请对比原图和生成图,判断是否可发给客户。\n" + "重点看:主体是否跑偏、细节是否糊、是否明显AI味、是否不适合印刷。\n" + "只输出两行:\n" + "评估: \n" + "原因: <20字内>\n" + f"客户要求: {requirement or '无'}" + ) + try: + gen_url = _image_file_to_data_url(generated_image_path) + resp = await asyncio.wait_for( + client.chat.completions.create( + model=VISION_MODEL, + temperature=0.1, + messages=[ + {"role": "system", "content": "你是严格的图像交付质检员。"}, + { + "role": "user", + "content": [ + {"type": "text", "text": review_prompt}, + {"type": "image_url", "image_url": {"url": original_image_url}}, + {"type": "image_url", "image_url": {"url": gen_url}}, + ], + }, + ], + ), + timeout=30, + ) + text = str((resp.choices[0].message.content if resp and resp.choices else "") or "").strip() + flag = _extract_line(text, "评估").lower() + reason = _extract_line(text, "原因") or _clip(text, 40) + passed = flag == "pass" + logger.info("[作图评估] result=%s reason=%s", "pass" if passed else "fail", reason) + return {"ok": True, "pass": passed, "reason": reason, "raw": text} + except Exception as e: + logger.error("[作图评估] 调用失败: %s", e) + return {"ok": False, "pass": False, "reason": str(e)} + diff --git a/qingjian_cs/app/transfer_flow.py b/qingjian_cs/app/transfer_flow.py new file mode 100644 index 0000000..e2c278f --- /dev/null +++ b/qingjian_cs/app/transfer_flow.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import os +import asyncio +from typing import Any +import requests + +from .observability import activity_event + +DEFAULT_TRANSFER_GROUP = os.getenv("TRANSFER_GROUP_ID", "20252916034").strip() or "20252916034" +DEFAULT_TRANSFER_ASSIGNEE = os.getenv("TRANSFER_ASSIGNED_TO", "").strip() +DISPATCH_BASE_URL = os.getenv("DISPATCH_BASE_URL", "http://1.12.50.92:8006").strip().rstrip("/") +DISPATCH_API_KEY = os.getenv("DISPATCH_API_KEY", "tuhui_dispatch_key_2026").strip() +DISPATCH_TIMEOUT_SECONDS = float(os.getenv("DISPATCH_TIMEOUT_SECONDS", "5")) + + +def _dispatch_assign_once() -> dict[str, Any]: + if not DISPATCH_BASE_URL or not DISPATCH_API_KEY: + return {"success": False, "reason": "dispatch_config_missing"} + try: + resp = requests.get( + f"{DISPATCH_BASE_URL}/assign", + headers={"X-API-Key": DISPATCH_API_KEY}, + timeout=DISPATCH_TIMEOUT_SECONDS, + ) + if resp.status_code != 200: + return {"success": False, "reason": f"http_{resp.status_code}"} + data = resp.json() if resp.text else {} + return { + "success": bool(data.get("success", False)), + "task_id": str(data.get("task_id", "") or ""), + "assigned_to": str(data.get("assigned_to", "") or ""), + "online_count": int(data.get("online_count", 0) or 0), + "raw": data, + } + except Exception as e: + return {"success": False, "reason": str(e)} + + +async def transfer_to_human_flow(client, data: dict[str, Any], transfer_msg: str = "", trace_id: str = "-") -> tuple[bool, str]: + """ + 真实转人工:发送千牛转接命令,不走AI文本压缩逻辑。 + """ + if not client.websocket: + return False, "websocket_not_connected" + + acc_id = str(data.get("acc_id", "") or "") + from_id = str(data.get("from_id", "") or "") + from_name = str(data.get("from_name", from_id) or from_id) + acc_type = str(data.get("acc_type", "AliWorkbench") or "AliWorkbench") + + dispatch_res = await asyncio.to_thread(_dispatch_assign_once) + assignee = str(dispatch_res.get("assigned_to", "") or "").strip() or DEFAULT_TRANSFER_ASSIGNEE + activity_event( + client.logger, + "dispatch_assign", + trace_id=trace_id, + customer_id=from_id or "-", + success=bool(dispatch_res.get("success")), + assigned_to=assignee, + online_count=int(dispatch_res.get("online_count", 0) or 0), + reason=str(dispatch_res.get("reason", "") or ""), + ) + if assignee: + cmd = f"正在为你转接人工|[转移会话],{assignee},无原因" + target = assignee + else: + cmd = f"话术|[转移会话],分组{DEFAULT_TRANSFER_GROUP},无原因" + target = f"分组{DEFAULT_TRANSFER_GROUP}" + + msg = { + "msg_id": "", + "acc_id": acc_id, + "msg": cmd, + "from_id": from_id, + "from_name": from_name, + "cy_id": from_id, + "acc_type": acc_type, + "msg_type": 0, + "cy_name": from_name, + } + await client.send_message(msg) + activity_event( + client.logger, + "transfer_command_sent", + trace_id=trace_id, + customer_id=from_id or "-", + target=target, + transfer_msg=(transfer_msg or ""), + ) + return True, target