""" 客服工作流 + 图片任务状态机 架构说明: - CustomerServiceWorkflow 负责管理图片处理任务的完整生命周期 - 图片AI接入点:调用 workflow.image_ai_submit_result(task_id, result_url) - 消息回调接口:通过 register_send_callback 注入发送函数 """ import asyncio import os import uuid from enum import Enum from typing import Optional, Dict, Callable, Awaitable, Any, List from datetime import datetime from dataclasses import dataclass, field _WECHAT_WEBHOOK = os.getenv("WECHAT_WEBHOOK", "") async def _wechat_notify(content: str): """workflow 内部异常推送企业微信""" if not _WECHAT_WEBHOOK: return try: import httpx async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(_WECHAT_WEBHOOK, json={ "msgtype": "markdown", "markdown": {"content": content} }) data = resp.json() if data.get("errcode") == 0: print(f"[Workflow通知] 企业微信推送成功 ✓") else: print(f"[Workflow通知] 企业微信推送失败: {data}") except Exception as e: print(f"[Workflow通知] 推送异常: {e}") from db.customer_db import db # ========== 任务状态 ========== class TaskStatus(Enum): PENDING = "待处理" # 任务已创建,等待图片AI处理 PROCESSING = "处理中" # 图片AI正在处理 AWAITING_CONFIRM = "等待客户确认" # 结果已发给客户,等待确认 REVISION = "修改中" # 客户要求修改,重新处理 COMPLETED = "已完成" # 客户确认,邮件已发 FAILED = "失败" # 处理失败 # ========== 任务数据结构 ========== @dataclass class ImageTask: task_id: str customer_id: str customer_name: str original_image: str # 原图路径或URL operation: str # 处理操作类型 requirements: str = "" # 客户原始需求描述 result_url: str = "" # 处理结果URL email: str = "" # 客户邮箱 status: TaskStatus = TaskStatus.PENDING revision_count: int = 0 # 修改次数 created_at: str = field(default_factory=lambda: datetime.now().isoformat()) updated_at: str = field(default_factory=lambda: datetime.now().isoformat()) def update_status(self, status: TaskStatus): self.status = status self.updated_at = datetime.now().isoformat() # ========== 工作流 ========== class CustomerServiceWorkflow: """ 客服工作流 图片AI对接方式: 1. 调用 create_image_task() 创建任务,获取 task_id 2. 图片AI处理完成后调用 image_ai_submit_result(task_id, result_url) 3. 工作流自动发图给客户确认,并等待客户回复 """ def __init__(self): self.tasks: Dict[str, ImageTask] = {} # task_id -> ImageTask self.customer_active_task: Dict[str, str] = {} # customer_id -> 最新 task_id self._send_message: Optional[Callable] = None # 注入的消息发送函数 self._agent_notify: Optional[Callable] = None # 注入的 AI 通知函数 self._pending_analysis: Dict[str, dict] = {} # 待报价的识别结果 # ========== 回调注册(由 websocket_client 调用)========== def register_agent_notify_callback(self, callback: Callable): """ 注册 AI 通知回调,图片处理完成时调用 AI 生成消息发给客户。 callback 签名: async def notify(customer_id, acc_id, acc_type, system_prompt) """ self._agent_notify = callback def register_send_callback(self, callback: Callable[[str, str, str, int], Awaitable[None]]): """ 注册消息发送回调函数 callback 签名: async def send(customer_id, acc_id, acc_type, content, msg_type=0) """ self._send_message = callback # ========== 任务管理 ========== def create_image_task( self, customer_id: str, customer_name: str, original_image: str, operation: str, requirements: str = "" ) -> str: """ 创建图片处理任务,返回 task_id 图片AI收到此 task_id 后开始处理,完成后调用 image_ai_submit_result """ task_id = str(uuid.uuid4()) task = ImageTask( task_id=task_id, customer_id=customer_id, customer_name=customer_name, original_image=original_image, operation=operation, requirements=requirements, ) self.tasks[task_id] = task self.customer_active_task[customer_id] = task_id # 记录需求到客户画像 if requirements: db.add_requirement(customer_id, requirements) print(f"[Workflow] 创建任务 {task_id} | 客户: {customer_name} | 操作: {operation}") return task_id def get_task(self, task_id: str) -> Optional[ImageTask]: return self.tasks.get(task_id) def get_customer_active_task(self, customer_id: str) -> Optional[ImageTask]: task_id = self.customer_active_task.get(customer_id) return self.tasks.get(task_id) if task_id else None # ========== 图片识别AI接入点(报价用)========== async def image_analysis_result( self, customer_id: str, image_url: str, complexity: str, acc_id: str = "", acc_type: str = "AliWorkbench", gemini_prompt: str = "", aspect_ratio: str = "1:1", perspective: str = "no", proc_type: str = "", subject: str = "", quality: str = "", ) -> bool: """ 【图片识别AI专用接口】分析完成后调用此方法,触发客服AI报价 Args: customer_id: 客户ID image_url: 图片URL(原图) complexity: 复杂度评估结果,枚举值: "simple" → 10-20元 "normal" → 20-30元 "complex" → 30元 "hard" → 40元 acc_id: 店铺账号ID acc_type: 平台类型 Returns: True = 成功触发报价,False = 客户不存在 """ price_map = { "simple": "10-15元,这张比较简单", "normal": "15-20元", "complex": "20-25元", "hard": "25-30元", } price_hint = price_map.get(complexity, "20元") # 把所有分析字段存入任务 requirements = f"complexity:{complexity}" if gemini_prompt: requirements += f"|prompt:{gemini_prompt}" if aspect_ratio: requirements += f"|ratio:{aspect_ratio}" if perspective and perspective != "no": requirements += f"|perspective:{perspective}" if proc_type: requirements += f"|proc_type:{proc_type}" if subject: requirements += f"|subject:{subject}" if quality: requirements += f"|quality:{quality}" task_id = self.create_image_task( customer_id=customer_id, customer_name=customer_id, original_image=image_url, operation="enhance", requirements=requirements, ) print(f"[Workflow] 图片识别完成 | 客户:{customer_id} | 复杂度:{complexity} | 建议报价:{price_hint}") # 通知客服AI报价(把识别结果注入消息,让AI根据结果报价) if self._send_message: # 这里不直接发价格,而是触发 agent 重新处理一条带识别结果的内部消息 # 实际报价由客服AI根据 complexity 生成,保持口吻一致 self._pending_analysis[customer_id] = { "task_id": task_id, "complexity": complexity, "price_hint": price_hint, "image_url": image_url, } return True def get_pending_analysis(self, customer_id: str) -> dict: """ 客服AI处理消息时调用,检查该客户是否有待报价的识别结果 取出后自动清除(一次性) """ return self._pending_analysis.pop(customer_id, None) # ========== 付款后触发 Gemini 作图 ========== async def trigger_processing_on_payment( self, customer_id: str, acc_id: str = "", acc_type: str = "AliWorkbench" ) -> bool: try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: await _wechat_notify( f"ℹ️ **付款触发但已暂停自动作图**\n客户:{customer_id}\n店铺:{acc_id}\n请人工安排处理" ) return False except Exception: return False """ 客户付款后调用此方法,找到该客户待处理的任务并启动 Gemini 作图。 由 pydantic_ai_agent 在识别到"已付款"订单通知时调用。 也可作为 tool 由 AI 主动触发。 Returns: True=已启动处理, False=无待处理任务 """ task = self.get_customer_active_task(customer_id) if not task: # 内存任务丢失(重启场景)→ 从客户档案重建 print(f"[Workflow] 付款触发:内存无任务,尝试从客户档案重建 | 客户: {customer_id}") task = await self._rebuild_task_from_profile(customer_id, acc_id, acc_type) if not task: print(f"[Workflow] 付款触发:客户 {customer_id} 无图片记录,无法重建任务,跳过") await _wechat_notify( f"⚠️ **付款但无图片**\n" f"客户:{customer_id}\n" f"店铺:{acc_id}\n" f"已付款但找不到待处理图片,请人工发图处理" ) return False if task.status not in (TaskStatus.PENDING,): print(f"[Workflow] 付款触发:任务 {task.task_id[:8]}... 状态={task.status.value},跳过") return False task.operation = task.operation or "enhance" print(f"[Workflow] 付款确认,启动 Gemini 处理 | 客户: {customer_id} | 任务: {task.task_id[:8]}...") asyncio.create_task(self._auto_process(task.task_id, acc_id=acc_id, acc_type=acc_type)) return True async def _rebuild_task_from_profile( self, customer_id: str, acc_id: str, acc_type: str ) -> Optional["ImageTask"]: """ 重启后任务丢失时,从客户档案里读取 last_image_url 重建一个 PENDING 任务。 """ try: from db.customer_db import db profile = db.get_customer(customer_id) image_url = profile.last_image_url if not image_url: return None complexity = profile.complexity_history[-1] if profile.complexity_history else "" gemini_prompt = getattr(profile, "last_gemini_prompt", "") aspect_ratio = getattr(profile, "last_aspect_ratio", "1:1") perspective = getattr(profile, "last_perspective", "no") requirements = f"complexity:{complexity}" if complexity else "" if gemini_prompt: requirements += f"|prompt:{gemini_prompt}" if aspect_ratio: requirements += f"|ratio:{aspect_ratio}" if perspective and perspective != "no": requirements += f"|perspective:{perspective}" task_id = str(uuid.uuid4()) task = ImageTask( task_id=task_id, customer_id=customer_id, customer_name=profile.name or customer_id, original_image=image_url, operation="enhance", requirements=requirements, status=TaskStatus.PENDING, ) self.tasks[task_id] = task self.customer_active_task[customer_id] = task_id print(f"[Workflow] 任务已重建 | 客户: {customer_id} | 图片: {image_url[:60]}...") return task except Exception as e: print(f"[Workflow] 任务重建失败: {e}") return None @staticmethod def _parse_requirements(requirements: str) -> dict: """从 requirements 字符串解析各字段,格式: complexity:xxx|prompt:xxx|ratio:xxx""" parsed = {} for part in (requirements or "").split("|"): part = part.strip() if ":" in part: k, v = part.split(":", 1) parsed[k.strip()] = v.strip() return parsed async def _auto_process(self, task_id: str, acc_id: str = "", acc_type: str = "AliWorkbench"): """付款确认后自动调用 Gemini 处理图片,完成后通知客户""" try: from config.config import IMAGE_MODULE_ENABLED if not IMAGE_MODULE_ENABLED: return except Exception: return task = self.tasks.get(task_id) if not task: return task.update_status(TaskStatus.PROCESSING) req = self._parse_requirements(task.requirements) gemini_prompt = req.get("prompt", "") aspect_ratio = req.get("ratio", "1:1") perspective = req.get("perspective", "no") proc_type = req.get("proc_type", "") subject = req.get("subject", "") quality = req.get("quality", "") revision_note = req.get("revision", "") # 客户修改意见追加到 prompt 末尾 if revision_note: gemini_prompt = (gemini_prompt or "") + f"\n【客户修改要求】{revision_note}" print(f"[Workflow] Gemini 开始处理 | 任务: {task_id[:8]}... | 比例: {aspect_ratio} | 透视: {perspective} | 图片: {task.original_image}") try: from image.image_processor import image_processor from utils.image_queue import run_with_queue result = await run_with_queue(image_processor.process_image( task.original_image, task.operation, requirements=task.requirements, gemini_prompt=gemini_prompt, aspect_ratio=aspect_ratio, perspective=perspective, proc_type=proc_type, subject=subject, quality=quality, )) if result["success"]: attempts = result.get("attempts", 1) qa_score = result.get("qa_score", 0) qa_pass = result.get("qa_pass", True) qa_issue = result.get("qa_issue", "") print(f"[Workflow] Gemini 处理完成 | 任务: {task_id[:8]}... | 质检: {qa_score}分 | 尝试: {attempts}次") # 质检未通过(已达重试上限,保留结果但人工跟进) if not qa_pass: await _wechat_notify( f"⚠️ **图片质检未通过,请人工核查**\n" f"客户:{task.customer_id}\n" f"店铺:{acc_id}\n" f"质检得分:{qa_score}/100\n" f"问题:{qa_issue}\n" f"已处理 {attempts} 次,结果已发出,请人工确认质量" ) await self.image_ai_submit_result( task_id=task_id, result_url=result["result_path"], acc_id=acc_id, acc_type=acc_type, ) else: err_msg = result['message'] print(f"[Workflow] Gemini 处理失败: {err_msg}") task.update_status(TaskStatus.FAILED) # 企业微信预警 await _wechat_notify( f"⚠️ **Gemini作图失败**\n" f"客户:{task.customer_id}\n" f"店铺:{acc_id}\n" f"原因:{err_msg[:200]}\n" f"请人工跟进" ) # 通知客户稍等,并告知转人工 if self._send_message: await self._send_message( customer_id=task.customer_id, acc_id=acc_id, acc_type=acc_type, content="您好,图片处理遇到点问题,已帮您转接人工客服处理,请稍候", msg_type=0, ) except Exception as e: print(f"[Workflow] 自动处理异常: {e}") task.update_status(TaskStatus.FAILED) await _wechat_notify( f"⚠️ **Workflow处理异常**\n" f"客户:{task.customer_id}\n" f"错误:{str(e)[:200]}" ) # ========== 图片AI接入点(作图用)========== async def image_ai_submit_result( self, task_id: str, result_url: str, acc_id: str = "", acc_type: str = "AliWorkbench" ) -> bool: """ 【图片AI专用接口】处理完成后调用此方法 Args: task_id: create_image_task 返回的任务ID result_url: 处理后的图片URL或本地路径 acc_id: 店铺账号ID(发消息用) acc_type: 平台类型 Returns: True = 成功,False = 任务不存在 """ task = self.tasks.get(task_id) if not task: print(f"[Workflow] 任务不存在: {task_id}") return False task.result_url = result_url task.update_status(TaskStatus.AWAITING_CONFIRM) print(f"[Workflow] 任务 {task_id} 处理完成,发送给客户确认") # 先发结果图片 if self._send_message: await self._send_message( customer_id=task.customer_id, acc_id=acc_id, acc_type=acc_type, content=result_url, msg_type=1 # 图片 ) # 让客服 AI 生成完成通知话术(自然口吻,询问邮箱) if self._agent_notify: await self._agent_notify( customer_id=task.customer_id, acc_id=acc_id, acc_type=acc_type, system_hint="【图片已处理完成并发给客户】请用自然口吻告诉客户图发好了,让他看一下效果,没问题把邮箱发过来,你来发给他。不超过1句话。", ) elif self._send_message: # 兜底:AI 不可用时用固定话术 await self._send_message( customer_id=task.customer_id, acc_id=acc_id, acc_type=acc_type, content="好了,你看一下效果,没问题把邮箱发我", msg_type=0, ) return True # ========== 客户回复处理 ========== async def handle_customer_reply( self, customer_id: str, message: str, acc_id: str = "", acc_type: str = "AliWorkbench" ) -> Optional[str]: """ 处理正在等待确认的客户回复 Returns: 需要回复客户的文本,None 表示不是确认相关消息 """ task = self.get_customer_active_task(customer_id) if not task or task.status != TaskStatus.AWAITING_CONFIRM: return None msg = message.strip() # 提取邮箱 import re email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', msg) if email_match: email = email_match.group() task.email = email db.update_email(customer_id, email) # 发送邮件(调用 email_sender) result = await self._send_email(task) if result: task.update_status(TaskStatus.COMPLETED) db.update_email_status(task.customer_id, "sent") db.complete_order(task.customer_id, had_revision=task.revision_count > 0) db.auto_compute_tags(task.customer_id) return "发到您邮箱了,注意查收哈" else: db.update_email_status(task.customer_id, "failed") return "邮件发送失败了,您再发一次邮箱试试" # 客户说不满意/要改 negative_keywords = ["不好", "不对", "不满意", "重做", "改一下", "差太多", "不行", "效果不好", "颜色不对"] if any(kw in msg for kw in negative_keywords): task.revision_count += 1 task.update_status(TaskStatus.REVISION) db.record_revision(task.customer_id) # 把客户的修改意见追加进 requirements,下次重做时 Gemini 能看到 if msg: task.requirements += f"|revision:{msg[:100]}" return "好,你说一下哪里要改,或者发图告诉我" # 客户提供了修改说明(处于 REVISION 状态时) if task.status == TaskStatus.REVISION and msg: task.requirements += f"|revision:{msg[:100]}" task.update_status(TaskStatus.PENDING) # 重新触发处理 asyncio.create_task( self._auto_process(task.task_id, acc_id=acc_id, acc_type=acc_type) ) return "好的,重新给你做" return None async def _send_email(self, task: ImageTask) -> bool: """发送完成作品邮件""" try: from mail.email_sender import email_sender profile = db.get_customer(task.customer_id) result = email_sender.send_completed_work( to_email=task.email, customer_name=profile.name or task.customer_name, image_description=task.requirements or task.operation, result_images=[task.result_url] ) return result.get("success", False) except Exception as e: print(f"[Workflow] 邮件发送失败: {e}") await _wechat_notify( f"⚠️ **邮件发送失败**\n" f"客户:{task.customer_id}\n" f"邮箱:{task.email}\n" f"错误:{str(e)[:200]}" ) return False # ========== 工具方法 ========== def detect_operation(self, message: str) -> str: """根据客户描述识别处理操作""" msg = message.lower() if any(kw in msg for kw in ["模糊", "清晰", "高清", "变清"]): return "enhance" elif any(kw in msg for kw in ["背景", "去背", "抠图", "透明"]): return "remove_bg" elif any(kw in msg for kw in ["尺寸", "大小", "缩放", "分辨率"]): return "resize" elif any(kw in msg for kw in ["老照片", "修复", "发黄", "破损"]): return "fix_old_photo" elif any(kw in msg for kw in ["分层", "psd"]): return "layered" else: return "enhance" def get_task_summary(self) -> str: """获取当前所有任务摘要(调试用)""" if not self.tasks: return "暂无任务" lines = [] for tid, task in self.tasks.items(): lines.append( f" [{task.status.value}] {task.customer_name} | {task.operation} | {tid[:8]}..." ) return "\n".join(lines) # ========== 客户需求变更 ========== async def add_customer_requirement(self, task_id: str, customer_id: str, requirement: str, changed_by: str = 'customer') -> bool: # 检查任务是否存在 task = self.get_task(task_id) if not task: # 尝试从数据库加载 db_task = self.db.get_task(task_id) if db_task: print(f"[Workflow] 从数据库加载任务:{task_id[:8]}...") # 可以在这里重建内存任务 else: print(f"[Workflow] 任务不存在:{task_id}") return False # 添加到数据库 success = self.db.add_customer_note(task_id, requirement, changed_by) if success: print(f"[Workflow] 客户添加需求:{task_id[:8]}... | {requirement}") # 如果任务还在待处理状态,通知 AI 客服 if task and task.status.value == 'pending': if self._send_message: await self._send_message( customer_id=customer_id, acc_id=task.acc_id, acc_type=task.acc_type, content=f"好的,已记录您的需求:{requirement},处理时会注意的", msg_type=0, ) return success async def modify_operation(self, task_id: str, customer_id: str, new_operation: str, changed_by: str = 'customer') -> bool: """ 客户修改操作类型 Args: task_id: 任务 ID customer_id: 客户 ID new_operation: 新操作(enhance/remove_bg/vectorize 等) changed_by: 修改者 Returns: bool: 是否成功 """ task = self.get_task(task_id) if not task: db_task = self.db.get_task(task_id) if not db_task: print(f"[Workflow] 任务不存在:{task_id}") return False # 检查状态,已处理完成的不允许修改 if task and task.status.value in ['completed', 'processing']: print(f"[Workflow] 任务已开始处理,不允许修改操作:{task_id}") if self._send_message: await self._send_message( customer_id=customer_id, acc_id=task.acc_id, acc_type=task.acc_type, content="抱歉,图片已经开始处理了,无法修改操作类型", msg_type=0, ) return False # 修改数据库 success = self.db.modify_operation(task_id, new_operation, changed_by) if success and task: task.operation = new_operation print(f"[Workflow] 修改操作类型:{task_id[:8]}... -> {new_operation}") if self._send_message: await self._send_message( customer_id=customer_id, acc_id=task.acc_id, acc_type=task.acc_type, content=f"好的,已为您修改为{new_operation}操作", msg_type=0, ) return success def get_task_requirement_history(self, task_id: str) -> List[dict]: """获取任务需求变更历史""" return self.db.get_requirement_history(task_id) # ========== 三种工作流 ========== async def find_image_workflow(self, customer_id: str, image_url: str, acc_id: str = "", acc_type: str = "AliWorkbench") -> bool: """ 工作流 1:查找图片 客户说"找一下这个图" → 自己处理 → 上传到图绘 → 返回 URL Args: customer_id: 客户 ID image_url: 图片 URL acc_id: 店铺 ID acc_type: 平台类型 Returns: bool: 是否成功 """ try: print(f"[Workflow] 启动查找图片工作流 | 客户:{customer_id}") # 1. 创建任务 task_id = self.create_image_task( customer_id=customer_id, customer_name=customer_id, original_image=image_url, operation="find", # 查找操作 requirements="type:find", acc_id=acc_id, acc_type=acc_type ) # 2. 这里调用图绘 API 上传图片 # TODO: 调用图绘上传 API # tuhui_url = await self._upload_to_tuhui(image_url) # 临时模拟 tuhui_url = f"http://tuhui.cloud/works/123" # 3. 更新任务结果 self.db.update_result(task_id, tuhui_url) self.db.update_status(task_id, DBTaskStatus.COMPLETED) # 4. 回复客户 if self._send_message: await self._send_message( customer_id=customer_id, acc_id=acc_id, acc_type=acc_type, content=f"找到了!图片在这里:{tuhui_url}", msg_type=0, ) print(f"[Workflow] 查找图片完成 | 客户:{customer_id} | URL: {tuhui_url}") return True except Exception as e: logger.error(f"查找图片工作流失败:{e}") return False async def process_image_workflow(self, customer_id: str, image_url: str, acc_id: str = "", acc_type: str = "AliWorkbench") -> bool: """ 工作流 2:处理图片 客户说"做一下" → 评估图片 → 稍等做 Args: customer_id: 客户 ID image_url: 图片 URL acc_id: 店铺 ID acc_type: 平台类型 Returns: bool: 是否成功 """ try: print(f"[Workflow] 启动处理图片工作流 | 客户:{customer_id}") # 1. 创建任务 task_id = self.create_image_task( customer_id=customer_id, customer_name=customer_id, original_image=image_url, operation="enhance", requirements="type:process", acc_id=acc_id, acc_type=acc_type ) # 2. 回复客户稍等 if self._send_message: await self._send_message( customer_id=customer_id, acc_id=acc_id, acc_type=acc_type, content="稍等,我看看...好的,可以做,马上处理", msg_type=0, ) # 3. 启动处理 await self.trigger_processing_on_payment(customer_id, acc_id, acc_type) print(f"[Workflow] 处理图片已启动 | 客户:{customer_id}") return True except Exception as e: logger.error(f"处理图片工作流失败:{e}") return False async def transfer_to_designer_workflow(self, customer_id: str, image_url: str, acc_id: str = "", acc_type: str = "AliWorkbench", reason: str = "做不了") -> bool: """ 工作流 3:转人工派单 做不了 → 查询企业微信在线设计师 → 派单 Args: customer_id: 客户 ID image_url: 图片 URL acc_id: 店铺 ID acc_type: 平台类型 reason: 转接原因 Returns: bool: 是否成功 """ try: print(f"[Workflow] 启动转人工派单工作流 | 客户:{customer_id} | 原因:{reason}") # 1. 创建任务 task_id = self.create_image_task( customer_id=customer_id, customer_name=customer_id, original_image=image_url, operation="manual", requirements=f"type:transfer|reason:{reason}", acc_id=acc_id, acc_type=acc_type ) # 2. 查询企业微信在线设计师 online_designers = await self._get_online_designers() if not online_designers: # 无人在线,通知客户 if self._send_message: await self._send_message( customer_id=customer_id, acc_id=acc_id, acc_type=acc_type, content="抱歉,现在设计师都不在线,稍后会有人联系您", msg_type=0, ) # 企业微信预警 await _wechat_notify( f"⚠️ **人工派单但无人在线**\n" f"客户:{customer_id}\n" f"店铺:{acc_id}\n" f"原因:{reason}\n" f"请安排设计师上线" ) print(f"[Workflow] 无人在线 | 客户:{customer_id}") return False # 3. 派单给在线设计师 designer_name = online_designers[0] # 取第一个在线的 success = await self._dispatch_to_designer(task_id, designer_name, customer_id, image_url, reason) if not success: logger.error("派单失败") return False # 4. 回复客户 if self._send_message: await self._send_message( customer_id=customer_id, acc_id=acc_id, acc_type=acc_type, content="好的,已帮您安排设计师处理,请稍候", msg_type=0, ) print(f"[Workflow] 已派单给设计师:{designer} | 客户:{customer_id}") return True except Exception as e: logger.error(f"转人工派单工作流失败:{e}") return False async def _get_online_designers(self) -> list: """ 查询在线设计师(使用图绘派单 API) Returns: list: 在线设计师名单 ["橘子", "婷婷", ...] """ try: designers = await self.dispatch_client.get_online_designers() print(f"[Workflow] 查询在线设计师:{len(designers)}人在线 | {designers}") return designers except Exception as e: logger.error(f"查询在线设计师失败:{e}") return [] async def _dispatch_to_designer(self, task_id: str, designer_name: str, customer_id: str, image_url: str, reason: str) -> bool: """ 派单给设计师(使用图绘派单 API) Args: task_id: 任务 ID designer_name: 设计师姓名 customer_id: 客户 ID image_url: 图片 URL reason: 转接原因 Returns: bool: 是否成功 """ try: # 1. 在派单系统创建任务 dispatch_task_id = await self.dispatch_client.create_task( task_name=f"图片处理-{customer_id[-4:]}", description=f"{reason}\n客户:{customer_id}\n图片:{image_url}", task_type="image_process", priority=2, deadline=None ) if not dispatch_task_id: logger.error("创建派单任务失败") return False # 2. 分配给设计师 success = await self.dispatch_client.assign_task( task_id=dispatch_task_id, designer_name=designer_name, notes=f"AI 客服自动派单\n原因:{reason}\n客户:{customer_id}" ) if success: print(f"[Workflow] 派单成功:{dispatch_task_id} → {designer_name} | 客户:{customer_id}") # 企业微信通知 await _wechat_notify( f"📋 **新任务派单**\n" f"设计师:{designer_name}\n" f"任务 ID: {dispatch_task_id}\n" f"客户:{customer_id}\n" f"原因:{reason}\n" f"请及时处理" ) return True else: logger.error("分配任务失败") return False except Exception as e: logger.error(f"派单失败:{e}") return False # ========== 全局实例 ========== workflow = CustomerServiceWorkflow()