Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[winlogbeat] Retry EvtSubscribe from start if fails with strict mode #30155

Merged
merged 19 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (eb *Winlogbeat) init(b *beat.Beat) error {
if err != nil {
return fmt.Errorf("failed to create new event log: %w", err)
}
eb.log.Debugf("Initialized EventLog]", eventLog.Name())
eb.log.Debugf("Initialized EventLog %s", eventLog.Name())
marc-gr marked this conversation as resolved.
Show resolved Hide resolved

logger, err := newEventLogger(b.Info, eventLog, config, eb.log)
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error {

var flags win.EvtSubscribeFlag
if bookmark > 0 {
flags = win.EvtSubscribeStartAfterBookmark
// Use EvtSubscribeStrict to detect when the bookmark is missing and be able to
// subscribe again from the beginning.
flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict
} else {
flags = win.EvtSubscribeStartAtOldestRecord
}
Expand All @@ -215,6 +217,15 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error {
l.query, // Query - nil means all events
bookmark, // Bookmark - for resuming from a specific event
flags)

switch err {
case win.ERROR_NOT_FOUND, win.ERROR_EVT_QUERY_RESULT_STALE,
win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION:
// The bookmarked event was not found, we retry the subscription from the start.
marc-gr marked this conversation as resolved.
Show resolved Hide resolved
incrementMetric(readErrors, err)
subscriptionHandle, err = win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord)
}

if err != nil {
return err
}
Expand Down
18 changes: 16 additions & 2 deletions winlogbeat/eventlog/wineventlog_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,33 @@ func (l *winEventLogExp) openChannel(bookmark win.Bookmark) (win.EvtHandle, erro

var flags win.EvtSubscribeFlag
if bookmark > 0 {
flags = win.EvtSubscribeStartAfterBookmark
// Use EvtSubscribeStrict to detect when the bookmark is missing and be able to
// subscribe again from the beginning.
flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict
} else {
flags = win.EvtSubscribeStartAtOldestRecord
}

l.log.Debugw("Using subscription query.", "winlog.query", l.query)
return win.Subscribe(
h, err := win.Subscribe(
0, // Session - nil for localhost
signalEvent,
"", // Channel - empty b/c channel is in the query
l.query, // Query - nil means all events
win.EvtHandle(bookmark), // Bookmark - for resuming from a specific event
flags)

switch err {
case nil:
return h, nil
case win.ERROR_NOT_FOUND, win.ERROR_EVT_QUERY_RESULT_STALE,
win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION:
// The bookmarked event was not found, we retry the subscription from the start.
incrementMetric(readErrors, err)
return win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord)
default:
return 0, err
}
}

func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.Bookmark) (win.EvtHandle, error) {
Expand Down
22 changes: 17 additions & 5 deletions winlogbeat/sys/wineventlog/zsyscall_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions winlogbeat/tests/system/test_wineventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,91 @@ def test_resume_reading_events(self):
"winlog.opcode": "Info",
})

def test_cleared_channel_starts_from_beginning(self):
"""
wineventlog - When a bookmark points to a cleared (stale) channel
the subscription starts from the beginning
"""
msg1 = "First event"
self.write_event_log(msg1)
msg2 = "Second event"
self.write_event_log(msg2)

evts = self.read_events(expected_events=2)

self.assertTrue(len(evts), 2)
self.assert_common_fields(evts[0], msg=msg1)
self.assert_common_fields(evts[1], msg=msg2)

# remove the output file, otherwise there is a race condition
# in read_events() below where it reads the results of the previous
# execution
os.unlink(os.path.join(self.working_dir, "output", self.beat_name + "-" + self.today + ".ndjson"))

self.clear_event_log()

# we check that after clearing the event log the bookmark still points to the previous checkpoint
event_logs = self.read_registry(requireBookmark=True)
self.assertTrue(len(list(event_logs.keys())), 1)
self.assertIn(self.providerName, event_logs)
record_number = event_logs[self.providerName]["record_number"]
self.assertTrue(record_number, 2)

msg3 = "Third event"
self.write_event_log(msg3)

evts = self.read_events()
marc-gr marked this conversation as resolved.
Show resolved Hide resolved
self.assertTrue(len(evts), 1)
self.assert_common_fields(evts[0], msg=msg3)

def test_restart_if_bookmark_does_not_exist(self):
"""
wineventlog - When a bookmarked event does not exist the subcription
restarts from the beginning
"""
msg1 = "First event"
self.write_event_log(msg1)
msg2 = "Second event"
self.write_event_log(msg2)

evts = self.read_events(expected_events=2)

self.assertTrue(len(evts), 2)
self.assert_common_fields(evts[0], msg=msg1)
self.assert_common_fields(evts[1], msg=msg2)

# remove the output file, otherwise there is a race condition
# in read_events() below where it reads the results of the previous
# execution
os.unlink(os.path.join(self.working_dir, "output", self.beat_name + "-" + self.today + ".ndjson"))

msg3 = "Third event"
self.write_event_log(msg3)

event_logs = self.read_registry(requireBookmark=True)
self.assertTrue(len(list(event_logs.keys())), 1)
self.assertIn(self.providerName, event_logs)
record_number = event_logs[self.providerName]["record_number"]
self.assertTrue(record_number, 3)

# write invalid bookmark, it should start from the beginning again
f = open(os.path.join(self.working_dir, "data", ".winlogbeat.yml"), "w")
f.write((
"update_time: 2100-01-01T00:00:00Z\n" +
"event_logs:\n" +
" - name: {}\n" +
" record_number: 1000\n" +
" timestamp: 2100-01-01T00:00:00Z\n" +
" bookmark: \"<BookmarkList>\\r\\n <Bookmark Channel='{}' RecordId='1000' IsCurrent='true'/>\\r\\n</BookmarkList>\"\n".
format(self.providerName, self.providerName)
))

evts = self.read_events(expected_events=3)
self.assertTrue(len(evts), 3)
self.assert_common_fields(evts[0], msg=msg1)
self.assert_common_fields(evts[1], msg=msg2)
self.assert_common_fields(evts[2], msg=msg3)

def test_read_unknown_event_id(self):
"""
wineventlog - Read unknown event ID
Expand Down