Files
tw2/qingjian_cs/app/state_machine.py
jimi a842f1861f
Some checks failed
Pre-commit / run (ubuntu-latest) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_en (ubuntu-latest, 3.10) (push) Has been cancelled
Deploy Sphinx documentation to Pages / build_zh (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (macos-15, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (ubuntu-latest, 3.12) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.10) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.11) (push) Has been cancelled
Python Unittest Coverage / test (windows-latest, 3.12) (push) Has been cancelled
chore: initial import of standalone agentscope project
2026-03-02 18:21:40 +08:00

70 lines
2.2 KiB
Python

from __future__ import annotations
from typing import Any
AFTER_SALES_STAGES = {
"new",
"waiting_material",
"quoted",
"processing",
"waiting_feedback",
"done",
"refunding",
"transferred",
}
def migrate_state_schema(state: dict[str, Any] | None) -> dict[str, Any]:
src = dict(state or {})
# 兼容旧字段
if "after_sales_stage" not in src:
old = src.get("aftersales_stage") or src.get("status") or "new"
src["after_sales_stage"] = str(old)
if src.get("after_sales_stage") not in AFTER_SALES_STAGES:
src["after_sales_stage"] = "new"
if "quote_count" not in src:
src["quote_count"] = int(src.get("quotes", 0) or 0)
if "image_count" not in src:
src["image_count"] = int(src.get("images", 0) or 0)
if "last_intent" not in src:
src["last_intent"] = str(src.get("intent", "unknown") or "unknown")
if "version" not in src:
src["version"] = 2
return src
def evolve_after_sales_state(prev_state: dict[str, Any], *, route: str, action: str, intent: str, order_status: str, msg: str) -> dict[str, Any]:
s = migrate_state_schema(prev_state)
stage = s.get("after_sales_stage", "new")
text = (msg or "").lower()
if action == "transfer" or route == "risk":
stage = "transferred"
elif route == "quote" and action == "quote":
stage = "quoted"
s["quote_count"] = int(s.get("quote_count", 0)) + 1
elif route == "after_sales":
if any(k in text for k in ["退款", "退钱", "退货"]):
stage = "refunding"
elif any(k in text for k in ["做完", "完成", "好了", "发我"]):
stage = "waiting_feedback"
elif any(k in text for k in ["补图", "重发", "再发", "原图"]):
stage = "waiting_material"
elif order_status == "paid":
stage = "processing"
elif route == "pre_sales":
if intent == "image":
stage = "waiting_material"
# 终态收敛
if stage == "waiting_feedback" and any(k in text for k in ["没问题", "可以", "确认", "好评"]):
stage = "done"
s["after_sales_stage"] = stage
s["last_intent"] = intent
s["last_order_status"] = order_status or "unknown"
s["version"] = 2
return s