diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6c0aea40898e..b191fbd192c1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -110,6 +110,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif *Winlogbeat* +- Improve the error message when the registry file content is invalid. {pull}30543[30543] +- Retry EvtSubscribe from start if fails with strict mode. {issue}29793[29793] {pull}30155[30155] + *Elastic Log Driver* diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index ec7ebf90ef6f..91aa2ffcc4f7 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -98,7 +98,7 @@ func (eb *Winlogbeat) init(b *beat.Beat) error { if err != nil { return fmt.Errorf("failed to create new event log: %w", err) } - eb.log.Debugf("Initialized EventLog]", eventLog.Name()) + eb.log.Debugw("Initialized EventLog", "id", eventLog.Name()) logger, err := newEventLogger(b.Info, eventLog, config, eb.log) if err != nil { diff --git a/winlogbeat/eventlog/wineventlog.go b/winlogbeat/eventlog/wineventlog.go index 7ec830f220a5..6ecfc2ba2c6b 100644 --- a/winlogbeat/eventlog/wineventlog.go +++ b/winlogbeat/eventlog/wineventlog.go @@ -22,6 +22,7 @@ package eventlog import ( "encoding/xml" + "errors" "fmt" "io" "path/filepath" @@ -30,7 +31,6 @@ import ( "time" "github.com/joeshaw/multierror" - "github.com/pkg/errors" "golang.org/x/sys/windows" "github.com/elastic/beats/v7/libbeat/common" @@ -90,7 +90,7 @@ func (a *NoMoreEventsAction) Unpack(v string) error { return nil } } - return errors.Errorf("invalid no_more_events action: %v", v) + return fmt.Errorf("invalid no_more_events action: %v", v) } // String returns the name of the action. @@ -196,13 +196,15 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error { // https://msdn.microsoft.com/en-us/library/windows/desktop/aa385771(v=vs.85).aspx#pull signalEvent, err := windows.CreateEvent(nil, 0, 0, nil) if err != nil { - return nil + return err } - defer windows.CloseHandle(signalEvent) + defer func() { _ = windows.CloseHandle(signalEvent) }() var flags win.EvtSubscribeFlag if bookmark > 0 { - flags = win.EvtSubscribeStartAfterBookmark + // Use EvtSubscribeStrict to detect when the bookmark is missing and be able to + // subscribe again from the beginning. + flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict } else { flags = win.EvtSubscribeStartAtOldestRecord } @@ -215,7 +217,18 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error { l.query, // Query - nil means all events bookmark, // Bookmark - for resuming from a specific event flags) + + switch { + case errors.Is(err, win.ERROR_NOT_FOUND), errors.Is(err, win.ERROR_EVT_QUERY_RESULT_STALE), + errors.Is(err, win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION): + debugf("%s error subscribing (first chance): %v", l.logPrefix, err) + // The bookmarked event was not found, we retry the subscription from the start. + incrementMetric(readErrors, err) + subscriptionHandle, err = win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord) + } + if err != nil { + debugf("%s error subscribing (final): %v", l.logPrefix, err) return err } @@ -228,7 +241,7 @@ func (l *winEventLog) openFile(state checkpoint.EventLogState, bookmark win.EvtH h, err := win.EvtQuery(0, path, "", win.EvtQueryFilePath|win.EvtQueryForwardDirection) if err != nil { - return errors.Wrapf(err, "failed to get handle to event log file %v", path) + return fmt.Errorf("failed to get handle to event log file %v: %w", path, err) } if bookmark > 0 { @@ -240,16 +253,16 @@ func (l *winEventLog) openFile(state checkpoint.EventLogState, bookmark win.EvtH if err = win.EvtSeek(h, 0, bookmark, win.EvtSeekRelativeToBookmark|win.EvtSeekStrict); err == nil { // Then we advance past the last read event to avoid sending that // event again. This won't fail if we're at the end of the file. - err = errors.Wrap( - win.EvtSeek(h, 1, bookmark, win.EvtSeekRelativeToBookmark), - "failed to seek past bookmarked position") + if seekErr := win.EvtSeek(h, 1, bookmark, win.EvtSeekRelativeToBookmark); seekErr != nil { + err = fmt.Errorf("failed to seek past bookmarked position: %w", seekErr) + } } else { logp.Warn("%s Failed to seek to bookmarked location in %v (error: %v). "+ "Recovering by reading the log from the beginning. (Did the file "+ "change since it was last read?)", l.logPrefix, path, err) - err = errors.Wrap( - win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst), - "failed to seek to beginning of log") + if seekErr := win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst); seekErr != nil { + err = fmt.Errorf("failed to seek to beginning of log: %w", seekErr) + } } if err != nil { @@ -273,11 +286,13 @@ func (l *winEventLog) Read() ([]Record, error) { }() detailf("%s EventHandles returned %d handles", l.logPrefix, len(handles)) + //nolint: prealloc // some handles can be skipped, the final size is unknown var records []Record for _, h := range handles { l.outputBuf.Reset() err := l.render(h, l.outputBuf) - if bufErr, ok := err.(sys.InsufficientBufferError); ok { + var bufErr sys.InsufficientBufferError + if ok := errors.As(err, &bufErr); ok { detailf("%s Increasing render buffer size to %d", l.logPrefix, bufErr.RequiredSize) l.renderBuf = make([]byte, bufErr.RequiredSize) @@ -290,7 +305,7 @@ func (l *winEventLog) Read() ([]Record, error) { continue } - r, _ := l.buildRecordFromXML(l.outputBuf.Bytes(), err) + r := l.buildRecordFromXML(l.outputBuf.Bytes(), err) r.Offset = checkpoint.EventLogState{ Name: l.id, RecordNumber: r.RecordID, @@ -314,26 +329,26 @@ func (l *winEventLog) Close() error { func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) { handles, err := win.EventHandles(l.subscription, maxRead) - switch err { - case nil: + switch { + case err == nil: if l.maxRead > maxRead { debugf("%s Recovered from RPC_S_INVALID_BOUND error (errno 1734) "+ "by decreasing batch_read_size to %v", l.logPrefix, maxRead) } return handles, maxRead, nil - case win.ERROR_NO_MORE_ITEMS: + case errors.Is(err, win.ERROR_NO_MORE_ITEMS): detailf("%s No more events", l.logPrefix) if l.config.NoMoreEvents == Stop { return nil, maxRead, io.EOF } return nil, maxRead, nil - case win.RPC_S_INVALID_BOUND: + case errors.Is(err, win.RPC_S_INVALID_BOUND): incrementMetric(readErrors, err) if err := l.Close(); err != nil { - return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND") + return nil, 0, fmt.Errorf("failed to recover from RPC_S_INVALID_BOUND: %w", err) } if err := l.Open(l.lastRead); err != nil { - return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND") + return nil, 0, fmt.Errorf("failed to recover from RPC_S_INVALID_BOUND: %w", err) } return l.eventHandles(maxRead / 2) default: @@ -343,7 +358,7 @@ func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) { } } -func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, error) { +func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) Record { includeXML := l.config.IncludeXML e, err := winevent.UnmarshalXML(x) if err != nil { @@ -388,7 +403,7 @@ func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, r.XML = string(x) } - return r, nil + return r } func newEventLogging(options *common.Config) (EventLog, error) { diff --git a/winlogbeat/eventlog/wineventlog_experimental.go b/winlogbeat/eventlog/wineventlog_experimental.go index 87eb4b328026..9ac4b82a5c68 100644 --- a/winlogbeat/eventlog/wineventlog_experimental.go +++ b/winlogbeat/eventlog/wineventlog_experimental.go @@ -21,11 +21,12 @@ package eventlog import ( + "errors" + "fmt" "io" "os" "path/filepath" - "github.com/pkg/errors" "go.uber.org/multierr" "golang.org/x/sys/windows" @@ -99,23 +100,37 @@ func (l *winEventLogExp) openChannel(bookmark win.Bookmark) (win.EvtHandle, erro if err != nil { return win.NilHandle, err } - defer windows.CloseHandle(signalEvent) + defer func() { _ = windows.CloseHandle(signalEvent) }() var flags win.EvtSubscribeFlag if bookmark > 0 { - flags = win.EvtSubscribeStartAfterBookmark + // Use EvtSubscribeStrict to detect when the bookmark is missing and be able to + // subscribe again from the beginning. + flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict } else { flags = win.EvtSubscribeStartAtOldestRecord } l.log.Debugw("Using subscription query.", "winlog.query", l.query) - return win.Subscribe( + h, err := win.Subscribe( 0, // Session - nil for localhost signalEvent, "", // Channel - empty b/c channel is in the query l.query, // Query - nil means all events win.EvtHandle(bookmark), // Bookmark - for resuming from a specific event flags) + + switch { + case err == nil: + return h, nil + case errors.Is(err, win.ERROR_NOT_FOUND), errors.Is(err, win.ERROR_EVT_QUERY_RESULT_STALE), + errors.Is(err, win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION): + // The bookmarked event was not found, we retry the subscription from the start. + incrementMetric(readErrors, err) + return win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord) + default: + return 0, err + } } func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.Bookmark) (win.EvtHandle, error) { @@ -123,7 +138,7 @@ func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.B h, err := win.EvtQuery(0, path, "", win.EvtQueryFilePath|win.EvtQueryForwardDirection) if err != nil { - return win.NilHandle, errors.Wrapf(err, "failed to get handle to event log file %v", path) + return win.NilHandle, fmt.Errorf("failed to get handle to event log file %v: %w", path, err) } if bookmark > 0 { @@ -135,16 +150,16 @@ func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.B if err = win.EvtSeek(h, 0, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark|win.EvtSeekStrict); err == nil { // Then we advance past the last read event to avoid sending that // event again. This won't fail if we're at the end of the file. - err = errors.Wrap( - win.EvtSeek(h, 1, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark), - "failed to seek past bookmarked position") + if seekErr := win.EvtSeek(h, 1, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark); seekErr != nil { + err = fmt.Errorf("failed to seek past bookmarked position: %w", seekErr) + } } else { l.log.Warnf("s Failed to seek to bookmarked location in %v (error: %v). "+ "Recovering by reading the log from the beginning. (Did the file "+ "change since it was last read?)", path, err) - err = errors.Wrap( - win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst), - "failed to seek to beginning of log") + if seekErr := win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst); seekErr != nil { + err = fmt.Errorf("failed to seek to beginning of log: %w", seekErr) + } } if err != nil { @@ -198,6 +213,7 @@ func (l *winEventLogExp) processHandle(h win.EvtHandle) (*Record, error) { evt.RenderErr = append(evt.RenderErr, err.Error()) } + //nolint: godox // keep to have a record of feature disparity between non-experimental vs experimental // TODO: Need to add XML when configured. r := &Record{ @@ -224,7 +240,7 @@ func (l *winEventLogExp) processHandle(h win.EvtHandle) (*Record, error) { func (l *winEventLogExp) createBookmarkFromEvent(evtHandle win.EvtHandle) (string, error) { bookmark, err := win.NewBookmarkFromEvent(evtHandle) if err != nil { - return "", errors.Wrap(err, "failed to create new bookmark from event handle") + return "", fmt.Errorf("failed to create new bookmark from event handle: %w", err) } defer bookmark.Close() diff --git a/winlogbeat/sys/wineventlog/zsyscall_windows.go b/winlogbeat/sys/wineventlog/zsyscall_windows.go index 0388835e30f8..62e455f09a00 100644 --- a/winlogbeat/sys/wineventlog/zsyscall_windows.go +++ b/winlogbeat/sys/wineventlog/zsyscall_windows.go @@ -31,12 +31,18 @@ var _ unsafe.Pointer // Do the interface allocations only once for common // Errno values. const ( - errnoERROR_IO_PENDING = 997 + errnoERROR_IO_PENDING = 997 + errnoERROR_NOT_FOUND = 1168 + errnoERROR_EVT_QUERY_RESULT_STALE = 15011 + errnoERROR_EVT_QUERY_RESULT_INVALID_POSITION = 15012 ) var ( - errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING) - errERROR_EINVAL error = syscall.EINVAL + ERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING) + ERROR_NOT_FOUND error = syscall.Errno(errnoERROR_NOT_FOUND) + ERROR_EVT_QUERY_RESULT_STALE error = syscall.Errno(errnoERROR_EVT_QUERY_RESULT_STALE) + ERROR_EVT_QUERY_RESULT_INVALID_POSITION error = syscall.Errno(errnoERROR_EVT_QUERY_RESULT_INVALID_POSITION) + ERROR_EINVAL error = syscall.EINVAL ) // errnoErr returns common boxed Errno values, to prevent @@ -44,9 +50,15 @@ var ( func errnoErr(e syscall.Errno) error { switch e { case 0: - return errERROR_EINVAL + return ERROR_EINVAL case errnoERROR_IO_PENDING: - return errERROR_IO_PENDING + return ERROR_IO_PENDING + case errnoERROR_NOT_FOUND: + return ERROR_NOT_FOUND + case errnoERROR_EVT_QUERY_RESULT_STALE: + return ERROR_EVT_QUERY_RESULT_STALE + case ERROR_EVT_QUERY_RESULT_INVALID_POSITION: + return ERROR_EVT_QUERY_RESULT_INVALID_POSITION } // TODO: add more here, after collecting data on the common // error values see on Windows. (perhaps when running diff --git a/winlogbeat/tests/system/test_wineventlog.py b/winlogbeat/tests/system/test_wineventlog.py index 9bbf5a7ded43..e0b24d18454f 100644 --- a/winlogbeat/tests/system/test_wineventlog.py +++ b/winlogbeat/tests/system/test_wineventlog.py @@ -63,6 +63,84 @@ def test_resume_reading_events(self): "winlog.opcode": "Info", }) + def test_cleared_channel_restarts(self): + """ + wineventlog - When a bookmark points to a cleared (stale) channel + the subscription starts from the beginning + """ + msg1 = "First event" + self.write_event_log(msg1) + msg2 = "Second event" + self.write_event_log(msg2) + + evts = self.read_events(expected_events=2) + + self.assertTrue(len(evts), 2) + self.assert_common_fields(evts[0], msg=msg1) + self.assert_common_fields(evts[1], msg=msg2) + + # remove the output file, otherwise there is a race condition + # in read_events() below where it reads the results of the previous + # execution + os.unlink(os.path.join(self.working_dir, "output", self.beat_name + "-" + self.today + ".ndjson")) + + self.clear_event_log() + + # we check that after clearing the event log the bookmark still points to the previous checkpoint + event_logs = self.read_registry(requireBookmark=True) + self.assertTrue(len(list(event_logs.keys())), 1) + self.assertIn(self.providerName, event_logs) + record_number = event_logs[self.providerName]["record_number"] + self.assertTrue(record_number, 2) + + msg3 = "Third event" + self.write_event_log(msg3) + + evts = self.read_events() + self.assertTrue(len(evts), 1) + self.assert_common_fields(evts[0], msg=msg3) + + def test_bad_bookmark_restart(self): + """ + wineventlog - When a bookmarked event does not exist the subcription + restarts from the beginning + """ + msg1 = "First event" + self.write_event_log(msg1) + + evts = self.read_events(expected_events=1) + + self.assertTrue(len(evts), 1) + self.assert_common_fields(evts[0], msg=msg1) + + event_logs = self.read_registry(requireBookmark=True) + self.assertTrue(len(list(event_logs.keys())), 1) + self.assertIn(self.providerName, event_logs) + record_number = event_logs[self.providerName]["record_number"] + self.assertTrue(record_number, 1) + + # write invalid bookmark, it should start from the beginning again + f = open(os.path.join(self.working_dir, "data", ".winlogbeat.yml"), "w") + f.write(( + "update_time: 2100-01-01T00:00:00Z\n" + + "event_logs:\n" + + " - name: {}\n" + + " record_number: 1000\n" + + " timestamp: 2100-01-01T00:00:00Z\n" + + " bookmark: \"\\r\\n \\r\\n\"\n"). + format(self.providerName, self.providerName) + ) + f.close() + + # remove the output file, otherwise there is a race condition + # in read_events() below where it reads the results of the previous + # execution + os.unlink(os.path.join(self.working_dir, "output", self.beat_name + "-" + self.today + ".ndjson")) + + evts = self.read_events(expected_events=1) + self.assertTrue(len(evts), 1) + self.assert_common_fields(evts[0], msg=msg1) + def test_read_unknown_event_id(self): """ wineventlog - Read unknown event ID