diff --git a/cmd/salt-exporter/config.go b/cmd/salt-exporter/config.go index 6ff2da2..3d38f91 100644 --- a/cmd/salt-exporter/config.go +++ b/cmd/salt-exporter/config.go @@ -38,6 +38,7 @@ type Config struct { ListenAddress string `mapstructure:"listen-address"` ListenPort int `mapstructure:"listen-port"` IPCFile string `mapstructure:"ipc-file"` + PKIDir string `mapstructure:"pki-dir"` TLS struct { Enabled bool Key string @@ -86,6 +87,7 @@ func setDefaults(healthMinions bool) { viper.SetDefault("log-level", defaultLogLevel) viper.SetDefault("listen-port", defaultPort) viper.SetDefault("ipc-file", listener.DefaultIPCFilepath) + viper.SetDefault("pki-dir", listener.DefaultPKIDirpath) viper.SetDefault("metrics.health-minions", defaultHealthMinion) viper.SetDefault("metrics.salt_new_job_total.enabled", true) viper.SetDefault("metrics.salt_expected_responses_total.enabled", true) diff --git a/cmd/salt-exporter/config_test.go b/cmd/salt-exporter/config_test.go index b4f6d0c..202a3a1 100644 --- a/cmd/salt-exporter/config_test.go +++ b/cmd/salt-exporter/config_test.go @@ -28,6 +28,7 @@ func TestReadConfigFlagOnly(t *testing.T) { ListenAddress: "127.0.0.1", ListenPort: 8080, IPCFile: listener.DefaultIPCFilepath, + PKIDir: listener.DefaultPKIDirpath, TLS: struct { Enabled bool Key string @@ -119,6 +120,7 @@ func TestReadConfigFlagOnly(t *testing.T) { ListenAddress: "127.0.0.1", ListenPort: 8080, IPCFile: "/dev/null", + PKIDir: "/etc/salt/pki", TLS: struct { Enabled bool Key string @@ -228,6 +230,7 @@ func TestConfigFileOnly(t *testing.T) { ListenAddress: "127.0.0.1", ListenPort: 2113, IPCFile: "/dev/null", + PKIDir: "/tmp/pki", TLS: struct { Enabled bool Key string @@ -338,6 +341,7 @@ func TestConfigFileWithFlags(t *testing.T) { ListenAddress: "127.0.0.1", ListenPort: 8080, IPCFile: "/somewhere", + PKIDir: "/tmp/pki", TLS: struct { Enabled bool Key string diff --git a/cmd/salt-exporter/config_test.yml b/cmd/salt-exporter/config_test.yml index a6d62e8..01812b4 100644 --- a/cmd/salt-exporter/config_test.yml +++ b/cmd/salt-exporter/config_test.yml @@ -2,6 +2,7 @@ listen-address: "127.0.0.1" listen-port: 2113 ipc-file: /dev/null +pki-dir: /tmp/pki log-level: "info" tls: diff --git a/cmd/salt-exporter/main.go b/cmd/salt-exporter/main.go index d116db9..04a55d4 100644 --- a/cmd/salt-exporter/main.go +++ b/cmd/salt-exporter/main.go @@ -55,14 +55,23 @@ func start(config Config) { log.Info().Msg("listening for events...") eventChan := make(chan event.SaltEvent) + watchChan := make(chan event.WatchEvent) // listen and expose metric parser := parser.NewEventParser(false) eventListener := listener.NewEventListener(ctx, parser, eventChan) eventListener.SetIPCFilepath(config.IPCFile) + if config.Metrics.HealthMinions { + pkiWatcher, err := listener.NewPKIWatcher(ctx, config.PKIDir, watchChan) + if err != nil { + log.Fatal().Msgf("unable to watch PKI for minions change: %v", err) + } + + go pkiWatcher.StartWatching() + } go eventListener.ListenEvents() - go metrics.ExposeMetrics(ctx, eventChan, config.Metrics) + go metrics.ExposeMetrics(ctx, eventChan, watchChan, config.Metrics) // start http server log.Info().Msg("exposing metrics on " + listenSocket + "/metrics") diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 934150b..a2c1c90 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -4,6 +4,7 @@ import ( "context" "github.com/kpetremann/salt-exporter/pkg/event" + evt "github.com/kpetremann/salt-exporter/pkg/event" "github.com/rs/zerolog/log" ) @@ -14,7 +15,15 @@ func boolToFloat64(b bool) float64 { return 0.0 } -func eventToMetrics(event event.SaltEvent, r Registry) { +func eventToMetrics(event event.SaltEvent, r *Registry) { + if event.Module == evt.BeaconModule { + if event.Type != "status" { + return + } + r.UpdateLastHeartbeat(event.Data.Id) + return + } + switch event.Type { case "new": state := event.ExtractState() @@ -48,7 +57,7 @@ func eventToMetrics(event event.SaltEvent, r Registry) { } } -func ExposeMetrics(ctx context.Context, eventChan <-chan event.SaltEvent, config Config) { +func ExposeMetrics(ctx context.Context, eventChan <-chan event.SaltEvent, watchChan <-chan event.WatchEvent, config Config) { registry := NewRegistry(config) for { @@ -56,6 +65,13 @@ func ExposeMetrics(ctx context.Context, eventChan <-chan event.SaltEvent, config case <-ctx.Done(): log.Info().Msg("stopping event listener") return + case event := <-watchChan: + if event.Op == evt.Accepted { + registry.AddObservableMinion(event.MinionName) + } + if event.Op == evt.Removed { + registry.DeleteObservableMinion(event.MinionName) + } case event := <-eventChan: if config.Global.Filters.IgnoreTest && event.IsTest { return @@ -64,7 +80,7 @@ func ExposeMetrics(ctx context.Context, eventChan <-chan event.SaltEvent, config return } - eventToMetrics(event, registry) + eventToMetrics(event, ®istry) } } } diff --git a/internal/metrics/registry.go b/internal/metrics/registry.go index 3758335..6ab7dd6 100644 --- a/internal/metrics/registry.go +++ b/internal/metrics/registry.go @@ -2,6 +2,7 @@ package metrics import ( "strconv" + "time" "github.com/kpetremann/salt-exporter/internal/filters" "github.com/prometheus/client_golang/prometheus" @@ -11,6 +12,8 @@ import ( type Registry struct { config Config + observedMinions int32 + newJobTotal *prometheus.CounterVec expectedResponsesTotal *prometheus.CounterVec @@ -19,6 +22,9 @@ type Registry struct { responseTotal *prometheus.CounterVec functionStatus *prometheus.GaugeVec + + statusLastResponse *prometheus.GaugeVec + minionsTotal *prometheus.GaugeVec } func NewRegistry(config Config) Registry { @@ -35,6 +41,7 @@ func NewRegistry(config Config) Registry { return Registry{ config: config, + observedMinions: 0, newJobTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: "salt_new_job_total", @@ -82,9 +89,39 @@ func NewRegistry(config Config) Registry { }, []string{"minion", "function", "state"}, ), + statusLastResponse: promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "salt_health_last_heartbeat", + Help: "Last status beacon received. Unix timestamp", + }, + []string{"minion"}, + ), + minionsTotal: promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "salt_health_minions_total", + Help: "Total number of observed minions via status beacon", + }, []string{}, + ), } } +func (r *Registry) UpdateLastHeartbeat(minion string) { + timestamp := time.Now().Unix() + r.statusLastResponse.WithLabelValues(minion).Set(float64(timestamp)) +} + +func (r *Registry) AddObservableMinion(minion string) { + r.observedMinions += 1 + r.UpdateLastHeartbeat(minion) + r.minionsTotal.WithLabelValues().Set(float64(r.observedMinions)) +} + +func (r *Registry) DeleteObservableMinion(minion string) { + r.statusLastResponse.DeleteLabelValues(minion) + r.observedMinions -= 1 + r.minionsTotal.WithLabelValues().Set(float64(r.observedMinions)) +} + func (r *Registry) IncreaseNewJobTotal(function, state string) { if r.config.SaltNewJobTotal.Enabled { r.newJobTotal.WithLabelValues(function, state).Inc() diff --git a/pkg/event/event.go b/pkg/event/event.go index f17b531..cc8561a 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -3,11 +3,33 @@ package event import ( "encoding/json" "errors" + "strings" "github.com/vmihailenco/msgpack/v5" "gopkg.in/yaml.v3" ) +type EventModule int + +type WatchOp uint32 + +const ( + UnknownModule EventModule = iota + RunnerModule + JobModule + BeaconModule +) + +const ( + Accepted WatchOp = iota + Removed +) + +type WatchEvent struct { + MinionName string + Op WatchOp +} + type EventData struct { Arg []interface{} `msgpack:"arg"` Cmd string `msgpack:"cmd"` @@ -32,6 +54,7 @@ type EventData struct { type SaltEvent struct { Tag string Type string + Module EventModule TargetNumber int Data EventData IsScheduleJob bool @@ -74,6 +97,23 @@ func (e SaltEvent) RawToYAML() ([]byte, error) { return yaml.Marshal(data) } +func GetEventModule(tag string) EventModule { + tagParts := strings.Split(tag, "/") + if len(tagParts) < 2 { + return UnknownModule + } + switch tagParts[1] { + case "run": + return RunnerModule + case "job": + return JobModule + case "beacon": + return BeaconModule + default: + return UnknownModule + } +} + // extractStateFromArgs extracts embedded state info func extractStateFromArgs(args interface{}, key string) string { // args only diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go index aa712e6..aaf8d93 100644 --- a/pkg/event/event_test.go +++ b/pkg/event/event_test.go @@ -10,6 +10,7 @@ func getNewStateEvent() event.SaltEvent { return event.SaltEvent{ Tag: "salt/job/20220630000f000000000/new", Type: "new", + Module: event.JobModule, TargetNumber: 1, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", diff --git a/pkg/listener/pkiwatcher.go b/pkg/listener/pkiwatcher.go new file mode 100644 index 0000000..23f0a35 --- /dev/null +++ b/pkg/listener/pkiwatcher.go @@ -0,0 +1,128 @@ +package listener + +import ( + "context" + "os" + "path" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/kpetremann/salt-exporter/pkg/event" + "github.com/rs/zerolog/log" +) + +const DefaultPKIDirpath = "/etc/salt/pki" + +type PKIWatcher struct { + ctx context.Context + + pkiDirPath string + + watcher *fsnotify.Watcher + + eventChan chan<- event.WatchEvent + + lock sync.RWMutex +} + +func NewPKIWatcher(ctx context.Context, pkiDirPath string, eventChan chan event.WatchEvent) (*PKIWatcher, error) { + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + w := &PKIWatcher{ + ctx: ctx, + pkiDirPath: pkiDirPath, + watcher: watcher, + eventChan: eventChan, + lock: sync.RWMutex{}, + } + + return w, nil +} + +// SetPKIDirectory sets the filepath to the salt-master pki directory +// +// The directory must be readable by the user running the exporter (usually salt). +// +// Default: /etc/salt/pki +func (w *PKIWatcher) SetPKIDirectory(filepath string) { + w.pkiDirPath = filepath +} + +func (w *PKIWatcher) open() { + + for { + select { + case <-w.ctx.Done(): + return + default: + } + + minionsDir := path.Join(w.pkiDirPath, "master/minions") + + log.Info().Msg("loading currently accepted minions") + entries, err := os.ReadDir(minionsDir) + if err != nil { + log.Error().Str("error", err.Error()).Msg("failed to list PKI directory") + time.Sleep(5 * time.Second) + } else { + for _, e := range entries { + if !e.IsDir() { + w.eventChan <- event.WatchEvent{ + MinionName: e.Name(), + Op: event.Accepted, + } + log.Info().Msgf("minion %s loaded", e.Name()) + } + } + + // Add a path. + err = w.watcher.Add(minionsDir) + if err != nil { + log.Error().Str("error", err.Error()).Msg("failed to watch PKI directory") + time.Sleep(time.Second * 5) + } else { + return + } + } + } +} + +func (w *PKIWatcher) StartWatching() { + w.open() + + for { + select { + case <-w.ctx.Done(): + w.Stop() + return + case evt := <-w.watcher.Events: + minionName := path.Base(evt.Name) + if evt.Op == fsnotify.Create { + w.eventChan <- event.WatchEvent{ + MinionName: minionName, + Op: event.Accepted, + } + log.Info().Msgf("minion %s accepted by master", minionName) + } + if evt.Op == fsnotify.Remove { + w.eventChan <- event.WatchEvent{ + MinionName: minionName, + Op: event.Removed, + } + log.Info().Msgf("minion %s removed from master", minionName) + } + case err := <-w.watcher.Errors: + log.Error().Str("error", err.Error()).Msg("fail processing watch event") + } + } +} + +func (w *PKIWatcher) Stop() { + log.Info().Msg("stop listening for PKI changes") + w.watcher.Close() +} diff --git a/pkg/parser/fake_beacon_data_test.go b/pkg/parser/fake_beacon_data_test.go new file mode 100644 index 0000000..ee8a2e2 --- /dev/null +++ b/pkg/parser/fake_beacon_data_test.go @@ -0,0 +1,57 @@ +package parser_test + +import ( + "log" + + "github.com/kpetremann/salt-exporter/pkg/event" + "github.com/vmihailenco/msgpack/v5" +) + +/* +Fake new beacon message of type /status + + salt/beacon/host1.example.com/status/2023-10-09T11:36:02.182345 { + { + "id": "host1.example.com", + "data": { + "loadavg": { + "1-min": 0.35, + "5-min": 0.48, + "15-min": 0.26 + } + }, + "_stamp": "2023-10-09T11:36:02.205686" + } + } +*/ +var expectedBeacon = event.SaltEvent{ + Tag: "salt/beacon/host1.example.com/status/2023-10-09T11:36:02.182345", + Type: "status", + Module: event.BeaconModule, + TargetNumber: 0, + Data: event.EventData{ + Timestamp: "2023-10-09T11:36:02.205686", + Id: "host1.example.com", + Minions: []string{}, + }, + IsScheduleJob: false, +} + +func fakeBeaconEvent() []byte { + // Marshal the data using MsgPack + fake := FakeData{ + Timestamp: "2023-10-09T11:36:02.205686", + Minions: []string{}, + Id: "host1.example.com", + } + + fakeBody, err := msgpack.Marshal(fake) + if err != nil { + log.Fatalln(err) + } + + fakeMessage := []byte("salt/beacon/host1.example.com/status/2023-10-09T11:36:02.182345\n\n") + fakeMessage = append(fakeMessage, fakeBody...) + + return fakeMessage +} diff --git a/pkg/parser/fake_exec_data_test.go b/pkg/parser/fake_exec_data_test.go index 14bae10..f1b8ebd 100644 --- a/pkg/parser/fake_exec_data_test.go +++ b/pkg/parser/fake_exec_data_test.go @@ -28,6 +28,7 @@ import ( var expectedNewJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", + Module: event.JobModule, TargetNumber: 1, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -84,6 +85,7 @@ func fakeNewJobEvent() []byte { var expectedReturnJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/localhost", Type: "ret", + Module: event.JobModule, TargetNumber: 0, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -144,6 +146,7 @@ func fakeRetJobEvent() []byte { var expectedNewScheduleJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", + Module: event.JobModule, TargetNumber: 1, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -206,6 +209,7 @@ func fakeNewScheduleJobEvent() []byte { var expectedAckScheduleJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/localhost", Type: "ret", + Module: event.JobModule, TargetNumber: 0, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -294,6 +298,7 @@ func fakeAckScheduleJobEvent() []byte { var expectedScheduleJobReturn = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/localhost", Type: "ret", + Module: event.JobModule, TargetNumber: 0, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", diff --git a/pkg/parser/fake_state_data_test.go b/pkg/parser/fake_state_data_test.go index 3f51dac..cb3d51d 100644 --- a/pkg/parser/fake_state_data_test.go +++ b/pkg/parser/fake_state_data_test.go @@ -33,6 +33,7 @@ var True = true var expectedNewStateSlsJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", + Module: event.JobModule, TargetNumber: 1, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -110,6 +111,7 @@ func fakeNewStateSlsJobEvent() []byte { var expectedStateSlsReturn = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/node1", Type: "ret", + Module: event.JobModule, TargetNumber: 0, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -208,6 +210,7 @@ func fakeStateSlsReturnEvent() []byte { var expectedNewStateSingle = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", + Module: event.JobModule, TargetNumber: 1, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -301,6 +304,7 @@ func fakeNewStateSingleEvent() []byte { var expectedStateSingleReturn = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/node1", Type: "ret", + Module: event.JobModule, TargetNumber: 0, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -408,6 +412,7 @@ func fakeStateSingleReturnEvent() []byte { var expectedNewTestMockStateSlsJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", + Module: event.JobModule, TargetNumber: 1, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", @@ -517,6 +522,7 @@ func fakeNewTestMockStateSlsJobEvent() []byte { var expectedTestMockStateSlsReturn = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/node1", Type: "ret", + Module: event.JobModule, TargetNumber: 0, Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", diff --git a/pkg/parser/parser.go b/pkg/parser/parser.go index a69992d..9395b24 100644 --- a/pkg/parser/parser.go +++ b/pkg/parser/parser.go @@ -99,17 +99,29 @@ func (e Event) Parse(message map[string]interface{}) (event.SaltEvent, error) { lines := strings.SplitN(body, "\n\n", 2) tag := lines[0] - if !(strings.HasPrefix(tag, "salt/job") || strings.HasPrefix(tag, "salt/run")) { + if !(strings.HasPrefix(tag, "salt/")) { return event.SaltEvent{}, errors.New("tag not supported") } log.Debug().Str("tag", tag).Msg("new event") + parts := strings.Split(tag, "/") + + if len(parts) < 3 { + return event.SaltEvent{}, errors.New("tag not supported") + } + + event_module := event.GetEventModule(tag) + + if event_module == event.UnknownModule { + return event.SaltEvent{}, errors.New("tag not supported. Module unknown") + } + // Extract job type from the tag job_type := strings.Split(tag, "/")[3] // Parse message body byteResult := []byte(lines[1]) - ev := event.SaltEvent{Tag: tag, Type: job_type} + ev := event.SaltEvent{Tag: tag, Type: job_type, Module: event_module} if e.KeepRawBody { ev.RawBody = byteResult diff --git a/pkg/parser/parser_test.go b/pkg/parser/parser_test.go index 5dbb770..844586d 100644 --- a/pkg/parser/parser_test.go +++ b/pkg/parser/parser_test.go @@ -69,6 +69,11 @@ func TestParseEvent(t *testing.T) { args: fakeEventAsMap(fakeTestMockStateSlsReturnEvent()), want: expectedTestMockStateSlsReturn, }, + { + name: "beacon", + args: fakeEventAsMap(fakeBeaconEvent()), + want: expectedBeacon, + }, } p := parser.NewEventParser(false)