""" 邮件接收模块 - 监控收件箱,客户发图询价/下单自动处理 流程: 客户发邮件(含图片附件)→ 自动分析图片复杂度 → 回复报价 客户回复"拍了"/"确认" → 创建处理任务 → Gemini 作图 → 发结果 """ import asyncio import imaplib import email import email.header import os import tempfile import logging from datetime import datetime from email.header import decode_header from typing import Optional logger = logging.getLogger(__name__) # 支持的图片格式 IMAGE_EXTS = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp") def _decode_str(value: str) -> str: """解码邮件头部字段(处理中文编码)""" if not value: return "" parts = decode_header(value) result = [] for part, charset in parts: if isinstance(part, bytes): try: result.append(part.decode(charset or "utf-8", errors="replace")) except Exception: result.append(part.decode("utf-8", errors="replace")) else: result.append(part) return "".join(result) class EmailReceiver: """IMAP 邮件接收器,轮询新邮件并自动处理图片询价""" def __init__( self, imap_host: str = "imap.qq.com", imap_port: int = 993, username: str = "", password: str = "", poll_interval: int = 30, ): self.imap_host = imap_host self.imap_port = imap_port self.username = username self.password = password self.poll_interval = poll_interval self._running = False self._send_reply = None # 注入的回复函数 def register_reply_callback(self, callback): """注入回复函数(直接用 email_sender 回复)""" self._send_reply = callback # ========== 主循环 ========== async def start(self): """启动轮询(作为后台任务运行)""" self._running = True logger.info(f"[EmailReceiver] 启动,每 {self.poll_interval}s 检查一次收件箱") while self._running: try: await self._check_inbox() except Exception as e: logger.error(f"[EmailReceiver] 轮询异常: {e}") await asyncio.sleep(self.poll_interval) def stop(self): self._running = False # ========== 收件箱检查 ========== async def _check_inbox(self): """连接 IMAP,检查未读邮件""" loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._check_inbox_sync) def _check_inbox_sync(self): """同步版收件箱检查(在线程池里跑,避免阻塞事件循环)""" try: conn = imaplib.IMAP4_SSL(self.imap_host, self.imap_port) conn.login(self.username, self.password) conn.select("INBOX") # 搜索未读邮件 _, msg_ids = conn.search(None, "UNSEEN") ids = msg_ids[0].split() if not ids: conn.logout() return logger.info(f"[EmailReceiver] 发现 {len(ids)} 封未读邮件") for msg_id in ids: try: _, data = conn.fetch(msg_id, "(RFC822)") raw = data[0][1] msg = email.message_from_bytes(raw) self._process_email_sync(msg) # 标记为已读 conn.store(msg_id, "+FLAGS", "\\Seen") except Exception as e: logger.error(f"[EmailReceiver] 处理邮件 {msg_id} 失败: {e}") conn.logout() except Exception as e: logger.error(f"[EmailReceiver] IMAP 连接失败: {e}") # ========== 邮件处理 ========== def _process_email_sync(self, msg): """处理单封邮件:提取发件人、附件图片,触发分析和回复""" sender = _decode_str(msg.get("From", "")) subject = _decode_str(msg.get("Subject", "(无主题)")) # 提取发件人邮箱地址 sender_email = self._extract_email_addr(sender) if not sender_email: logger.warning(f"[EmailReceiver] 无法解析发件人地址: {sender}") return logger.info(f"[EmailReceiver] 处理邮件 | 来自: {sender_email} | 主题: {subject}") # 提取正文 body_text = self._extract_body(msg) # 提取图片附件 image_paths = self._extract_images(msg) # 异步触发处理(把同步上下文切回事件循环) loop = asyncio.new_event_loop() try: loop.run_until_complete( self._handle_email(sender_email, subject, body_text, image_paths) ) finally: loop.close() # 清理临时图片 for p in image_paths: try: os.remove(p) except Exception: pass async def _handle_email( self, sender_email: str, subject: str, body: str, image_paths: list, ): """根据邮件内容决定如何处理""" body_lower = (body or "").lower() # ① 有图片附件 → 分析图片,回复报价 if image_paths: await self._handle_image_inquiry(sender_email, subject, image_paths) return # ② 纯文字邮件 → 引导发图 await self._reply_email( to=sender_email, subject=f"Re: {subject}", body=self._html( "您好!收到您的邮件。

