From be4af9ff5e042caa25b45a94fab57590395f3366 Mon Sep 17 00:00:00 2001 From: Christoph Petrausch Date: Sun, 8 Nov 2020 22:01:36 +0100 Subject: [PATCH] Refactor metric extraction from MQTT This commit allows to extract the metric name from the topic path. Now it can be configured if all metrics are in a object in a certain topic or if every topic contains exactly one metric. However, currently these modes can not be mixed. This should solve !26 TODO: * Update documentation * Add unit tests --- cmd/mqtt2prometheus.go | 32 +++++- hack/Readme.md | 29 ++++++ hack/dht.env | 1 + hack/{mqtt2prometheus.yaml => dht22.yaml} | 0 hack/docker-compose.yml | 6 +- hack/shelly.env | 1 + hack/shelly.yaml | 21 ++++ pkg/config/config.go | 51 ++++++++-- pkg/metrics/collector.go | 11 +- pkg/metrics/extractor.go | 50 +++++++++ pkg/metrics/ingest.go | 118 +++------------------- pkg/metrics/instrumentation.go | 33 ++++++ pkg/metrics/parser.go | 96 ++++++++++++++++++ 13 files changed, 330 insertions(+), 119 deletions(-) create mode 100644 hack/Readme.md create mode 100644 hack/dht.env rename hack/{mqtt2prometheus.yaml => dht22.yaml} (100%) create mode 100644 hack/shelly.env create mode 100644 hack/shelly.yaml create mode 100644 pkg/metrics/extractor.go create mode 100644 pkg/metrics/instrumentation.go create mode 100644 pkg/metrics/parser.go diff --git a/cmd/mqtt2prometheus.go b/cmd/mqtt2prometheus.go index 87ab178..0b87884 100644 --- a/cmd/mqtt2prometheus.go +++ b/cmd/mqtt2prometheus.go @@ -54,6 +54,11 @@ var ( ) ) +type Ingest interface { + MessageMetric() *prometheus.CounterVec + SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler +} + func main() { flag.Parse() if *versionFlag { @@ -77,9 +82,12 @@ func main() { mqttClientOptions.SetPassword(cfg.MQTT.Password) mqttClientOptions.SetClientID(mustMQTTClientID()) - collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics) - ingest := metrics.NewIngest(collector, cfg.Metrics, cfg.MQTT.DeviceIDRegex) - + collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics, logger) + extractor, err := setupExtractor(cfg) + if err != nil { + logger.Fatal("could not setup a metric extractor", zap.Error(err)) + } + ingest := metrics.NewIngest(collector, extractor, cfg.MQTT.DeviceIDRegex) errorChan := make(chan error, 1) for { @@ -97,7 +105,7 @@ func main() { time.Sleep(10 * time.Second) } - prometheus.MustRegister(ingest.MessageMetric) + prometheus.MustRegister(ingest.MessageMetric()) prometheus.MustRegister(collector) http.Handle("/metrics", promhttp.Handler()) go func() { @@ -163,3 +171,19 @@ func mustSetupLogger() *zap.Logger { config.SetProcessContext(logger) return logger } + +func setupExtractor(cfg config.Config) (metrics.Extractor, error) { + parser := metrics.NewParser(cfg.Metrics) + if cfg.MQTT.ObjectPerTopicConfig != nil { + switch cfg.MQTT.ObjectPerTopicConfig.Encoding { + case config.EncodingJSON: + return metrics.NewJSONObjectExtractor(parser), nil + default: + return nil, fmt.Errorf("unsupported object format: %s", cfg.MQTT.ObjectPerTopicConfig.Encoding) + } + } + if cfg.MQTT.MetricPerTopicConfig != nil { + return metrics.NewMetricPerTopicExtractor(parser, cfg.MQTT.MetricPerTopicConfig.MetricNameRegex), nil + } + return nil, fmt.Errorf("no extractor configured") +} diff --git a/hack/Readme.md b/hack/Readme.md new file mode 100644 index 0000000..6ed939c --- /dev/null +++ b/hack/Readme.md @@ -0,0 +1,29 @@ +# Hack Scenarios + +Required is a MQTT client. I use this: https://github.com/shirou/mqttcli + +## Shelly (Metric Per Topic) +The scenario is the feature requested by issue https://github.com/hikhvar/mqtt2prometheus/issues/26. + +To start the scenario run: +```bash +docker-compose --env-file shelly.env up +``` + +To publish a message run: +```bash +mqttcli pub --host localhost -p 1883 -t shellies/living-room/sensor/temperature '15' +``` + +## DHT22 (Object Per Topic) +The default scenario + +To start the scenario run: +```bash +docker-compose --env-file dht22.env up +``` + +To publish a message run: +```bash +mqttcli pub --host localhost -p 1883 -t v1/devices/me/test -m '{"temperature":"12", "humidity":21}' +``` \ No newline at end of file diff --git a/hack/dht.env b/hack/dht.env new file mode 100644 index 0000000..5f0bf2c --- /dev/null +++ b/hack/dht.env @@ -0,0 +1 @@ +CONFIG=dht22.yaml \ No newline at end of file diff --git a/hack/mqtt2prometheus.yaml b/hack/dht22.yaml similarity index 100% rename from hack/mqtt2prometheus.yaml rename to hack/dht22.yaml diff --git a/hack/docker-compose.yml b/hack/docker-compose.yml index e321ebf..1e2b0b0 100644 --- a/hack/docker-compose.yml +++ b/hack/docker-compose.yml @@ -4,11 +4,15 @@ services: build: context: ../ dockerfile: Dockerfile + command: + - /mqtt2prometheus + - -log-level + - debug ports: - 9641:9641 volumes: - type: bind - source: ./mqtt2prometheus.yaml + source: ./${CONFIG:-dht22.yaml} target: /config.yaml mosquitto: image: eclipse-mosquitto:1.6.9 diff --git a/hack/shelly.env b/hack/shelly.env new file mode 100644 index 0000000..300c90c --- /dev/null +++ b/hack/shelly.env @@ -0,0 +1 @@ +CONFIG=shelly.yaml \ No newline at end of file diff --git a/hack/shelly.yaml b/hack/shelly.yaml new file mode 100644 index 0000000..acfbe7c --- /dev/null +++ b/hack/shelly.yaml @@ -0,0 +1,21 @@ +mqtt: + server: tcp://mosquitto:1883 + topic_path: shellies/+/sensor/+ + device_id_regex: "shellies/(?P.*)/sensor" + metric_per_topic_config: + metric_name_regex: "shellies/(?P.*)/sensor/(?P.*)" + qos: 0 + cache: + timeout: 24h +metrics: + - prom_name: temperature + # The name of the metric in a MQTT JSON message + mqtt_name: temperature + # The prometheus help text for this metric + help: shelly temperature reading + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: gauge + # A map of string to string for constant labels. This labels will be attached to every prometheus metric + const_labels: + sensor_type: shelly + # The name of the metric in prometheus \ No newline at end of file diff --git a/pkg/config/config.go b/pkg/config/config.go index 01d37d8..4a4252d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -10,10 +10,13 @@ import ( "gopkg.in/yaml.v2" ) -const GaugeValueType = "gauge" -const CounterValueType = "counter" +const ( + GaugeValueType = "gauge" + CounterValueType = "counter" -const DeviceIDRegexGroup = "deviceid" + DeviceIDRegexGroup = "deviceid" + MetricNameRegexGroup = "metricname" +) var MQTTConfigDefaults = MQTTConfig{ Server: "tcp://127.0.0.1:1883", @@ -87,12 +90,24 @@ type CacheConfig struct { } type MQTTConfig struct { - Server string `yaml:"server"` - TopicPath string `yaml:"topic_path"` - DeviceIDRegex *Regexp `yaml:"device_id_regex"` - User string `yaml:"user"` - Password string `yaml:"password"` - QoS byte `yaml:"qos"` + Server string `yaml:"server"` + TopicPath string `yaml:"topic_path"` + DeviceIDRegex *Regexp `yaml:"device_id_regex"` + User string `yaml:"user"` + Password string `yaml:"password"` + QoS byte `yaml:"qos"` + ObjectPerTopicConfig *ObjectPerTopicConfig `yaml:"object_per_topic_config"` + MetricPerTopicConfig *MetricPerTopicConfig `yaml:"metric_per_topic_config"` +} + +const EncodingJSON = "JSON" + +type ObjectPerTopicConfig struct { + Encoding string `yaml:"encoding"` // Currently only JSON is a valid value +} + +type MetricPerTopicConfig struct { + MetricNameRegex *Regexp `yaml:"metric_name_regex"` // Default } // Metrics Config is a mapping between a metric send on mqtt to a prometheus metric @@ -157,5 +172,23 @@ func LoadConfig(configFile string) (Config, error) { if !validRegex { return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, DeviceIDRegexGroup) } + if cfg.MQTT.ObjectPerTopicConfig == nil && cfg.MQTT.MetricPerTopicConfig == nil { + cfg.MQTT.ObjectPerTopicConfig = &ObjectPerTopicConfig{ + Encoding: EncodingJSON, + } + } + + if cfg.MQTT.MetricPerTopicConfig != nil { + validRegex = false + for _, name := range cfg.MQTT.MetricPerTopicConfig.MetricNameRegex.RegEx().SubexpNames() { + if name == MetricNameRegexGroup { + validRegex = true + } + } + if !validRegex { + return Config{}, fmt.Errorf("metric name regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, MetricNameRegexGroup) + } + } + return cfg, nil } diff --git a/pkg/metrics/collector.go b/pkg/metrics/collector.go index d6f2c90..cbc6991 100644 --- a/pkg/metrics/collector.go +++ b/pkg/metrics/collector.go @@ -1,6 +1,7 @@ package metrics import ( + "go.uber.org/zap" "time" "github.com/hikhvar/mqtt2prometheus/pkg/config" @@ -18,6 +19,7 @@ type Collector interface { type MemoryCachedCollector struct { cache *gocache.Cache descriptions []*prometheus.Desc + logger *zap.Logger } type Metric struct { @@ -30,7 +32,7 @@ type Metric struct { type MetricCollection []Metric -func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig) Collector { +func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig, logger *zap.Logger) Collector { var descs []*prometheus.Desc for _, m := range possibleMetrics { descs = append(descs, m.PrometheusDescription()) @@ -38,10 +40,14 @@ func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricC return &MemoryCachedCollector{ cache: gocache.New(defaultTimeout, defaultTimeout*10), descriptions: descs, + logger: logger, } } func (c *MemoryCachedCollector) Observe(deviceID string, collection MetricCollection) { + if len(collection) < 1 { + return + } c.cache.Set(deviceID, collection, DefaultTimeout) } @@ -55,6 +61,9 @@ func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) { for device, metricsRaw := range c.cache.Items() { metrics := metricsRaw.Object.(MetricCollection) for _, metric := range metrics { + if metric.Description == nil { + c.logger.Warn("empty description", zap.String("topic", metric.Topic), zap.Float64("value", metric.Value)) + } m := prometheus.MustNewConstMetric( metric.Description, metric.ValueType, diff --git a/pkg/metrics/extractor.go b/pkg/metrics/extractor.go new file mode 100644 index 0000000..3dfdc73 --- /dev/null +++ b/pkg/metrics/extractor.go @@ -0,0 +1,50 @@ +package metrics + +import ( + "errors" + "fmt" + "github.com/hikhvar/mqtt2prometheus/pkg/config" + gojsonq "github.com/thedevsaddam/gojsonq/v2" +) + +type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error) + +func NewJSONObjectExtractor(p Parser) Extractor { + return func(topic string, payload []byte, deviceID string) (MetricCollection, error) { + var mc MetricCollection + parsed := gojsonq.New().FromString(string(payload)) + + for path := range p.config() { + rawValue := parsed.Find(path) + parsed.Reset() + if rawValue == nil { + continue + } + m, err := p.parseMetric(path, deviceID, rawValue) + if err != nil { + return nil, fmt.Errorf("failed to parse valid metric value: %w", err) + } + m.Topic = topic + mc = append(mc, m) + } + return mc, nil + } +} + +func NewMetricPerTopicExtractor(p Parser, metricName *config.Regexp) Extractor { + return func(topic string, payload []byte, deviceID string) (MetricCollection, error) { + mName := metricName.GroupValue(topic, config.MetricNameRegexGroup) + if mName == "" { + return nil, fmt.Errorf("failed to find valid metric in topic path") + } + m, err := p.parseMetric(mName, deviceID, string(payload)) + if err != nil { + if errors.Is(err, metricNotConfigured) { + return nil, nil + } + return nil, fmt.Errorf("failed to parse metric: %w", err) + } + m.Topic = topic + return MetricCollection{m}, nil + } +} diff --git a/pkg/metrics/ingest.go b/pkg/metrics/ingest.go index 657015b..7b4e8cd 100644 --- a/pkg/metrics/ingest.go +++ b/pkg/metrics/ingest.go @@ -3,140 +3,50 @@ package metrics import ( "fmt" "go.uber.org/zap" - "strconv" - "time" - - gojsonq "github.com/thedevsaddam/gojsonq/v2" "github.com/eclipse/paho.mqtt.golang" "github.com/hikhvar/mqtt2prometheus/pkg/config" - "github.com/prometheus/client_golang/prometheus" ) type Ingest struct { - metricConfigs map[string][]config.MetricConfig + instrumentation + extractor Extractor deviceIDRegex *config.Regexp collector Collector - MessageMetric *prometheus.CounterVec logger *zap.Logger } -func NewIngest(collector Collector, metrics []config.MetricConfig, deviceIDRegex *config.Regexp) *Ingest { - cfgs := make(map[string][]config.MetricConfig) - for i := range metrics { - key := metrics[i].MQTTName - cfgs[key] = append(cfgs[key], metrics[i]) - } - return &Ingest{ - metricConfigs: cfgs, - deviceIDRegex: deviceIDRegex, - collector: collector, - MessageMetric: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "received_messages", - Help: "received messages per topic and status", - }, []string{"status", "topic"}, - ), - logger: config.ProcessContext.Logger(), - } -} +func NewIngest(collector Collector, extractor Extractor, deviceIDRegex *config.Regexp) *Ingest { -// validMetric returns config matching the metric and deviceID -// Second return value indicates if config was found. -func (i *Ingest) validMetric(metric string, deviceID string) (config.MetricConfig, bool) { - for _, c := range i.metricConfigs[metric] { - if c.SensorNameFilter.Match(deviceID) { - return c, true - } + return &Ingest{ + instrumentation: defaultInstrumentation, + extractor: extractor, + deviceIDRegex: deviceIDRegex, + collector: collector, + logger: config.ProcessContext.Logger(), } - return config.MetricConfig{}, false } func (i *Ingest) store(topic string, payload []byte) error { - var mc MetricCollection deviceID := i.deviceID(topic) - parsed := gojsonq.New().FromString(string(payload)) - - for path := range i.metricConfigs { - rawValue := parsed.Find(path) - parsed.Reset() - if rawValue == nil { - continue - } - m, err := i.parseMetric(path, deviceID, rawValue) - if err != nil { - return fmt.Errorf("failed to parse valid metric value: %w", err) - } - m.Topic = topic - mc = append(mc, m) + mc, err := i.extractor(topic, payload, deviceID) + if err != nil { + return fmt.Errorf("failed to extract metric values from topic: %w", err) } - i.collector.Observe(deviceID, mc) return nil } -func (i *Ingest) parseMetric(metricPath string, deviceID string, value interface{}) (Metric, error) { - cfg, cfgFound := i.validMetric(metricPath, deviceID) - if !cfgFound { - return Metric{}, nil - } - - var metricValue float64 - - if boolValue, ok := value.(bool); ok { - if boolValue { - metricValue = 1 - } else { - metricValue = 0 - } - } else if strValue, ok := value.(string); ok { - - // If string value mapping is defined, use that - if cfg.StringValueMapping != nil { - - floatValue, ok := cfg.StringValueMapping.Map[strValue] - if ok { - metricValue = floatValue - } else if cfg.StringValueMapping.ErrorValue != nil { - metricValue = *cfg.StringValueMapping.ErrorValue - } else { - return Metric{}, fmt.Errorf("got unexpected string data '%s'", strValue) - } - - } else { - - // otherwise try to parse float - floatValue, err := strconv.ParseFloat(strValue, 64) - if err != nil { - return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s') and failed to parse to float", value, value) - } - metricValue = floatValue - - } - - } else if floatValue, ok := value.(float64); ok { - metricValue = floatValue - } else { - return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value) - } - return Metric{ - Description: cfg.PrometheusDescription(), - Value: metricValue, - ValueType: cfg.PrometheusValueType(), - IngestTime: time.Now(), - }, nil -} - func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler { return func(c mqtt.Client, m mqtt.Message) { i.logger.Debug("Got message", zap.String("topic", m.Topic()), zap.String("payload", string(m.Payload()))) err := i.store(m.Topic(), m.Payload()) if err != nil { errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error()) - i.MessageMetric.WithLabelValues("storeError", m.Topic()).Inc() + i.CountStoreError(m.Topic()) return } - i.MessageMetric.WithLabelValues("success", m.Topic()).Inc() + i.CountSuccess(m.Topic()) } } diff --git a/pkg/metrics/instrumentation.go b/pkg/metrics/instrumentation.go new file mode 100644 index 0000000..0082bdc --- /dev/null +++ b/pkg/metrics/instrumentation.go @@ -0,0 +1,33 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +const ( + storeError = "storeError" + success = "success" +) + +var defaultInstrumentation = instrumentation{ + messageMetric: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "received_messages", + Help: "received messages per topic and status", + }, []string{"status", "topic"}, + ), +} + +type instrumentation struct { + messageMetric *prometheus.CounterVec +} + +func (i *instrumentation) MessageMetric() *prometheus.CounterVec { + return i.messageMetric +} + +func (i *instrumentation) CountSuccess(topic string) { + i.messageMetric.WithLabelValues(success, topic).Inc() +} + +func (i *instrumentation) CountStoreError(topic string) { + i.messageMetric.WithLabelValues(storeError, topic).Inc() +} diff --git a/pkg/metrics/parser.go b/pkg/metrics/parser.go new file mode 100644 index 0000000..c5bfec3 --- /dev/null +++ b/pkg/metrics/parser.go @@ -0,0 +1,96 @@ +package metrics + +import ( + "errors" + "fmt" + "github.com/hikhvar/mqtt2prometheus/pkg/config" + "strconv" + "time" +) + +type metricNotConfiguredError error + +var metricNotConfigured metricNotConfiguredError = errors.New("metric not configured failed to parse") + +type Parser struct { + metricConfigs map[string][]config.MetricConfig +} + +func NewParser(metrics []config.MetricConfig) Parser { + cfgs := make(map[string][]config.MetricConfig) + for i := range metrics { + key := metrics[i].MQTTName + cfgs[key] = append(cfgs[key], metrics[i]) + } + return Parser{ + metricConfigs: cfgs, + } +} + +// Config returns the underlying metrics config +func (p *Parser) config() map[string][]config.MetricConfig { + return p.metricConfigs +} + +// validMetric returns config matching the metric and deviceID +// Second return value indicates if config was found. +func (p *Parser) validMetric(metric string, deviceID string) (config.MetricConfig, bool) { + for _, c := range p.metricConfigs[metric] { + if c.SensorNameFilter.Match(deviceID) { + return c, true + } + } + return config.MetricConfig{}, false +} + +func (p *Parser) parseMetric(metricPath string, deviceID string, value interface{}) (Metric, error) { + cfg, cfgFound := p.validMetric(metricPath, deviceID) + if !cfgFound { + return Metric{}, metricNotConfigured + } + + var metricValue float64 + + if boolValue, ok := value.(bool); ok { + if boolValue { + metricValue = 1 + } else { + metricValue = 0 + } + } else if strValue, ok := value.(string); ok { + + // If string value mapping is defined, use that + if cfg.StringValueMapping != nil { + + floatValue, ok := cfg.StringValueMapping.Map[strValue] + if ok { + metricValue = floatValue + } else if cfg.StringValueMapping.ErrorValue != nil { + metricValue = *cfg.StringValueMapping.ErrorValue + } else { + return Metric{}, fmt.Errorf("got unexpected string data '%s'", strValue) + } + + } else { + + // otherwise try to parse float + floatValue, err := strconv.ParseFloat(strValue, 64) + if err != nil { + return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s') and failed to parse to float", value, value) + } + metricValue = floatValue + + } + + } else if floatValue, ok := value.(float64); ok { + metricValue = floatValue + } else { + return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value) + } + return Metric{ + Description: cfg.PrometheusDescription(), + Value: metricValue, + ValueType: cfg.PrometheusValueType(), + IngestTime: time.Now(), + }, nil +}