From 0b28e445286bcec8d8c67877ad75f8cecb167823 Mon Sep 17 00:00:00 2001 From: wangxinbo Date: Thu, 23 Mar 2023 18:35:04 +0800 Subject: [PATCH] fix - avoid program running into stuck when facing fatalpanic --- CHANGELOG.md | 6 ++++++ cmd/kafka_gen_log/main.go | 4 +--- config_manager/lags.go | 25 ++++++++++++------------- docs/configuration/config.md | 2 -- go.sum | 2 -- task/sinker.go | 14 ++++++++------ 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82e9fae3..ab0e9d40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ Improvements: - Have writingpool per shard to avoid ErrAcquireConnTimeout +- Do not create a kafka client every time lags are calculated +- Support configuring PlainloginModule in kafka.security section + +Bug Fixes: +- Avoid the program getting stuck when facing fatalpanic + #### Version 3.0.2 (2023-03-13) diff --git a/cmd/kafka_gen_log/main.go b/cmd/kafka_gen_log/main.go index 34c8677b..8bb86be5 100644 --- a/cmd/kafka_gen_log/main.go +++ b/cmd/kafka_gen_log/main.go @@ -151,9 +151,7 @@ func (g *LogGenerator) Init() error { g.lineno = 0 fnPatt := regexp.MustCompile(LogfilePattern) d, err := os.Open(LogfileDir) - defer func() { - d.Close() - }() + defer d.Close() if err != nil { err = errors.Wrapf(err, "") return err diff --git a/config_manager/lags.go b/config_manager/lags.go index 89d883c7..d5881116 100644 --- a/config_manager/lags.go +++ b/config_manager/lags.go @@ -25,16 +25,20 @@ type StateLag struct { // GetTaskStateAndLags get state and lag of all tasks. 
func GetTaskStateAndLags(cfg *config.Config) (stateLags map[string]StateLag, err error) { - _, adm, err := newClient(cfg.Kafka) - if err != nil { - return + kconf := cfg.Kafka + if !reflect.DeepEqual(&kconf, kafkaConfig) { + cleanupKafkaClient() + if err = newClient(cfg.Kafka); err != nil { + return + } + kafkaConfig = &kconf } stateLags = make(map[string]StateLag, len(cfg.Tasks)) for _, taskCfg := range cfg.Tasks { var state string var totalLags int64 - if state, totalLags, err = getStateAndLag(adm, taskCfg.Topic, taskCfg.ConsumerGroup); err != nil { + if state, totalLags, err = getStateAndLag(theAdm, taskCfg.Topic, taskCfg.ConsumerGroup); err != nil { return } stateLags[taskCfg.Name] = StateLag{State: state, Lag: totalLags} @@ -46,21 +50,16 @@ func GetTaskStateAndLags(cfg *config.Config) (stateLags map[string]StateLag, err func cleanupKafkaClient() { if theCl != nil { theCl.Close() + theCl = nil } if theAdm != nil { theAdm.Close() + theAdm = nil } } -func newClient(cfg config.KafkaConfig) (cl *kgo.Client, adm *kadm.Client, err error) { +func newClient(cfg config.KafkaConfig) (err error) { var opts []kgo.Opt - if reflect.DeepEqual(&cfg, kafkaConfig) { - return theCl, theAdm, nil - } - - cleanupKafkaClient() - - kafkaConfig = &cfg if opts, err = input.GetFranzConfig(&cfg); err != nil { return } @@ -70,7 +69,7 @@ func newClient(cfg config.KafkaConfig) (cl *kgo.Client, adm *kadm.Client, err er return } theAdm = kadm.NewClient(theCl) - return theCl, theAdm, err + return } // getStateAndLag is inspired by https://github.com/cloudhut/kminion/blob/1ffd02ba94a5edc26d4f11e57191ed3479d8a111/prometheus/collect_consumer_group_lags.go diff --git a/docs/configuration/config.md b/docs/configuration/config.md index 268fc310..e0ef501a 100644 --- a/docs/configuration/config.md +++ b/docs/configuration/config.md @@ -89,8 +89,6 @@ "task": { "name": "test_dynamic_schema", - // kafka client, possible values: sarama, kafka-go. 
(defaults to sarama) - "kafkaClient": "sarama", // kafka topic "topic": "topic", // kafka consume from earliest or latest diff --git a/go.sum b/go.sum index a7e1b5b5..d3909743 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/ClickHouse/ch-go v0.54.0 h1:WzPo/iZ8Gchb9Ze30TywMheOzKW5FkhTeZPxvz/iE6o= github.com/ClickHouse/ch-go v0.54.0/go.mod h1:2jvyjBRb5zhzFvcOBxPelzpbB9qsS47vwJssurJe2OA= -github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= github.com/ClickHouse/clickhouse-go/v2 v2.7.0 h1:KFRvFjnewYkJBwkfBvDYESwZtZmQipz/xRuaBz0oVNA= github.com/ClickHouse/clickhouse-go/v2 v2.7.0/go.mod h1:6I79Gj2EPbV/DdlDShfCaxrja/pxLVSfDrvEEQp77VE= github.com/DATA-DOG/go-sqlmock v1.3.0 h1:ljjRxlddjfChBJdFKJs5LuCwCWPLaC1UZLwAo3PBBMk= @@ -32,7 +31,6 @@ github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx2 github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.8.6 h1:aUgO9S8gvdN6SyW2EhIpAw5E4ChworywIEndZCkCVXk= github.com/bytedance/sonic v1.8.6/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/task/sinker.go b/task/sinker.go index 26d12b40..6c32f736 100644 --- a/task/sinker.go +++ b/task/sinker.go @@ -99,14 +99,12 @@ func (s *Sinker) GetCurrentConfig() *config.Config { func (s *Sinker) Run() { var err error var newCfg *config.Config - defer func() { - s.exitCh <- struct{}{} - }() + if s.cmdOps.PushGatewayAddrs != "" { addrs := strings.Split(s.cmdOps.PushGatewayAddrs, ",") s.pusher = 
statistics.NewPusher(addrs, s.cmdOps.PushInterval, s.httpAddr) if err = s.pusher.Init(); err != nil { - util.Logger.Error("failed to initialize connection to the specified push gateway address", zap.Error(err)) + util.Logger.Fatal("failed to initialize connection to the specified push gateway address", zap.Error(err)) return } go s.pusher.Run() @@ -136,11 +134,12 @@ func (s *Sinker) Run() { util.Logger.Fatal("s.applyConfig failed", zap.Error(err)) return } + LOOP: for { select { case <-s.ctx.Done(): util.Logger.Info("Sinker.Run quit due to context has been canceled") - return + break LOOP case c := <-s.consumerRestartCh: // only restart the consumer which was not changed in applyAnotherConfig if c == s.consumers[c.grpConfig.Name] { @@ -171,11 +170,12 @@ func (s *Sinker) Run() { // Golang <-time.After() is not garbage collected before expiry. ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() + WORKLOOP: for { select { case <-s.ctx.Done(): util.Logger.Info("Sinker.Run quit due to context has been canceled") - return + break WORKLOOP case <-ticker.C: if newCfg, err = s.rcm.GetConfig(); err != nil { util.Logger.Error("s.rcm.GetConfig failed", zap.Error(err)) @@ -217,6 +217,8 @@ func (s *Sinker) Run() { } } } + + s.exitCh <- struct{}{} } // Close shutdown task