From 4b2d3347daec6ecb7b5fef9b2847dca9259e70d4 Mon Sep 17 00:00:00 2001 From: jimi <1847930177@qq.com> Date: Sun, 1 Mar 2026 16:14:29 +0800 Subject: [PATCH] refactor: extract tool registration implementations from agent --- core/agent_tools.py | 679 ++++++++++++++++++++++++++++++++++++++ core/pydantic_ai_agent.py | 670 +------------------------------------ 2 files changed, 681 insertions(+), 668 deletions(-) create mode 100644 core/agent_tools.py diff --git a/core/agent_tools.py b/core/agent_tools.py new file mode 100644 index 0000000..f4d848b --- /dev/null +++ b/core/agent_tools.py @@ -0,0 +1,679 @@ +from __future__ import annotations + +from typing import Any + +from pydantic_ai import RunContext + +from db.customer_risk_db import risk_db +from services.service_tuhui_upload import upload_to_tuhui +from core.order_helpers import parse_order_info + + +def register_tools(agent) -> None: + """注册所有 Tool,让 Agent 可以主动调用。""" + + @agent.agent.tool + async def analyze_image(ctx: RunContext[Any], image_url: str) -> str: + """ + 分析客户发来的图片复杂度,用于报价。 + 收到图片URL时调用此工具,返回复杂度和建议报价。 + """ + try: + from image.image_analyzer import image_analyzer + result = await image_analyzer.analyze(image_url) + complexity_label = { + "simple": "简单(画面干净)", + "normal": "一般复杂度", + "complex": "细节偏多", + "hard": "非常复杂", + }.get(result["complexity"], result["complexity"]) + # 持久化图片URL和复杂度,重启后仍能记住这张图 + try: + from db.customer_db import db + db.update_last_image( + ctx.deps.from_id, + image_url, + complexity=result["complexity"], + gemini_prompt=result.get("gemini_prompt", ""), + aspect_ratio=result.get("aspect_ratio", "1:1"), + perspective=result.get("perspective", "no"), + ) + except Exception: + pass + + # 存图片类型到客户画像 + try: + from db.customer_db import db as _db + if result.get("subject"): + _db.add_image_type(ctx.deps.from_id, result["subject"]) + except Exception: + pass + + # 在 workflow 里创建待处理任务(付款后自动触发 Gemini) + try: + from core.workflow import workflow + await workflow.image_analysis_result( + customer_id=ctx.deps.from_id, + image_url=image_url, + complexity=result["complexity"], + acc_id=ctx.deps.acc_id, + acc_type=ctx.deps.platform, + gemini_prompt=result.get("gemini_prompt", ""), + aspect_ratio=result.get("aspect_ratio", "1:1"), + perspective=result.get("perspective", "no"), + proc_type=result.get("proc_type", ""), + subject=result.get("subject", ""), + quality=result.get("quality", ""), + ) + print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...") + except Exception as e: + print(f"[Agent] Workflow 任务创建失败: {e}") + + # 组装给 AI 的分析报告 + risk = result.get("risk", "none") + has_face = result.get("has_face", "no") + feasibility = result.get("feasibility", "yes") + note = result.get("note", "") + + lines = [ + f"图片主体:{result['subject'] or '未识别'}", + f"处理类型:{result['proc_type'] or '高清修复'}", + f"原图质量:{result['quality'] or '未知'}", + f"图片类型:{result.get('category', '') or '通用'}", + f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}({result.get('megapixels', 0.0)}MP)", + f"含人脸:{'是' if has_face == 'yes' else '否'}", + f"复杂度:{complexity_label}", + f"原因:{result['reason']}", + ] + if result.get("size_surcharge"): + lines.append(f"尺寸加价:+{result['size_surcharge']}元") + if result.get("size_note"): + lines.append(f"尺寸提示:{result['size_note']}") + try: + st = agent._get_conversation_state(ctx.deps.from_id) + if isinstance(result.get("price_min"), (int, float)): + st.last_min_price = int(result.get("price_min") or 0) + try: + from db.customer_db import db as _db + _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) + except Exception: + pass + except Exception: + pass + + # 根据可做性和风险等级给 AI 不同的行动指引 + if feasibility == "no": + if "敏感" in (note or ""): + lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。") + lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。") + else: + lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件)。") + lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。") + elif risk == "high": + lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}") + lines.append(f"建议报价:{result['price_suggest']}元") + lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。") + elif risk == "low": + lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。") + lines.append(f"建议报价:{result['price_suggest']}元") + lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)") + else: + # 无风险,正常报价 + base_price = result.get('price_suggest', 20) + text_surcharge = result.get('text_surcharge', 0) + layer_surcharge = result.get('layer_surcharge', 0) + total_price = base_price + text_surcharge + layer_surcharge + + # 构建报价说明 + price_explanation = f"建议报价:{total_price}元" + if text_surcharge > 0: + price_explanation += f"(含文字处理 +{text_surcharge}元)" + if layer_surcharge > 0: + price_explanation += f"(含分层 +{layer_surcharge}元)" + + lines.append(price_explanation) + + # 添加文字数量说明 + text_amount = result.get('text_amount', 'none') + if text_amount != 'none': + lines.append(f"文字数量:{text_amount},需要精细处理") + + if feasibility == "partial": + lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」") + if note and note not in ("无", ""): + lines.append(f"提示:{note}") + lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】") + + return "\n".join(lines) + except Exception as e: + return f"图片分析失败: {e},请根据经验判断报价" + + @agent.agent.tool + async def get_customer_info(ctx: RunContext[Any], customer_id: str) -> str: + """ + 查询客户历史信息:消费记录、性格标签、报价历史等。 + 对话开始时或需要了解客户背景时调用。 + """ + try: + from db.customer_db import db + return db.get_profile_text(customer_id) + except Exception as e: + return f"查询失败: {e}" + + @agent.agent.tool + async def transfer_to_human(ctx: RunContext[Any]) -> str: + """ + 转接人工客服。 + 遇到退款/投诉/情绪激动/复杂售后时调用。 + """ + return "TRANSFER_REQUESTED" + + @agent.agent.tool + async def get_customer_risk_profile(ctx: RunContext[Any], customer_id: str = "") -> str: + """查询客户风控画像:退款/不付款/差评/人工黑名单等。""" + cid = customer_id or ctx.deps.from_id + try: + info = risk_db.evaluate_customer(cid) + return ( + f"客户:{cid}\n" + f"不接单:{'是' if info.get('do_not_serve') else '否'}\n" + f"风险等级:{info.get('computed_level','low')} 分数:{info.get('computed_score',0)}\n" + f"近30天退款:{info.get('refund_30d',0)}\n" + f"近7天未付款下单:{info.get('unpaid_7d',0)}\n" + f"近90天差评:{info.get('bad_review_90d',0)}\n" + f"备注:{info.get('note','') or '无'}" + ) + except Exception as e: + return f"查询风控画像失败: {e}" + + @agent.agent.tool + async def mark_customer_risk( + ctx: RunContext[Any], + customer_id: str, + do_not_serve: bool = False, + risk_level: str = "low", + risk_score: int = 0, + note: str = "", + tag: str = "", + ) -> str: + """人工标记客户风控画像(不接单/高风险/备注标签)。""" + try: + tags = [tag] if tag else [] + risk_db.set_profile( + customer_id=customer_id, + do_not_serve=do_not_serve, + risk_level=risk_level, + risk_score=risk_score, + note=note, + tags=tags, + ) + return "风控画像已更新" + except Exception as e: + return f"更新风控画像失败: {e}" + + @agent.agent.tool + async def record_customer_risk_event( + ctx: RunContext[Any], + customer_id: str, + event_type: str, + event_count: int = 1, + note: str = "", + ) -> str: + """记录风控事件:refund/unpaid_order/bad_review/blacklist_hit 等。""" + try: + risk_db.record_event( + customer_id=customer_id, + event_type=event_type, + event_count=event_count, + note=note, + ) + return "风控事件已记录" + except Exception as e: + return f"记录风控事件失败: {e}" + + @agent.agent.tool + async def save_customer_note( + ctx: RunContext[Any], + customer_id: str, + note: str + ) -> str: + """ + 记录客户关键信息到画像(邮箱/微信/特殊需求等)。 + 客户提供联系方式或重要信息时调用。 + """ + try: + from db.customer_db import db + db.add_note(customer_id, note) + return "已记录" + except Exception as e: + return f"记录失败: {e}" + + @agent.agent.tool + async def update_contact_info( + ctx: RunContext[Any], + customer_id: str, + contact_type: str, + value: str + ) -> str: + """ + 更新客户联系方式。 + 当客户说出邮箱/手机/微信时调用,比正则提取更准确。 + + contact_type 枚举值: + email - 邮箱 + phone - 手机号 + wechat - 微信号 + """ + try: + from db.customer_db import db + if contact_type == "email": + db.update_email(customer_id, value) + elif contact_type == "phone": + db.update_phone(customer_id, value) + elif contact_type == "wechat": + db.update_wechat(customer_id, value) + else: + return f"未知联系方式类型: {contact_type}" + return f"已保存 {contact_type}: {value}" + except Exception as e: + return f"保存失败: {e}" + + @agent.agent.tool + async def record_quote( + ctx: RunContext[Any], + customer_id: str, + price: int, + description: str = "" + ) -> str: + """ + 记录本次报价到客户画像,用于后续对话保持价格一致。 + 每次给客户报价后调用。 + + Args: + customer_id: 客户ID + price: 报价金额(元) + description: 报价描述,如"单图处理"/"三图打包" + """ + try: + from db.customer_db import db + db.update_last_price(customer_id, price) + if description: + db.add_note(customer_id, f"报价 {price}元({description})") + # 同步到内存状态 + state = agent.conversations.get(customer_id) + if state: + state.last_price = price + return f"已记录报价 {price}元" + except Exception as e: + return f"记录失败: {e}" + + @agent.agent.tool + async def process_image_gemini(ctx: RunContext[Any], customer_id: str = "") -> str: + """ + 触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。 + 会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。 + 处理完成后会自动发图给客户。 + """ + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不自动作图" + except Exception: + return "现在处理模块暂时暂停,先不自动作图" + cid = customer_id or ctx.deps.from_id + try: + from core.workflow import workflow + ok = await workflow.trigger_processing_on_payment( + customer_id=cid, + acc_id=ctx.deps.acc_id, + acc_type=ctx.deps.platform, + ) + if ok: + return "已安排,稍后发你" + return "该客户暂无待处理图片,请先发图" + except Exception as e: + return f"触发作图失败: {e},请稍后重试或转人工" + + @agent.agent_pricing.tool + async def analyze_image_pricing(ctx: RunContext[Any], image_url: str) -> str: + try: + from image.image_analyzer import image_analyzer + result = await image_analyzer.analyze(image_url) + if result.get("feasibility") == "no" or result.get("risk") == "high": + return "该图风险高或不可做:不报价,建议换图或转人工评估。" + if not result.get("success", False): + return "图片识别异常:先不报价,建议客户重发更清晰图片。" + p = result.get("price_suggest", 20) + try: + st = agent._get_conversation_state(ctx.deps.from_id) + if isinstance(result.get("price_min"), (int, float)): + st.last_min_price = int(result.get("price_min") or 0) + try: + from db.customer_db import db as _db + _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) + except Exception: + pass + except Exception: + pass + return f"建议报价:{p}元" + except Exception as e: + return f"图片分析失败: {e}" + + @agent.agent_pricing.tool + async def record_quote_pricing( + ctx: RunContext[Any], + customer_id: str, + price: int, + description: str = "" + ) -> str: + try: + from db.customer_db import db + db.update_last_price(customer_id, price) + return "ok" + except Exception as e: + return f"记录失败: {e}" + + @agent.agent_processing.tool + async def process_image_gemini_run(ctx: RunContext[Any], customer_id: str = "") -> str: + """触发 Gemini 作图处理(processing agent 专用入口)。""" + return await process_image_gemini(ctx, customer_id) + + @agent.agent_similar.tool + async def recommend_similar(ctx: RunContext[Any], hint: str = "") -> str: + try: + return "有类似款,拍下我发你参考图。" + except Exception as e: + return f"推荐失败: {e}" + + @agent.agent_order.tool + async def handle_order(ctx: RunContext[Any], raw_msg: str = "") -> str: + try: + info = parse_order_info(raw_msg or "") + paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"] + if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw): + return "已安排,稍后发你" + return "" + except Exception: + return "" + + @agent.agent_risk.tool + async def risk_filter(ctx: RunContext[Any], text: str = "") -> str: + return "这类不做哈,政治/敏感内容都不接。" + + @agent.agent_risk.tool + async def get_customer_risk_profile_risk(ctx: RunContext[Any], customer_id: str = "") -> str: + return await get_customer_risk_profile(ctx, customer_id) + + @agent.agent_risk.tool + async def mark_customer_risk_risk( + ctx: RunContext[Any], + customer_id: str, + do_not_serve: bool = False, + risk_level: str = "low", + risk_score: int = 0, + note: str = "", + tag: str = "", + ) -> str: + return await mark_customer_risk( + ctx=ctx, + customer_id=customer_id, + do_not_serve=do_not_serve, + risk_level=risk_level, + risk_score=risk_score, + note=note, + tag=tag, + ) + + @agent.agent_risk.tool + async def record_customer_risk_event_risk( + ctx: RunContext[Any], + customer_id: str, + event_type: str, + event_count: int = 1, + note: str = "", + ) -> str: + return await record_customer_risk_event( + ctx=ctx, + customer_id=customer_id, + event_type=event_type, + event_count=event_count, + note=note, + ) + + @agent.agent.tool + async def remove_background(ctx: RunContext[Any], image_url: str) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """【独立工具】去背景,输出白底图。客户只要去背景时调用。""" + try: + from image.image_tools import remove_background as _rb + r = await _rb(image_url) + if r["success"]: + return f"去背景完成,已保存。自然回复客户好了发你" + return f"去背景失败:{r['message']}" + except Exception as e: + return f"去背景失败:{e}" + + @agent.agent.tool + async def perspective_correct(ctx: RunContext[Any], image_url: str) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """【独立工具】透视矫正。输入需白底图,输出展平图。""" + try: + from image.image_tools import perspective_correct as _pc + r = await _pc(image_url) + if r["success"]: + return f"透视矫正完成。自然回复客户好了" + return f"透视矫正失败:{r['message']}" + except Exception as e: + return f"透视矫正失败:{e}" + + @agent.agent.tool + async def extract_pattern_tool( + ctx: RunContext[Any], + image_url: str, + prompt: str = "", + aspect_ratio: str = "1:1" + ) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """【独立工具】印花提取/主处理。按提示词和比例处理。""" + try: + from image.image_tools import extract_pattern + r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio) + if r["success"]: + return f"提取完成。自然回复客户好了发你" + return f"提取失败:{r['message']}" + except Exception as e: + return f"提取失败:{e}" + + @agent.agent.tool + async def enhance_image_tool(ctx: RunContext[Any], image_url: str) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """【独立工具】高清增强。客户只要清晰化时调用。""" + try: + from image.image_tools import enhance_image + r = await enhance_image(image_url) + if r["success"]: + return f"高清增强完成。自然回复客户好了" + return f"增强失败:{r['message']}" + except Exception as e: + return f"增强失败:{e}" + + @agent.agent.tool + async def color_match_tool( + ctx: RunContext[Any], + orig_url: str, + result_url: str, + strength: float = 0.75 + ) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """【独立工具】颜色匹配。将 result 色调匹配到 orig。""" + try: + from image.image_tools import color_match_images + r = await color_match_images(orig_url, result_url, strength=strength) + if r["success"]: + return f"颜色匹配完成" + return f"颜色匹配失败:{r['message']}" + except Exception as e: + return f"颜色匹配失败:{e}" + + @agent.agent.tool + async def trim_border_tool(ctx: RunContext[Any], image_url: str) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """【独立工具】裁切四周背景边(白/黄/米等)。""" + try: + from image.image_tools import trim_border + r = await trim_border(image_url) + if r["success"]: + return f"裁边完成" + return f"裁边失败:{r['message']}" + except Exception as e: + return f"裁边失败:{e}" + + @agent.agent.tool + async def vectorize_to_eps_tool(ctx: RunContext[Any], image_url: str) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。""" + try: + from image.image_tools import vectorize_to_eps + r = await vectorize_to_eps(image_url) + if r["success"]: + return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你" + return f"矢量化失败:{r['message']}" + except Exception as e: + return f"矢量化失败:{e}" + + @agent.agent.tool + async def meitu_enhance_tool( + ctx: RunContext[Any], + image_url: str, + mode: str = "standard" + ) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """ + 【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。 + + Args: + image_url: 图片 URL 或本地路径 + mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化) + """ + try: + from image.image_tools import meitu_enhance + r = await meitu_enhance(image_url, mode=mode) + if r["success"]: + return f"画质增强完成。自然回复客户好了发你" + return f"画质增强失败:{r['message']}" + except Exception as e: + return f"画质增强失败:{e}" + + @agent.agent.tool + async def resize_image( + ctx: RunContext[Any], + image_url: str, + width: int, + height: int = 0 + ) -> str: + try: + from config.config import IMAGE_MODULE_ENABLED + if not IMAGE_MODULE_ENABLED: + return "现在处理模块暂时暂停,先不处理图片" + except Exception: + return "现在处理模块暂时暂停,先不处理图片" + """ + 改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。 + + Args: + image_url: 图片URL(客户刚发的图,或从对话中获取) + width: 目标宽度(像素),如 1920 + height: 目标高度(0=按宽度等比缩放),如 1080 + + 常用尺寸:1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图) + """ + try: + from image.image_processor import image_processor + result = await image_processor.resize(image_url, width, height) + if result["success"]: + return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了" + else: + return f"改尺寸失败:{result['message']},告知客户稍后重试" + except Exception as e: + return f"改尺寸失败:{e}" + + @agent.agent.tool + async def calculate_bulk_price( + ctx: RunContext[Any], + image_count: int, + complexities: str = "" + ) -> str: + """ + 计算多图打包价格。 + 客户要做多张图时调用,返回建议总价。 + + Args: + image_count: 图片数量 + complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple" + 没有识别结果时留空,按平均价格估算 + """ + if image_count <= 0: + return "图片数量无效" + + # 各复杂度单价(必须为5的整数倍) + unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30} + default_unit = 20 # 没有识别结果时的默认单价 + + if complexities: + levels = [c.strip() for c in complexities.split(",")] + total = sum(unit_price.get(lv, default_unit) for lv in levels) + else: + total = image_count * default_unit + + # 打包优惠:3张以上9折,5张以上8折,价格必须为5的整数倍 + if image_count >= 5: + discounted = round(total * 0.8 / 5) * 5 + tip = f"({image_count}张8折优惠)" + elif image_count >= 3: + discounted = round(total * 0.9 / 5) * 5 + tip = f"({image_count}张9折优惠)" + else: + discounted = round(total / 5) * 5 + tip = "" + + return f"建议打包报价:{discounted}元{tip}(原价{total}元)" diff --git a/core/pydantic_ai_agent.py b/core/pydantic_ai_agent.py index b2a751a..7302cbc 100755 --- a/core/pydantic_ai_agent.py +++ b/core/pydantic_ai_agent.py @@ -81,7 +81,6 @@ load_dotenv() from services.service_tuhui_upload import upload_to_tuhui from core.workflow_router import get_workflow_router from core.workflow_router import get_workflow_router -from db.customer_risk_db import risk_db # ========== 企业微信通知 ========== _WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "") @@ -585,673 +584,8 @@ class CustomerServiceAgent: def _register_tools(self): """注册所有 Tool,让 Agent 可以主动调用""" - - - @self.agent.tool - async def analyze_image(ctx: RunContext[AgentDeps], image_url: str) -> str: - """ - 分析客户发来的图片复杂度,用于报价。 - 收到图片URL时调用此工具,返回复杂度和建议报价。 - """ - try: - from image.image_analyzer import image_analyzer - result = await image_analyzer.analyze(image_url) - complexity_label = { - "simple": "简单(画面干净)", - "normal": "一般复杂度", - "complex": "细节偏多", - "hard": "非常复杂", - }.get(result["complexity"], result["complexity"]) - # 持久化图片URL和复杂度,重启后仍能记住这张图 - try: - from db.customer_db import db - db.update_last_image( - ctx.deps.from_id, - image_url, - complexity=result["complexity"], - gemini_prompt=result.get("gemini_prompt", ""), - aspect_ratio=result.get("aspect_ratio", "1:1"), - perspective=result.get("perspective", "no"), - ) - except Exception: - pass - - # 存图片类型到客户画像 - try: - from db.customer_db import db as _db - if result.get("subject"): - _db.add_image_type(ctx.deps.from_id, result["subject"]) - except Exception: - pass - - # 在 workflow 里创建待处理任务(付款后自动触发 Gemini) - try: - from core.workflow import workflow - await workflow.image_analysis_result( - customer_id=ctx.deps.from_id, - image_url=image_url, - complexity=result["complexity"], - acc_id=ctx.deps.acc_id, - acc_type=ctx.deps.platform, - gemini_prompt=result.get("gemini_prompt", ""), - aspect_ratio=result.get("aspect_ratio", "1:1"), - perspective=result.get("perspective", "no"), - proc_type=result.get("proc_type", ""), - subject=result.get("subject", ""), - quality=result.get("quality", ""), - ) - print(f"[Agent] Workflow 任务已创建 | 客户: {ctx.deps.from_id} | 比例: {result.get('aspect_ratio')} | 透视: {result.get('perspective')} | 图片: {image_url[:60]}...") - except Exception as e: - print(f"[Agent] Workflow 任务创建失败: {e}") - - # 组装给 AI 的分析报告 - risk = result.get("risk", "none") - has_face = result.get("has_face", "no") - feasibility = result.get("feasibility", "yes") - note = result.get("note", "") - - lines = [ - f"图片主体:{result['subject'] or '未识别'}", - f"处理类型:{result['proc_type'] or '高清修复'}", - f"原图质量:{result['quality'] or '未知'}", - f"图片类型:{result.get('category', '') or '通用'}", - f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}({result.get('megapixels', 0.0)}MP)", - f"含人脸:{'是' if has_face == 'yes' else '否'}", - f"复杂度:{complexity_label}", - f"原因:{result['reason']}", - ] - if result.get("size_surcharge"): - lines.append(f"尺寸加价:+{result['size_surcharge']}元") - if result.get("size_note"): - lines.append(f"尺寸提示:{result['size_note']}") - try: - st = self._get_conversation_state(ctx.deps.from_id) - if isinstance(result.get("price_min"), (int, float)): - st.last_min_price = int(result.get("price_min") or 0) - try: - from db.customer_db import db as _db - _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) - except Exception: - pass - except Exception: - pass - - # 根据可做性和风险等级给 AI 不同的行动指引 - if feasibility == "no": - if "敏感" in (note or ""): - lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。") - lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。") - else: - lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件)。") - lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。") - elif risk == "high": - lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}") - lines.append(f"建议报价:{result['price_suggest']}元") - lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。") - elif risk == "low": - lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。") - lines.append(f"建议报价:{result['price_suggest']}元") - lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)") - else: - # 无风险,正常报价 - base_price = result.get('price_suggest', 20) - text_surcharge = result.get('text_surcharge', 0) - layer_surcharge = result.get('layer_surcharge', 0) - total_price = base_price + text_surcharge + layer_surcharge - - # 构建报价说明 - price_explanation = f"建议报价:{total_price}元" - if text_surcharge > 0: - price_explanation += f"(含文字处理 +{text_surcharge}元)" - if layer_surcharge > 0: - price_explanation += f"(含分层 +{layer_surcharge}元)" - - lines.append(price_explanation) - - # 添加文字数量说明 - text_amount = result.get('text_amount', 'none') - if text_amount != 'none': - lines.append(f"文字数量:{text_amount},需要精细处理") - - if feasibility == "partial": - lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」") - if note and note not in ("无", ""): - lines.append(f"提示:{note}") - lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】") - - return "\n".join(lines) - except Exception as e: - return f"图片分析失败: {e},请根据经验判断报价" - - @self.agent.tool - async def get_customer_info(ctx: RunContext[AgentDeps], customer_id: str) -> str: - """ - 查询客户历史信息:消费记录、性格标签、报价历史等。 - 对话开始时或需要了解客户背景时调用。 - """ - try: - from db.customer_db import db - return db.get_profile_text(customer_id) - except Exception as e: - return f"查询失败: {e}" - - @self.agent.tool - async def transfer_to_human(ctx: RunContext[AgentDeps]) -> str: - """ - 转接人工客服。 - 遇到退款/投诉/情绪激动/复杂售后时调用。 - """ - return "TRANSFER_REQUESTED" - - @self.agent.tool - async def get_customer_risk_profile(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: - """查询客户风控画像:退款/不付款/差评/人工黑名单等。""" - cid = customer_id or ctx.deps.from_id - try: - info = risk_db.evaluate_customer(cid) - return ( - f"客户:{cid}\n" - f"不接单:{'是' if info.get('do_not_serve') else '否'}\n" - f"风险等级:{info.get('computed_level','low')} 分数:{info.get('computed_score',0)}\n" - f"近30天退款:{info.get('refund_30d',0)}\n" - f"近7天未付款下单:{info.get('unpaid_7d',0)}\n" - f"近90天差评:{info.get('bad_review_90d',0)}\n" - f"备注:{info.get('note','') or '无'}" - ) - except Exception as e: - return f"查询风控画像失败: {e}" - - @self.agent.tool - async def mark_customer_risk( - ctx: RunContext[AgentDeps], - customer_id: str, - do_not_serve: bool = False, - risk_level: str = "low", - risk_score: int = 0, - note: str = "", - tag: str = "", - ) -> str: - """人工标记客户风控画像(不接单/高风险/备注标签)。""" - try: - tags = [tag] if tag else [] - risk_db.set_profile( - customer_id=customer_id, - do_not_serve=do_not_serve, - risk_level=risk_level, - risk_score=risk_score, - note=note, - tags=tags, - ) - return "风控画像已更新" - except Exception as e: - return f"更新风控画像失败: {e}" - - @self.agent.tool - async def record_customer_risk_event( - ctx: RunContext[AgentDeps], - customer_id: str, - event_type: str, - event_count: int = 1, - note: str = "", - ) -> str: - """记录风控事件:refund/unpaid_order/bad_review/blacklist_hit 等。""" - try: - risk_db.record_event( - customer_id=customer_id, - event_type=event_type, - event_count=event_count, - note=note, - ) - return "风控事件已记录" - except Exception as e: - return f"记录风控事件失败: {e}" - - @self.agent.tool - async def save_customer_note( - ctx: RunContext[AgentDeps], - customer_id: str, - note: str - ) -> str: - """ - 记录客户关键信息到画像(邮箱/微信/特殊需求等)。 - 客户提供联系方式或重要信息时调用。 - """ - try: - from db.customer_db import db - db.add_note(customer_id, note) - return "已记录" - except Exception as e: - return f"记录失败: {e}" - - @self.agent.tool - async def update_contact_info( - ctx: RunContext[AgentDeps], - customer_id: str, - contact_type: str, - value: str - ) -> str: - """ - 更新客户联系方式。 - 当客户说出邮箱/手机/微信时调用,比正则提取更准确。 - - contact_type 枚举值: - email - 邮箱 - phone - 手机号 - wechat - 微信号 - """ - try: - from db.customer_db import db - if contact_type == "email": - db.update_email(customer_id, value) - elif contact_type == "phone": - db.update_phone(customer_id, value) - elif contact_type == "wechat": - db.update_wechat(customer_id, value) - else: - return f"未知联系方式类型: {contact_type}" - return f"已保存 {contact_type}: {value}" - except Exception as e: - return f"保存失败: {e}" - - @self.agent.tool - async def record_quote( - ctx: RunContext[AgentDeps], - customer_id: str, - price: int, - description: str = "" - ) -> str: - """ - 记录本次报价到客户画像,用于后续对话保持价格一致。 - 每次给客户报价后调用。 - - Args: - customer_id: 客户ID - price: 报价金额(元) - description: 报价描述,如"单图处理"/"三图打包" - """ - try: - from db.customer_db import db - db.update_last_price(customer_id, price) - if description: - db.add_note(customer_id, f"报价 {price}元({description})") - # 同步到内存状态 - state = self.conversations.get(customer_id) - if state: - state.last_price = price - return f"已记录报价 {price}元" - except Exception as e: - return f"记录失败: {e}" - - @self.agent.tool - async def process_image_gemini(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: - """ - 触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。 - 会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。 - 处理完成后会自动发图给客户。 - """ - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不自动作图" - except Exception: - return "现在处理模块暂时暂停,先不自动作图" - cid = customer_id or ctx.deps.from_id - try: - from core.workflow import workflow - ok = await workflow.trigger_processing_on_payment( - customer_id=cid, - acc_id=ctx.deps.acc_id, - acc_type=ctx.deps.platform, - ) - if ok: - return "已安排,稍后发你" - return "该客户暂无待处理图片,请先发图" - except Exception as e: - return f"触发作图失败: {e},请稍后重试或转人工" - - @self.agent_pricing.tool - async def analyze_image_pricing(ctx: RunContext[AgentDeps], image_url: str) -> str: - try: - from image.image_analyzer import image_analyzer - result = await image_analyzer.analyze(image_url) - if result.get("feasibility") == "no" or result.get("risk") == "high": - return "该图风险高或不可做:不报价,建议换图或转人工评估。" - if not result.get("success", False): - return "图片识别异常:先不报价,建议客户重发更清晰图片。" - p = result.get("price_suggest", 20) - try: - st = self._get_conversation_state(ctx.deps.from_id) - if isinstance(result.get("price_min"), (int, float)): - st.last_min_price = int(result.get("price_min") or 0) - try: - from db.customer_db import db as _db - _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) - except Exception: - pass - except Exception: - pass - return f"建议报价:{p}元" - except Exception as e: - return f"图片分析失败: {e}" - - @self.agent_pricing.tool - async def record_quote_pricing( - ctx: RunContext[AgentDeps], - customer_id: str, - price: int, - description: str = "" - ) -> str: - try: - from db.customer_db import db - db.update_last_price(customer_id, price) - return "ok" - except Exception as e: - return f"记录失败: {e}" - - @self.agent_processing.tool - async def process_image_gemini_run(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: - """触发 Gemini 作图处理(processing agent 专用入口)。""" - return await process_image_gemini(ctx, customer_id) - - @self.agent_similar.tool - async def recommend_similar(ctx: RunContext[AgentDeps], hint: str = "") -> str: - try: - return "有类似款,拍下我发你参考图。" - except Exception as e: - return f"推荐失败: {e}" - - @self.agent_order.tool - async def handle_order(ctx: RunContext[AgentDeps], raw_msg: str = "") -> str: - try: - info = parse_order_info(raw_msg or "") - paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"] - if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw): - return "已安排,稍后发你" - return "" - except Exception: - return "" - - @self.agent_risk.tool - async def risk_filter(ctx: RunContext[AgentDeps], text: str = "") -> str: - return "这类不做哈,政治/敏感内容都不接。" - - @self.agent_risk.tool - async def get_customer_risk_profile_risk(ctx: RunContext[AgentDeps], customer_id: str = "") -> str: - return await get_customer_risk_profile(ctx, customer_id) - - @self.agent_risk.tool - async def mark_customer_risk_risk( - ctx: RunContext[AgentDeps], - customer_id: str, - do_not_serve: bool = False, - risk_level: str = "low", - risk_score: int = 0, - note: str = "", - tag: str = "", - ) -> str: - return await mark_customer_risk( - ctx=ctx, - customer_id=customer_id, - do_not_serve=do_not_serve, - risk_level=risk_level, - risk_score=risk_score, - note=note, - tag=tag, - ) - - @self.agent_risk.tool - async def record_customer_risk_event_risk( - ctx: RunContext[AgentDeps], - customer_id: str, - event_type: str, - event_count: int = 1, - note: str = "", - ) -> str: - return await record_customer_risk_event( - ctx=ctx, - customer_id=customer_id, - event_type=event_type, - event_count=event_count, - note=note, - ) - - @self.agent.tool - async def remove_background(ctx: RunContext[AgentDeps], image_url: str) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """【独立工具】去背景,输出白底图。客户只要去背景时调用。""" - try: - from image.image_tools import remove_background as _rb - r = await _rb(image_url) - if r["success"]: - return f"去背景完成,已保存。自然回复客户好了发你" - return f"去背景失败:{r['message']}" - except Exception as e: - return f"去背景失败:{e}" - - @self.agent.tool - async def perspective_correct(ctx: RunContext[AgentDeps], image_url: str) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """【独立工具】透视矫正。输入需白底图,输出展平图。""" - try: - from image.image_tools import perspective_correct as _pc - r = await _pc(image_url) - if r["success"]: - return f"透视矫正完成。自然回复客户好了" - return f"透视矫正失败:{r['message']}" - except Exception as e: - return f"透视矫正失败:{e}" - - @self.agent.tool - async def extract_pattern_tool( - ctx: RunContext[AgentDeps], - image_url: str, - prompt: str = "", - aspect_ratio: str = "1:1" - ) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """【独立工具】印花提取/主处理。按提示词和比例处理。""" - try: - from image.image_tools import extract_pattern - r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio) - if r["success"]: - return f"提取完成。自然回复客户好了发你" - return f"提取失败:{r['message']}" - except Exception as e: - return f"提取失败:{e}" - - @self.agent.tool - async def enhance_image_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """【独立工具】高清增强。客户只要清晰化时调用。""" - try: - from image.image_tools import enhance_image - r = await enhance_image(image_url) - if r["success"]: - return f"高清增强完成。自然回复客户好了" - return f"增强失败:{r['message']}" - except Exception as e: - return f"增强失败:{e}" - - @self.agent.tool - async def color_match_tool( - ctx: RunContext[AgentDeps], - orig_url: str, - result_url: str, - strength: float = 0.75 - ) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """【独立工具】颜色匹配。将 result 色调匹配到 orig。""" - try: - from image.image_tools import color_match_images - r = await color_match_images(orig_url, result_url, strength=strength) - if r["success"]: - return f"颜色匹配完成" - return f"颜色匹配失败:{r['message']}" - except Exception as e: - return f"颜色匹配失败:{e}" - - @self.agent.tool - async def trim_border_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """【独立工具】裁切四周背景边(白/黄/米等)。""" - try: - from image.image_tools import trim_border - r = await trim_border(image_url) - if r["success"]: - return f"裁边完成" - return f"裁边失败:{r['message']}" - except Exception as e: - return f"裁边失败:{e}" - - @self.agent.tool - async def vectorize_to_eps_tool(ctx: RunContext[AgentDeps], image_url: str) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。""" - try: - from image.image_tools import vectorize_to_eps - r = await vectorize_to_eps(image_url) - if r["success"]: - return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你" - return f"矢量化失败:{r['message']}" - except Exception as e: - return f"矢量化失败:{e}" - - @self.agent.tool - async def meitu_enhance_tool( - ctx: RunContext[AgentDeps], - image_url: str, - mode: str = "standard" - ) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """ - 【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。 - - Args: - image_url: 图片 URL 或本地路径 - mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化) - """ - try: - from image.image_tools import meitu_enhance - r = await meitu_enhance(image_url, mode=mode) - if r["success"]: - return f"画质增强完成。自然回复客户好了发你" - return f"画质增强失败:{r['message']}" - except Exception as e: - return f"画质增强失败:{e}" - - @self.agent.tool - async def resize_image( - ctx: RunContext[AgentDeps], - image_url: str, - width: int, - height: int = 0 - ) -> str: - try: - from config.config import IMAGE_MODULE_ENABLED - if not IMAGE_MODULE_ENABLED: - return "现在处理模块暂时暂停,先不处理图片" - except Exception: - return "现在处理模块暂时暂停,先不处理图片" - """ - 改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。 - - Args: - image_url: 图片URL(客户刚发的图,或从对话中获取) - width: 目标宽度(像素),如 1920 - height: 目标高度(0=按宽度等比缩放),如 1080 - - 常用尺寸:1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图) - """ - try: - from image.image_processor import image_processor - result = await image_processor.resize(image_url, width, height) - if result["success"]: - return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了" - else: - return f"改尺寸失败:{result['message']},告知客户稍后重试" - except Exception as e: - return f"改尺寸失败:{e}" - - @self.agent.tool - async def calculate_bulk_price( - ctx: RunContext[AgentDeps], - image_count: int, - complexities: str = "" - ) -> str: - """ - 计算多图打包价格。 - 客户要做多张图时调用,返回建议总价。 - - Args: - image_count: 图片数量 - complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple" - 没有识别结果时留空,按平均价格估算 - """ - if image_count <= 0: - return "图片数量无效" - - # 各复杂度单价(必须为5的整数倍) - unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30} - default_unit = 20 # 没有识别结果时的默认单价 - - if complexities: - levels = [c.strip() for c in complexities.split(",")] - total = sum(unit_price.get(lv, default_unit) for lv in levels) - else: - total = image_count * default_unit - - # 打包优惠:3张以上9折,5张以上8折,价格必须为5的整数倍 - if image_count >= 5: - discounted = round(total * 0.8 / 5) * 5 - tip = f"({image_count}张8折优惠)" - elif image_count >= 3: - discounted = round(total * 0.9 / 5) * 5 - tip = f"({image_count}张9折优惠)" - else: - discounted = round(total / 5) * 5 - tip = "" - - return f"建议打包报价:{discounted}元{tip}(原价{total}元)" + from core.agent_tools import register_tools + register_tools(self) # 对话状态超过多少小时后重置(避免昨天的售后状态影响今天) CONVERSATION_TIMEOUT_HOURS = 12