Skip to content

Commit

Permalink
Added config watcher and supporting metrics reloading
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Feb 22, 2024
1 parent 9a314b8 commit 7f972e1
Show file tree
Hide file tree
Showing 13 changed files with 450 additions and 61 deletions.
22 changes: 17 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,23 @@ type Options struct {
}

type ConfigFileStruct struct {
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
DynamicParameters DynamicParameters `yaml:"dynamicParameters,omitempty" json:"dynamicParameters,omitempty"`
}

type DynamicParameters struct {
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
FileName string `yaml:"fileName,omitempty" json:"fileName,omitempty"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
}

type ConfigHotReloadStruct struct {
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
}

type Health struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/pipeline/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ type encodeNone struct {

type Encoder interface {
Encode(in config.GenericMap)
Update(config.StageParam)
}

// Encode encodes a flow before being stored
func (t *encodeNone) Encode(in config.GenericMap) {
t.prevRecord = in
}

func (t *encodeNone) Update(_ config.StageParam) {
log.Warn("Encode None, update not supported")
}

// NewEncodeNone create a new encode
func NewEncodeNone() (Encoder, error) {
log.Debugf("entering NewEncodeNone")
Expand Down
4 changes: 4 additions & 0 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (r *encodeKafka) Encode(entry config.GenericMap) {
}
}

func (t *encodeKafka) Update(_ config.StageParam) {
log.Warn("Encode Kafka, update not supported")
}

// NewEncodeKafka create a new writer to kafka
func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
log.Debugf("entering NewEncodeKafka")
Expand Down
250 changes: 234 additions & 16 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package encode

import (
"fmt"
"reflect"
"strings"
"time"

Expand Down Expand Up @@ -53,17 +54,18 @@ type histoInfo struct {
type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
gauges []gaugeInfo
counters []counterInfo
histos []histoInfo
aggHistos []histoInfo
gauges map[string]gaugeInfo
counters map[string]counterInfo
histos map[string]histoInfo
aggHistos map[string]histoInfo
expiryTime time.Duration
mCache *putils.TimedCache
mChacheLenMetric prometheus.Gauge
exitChan <-chan struct{}
metricsProcessed prometheus.Counter
metricsDropped prometheus.Counter
errorsCounter *prometheus.CounterVec
updateChan chan config.StageParam
}

var (
Expand Down Expand Up @@ -261,6 +263,217 @@ func (e *EncodeProm) cleanupExpiredEntriesLoop() {
}
}

func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(counter)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.counters[fullMetricName] = counterInfo{
counter: counter,
info: mInfo,
}
}

func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.gauges[fullMetricName] = gaugeInfo{
gauge: gauge,
info: mInfo,
}
}
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(histogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.histos[fullMetricName] = histoInfo{
histo: histogram,
info: mInfo,
}
}
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(agghistogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.aggHistos[fullMetricName] = histoInfo{
histo: agghistogram,
info: mInfo,
}
}

func (e *EncodeProm) cleanDeletedCounters(newCfg api.PromEncode) {
for fullName, counter := range e.counters {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
e.registerer.Unregister(counter.counter)
delete(e.counters, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.registerer.Unregister(counter.counter)
delete(e.counters, fullName)

}
}

}

func (e *EncodeProm) cleanDeletedGauges(newCfg api.PromEncode) {
for fullName, gauge := range e.gauges {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
e.registerer.Unregister(gauge.gauge)
delete(e.gauges, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.registerer.Unregister(gauge.gauge)
delete(e.gauges, fullName)

}
}

}

func (e *EncodeProm) cleanDeletedHistograms(newCfg api.PromEncode) {
for fullName, histogram := range e.histos {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
e.registerer.Unregister(histogram.histo)
delete(e.histos, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.registerer.Unregister(histogram.histo)
delete(e.histos, fullName)

}
}

}

func (e *EncodeProm) cleanDeletedAggHistograms(newCfg api.PromEncode) {
for fullName, aggHisto := range e.aggHistos {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
e.registerer.Unregister(aggHisto.histo)
delete(e.aggHistos, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.registerer.Unregister(aggHisto.histo)
delete(e.aggHistos, fullName)

}
}

}

func (e *EncodeProm) cleandDeletedMetrics(newCfg api.PromEncode) {
e.cleanDeletedCounters(newCfg)
e.cleanDeletedGauges(newCfg)
e.cleanDeletedHistograms(newCfg)
e.cleanDeletedAggHistograms(newCfg)
}

func (e *EncodeProm) checkConfUpdate() {
select {
case stage := <-e.updateChan:
cfg := api.PromEncode{}
if stage.Encode != nil && stage.Encode.Prom != nil {
cfg = *stage.Encode.Prom
}

e.cleandDeletedMetrics(cfg)

for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
mInfo := CreateMetricInfo(mCfg)
switch mCfg.Type {
case api.MetricEncodeOperationName("Counter"):
if oldMetric, ok := e.counters[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo, oldMetric.info) {
e.registerer.Unregister(oldMetric.counter)
e.addCounter(fullMetricName, mInfo)
}
} else {
// New metric
e.addCounter(fullMetricName, mInfo)
}
case api.MetricEncodeOperationName("Gauge"):
if oldMetric, ok := e.gauges[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo, oldMetric.info) {
e.registerer.Unregister(oldMetric.gauge)
e.addGauge(fullMetricName, mInfo)
}
} else {
// New metric
e.addGauge(fullMetricName, mInfo)
}
case api.MetricEncodeOperationName("Histogram"):
if oldMetric, ok := e.histos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo, oldMetric.info) {
e.registerer.Unregister(oldMetric.histo)
e.addHistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addHistogram(fullMetricName, mInfo)
}
case api.MetricEncodeOperationName("AggHistogram"):
if oldMetric, ok := e.aggHistos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo, oldMetric.info) {
e.registerer.Unregister(oldMetric.histo)
e.addAgghistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addAgghistogram(fullMetricName, mInfo)
}
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}

}
default:
//Nothing to do
return
}
}

func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
cfg := api.PromEncode{}
if params.Encode != nil && params.Encode.Prom != nil {
Expand All @@ -283,10 +496,10 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
registerer = prometheus.DefaultRegisterer
}

counters := []counterInfo{}
gauges := []gaugeInfo{}
histos := []histoInfo{}
aggHistos := []histoInfo{}
gauges := make(map[string]gaugeInfo)
counters := make(map[string]counterInfo)
histos := make(map[string]histoInfo)
aggHistos := make(map[string]histoInfo)

for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
Expand All @@ -302,21 +515,21 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
counters = append(counters, counterInfo{
counters[fullMetricName] = counterInfo{
counter: counter,
info: mInfo,
})
}
case api.MetricEncodeOperationName("Gauge"):
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
gauges = append(gauges, gaugeInfo{
gauges[fullMetricName] = gaugeInfo{
gauge: gauge,
info: mInfo,
})
}
case api.MetricEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -325,10 +538,10 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
histos = append(histos, histoInfo{
histos[fullMetricName] = histoInfo{
histo: hist,
info: mInfo,
})
}
case api.MetricEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -337,10 +550,10 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
aggHistos = append(aggHistos, histoInfo{
aggHistos[fullMetricName] = histoInfo{
histo: hist,
info: mInfo,
})
}
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
Expand All @@ -367,8 +580,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
exitChan: putils.ExitChannel(),
metricsProcessed: opMetrics.NewCounter(&MetricsProcessed, params.Name),
metricsDropped: opMetrics.NewCounter(&MetricsDropped, params.Name),
updateChan: make(chan config.StageParam),
errorsCounter: opMetrics.NewCounterVec(&EncodePromErrors),
}
go w.cleanupExpiredEntriesLoop()
return w, nil
}

func (t *EncodeProm) Update(config config.StageParam) {
t.updateChan <- config
}
Loading

0 comments on commit 7f972e1

Please sign in to comment.