Skip to content

Commit

Permalink
fix - avoid the program getting stuck when facing a fatal panic
Browse files Browse the repository at this point in the history
  • Loading branch information
momingkotoba committed Mar 27, 2023
1 parent 40a5f67 commit 0b28e44
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 26 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

Improvements:
- Use one writing pool per shard to avoid ErrAcquireConnTimeout
- Do not create a Kafka client every time lags are calculated
- Support configuring PlainLoginModule in the kafka.security section

Bug Fixes:
- Avoid the program getting stuck when facing a fatal panic


#### Version 3.0.2 (2023-03-13)

Expand Down
4 changes: 1 addition & 3 deletions cmd/kafka_gen_log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ func (g *LogGenerator) Init() error {
g.lineno = 0
fnPatt := regexp.MustCompile(LogfilePattern)
d, err := os.Open(LogfileDir)
defer func() {
d.Close()
}()
defer d.Close()
if err != nil {
err = errors.Wrapf(err, "")
return err
Expand Down
25 changes: 12 additions & 13 deletions config_manager/lags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@ type StateLag struct {

// GetTaskStateAndLags get state and lag of all tasks.
func GetTaskStateAndLags(cfg *config.Config) (stateLags map[string]StateLag, err error) {
_, adm, err := newClient(cfg.Kafka)
if err != nil {
return
kconf := cfg.Kafka
if !reflect.DeepEqual(&kconf, kafkaConfig) {
cleanupKafkaClient()
if err = newClient(cfg.Kafka); err != nil {
return
}
kafkaConfig = &kconf
}

stateLags = make(map[string]StateLag, len(cfg.Tasks))
for _, taskCfg := range cfg.Tasks {
var state string
var totalLags int64
if state, totalLags, err = getStateAndLag(adm, taskCfg.Topic, taskCfg.ConsumerGroup); err != nil {
if state, totalLags, err = getStateAndLag(theAdm, taskCfg.Topic, taskCfg.ConsumerGroup); err != nil {
return
}
stateLags[taskCfg.Name] = StateLag{State: state, Lag: totalLags}
Expand All @@ -46,21 +50,16 @@ func GetTaskStateAndLags(cfg *config.Config) (stateLags map[string]StateLag, err
// cleanupKafkaClient closes the package-level Kafka clients theCl and theAdm
// when they are set, and resets each one to nil afterwards.
//
// Because both variables are cleared after closing, calling this function
// again is a no-op until new clients are assigned to them.
func cleanupKafkaClient() {
if theCl != nil {
theCl.Close()
// Drop the reference so a later call does not close the same client again.
theCl = nil
}
if theAdm != nil {
theAdm.Close()
// Same reasoning as for theCl above.
theAdm = nil
}
}

func newClient(cfg config.KafkaConfig) (cl *kgo.Client, adm *kadm.Client, err error) {
func newClient(cfg config.KafkaConfig) (err error) {
var opts []kgo.Opt
if reflect.DeepEqual(&cfg, kafkaConfig) {
return theCl, theAdm, nil
}

cleanupKafkaClient()

kafkaConfig = &cfg
if opts, err = input.GetFranzConfig(&cfg); err != nil {
return
}
Expand All @@ -70,7 +69,7 @@ func newClient(cfg config.KafkaConfig) (cl *kgo.Client, adm *kadm.Client, err er
return
}
theAdm = kadm.NewClient(theCl)
return theCl, theAdm, err
return
}

// getStateAndLag is inspired by https://github.com/cloudhut/kminion/blob/1ffd02ba94a5edc26d4f11e57191ed3479d8a111/prometheus/collect_consumer_group_lags.go
Expand Down
2 changes: 0 additions & 2 deletions docs/configuration/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@

"task": {
"name": "test_dynamic_schema",
// kafka client, possible values: sarama, kafka-go. (defaults to sarama)
"kafkaClient": "sarama",
// kafka topic
"topic": "topic",
// kafka consume from earliest or latest
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ClickHouse/ch-go v0.54.0 h1:WzPo/iZ8Gchb9Ze30TywMheOzKW5FkhTeZPxvz/iE6o=
github.com/ClickHouse/ch-go v0.54.0/go.mod h1:2jvyjBRb5zhzFvcOBxPelzpbB9qsS47vwJssurJe2OA=
github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0=
github.com/ClickHouse/clickhouse-go/v2 v2.7.0 h1:KFRvFjnewYkJBwkfBvDYESwZtZmQipz/xRuaBz0oVNA=
github.com/ClickHouse/clickhouse-go/v2 v2.7.0/go.mod h1:6I79Gj2EPbV/DdlDShfCaxrja/pxLVSfDrvEEQp77VE=
github.com/DATA-DOG/go-sqlmock v1.3.0 h1:ljjRxlddjfChBJdFKJs5LuCwCWPLaC1UZLwAo3PBBMk=
Expand Down Expand Up @@ -32,7 +31,6 @@ github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx2
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.8.6 h1:aUgO9S8gvdN6SyW2EhIpAw5E4ChworywIEndZCkCVXk=
github.com/bytedance/sonic v1.8.6/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
14 changes: 8 additions & 6 deletions task/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,12 @@ func (s *Sinker) GetCurrentConfig() *config.Config {
func (s *Sinker) Run() {
var err error
var newCfg *config.Config
defer func() {
s.exitCh <- struct{}{}
}()

if s.cmdOps.PushGatewayAddrs != "" {
addrs := strings.Split(s.cmdOps.PushGatewayAddrs, ",")
s.pusher = statistics.NewPusher(addrs, s.cmdOps.PushInterval, s.httpAddr)
if err = s.pusher.Init(); err != nil {
util.Logger.Error("failed to initialize connection to the specified push gateway address", zap.Error(err))
util.Logger.Fatal("failed to initialize connection to the specified push gateway address", zap.Error(err))
return
}
go s.pusher.Run()
Expand Down Expand Up @@ -136,11 +134,12 @@ func (s *Sinker) Run() {
util.Logger.Fatal("s.applyConfig failed", zap.Error(err))
return
}
LOOP:
for {
select {
case <-s.ctx.Done():
util.Logger.Info("Sinker.Run quit due to context has been canceled")
return
break LOOP
case c := <-s.consumerRestartCh:
// only restart the consumer which was not changed in applyAnotherConfig
if c == s.consumers[c.grpConfig.Name] {
Expand Down Expand Up @@ -171,11 +170,12 @@ func (s *Sinker) Run() {
// Golang <-time.After() is not garbage collected before expiry.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
WORKLOOP:
for {
select {
case <-s.ctx.Done():
util.Logger.Info("Sinker.Run quit due to context has been canceled")
return
break WORKLOOP
case <-ticker.C:
if newCfg, err = s.rcm.GetConfig(); err != nil {
util.Logger.Error("s.rcm.GetConfig failed", zap.Error(err))
Expand Down Expand Up @@ -217,6 +217,8 @@ func (s *Sinker) Run() {
}
}
}

s.exitCh <- struct{}{}
}

// Close shutdown task
Expand Down

0 comments on commit 0b28e44

Please sign in to comment.