fix: make coordinator shutdown signal handling idempotent

This commit is contained in:
2026-03-01 17:13:56 +08:00
parent 00c80c3bec
commit 904d5b5693

View File

@@ -84,6 +84,7 @@ class Coordinator:
self.num_workers = num_workers or max(2, cpu_count()) self.num_workers = num_workers or max(2, cpu_count())
self.workers: List[WorkerProcess] = [] self.workers: List[WorkerProcess] = []
self.running = False self.running = False
self._stopping = False
self.enable_agent = enable_agent self.enable_agent = enable_agent
def _get_shard_key(self, acc_id: str, from_id: str) -> int: def _get_shard_key(self, acc_id: str, from_id: str) -> int:
@@ -153,11 +154,17 @@ class Coordinator:
def _signal_handler(self, signum, frame): def _signal_handler(self, signum, frame):
"""信号处理""" """信号处理"""
if self._stopping:
return
self._stopping = True
logger.info(f"收到信号 {signum},正在停止所有工作进程...") logger.info(f"收到信号 {signum},正在停止所有工作进程...")
self.stop() self.stop()
def stop(self): def stop(self):
"""停止所有工作进程""" """停止所有工作进程"""
if self._stopping and not self.running and not any(w.process and w.process.is_alive() for w in self.workers):
return
self._stopping = True
self.running = False self.running = False
for worker in self.workers: for worker in self.workers: