From e6fe63b75709b354a3f32e532c08d0e89154d124 Mon Sep 17 00:00:00 2001 From: Kevin Petremann Date: Sun, 18 Jun 2023 11:25:36 +0200 Subject: [PATCH 1/2] refacto: remove channel dependency for ParseEvent - ease the tests - better role separation: only the listener is routing events --- pkg/events/events.go | 9 ++++----- pkg/events/events_test.go | 21 +++++++++++++-------- pkg/events/listener.go | 4 +++- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/events/events.go b/pkg/events/events.go index 3de41ba..1848217 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -118,15 +118,14 @@ func (e *SaltEvent) ExtractState() string { // ParseEvent parses a salt event // -// Once parsed, the message is sent to the eventChan channel. // KeepRawBody is used to keep the raw body of the event. -func ParseEvent(message map[string]interface{}, eventChan chan<- SaltEvent, keepRawBody bool) { +func ParseEvent(message map[string]interface{}, keepRawBody bool) (SaltEvent, error) { body := string(message["body"].([]byte)) lines := strings.SplitN(body, "\n\n", 2) tag := lines[0] if !(strings.HasPrefix(tag, "salt/job") || strings.HasPrefix(tag, "salt/run")) { - return + return SaltEvent{}, errors.New("tag not supported") } log.Debug().Str("tag", tag).Msg("new event") @@ -143,7 +142,7 @@ func ParseEvent(message map[string]interface{}, eventChan chan<- SaltEvent, keep if err := msgpack.Unmarshal(byteResult, &event.Data); err != nil { log.Warn().Str("error", err.Error()).Str("tag", tag).Msg("decoding_failure") - return + return SaltEvent{}, err } event.TargetNumber = len(event.Data.Minions) @@ -154,5 +153,5 @@ func ParseEvent(message map[string]interface{}, eventChan chan<- SaltEvent, keep event.Data.Id = "master" } - eventChan <- event + return event, nil } diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go index e14df08..46267bb 100644 --- a/pkg/events/events_test.go +++ b/pkg/events/events_test.go @@ -2,14 +2,12 @@ package events_test import ( "testing" - "time" "github.com/google/go-cmp/cmp" "github.com/kpetremann/salt-exporter/pkg/events" ) func TestParseEvent(t *testing.T) { - eventChan := make(chan events.SaltEvent) tests := []struct { name string args map[string]interface{} @@ -60,15 +58,22 @@ func TestParseEvent(t *testing.T) { args: fakeEventAsMap(fakeStateSingleReturnEvent()), want: expectedStateSingleReturn, }, + { + name: "dry-run", + args: fakeEventAsMap(fakeStateSingleReturnEvent()), + want: expectedStateSingleReturn, + }, + { + name: "mock", + args: fakeEventAsMap(fakeStateSingleReturnEvent()), + want: expectedStateSingleReturn, + }, } for _, test := range tests { - var parsed events.SaltEvent - go events.ParseEvent(test.args, eventChan, false) - - select { - case parsed = <-eventChan: - case <-time.After(1 * time.Millisecond): + parsed, err := events.ParseEvent(test.args, false) + if err != nil { + t.Errorf("Unexpected error %s", err.Error()) } if diff := cmp.Diff(parsed, test.want); diff != "" { diff --git a/pkg/events/listener.go b/pkg/events/listener.go index 492421a..dcc72f6 100644 --- a/pkg/events/listener.go +++ b/pkg/events/listener.go @@ -115,7 +115,9 @@ func (e *EventListener) ListenEvents(keepRawBody bool) { continue } - ParseEvent(message, e.eventChan, keepRawBody) + if event, err := ParseEvent(message, keepRawBody); err == nil { + e.eventChan <- event + } } } } From e232a1e082ce6361214f809ca038acf8326bfc13 Mon Sep 17 00:00:00 2001 From: Kevin Petremann Date: Sun, 18 Jun 2023 14:25:07 +0200 Subject: [PATCH 2/2] refacto!: decouple listener and parser --- cmd/salt-exporter/main.go | 9 +- cmd/salt-live/main.go | 11 +- internal/metrics/metrics.go | 2 +- internal/tui/item.go | 4 +- internal/tui/tui.go | 6 +- pkg/{events/events.go => event/event.go} | 44 +---- pkg/event/event_test.go | 99 +++++++++++ pkg/events/events_test.go | 155 ------------------ pkg/{events => listener}/listener.go | 27 ++- pkg/{events => parser}/fake_data_test.go | 2 +- pkg/{events => parser}/fake_exec_data_test.go | 24 +-- .../fake_state_data_test.go | 40 ++--- pkg/parser/parser.go | 58 +++++++ pkg/parser/parser_test.go | 75 +++++++++ 14 files changed, 293 insertions(+), 263 deletions(-) rename pkg/{events/events.go => event/event.go} (69%) create mode 100644 pkg/event/event_test.go delete mode 100644 pkg/events/events_test.go rename pkg/{events => listener}/listener.go (81%) rename pkg/{events => parser}/fake_data_test.go (98%) rename pkg/{events => parser}/fake_exec_data_test.go (95%) rename pkg/{events => parser}/fake_state_data_test.go (89%) create mode 100644 pkg/parser/parser.go create mode 100644 pkg/parser/parser_test.go diff --git a/cmd/salt-exporter/main.go b/cmd/salt-exporter/main.go index 56dc47d..81a7ee5 100644 --- a/cmd/salt-exporter/main.go +++ b/cmd/salt-exporter/main.go @@ -12,7 +12,9 @@ import ( "github.com/kpetremann/salt-exporter/internal/logging" "github.com/kpetremann/salt-exporter/internal/metrics" - "github.com/kpetremann/salt-exporter/pkg/events" + events "github.com/kpetremann/salt-exporter/pkg/event" + "github.com/kpetremann/salt-exporter/pkg/listener" + "github.com/kpetremann/salt-exporter/pkg/parser" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog/log" ) @@ -84,9 +86,10 @@ func main() { eventChan := make(chan events.SaltEvent) // listen and expose metric - eventListener := events.NewEventListener(ctx, eventChan) + parser := parser.NewEventParser(false) + eventListener := listener.NewEventListener(ctx, parser, eventChan) - go eventListener.ListenEvents(false) + go eventListener.ListenEvents() go metrics.ExposeMetrics(ctx, eventChan, metricsConfig) // start http server diff --git a/cmd/salt-live/main.go b/cmd/salt-live/main.go index 422013a..bedada8 100644 --- a/cmd/salt-live/main.go +++ b/cmd/salt-live/main.go @@ -9,7 +9,9 @@ import ( "syscall" "github.com/kpetremann/salt-exporter/internal/tui" - "github.com/kpetremann/salt-exporter/pkg/events" + "github.com/kpetremann/salt-exporter/pkg/event" + "github.com/kpetremann/salt-exporter/pkg/listener" + "github.com/kpetremann/salt-exporter/pkg/parser" "github.com/rs/zerolog/log" tea "github.com/charmbracelet/bubbletea" @@ -47,9 +49,10 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - eventChan := make(chan events.SaltEvent, *bufferSize) - eventListener := events.NewEventListener(ctx, eventChan) - go eventListener.ListenEvents(true) + eventChan := make(chan event.SaltEvent, *bufferSize) + parser := parser.NewEventParser(true) + eventListener := listener.NewEventListener(ctx, parser, eventChan) + go eventListener.ListenEvents() p := tea.NewProgram(tui.NewModel(eventChan, *maxItems, *filter), tea.WithMouseCellMotion()) if _, err := p.Run(); err != nil { diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 1d05e61..672bf8a 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -6,7 +6,7 @@ import ( "time" "github.com/kpetremann/salt-exporter/internal/filters" - "github.com/kpetremann/salt-exporter/pkg/events" + events "github.com/kpetremann/salt-exporter/pkg/event" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog/log" diff --git a/internal/tui/item.go b/internal/tui/item.go index 812d83e..721da01 100644 --- a/internal/tui/item.go +++ b/internal/tui/item.go @@ -3,13 +3,13 @@ package tui import ( "fmt" - "github.com/kpetremann/salt-exporter/pkg/events" + "github.com/kpetremann/salt-exporter/pkg/event" ) type item struct { title string description string - event events.SaltEvent + event event.SaltEvent datetime string sender string state string diff --git a/internal/tui/tui.go b/internal/tui/tui.go index c19e30a..2c4c469 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -10,7 +10,7 @@ import ( teaViewport "github.com/charmbracelet/bubbles/viewport" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" - "github.com/kpetremann/salt-exporter/pkg/events" + "github.com/kpetremann/salt-exporter/pkg/event" ) const theme = "solarized-dark" @@ -28,7 +28,7 @@ type model struct { eventList teaList.Model itemsBuffer []teaList.Item rawView teaViewport.Model - eventChan <-chan events.SaltEvent + eventChan <-chan event.SaltEvent hardFilter string keys *keyMap sideInfos string @@ -40,7 +40,7 @@ type model struct { wordWrap bool } -func NewModel(eventChan <-chan events.SaltEvent, maxItems int, filter string) model { +func NewModel(eventChan <-chan event.SaltEvent, maxItems int, filter string) model { var listKeys = defaultKeyMap() list := teaList.NewDefaultDelegate() diff --git a/pkg/events/events.go b/pkg/event/event.go similarity index 69% rename from pkg/events/events.go rename to pkg/event/event.go index 1848217..8e956af 100644 --- a/pkg/events/events.go +++ b/pkg/event/event.go @@ -1,11 +1,9 @@ -package events +package event import ( "encoding/json" "errors" - "strings" - "github.com/rs/zerolog/log" "github.com/vmihailenco/msgpack/v5" "gopkg.in/yaml.v3" ) @@ -115,43 +113,3 @@ func (e *SaltEvent) ExtractState() string { } return "" } - -// ParseEvent parses a salt event -// -// KeepRawBody is used to keep the raw body of the event. -func ParseEvent(message map[string]interface{}, keepRawBody bool) (SaltEvent, error) { - body := string(message["body"].([]byte)) - lines := strings.SplitN(body, "\n\n", 2) - - tag := lines[0] - if !(strings.HasPrefix(tag, "salt/job") || strings.HasPrefix(tag, "salt/run")) { - return SaltEvent{}, errors.New("tag not supported") - } - log.Debug().Str("tag", tag).Msg("new event") - - // Extract job type from the tag - job_type := strings.Split(tag, "/")[3] - - // Parse message body - byteResult := []byte(lines[1]) - event := SaltEvent{Tag: tag, Type: job_type} - - if keepRawBody { - event.RawBody = byteResult - } - - if err := msgpack.Unmarshal(byteResult, &event.Data); err != nil { - log.Warn().Str("error", err.Error()).Str("tag", tag).Msg("decoding_failure") - return SaltEvent{}, err - } - - event.TargetNumber = len(event.Data.Minions) - event.IsScheduleJob = event.Data.Schedule != "" - - // A runner are executed on the master but they do not provide their ID in the event - if strings.HasPrefix(tag, "salt/run") && event.Data.Id == "" { - event.Data.Id = "master" - } - - return event, nil -} diff --git a/pkg/event/event_test.go b/pkg/event/event_test.go new file mode 100644 index 0000000..aa712e6 --- /dev/null +++ b/pkg/event/event_test.go @@ -0,0 +1,99 @@ +package event_test + +import ( + "testing" + + "github.com/kpetremann/salt-exporter/pkg/event" +) + +func getNewStateEvent() event.SaltEvent { + return event.SaltEvent{ + Tag: "salt/job/20220630000f000000000/new", + Type: "new", + TargetNumber: 1, + Data: event.EventData{ + Timestamp: "2022-06-30T00:00:00.000000", + Fun: "state.sls", + Arg: []interface{}{"test"}, + Jid: "20220630000000000000", + Minions: []string{"node1"}, + Missing: []string{}, + Tgt: "node1", + TgtType: "glob", + User: "salt_user", + }, + IsScheduleJob: false, + } +} + +func TestExtractState(t *testing.T) { + stateSls := getNewStateEvent() + + stateSlsFunArg := getNewStateEvent() + stateSlsFunArg.Data.Arg = nil + stateSlsFunArg.Data.FunArgs = []interface{}{"test", map[string]bool{"dry_run": true}} + + stateSlsFunArgMap := getNewStateEvent() + stateSlsFunArgMap.Data.Arg = nil + stateSlsFunArgMap.Data.FunArgs = []interface{}{map[string]interface{}{"mods": "test", "dry_run": true}} + + stateApplyArg := getNewStateEvent() + stateApplyArg.Data.Fun = "state.apply" + + stateApplyHighstate := getNewStateEvent() + stateApplyHighstate.Data.Fun = "state.apply" + stateApplyHighstate.Data.Arg = nil + + stateHighstate := getNewStateEvent() + stateHighstate.Data.Fun = "state.highstate" + stateHighstate.Data.Arg = nil + + tests := []struct { + name string + event event.SaltEvent + want string + }{ + { + name: "state via state.sls", + event: stateSls, + want: "test", + }, + { + name: "state via state.sls args + kwargs", + event: stateSlsFunArg, + want: "test", + }, + { + name: "state via state.sls kwargs only", + event: stateSlsFunArgMap, + want: "test", + }, + { + name: "state via state.apply args only", + event: stateApplyArg, + want: "test", + }, + { + name: "state via state.apply", + event: stateApplyArg, + want: "test", + }, + { + name: "highstate via state.apply", + event: stateApplyHighstate, + want: "highstate", + }, + { + name: "state.highstate", + event: stateHighstate, + want: "highstate", + }, + } + + for _, test := range tests { + if res := test.event.ExtractState(); res != test.want { + t.Errorf("Mismatch for '%s', wants '%s' got '%s' ", test.name, test.want, res) + } + } + +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go deleted file mode 100644 index 46267bb..0000000 --- a/pkg/events/events_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package events_test - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/kpetremann/salt-exporter/pkg/events" -) - -func TestParseEvent(t *testing.T) { - tests := []struct { - name string - args map[string]interface{} - want events.SaltEvent - }{ - { - name: "new job", - args: fakeEventAsMap(fakeNewJobEvent()), - want: expectedNewJob, - }, - { - name: "return job", - args: fakeEventAsMap(fakeRetJobEvent()), - want: expectedReturnJob, - }, - { - name: "new schedule job", - args: fakeEventAsMap(fakeNewScheduleJobEvent()), - want: expectedNewScheduleJob, - }, - { - name: "return ack schedule job", - args: fakeEventAsMap(fakeAckScheduleJobEvent()), - want: expectedAckScheduleJob, - }, - { - name: "return schedule job", - args: fakeEventAsMap(fakeScheduleJobReturnEvent()), - want: expectedScheduleJobReturn, - }, - { - name: "new state.sls", - args: fakeEventAsMap(fakeNewStateSlsJobEvent()), - want: expectedNewStateSlsJob, - }, - { - name: "return state.sls", - args: fakeEventAsMap(fakeStateSlsReturnEvent()), - want: expectedStateSlsReturn, - }, - { - name: "new state.single", - args: fakeEventAsMap(fakeNewStateSingleEvent()), - want: expectedNewStateSingle, - }, - { - name: "return state.single", - args: fakeEventAsMap(fakeStateSingleReturnEvent()), - want: expectedStateSingleReturn, - }, - { - name: "dry-run", - args: fakeEventAsMap(fakeStateSingleReturnEvent()), - want: expectedStateSingleReturn, - }, - { - name: "mock", - args: fakeEventAsMap(fakeStateSingleReturnEvent()), - want: expectedStateSingleReturn, - }, - } - - for _, test := range tests { - parsed, err := events.ParseEvent(test.args, false) - if err != nil { - t.Errorf("Unexpected error %s", err.Error()) - } - - if diff := cmp.Diff(parsed, test.want); diff != "" { - t.Errorf("Mismatch for '%s' test:\n%s", test.name, diff) - } - } -} - -func TestExtractState(t *testing.T) { - stateSls := getNewStateEvent() - - stateSlsFunArg := getNewStateEvent() - stateSlsFunArg.Data.Arg = nil - stateSlsFunArg.Data.FunArgs = []interface{}{"test", map[string]bool{"dry_run": true}} - - stateSlsFunArgMap := getNewStateEvent() - stateSlsFunArgMap.Data.Arg = nil - stateSlsFunArgMap.Data.FunArgs = []interface{}{map[string]interface{}{"mods": "test", "dry_run": true}} - - stateApplyArg := getNewStateEvent() - stateApplyArg.Data.Fun = "state.apply" - - stateApplyHighstate := getNewStateEvent() - stateApplyHighstate.Data.Fun = "state.apply" - stateApplyHighstate.Data.Arg = nil - - stateHighstate := getNewStateEvent() - stateHighstate.Data.Fun = "state.highstate" - stateHighstate.Data.Arg = nil - - tests := []struct { - name string - event events.SaltEvent - want string - }{ - { - name: "state via state.sls", - event: stateSls, - want: "test", - }, - { - name: "state via state.sls args + kwargs", - event: stateSlsFunArg, - want: "test", - }, - { - name: "state via state.sls kwargs only", - event: stateSlsFunArgMap, - want: "test", - }, - { - name: "state via state.apply args only", - event: stateApplyArg, - want: "test", - }, - { - name: "state via state.apply", - event: stateApplyArg, - want: "test", - }, - { - name: "highstate via state.apply", - event: stateApplyHighstate, - want: "highstate", - }, - { - name: "state.highstate", - event: stateHighstate, - want: "highstate", - }, - } - - for _, test := range tests { - if res := test.event.ExtractState(); res != test.want { - t.Errorf("Mismatch for '%s', wants '%s' got '%s' ", test.name, test.want, res) - } - } - -} diff --git a/pkg/events/listener.go b/pkg/listener/listener.go similarity index 81% rename from pkg/events/listener.go rename to pkg/listener/listener.go index dcc72f6..72a5cb3 100644 --- a/pkg/events/listener.go +++ b/pkg/listener/listener.go @@ -1,4 +1,4 @@ -package events +package listener import ( "context" @@ -6,10 +6,15 @@ import ( "net" "time" + events "github.com/kpetremann/salt-exporter/pkg/event" "github.com/rs/zerolog/log" "github.com/vmihailenco/msgpack/v5" ) +type eventParser interface { + Parse(message map[string]interface{}) (events.SaltEvent, error) +} + const defaultIPCFilepath = "/var/run/salt/master/master_event_pub.ipc" // EventListener listens to the salt-master event bus and sends events to the event channel @@ -18,7 +23,7 @@ type EventListener struct { ctx context.Context // eventChan is the channel to send events to - eventChan chan SaltEvent + eventChan chan events.SaltEvent // iPCFilepath is filepath to the salt-master event bus iPCFilepath string @@ -28,6 +33,8 @@ type EventListener struct { // decoder is msgpack decoder for parsing the event bus messages decoder *msgpack.Decoder + + eventParser eventParser } // Open opens the salt-master event bus @@ -78,8 +85,13 @@ func (e *EventListener) Reconnect() { // NewEventListener creates a new EventListener // // The events will be sent to eventChan. -func NewEventListener(ctx context.Context, eventChan chan SaltEvent) *EventListener { - e := EventListener{ctx: ctx, eventChan: eventChan, iPCFilepath: defaultIPCFilepath} +func NewEventListener(ctx context.Context, eventParser eventParser, eventChan chan events.SaltEvent) *EventListener { + e := EventListener{ + ctx: ctx, + eventChan: eventChan, + eventParser: eventParser, + iPCFilepath: defaultIPCFilepath, + } return &e } @@ -93,10 +105,7 @@ func (e *EventListener) SetIPCFilepath(filepath string) { } // ListenEvents listens to the salt-master event bus and sends events to the event channel -// -// if keepRawBody is true, the raw event body will be kept in the event struct. -// It can be useful for debugging or post-processing. -func (e *EventListener) ListenEvents(keepRawBody bool) { +func (e *EventListener) ListenEvents() { e.Open() for { @@ -115,7 +124,7 @@ func (e *EventListener) ListenEvents(keepRawBody bool) { continue } - if event, err := ParseEvent(message, keepRawBody); err == nil { + if event, err := e.eventParser.Parse(message); err == nil { e.eventChan <- event } } diff --git a/pkg/events/fake_data_test.go b/pkg/parser/fake_data_test.go similarity index 98% rename from pkg/events/fake_data_test.go rename to pkg/parser/fake_data_test.go index 1b13d83..93a0880 100644 --- a/pkg/events/fake_data_test.go +++ b/pkg/parser/fake_data_test.go @@ -1,4 +1,4 @@ -package events_test +package parser_test import ( "log" diff --git a/pkg/events/fake_exec_data_test.go b/pkg/parser/fake_exec_data_test.go similarity index 95% rename from pkg/events/fake_exec_data_test.go rename to pkg/parser/fake_exec_data_test.go index 104573e..14bae10 100644 --- a/pkg/events/fake_exec_data_test.go +++ b/pkg/parser/fake_exec_data_test.go @@ -1,9 +1,9 @@ -package events_test +package parser_test import ( "log" - "github.com/kpetremann/salt-exporter/pkg/events" + "github.com/kpetremann/salt-exporter/pkg/event" "github.com/vmihailenco/msgpack/v5" ) @@ -25,11 +25,11 @@ import ( } */ -var expectedNewJob = events.SaltEvent{ +var expectedNewJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", TargetNumber: 1, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Fun: "test.ping", Jid: "20220630000000000000", @@ -81,11 +81,11 @@ func fakeNewJobEvent() []byte { */ -var expectedReturnJob = events.SaltEvent{ +var expectedReturnJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/localhost", Type: "ret", TargetNumber: 0, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Cmd: "_return", Fun: "test.ping", @@ -141,11 +141,11 @@ func fakeRetJobEvent() []byte { } */ -var expectedNewScheduleJob = events.SaltEvent{ +var expectedNewScheduleJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", TargetNumber: 1, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", FunArgs: []interface{}{"sync_all"}, Fun: "schedule.run_job", @@ -203,11 +203,11 @@ func fakeNewScheduleJobEvent() []byte { } */ -var expectedAckScheduleJob = events.SaltEvent{ +var expectedAckScheduleJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/localhost", Type: "ret", TargetNumber: 0, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Cmd: "_return", Fun: "schedule.run_job", @@ -291,11 +291,11 @@ func fakeAckScheduleJobEvent() []byte { } */ -var expectedScheduleJobReturn = events.SaltEvent{ +var expectedScheduleJobReturn = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/localhost", Type: "ret", TargetNumber: 0, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Cmd: "_return", Fun: "saltutil.sync_all", diff --git a/pkg/events/fake_state_data_test.go b/pkg/parser/fake_state_data_test.go similarity index 89% rename from pkg/events/fake_state_data_test.go rename to pkg/parser/fake_state_data_test.go index 2a6c9aa..55ea992 100644 --- a/pkg/events/fake_state_data_test.go +++ b/pkg/parser/fake_state_data_test.go @@ -1,32 +1,12 @@ -package events_test +package parser_test import ( "log" - "github.com/kpetremann/salt-exporter/pkg/events" + "github.com/kpetremann/salt-exporter/pkg/event" "github.com/vmihailenco/msgpack/v5" ) -func getNewStateEvent() events.SaltEvent { - return events.SaltEvent{ - Tag: "salt/job/20220630000000000000/new", - Type: "new", - TargetNumber: 1, - Data: events.EventData{ - Timestamp: "2022-06-30T00:00:00.000000", - Fun: "state.sls", - Arg: []interface{}{"test"}, - Jid: "20220630000000000000", - Minions: []string{"node1"}, - Missing: []string{}, - Tgt: "node1", - TgtType: "glob", - User: "salt_user", - }, - IsScheduleJob: false, - } -} - /* Fake state.sls job @@ -47,11 +27,11 @@ func getNewStateEvent() events.SaltEvent { } */ -var expectedNewStateSlsJob = events.SaltEvent{ +var expectedNewStateSlsJob = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", TargetNumber: 1, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Fun: "state.sls", Arg: []interface{}{"test"}, @@ -122,11 +102,11 @@ func fakeNewStateSlsJobEvent() []byte { */ -var expectedStateSlsReturn = events.SaltEvent{ +var expectedStateSlsReturn = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/node1", Type: "ret", TargetNumber: 0, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Cmd: "_return", Fun: "state.sls", @@ -217,11 +197,11 @@ func fakeStateSlsReturnEvent() []byte { */ -var expectedNewStateSingle = events.SaltEvent{ +var expectedNewStateSingle = event.SaltEvent{ Tag: "salt/job/20220630000000000000/new", Type: "new", TargetNumber: 1, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Arg: []interface{}{ map[string]interface{}{ @@ -308,11 +288,11 @@ func fakeNewStateSingleEvent() []byte { */ -var expectedStateSingleReturn = events.SaltEvent{ +var expectedStateSingleReturn = event.SaltEvent{ Tag: "salt/job/20220630000000000000/ret/node1", Type: "ret", TargetNumber: 0, - Data: events.EventData{ + Data: event.EventData{ Timestamp: "2022-06-30T00:00:00.000000", Cmd: "_return", Fun: "state.single", diff --git a/pkg/parser/parser.go b/pkg/parser/parser.go new file mode 100644 index 0000000..3af2983 --- /dev/null +++ b/pkg/parser/parser.go @@ -0,0 +1,58 @@ +package parser + +import ( + "errors" + "strings" + + "github.com/kpetremann/salt-exporter/pkg/event" + "github.com/rs/zerolog/log" + "github.com/vmihailenco/msgpack/v5" +) + +type Event struct { + KeepRawBody bool +} + +func NewEventParser(KeepRawBody bool) Event { + return Event{KeepRawBody: KeepRawBody} +} + +// ParseEvent parses a salt event +// +// KeepRawBody is used to keep the raw body of the event. +func (e Event) Parse(message map[string]interface{}) (event.SaltEvent, error) { + body := string(message["body"].([]byte)) + lines := strings.SplitN(body, "\n\n", 2) + + tag := lines[0] + if !(strings.HasPrefix(tag, "salt/job") || strings.HasPrefix(tag, "salt/run")) { + return event.SaltEvent{}, errors.New("tag not supported") + } + log.Debug().Str("tag", tag).Msg("new event") + + // Extract job type from the tag + job_type := strings.Split(tag, "/")[3] + + // Parse message body + byteResult := []byte(lines[1]) + ev := event.SaltEvent{Tag: tag, Type: job_type} + + if e.KeepRawBody { + ev.RawBody = byteResult + } + + if err := msgpack.Unmarshal(byteResult, &ev.Data); err != nil { + log.Warn().Str("error", err.Error()).Str("tag", tag).Msg("decoding_failure") + return event.SaltEvent{}, err + } + + ev.TargetNumber = len(ev.Data.Minions) + ev.IsScheduleJob = ev.Data.Schedule != "" + + // A runner are executed on the master but they do not provide their ID in the event + if strings.HasPrefix(tag, "salt/run") && ev.Data.Id == "" { + ev.Data.Id = "master" + } + + return ev, nil +} diff --git a/pkg/parser/parser_test.go b/pkg/parser/parser_test.go new file mode 100644 index 0000000..81aa615 --- /dev/null +++ b/pkg/parser/parser_test.go @@ -0,0 +1,75 @@ +package parser_test + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/kpetremann/salt-exporter/pkg/event" + "github.com/kpetremann/salt-exporter/pkg/parser" +) + +func TestParseEvent(t *testing.T) { + tests := []struct { + name string + args map[string]interface{} + want event.SaltEvent + }{ + { + name: "new job", + args: fakeEventAsMap(fakeNewJobEvent()), + want: expectedNewJob, + }, + { + name: "return job", + args: fakeEventAsMap(fakeRetJobEvent()), + want: expectedReturnJob, + }, + { + name: "new schedule job", + args: fakeEventAsMap(fakeNewScheduleJobEvent()), + want: expectedNewScheduleJob, + }, + { + name: "return ack schedule job", + args: fakeEventAsMap(fakeAckScheduleJobEvent()), + want: expectedAckScheduleJob, + }, + { + name: "return schedule job", + args: fakeEventAsMap(fakeScheduleJobReturnEvent()), + want: expectedScheduleJobReturn, + }, + { + name: "new state.sls", + args: fakeEventAsMap(fakeNewStateSlsJobEvent()), + want: expectedNewStateSlsJob, + }, + { + name: "return state.sls", + args: fakeEventAsMap(fakeStateSlsReturnEvent()), + want: expectedStateSlsReturn, + }, + { + name: "new state.single", + args: fakeEventAsMap(fakeNewStateSingleEvent()), + want: expectedNewStateSingle, + }, + { + name: "return state.single", + args: fakeEventAsMap(fakeStateSingleReturnEvent()), + want: expectedStateSingleReturn, + }, + } + + p := parser.NewEventParser(false) + for _, test := range tests { + parsed, err := p.Parse(test.args) + if err != nil { + t.Errorf("Unexpected error %s", err.Error()) + } + + if diff := cmp.Diff(parsed, test.want); diff != "" { + t.Errorf("Mismatch for '%s' test:\n%s", test.name, diff) + } + } +}