From 180029fbafe1d6aca6b493b1e33c8a4dc1bb2d03 Mon Sep 17 00:00:00 2001 From: wangxinbo Date: Wed, 8 Mar 2023 16:01:35 +0800 Subject: [PATCH] calculate topic lags every 10 seconds --- CHANGELOG.md | 1 + config_manager/lags.go | 2 +- config_manager/nacos.go | 18 ++++++++++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ecf4139a..98f5b1f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Improvements: - combine nacos log into sinker log - update dmseries map when applying new config, reload the records from series table every single day - avoid recreating dist tables, alter the table schema instead +- update clickhouse_sinker_consume_lags metric every 10 secs #### Version 3.0.1 (2023-03-03) diff --git a/config_manager/lags.go b/config_manager/lags.go index 403c0df6..266b2fd9 100644 --- a/config_manager/lags.go +++ b/config_manager/lags.go @@ -26,7 +26,7 @@ func GetTaskStateAndLags(cfg *config.Config) (stateLags map[string]StateLag, err defer adm.Close() defer cl.Close() - stateLags = make(map[string]StateLag) + stateLags = make(map[string]StateLag, len(cfg.Tasks)) for _, taskCfg := range cfg.Tasks { var state string var totalLags int64 diff --git a/config_manager/nacos.go b/config_manager/nacos.go index 78605145..88087f9a 100644 --- a/config_manager/nacos.go +++ b/config_manager/nacos.go @@ -226,6 +226,8 @@ func (ncm *NacosConfManager) Run() { // Assign regularly to handle lag change ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() + lagticker := time.NewTicker(10 * time.Second) + defer lagticker.Stop() LOOP_FOR: for { select { @@ -236,6 +238,12 @@ LOOP_FOR: util.Logger.Debug("assign triggered by 5 min timer") if err := ncm.assign(); err != nil { util.Logger.Error("assign failed", zap.Error(err)) + } else { + lagticker.Reset(10 * time.Second) + } + case <-lagticker.C: + if err := ncm.calculateGroupLag(); err != nil { + util.Logger.Error("calculate lag failed", zap.Error(err)) } } } @@ -427,6 +435,16 @@ func (ncm *NacosConfManager) assign() (err error) { return } +func (ncm *NacosConfManager) calculateGroupLag() (err error) { + if len(ncm.curInsts) == 0 || ncm.curCfg == nil || ncm.curInsts[0] != ncm.instance { + // Only the first instance is capable to report the lag + return + } + _, err = GetTaskStateAndLags(ncm.curCfg) + + return +} + type NacosLogger struct { *zap.SugaredLogger }