" "请将您需要处理的图片作为附件发送过来,我们会尽快为您报价。

" "支持格式:JPG、PNG、WEBP 等常见图片格式。" ), ) async def _handle_image_inquiry( self, sender_email: str, subject: str, image_paths: list ): """分析图片,回复报价""" from image.image_analyzer import image_analyzer quotes = [] for idx, img_path in enumerate(image_paths, 1): try: # image_analyzer 支持本地路径 result = await image_analyzer.analyze(img_path) price = result.get("price_suggest", 30) reason = result.get("reason", "") label = { "simple": "画面简洁", "normal": "一般复杂度", "complex": "细节较多", "hard": "非常复杂", }.get(result.get("complexity", ""), "") quotes.append( f"图片{idx}:{label},建议报价 {price} 元" + (f"({reason})" if reason else "") ) except Exception as e: logger.error(f"[EmailReceiver] 图片分析失败: {e}") quotes.append(f"图片{idx}:分析失败,建议报价 30 元") # 多图打包优惠 n = len(image_paths) if n >= 5: tip = f"

📦 您共发来 {n} 张图片,支持打包优惠,欢迎咨询。" elif n >= 3: tip = f"

📦 您共发来 {n} 张图片,3张以上可享9折优惠。" else: tip = "" quote_html = "
".join(quotes) body = self._html( f"您好!感谢您发来图片,已为您完成分析:

" f"{quote_html}{tip}

" f"如需处理,请直接在淘宝店铺下单,付款后我们会尽快为您完成制作并发回。
" f"如有疑问欢迎回复此邮件。" ) await self._reply_email( to=sender_email, subject=f"Re: {subject}" if subject else "您的图片报价", body=body, ) logger.info(f"[EmailReceiver] 已向 {sender_email} 回复报价") # ========== 工具方法 ========== async def _reply_email(self, to: str, subject: str, body: str): """发送回复邮件""" try: from mail.email_sender import email_sender result = email_sender.send(to_email=to, subject=subject, body=body) if not result.get("success"): logger.error(f"[EmailReceiver] 回复发送失败: {result.get('message')}") except Exception as e: logger.error(f"[EmailReceiver] 回复异常: {e}") def _extract_email_addr(self, from_field: str) -> Optional[str]: """从 From 字段提取邮箱地址""" import re m = re.search(r'[\w\.\+\-]+@[\w\.\-]+\.\w+', from_field) return m.group(0) if m else None def _extract_body(self, msg) -> str: """提取邮件纯文本正文""" body = "" if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() if ct == "text/plain": charset = part.get_content_charset() or "utf-8" try: body += part.get_payload(decode=True).decode(charset, errors="replace") except Exception: pass else: charset = msg.get_content_charset() or "utf-8" try: body = msg.get_payload(decode=True).decode(charset, errors="replace") except Exception: pass return body.strip() def _extract_images(self, msg) -> list: """提取邮件中的图片附件,保存到临时文件,返回路径列表""" paths = [] for part in msg.walk(): content_disposition = part.get("Content-Disposition", "") content_type = part.get_content_type() is_attachment = "attachment" in content_disposition is_image_type = content_type.startswith("image/") filename = part.get_filename() if filename: filename = _decode_str(filename) # 判断是否是图片 if not (is_image_type or (filename and any( filename.lower().endswith(ext) for ext in IMAGE_EXTS ))): continue try: data = part.get_payload(decode=True) if not data: continue suffix = ".jpg" if filename: ext = os.path.splitext(filename)[1].lower() if ext in IMAGE_EXTS: suffix = ext fd, tmp_path = tempfile.mkstemp(suffix=suffix, prefix="email_img_") with os.fdopen(fd, "wb") as f: f.write(data) paths.append(tmp_path) logger.info(f"[EmailReceiver] 提取图片附件: {filename} → {tmp_path}") except Exception as e: logger.error(f"[EmailReceiver] 提取附件失败: {e}") return paths @staticmethod def _html(content: str) -> str: return f""" {content}


修图客服 · 自动回复

""" # ========== 全局实例(从 .env 读取配置)========== from dotenv import load_dotenv load_dotenv() email_receiver = EmailReceiver( imap_host="imap.qq.com", imap_port=993, username=os.getenv("SMTP_USER", ""), password=os.getenv("SMTP_PASSWORD", ""), poll_interval=int(os.getenv("EMAIL_POLL_INTERVAL", "30")), )