Skip to content

Commit

Permalink
Refactor metric extraction from MQTT
Browse files Browse the repository at this point in the history
This commit allows to extract the metric name from the topic path. Now
it can be configured if all metrics are in a object in a certain topic
or if every topic contains exactly one metric. However, currently these
modes can not be mixed.

This should solve !26

TODO:
* Update documentation
* Add unit tests
  • Loading branch information
Christoph Petrausch committed Nov 8, 2020
1 parent b3442ec commit be4af9f
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 119 deletions.
32 changes: 28 additions & 4 deletions cmd/mqtt2prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ var (
)
)

type Ingest interface {
MessageMetric() *prometheus.CounterVec
SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler
}

func main() {
flag.Parse()
if *versionFlag {
Expand All @@ -77,9 +82,12 @@ func main() {
mqttClientOptions.SetPassword(cfg.MQTT.Password)
mqttClientOptions.SetClientID(mustMQTTClientID())

collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics)
ingest := metrics.NewIngest(collector, cfg.Metrics, cfg.MQTT.DeviceIDRegex)

collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics, logger)
extractor, err := setupExtractor(cfg)
if err != nil {
logger.Fatal("could not setup a metric extractor", zap.Error(err))
}
ingest := metrics.NewIngest(collector, extractor, cfg.MQTT.DeviceIDRegex)
errorChan := make(chan error, 1)

for {
Expand All @@ -97,7 +105,7 @@ func main() {
time.Sleep(10 * time.Second)
}

prometheus.MustRegister(ingest.MessageMetric)
prometheus.MustRegister(ingest.MessageMetric())
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
go func() {
Expand Down Expand Up @@ -163,3 +171,19 @@ func mustSetupLogger() *zap.Logger {
config.SetProcessContext(logger)
return logger
}

func setupExtractor(cfg config.Config) (metrics.Extractor, error) {
parser := metrics.NewParser(cfg.Metrics)
if cfg.MQTT.ObjectPerTopicConfig != nil {
switch cfg.MQTT.ObjectPerTopicConfig.Encoding {
case config.EncodingJSON:
return metrics.NewJSONObjectExtractor(parser), nil
default:
return nil, fmt.Errorf("unsupported object format: %s", cfg.MQTT.ObjectPerTopicConfig.Encoding)
}
}
if cfg.MQTT.MetricPerTopicConfig != nil {
return metrics.NewMetricPerTopicExtractor(parser, cfg.MQTT.MetricPerTopicConfig.MetricNameRegex), nil
}
return nil, fmt.Errorf("no extractor configured")
}
29 changes: 29 additions & 0 deletions hack/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Hack Scenarios

Required is a MQTT client. I use this: https://github.com/shirou/mqttcli

## Shelly (Metric Per Topic)
The scenario is the feature requested by issue https://github.com/hikhvar/mqtt2prometheus/issues/26.

To start the scenario run:
```bash
docker-compose --env-file shelly.env up
```

To publish a message run:
```bash
mqttcli pub --host localhost -p 1883 -t shellies/living-room/sensor/temperature '15'
```

## DHT22 (Object Per Topic)
The default scenario

To start the scenario run:
```bash
docker-compose --env-file dht22.env up
```

To publish a message run:
```bash
mqttcli pub --host localhost -p 1883 -t v1/devices/me/test -m '{"temperature":"12", "humidity":21}'
```
1 change: 1 addition & 0 deletions hack/dht.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CONFIG=dht22.yaml
File renamed without changes.
6 changes: 5 additions & 1 deletion hack/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ services:
build:
context: ../
dockerfile: Dockerfile
command:
- /mqtt2prometheus
- -log-level
- debug
ports:
- 9641:9641
volumes:
- type: bind
source: ./mqtt2prometheus.yaml
source: ./${CONFIG:-dht22.yaml}
target: /config.yaml
mosquitto:
image: eclipse-mosquitto:1.6.9
Expand Down
1 change: 1 addition & 0 deletions hack/shelly.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CONFIG=shelly.yaml
21 changes: 21 additions & 0 deletions hack/shelly.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
mqtt:
server: tcp://mosquitto:1883
topic_path: shellies/+/sensor/+
device_id_regex: "shellies/(?P<deviceid>.*)/sensor"
metric_per_topic_config:
metric_name_regex: "shellies/(?P<deviceid>.*)/sensor/(?P<metricname>.*)"
qos: 0
cache:
timeout: 24h
metrics:
- prom_name: temperature
# The name of the metric in a MQTT JSON message
mqtt_name: temperature
# The prometheus help text for this metric
help: shelly temperature reading
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: shelly
# The name of the metric in prometheus
51 changes: 42 additions & 9 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"gopkg.in/yaml.v2"
)

const GaugeValueType = "gauge"
const CounterValueType = "counter"
const (
GaugeValueType = "gauge"
CounterValueType = "counter"

const DeviceIDRegexGroup = "deviceid"
DeviceIDRegexGroup = "deviceid"
MetricNameRegexGroup = "metricname"
)

var MQTTConfigDefaults = MQTTConfig{
Server: "tcp://127.0.0.1:1883",
Expand Down Expand Up @@ -87,12 +90,24 @@ type CacheConfig struct {
}

type MQTTConfig struct {
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
DeviceIDRegex *Regexp `yaml:"device_id_regex"`
User string `yaml:"user"`
Password string `yaml:"password"`
QoS byte `yaml:"qos"`
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
DeviceIDRegex *Regexp `yaml:"device_id_regex"`
User string `yaml:"user"`
Password string `yaml:"password"`
QoS byte `yaml:"qos"`
ObjectPerTopicConfig *ObjectPerTopicConfig `yaml:"object_per_topic_config"`
MetricPerTopicConfig *MetricPerTopicConfig `yaml:"metric_per_topic_config"`
}

const EncodingJSON = "JSON"

type ObjectPerTopicConfig struct {
Encoding string `yaml:"encoding"` // Currently only JSON is a valid value
}

type MetricPerTopicConfig struct {
MetricNameRegex *Regexp `yaml:"metric_name_regex"` // Default
}

// Metrics Config is a mapping between a metric send on mqtt to a prometheus metric
Expand Down Expand Up @@ -157,5 +172,23 @@ func LoadConfig(configFile string) (Config, error) {
if !validRegex {
return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, DeviceIDRegexGroup)
}
if cfg.MQTT.ObjectPerTopicConfig == nil && cfg.MQTT.MetricPerTopicConfig == nil {
cfg.MQTT.ObjectPerTopicConfig = &ObjectPerTopicConfig{
Encoding: EncodingJSON,
}
}

if cfg.MQTT.MetricPerTopicConfig != nil {
validRegex = false
for _, name := range cfg.MQTT.MetricPerTopicConfig.MetricNameRegex.RegEx().SubexpNames() {
if name == MetricNameRegexGroup {
validRegex = true
}
}
if !validRegex {
return Config{}, fmt.Errorf("metric name regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, MetricNameRegexGroup)
}
}

return cfg, nil
}
11 changes: 10 additions & 1 deletion pkg/metrics/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"go.uber.org/zap"
"time"

"github.com/hikhvar/mqtt2prometheus/pkg/config"
Expand All @@ -18,6 +19,7 @@ type Collector interface {
type MemoryCachedCollector struct {
cache *gocache.Cache
descriptions []*prometheus.Desc
logger *zap.Logger
}

type Metric struct {
Expand All @@ -30,18 +32,22 @@ type Metric struct {

type MetricCollection []Metric

func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig) Collector {
func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig, logger *zap.Logger) Collector {
var descs []*prometheus.Desc
for _, m := range possibleMetrics {
descs = append(descs, m.PrometheusDescription())
}
return &MemoryCachedCollector{
cache: gocache.New(defaultTimeout, defaultTimeout*10),
descriptions: descs,
logger: logger,
}
}

func (c *MemoryCachedCollector) Observe(deviceID string, collection MetricCollection) {
if len(collection) < 1 {
return
}
c.cache.Set(deviceID, collection, DefaultTimeout)
}

Expand All @@ -55,6 +61,9 @@ func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) {
for device, metricsRaw := range c.cache.Items() {
metrics := metricsRaw.Object.(MetricCollection)
for _, metric := range metrics {
if metric.Description == nil {
c.logger.Warn("empty description", zap.String("topic", metric.Topic), zap.Float64("value", metric.Value))
}
m := prometheus.MustNewConstMetric(
metric.Description,
metric.ValueType,
Expand Down
50 changes: 50 additions & 0 deletions pkg/metrics/extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package metrics

import (
"errors"
"fmt"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
gojsonq "github.com/thedevsaddam/gojsonq/v2"
)

type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error)

func NewJSONObjectExtractor(p Parser) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
var mc MetricCollection
parsed := gojsonq.New().FromString(string(payload))

for path := range p.config() {
rawValue := parsed.Find(path)
parsed.Reset()
if rawValue == nil {
continue
}
m, err := p.parseMetric(path, deviceID, rawValue)
if err != nil {
return nil, fmt.Errorf("failed to parse valid metric value: %w", err)
}
m.Topic = topic
mc = append(mc, m)
}
return mc, nil
}
}

func NewMetricPerTopicExtractor(p Parser, metricName *config.Regexp) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
mName := metricName.GroupValue(topic, config.MetricNameRegexGroup)
if mName == "" {
return nil, fmt.Errorf("failed to find valid metric in topic path")
}
m, err := p.parseMetric(mName, deviceID, string(payload))
if err != nil {
if errors.Is(err, metricNotConfigured) {
return nil, nil
}
return nil, fmt.Errorf("failed to parse metric: %w", err)
}
m.Topic = topic
return MetricCollection{m}, nil
}
}
Loading

0 comments on commit be4af9f

Please sign in to comment.