"""HTTP control API: health check, listen-only runtime switch and task endpoints."""
from __future__ import annotations

from flask import Flask, jsonify, request

from .logger import setup_logger
from .runtime_switch import is_listen_only, set_listen_only
from .task_manager import TaskManager


def _json_object() -> dict:
    """Return the request's JSON body if it is a JSON object, else ``{}``.

    ``get_json(silent=True)`` yields ``None`` for a missing/invalid body, but
    it can also yield a list, string or number for valid non-object JSON.
    Normalising those to ``{}`` lets callers use ``.get`` safely and answer
    with their usual "field required" error instead of crashing.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def create_http_app(task_manager: TaskManager | None = None) -> Flask:
    """Build the Flask application exposing the runtime and task API.

    Args:
        task_manager: Task manager backing the ``/api/task/*`` routes. When
            omitted (or falsy), a new ``TaskManager()`` is created.

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__)
    logger = setup_logger()
    tm = task_manager or TaskManager()

    @app.get('/api/health')
    def health():
        """Liveness probe; always answers ``{'ok': True}``."""
        return jsonify({'ok': True})

    @app.get('/api/runtime/listen_only')
    def get_listen_only():
        """Report the current listen-only flag."""
        return jsonify({'ok': True, 'listen_only': is_listen_only()})

    @app.post('/api/runtime/listen_only')
    def set_listen_only_mode():
        """Set the listen-only flag from the JSON field ``enabled``.

        Responds 400 when ``enabled`` is absent from the body.
        """
        body = _json_object()
        if "enabled" not in body:
            return jsonify({'ok': False, 'error': 'enabled required'}), 400
        # NOTE(review): plain truthiness coercion, so any non-empty string
        # (including "false") enables the mode. Kept as-is for compatibility.
        enabled = bool(body.get("enabled"))
        current = set_listen_only(enabled)
        logger.info('[运行时] listen_only=%s', current)
        return jsonify({'ok': True, 'listen_only': current})

    @app.post('/api/task/receive')
    def receive_task():
        """Create a task from the JSON body and return its id."""
        payload = request.get_json(silent=True) or {}
        task_id = tm.create_task(payload)
        logger.info('[任务] receive task_id=%s', task_id)
        return jsonify({'ok': True, 'task_id': task_id})

    @app.post('/api/task/cancel')
    def cancel_task():
        """Cancel the task named by the JSON field ``task_id``.

        Responds 400 when ``task_id`` is missing, null, or blank; otherwise
        ``ok`` carries whatever ``tm.cancel_task`` returned.
        """
        body = _json_object()
        raw_id = body.get('task_id')
        # A JSON null must count as "missing" rather than become the id 'None'.
        task_id = '' if raw_id is None else str(raw_id).strip()
        if not task_id:
            return jsonify({'ok': False, 'error': 'task_id required'}), 400
        ok = tm.cancel_task(task_id)
        return jsonify({'ok': ok, 'task_id': task_id})

    # The rule needs the <task_id> variable: without it Flask calls the view
    # with no arguments and every request fails with a TypeError (HTTP 500).
    @app.get('/api/task/status/<task_id>')
    def task_status(task_id: str):
        """Return the task identified by the URL segment, or 404 if unknown."""
        task = tm.get_task(task_id)
        if not task:
            return jsonify({'ok': False, 'error': 'not found'}), 404
        return jsonify({'ok': True, 'task': task})

    @app.get('/api/task/list')
    def task_list():
        """List tasks, capped by the ``limit`` query parameter (default 100).

        Responds 400 when ``limit`` is not an integer.
        """
        try:
            limit = int(request.args.get('limit', 100))
        except ValueError:
            return jsonify({'ok': False, 'error': 'limit must be an integer'}), 400
        return jsonify({'ok': True, 'tasks': tm.list_tasks(limit=limit)})

    return app


def run_http_server(host: str, port: int, task_manager: TaskManager | None = None) -> None:
    """Create the app and serve it with Flask's built-in server.

    Args:
        host: Interface address to bind.
        port: TCP port to listen on.
        task_manager: Optional task manager forwarded to ``create_http_app``.

    Debug mode and the auto-reloader are both disabled.
    """
    app = create_http_app(task_manager=task_manager)
    app.run(host=host, port=port, debug=False, use_reloader=False)