from __future__ import annotations import logging from typing import Any from pydantic_ai import RunContext from db.customer_risk_db import risk_db from services.service_tuhui_upload import upload_to_tuhui from core.order_helpers import parse_order_info logger = logging.getLogger("cs_agent") def register_tools(agent) -> None: """注册所有 Tool,让 Agent 可以主动调用。""" @agent.agent.tool async def analyze_image(ctx: RunContext[Any], image_url: str) -> str: """ 分析客户发来的图片复杂度,用于报价。 收到图片URL时调用此工具,返回复杂度和建议报价。 """ try: from image.image_analyzer import image_analyzer result = await image_analyzer.analyze(image_url) complexity_label = { "simple": "简单(画面干净)", "normal": "一般复杂度", "complex": "细节偏多", "hard": "非常复杂", }.get(result["complexity"], result["complexity"]) # 持久化图片URL和复杂度,重启后仍能记住这张图 try: from db.customer_db import db db.update_last_image( ctx.deps.from_id, image_url, complexity=result["complexity"], gemini_prompt=result.get("gemini_prompt", ""), aspect_ratio=result.get("aspect_ratio", "1:1"), perspective=result.get("perspective", "no"), ) except Exception: pass # 存图片类型到客户画像 try: from db.customer_db import db as _db if result.get("subject"): _db.add_image_type(ctx.deps.from_id, result["subject"]) except Exception: pass # 在 workflow 里创建待处理任务(付款后自动触发 Gemini) try: from core.workflow import workflow await workflow.image_analysis_result( customer_id=ctx.deps.from_id, image_url=image_url, complexity=result["complexity"], acc_id=ctx.deps.acc_id, acc_type=ctx.deps.platform, gemini_prompt=result.get("gemini_prompt", ""), aspect_ratio=result.get("aspect_ratio", "1:1"), perspective=result.get("perspective", "no"), proc_type=result.get("proc_type", ""), subject=result.get("subject", ""), quality=result.get("quality", ""), ) logger.info( "[Agent] Workflow 任务已创建 | 客户: %s | 比例: %s | 透视: %s | 图片: %s...", ctx.deps.from_id, result.get("aspect_ratio"), result.get("perspective"), image_url[:60], ) except Exception as e: logger.exception("[Agent] Workflow 任务创建失败: %s", e) # 组装给 AI 的分析报告 risk = result.get("risk", "none") has_face = result.get("has_face", "no") feasibility = result.get("feasibility", "yes") note = result.get("note", "") lines = [ f"图片主体:{result['subject'] or '未识别'}", f"处理类型:{result['proc_type'] or '高清修复'}", f"原图质量:{result['quality'] or '未知'}", f"图片类型:{result.get('category', '') or '通用'}", f"图片尺寸:{(result.get('width') or 0)}x{(result.get('height') or 0)}({result.get('megapixels', 0.0)}MP)", f"含人脸:{'是' if has_face == 'yes' else '否'}", f"复杂度:{complexity_label}", f"原因:{result['reason']}", ] if result.get("size_surcharge"): lines.append(f"尺寸加价:+{result['size_surcharge']}元") if result.get("size_note"): lines.append(f"尺寸提示:{result['size_note']}") try: st = agent._get_conversation_state(ctx.deps.from_id) if isinstance(result.get("price_min"), (int, float)): st.last_min_price = int(result.get("price_min") or 0) try: from db.customer_db import db as _db _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) except Exception: pass except Exception: pass # 根据可做性和风险等级给 AI 不同的行动指引 if feasibility == "no": if "敏感" in (note or ""): lines.append("【拒绝】图片含敏感/黄色/擦边内容,不接单。") lines.append("→ 直接拒绝,不说「发图来看看」,自然回复如:这类不做/不接。") else: lines.append("【无法处理】此图无法处理(纯黑/纯白/完全损坏/要找原始RAW文件)。") lines.append("→ 告知客户无法处理,建议换图或说明原因,不要报价。") elif risk == "high": lines.append(f"【高风险】此图处理风险高:{note or 'AI修复后效果不能保证与原图一致'}") lines.append(f"建议报价:{result['price_suggest']}元") lines.append("→ 先自然说明风险(人脸/效果可能不完美),再报价,满意再拍。话术自然。") elif risk == "low": lines.append(f"【低风险-含人脸】修复后人脸相似度约70-90%,效果不稳定。") lines.append(f"建议报价:{result['price_suggest']}元") lines.append(f"→ 报价时自然加一句风险提示(人脸可能有轻微变化、满意再付等)") else: # 无风险,正常报价 base_price = result.get('price_suggest', 20) text_surcharge = result.get('text_surcharge', 0) layer_surcharge = result.get('layer_surcharge', 0) total_price = base_price + text_surcharge + layer_surcharge # 构建报价说明 price_explanation = f"建议报价:{total_price}元" if text_surcharge > 0: price_explanation += f"(含文字处理 +{text_surcharge}元)" if layer_surcharge > 0: price_explanation += f"(含分层 +{layer_surcharge}元)" lines.append(price_explanation) # 添加文字数量说明 text_amount = result.get('text_amount', 'none') if text_amount != 'none': lines.append(f"文字数量:{text_amount},需要精细处理") if feasibility == "partial": lines.append(f"⚠️ 此图有一定难度:{note or '效果可能不完美'},回复时可加「效果不满意退款」") if note and note not in ("无", ""): lines.append(f"提示:{note}") lines.append(f"【立刻回复客户报价 {total_price} 元,话术自然多变】") return "\n".join(lines) except Exception as e: return f"图片分析失败: {e},请根据经验判断报价" @agent.agent.tool async def get_customer_info(ctx: RunContext[Any], customer_id: str) -> str: """ 查询客户历史信息:消费记录、性格标签、报价历史等。 对话开始时或需要了解客户背景时调用。 """ try: from db.customer_db import db return db.get_profile_text(customer_id) except Exception as e: return f"查询失败: {e}" @agent.agent.tool async def transfer_to_human(ctx: RunContext[Any]) -> str: """ 转接人工客服。 遇到退款/投诉/情绪激动/复杂售后时调用。 """ return "TRANSFER_REQUESTED" @agent.agent.tool async def get_customer_risk_profile(ctx: RunContext[Any], customer_id: str = "") -> str: """查询客户风控画像:退款/不付款/差评/人工黑名单等。""" cid = customer_id or ctx.deps.from_id try: info = risk_db.evaluate_customer(cid) return ( f"客户:{cid}\n" f"不接单:{'是' if info.get('do_not_serve') else '否'}\n" f"风险等级:{info.get('computed_level','low')} 分数:{info.get('computed_score',0)}\n" f"近30天退款:{info.get('refund_30d',0)}\n" f"近7天未付款下单:{info.get('unpaid_7d',0)}\n" f"近90天差评:{info.get('bad_review_90d',0)}\n" f"备注:{info.get('note','') or '无'}" ) except Exception as e: return f"查询风控画像失败: {e}" @agent.agent.tool async def mark_customer_risk( ctx: RunContext[Any], customer_id: str, do_not_serve: bool = False, risk_level: str = "low", risk_score: int = 0, note: str = "", tag: str = "", ) -> str: """人工标记客户风控画像(不接单/高风险/备注标签)。""" try: tags = [tag] if tag else [] risk_db.set_profile( customer_id=customer_id, do_not_serve=do_not_serve, risk_level=risk_level, risk_score=risk_score, note=note, tags=tags, ) return "风控画像已更新" except Exception as e: return f"更新风控画像失败: {e}" @agent.agent.tool async def record_customer_risk_event( ctx: RunContext[Any], customer_id: str, event_type: str, event_count: int = 1, note: str = "", ) -> str: """记录风控事件:refund/unpaid_order/bad_review/blacklist_hit 等。""" try: risk_db.record_event( customer_id=customer_id, event_type=event_type, event_count=event_count, note=note, ) return "风控事件已记录" except Exception as e: return f"记录风控事件失败: {e}" @agent.agent.tool async def save_customer_note( ctx: RunContext[Any], customer_id: str, note: str ) -> str: """ 记录客户关键信息到画像(邮箱/微信/特殊需求等)。 客户提供联系方式或重要信息时调用。 """ try: from db.customer_db import db db.add_note(customer_id, note) return "已记录" except Exception as e: return f"记录失败: {e}" @agent.agent.tool async def update_contact_info( ctx: RunContext[Any], customer_id: str, contact_type: str, value: str ) -> str: """ 更新客户联系方式。 当客户说出邮箱/手机/微信时调用,比正则提取更准确。 contact_type 枚举值: email - 邮箱 phone - 手机号 wechat - 微信号 """ try: from db.customer_db import db if contact_type == "email": db.update_email(customer_id, value) elif contact_type == "phone": db.update_phone(customer_id, value) elif contact_type == "wechat": db.update_wechat(customer_id, value) else: return f"未知联系方式类型: {contact_type}" return f"已保存 {contact_type}: {value}" except Exception as e: return f"保存失败: {e}" @agent.agent.tool async def record_quote( ctx: RunContext[Any], customer_id: str, price: int, description: str = "" ) -> str: """ 记录本次报价到客户画像,用于后续对话保持价格一致。 每次给客户报价后调用。 Args: customer_id: 客户ID price: 报价金额(元) description: 报价描述,如"单图处理"/"三图打包" """ try: from db.customer_db import db db.update_last_price(customer_id, price) if description: db.add_note(customer_id, f"报价 {price}元({description})") # 同步到内存状态 state = agent.conversations.get(customer_id) if state: state.last_price = price return f"已记录报价 {price}元" except Exception as e: return f"记录失败: {e}" @agent.agent.tool async def process_image_gemini(ctx: RunContext[Any], customer_id: str = "") -> str: """ 触发 Gemini 作图处理。客户付款后或说「安排一下」「处理一下」时调用。 会从客户档案读取上次发图的 URL 和处理参数(提示词、比例、透视),启动 Gemini 流程。 处理完成后会自动发图给客户。 """ try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不自动作图" except Exception: return "现在处理模块暂时暂停,先不自动作图" cid = customer_id or ctx.deps.from_id try: from core.workflow import workflow ok = await workflow.trigger_processing_on_payment( customer_id=cid, acc_id=ctx.deps.acc_id, acc_type=ctx.deps.platform, ) if ok: return "已安排,稍后发你" return "该客户暂无待处理图片,请先发图" except Exception as e: return f"触发作图失败: {e},请稍后重试或转人工" @agent.agent_pricing.tool async def analyze_image_pricing(ctx: RunContext[Any], image_url: str) -> str: try: from image.image_analyzer import image_analyzer result = await image_analyzer.analyze(image_url) if result.get("feasibility") == "no" or result.get("risk") == "high": return "该图风险高或不可做:不报价,建议换图或转人工评估。" if not result.get("success", False): return "图片识别异常:先不报价,建议客户重发更清晰图片。" p = result.get("price_suggest", 20) try: st = agent._get_conversation_state(ctx.deps.from_id) if isinstance(result.get("price_min"), (int, float)): st.last_min_price = int(result.get("price_min") or 0) try: from db.customer_db import db as _db _db.update_last_min_price(ctx.deps.from_id, st.last_min_price) except Exception: pass except Exception: pass return f"建议报价:{p}元" except Exception as e: return f"图片分析失败: {e}" @agent.agent_pricing.tool async def record_quote_pricing( ctx: RunContext[Any], customer_id: str, price: int, description: str = "" ) -> str: try: from db.customer_db import db db.update_last_price(customer_id, price) return "ok" except Exception as e: return f"记录失败: {e}" @agent.agent_processing.tool async def process_image_gemini_run(ctx: RunContext[Any], customer_id: str = "") -> str: """触发 Gemini 作图处理(processing agent 专用入口)。""" return await process_image_gemini(ctx, customer_id) @agent.agent_similar.tool async def recommend_similar(ctx: RunContext[Any], hint: str = "") -> str: try: return "有类似款,拍下我发你参考图。" except Exception as e: return f"推荐失败: {e}" @agent.agent_order.tool async def handle_order(ctx: RunContext[Any], raw_msg: str = "") -> str: try: info = parse_order_info(raw_msg or "") paid_kw = ["等待发货", "已付款", "付款成功", "买家已付款"] if any(k in (info.get("pay_status", "") or "") for k in paid_kw) or any(k in (info.get("order_status", "") or "") for k in paid_kw): return "已安排,稍后发你" return "" except Exception: return "" @agent.agent_risk.tool async def risk_filter(ctx: RunContext[Any], text: str = "") -> str: return "这类不做哈,政治/敏感内容都不接。" @agent.agent_risk.tool async def get_customer_risk_profile_risk(ctx: RunContext[Any], customer_id: str = "") -> str: return await get_customer_risk_profile(ctx, customer_id) @agent.agent_risk.tool async def mark_customer_risk_risk( ctx: RunContext[Any], customer_id: str, do_not_serve: bool = False, risk_level: str = "low", risk_score: int = 0, note: str = "", tag: str = "", ) -> str: return await mark_customer_risk( ctx=ctx, customer_id=customer_id, do_not_serve=do_not_serve, risk_level=risk_level, risk_score=risk_score, note=note, tag=tag, ) @agent.agent_risk.tool async def record_customer_risk_event_risk( ctx: RunContext[Any], customer_id: str, event_type: str, event_count: int = 1, note: str = "", ) -> str: return await record_customer_risk_event( ctx=ctx, customer_id=customer_id, event_type=event_type, event_count=event_count, note=note, ) @agent.agent.tool async def remove_background(ctx: RunContext[Any], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】去背景,输出白底图。客户只要去背景时调用。""" try: from image.image_tools import remove_background as _rb r = await _rb(image_url) if r["success"]: return f"去背景完成,已保存。自然回复客户好了发你" return f"去背景失败:{r['message']}" except Exception as e: return f"去背景失败:{e}" @agent.agent.tool async def perspective_correct(ctx: RunContext[Any], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】透视矫正。输入需白底图,输出展平图。""" try: from image.image_tools import perspective_correct as _pc r = await _pc(image_url) if r["success"]: return f"透视矫正完成。自然回复客户好了" return f"透视矫正失败:{r['message']}" except Exception as e: return f"透视矫正失败:{e}" @agent.agent.tool async def extract_pattern_tool( ctx: RunContext[Any], image_url: str, prompt: str = "", aspect_ratio: str = "1:1" ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】印花提取/主处理。按提示词和比例处理。""" try: from image.image_tools import extract_pattern r = await extract_pattern(image_url, prompt=prompt, aspect_ratio=aspect_ratio) if r["success"]: return f"提取完成。自然回复客户好了发你" return f"提取失败:{r['message']}" except Exception as e: return f"提取失败:{e}" @agent.agent.tool async def enhance_image_tool(ctx: RunContext[Any], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】高清增强。客户只要清晰化时调用。""" try: from image.image_tools import enhance_image r = await enhance_image(image_url) if r["success"]: return f"高清增强完成。自然回复客户好了" return f"增强失败:{r['message']}" except Exception as e: return f"增强失败:{e}" @agent.agent.tool async def color_match_tool( ctx: RunContext[Any], orig_url: str, result_url: str, strength: float = 0.75 ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】颜色匹配。将 result 色调匹配到 orig。""" try: from image.image_tools import color_match_images r = await color_match_images(orig_url, result_url, strength=strength) if r["success"]: return f"颜色匹配完成" return f"颜色匹配失败:{r['message']}" except Exception as e: return f"颜色匹配失败:{e}" @agent.agent.tool async def trim_border_tool(ctx: RunContext[Any], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】裁切四周背景边(白/黄/米等)。""" try: from image.image_tools import trim_border r = await trim_border(image_url) if r["success"]: return f"裁边完成" return f"裁边失败:{r['message']}" except Exception as e: return f"裁边失败:{e}" @agent.agent.tool async def vectorize_to_eps_tool(ctx: RunContext[Any], image_url: str) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """【独立工具】矢量化 - 将图片转为 EPS 矢量文件。客户要做矢量图、转 EPS、转 AI 格式时调用。""" try: from image.image_tools import vectorize_to_eps r = await vectorize_to_eps(image_url) if r["success"]: return f"矢量化完成,已生成 EPS 文件。自然回复客户好了发你" return f"矢量化失败:{r['message']}" except Exception as e: return f"矢量化失败:{e}" @agent.agent.tool async def meitu_enhance_tool( ctx: RunContext[Any], image_url: str, mode: str = "standard" ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """ 【独立工具】美图画质增强。客户要画质增强、清晰化、美图处理时调用。 Args: image_url: 图片 URL 或本地路径 mode: 处理模式。crystal(极速重绘) standard(标准) enhance(增强) hdr(HDR) portrait(人像优化) """ try: from image.image_tools import meitu_enhance r = await meitu_enhance(image_url, mode=mode) if r["success"]: return f"画质增强完成。自然回复客户好了发你" return f"画质增强失败:{r['message']}" except Exception as e: return f"画质增强失败:{e}" @agent.agent.tool async def resize_image( ctx: RunContext[Any], image_url: str, width: int, height: int = 0 ) -> str: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return "现在处理模块暂时暂停,先不处理图片" except Exception: return "现在处理模块暂时暂停,先不处理图片" """ 改图片尺寸。客户说「改成1920x1080」「弄成横图」「改下尺寸」时调用。 Args: image_url: 图片URL(客户刚发的图,或从对话中获取) width: 目标宽度(像素),如 1920 height: 目标高度(0=按宽度等比缩放),如 1080 常用尺寸:1920x1080(横屏) 1080x1920(竖屏) 2000x2000(方图) """ try: from image.image_processor import image_processor result = await image_processor.resize(image_url, width, height) if result["success"]: return f"改尺寸完成:{width}x{height},已保存。自然回复客户改好了" else: return f"改尺寸失败:{result['message']},告知客户稍后重试" except Exception as e: return f"改尺寸失败:{e}" @agent.agent.tool async def calculate_bulk_price( ctx: RunContext[Any], image_count: int, complexities: str = "" ) -> str: """ 计算多图打包价格。 客户要做多张图时调用,返回建议总价。 Args: image_count: 图片数量 complexities: 各图复杂度,逗号分隔,如 "normal,complex,simple" 没有识别结果时留空,按平均价格估算 """ if image_count <= 0: return "图片数量无效" # 各复杂度单价(必须为5的整数倍) unit_price = {"simple": 15, "normal": 20, "complex": 25, "hard": 30} default_unit = 20 # 没有识别结果时的默认单价 if complexities: levels = [c.strip() for c in complexities.split(",")] total = sum(unit_price.get(lv, default_unit) for lv in levels) else: total = image_count * default_unit # 打包优惠:3张以上9折,5张以上8折,价格必须为5的整数倍 if image_count >= 5: discounted = round(total * 0.8 / 5) * 5 tip = f"({image_count}张8折优惠)" elif image_count >= 3: discounted = round(total * 0.9 / 5) * 5 tip = f"({image_count}张9折优惠)" else: discounted = round(total / 5) * 5 tip = "" return f"建议打包报价:{discounted}元{tip}(原价{total}元)"