From 433f6e77e5b71bb81b7e52fdfc92146dc49bd628 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 16:00:28 +0800 Subject: [PATCH] refactor: extract batch quote helpers from pydantic agent --- core/batch_quote_helpers.py | 181 +++++++++++++++++++++++++++++++++++ core/pydantic_ai_agent.py | 185 +++--------------------------------- 2 files changed, 193 insertions(+), 173 deletions(-) create mode 100644 core/batch_quote_helpers.py diff --git a/core/batch_quote_helpers.py b/core/batch_quote_helpers.py new file mode 100644 index 0000000..c61c65f --- /dev/null +++ b/core/batch_quote_helpers.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import random +from typing import Any + + +def calc_requirement_surcharge(requirements: list[str]) -> dict[str, Any]: + """ + 把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。 + 返回: + {"extra": int, "hits": List[str]} + """ + text = " ".join(requirements or []) + rules = [ + (["分层", "psd", "源文件"], 30, "分层/源文件"), + (["去背景", "抠图", "透明底", "白底"], 5, "去背景"), + (["换背景", "换场景", "合成", "转到", "换到", "放到", "贴到", "移到", "套到", "图案上去", "元素放到"], 10, "跨图合成/换背景"), + (["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"), + (["调色", "改色", "换色", "配色"], 5, "调色"), + (["多版本", "多个版本", "两版", "三版"], 10, "多版本"), + (["加急", "今天要", "马上要", "尽快"], 10, "加急"), + ] + total = 0 + hits: list[str] = [] + for keywords, fee, label in rules: + if any(k in text for k in keywords): + total += fee + hits.append(f"{label}+{fee}") + total = min(total, 60) + total = round(total / 5) * 5 + return {"extra": total, "hits": hits} + + +def build_batch_quote_reply( + *, + results: list[tuple[str, dict[str, Any]]], + total_suggest: int, + bundle_price: int, + req_fee: dict[str, Any], +) -> str: + """构建分图明细 + 单条总报价可选项回复。""" + complexity_map = { + "simple": "简单", + "normal": "常规", + "complex": "复杂", + "hard": "高难", + } + detail_lines: list[str] = [] + for i, (_, r) in enumerate(results, 1): + p = int(r.get("price_suggest", 20) or 20) + cx = complexity_map.get(str(r.get("complexity", "normal")), "常规") + reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip() + if len(reason) > 18: + reason = reason[:18] + "..." + detail_lines.append(f"图{i}:{p}元({cx},{reason})") + + extra = int(req_fee.get("extra", 0) or 0) + single_total = round((total_suggest + extra) / 5) * 5 + req_hit = "、".join(req_fee.get("hits", [])) if req_fee.get("hits") else "" + + if len(results) == 1: + line = detail_lines[0].replace("图1:", "这张:") + heads = [ + "这张我看过了,先给你报下:", + "这张可以做,价格给你报下:", + "看了这张图,报价如下:", + "我先按这张给你算下:", + "这张处理没问题,我给你报个实在价:", + "我看完这张了,价格给你说下:", + "按这张图的难度,报价是:", + "这张我已经评估完了,先给你个价格:", + ] + lines = [f"{random.choice(heads)}{line.split(':', 1)[1]}"] + if req_hit: + lines.append(f"按你的需求另加{extra}元({req_hit})。") + tails = [ + f"这张做下来共{single_total}元,定了我马上开工。", + f"合下来是{single_total}元,你点头我这边立刻安排。", + f"总价{single_total}元,可以的话我现在就给你做。", + f"这一张算下来{single_total}元,你说开做我就马上弄。", + f"给你按{single_total}元做,确定的话我现在就排上。", + f"这张我按{single_total}元给你做,没问题就直接开始。", + f"这张最终{single_total}元,你点头我立刻开干。", + f"这张就按{single_total}元走,你确认我就马上安排。", + ] + lines.append(random.choice(tails)) + return "\n".join(lines) + + heads = [ + "我先按这几张给你报一下:", + "这几张我都看过了,价格给你列一下:", + "我把每张价格先给你说清楚:", + "我先把这几张的价格拆开给你看:", + "这几张我都评估过了,报价给你写明白:", + "先别急,我把每张大概价给你列出来:", + "我按这批图先报个明细给你:", + "我先把每张费用和总价给你算出来:", + ] + lines = [random.choice(heads)] + lines.extend(detail_lines) + if req_hit: + lines.append(f"需求加价:+{extra}元({req_hit})") + option_line = random.choice([ + f"可选:按单张做(共{single_total}元),或打包做({bundle_price}元,会更省一点)。", + f"可选:单张算下来一共{single_total}元;打包给你{bundle_price}元,更划算。", + f"可选:你按单张做共{single_total}元,按打包做我给你{bundle_price}元。", + f"可选:分开做总共{single_total}元,打包做{bundle_price}元(省一点)。", + f"可选:按张算共{single_total}元;直接打包{bundle_price}元。", + ]) + lines.append(option_line) + lines.append( + random.choice( + [ + "你定一个,我这边马上开工。", + "你选个方案,我立刻给你安排上。", + "你拍板就行,我这边马上开做。", + "你看选哪个合适,我这边马上给你做。", + "你一句话定下来,我现在就给你安排。", + ] + ) + ) + return "\n".join(lines) + + +def prepare_batch_intake(state: Any) -> dict[str, Any]: + """Stage 1: 收集阶段,标准化输入并做上限约束。""" + urls = list(getattr(state, "pending_image_urls", []) or []) + if not urls: + return {"ok": False, "reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False} + try: + from config.config import BATCH_ANALYZE_CONCURRENCY, BATCH_MAX_IMAGES + + max_images = max(1, int(BATCH_MAX_IMAGES)) + analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY)) + except Exception: + max_images = 12 + analyze_concurrency = 3 + if len(urls) > max_images: + return { + "ok": False, + "reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。", + "need_transfer": False, + } + return { + "ok": True, + "urls": urls[:max_images], + "requirements": list(getattr(state, "pending_requirements", []) or []), + "analyze_concurrency": analyze_concurrency, + } + + +def assess_batch_risk(results: list[tuple[str, dict[str, Any]]]) -> dict[str, list[str]]: + """Stage 2.5: 分离可做和风险图。""" + unsafe: list[str] = [] + dense_text_reject: list[str] = [] + for i, (_, r) in enumerate(results, 1): + if r.get("feasibility") == "no" or r.get("risk") == "high": + unsafe.append(f"图{i}") + note = str(r.get("note", "") or "") + if "文字内容过于密集" in note or "密集文字" in note: + dense_text_reject.append(f"图{i}") + return {"unsafe": unsafe, "dense_text_reject": dense_text_reject} + + +def build_batch_pricing_plan(results: list[tuple[str, dict[str, Any]]], requirements: list[str]) -> dict[str, Any]: + """Stage 3: 报价计算(图片成本 + 需求加价 + 打包价)。""" + total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results) + req_fee = calc_requirement_surcharge(requirements) + if len(results) == 2: + bundle_price = max(10, total_suggest - 5) + elif len(results) >= 3: + bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5) + else: + bundle_price = total_suggest + bundle_price += int(req_fee.get("extra", 0) or 0) + bundle_price = round(bundle_price / 5) * 5 + return { + "total_suggest": total_suggest, + "req_fee": req_fee, + "bundle_price": bundle_price, + } diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 4e430b1..924e5e1 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -66,6 +66,13 @@ from core.context_helpers import ( get_intent_emotion_hint, get_refusal_context_hint, ) +from core.batch_quote_helpers import ( + assess_batch_risk, + build_batch_pricing_plan, + build_batch_quote_reply, + calc_requirement_surcharge, + prepare_batch_intake, +) load_dotenv() @@ -1592,146 +1599,6 @@ class CustomerServiceAgent: _build_not_understood_reply = staticmethod(build_not_understood_reply) _append_requirement = staticmethod(append_requirement) - def _calc_requirement_surcharge(self, requirements: List[str]) -> Dict[str, Any]: - """ - 把客户补充需求做成结构化加价,避免纯靠模型自由发挥导致价格波动。 - 返回: - {"extra": int, "hits": List[str]} - """ - text = " ".join(requirements or []) - rules = [ - (["分层", "psd", "源文件"], 30, "分层/源文件"), - (["去背景", "抠图", "透明底", "白底"], 5, "去背景"), - (["换背景", "换场景", "合成", "转到", "换到", "放到", "贴到", "移到", "套到", "图案上去", "元素放到"], 10, "跨图合成/换背景"), - (["改字", "改文字", "替换文字", "排版"], 10, "改文字/排版"), - (["调色", "改色", "换色", "配色"], 5, "调色"), - (["多版本", "多个版本", "两版", "三版"], 10, "多版本"), - (["加急", "今天要", "马上要", "尽快"], 10, "加急"), - ] - total = 0 - hits: List[str] = [] - for keywords, fee, label in rules: - if any(k in text for k in keywords): - total += fee - hits.append(f"{label}+{fee}") - # 防止需求加价过高,做个上限保护 - total = min(total, 60) - # 金额统一 5 的倍数 - total = round(total / 5) * 5 - return {"extra": total, "hits": hits} - - def _build_batch_quote_reply( - self, - results: List[Tuple[str, Dict[str, Any]]], - total_suggest: int, - bundle_price: int, - req_fee: Dict[str, Any], - ) -> str: - """构建分图明细 + 单条总报价可选项回复。""" - complexity_map = { - "simple": "简单", - "normal": "常规", - "complex": "复杂", - "hard": "高难", - } - detail_lines: List[str] = [] - for i, (_, r) in enumerate(results, 1): - p = int(r.get("price_suggest", 20) or 20) - cx = complexity_map.get(str(r.get("complexity", "normal")), "常规") - reason = str(r.get("reason", "常规处理")).replace("\n", " ").strip() - if len(reason) > 18: - reason = reason[:18] + "..." - detail_lines.append(f"图{i}:{p}元({cx},{reason})") - - extra = int(req_fee.get("extra", 0) or 0) - single_total = round((total_suggest + extra) / 5) * 5 - req_hit = "、".join(req_fee.get("hits", [])) if req_fee.get("hits") else "" - - # 单图时不要使用“分图/这批/A-B方案”措辞,避免客户误解为多图。 - if len(results) == 1: - line = detail_lines[0].replace("图1:", "这张:") - heads = [ - "这张我看过了,先给你报下:", - "这张可以做,价格给你报下:", - "看了这张图,报价如下:", - "我先按这张给你算下:", - "这张处理没问题,我给你报个实在价:", - "我看完这张了,价格给你说下:", - "按这张图的难度,报价是:", - "这张我已经评估完了,先给你个价格:", - ] - lines = [f"{random.choice(heads)}{line.split(':', 1)[1]}"] - if req_hit: - lines.append(f"按你的需求另加{extra}元({req_hit})。") - tails = [ - f"这张做下来共{single_total}元,定了我马上开工。", - f"合下来是{single_total}元,你点头我这边立刻安排。", - f"总价{single_total}元,可以的话我现在就给你做。", - f"这一张算下来{single_total}元,你说开做我就马上弄。", - f"给你按{single_total}元做,确定的话我现在就排上。", - f"这张我按{single_total}元给你做,没问题就直接开始。", - f"这张最终{single_total}元,你点头我立刻开干。", - f"这张就按{single_total}元走,你确认我就马上安排。", - ] - lines.append(random.choice(tails)) - return "\n".join(lines) - - heads = [ - "我先按这几张给你报一下:", - "这几张我都看过了,价格给你列一下:", - "我把每张价格先给你说清楚:", - "我先把这几张的价格拆开给你看:", - "这几张我都评估过了,报价给你写明白:", - "先别急,我把每张大概价给你列出来:", - "我按这批图先报个明细给你:", - "我先把每张费用和总价给你算出来:", - ] - lines = [random.choice(heads)] - lines.extend(detail_lines) - if req_hit: - lines.append(f"需求加价:+{extra}元({req_hit})") - option_line = random.choice([ - f"可选:按单张做(共{single_total}元),或打包做({bundle_price}元,会更省一点)。", - f"可选:单张算下来一共{single_total}元;打包给你{bundle_price}元,更划算。", - f"可选:你按单张做共{single_total}元,按打包做我给你{bundle_price}元。", - f"可选:分开做总共{single_total}元,打包做{bundle_price}元(省一点)。", - f"可选:按张算共{single_total}元;直接打包{bundle_price}元。", - ]) - lines.append(option_line) - lines.append(random.choice([ - "你定一个,我这边马上开工。", - "你选个方案,我立刻给你安排上。", - "你拍板就行,我这边马上开做。", - "你看选哪个合适,我这边马上给你做。", - "你一句话定下来,我现在就给你安排。", - ])) - return "\n".join(lines) - - def _prepare_batch_intake(self, state: ConversationState) -> Dict[str, Any]: - """Stage 1: 收集阶段,标准化输入并做上限约束。""" - urls = list(state.pending_image_urls) - if not urls: - return {"ok": False, "reply": "你先把图片发我,我看完再给你统一报价。", "need_transfer": False} - try: - from config.config import BATCH_MAX_IMAGES, BATCH_ANALYZE_CONCURRENCY - max_images = max(1, int(BATCH_MAX_IMAGES)) - analyze_concurrency = max(1, int(BATCH_ANALYZE_CONCURRENCY)) - except Exception: - max_images = 12 - analyze_concurrency = 3 - if len(urls) > max_images: - return { - "ok": False, - "reply": f"这次图片有点多({len(urls)}张),我先按前{max_images}张处理报价,剩下的下一批继续发我。", - "need_transfer": False, - } - return { - "ok": True, - "urls": urls[:max_images], - "requirements": list(state.pending_requirements or []), - "analyze_concurrency": analyze_concurrency, - } - async def _run_batch_feasibility(self, urls: List[str], concurrency: int) -> List[Tuple[str, Dict[str, Any]]]: """Stage 2: 可做性分析(逐图)。""" from image.image_analyzer import image_analyzer @@ -1775,39 +1642,11 @@ class CustomerServiceAgent: except Exception as e: print(f"[Agent] Workflow 批量任务创建失败: {e}") - def _assess_batch_risk(self, results: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, List[str]]: - """Stage 2.5: 分离可做和风险图。""" - unsafe: List[str] = [] - dense_text_reject: List[str] = [] - for i, (_, r) in enumerate(results, 1): - if r.get("feasibility") == "no" or r.get("risk") == "high": - unsafe.append(f"图{i}") - note = str(r.get("note", "") or "") - if "文字内容过于密集" in note or "密集文字" in note: - dense_text_reject.append(f"图{i}") - return {"unsafe": unsafe, "dense_text_reject": dense_text_reject} - - def _build_batch_pricing_plan( - self, - results: List[Tuple[str, Dict[str, Any]]], - requirements: List[str], - ) -> Dict[str, Any]: - """Stage 3: 报价计算(图片成本 + 需求加价 + 打包价)。""" - total_suggest = sum(int(r.get("price_suggest", 20) or 20) for _, r in results) - req_fee = self._calc_requirement_surcharge(requirements) - if len(results) == 2: - bundle_price = max(10, total_suggest - 5) - elif len(results) >= 3: - bundle_price = max(10, round(total_suggest * 0.9 / 5) * 5) - else: - bundle_price = total_suggest - bundle_price += int(req_fee.get("extra", 0) or 0) - bundle_price = round(bundle_price / 5) * 5 - return { - "total_suggest": total_suggest, - "req_fee": req_fee, - "bundle_price": bundle_price, - } + _calc_requirement_surcharge = staticmethod(calc_requirement_surcharge) + _build_batch_quote_reply = staticmethod(build_batch_quote_reply) + _prepare_batch_intake = staticmethod(prepare_batch_intake) + _assess_batch_risk = staticmethod(assess_batch_risk) + _build_batch_pricing_plan = staticmethod(build_batch_pricing_plan) async def _try_batch_auto_process( self,