Files
tw2/qingjian_cs/services/service_tuhui_upload.py
jimi 01c32be6ea
Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled
feat: low-latency debounce, context logs, and stable draw/upload config
2026-03-03 13:38:18 +08:00

107 lines
3.5 KiB
Python

from __future__ import annotations
import os
from typing import Optional, Tuple
from urllib.parse import urljoin
import requests
TUHUI_BASE_URL = os.getenv("TUHUI_BASE_URL", "https://tuhui.cloud").strip()
TUHUI_PHONE = os.getenv("TUHUI_PHONE", "17520145271").strip()
TUHUI_PASSWORD = os.getenv("TUHUI_PASSWORD", "zuowei1216").strip()
TUHUI_DEFAULT_PRICE = int(os.getenv("TUHUI_DEFAULT_PRICE", "20"))
TUHUI_UPLOAD_ENDPOINT = os.getenv("TUHUI_UPLOAD_ENDPOINT", "/api/upload").strip()
TUHUI_UPLOAD_FILE_FIELD = os.getenv("TUHUI_UPLOAD_FILE_FIELD", "file").strip()
TUHUI_DEFAULT_CATEGORY = os.getenv("TUHUI_DEFAULT_CATEGORY", "高清修复").strip()
TUHUI_TIMEOUT_SECONDS = int(os.getenv("TUHUI_TIMEOUT_SECONDS", "30"))
def _login() -> Tuple[bool, str]:
try:
resp = requests.post(
f"{TUHUI_BASE_URL}/api/auth/login",
json={"phone": TUHUI_PHONE, "password": TUHUI_PASSWORD},
timeout=TUHUI_TIMEOUT_SECONDS,
)
if resp.status_code != 200:
return False, f"login_http_{resp.status_code}:{resp.text[:120]}"
data = resp.json() if resp.text else {}
token = str(data.get("access_token", "") or "")
if not token:
return False, "login_no_token"
return True, token
except Exception as e:
return False, f"login_error:{e}"
def _upload_sync(
image_path: str,
title: str,
description: str = "",
price: int = 20,
category: Optional[str] = None,
) -> Tuple[bool, str, int]:
if not os.path.exists(image_path):
return False, "file_not_found", 0
ok, token_or_err = _login()
if not ok:
return False, token_or_err, 0
token = token_or_err
use_price = int(price or TUHUI_DEFAULT_PRICE)
use_category = (category or TUHUI_DEFAULT_CATEGORY or "高清修复").strip()
headers = {"Authorization": f"Bearer {token}"}
data = {
"title": title,
"description": description,
"price": str(use_price),
"category": use_category,
}
try:
with open(image_path, "rb") as f:
files = {TUHUI_UPLOAD_FILE_FIELD: ("image.jpg", f, "image/jpeg")}
resp = requests.post(
f"{TUHUI_BASE_URL}{TUHUI_UPLOAD_ENDPOINT}",
files=files,
data=data,
headers=headers,
timeout=TUHUI_TIMEOUT_SECONDS,
)
if resp.status_code not in (200, 201):
return False, f"upload_http_{resp.status_code}:{resp.text[:120]}", 0
payload = resp.json() if resp.text else {}
work = payload.get("work", {}) if isinstance(payload.get("work"), dict) else {}
work_id = int(work.get("id") or payload.get("work_id") or payload.get("id") or 0)
image_url = (
str(work.get("original_image") or work.get("image_url") or payload.get("image_url") or "")
)
if image_url.startswith("/"):
image_url = urljoin(f"{TUHUI_BASE_URL}/", image_url.lstrip("/"))
if not image_url:
return False, "upload_no_image_url", 0
return True, image_url, work_id
except Exception as e:
return False, f"upload_error:{e}", 0
async def upload_to_tuhui(
image_path: str,
title: str,
description: str = "",
price: int = 20,
category: Optional[str] = None,
) -> Tuple[bool, str, int]:
import asyncio
return await asyncio.to_thread(
_upload_sync,
image_path,
title,
description,
price,
category,
)