Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled
61 lines
1.8 KiB
Python
61 lines
1.8 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
|
|
# Closed vocabulary of values allowed in the ``after_sales_stage`` state field.
# ``migrate_state_schema`` resets any stage outside this set to "new".
AFTER_SALES_STAGES = {
    # Stages assigned by the transition logic visible in this module.
    "new", "waiting_material", "quoted", "processing", "transferred",
    # Stages accepted as valid but not assigned by the code in this module.
    "waiting_feedback", "done", "refunding",
}
|
|
|
|
|
|
def _coerce_count(value: Any) -> int:
    """Best-effort conversion of a legacy counter value to ``int``.

    Falsy values, and values ``int()`` cannot convert (e.g. a list, a
    non-numeric string, NaN/inf), yield ``0`` instead of raising.
    """
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def migrate_state_schema(state: dict[str, Any] | None) -> dict[str, Any]:
    """Return a shallow copy of *state* upgraded to the version-2 schema.

    The input mapping is not mutated. ``None`` (or an empty mapping) yields a
    new dict populated entirely with defaults.

    Keys that are missing are filled in, preferring legacy field names:

    * ``after_sales_stage`` from ``aftersales_stage``, then ``status``,
      defaulting to ``"new"``.
    * ``quote_count`` from ``quotes`` (default ``0``).
    * ``image_count`` from ``images`` (default ``0``).
    * ``last_intent`` from ``intent`` (default ``"unknown"``).
    * ``version`` is set to ``2``.

    Keys that already exist are kept as-is, with one exception: an
    ``after_sales_stage`` that is not a member of ``AFTER_SALES_STAGES`` is
    reset to ``"new"``.

    Args:
        state: Previously persisted state, possibly in the legacy layout.

    Returns:
        A new dict containing every original key plus the normalized fields.
    """
    src = dict(state or {})

    # Backward compatibility with legacy field names.
    if "after_sales_stage" not in src:
        old = src.get("aftersales_stage") or src.get("status") or "new"
        src["after_sales_stage"] = str(old)

    # The isinstance guard keeps unhashable values (list/dict) from raising
    # TypeError in the set membership test; they are simply treated as invalid.
    stage = src.get("after_sales_stage")
    if not isinstance(stage, str) or stage not in AFTER_SALES_STAGES:
        src["after_sales_stage"] = "new"

    # Legacy counters may hold arbitrary data; never let them abort migration.
    if "quote_count" not in src:
        src["quote_count"] = _coerce_count(src.get("quotes", 0))
    if "image_count" not in src:
        src["image_count"] = _coerce_count(src.get("images", 0))
    if "last_intent" not in src:
        src["last_intent"] = str(src.get("intent", "unknown") or "unknown")
    if "version" not in src:
        src["version"] = 2
    return src
|
|
|
|
|
|
def evolve_after_sales_state(
    prev_state: dict[str, Any],
    *,
    route: str,
    action: str,
    intent: str,
    order_status: str,
    msg: str,
) -> dict[str, Any]:
    """Compute the next after-sales state from the previous one.

    *prev_state* is first passed through ``migrate_state_schema`` (which
    copies it), so the caller's dict is not modified.

    The stage transition is chosen by the first matching rule:

    1. ``action == "transfer"`` or ``route == "risk"``  -> ``"transferred"``.
    2. ``route == "quote"`` and ``action == "quote"``   -> ``"quoted"``, and
       ``quote_count`` is incremented.
    3. ``route == "after_sales"``: ``"processing"`` when ``order_status`` is
       ``"paid"``; otherwise ``"waiting_material"`` if the stage is still
       ``"new"``.
    4. ``route == "pre_sales"`` and ``intent == "image"`` ->
       ``"waiting_material"``.

    If no rule matches, the stage is left unchanged.

    Args:
        prev_state: State from the previous turn (any schema version).
        route: Routing decision for the current turn.
        action: Action taken for the current turn.
        intent: Intent label; stored verbatim as ``last_intent``.
        order_status: Order status; stored as ``last_order_status``
            (``"unknown"`` when falsy).
        msg: Not referenced by this function; kept so existing callers that
            pass it continue to work.

    Returns:
        The updated state dict with ``version`` set to ``2``.
    """
    s = migrate_state_schema(prev_state)
    stage = s.get("after_sales_stage", "new")

    if action == "transfer" or route == "risk":
        stage = "transferred"
    elif route == "quote" and action == "quote":
        stage = "quoted"
        # The migration only fills quote_count when the key is missing, so a
        # persisted None or non-numeric value can still reach this point.
        # Treat it as 0 rather than raising.
        try:
            previous_quotes = int(s.get("quote_count") or 0)
        except (TypeError, ValueError, OverflowError):
            previous_quotes = 0
        s["quote_count"] = previous_quotes + 1
    elif route == "after_sales":
        if order_status == "paid":
            stage = "processing"
        elif stage == "new":
            stage = "waiting_material"
    elif route == "pre_sales":
        if intent == "image":
            stage = "waiting_material"

    s["after_sales_stage"] = stage
    s["last_intent"] = intent
    s["last_order_status"] = order_status or "unknown"
    s["version"] = 2
    return s
|