From 872c44a0c0adda0956ad2c0624ec31ab8e4023ed Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 16:06:43 +0800 Subject: [PATCH] refactor: extract prompt building and image workflow routing from agent --- core/image_workflow_router.py | 52 ++++++++ core/prompt_builder.py | 187 ++++++++++++++++++++++++++++ core/pydantic_ai_agent.py | 222 +++------------------------------- 3 files changed, 256 insertions(+), 205 deletions(-) create mode 100644 core/image_workflow_router.py create mode 100644 core/prompt_builder.py diff --git a/core/image_workflow_router.py b/core/image_workflow_router.py new file mode 100644 index 0000000..ff68a48 --- /dev/null +++ b/core/image_workflow_router.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from typing import Any + + +async def handle_image_workflow(*, workflow_router: Any, message: str, data: dict, image_urls: list) -> bool: + """处理图片工作流(根据客户说的话判断执行哪种工作流)。""" + if not image_urls: + return False + + workflow_type, confidence = workflow_router.detect_workflow(message) + + customer_id = data.get("from_id") + acc_id = data.get("acc_id", "") + acc_type = data.get("acc_type", "AliWorkbench") + image_url = image_urls[0] + + print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})") + + if workflow_type == "find_image": + print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}") + from core.workflow import workflow + + return await workflow.find_image_workflow( + customer_id=customer_id, + image_url=image_url, + acc_id=acc_id, + acc_type=acc_type, + ) + if workflow_type == "process_image": + print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}") + from core.workflow import workflow + + return await workflow.process_image_workflow( + customer_id=customer_id, + image_url=image_url, + acc_id=acc_id, + acc_type=acc_type, + ) + if workflow_type == "transfer_human": + print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}") + from core.workflow import workflow + + return await workflow.transfer_to_designer_workflow( + customer_id=customer_id, + image_url=image_url, + acc_id=acc_id, + acc_type=acc_type, + reason="客户主动要求转人工", + ) + + return False diff --git a/core/prompt_builder.py b/core/prompt_builder.py new file mode 100644 index 0000000..3f45620 --- /dev/null +++ b/core/prompt_builder.py @@ -0,0 +1,187 @@ +from __future__ import annotations + +import re +from typing import Any, Callable + + +def split_customer_text(msg: str) -> tuple[str, str]: + """ + 把混合消息拆分为(客户真实文字, 系统订单块)。 + 平台有时把客户文字和系统订单通知拼在同一条消息里。 + """ + order_marker = re.search(r"\[系统订单信息\]|\[系统通知\]", msg or "") + if order_marker: + customer_text = (msg or "")[: order_marker.start()].strip() + order_block = (msg or "")[order_marker.start() :].strip() + else: + customer_text = (msg or "").strip() + order_block = "" + return customer_text, order_block + + +def build_prompt( + *, + message: Any, + state: Any, + extract_image_url: Callable[[str], str], + shop_type_resolver: Callable[[str, str], str], + parse_order_info: Callable[[str], dict[str, str]], + build_order_instruction: Callable[[str, str], str], +) -> str: + """构建提示词。""" + msg_content = message.msg + stage_info = f"【当前阶段】{state.stage}" + + customer_text, order_block = split_customer_text(msg_content) + has_order = bool(order_block) + + if has_order: + order = parse_order_info(order_block) + if order.get("order_id"): + state.last_order_id = order["order_id"] + stage_info += f"\n【订单号】{order['order_id']}" + if order.get("order_status"): + state.order_status = order["order_status"] + stage_info += f"\n【订单状态】{order['order_status']}" + if order.get("pay_status"): + stage_info += f"\n【支付状态】{order['pay_status']}" + if order.get("amount"): + stage_info += f"\n【订单金额】{order['amount']}元" + if order.get("quantity"): + stage_info += f"\n【数量】{order['quantity']}件" + if order.get("order_time"): + stage_info += f"\n【下单时间】{order['order_time']}" + if order.get("buyer_note"): + stage_info += f"\n【买家备注】{order['buyer_note']}" + + if state.discount_count > 0: + stage_info += f"\n【客户压价次数】{state.discount_count}" + + shop_type = shop_type_resolver(message.acc_id or "", message.goods_name or "") + shop_hint = "" + try: + from config.config import CONFIG_DIR + import json + + cfg_path = CONFIG_DIR / "shop_prompts.json" + if cfg_path.exists(): + with open(cfg_path, "r", encoding="utf-8") as f: + cfg = json.load(f) + hints = cfg.get("type_hints", {}) + shop_hint = hints.get(shop_type, "") + if not shop_hint and message.acc_id: + sh = cfg.get("shops", {}).get(message.acc_id, {}) + shop_hint = sh.get("hint", "") + except Exception: + pass + + prompt = f"""收到新消息: +{stage_info} + +发送者: {message.from_name} ({message.from_id}) +""" + if message.goods_name: + prompt += f"商品名称: {message.goods_name}\n" + if shop_hint: + prompt += f"\n{shop_hint}\n" + + order_paid = False + order_unpaid = False + if has_order: + order = parse_order_info(order_block) + paid_kws = ["等待发货", "已付款", "付款成功", "买家已付款"] + unpaid_kws = ["等待买家付款", "待付款", "未付款"] + ps = order.get("pay_status", "") + os_ = order.get("order_status", "") + if any(kw in ps or kw in os_ for kw in paid_kws): + order_paid = True + elif any(kw in ps or kw in os_ for kw in unpaid_kws): + order_unpaid = True + + progress_keywords = [ + "安排了吗", + "安排好了吗", + "好了吗", + "做了吗", + "做好了吗", + "弄好了吗", + "好了没", + "做了没", + "什么时候好", + "多久好", + "进度", + "催一下", + "快点", + "什么时候能好", + "做完了吗", + ] + + if customer_text: + prompt += f"\n客户说:{customer_text}\n" + image_url = extract_image_url(customer_text) + price_keywords = ["多少钱", "多少", "价格", "几块", "怎么收费", "报个价"] + size_keywords = [ + "尺寸", + "比例", + "宽", + "高", + "米", + "厘米", + "mm", + "cm", + "横版", + "竖版", + "2米", + "3米", + "改成", + "做成", + ] + has_size_change = any(kw in customer_text.lower() for kw in [k.lower() for k in size_keywords]) + + if shop_type == "gemini_api": + prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。" + elif image_url: + prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。" + elif any(kw in customer_text for kw in price_keywords): + last_url = extract_image_url(msg_content) + if last_url: + prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。" + else: + prompt += "\n客户在询问价格但未发图:先简短承接(如“在看呢/收到”),不要机械连发;再自然引导对方发图。" + if has_size_change: + prompt += ( + "\n⚠️ 尺寸改动场景:优先判断图片主体是否会被拉伸变形," + "不是只看整张图宽高比。若会变形,要先提示“需要补图/扩边”,再给报价。" + ) + elif has_size_change: + prompt += ( + "\n客户在改尺寸/改比例:先按主体比例判断是否会变形," + "不是只看整图比例。若目标尺寸会拉伸主体,先明确说明要补图(如上下补图/扩边)再报价。" + ) + elif any(kw in customer_text for kw in progress_keywords): + if order_unpaid: + prompt += "\n⚠️【订单未付款】客户问安排进度,但订单还未付款。自然告知拍下付款后马上安排即可。" + elif order_paid: + prompt += "\n客户催单,订单已付款,自然回复在做了/快了之类。" + else: + prompt += "\n客户催单,查询当前处理状态后自然回复。" + elif any(kw in customer_text for kw in ["贵", "有点贵", "太贵", "算了", "便宜点", "少点", "打折", "贵哦"]): + prompt += "\n⚠️【客户嫌贵】客户已表达价格顾虑或要放弃。禁止追问「什么问题」「说清楚点」。\n→ 第一步:直接让价一次(如原价 20→15),话术自然。\n→ 第二步:如果让价后客户还是嫌贵,引导客户自己去注册星绘平台:\n - 每个话术都必须带网址 xinhui.cloud\n - 强调「自己去弄」「自己操作」「自己下单」\n - 示例:「嫌贵就去 xinhui.cloud 注册个号,自己有余额可以弄」「xinhui.cloud 去注册下,新用户送体验金,自己弄便宜」\n - 语气直接,让客户自己去操作」" + elif any(kw in customer_text for kw in ["擦边", "黄色", "色情", "大尺度", "性感图", "露点", "半裸"]): + prompt += "\n⚠️【拒绝】客户询问擦边/黄色/敏感内容。直接拒绝,不接单,不说「发图来看看」。自然回复如:这类不做/不接/做不了。" + else: + prompt += "\n根据客户说的内容自然回应,像真人聊天,不要套模板。" + + if has_order: + order = parse_order_info(order_block) + order_instruction = build_order_instruction(order.get("pay_status", ""), order.get("order_status", "")) + if customer_text: + if not order_unpaid: + prompt += f"\n\n【背景参考-订单通知】{order_instruction}" + else: + prompt += f"\n\n{order_instruction}" + + if not customer_text and not has_order: + prompt += f"\n消息内容: {msg_content}\n请按工作流规则回复。" + + return prompt diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index 924e5e1..b2a751a 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -73,6 +73,8 @@ from core.batch_quote_helpers import ( calc_requirement_surcharge, prepare_batch_intake, ) +from core.prompt_builder import build_prompt as build_agent_prompt, split_customer_text +from core.image_workflow_router import handle_image_workflow as route_image_workflow load_dotenv() @@ -1777,215 +1779,25 @@ class CustomerServiceAgent: self._finalize_batch_state(state, message.from_id, final_price=bundle_price) return {"reply": reply_text, "need_transfer": False} - def _split_customer_text(self, msg: str) -> tuple: - """ - 把混合消息拆分为(客户真实文字, 系统订单块)。 - 平台有时把客户文字和系统订单通知拼在同一条消息里。 - """ - import re - # 找到系统订单块的起始位置 - order_marker = re.search(r'\[系统订单信息\]|\[系统通知\]', msg) - if order_marker: - customer_text = msg[:order_marker.start()].strip() - order_block = msg[order_marker.start():].strip() - else: - customer_text = msg.strip() - order_block = "" - return customer_text, order_block + _split_customer_text = staticmethod(split_customer_text) def _build_prompt(self, message: CustomerMessage, state: ConversationState) -> str: - """构建提示词""" - msg_content = message.msg - stage_info = f"【当前阶段】{state.stage}" - - # 拆分:客户文字 vs 系统订单块 - customer_text, order_block = self._split_customer_text(msg_content) - has_order = bool(order_block) - - if has_order: - order = parse_order_info(order_block) - if order.get('order_id'): - state.last_order_id = order['order_id'] - stage_info += f"\n【订单号】{order['order_id']}" - if order.get('order_status'): - state.order_status = order['order_status'] - stage_info += f"\n【订单状态】{order['order_status']}" - if order.get('pay_status'): - stage_info += f"\n【支付状态】{order['pay_status']}" - if order.get('amount'): - stage_info += f"\n【订单金额】{order['amount']}元" - if order.get('quantity'): - stage_info += f"\n【数量】{order['quantity']}件" - if order.get('order_time'): - stage_info += f"\n【下单时间】{order['order_time']}" - if order.get('buyer_note'): - stage_info += f"\n【买家备注】{order['buyer_note']}" - - if state.discount_count > 0: - stage_info += f"\n【客户压价次数】{state.discount_count}" - - # 店铺类型:不同店铺不同回复策略 - shop_type = _get_shop_type(message.acc_id or "", message.goods_name or "") - shop_hint = "" - try: - from config.config import CONFIG_DIR - import json - cfg_path = CONFIG_DIR / "shop_prompts.json" - if cfg_path.exists(): - with open(cfg_path, "r", encoding="utf-8") as f: - cfg = json.load(f) - hints = cfg.get("type_hints", {}) - shop_hint = hints.get(shop_type, "") - if not shop_hint and message.acc_id: - sh = cfg.get("shops", {}).get(message.acc_id, {}) - shop_hint = sh.get("hint", "") - except Exception: - pass - - prompt = f"""收到新消息: -{stage_info} - -发送者: {message.from_name} ({message.from_id}) -""" - if message.goods_name: - prompt += f"商品名称: {message.goods_name}\n" - if shop_hint: - prompt += f"\n{shop_hint}\n" - - # ── 优先处理客户真实问题 ── - # ── 判断订单付款状态(供后续逻辑使用)── - order_paid = False - order_unpaid = False - if has_order: - order = parse_order_info(order_block) - paid_kws = ["等待发货", "已付款", "付款成功", "买家已付款"] - unpaid_kws = ["等待买家付款", "待付款", "未付款"] - ps = order.get('pay_status', '') - os_ = order.get('order_status', '') - if any(kw in ps or kw in os_ for kw in paid_kws): - order_paid = True - elif any(kw in ps or kw in os_ for kw in unpaid_kws): - order_unpaid = True - - # ── 催单/进度询问关键词 ── - progress_keywords = [ - "安排了吗", "安排好了吗", "好了吗", "做了吗", "做好了吗", - "弄好了吗", "好了没", "做了没", "什么时候好", "多久好", - "进度", "催一下", "快点", "什么时候能好", "做完了吗", - ] - - if customer_text: - prompt += f"\n客户说:{customer_text}\n" - image_url = self._extract_image_url(customer_text) - price_keywords = ["多少钱", "多少", "价格", "几块", "怎么收费", "报个价"] - size_keywords = [ - "尺寸", "比例", "宽", "高", "米", "厘米", "mm", "cm", - "横版", "竖版", "2米", "3米", "改成", "做成", - ] - has_size_change = any(kw in customer_text.lower() for kw in [k.lower() for k in size_keywords]) - - # gemini_api 店铺:不触发找图流程,按 API 客服回复 - if shop_type == "gemini_api": - prompt += "\n【Gemini API 店铺】客户问账号/pro/续费/套餐等,按 API 客服自然回复,不要求发图。" - elif image_url: - prompt += "\n客户在继续发图阶段:先确认“已收图”,并引导客户把图和要求一次发完;等客户明确“发完了/统一报价”后再统一报价。" - elif any(kw in customer_text for kw in price_keywords): - last_url = self._extract_image_url(msg_content) - if last_url: - prompt += "\n客户在询问价格:若客户已确认发完,则给总报价;若还在发图,先引导发完后统一报价。" - else: - prompt += "\n客户在询问价格但未发图:先简短承接(如“在看呢/收到”),不要机械连发;再自然引导对方发图。" - if has_size_change: - prompt += ( - "\n⚠️ 尺寸改动场景:优先判断图片主体是否会被拉伸变形," - "不是只看整张图宽高比。若会变形,要先提示“需要补图/扩边”,再给报价。" - ) - elif has_size_change: - prompt += ( - "\n客户在改尺寸/改比例:先按主体比例判断是否会变形," - "不是只看整图比例。若目标尺寸会拉伸主体,先明确说明要补图(如上下补图/扩边)再报价。" - ) - elif any(kw in customer_text for kw in progress_keywords): - # 客户问进度/催单,必须先核查付款状态 - if order_unpaid: - prompt += "\n⚠️【订单未付款】客户问安排进度,但订单还未付款。自然告知拍下付款后马上安排即可。" - elif order_paid: - prompt += "\n客户催单,订单已付款,自然回复在做了/快了之类。" - else: - prompt += "\n客户催单,查询当前处理状态后自然回复。" - elif any(kw in customer_text for kw in ["贵", "有点贵", "太贵", "算了", "便宜点", "少点", "打折", "贵哦"]): - # 客户嫌贵/要放弃 → 直接让价一次,不问「什么问题」 - prompt += "\n⚠️【客户嫌贵】客户已表达价格顾虑或要放弃。禁止追问「什么问题」「说清楚点」。\n→ 第一步:直接让价一次(如原价 20→15),话术自然。\n→ 第二步:如果让价后客户还是嫌贵,引导客户自己去注册星绘平台:\n - 每个话术都必须带网址 xinhui.cloud\n - 强调「自己去弄」「自己操作」「自己下单」\n - 示例:「嫌贵就去 xinhui.cloud 注册个号,自己有余额可以弄」「xinhui.cloud 去注册下,新用户送体验金,自己弄便宜」\n - 语气直接,让客户自己去操作」" - elif any(kw in customer_text for kw in ["擦边", "黄色", "色情", "大尺度", "性感图", "露点", "半裸"]): - # 客户问擦边/黄色内容 → 直接拒绝,不说「发图来看看」 - prompt += "\n⚠️【拒绝】客户询问擦边/黄色/敏感内容。直接拒绝,不接单,不说「发图来看看」。自然回复如:这类不做/不接/做不了。" - else: - prompt += "\n根据客户说的内容自然回应,像真人聊天,不要套模板。" - - # ── 附加订单信息(不覆盖客户问题的优先级)── - if has_order: - order = parse_order_info(order_block) - order_instruction = build_order_instruction( - order.get('pay_status', ''), - order.get('order_status', '') - ) - if customer_text: - if not order_unpaid: - # 未付款情况已在上面明确处理,不重复添加背景 - prompt += f"\n\n【背景参考-订单通知】{order_instruction}" - else: - # 纯系统通知,没有客户文字 - prompt += f"\n\n{order_instruction}" - - if not customer_text and not has_order: - prompt += f"\n消息内容: {msg_content}\n请按工作流规则回复。" - - return prompt + return build_agent_prompt( + message=message, + state=state, + extract_image_url=self._extract_image_url, + shop_type_resolver=_get_shop_type, + parse_order_info=parse_order_info, + build_order_instruction=build_order_instruction, + ) async def _handle_image_workflow(self, message: str, data: dict, image_urls: list) -> bool: - """处理图片工作流(根据客户说的话判断执行哪种工作流)""" - if not image_urls: - return False - - workflow_type, confidence = self.workflow_router.detect_workflow(message) - - customer_id = data.get('from_id') - acc_id = data.get('acc_id', '') - acc_type = data.get('acc_type', 'AliWorkbench') - image_url = image_urls[0] - - print(f"[Agent] 检测到工作流类型:{workflow_type} (置信度:{confidence})") - - if workflow_type == "find_image": - print(f"[Agent] 执行查找图片工作流 | 客户:{customer_id}") - from core.workflow import workflow - return await workflow.find_image_workflow( - customer_id=customer_id, - image_url=image_url, - acc_id=acc_id, - acc_type=acc_type - ) - elif workflow_type == "process_image": - print(f"[Agent] 执行处理图片工作流 | 客户:{customer_id}") - from core.workflow import workflow - return await workflow.process_image_workflow( - customer_id=customer_id, - image_url=image_url, - acc_id=acc_id, - acc_type=acc_type - ) - elif workflow_type == "transfer_human": - print(f"[Agent] 执行转人工派单工作流 | 客户:{customer_id}") - from core.workflow import workflow - return await workflow.transfer_to_designer_workflow( - customer_id=customer_id, - image_url=image_url, - acc_id=acc_id, - acc_type=acc_type, - reason="客户主动要求转人工" - ) - - return False + return await route_image_workflow( + workflow_router=self.workflow_router, + message=message, + data=data, + image_urls=image_urls, + ) async def test_agent():