From fc9a7a13b2bdb5f8d588f5ac4dcdb92073af9d09 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sat, 28 Feb 2026 22:38:24 +0800 Subject: [PATCH] refactor: split quote pipeline stages and add trust case-script guidance --- core/pydantic_ai_agent.py | 273 ++++++++++++++++++++++---------------- 1 file changed, 160 insertions(+), 113 deletions(-) diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index d938437..bc9317b 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -63,6 +63,7 @@ async def _notify_wechat_overdue(): # ========== 转接常量 ========== TRANSFER_MESSAGE = "话术|[转移会话],分组20252916034,无原因" +CASE_LIBRARY_LINK = "https://www.yuque.com/zuowei-dfvpq/kge0in/mynala0g35b8cec5" # ========== 数据模型 ========== @@ -1092,7 +1093,12 @@ class CustomerServiceAgent: 规则: - 收到图片或历史有图片依据时尽量结合复杂度给出单价,价格为5的整数倍 - 没有图片时引导发图,不给价格区间 -- 报价后紧跟一句推动成交,话术自然不重复 +- 报价后紧跟一句推动成交,话术自然不重复,避免机械重复“最低了” +- 客户说“有点贵/优惠点/两张优惠点”时,优先给打包价或数量优惠,不要只会拒绝 +- 客户说“不放心/先看效果”时,先建立信任:可发案例链接 {CASE_LIBRARY_LINK},并说明不满意可退 +- 可直接复用这条信任话术(按需微调,不要每次完全一样): + 小妹整理了一些案例图,亲点这个链接就能看到啦({CASE_LIBRARY_LINK})。 + 有什么想要的效果随时告诉我哈,我这边都可以按您的要求来做哦~/:065 效果不好不满意,我们这边包退的哦。 - 最低价不低于{floor}元,客户出价低于底线时礼貌拒绝(不好意思) - 输出不超过2句话""" @@ -2125,18 +2131,11 @@ class CustomerServiceAgent: lines.append("你定一个方案,我这边马上安排。") return "\n".join(lines) - async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]: - """ - 批量识别待处理图片并统一处理: - - find_image 意图且可自动处理:直接 Gemini 处理 + 上传图绘 + 回链接 - - 高风险/不可做:转人工 - - 其他:统一报价 - """ - from image.image_analyzer import image_analyzer - + def _prepare_batch_intake(self, state: ConversationState) -> Dict[str, Any]: + """Stage 1: 收集阶段,标准化输入并做上限约束。""" urls = list(state.pending_image_urls) if not urls: - return {"reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False} + return {"ok": False, "reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False} try: from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY max_images = max(1, int(BATCH_MAX_IMAGES)) @@ -2146,12 +2145,23 @@ class CustomerServiceAgent: analyze_concurrency = 3 if len(urls) > max_images: return { + "ok": False, "reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。", "need_transfer": False, } - urls = urls[:max_images] + return { + "ok": True, + "urls": urls[:max_images], + "requirements": list(state.pending_requirements or []), + "analyze_concurrency": analyze_concurrency, + } + + async def _run_batch_feasibility(self, urls: List[str], concurrency: int) -> List[Tuple[str, Dict[str, Any]]]: + """Stage 2: 可做性分析(逐图)。""" + from image.image_analyzer import image_analyzer + + sem = asyncio.Semaphore(max(1, concurrency)) - sem = asyncio.Semaphore(analyze_concurrency) async def _analyze_one(url: str): async with sem: try: @@ -2167,9 +2177,10 @@ class CustomerServiceAgent: } return url, r - results = list(await asyncio.gather(*[_analyze_one(u) for u in urls])) + return list(await asyncio.gather(*[_analyze_one(u) for u in urls])) + + async def _sync_batch_analysis_to_workflow(self, results: List[Tuple[str, Dict[str, Any]]], message: CustomerMessage) -> None: for url, r in results: - # 与单图流程一致:识别后写入 workflow 任务 try: from core.workflow import workflow await workflow.image_analysis_result( @@ -2188,12 +2199,26 @@ class CustomerServiceAgent: except Exception as e: print(f"[Agent] Workflow 批量任务创建失败: {e}") - total_min = sum(int(r.get("price_min", 15) or 15) for _, r in results) - total_max = sum(int(r.get("price_max", 25) or 25) for _, r in results) - total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results) - req_fee = self._calc_requirement_surcharge(state.pending_requirements) + def _assess_batch_risk(self, results: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, List[str]]: + """Stage 2.5: 分离可做和风险图。""" + unsafe: List[str] = [] + dense_text_reject: List[str] = [] + for i, (_, r) in enumerate(results, 1): + if r.get("feasibility") == "no" or r.get("risk") == "high": + unsafe.append(f"图{i}") + note = str(r.get("note", "") or "") + if "文字内容过于密集" in note or "密集文字" in note: + dense_text_reject.append(f"图{i}") + return {"unsafe": unsafe, "dense_text_reject": dense_text_reject} - # 打包优惠:2 张减 5,3 张及以上按 9 折(四舍五入到 5 元) + def _build_batch_pricing_plan( + self, + results: List[Tuple[str, Dict[str, Any]]], + requirements: List[str], + ) -> Dict[str, Any]: + """Stage 3: 报价计算(图片成本 + 需求加价 + 打包价)。""" + total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results) + req_fee = self._calc_requirement_surcharge(requirements) if len(results) == 2: bundle_price = max(10, total_suggest - 5) elif len(results) >= 3: @@ -2202,97 +2227,130 @@ class CustomerServiceAgent: bundle_price = total_suggest bundle_price += int(req_fee.get("extra", 0) or 0) bundle_price = round(bundle_price / 5) * 5 + return { + "total_suggest": total_suggest, + "req_fee": req_fee, + "bundle_price": bundle_price, + } - # 先分流:高风险/不可做 -> 转人工 - unsafe = [] - dense_text_reject = [] - for i, (_, r) in enumerate(results, 1): - if r.get("feasibility") == "no" or r.get("risk") == "high": - unsafe.append(f"图{i}") - note = str(r.get("note", "") or "") - if "文字内容过于密集" in note or "密集文字" in note: - dense_text_reject.append(f"图{i}") + async def _try_batch_auto_process( + self, + results: List[Tuple[str, Dict[str, Any]]], + message: CustomerMessage, + req_fee: Dict[str, Any], + ) -> Dict[str, Any]: + """Stage 4-A: 自动处理+图绘链接。失败时回退到需求澄清。""" + links = [] + try: + from image.image_processor import image_processor + from utils.image_queue import run_with_queue + for idx, (url, r) in enumerate(results, 1): + req_parts = [f"complexity:{r.get('complexity', 'normal')}"] + if r.get("gemini_prompt"): + req_parts.append(f"prompt:{r.get('gemini_prompt')}") + if r.get("aspect_ratio"): + req_parts.append(f"ratio:{r.get('aspect_ratio')}") + if r.get("perspective") and r.get("perspective") != "no": + req_parts.append(f"perspective:{r.get('perspective')}") + if r.get("proc_type"): + req_parts.append(f"proc_type:{r.get('proc_type')}") + if r.get("subject"): + req_parts.append(f"subject:{r.get('subject')}") + if r.get("quality"): + req_parts.append(f"quality:{r.get('quality')}") + + process_res = await run_with_queue(image_processor.process_image( + url, + "enhance", + requirements="|".join(req_parts), + gemini_prompt=r.get("gemini_prompt", ""), + aspect_ratio=r.get("aspect_ratio", "1:1"), + perspective=r.get("perspective", "no"), + proc_type=r.get("proc_type", ""), + subject=r.get("subject", ""), + quality=r.get("quality", ""), + )) + if not process_res.get("success"): + raise RuntimeError(process_res.get("message", "图片处理失败")) + + ok, link, _ = await upload_to_tuhui( + process_res["result_path"], + title=f"客户{message.from_id[-4:]}-图片{idx}", + description="AI自动处理结果", + price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))), + ) + if not ok: + raise RuntimeError(str(link)) + links.append(link) + except Exception as e: + print(f"[Agent] 找图自动处理失败,回退需求澄清: {e}") + return { + "reply": "这种可以做类似款。你先说下具体需求:要几张、是否改字、尺寸比例、交付格式(单图/打包链接),我按需求给你直接做。", + "need_transfer": False, + } + lines = ["找到了,链接如下:"] + for i, link in enumerate(links, 1): + lines.append(f"链接{i}:{link}") + return {"reply": "\n".join(lines), "need_transfer": False} + + def _finalize_batch_state(self, state: ConversationState, customer_id: str, final_price: int = 0): + if final_price > 0: + state.last_price = final_price + try: + from db.customer_db import db + db.update_last_price(customer_id, final_price) + except Exception: + pass + state.pending_image_urls.clear() + state.pending_requirements.clear() + self._sync_pending_quote_state(customer_id, state) + + async def _quote_pending_images(self, state: ConversationState, message: CustomerMessage) -> Dict[str, Any]: + """ + 统一报价主流程(分层): + 1) Intake 收集 + 2) Feasibility 可做性 + 3) Pricing 报价 + 4) Router 自动处理/报价/转人工 + """ + intake = self._prepare_batch_intake(state) + if not intake.get("ok", False): + return {"reply": intake.get("reply", ""), "need_transfer": bool(intake.get("need_transfer", False))} + + urls = intake["urls"] + requirements = intake["requirements"] + analyze_concurrency = int(intake["analyze_concurrency"]) + + results = await self._run_batch_feasibility(urls=urls, concurrency=analyze_concurrency) + await self._sync_batch_analysis_to_workflow(results=results, message=message) + + risk = self._assess_batch_risk(results) + unsafe = risk["unsafe"] + dense_text_reject = risk["dense_text_reject"] if unsafe: - state.pending_image_urls.clear() - state.pending_requirements.clear() - self._sync_pending_quote_state(message.from_id, state) + self._finalize_batch_state(state, message.from_id, final_price=0) if dense_text_reject and len(dense_text_reject) == len(unsafe): - return { - "reply": self._build_reject_message("文字密集类图片暂不接单"), - "need_transfer": False, - } + return {"reply": self._build_reject_message("文字密集类图片暂不接单"), "need_transfer": False} return { "reply": f"这批里{'、'.join(unsafe)}处理风险较高,我先帮你转人工设计师跟进会更稳妥。", "need_transfer": True, } - # 查找图片意图:优先直接自动处理并回图绘链接 - intent_text = (message.msg or "") + " " + " ".join(state.pending_requirements[-5:]) + pricing = self._build_batch_pricing_plan(results=results, requirements=requirements) + total_suggest = int(pricing["total_suggest"]) + bundle_price = int(pricing["bundle_price"]) + req_fee = pricing["req_fee"] + + intent_text = (message.msg or "") + " " + " ".join(requirements[-5:]) workflow_type, _ = self.workflow_router.detect_workflow(intent_text) if workflow_type == "find_image": - links = [] - try: - from image.image_processor import image_processor - from utils.image_queue import run_with_queue - for idx, (url, r) in enumerate(results, 1): - req_parts = [f"complexity:{r.get('complexity', 'normal')}"] - if r.get("gemini_prompt"): - req_parts.append(f"prompt:{r.get('gemini_prompt')}") - if r.get("aspect_ratio"): - req_parts.append(f"ratio:{r.get('aspect_ratio')}") - if r.get("perspective") and r.get("perspective") != "no": - req_parts.append(f"perspective:{r.get('perspective')}") - if r.get("proc_type"): - req_parts.append(f"proc_type:{r.get('proc_type')}") - if r.get("subject"): - req_parts.append(f"subject:{r.get('subject')}") - if r.get("quality"): - req_parts.append(f"quality:{r.get('quality')}") - - process_res = await run_with_queue(image_processor.process_image( - url, - "enhance", - requirements="|".join(req_parts), - gemini_prompt=r.get("gemini_prompt", ""), - aspect_ratio=r.get("aspect_ratio", "1:1"), - perspective=r.get("perspective", "no"), - proc_type=r.get("proc_type", ""), - subject=r.get("subject", ""), - quality=r.get("quality", ""), - )) - if not process_res.get("success"): - raise RuntimeError(process_res.get("message", "图片处理失败")) - - ok, link, _ = await upload_to_tuhui( - process_res["result_path"], - title=f"客户{message.from_id[-4:]}-图片{idx}", - description="AI自动处理结果", - price=max(10, int(r.get("price_suggest", 20) or 20) + int(req_fee.get("extra", 0) or 0) // max(1, len(results))), - ) - if not ok: - raise RuntimeError(str(link)) - links.append(link) - except Exception as e: - # 找图分支失败时,不直接报价,先回到“收集需求再做” - print(f"[Agent] 找图自动处理失败,回退需求澄清: {e}") - return { - "reply": "这种可以做类似款。你先说下具体需求:要几张、是否改字、尺寸比例、交付格式(单图/打包链接),我按需求给你直接做。", - "need_transfer": False, - } - else: - lines = [f"找到了,链接如下:"] - for i, link in enumerate(links, 1): - lines.append(f"链接{i}:{link}") - state.last_price = bundle_price - try: - from db.customer_db import db - db.update_last_price(message.from_id, bundle_price) - except Exception: - pass - state.pending_image_urls.clear() - state.pending_requirements.clear() - self._sync_pending_quote_state(message.from_id, state) - return {"reply": "\n".join(lines), "need_transfer": False} + route_res = await self._try_batch_auto_process( + results=results, + message=message, + req_fee=req_fee, + ) + self._finalize_batch_state(state, message.from_id, final_price=bundle_price) + return route_res reply_text = self._build_batch_quote_reply( results=results, @@ -2300,18 +2358,7 @@ class CustomerServiceAgent: bundle_price=bundle_price, req_fee=req_fee, ) - - state.last_price = bundle_price - try: - from db.customer_db import db - db.update_last_price(message.from_id, bundle_price) - except Exception: - pass - - # 清空待报价队列(本轮已统一报价) - state.pending_image_urls.clear() - state.pending_requirements.clear() - self._sync_pending_quote_state(message.from_id, state) + self._finalize_batch_state(state, message.from_id, final_price=bundle_price) return {"reply": reply_text, "need_transfer": False} def _split_customer_text(self, msg: str) -> tuple: