Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[winlogbeat] Retry EvtSubscribe from start if fails with strict mode #30155

Merged
merged 19 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
*Winlogbeat*

- Improve the error message when the registry file content is invalid. {pull}30543[30543]
- Retry EvtSubscribe from start if fails with strict mode. {issue}29793[29793] {pull}30155[30155]


*Elastic Log Driver*
Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (eb *Winlogbeat) init(b *beat.Beat) error {
if err != nil {
return fmt.Errorf("failed to create new event log: %w", err)
}
eb.log.Debugf("Initialized EventLog]", eventLog.Name())
eb.log.Debugw("Initialized EventLog", "id", eventLog.Name())

logger, err := newEventLogger(b.Info, eventLog, config, eb.log)
if err != nil {
Expand Down
59 changes: 37 additions & 22 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package eventlog

import (
"encoding/xml"
"errors"
"fmt"
"io"
"path/filepath"
Expand All @@ -30,7 +31,6 @@ import (
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
"golang.org/x/sys/windows"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -90,7 +90,7 @@ func (a *NoMoreEventsAction) Unpack(v string) error {
return nil
}
}
return errors.Errorf("invalid no_more_events action: %v", v)
return fmt.Errorf("invalid no_more_events action: %v", v)
}

// String returns the name of the action.
Expand Down Expand Up @@ -196,13 +196,15 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error {
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa385771(v=vs.85).aspx#pull
signalEvent, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
return nil
return err
}
defer windows.CloseHandle(signalEvent)
defer func() { _ = windows.CloseHandle(signalEvent) }()

var flags win.EvtSubscribeFlag
if bookmark > 0 {
flags = win.EvtSubscribeStartAfterBookmark
// Use EvtSubscribeStrict to detect when the bookmark is missing and be able to
// subscribe again from the beginning.
flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict
} else {
flags = win.EvtSubscribeStartAtOldestRecord
}
Expand All @@ -215,7 +217,18 @@ func (l *winEventLog) openChannel(bookmark win.EvtHandle) error {
l.query, // Query - nil means all events
bookmark, // Bookmark - for resuming from a specific event
flags)

switch {
case errors.Is(err, win.ERROR_NOT_FOUND), errors.Is(err, win.ERROR_EVT_QUERY_RESULT_STALE),
errors.Is(err, win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION):
debugf("%s error subscribing (first chance): %v", l.logPrefix, err)
// The bookmarked event was not found, we retry the subscription from the start.
marc-gr marked this conversation as resolved.
Show resolved Hide resolved
incrementMetric(readErrors, err)
subscriptionHandle, err = win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord)
}

if err != nil {
debugf("%s error subscribing (final): %v", l.logPrefix, err)
return err
}

Expand All @@ -228,7 +241,7 @@ func (l *winEventLog) openFile(state checkpoint.EventLogState, bookmark win.EvtH

h, err := win.EvtQuery(0, path, "", win.EvtQueryFilePath|win.EvtQueryForwardDirection)
if err != nil {
return errors.Wrapf(err, "failed to get handle to event log file %v", path)
return fmt.Errorf("failed to get handle to event log file %v: %w", path, err)
}

if bookmark > 0 {
Expand All @@ -240,16 +253,16 @@ func (l *winEventLog) openFile(state checkpoint.EventLogState, bookmark win.EvtH
if err = win.EvtSeek(h, 0, bookmark, win.EvtSeekRelativeToBookmark|win.EvtSeekStrict); err == nil {
// Then we advance past the last read event to avoid sending that
// event again. This won't fail if we're at the end of the file.
err = errors.Wrap(
win.EvtSeek(h, 1, bookmark, win.EvtSeekRelativeToBookmark),
"failed to seek past bookmarked position")
if seekErr := win.EvtSeek(h, 1, bookmark, win.EvtSeekRelativeToBookmark); seekErr != nil {
err = fmt.Errorf("failed to seek past bookmarked position: %w", seekErr)
}
} else {
logp.Warn("%s Failed to seek to bookmarked location in %v (error: %v). "+
"Recovering by reading the log from the beginning. (Did the file "+
"change since it was last read?)", l.logPrefix, path, err)
err = errors.Wrap(
win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst),
"failed to seek to beginning of log")
if seekErr := win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst); seekErr != nil {
err = fmt.Errorf("failed to seek to beginning of log: %w", seekErr)
}
}

if err != nil {
Expand All @@ -273,11 +286,13 @@ func (l *winEventLog) Read() ([]Record, error) {
}()
detailf("%s EventHandles returned %d handles", l.logPrefix, len(handles))

//nolint: prealloc // some handles can be skipped, the final size is unknown
var records []Record
for _, h := range handles {
l.outputBuf.Reset()
err := l.render(h, l.outputBuf)
if bufErr, ok := err.(sys.InsufficientBufferError); ok {
var bufErr sys.InsufficientBufferError
if ok := errors.As(err, &bufErr); ok {
detailf("%s Increasing render buffer size to %d", l.logPrefix,
bufErr.RequiredSize)
l.renderBuf = make([]byte, bufErr.RequiredSize)
Expand All @@ -290,7 +305,7 @@ func (l *winEventLog) Read() ([]Record, error) {
continue
}

r, _ := l.buildRecordFromXML(l.outputBuf.Bytes(), err)
r := l.buildRecordFromXML(l.outputBuf.Bytes(), err)
r.Offset = checkpoint.EventLogState{
Name: l.id,
RecordNumber: r.RecordID,
Expand All @@ -314,26 +329,26 @@ func (l *winEventLog) Close() error {

func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
handles, err := win.EventHandles(l.subscription, maxRead)
switch err {
case nil:
switch {
case err == nil:
if l.maxRead > maxRead {
debugf("%s Recovered from RPC_S_INVALID_BOUND error (errno 1734) "+
"by decreasing batch_read_size to %v", l.logPrefix, maxRead)
}
return handles, maxRead, nil
case win.ERROR_NO_MORE_ITEMS:
case errors.Is(err, win.ERROR_NO_MORE_ITEMS):
detailf("%s No more events", l.logPrefix)
if l.config.NoMoreEvents == Stop {
return nil, maxRead, io.EOF
}
return nil, maxRead, nil
case win.RPC_S_INVALID_BOUND:
case errors.Is(err, win.RPC_S_INVALID_BOUND):
incrementMetric(readErrors, err)
if err := l.Close(); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
return nil, 0, fmt.Errorf("failed to recover from RPC_S_INVALID_BOUND: %w", err)
}
if err := l.Open(l.lastRead); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
return nil, 0, fmt.Errorf("failed to recover from RPC_S_INVALID_BOUND: %w", err)
}
return l.eventHandles(maxRead / 2)
default:
Expand All @@ -343,7 +358,7 @@ func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
}
}

func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, error) {
func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) Record {
includeXML := l.config.IncludeXML
e, err := winevent.UnmarshalXML(x)
if err != nil {
Expand Down Expand Up @@ -388,7 +403,7 @@ func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record,
r.XML = string(x)
}

return r, nil
return r
}

func newEventLogging(options *common.Config) (EventLog, error) {
Expand Down
40 changes: 28 additions & 12 deletions winlogbeat/eventlog/wineventlog_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
package eventlog

import (
"errors"
"fmt"
"io"
"os"
"path/filepath"

"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/sys/windows"

Expand Down Expand Up @@ -99,31 +100,45 @@ func (l *winEventLogExp) openChannel(bookmark win.Bookmark) (win.EvtHandle, erro
if err != nil {
return win.NilHandle, err
}
defer windows.CloseHandle(signalEvent)
defer func() { _ = windows.CloseHandle(signalEvent) }()

var flags win.EvtSubscribeFlag
if bookmark > 0 {
flags = win.EvtSubscribeStartAfterBookmark
// Use EvtSubscribeStrict to detect when the bookmark is missing and be able to
// subscribe again from the beginning.
flags = win.EvtSubscribeStartAfterBookmark | win.EvtSubscribeStrict
} else {
flags = win.EvtSubscribeStartAtOldestRecord
}

l.log.Debugw("Using subscription query.", "winlog.query", l.query)
return win.Subscribe(
h, err := win.Subscribe(
0, // Session - nil for localhost
signalEvent,
"", // Channel - empty b/c channel is in the query
l.query, // Query - nil means all events
win.EvtHandle(bookmark), // Bookmark - for resuming from a specific event
flags)

switch {
case err == nil:
return h, nil
case errors.Is(err, win.ERROR_NOT_FOUND), errors.Is(err, win.ERROR_EVT_QUERY_RESULT_STALE),
errors.Is(err, win.ERROR_EVT_QUERY_RESULT_INVALID_POSITION):
// The bookmarked event was not found, we retry the subscription from the start.
incrementMetric(readErrors, err)
return win.Subscribe(0, signalEvent, "", l.query, 0, win.EvtSubscribeStartAtOldestRecord)
default:
return 0, err
}
}

func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.Bookmark) (win.EvtHandle, error) {
path := l.channelName

h, err := win.EvtQuery(0, path, "", win.EvtQueryFilePath|win.EvtQueryForwardDirection)
if err != nil {
return win.NilHandle, errors.Wrapf(err, "failed to get handle to event log file %v", path)
return win.NilHandle, fmt.Errorf("failed to get handle to event log file %v: %w", path, err)
}

if bookmark > 0 {
Expand All @@ -135,16 +150,16 @@ func (l *winEventLogExp) openFile(state checkpoint.EventLogState, bookmark win.B
if err = win.EvtSeek(h, 0, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark|win.EvtSeekStrict); err == nil {
// Then we advance past the last read event to avoid sending that
// event again. This won't fail if we're at the end of the file.
err = errors.Wrap(
win.EvtSeek(h, 1, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark),
"failed to seek past bookmarked position")
if seekErr := win.EvtSeek(h, 1, win.EvtHandle(bookmark), win.EvtSeekRelativeToBookmark); seekErr != nil {
err = fmt.Errorf("failed to seek past bookmarked position: %w", seekErr)
}
} else {
l.log.Warnf("s Failed to seek to bookmarked location in %v (error: %v). "+
"Recovering by reading the log from the beginning. (Did the file "+
"change since it was last read?)", path, err)
err = errors.Wrap(
win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst),
"failed to seek to beginning of log")
if seekErr := win.EvtSeek(h, 0, 0, win.EvtSeekRelativeToFirst); seekErr != nil {
err = fmt.Errorf("failed to seek to beginning of log: %w", seekErr)
}
}

if err != nil {
Expand Down Expand Up @@ -198,6 +213,7 @@ func (l *winEventLogExp) processHandle(h win.EvtHandle) (*Record, error) {
evt.RenderErr = append(evt.RenderErr, err.Error())
}

//nolint: godox // keep to have a record of feature disparity between non-experimental vs experimental
// TODO: Need to add XML when configured.

r := &Record{
Expand All @@ -224,7 +240,7 @@ func (l *winEventLogExp) processHandle(h win.EvtHandle) (*Record, error) {
func (l *winEventLogExp) createBookmarkFromEvent(evtHandle win.EvtHandle) (string, error) {
bookmark, err := win.NewBookmarkFromEvent(evtHandle)
if err != nil {
return "", errors.Wrap(err, "failed to create new bookmark from event handle")
return "", fmt.Errorf("failed to create new bookmark from event handle: %w", err)
}
defer bookmark.Close()

Expand Down
22 changes: 17 additions & 5 deletions winlogbeat/sys/wineventlog/zsyscall_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading