# -*- coding: utf-8 -*-
"""
HTTP API server.

Provides the HTTP endpoints through which Tianwang delivers tasks to this
service, plus endpoints to cancel/query tasks, a health check and a metrics
dashboard.
"""

import asyncio
import logging
import threading
from datetime import datetime

from flask import Flask, request, jsonify

from db.task_db.task_model import get_task_manager, TaskStatus
from core.task_scheduler import get_task_scheduler

logger = logging.getLogger(__name__)

app = Flask(__name__)
# Emit non-ASCII (Chinese) characters verbatim instead of \uXXXX escapes.
# The config key is honoured by older Flask releases; newer releases ignore
# it and read the setting from the JSON provider instead, so set both.
app.config['JSON_AS_ASCII'] = False
if hasattr(app, 'json'):
    app.json.ensure_ascii = False

# Populated by start_http_server(); task_manager is also resolved lazily by
# the request handlers (see _require_task_manager).
task_manager = None
task_scheduler = None


def get_async_loop():
    """Create a new asyncio event loop and install it as the current loop.

    Returns:
        The newly created event loop. The caller owns it and is responsible
        for closing it.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop


def run_async(coro):
    """Run an awaitable to completion on a fresh event loop.

    Args:
        coro: The coroutine (or other awaitable) to run.

    Returns:
        Whatever the awaitable returns. Exceptions raised by it propagate.
    """
    loop = get_async_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        # The loop is private to this call; close it so that repeated calls
        # do not leak event loops (and their selector file descriptors).
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()


def _require_task_manager():
    """Return the module-level task manager, creating it on first use.

    start_http_server() normally sets the global; resolving it lazily keeps
    the handlers working when ``app`` is served without going through
    start_http_server().
    """
    global task_manager
    if task_manager is None:
        task_manager = get_task_manager()
    return task_manager


def _error_response(status, message):
    """Build the ``{'code', 'message'}`` error payload with a matching HTTP status."""
    return jsonify({
        'code': status,
        'message': message
    }), status


def _server_error(exc):
    """Build the generic 500 response used by every handler's catch-all."""
    # NOTE(review): the exception text is returned to the client, as in the
    # original implementation; consider hiding it if the API is exposed
    # beyond trusted callers.
    return _error_response(500, f'服务器错误:{str(exc)}')


def _read_json_object():
    """Parse the request body as a non-empty JSON object.

    Returns:
        ``(payload, None)`` on success, or ``(None, error_response)`` when
        the body is missing, is not valid JSON, or is not a JSON object.
    """
    # silent=True: malformed JSON / wrong content type yields None instead of
    # raising, so the client gets a 400 rather than the catch-all 500.
    payload = request.get_json(silent=True)
    if not payload:
        return None, _error_response(400, '请求体不能为空')
    if not isinstance(payload, dict):
        return None, _error_response(400, '请求体必须是 JSON 对象')
    return payload, None


def _positive_int_arg(name, default):
    """Read a query parameter as a positive int.

    Falls back to ``default`` when the parameter is missing, is not an
    integer, or is zero/negative.
    """
    value = request.args.get(name, default, type=int)
    return value if value > 0 else default


@app.route('/api/task/receive', methods=['POST'])
def receive_task():
    """
    Receive a task delivered by Tianwang.

    Request Body:
    {
        "task_id": "TASK_20260226_001",
        "type": "send_file_after_reply",
        "customer": {
            "name": "Xiao Ming",
            "id": "customer_123"
        },
        "trigger": {
            "type": "customer_reply",
            "keyword": "OK"
        },
        "action": {
            "type": "send_file",
            "file_url": "https://xxx.com/file.zip",
            "message": "Here is the file you asked for"
        },
        "priority": "normal",
        "timeout_hours": 24,
        "created_by": "designer lz"
    }

    Response:
    {
        "code": 200,
        "message": "任务接收成功",
        "data": {
            "task_id": "TASK_20260226_001",
            "status": "pending"
        }
    }

    Returns 400 when the body is missing/invalid or a required field is
    absent, and 500 when the task cannot be saved or an unexpected error
    occurs.
    """
    try:
        data, error = _read_json_object()
        if error is not None:
            return error

        # Validate required fields.
        required_fields = ['task_id', 'type', 'customer', 'trigger', 'action']
        for field in required_fields:
            if field not in data:
                return _error_response(400, f'缺少必填字段:{field}')

        # Stamp the creation time and initial status before persisting.
        data['created_at'] = datetime.now().isoformat()
        data['status'] = 'pending'

        # Persist to the database.
        if not _require_task_manager().add_task(data):
            logger.error(f"任务保存失败:{data['task_id']}")
            return _error_response(500, '任务保存失败')

        logger.info(f"任务接收成功:{data['task_id']}")
        return jsonify({
            'code': 200,
            'message': '任务接收成功',
            'data': {
                'task_id': data['task_id'],
                'status': 'pending'
            }
        })

    except Exception as e:
        logger.exception("接收任务异常:%s", e)
        return _server_error(e)


@app.route('/api/task/cancel', methods=['POST'])
def cancel_task():
    """
    Cancel a task.

    Request Body:
    {
        "task_id": "TASK_20260226_001",
        "reason": "customer already refunded"
    }

    Returns 400 when the body is missing/invalid or has no ``task_id``.
    """
    try:
        data, error = _read_json_object()
        if error is not None:
            return error

        task_id = data.get('task_id')
        reason = data.get('reason')

        if not task_id:
            return _error_response(400, '缺少 task_id')

        # NOTE(review): the result of cancel_task() is not inspected, so a
        # cancellation of an unknown task is reported as success unless the
        # manager raises — confirm against the task manager's contract.
        _require_task_manager().cancel_task(task_id, reason)

        logger.info(f"任务已取消:{task_id}")
        return jsonify({
            'code': 200,
            'message': '任务已取消',
            'data': {
                'task_id': task_id,
                'status': 'cancelled'
            }
        })

    except Exception as e:
        logger.exception("取消任务异常:%s", e)
        return _server_error(e)


# The <task_id> URL variable is required: the view takes ``task_id`` as an
# argument, so a route without it can never be dispatched successfully.
@app.route('/api/task/status/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """
    Query the status of a task.

    Response:
    {
        "code": 200,
        "data": {
            "task_id": "TASK_20260226_001",
            "type": "send_file_after_reply",
            "status": "pending",
            "priority": "normal",
            "retry_count": 0,
            "created_at": "2026-02-26 16:30:00",
            "created_by": "designer lz",
            "triggered_at": null,
            "completed_at": null,
            "error_message": null
        }
    }

    Returns 404 when no task with the given id exists.
    """
    try:
        task = _require_task_manager().get_task(task_id)

        if not task:
            return _error_response(404, '任务不存在')

        return jsonify({
            'code': 200,
            'data': {
                'task_id': task['task_id'],
                'type': task['type'],
                'status': task['status'],
                # priority / created_by are optional in the receive payload,
                # so read them (and retry_count) tolerantly instead of
                # failing the whole request when one is absent.
                'priority': task.get('priority'),
                'retry_count': task.get('retry_count'),
                'created_at': task['created_at'],
                'created_by': task.get('created_by'),
                'triggered_at': task.get('triggered_at'),
                'completed_at': task.get('completed_at'),
                'error_message': task.get('error_message')
            }
        })

    except Exception as e:
        logger.exception("查询任务状态异常:%s", e)
        return _server_error(e)


@app.route('/api/task/list', methods=['GET'])
def list_tasks():
    """
    Query the task list.

    Query Params:
    - customer_id: customer ID (optional)
    - status: task status (optional; only 'pending' is implemented, any
      other value yields an empty list)
    - page: page number (default 1)
    - page_size: items per page (default 20)

    Missing, non-numeric or non-positive ``page`` / ``page_size`` values fall
    back to their defaults.
    """
    try:
        customer_id = request.args.get('customer_id')
        status = request.args.get('status')
        page = _positive_int_arg('page', 1)
        page_size = _positive_int_arg('page_size', 20)

        if status and status != 'pending':
            # TODO: implement queries for the other statuses.
            tasks = []
        else:
            tasks = _require_task_manager().get_pending_tasks(customer_id)

        # Pagination.
        total = len(tasks)
        start = (page - 1) * page_size
        tasks_page = tasks[start:start + page_size]

        return jsonify({
            'code': 200,
            'data': {
                'total': total,
                'page': page,
                'page_size': page_size,
                'tasks': tasks_page
            }
        })

    except Exception as e:
        logger.exception("查询任务列表异常:%s", e)
        return _server_error(e)


@app.route('/api/health', methods=['GET'])
def health_check():
    """Health check: report OK together with the current server timestamp."""
    return jsonify({
        'code': 200,
        'message': 'OK',
        'data': {
            'timestamp': datetime.now().isoformat(),
            'service': 'ai-cs-tianwang-bridge'
        }
    })


@app.route('/api/metrics', methods=['GET'])
def metrics_dashboard():
    """
    Runtime and business dashboard.

    Query:
    - hours: size of the statistics window in hours, default 24. Missing,
      non-numeric or non-positive values fall back to 24.
    """
    try:
        hours = _positive_int_arg('hours', 24)

        # Imported at call time, as in the original implementation.
        from utils.metrics_tracker import get_dashboard
        data = get_dashboard(hours=hours)
        return jsonify({
            'code': 200,
            'message': 'OK',
            'data': data,
        })
    except Exception as e:
        logger.exception("指标看板异常:%s", e)
        return _server_error(e)


def start_http_server(host='0.0.0.0', port=6060, debug=False):
    """Start the HTTP server in a background daemon thread.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        debug: Passed to Flask's ``app.run``; the reloader is always
            disabled because the server does not run in the main thread.

    Returns:
        The started ``threading.Thread`` running the Flask server. It is a
        daemon thread, so the caller must keep the process alive.
    """
    global task_manager, task_scheduler

    task_manager = get_task_manager()

    logger.info(f"HTTP API 服务器启动:http://{host}:{port}")

    # Run Flask in a separate thread.
    thread = threading.Thread(
        target=app.run,
        kwargs={
            'host': host,
            'port': port,
            'debug': debug,
            'use_reloader': False
        },
        daemon=True
    )
    thread.start()

    return thread


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s: %(message)s'
    )

    server_thread = start_http_server(port=6060, debug=True)

    # The server runs in a daemon thread; without waiting here the main
    # thread would return and the process would exit immediately. Joining
    # with a timeout keeps Ctrl+C responsive on every platform.
    try:
        while server_thread.is_alive():
            server_thread.join(timeout=1.0)
    except KeyboardInterrupt:
        pass