Files
tw/api/http_server.py

334 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
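# -*- coding: utf-8 -*-
"""
HTTP API server.

Exposes the endpoints through which Tianwang dispatches tasks to this
service (receive / cancel / status / list), plus health-check and metrics
endpoints.
"""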
from flask import Flask, request, jsonify
import logging
from datetime import datetime
from db.task_db.task_model import get_task_manager, TaskStatus
from core.task_scheduler import get_task_scheduler
import asyncio
import threading
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# The Flask application that all route handlers below are registered on.
app = Flask(__name__)
# Original intent: let JSON responses carry Chinese text.
# NOTE(review): newer Flask releases configure this through the JSON provider
# instead of this config key — confirm against the pinned Flask version.
app.config['JSON_AS_ASCII'] = False
# Assigned by start_http_server(); the task route handlers call methods on it.
task_manager = None
# Declared global in start_http_server() but never assigned in the visible code.
task_scheduler = None
def get_async_loop():
    """Create a brand-new asyncio event loop and make it the current one.

    Returns:
        The freshly created loop, already registered via
        ``asyncio.set_event_loop``.
    """
    fresh_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh_loop)
    return fresh_loop
def run_async(coro):
    """Run a coroutine to completion on a dedicated event loop.

    A new loop is obtained from ``get_async_loop()`` on every call. The loop
    is closed once the coroutine finishes or raises, so repeated calls do not
    accumulate open loops.

    Args:
        coro: The coroutine (or any awaitable accepted by
            ``loop.run_until_complete``) to execute.

    Returns:
        The value produced by ``coro``.

    Raises:
        Exception: Whatever ``coro`` raises is propagated unchanged.
    """
    loop = get_async_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        # Unregister before closing so later code never picks up a closed loop.
        asyncio.set_event_loop(None)
        loop.close()
@app.route('/api/task/receive', methods=['POST'])
def receive_task():
    """
    Receive a task dispatched by Tianwang and store it with status "pending".

    Request body (JSON object). ``task_id``, ``type``, ``customer``,
    ``trigger`` and ``action`` are required; the other keys shown are not
    validated here:
        {
            "task_id": "TASK_20260226_001",
            "type": "send_file_after_reply",
            "customer": {"name": "Xiao Ming", "id": "customer_123"},
            "trigger": {"type": "customer_reply", "keyword": "OK"},
            "action": {
                "type": "send_file",
                "file_url": "https://xxx.com/file.zip",
                "message": "Here is the file you asked for"
            },
            "priority": "normal",
            "timeout_hours": 24,
            "created_by": "designer lz"
        }

    Success response (HTTP 200):
        {
            "code": 200,
            "message": "任务接收成功",
            "data": {"task_id": "TASK_20260226_001", "status": "pending"}
        }

    Error responses:
        400 - body missing, empty, not valid JSON, not a JSON object, or a
              required field is absent.
        500 - ``task_manager.add_task`` returned a falsy value, or an
              unexpected exception occurred.
    """
    try:
        # silent=True turns malformed JSON / wrong content type into None, so
        # the client gets the 400 below instead of a 500 from the catch-all.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({
                'code': 400,
                'message': '请求体不能为空'
            }), 400
        # Item assignment below requires a mapping; reject lists, strings, etc.
        if not isinstance(data, dict):
            return jsonify({
                'code': 400,
                'message': '请求体必须是 JSON 对象'
            }), 400

        # Validate required fields; report the first one that is missing.
        required_fields = ('task_id', 'type', 'customer', 'trigger', 'action')
        for field in required_fields:
            if field not in data:
                return jsonify({
                    'code': 400,
                    'message': f'缺少必填字段:{field}'
                }), 400

        # Stamp server-side bookkeeping fields before persisting.
        data['created_at'] = datetime.now().isoformat()
        data['status'] = 'pending'

        if not task_manager.add_task(data):
            logger.error(f"任务保存失败:{data['task_id']}")
            return jsonify({
                'code': 500,
                'message': '任务保存失败'
            }), 500

        logger.info(f"任务接收成功:{data['task_id']}")
        return jsonify({
            'code': 200,
            'message': '任务接收成功',
            'data': {
                'task_id': data['task_id'],
                'status': 'pending'
            }
        })
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback, reply 500.
        logger.exception(f"接收任务异常:{e}")
        return jsonify({
            'code': 500,
            'message': f'服务器错误:{str(e)}'
        }), 500
@app.route('/api/task/cancel', methods=['POST'])
def cancel_task():
    """
    Cancel a task.

    Request body (JSON object):
        {
            "task_id": "TASK_20260226_001",
            "reason": "customer already refunded"
        }

    Success response (HTTP 200) echoes the ``task_id`` with status
    "cancelled".

    Error responses:
        400 - body is missing, not valid JSON, not a JSON object, or has no
              (or an empty) ``task_id``.
        500 - an unexpected exception occurred.
    """
    try:
        # silent=True yields None for malformed JSON / wrong content type
        # instead of raising, so bad requests are answered with 400, not 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'code': 400,
                'message': '请求体必须是 JSON 对象'
            }), 400

        task_id = data.get('task_id')
        reason = data.get('reason')
        if not task_id:
            return jsonify({
                'code': 400,
                'message': '缺少 task_id'
            }), 400

        task_manager.cancel_task(task_id, reason)
        logger.info(f"任务已取消:{task_id}")
        return jsonify({
            'code': 200,
            'message': '任务已取消',
            'data': {
                'task_id': task_id,
                'status': 'cancelled'
            }
        })
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback, reply 500.
        logger.exception(f"取消任务异常:{e}")
        return jsonify({
            'code': 500,
            'message': f'服务器错误:{str(e)}'
        }), 500
@app.route('/api/task/status/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """
    Look up a single task and report its current state.

    Success response (HTTP 200):
        {
            "code": 200,
            "data": {
                "task_id": "TASK_20260226_001",
                "type": "...",
                "status": "pending",
                "priority": "...",
                "retry_count": ...,
                "created_at": "...",
                "created_by": "...",
                "triggered_at": null,
                "completed_at": null,
                "error_message": null
            }
        }

    Responds 404 when ``task_manager.get_task`` returns a falsy value and
    500 when anything raises (including a record lacking one of the
    mandatory keys).
    """
    # Keys read with indexing: a missing one raises and ends up as a 500.
    mandatory_keys = (
        'task_id', 'type', 'status', 'priority',
        'retry_count', 'created_at', 'created_by',
    )
    # Keys read with .get(): a missing one is reported as null.
    nullable_keys = ('triggered_at', 'completed_at', 'error_message')
    try:
        record = task_manager.get_task(task_id)
        if not record:
            not_found = {'code': 404, 'message': '任务不存在'}
            return jsonify(not_found), 404

        details = {key: record[key] for key in mandatory_keys}
        for key in nullable_keys:
            details[key] = record.get(key)
        return jsonify({'code': 200, 'data': details})
    except Exception as e:
        logger.error(f"查询任务状态异常:{e}")
        failure = {'code': 500, 'message': f'服务器错误:{str(e)}'}
        return jsonify(failure), 500
@app.route('/api/task/list', methods=['GET'])
def list_tasks():
    """
    Return one page of tasks.

    Query params:
        customer_id: customer ID (optional), passed to
            ``task_manager.get_pending_tasks``.
        status: task status (optional). Only "pending" is implemented; any
            other value currently yields an empty list.
        page: 1-based page number (default 1).
        page_size: items per page (default 20).

    Non-numeric or non-positive ``page`` / ``page_size`` values fall back to
    their defaults instead of failing the request.

    Success response (HTTP 200) carries ``total``, ``page``, ``page_size``
    and the ``tasks`` slice for that page. Unexpected errors produce a 500.
    """
    try:
        customer_id = request.args.get('customer_id')
        status = request.args.get('status')

        # type=int makes the lookup return the default when the value cannot
        # be parsed, rather than raising ValueError (which became a 500).
        page = request.args.get('page', default=1, type=int)
        page_size = request.args.get('page_size', default=20, type=int)
        # Zero/negative values would turn into negative slice indices and
        # silently return the wrong rows.
        if page < 1:
            page = 1
        if page_size < 1:
            page_size = 20

        if status and status != 'pending':
            # TODO: implement queries for statuses other than "pending"
            tasks = []
        else:
            tasks = task_manager.get_pending_tasks(customer_id)

        # Paginate in memory.
        total = len(tasks)
        start = (page - 1) * page_size
        tasks_page = tasks[start:start + page_size]
        return jsonify({
            'code': 200,
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'tasks': tasks_page
            }
        })
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback, reply 500.
        logger.exception(f"查询任务列表异常:{e}")
        return jsonify({
            'code': 500,
            'message': f'服务器错误:{str(e)}'
        }), 500
@app.route('/api/health', methods=['GET'])
def health_check():
    """Health probe: reply with code 200, the current timestamp and the service name."""
    probe_info = {
        'timestamp': datetime.now().isoformat(),
        'service': 'ai-cs-tianwang-bridge',
    }
    body = {'code': 200, 'message': 'OK', 'data': probe_info}
    return jsonify(body)
@app.route('/api/metrics', methods=['GET'])
def metrics_dashboard():
    """
    Runtime and business metrics dashboard.

    Query params:
        hours: size of the statistics window in hours (default 24).
            Non-numeric or non-positive values fall back to 24.

    Success response (HTTP 200) carries whatever
    ``utils.metrics_tracker.get_dashboard(hours=...)`` returns under
    ``data``. Unexpected errors (including a failing import of the metrics
    module) produce a 500.
    """
    try:
        # type=int returns the default when parsing fails, so a garbage value
        # is treated like a non-positive one instead of causing a 500.
        hours = request.args.get('hours', default=24, type=int)
        if hours <= 0:
            hours = 24

        # Imported lazily inside the try so an import failure is reported as
        # a 500 on this endpoint only.
        from utils.metrics_tracker import get_dashboard
        data = get_dashboard(hours=hours)
        return jsonify({
            'code': 200,
            'message': 'OK',
            'data': data,
        })
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback, reply 500.
        logger.exception(f"指标看板异常:{e}")
        return jsonify({
            'code': 500,
            'message': f'服务器错误:{str(e)}'
        }), 500
def start_http_server(host='127.0.0.1', port=6060, debug=False):
    """Initialise the task manager and serve the Flask app on a daemon thread.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        debug: Forwarded to ``app.run`` as its ``debug`` flag.

    Returns:
        The already-started daemon ``threading.Thread`` running ``app.run``.
        The reloader is always disabled.
    """
    # task_scheduler is declared here but not assigned by this function.
    global task_manager, task_scheduler
    task_manager = get_task_manager()
    logger.info(f"HTTP API 服务器启动http://{host}:{port}")

    # Run Flask on its own thread so the caller is not blocked.
    run_options = dict(host=host, port=port, debug=debug, use_reloader=False)
    server_thread = threading.Thread(
        target=app.run,
        kwargs=run_options,
        daemon=True,
    )
    server_thread.start()
    return server_thread
if __name__ == '__main__':
    # Script entry point: configure logging, start the server, keep serving.
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s: %(message)s'
    )
    server_thread = start_http_server(host='127.0.0.1', port=6060, debug=True)
    # The server lives on a daemon thread, which is killed as soon as the main
    # thread returns. Block here so the process keeps serving.
    try:
        # Join with a timeout so Ctrl+C is noticed promptly on every platform.
        while server_thread.is_alive():
            server_thread.join(timeout=1.0)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the script; exit quietly.
        pass