Files
tw/mail/email_sender.py
ZuoWei a6c42d505a feat: 完整功能部署 v1.0
新增功能:
- 天网协作系统 (HTTP API 端口 6060)
- 三种工作流 (查找图片/处理图片/转人工派单)
- 图片任务数据库 (支持客户后续增加需求)
- 图绘派单系统集成 (API: 8005)
- 文字检测与加价 (60-80 元高价值订单)
- 风险评估与接单判断
- 作图失败自动转人工

新增文档:
- 项目功能汇总.md
- 三种工作流功能说明.md
- 文字加价功能说明.md
- 风险评估功能说明.md
- 图片任务数据库功能说明.md
- 图绘派单系统集成说明.md
- 作图失败转接人工说明.md
- DEPLOYMENT.md
- TIANWANG_INTEGRATION.md

核心修改:
- core/pydantic_ai_agent.py
- core/workflow.py
- core/websocket_client.py
- image/image_analyzer.py
- services/service_tuhui_dispatch.py
- db/image_tasks_db.py

版本:v1.0
日期:2026-02-28
2026-02-28 11:20:40 +08:00

113 lines
3.6 KiB
Python
Executable File

"""邮件发送模块"""
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.header import Header
from typing import Optional, List
from dotenv import load_dotenv
load_dotenv()
class EmailSender:
"""邮件发送"""
def __init__(self):
self.smtp_host = os.getenv("SMTP_HOST", "")
self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
self.smtp_user = os.getenv("SMTP_USER", "")
self.smtp_password = os.getenv("SMTP_PASSWORD", "")
self.sender_name = os.getenv("SENDER_NAME", "修图客服")
def send(
self,
to_email: str,
subject: str,
body: str,
images: Optional[List[str]] = None
) -> dict:
"""
发送邮件
Args:
to_email: 收件人邮箱
subject: 邮件主题
body: 邮件正文
images: 图片路径列表
Returns:
{"success": bool, "message": str}
"""
if not self.smtp_host or not self.smtp_user:
return {"success": False, "message": "未配置邮件SMTP"}
try:
# 创建邮件
msg = MIMEMultipart('related')
msg['From'] = f"{Header(self.sender_name, 'utf-8').encode()} <{self.smtp_user}>"
msg['To'] = to_email
msg['Subject'] = subject
# 添加正文
msg.attach(MIMEText(body, 'html', 'utf-8'))
# 添加图片
if images:
for idx, img_path in enumerate(images):
if os.path.exists(img_path):
with open(img_path, 'rb') as f:
img = MIMEImage(f.read())
img.add_header('Content-ID', f'<image{idx}>')
msg.attach(img)
# 发送邮件(失败时重试 1 次)
import time
last_err = None
for attempt in range(2):
try:
server = smtplib.SMTP(self.smtp_host, self.smtp_port)
server.starttls()
server.login(self.smtp_user, self.smtp_password)
server.sendmail(self.smtp_user, to_email, msg.as_string())
server.quit()
return {"success": True, "message": "发送成功"}
except Exception as e:
last_err = e
if attempt == 0:
time.sleep(2)
return {"success": False, "message": f"发送失败: {str(last_err)}"}
except Exception as e:
return {"success": False, "message": f"发送失败: {str(e)}"}
def send_completed_work(
self,
to_email: str,
customer_name: str,
image_description: str,
result_images: List[str]
) -> dict:
"""发送完成的作品"""
subject = f"您的修图作品已完成 - {image_description}"
body = f"""
<html>
<body>
<h2>您好 {customer_name},您的修图作品已完成!</h2>
<p>感谢您选择我们的服务。以下是您处理后的图片:</p>
<p><b>处理内容:</b> {image_description}</p>
<br>
<p>如有任何问题,请随时联系我们。</p>
<br>
<p>祝您生活愉快!</p>
</body>
</html>
"""
return self.send(to_email, subject, body, result_images)
# 全局实例
email_sender = EmailSender()