From 51d0f53d0bde04efb24aa5d60a524658d5313f6c Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Fri, 22 Nov 2024 12:49:55 +0800 Subject: [PATCH] enh: wait for all action finish before close --- controller/ws/tmq/tmq.go | 5 +++-- controller/ws/ws/handler.go | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/controller/ws/tmq/tmq.go b/controller/ws/tmq/tmq.go index ec6b3c9..b9656ac 100644 --- a/controller/ws/tmq/tmq.go +++ b/controller/ws/tmq/tmq.go @@ -1235,10 +1235,11 @@ func (t *TMQ) Close(logger *logrus.Entry) { }() select { case <-ctx.Done(): - logger.Error("wait for all goroutines to exit timeout") + logger.Warn("wait stop over 1 minute") + <-done case <-done: - logger.Debug("all goroutines exit") } + logger.Debug("wait stop done") isDebug := log.IsDebug() defer func() { diff --git a/controller/ws/ws/handler.go b/controller/ws/ws/handler.go index cb7011b..6e32433 100644 --- a/controller/ws/ws/handler.go +++ b/controller/ws/ws/handler.go @@ -157,7 +157,7 @@ func (h *messageHandler) stop() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - waitCh := make(chan struct{}, 1) + waitCh := make(chan struct{}) go func() { h.wait.Wait() close(waitCh) @@ -165,8 +165,12 @@ func (h *messageHandler) stop() { select { case <-ctx.Done(): + h.logger.Warn("wait stop over 1 minute") + <-waitCh + break case <-waitCh: } + h.logger.Debugf("wait stop done") // clean query result and stmt h.queryResults.FreeAll(h.logger) h.stmts.FreeAll(h.logger)