Build up the Consumers map when normalizing the Config
momingkotoba committed Dec 30, 2022
1 parent 379f657 commit dfc271d
Showing 11 changed files with 703 additions and 680 deletions.
968 changes: 534 additions & 434 deletions clickhouse_sinker-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/nacos_publish_config/main.go
@@ -74,7 +74,7 @@ func PublishSinkerConfig() {
return
}

if err = cfg.Normallize(); err != nil {
if err = cfg.Normallize(false, ""); err != nil {
util.Logger.Fatal("cfg.Normallize failed", zap.Error(err))
return
}
48 changes: 45 additions & 3 deletions config/config.go
@@ -21,6 +21,7 @@ import (
"strings"

"github.com/hjson/hjson-go/v4"
"go.uber.org/zap"

"github.com/housepower/clickhouse_sinker/util"

@@ -35,6 +36,7 @@ type Config struct {
Tasks []*TaskConfig
Assignment Assignment
LogLevel string
Groups map[string]*GroupConfig `json:"-"`
}

// KafkaConfig configuration parameters
@@ -146,7 +148,7 @@ type TaskConfig struct {

// ShardingKey is the column name to which sharding against
ShardingKey string `json:"shardingKey,omitempty"`
// ShardingStripe take effect iff the sharding key is numerical
// ShardingStripe take effect if the sharding key is numerical
ShardingStripe uint64 `json:"shardingStripe,omitempty"`

FlushInterval int `json:"flushInterval,omitempty"`
@@ -155,6 +157,15 @@ type TaskConfig struct {
TimeUnit float64 `json:"timeUnit"`
}

type GroupConfig struct {
Name string
Topics []string
Earliest bool
FlushInterval int
BufferSize int
Configs map[string]*TaskConfig
}

type Assignment struct {
Version int
UpdatedAt int64 // timestamp when created
@@ -175,7 +186,9 @@ const (
)

func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) {
cfg = &Config{}
cfg = &Config{
Groups: make(map[string]*GroupConfig),
}
var b []byte
b, err = os.ReadFile(cfgPath)
if err != nil {
@@ -190,7 +203,7 @@ func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) {
}

// Normalize and validate configuration
func (cfg *Config) Normallize() (err error) {
func (cfg *Config) Normallize(constructGroup bool, httpAddr string) (err error) {
if len(cfg.Clickhouse.Hosts) == 0 || cfg.Kafka.Brokers == "" {
err = errors.Newf("invalid configuration")
return
@@ -244,6 +257,35 @@ func (cfg *Config) Normallize() (err error) {
if err = cfg.normallizeTask(taskCfg); err != nil {
return
}
if constructGroup {
if httpAddr != "" && !cfg.IsAssigned(httpAddr, taskCfg.Name) {
continue
}
gCfg, ok := cfg.Groups[taskCfg.ConsumerGroup]
if !ok {
gCfg = &GroupConfig{
Name: taskCfg.ConsumerGroup,
Earliest: taskCfg.Earliest,
Topics: []string{taskCfg.Topic},
FlushInterval: taskCfg.FlushInterval,
BufferSize: taskCfg.BufferSize,
Configs: make(map[string]*TaskConfig),
}
gCfg.Configs[taskCfg.Name] = taskCfg
cfg.Groups[taskCfg.ConsumerGroup] = gCfg
} else {
if gCfg.Earliest != taskCfg.Earliest {
util.Logger.Fatal("Tasks are sharing same consumer group, but with different Earliest property specified!",
zap.String("task", gCfg.Name), zap.String("task", taskCfg.Name))
} else if gCfg.FlushInterval != taskCfg.FlushInterval {
util.Logger.Fatal("Tasks are sharing same consumer group, but with different FlushInterval property specified!",
zap.String("task", gCfg.Name), zap.String("task", taskCfg.Name))
}
gCfg.Topics = append(gCfg.Topics, taskCfg.Topic)
gCfg.BufferSize += taskCfg.BufferSize
gCfg.Configs[taskCfg.Name] = taskCfg
}
}
}
switch strings.ToLower(cfg.LogLevel) {
case "debug", "info", "warn", "error", "dpanic", "panic", "fatal":
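Not part of the commit, but to make the new config.go surface concrete: a minimal caller sketch, assuming only what is visible in this diff (ParseLocalCfgFile, the two-argument Normallize, and the exported Groups map). The config path and log formatting are illustrative.

package main

import (
	"log"

	"github.com/housepower/clickhouse_sinker/config"
)

func main() {
	// Path is a placeholder; ParseLocalCfgFile already initializes the Groups map.
	cfg, err := config.ParseLocalCfgFile("/etc/clickhouse_sinker.hjson")
	if err != nil {
		log.Fatalf("ParseLocalCfgFile failed: %+v", err)
	}
	// true => also build cfg.Groups while validating;
	// ""   => skip the per-instance assignment filter (cfg.IsAssigned is not consulted).
	if err = cfg.Normallize(true, ""); err != nil {
		log.Fatalf("cfg.Normallize failed: %+v", err)
	}
	for name, gCfg := range cfg.Groups {
		log.Printf("consumer group %q: topics=%v bufferSize=%d", name, gCfg.Topics, gCfg.BufferSize)
	}
}

Tasks that share a ConsumerGroup are merged into a single GroupConfig entry, with their topics appended and their buffer sizes summed, as the Normallize hunk above shows.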
2 changes: 1 addition & 1 deletion config_manager/nacos.go
@@ -136,7 +136,7 @@ func (ncm *NacosConfManager) GetConfig() (conf *config.Config, err error) {
err = errors.Wrapf(err, "")
return
}
conf = &config.Config{}
conf = &config.Config{Groups: make(map[string]*config.GroupConfig)}
if err = hjson.Unmarshal([]byte(content), conf); err != nil {
err = errors.Wrapf(err, "")
return
1 change: 0 additions & 1 deletion docker/test_auto_schema.hjson
@@ -22,7 +22,6 @@
parser: json
autoSchema: true
tableName: test_auto_schema
shardingKey: name
excludeColumns: []
bufferSize: 50000
}
1 change: 0 additions & 1 deletion docker/test_dynamic_schema.hjson
@@ -30,7 +30,6 @@
dynamicSchema: {
enable: true
}
shardingKey: name
bufferSize: 50000
}
logLevel: info
1 change: 0 additions & 1 deletion docker/test_fixed_schema.hjson
@@ -36,7 +36,6 @@
type: Float32
}
]
shardingKey: name
bufferSize: 50000
}
logLevel: info
11 changes: 2 additions & 9 deletions input/kafka_franz.go
@@ -47,18 +47,11 @@ const (
RetryBackoff = 5 * time.Second
)

type GroupConfig struct {
Name string
Topics []string
Earliest bool
FlushInterval int
}

// KafkaFranz implements input.Inputer
// refers to examples/group_consuming/main.go
type KafkaFranz struct {
cfg *config.Config
grpConfig *GroupConfig
grpConfig *config.GroupConfig
cl *kgo.Client
ctx context.Context
cancel context.CancelFunc
@@ -73,7 +66,7 @@ func NewKafkaFranz() *KafkaFranz {
}

// Init Initialise the kafka instance with configuration
func (k *KafkaFranz) Init(cfg *config.Config, gCfg *GroupConfig, f chan *kgo.Fetches, cleanupFn func()) (err error) {
func (k *KafkaFranz) Init(cfg *config.Config, gCfg *config.GroupConfig, f chan *kgo.Fetches, cleanupFn func()) (err error) {
k.cfg = cfg
k.grpConfig = gCfg
k.ctx, k.cancel = context.WithCancel(context.Background())
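With GroupConfig promoted into the config package, the input layer can be fed straight from cfg.Groups. The following is only a wiring sketch under the Init signature shown above; the channel buffer size, the empty cleanup callback, and the package name are illustrative.

package example

import (
	"github.com/housepower/clickhouse_sinker/config"
	"github.com/housepower/clickhouse_sinker/input"
	"github.com/twmb/franz-go/pkg/kgo"
)

// startInputs creates one KafkaFranz consumer per group produced by Normallize.
func startInputs(cfg *config.Config) error {
	for _, gCfg := range cfg.Groups {
		fetches := make(chan *kgo.Fetches, 1) // buffer size chosen arbitrarily for the sketch
		k := input.NewKafkaFranz()
		if err := k.Init(cfg, gCfg, fetches, func() { /* cleanup hook placeholder */ }); err != nil {
			return err
		}
	}
	return nil
}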
88 changes: 6 additions & 82 deletions statistics/statistics.go
@@ -40,55 +40,13 @@ var (
},
[]string{"task"},
)
ConsumeMsgsErrorTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "consumer_msgs_error_total",
Help: "total num of consume errors",
},
[]string{"group"},
)
ParseMsgsErrorTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "parse_msgs_error_total",
Help: "total num of msgs with parse failure",
},
[]string{"task"},
)
RingMsgsOffTooSmallErrorTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "ring_msgs_offset_too_small_error_total",
Help: "total num of msgs with too small offset to put into ring",
},
[]string{"task"},
)
RingMsgsOffTooLargeErrorTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "ring_msgs_offset_too_large_error_total",
Help: "total num of msgs with too large offset to put into ring",
},
[]string{"task"},
)
RingNormalBatchsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "ring_normal_batchs_total",
Help: "total num of normal batches generated",
},
[]string{"task"},
)
RingForceBatchsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "ring_force_batchs_total",
Help: "total num of force batches generated",
},
[]string{"task"},
)
RingForceBatchAllTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "ring_force_batch_all_total",
Help: "total num of force batch_all generated",
},
[]string{"task"},
)
FlushMsgsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "flush_msgs_total",
@@ -108,21 +66,7 @@
Name: prefix + "consume_offsets",
Help: "last committed offset for each topic partition pair",
},
[]string{"task", "topic", "partition"},
)
ParsedRingMsgs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "parsed_ring_msgs",
Help: "num of parsed msgs in ring",
},
[]string{"task"},
)
RingMsgs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "ring_msgs",
Help: "num of msgs in ring",
},
[]string{"task"},
[]string{"consumer", "topic", "partition"},
)
ShardMsgs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@@ -131,13 +75,6 @@
},
[]string{"task"},
)
ParsingPoolBacklog = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "parsing_pool_backlog",
Help: "GlobalParsingPool backlog",
},
[]string{"task"},
)
WritingPoolBacklog = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: prefix + "writing_pool_backlog",
@@ -193,20 +130,11 @@

func init() {
prometheus.MustRegister(ConsumeMsgsTotal)
prometheus.MustRegister(ConsumeMsgsErrorTotal)
prometheus.MustRegister(ParseMsgsErrorTotal)
prometheus.MustRegister(RingMsgsOffTooSmallErrorTotal)
prometheus.MustRegister(RingMsgsOffTooLargeErrorTotal)
prometheus.MustRegister(RingNormalBatchsTotal)
prometheus.MustRegister(RingForceBatchsTotal)
prometheus.MustRegister(RingForceBatchAllTotal)
prometheus.MustRegister(FlushMsgsTotal)
prometheus.MustRegister(FlushMsgsErrorTotal)
prometheus.MustRegister(ConsumeOffsets)
prometheus.MustRegister(ParsedRingMsgs)
prometheus.MustRegister(RingMsgs)
prometheus.MustRegister(ShardMsgs)
prometheus.MustRegister(ParsingPoolBacklog)
prometheus.MustRegister(WritingPoolBacklog)
prometheus.MustRegister(WritingDurations)
prometheus.MustRegister(WriteSeriesAllowNew)
@@ -294,22 +222,18 @@ func (p *Pusher) reconnect() {
}
p.pusher = push.New(p.pgwAddrs[nextAddr], "clickhouse_sinker").
Collector(ConsumeMsgsTotal).
Collector(ConsumeMsgsErrorTotal).
Collector(ParseMsgsErrorTotal).
Collector(RingMsgsOffTooSmallErrorTotal).
Collector(RingMsgsOffTooLargeErrorTotal).
Collector(RingNormalBatchsTotal).
Collector(RingForceBatchsTotal).
Collector(RingForceBatchAllTotal).
Collector(FlushMsgsTotal).
Collector(FlushMsgsErrorTotal).
Collector(ConsumeOffsets).
Collector(RingMsgs).
Collector(ParsedRingMsgs).
Collector(ShardMsgs).
Collector(ParsingPoolBacklog).
Collector(WritingPoolBacklog).
Collector(WritingDurations).
Collector(WriteSeriesAllowNew).
Collector(WriteSeriesAllowChanged).
Collector(WriteSeriesDropQuota).
Collector(WriteSeriesDropUnchanged).
Collector(WriteSeriesSucceed).
Grouping("instance", p.instance).Format(expfmt.FmtText)
p.inUseAddr = nextAddr
}
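A side effect of the metric pruning and relabeling above is that ConsumeOffsets is now keyed by consumer group ("consumer") rather than "task". A hedged sketch of reporting an offset under the new label set; the helper name and the int32/int64 parameter types are assumptions, not code from this commit.

package example

import (
	"strconv"

	"github.com/housepower/clickhouse_sinker/statistics"
)

// recordOffset publishes the last committed offset for one topic partition.
func recordOffset(group, topic string, partition int32, offset int64) {
	statistics.ConsumeOffsets.WithLabelValues(
		group,                        // "consumer" label (was "task" before this change)
		topic,                        // "topic" label
		strconv.Itoa(int(partition)), // "partition" label, exported as a string
	).Set(float64(offset))
}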
