diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 06412795584..be2baf0fbd8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update Go version to 1.22.6. {pull}40528[40528] - Aborts all active connections for Elasticsearch output. {pull}40572[40572] - Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572] +- The journald input now restarts if there is an error/crash {issue}32782[32782] {pull}40558[40558] *Auditbeat* diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index 8798a8e72ea..9ce61042791 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -132,6 +132,7 @@ func (inp *journald) Test(src cursor.Source, ctx input.TestContext) error { "", inp.Since, src.Name(), + journalctl.Factory, ) if err != nil { return err @@ -161,6 +162,7 @@ func (inp *journald) Run( pos, inp.Since, src.Name(), + journalctl.Factory, ) if err != nil { return fmt.Errorf("could not start journal reader: %w", err) @@ -179,12 +181,17 @@ func (inp *journald) Run( for { entry, err := parser.Next() if err != nil { + switch { // The input has been cancelled, gracefully return - if errors.Is(err, journalctl.ErrCancelled) { + case errors.Is(err, journalctl.ErrCancelled): return nil + // Journalctl is restarting, do ignore the empty event + case errors.Is(err, journalctl.ErrRestarting): + continue + default: + logger.Errorf("could not read event: %s", err) + return err } - logger.Errorf("could not read event: %s", err) - return err } event := entry.ToEvent() diff --git a/filebeat/input/journald/pkg/journalctl/jctlmock_test.go b/filebeat/input/journald/pkg/journalctl/jctlmock_test.go new file mode 100644 index 00000000000..c9244a5fa43 --- /dev/null +++ b/filebeat/input/journald/pkg/journalctl/jctlmock_test.go @@ -0,0 +1,130 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package journalctl + +import ( + "sync" + + input "github.com/elastic/beats/v7/filebeat/input/v2" +) + +// Ensure, that JctlMock does implement Jctl. +// If this is not the case, regenerate this file with moq. +var _ Jctl = &JctlMock{} + +// JctlMock is a mock implementation of Jctl. +// +// func TestSomethingThatUsesJctl(t *testing.T) { +// +// // make and configure a mocked Jctl +// mockedJctl := &JctlMock{ +// KillFunc: func() error { +// panic("mock out the Kill method") +// }, +// NextFunc: func(canceler input.Canceler) ([]byte, error) { +// panic("mock out the Next method") +// }, +// } +// +// // use mockedJctl in code that requires Jctl +// // and then make assertions. 
+// +// } +type JctlMock struct { + // KillFunc mocks the Kill method. + KillFunc func() error + + // NextFunc mocks the Next method. + NextFunc func(canceler input.Canceler) ([]byte, error) + + // calls tracks calls to the methods. + calls struct { + // Kill holds details about calls to the Kill method. + Kill []struct { + } + // Next holds details about calls to the Next method. + Next []struct { + // Canceler is the canceler argument value. + Canceler input.Canceler + } + } + lockKill sync.RWMutex + lockNext sync.RWMutex +} + +// Kill calls KillFunc. +func (mock *JctlMock) Kill() error { + if mock.KillFunc == nil { + panic("JctlMock.KillFunc: method is nil but Jctl.Kill was just called") + } + callInfo := struct { + }{} + mock.lockKill.Lock() + mock.calls.Kill = append(mock.calls.Kill, callInfo) + mock.lockKill.Unlock() + return mock.KillFunc() +} + +// KillCalls gets all the calls that were made to Kill. +// Check the length with: +// +// len(mockedJctl.KillCalls()) +func (mock *JctlMock) KillCalls() []struct { +} { + var calls []struct { + } + mock.lockKill.RLock() + calls = mock.calls.Kill + mock.lockKill.RUnlock() + return calls +} + +// Next calls NextFunc. +func (mock *JctlMock) Next(canceler input.Canceler) ([]byte, error) { + if mock.NextFunc == nil { + panic("JctlMock.NextFunc: method is nil but Jctl.Next was just called") + } + callInfo := struct { + Canceler input.Canceler + }{ + Canceler: canceler, + } + mock.lockNext.Lock() + mock.calls.Next = append(mock.calls.Next, callInfo) + mock.lockNext.Unlock() + return mock.NextFunc(canceler) +} + +// NextCalls gets all the calls that were made to Next. +// Check the length with: +// +// len(mockedJctl.NextCalls()) +func (mock *JctlMock) NextCalls() []struct { + Canceler input.Canceler +} { + var calls []struct { + Canceler input.Canceler + } + mock.lockNext.RLock() + calls = mock.calls.Next + mock.lockNext.RUnlock() + return calls +} diff --git a/filebeat/input/journald/pkg/journalctl/journalctl.go b/filebeat/input/journald/pkg/journalctl/journalctl.go new file mode 100644 index 00000000000..54bcb208b82 --- /dev/null +++ b/filebeat/input/journald/pkg/journalctl/journalctl.go @@ -0,0 +1,147 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package journalctl + +import ( + "bufio" + "errors" + "fmt" + "io" + "os/exec" + "strings" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/elastic-agent-libs/logp" +) + +type journalctl struct { + cmd *exec.Cmd + dataChan chan []byte + stdout io.ReadCloser + stderr io.ReadCloser + + logger *logp.Logger + canceler input.Canceler +} + +// Factory returns an instance of journalctl ready to use. +// The caller is responsible for calling Kill to ensure the +// journalctl process created is correctly terminated. 
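+//
+// Cancelling the canceler stops data from being forwarded to Next, but it
+// does not terminate the journalctl process itself.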
+//
+// The returned type is an interface to allow mocking for testing.
+func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error) {
+	cmd := exec.Command(binary, args...)
+
+	jctl := journalctl{
+		canceler: canceller,
+		cmd:      cmd,
+		dataChan: make(chan []byte),
+		logger:   logger,
+	}
+
+	var err error
+	jctl.stdout, err = cmd.StdoutPipe()
+	if err != nil {
+		return &journalctl{}, fmt.Errorf("cannot get stdout pipe: %w", err)
+	}
+	jctl.stderr, err = cmd.StderrPipe()
+	if err != nil {
+		return &journalctl{}, fmt.Errorf("cannot get stderr pipe: %w", err)
+	}
+
+	// This goroutine reads the stderr from the journalctl process. If the
+	// process exits for any reason, its stderr is closed, so this goroutine
+	// gets an EOF error and exits.
+	go func() {
+		defer jctl.logger.Debug("stderr reader goroutine done")
+		reader := bufio.NewReader(jctl.stderr)
+		for {
+			line, err := reader.ReadString('\n')
+			if err != nil {
+				if !errors.Is(err, io.EOF) {
+					logger.Errorf("cannot read from journalctl stderr: %s", err)
+				}
+				return
+			}
+
+			logger.Errorf("Journalctl wrote to stderr: %s", line)
+		}
+	}()
+
+	// This goroutine reads the stdout from the journalctl process and makes
+	// the data available via the `Next()` method.
+	// If the journalctl process exits for any reason, its stdout is closed
+	// and this goroutine gets an EOF error and exits.
+	go func() {
+		defer jctl.logger.Debug("stdout reader goroutine done")
+		defer close(jctl.dataChan)
+		reader := bufio.NewReader(jctl.stdout)
+		for {
+			data, err := reader.ReadBytes('\n')
+			if err != nil {
+				if !errors.Is(err, io.EOF) {
+					logger.Errorf("cannot read from journalctl stdout: %s", err)
+				}
+				return
+			}
+
+			select {
+			case <-jctl.canceler.Done():
+				return
+			case jctl.dataChan <- data:
+			}
+		}
+	}()
+
+	logger.Infof("Journalctl command: journalctl %s", strings.Join(args, " "))
+
+	if err := cmd.Start(); err != nil {
+		return &journalctl{}, fmt.Errorf("cannot start journalctl: %w", err)
+	}
+
+	logger.Infof("journalctl started with PID %d", cmd.Process.Pid)
+
+	// Whenever the journalctl process exits, the `Wait` call returns;
+	// if there was an error, it is logged and this goroutine exits.
+	go func() {
+		if err := cmd.Wait(); err != nil {
+			jctl.logger.Errorf("journalctl exited with an error, exit code %d", cmd.ProcessState.ExitCode())
+		}
+	}()
+
+	return &jctl, nil
+}
+
+// Kill terminates the journalctl process using a SIGKILL.
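+//
+// Kill does not wait for the process to exit; the goroutine started in
+// Factory that calls cmd.Wait observes the exit and logs any error.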
+func (j *journalctl) Kill() error { + j.logger.Debug("sending SIGKILL to journalctl") + err := j.cmd.Process.Kill() + return err +} + +func (j *journalctl) Next(cancel input.Canceler) ([]byte, error) { + select { + case <-cancel.Done(): + return []byte{}, ErrCancelled + case d, open := <-j.dataChan: + if !open { + return []byte{}, errors.New("no more data to read, journalctl might have exited unexpectedly") + } + return d, nil + } +} diff --git a/filebeat/input/journald/pkg/journalctl/reader.go b/filebeat/input/journald/pkg/journalctl/reader.go index 91e08c54f40..25b90d9a490 100644 --- a/filebeat/input/journald/pkg/journalctl/reader.go +++ b/filebeat/input/journald/pkg/journalctl/reader.go @@ -18,20 +18,15 @@ package journalctl import ( - "bufio" - "context" "encoding/json" "errors" "fmt" - "io" - "os/exec" "strconv" - "strings" - "sync" "time" "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield" input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/elastic-agent-libs/logp" ) @@ -45,6 +40,7 @@ const sinceTimeFormat = "2006-01-02 15:04:05.999999999" // ErrCancelled indicates the read was cancelled var ErrCancelled = errors.New("cancelled") +var ErrRestarting = errors.New("restarting journalctl") // JournalEntry holds all fields of a journal entry plus cursor and timestamps type JournalEntry struct { @@ -54,6 +50,18 @@ type JournalEntry struct { MonotonicTimestamp uint64 } +// JctlFactory is a function that returns an instance of journalctl ready to use. +// It exists to allow testing +type JctlFactory func(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error) + +// Jctl abstracts the call to journalctl, it exists only for testing purposes +// +//go:generate moq --fmt gofmt -out jctlmock_test.go . Jctl +type Jctl interface { + Next(input.Canceler) ([]byte, error) + Kill() error +} + // Reader reads entries from journald by calling `jouranlctl` // and reading its output. // @@ -66,35 +74,37 @@ type JournalEntry struct { // More details can be found in the PR introducing this feature and related // issues. PR: https://github.com/elastic/beats/pull/40061. type Reader struct { - cmd *exec.Cmd - dataChan chan []byte - errChan chan string + args []string + cursor string + logger *logp.Logger - stdout io.ReadCloser - stderr io.ReadCloser canceler input.Canceler - wg sync.WaitGroup + + jctl Jctl + jctlFactory JctlFactory + + backoff backoff.Backoff } -// handleSeekAndCursor adds the correct arguments for seek and cursor. +// handleSeekAndCursor returns the correct arguments for seek and cursor. // If there is a cursor, only the cursor is used, seek is ignored. 
// If there is no cursor, then seek is used -func handleSeekAndCursor(args []string, mode SeekMode, since time.Duration, cursor string) []string { +func handleSeekAndCursor(mode SeekMode, since time.Duration, cursor string) []string { if cursor != "" { - args = append(args, "--after-cursor", cursor) - return args + return []string{"--after-cursor", cursor} } switch mode { case SeekSince: - args = append(args, "--since", time.Now().Add(since).Format(sinceTimeFormat)) + return []string{"--since", time.Now().Add(since).Format(sinceTimeFormat)} case SeekTail: - args = append(args, "--since", "now") + return []string{"--since", "now"} case SeekHead: - args = append(args, "--no-tail") + return []string{"--no-tail"} + default: + // That should never happen + return []string{} } - - return args } // New instantiates and starts a reader for journald logs. @@ -130,15 +140,16 @@ func New( mode SeekMode, cursor string, since time.Duration, - file string) (*Reader, error) { + file string, + newJctl JctlFactory, +) (*Reader, error) { + logger = logger.Named("reader") args := []string{"--utc", "--output=json", "--follow"} if file != "" && file != localSystemJournalID { args = append(args, "--file", file) } - args = handleSeekAndCursor(args, mode, since, cursor) - for _, u := range units { args = append(args, "--unit", u) } @@ -155,80 +166,23 @@ func New( args = append(args, fmt.Sprintf("_TRANSPORT=%s", m)) } - logger.Infof("Journalctl command: journalctl %s", strings.Join(args, " ")) - cmd := exec.Command("journalctl", args...) + otherArgs := handleSeekAndCursor(mode, since, cursor) - stdout, err := cmd.StdoutPipe() - if err != nil { - return &Reader{}, fmt.Errorf("cannot get stdout pipe: %w", err) - } - stderr, err := cmd.StderrPipe() + jctl, err := newJctl(canceler, logger.Named("journalctl-runner"), "journalctl", append(args, otherArgs...)...) 
if err != nil { - return &Reader{}, fmt.Errorf("cannot get stderr pipe: %w", err) + return &Reader{}, err } r := Reader{ - cmd: cmd, - dataChan: make(chan []byte), - errChan: make(chan string), - logger: logger, - stdout: stdout, - stderr: stderr, - canceler: canceler, + args: args, + cursor: cursor, + jctl: jctl, + logger: logger, + canceler: canceler, + jctlFactory: newJctl, + backoff: backoff.NewExpBackoff(canceler.Done(), 100*time.Millisecond, 2*time.Second), } - // Goroutine to read errors from stderr - r.wg.Add(1) - go func() { - defer r.logger.Debug("stderr reader goroutine done") - defer close(r.errChan) - defer r.wg.Done() - reader := bufio.NewReader(r.stderr) - for { - line, err := reader.ReadString('\n') - if err != nil { - if errors.Is(err, io.EOF) { - return - } - logger.Errorf("cannot read from journalctl stderr: %s", err) - return - } - - r.errChan <- fmt.Sprintf("Journalctl wrote to stderr: %s", line) - } - }() - - // Goroutine to read events from stdout - r.wg.Add(1) - go func() { - defer r.logger.Debug("stdout reader goroutine done") - defer close(r.dataChan) - defer r.wg.Done() - reader := bufio.NewReader(r.stdout) - for { - data, err := reader.ReadBytes('\n') - if errors.Is(err, io.EOF) { - return - } - - select { - case <-r.canceler.Done(): - return - case r.dataChan <- data: - } - } - }() - - if err := cmd.Start(); err != nil { - return &Reader{}, fmt.Errorf("cannot start journalctl: %w", err) - } - - go func() { - if err := cmd.Wait(); err != nil { - r.logger.Errorf("journalctl exited with an error, exit code %d ", cmd.ProcessState.ExitCode()) - } - }() - return &r, nil } @@ -236,36 +190,12 @@ func New( // goroutines to return, the canceller passed to `New` should // be cancelled before `Close` is called func (r *Reader) Close() error { - if r.cmd == nil { - return nil - } - - // Try reading stderr until EOF. If there is too much data to read, - // timeout after 1 minute and proceed to kill the journalctl process. - readStderrTimeout, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - r.logger.Infof("shutting down journalctl, waiting up to: %s", time.Minute) -ReadErrForLoop: - for { - select { - case <-readStderrTimeout.Done(): - r.logger.Error("timedout while reading stderr from journalctl, the process will be killed") - break ReadErrForLoop - case stderrLine, isOpen := <-r.errChan: - r.logger.Error(stderrLine) - if !isOpen { - break ReadErrForLoop - } - } - } - if err := r.cmd.Process.Kill(); err != nil { - return fmt.Errorf("cannot stop journalctl: %w", err) + if err := r.jctl.Kill(); err != nil { + return fmt.Errorf("error stopping journalctl: %w", err) } - r.logger.Debug("waiting for all goroutines to finish") - r.wg.Wait() return nil } @@ -275,50 +205,87 @@ ReadErrForLoop: // If cancel is cancelled, Next returns a zero value JournalEntry // and ErrCancelled. 
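+//
+// If the underlying journalctl process exits unexpectedly, Next restarts it,
+// resuming from the last known cursor, and returns ErrRestarting; callers
+// should simply call Next again.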
func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) { + d, err := r.jctl.Next(cancel) + + // Check if the input has been cancelled select { case <-cancel.Done(): - return JournalEntry{}, ErrCancelled - case d, open := <-r.dataChan: - if !open { - return JournalEntry{}, errors.New("no more data to read, journalctl might have exited unexpectedly") - } - fields := map[string]any{} - if err := json.Unmarshal(d, &fields); err != nil { - r.logger.Error("journal event cannot be parsed as map[string]any, look at the events log file for the raw journal event") - // Log raw data to events log file - msg := fmt.Sprintf("data cannot be parsed as map[string]any JSON: '%s'", string(d)) - r.logger.Errorw(msg, logp.TypeKey, logp.EventType, "error.message", err.Error()) - return JournalEntry{}, fmt.Errorf("cannot decode Journald JSON: %w", err) - } - - ts, isString := fields["__REALTIME_TIMESTAMP"].(string) - if !isString { - return JournalEntry{}, fmt.Errorf("'__REALTIME_TIMESTAMP': '%[1]v', type %[1]T is not a string", fields["__REALTIME_TIMESTAMP"]) - } - unixTS, err := strconv.ParseUint(ts, 10, 64) + // Input has been cancelled, ignore the message? + return JournalEntry{}, err + default: + // Two options: + // - No error, go parse the message + // - Error, journalctl is not running any more, restart it if err != nil { - return JournalEntry{}, fmt.Errorf("could not convert '__REALTIME_TIMESTAMP' to uint64: %w", err) - } + r.logger.Warnf("reader error: '%s', restarting...", err) + // Copy r.args and if needed, add the cursor flag + args := append([]string{}, r.args...) + if r.cursor != "" { + args = append(args, "--after-cursor", r.cursor) + } - monotomicTs, isString := fields["__MONOTONIC_TIMESTAMP"].(string) - if !isString { - return JournalEntry{}, fmt.Errorf("'__MONOTONIC_TIMESTAMP': '%[1]v', type %[1]T is not a string", fields["__MONOTONIC_TIMESTAMP"]) - } - monotonicTSInt, err := strconv.ParseUint(monotomicTs, 10, 64) - if err != nil { - return JournalEntry{}, fmt.Errorf("could not convert '__MONOTONIC_TIMESTAMP' to uint64: %w", err) - } + // If the last restart (if any) was more than 5s ago, + // recreate the backoff and do not wait. + // We recreate the backoff so r.backoff.Last().IsZero() + // will return true next time it's called making us to + // wait in case jouranlctl crashes in less than 5s. + if !r.backoff.Last().IsZero() && time.Now().Sub(r.backoff.Last()) > 5*time.Second { + r.backoff = backoff.NewExpBackoff(cancel.Done(), 100*time.Millisecond, 2*time.Second) + } else { + r.backoff.Wait() + } + + jctl, err := r.jctlFactory(r.canceler, r.logger.Named("journalctl-runner"), "journalctl", args...) + if err != nil { + // If we cannot restart journalct, there is nothing we can do. 
+ return JournalEntry{}, fmt.Errorf("cannot restart journalctl: %w", err) + } + r.jctl = jctl - cursor, isString := fields["__CURSOR"].(string) - if !isString { - return JournalEntry{}, fmt.Errorf("'_CURSOR': '%[1]v', type %[1]T is not a string", fields["_CURSOR"]) + // Return an empty message and wait for the input to call us again + return JournalEntry{}, ErrRestarting } + } + + fields := map[string]any{} + if err := json.Unmarshal(d, &fields); err != nil { + r.logger.Error("journal event cannot be parsed as map[string]any, look at the events log file for the raw journal event") + // Log raw data to events log file + msg := fmt.Sprintf("data cannot be parsed as map[string]any JSON: '%s'", string(d)) + r.logger.Errorw(msg, logp.TypeKey, logp.EventType, "error.message", err.Error()) + return JournalEntry{}, fmt.Errorf("cannot decode Journald JSON: %w", err) + } + + ts, isString := fields["__REALTIME_TIMESTAMP"].(string) + if !isString { + return JournalEntry{}, fmt.Errorf("'__REALTIME_TIMESTAMP': '%[1]v', type %[1]T is not a string", fields["__REALTIME_TIMESTAMP"]) + } + unixTS, err := strconv.ParseUint(ts, 10, 64) + if err != nil { + return JournalEntry{}, fmt.Errorf("could not convert '__REALTIME_TIMESTAMP' to uint64: %w", err) + } - return JournalEntry{ - Fields: fields, - RealtimeTimestamp: unixTS, - Cursor: cursor, - MonotonicTimestamp: monotonicTSInt, - }, nil + monotomicTs, isString := fields["__MONOTONIC_TIMESTAMP"].(string) + if !isString { + return JournalEntry{}, fmt.Errorf("'__MONOTONIC_TIMESTAMP': '%[1]v', type %[1]T is not a string", fields["__MONOTONIC_TIMESTAMP"]) + } + monotonicTSInt, err := strconv.ParseUint(monotomicTs, 10, 64) + if err != nil { + return JournalEntry{}, fmt.Errorf("could not convert '__MONOTONIC_TIMESTAMP' to uint64: %w", err) } + + cursor, isString := fields["__CURSOR"].(string) + if !isString { + return JournalEntry{}, fmt.Errorf("'_CURSOR': '%[1]v', type %[1]T is not a string", fields["_CURSOR"]) + } + + // Update our cursor so we can restart journalctl if needed + r.cursor = cursor + + return JournalEntry{ + Fields: fields, + RealtimeTimestamp: unixTS, + Cursor: cursor, + MonotonicTimestamp: monotonicTSInt, + }, nil } diff --git a/filebeat/input/journald/pkg/journalctl/reader_test.go b/filebeat/input/journald/pkg/journalctl/reader_test.go index f135cdd1cf8..2cd29e83a35 100644 --- a/filebeat/input/journald/pkg/journalctl/reader_test.go +++ b/filebeat/input/journald/pkg/journalctl/reader_test.go @@ -18,14 +18,16 @@ package journalctl import ( - "bytes" "context" _ "embed" "encoding/json" + "errors" "fmt" - "io" + "sync/atomic" "testing" + "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield" + input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/elastic-agent-libs/logp" ) @@ -35,7 +37,7 @@ var coredumpJSON []byte // TestEventWithNonStringData ensures the Reader can read data that is not a // string. There is at least one real example of that: coredumps. // This test uses a real example captured from journalctl -o json. 
-// + // If needed more test cases can be added in the future func TestEventWithNonStringData(t *testing.T) { testCases := []json.RawMessage{} @@ -43,22 +45,18 @@ func TestEventWithNonStringData(t *testing.T) { t.Fatalf("could not unmarshal the contents from 'testdata/message-byte-array.json' into map[string]any: %s", err) } - for idx, event := range testCases { + for idx, rawEvent := range testCases { t.Run(fmt.Sprintf("test %d", idx), func(t *testing.T) { - stdout := io.NopCloser(&bytes.Buffer{}) - stderr := io.NopCloser(&bytes.Buffer{}) + mock := JctlMock{ + NextFunc: func(canceler input.Canceler) ([]byte, error) { + return rawEvent, nil + }, + } r := Reader{ - logger: logp.L(), - dataChan: make(chan []byte), - errChan: make(chan string), - stdout: stdout, - stderr: stderr, + logger: logp.L(), + jctl: &mock, } - go func() { - r.dataChan <- []byte(event) - }() - _, err := r.Next(context.Background()) if err != nil { t.Fatalf("did not expect an error: %s", err) @@ -66,3 +64,66 @@ func TestEventWithNonStringData(t *testing.T) { }) } } + +//go:embed testdata/sample-journal-event.json +var jdEvent []byte + +func TestRestartsJournalctlOnError(t *testing.T) { + ctx := context.Background() + + mock := JctlMock{ + NextFunc: func(canceler input.Canceler) ([]byte, error) { + return jdEvent, errors.New("journalctl exited with code 42") + }, + } + + factoryCalls := atomic.Uint32{} + factory := func(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error) { + factoryCalls.Add(1) + // Add a log to make debugging easier and better mimic the behaviour of the real factory/journalctl + logger.Debugf("starting new mock journalclt ID: %d", factoryCalls.Load()) + // If no calls to next have been made, return a mock + // that will fail every time Next is called + if len(mock.NextCalls()) == 0 { + return &mock, nil + } + + // If calls have been made, change the Next function to always succeed + // and return it + mock.NextFunc = func(canceler input.Canceler) ([]byte, error) { + return jdEvent, nil + } + + return &mock, nil + } + + reader, err := New(logp.L(), ctx, nil, nil, nil, journalfield.IncludeMatches{}, SeekHead, "", 0, "", factory) + if err != nil { + t.Fatalf("cannot instantiate journalctl reader: %s", err) + } + + isEntryEmpty := func(entry JournalEntry) bool { + return len(entry.Fields) == 0 && entry.Cursor == "" && entry.MonotonicTimestamp == 0 && entry.RealtimeTimestamp == 0 + } + + // In the first call the mock will return an error, simulating journalctl crashing + // so we should get ErrRestarting + entry, err := reader.Next(ctx) + if !errors.Is(err, ErrRestarting) { + t.Fatalf("expecting ErrRestarting when calling Next and journalctl crashed: %s", err) + } + if !isEntryEmpty(entry) { + t.Fatal("the first call to Next must return an empty JournalEntry because 'journalctl has crashed'") + } + + for i := 0; i < 2; i++ { + entry, err := reader.Next(ctx) + if err != nil { + t.Fatalf("did not expect an error when calling Next 'after journalctl restart': %s", err) + } + + if isEntryEmpty(entry) { + t.Fatal("the second and third calls to Next must succeed") + } + } +} diff --git a/filebeat/input/journald/pkg/journalctl/testdata/sample-journal-event.json b/filebeat/input/journald/pkg/journalctl/testdata/sample-journal-event.json new file mode 100644 index 00000000000..64250a5cca1 --- /dev/null +++ b/filebeat/input/journald/pkg/journalctl/testdata/sample-journal-event.json @@ -0,0 +1,32 @@ +{ + "MESSAGE" : "Count: 0000000001", + "PRIORITY" : "6", + "SYSLOG_IDENTIFIER" : 
"TestRestartsJournalctlOnError", + "_AUDIT_LOGINUID" : "1000", + "_AUDIT_SESSION" : "2", + "_BOOT_ID" : "567980bb85ae41da8518f409570b0cb9", + "_CAP_EFFECTIVE" : "0", + "_CMDLINE" : "/bin/cat", + "_COMM" : "cat", + "_EXE" : "/usr/bin/cat", + "_GID" : "1000", + "_HOSTNAME" : "millennium-falcon", + "_MACHINE_ID" : "851f339d77174301b29e417ecb2ec6a8", + "_PID" : "235728", + "_RUNTIME_SCOPE" : "system", + "_STREAM_ID" : "92765bf7ba214e23a2ee986d76578bef", + "_SYSTEMD_CGROUP" : "/user.slice/user-1000.slice/session-2.scope", + "_SYSTEMD_INVOCATION_ID" : "89e7dffc4a0140f086a3171235fae8d9", + "_SYSTEMD_OWNER_UID" : "1000", + "_SYSTEMD_SESSION" : "2", + "_SYSTEMD_SLICE" : "user-1000.slice", + "_SYSTEMD_UNIT" : "session-2.scope", + "_SYSTEMD_USER_SLICE" : "-.slice", + "_TRANSPORT" : "stdout", + "_UID" : "1000", + "__CURSOR" : "s=e82795fad4ce42b79fb3da0866d91f7e;i=4eb1b1;b=567980bb85ae41da8518f409570b0cb9;m=2bd4e2166;t=6200adaf0a66a;x=d9b1ac66921eaac9", + "__MONOTONIC_TIMESTAMP" : "11765948774", + "__REALTIME_TIMESTAMP" : "1724080855230058", + "__SEQNUM" : "5157297", + "__SEQNUM_ID" : "e82795fad4ce42b79fb3da0866d91f7e" +} diff --git a/filebeat/input/mqtt/client_mocked.go b/filebeat/input/mqtt/client_mocked.go index 79fd0be50cf..fcab2b80053 100644 --- a/filebeat/input/mqtt/client_mocked.go +++ b/filebeat/input/mqtt/client_mocked.go @@ -72,6 +72,7 @@ type mockedBackoff struct { waits []bool waitIndex int + last time.Time } var _ backoff.Backoff = new(mockedBackoff) @@ -79,6 +80,7 @@ var _ backoff.Backoff = new(mockedBackoff) func (m *mockedBackoff) Wait() bool { wait := m.waits[m.waitIndex] m.waitIndex++ + m.last = time.Now() return wait } @@ -86,6 +88,10 @@ func (m *mockedBackoff) Reset() { m.resetCount++ } +func (m *mockedBackoff) Last() time.Time { + return m.last +} + type mockedToken struct { timeout bool } diff --git a/filebeat/tests/integration/journald_test.go b/filebeat/tests/integration/journald_test.go new file mode 100644 index 00000000000..447e49b82bb --- /dev/null +++ b/filebeat/tests/integration/journald_test.go @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +//go:build integration && linux + +package integration + +import ( + "context" + _ "embed" + "encoding/json" + "fmt" + "os/exec" + "path/filepath" + "syscall" + "testing" + "time" + + "github.com/gofrs/uuid/v5" + + "github.com/elastic/beats/v7/libbeat/tests/integration" +) + +func generateJournaldLogs(t *testing.T, ctx context.Context, syslogID string, max int) { + cmd := exec.Command("systemd-cat", "-t", syslogID) + w, err := cmd.StdinPipe() + if err != nil { + t.Errorf("cannot get stdin pipe from systemd-cat: %s", err) + } + if err := cmd.Start(); err != nil { + t.Errorf("cannot start 'systemd-cat': %s", err) + } + defer func() { + // Make sure systemd-cat terminates successfully so the messages + // are correctly written to the journal + if err := cmd.Wait(); err != nil { + t.Errorf("error waiting for system-cat to finish: %s", err) + } + + if !cmd.ProcessState.Success() { + t.Errorf("systemd-cat exited with %d", cmd.ProcessState.ExitCode()) + } + }() + + for count := 1; count <= max; count++ { + written, err := fmt.Fprintf(w, "Count: %03d\n", count) + if err != nil { + t.Errorf("could not write message to journald: %s", err) + } + if written != 11 { + t.Errorf("could not write the whole message, expecing to write 11 bytes, but wrote %d", written) + } + time.Sleep(time.Millisecond) + } + + if err := w.Close(); err != nil { + t.Errorf("could not close stdin from systemd-cat, messages are likely not written to the journal: %s", err) + } +} + +//go:embed testdata/filebeat_journald.yml +var journaldInputCfg string + +func TestJournaldInput(t *testing.T) { + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + // render configuration + syslogID := fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String()) + yamlCfg := fmt.Sprintf(journaldInputCfg, syslogID, filebeat.TempDir()) + + go generateJournaldLogs(t, context.Background(), syslogID, 3) + + filebeat.WriteConfigFile(yamlCfg) + filebeat.Start() + filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start") + + pidLine := filebeat.GetLogLine("journalctl started with PID") + logEntry := struct{ Message string }{} + if err := json.Unmarshal([]byte(pidLine), &logEntry); err != nil { + t.Errorf("could not parse PID log entry as JSON: %s", err) + } + + pid := 0 + fmt.Sscanf(logEntry.Message, "journalctl started with PID %d", &pid) + + filebeat.WaitForLogs("Count: 003", 5*time.Second, "did not find the third event in published events") + + // Kill journalctl + if err := syscall.Kill(pid, syscall.SIGKILL); err != nil { + t.Fatalf("coluld not kill journalctl with PID %d: %s", pid, err) + } + + go generateJournaldLogs(t, context.Background(), syslogID, 5) + filebeat.WaitForLogs("journalctl started with PID", 10*time.Second, "journalctl did not start") + filebeat.WaitForLogs("Count: 005", time.Second, "expected log message not found in published events SECOND") + + eventsPublished := filebeat.CountFileLines(filepath.Join(filebeat.TempDir(), "output-*.ndjson")) + + if eventsPublished != 8 { + t.Fatalf("expecting 8 published events, got %d instead'", eventsPublished) + } +} diff --git a/filebeat/tests/integration/testdata/filebeat_journald.yml b/filebeat/tests/integration/testdata/filebeat_journald.yml new file mode 100644 index 00000000000..824c8cac141 --- /dev/null +++ b/filebeat/tests/integration/testdata/filebeat_journald.yml @@ -0,0 +1,21 @@ +filebeat.inputs: + - type: journald + id: filestream-input-id + enabled: true + syslog_identifiers: + - "%s" + +path.home: %s + 
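+# Flush the queue immediately so events are written to the output file as
+# soon as they are published.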
+queue.mem: + flush.timeout: 0 + +output: + file: + path: ${path.home} + filename: "output" + +logging: + level: debug + selectors: + - "*" diff --git a/filebeat/tests/mockjournalctl/main.go b/filebeat/tests/mockjournalctl/main.go new file mode 100644 index 00000000000..484c6f44790 --- /dev/null +++ b/filebeat/tests/mockjournalctl/main.go @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "encoding/json" + "fmt" + "math/rand/v2" + "os" + "time" +) + +// This simple mock for journalclt that can be used to test error conditions. +// If a file called 'exit' exists in the same folder as the Filebeat binary +// then this mock will exit immediately, otherwise it will generate errors +// randomly and eventually exit. +// +// The easiest way to use this mock is to compile it as 'journalctl' and +// manipulate the $PATH environment variable from the Filebeat process you're +// testing. +func main() { + if _, err := os.Stat("exit"); err == nil { + os.Exit(42) + } + + fatalChance := 10 + stdoutTicker := time.NewTicker(time.Second) + stderrTicker := time.NewTicker(time.Second) + fatalTicker := time.NewTicker(time.Second) + + jsonEncoder := json.NewEncoder(os.Stdout) + count := uint64(0) + for { + count++ + select { + case t := <-stdoutTicker.C: + mockData["MESSAGE"] = fmt.Sprintf("Count: %010d", count) + mockData["__CURSOR"] = fmt.Sprintf("cursor-%010d-now-%s", count, time.Now().Format(time.RFC3339Nano)) + mockData["__REALTIME_TIMESTAMP"] = fmt.Sprintf("%d", t.UnixMicro()) + jsonEncoder.Encode(mockData) //nolint:errcheck // it will never fail and it's a mock for testing. 
+ case t := <-stderrTicker.C: + fmt.Fprintf(os.Stderr, "a random error at %s, count: %010d\n", t.Format(time.RFC3339), count) + case t := <-fatalTicker.C: + chance := rand.IntN(100) + if chance < fatalChance { + fmt.Fprintf(os.Stderr, "fatal error, exiting at %s\n", t.Format(time.RFC3339)) + os.Exit(rand.IntN(125)) + } + } + } +} + +var mockData = map[string]string{ + "MESSAGE": "Count: 0000000001", + "PRIORITY": "6", + "SYSLOG_IDENTIFIER": "TestRestartsJournalctlOnError", + "_AUDIT_LOGINUID": "1000", + "_AUDIT_SESSION": "2", + "_BOOT_ID": "567980bb85ae41da8518f409570b0cb9", + "_CAP_EFFECTIVE": "0", + "_CMDLINE": "/bin/cat", + "_COMM": "cat", + "_EXE": "/usr/bin/cat", + "_GID": "1000", + "_HOSTNAME": "millennium-falcon", + "_MACHINE_ID": "851f339d77174301b29e417ecb2ec6a8", + "_PID": "235728", + "_RUNTIME_SCOPE": "system", + "_STREAM_ID": "92765bf7ba214e23a2ee986d76578bef", + "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/session-2.scope", + "_SYSTEMD_INVOCATION_ID": "89e7dffc4a0140f086a3171235fae8d9", + "_SYSTEMD_OWNER_UID": "1000", + "_SYSTEMD_SESSION": "2", + "_SYSTEMD_SLICE": "user-1000.slice", + "_SYSTEMD_UNIT": "session-2.scope", + "_SYSTEMD_USER_SLICE": "-.slice", + "_TRANSPORT": "stdout", + "_UID": "1000", + "__CURSOR": "s=e82795fad4ce42b79fb3da0866d91f7e;i=4eb1b1;b=567980bb85ae41da8518f409570b0cb9;m=2bd4e2166;t=6200adaf0a66a;x=d9b1ac66921eaac9", + "__MONOTONIC_TIMESTAMP": "11765948774", + "__REALTIME_TIMESTAMP": "1724080855230058", + "__SEQNUM": "5157297", + "__SEQNUM_ID": "e82795fad4ce42b79fb3da0866d91f7e", +} diff --git a/libbeat/common/backoff/backoff.go b/libbeat/common/backoff/backoff.go index 5439e38cfbb..6128d1d713f 100644 --- a/libbeat/common/backoff/backoff.go +++ b/libbeat/common/backoff/backoff.go @@ -17,6 +17,8 @@ package backoff +import "time" + // Backoff defines the interface for backoff strategies. type Backoff interface { // Wait blocks for a duration of time governed by the backoff strategy. @@ -24,6 +26,9 @@ type Backoff interface { // Reset resets the backoff duration to an initial value governed by the backoff strategy. 
Reset() + + Last() time.Time + // Last returns the time when the last call to Wait returned } // WaitOnError is a convenience method, if an error is received it will block, if not errors is diff --git a/libbeat/common/backoff/equal_jitter.go b/libbeat/common/backoff/equal_jitter.go index d5b5c7d250c..77a0f0642c6 100644 --- a/libbeat/common/backoff/equal_jitter.go +++ b/libbeat/common/backoff/equal_jitter.go @@ -71,3 +71,8 @@ func (b *EqualJitterBackoff) Wait() bool { return true } } + +// Last returns the time when the last call to Wait returned +func (b *EqualJitterBackoff) Last() time.Time { + return b.last +} diff --git a/libbeat/common/backoff/exponential.go b/libbeat/common/backoff/exponential.go index c7211480cd1..ab4236b8185 100644 --- a/libbeat/common/backoff/exponential.go +++ b/libbeat/common/backoff/exponential.go @@ -65,3 +65,8 @@ func (b *ExpBackoff) Wait() bool { return true } } + +// Last returns the time when the last call to Wait returned +func (b *ExpBackoff) Last() time.Time { + return b.last +} diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 1daf8948bc1..9b8002f1176 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -21,6 +21,7 @@ package integration import ( "bufio" + "bytes" "context" "encoding/json" "errors" @@ -252,6 +253,14 @@ func (b *BeatProc) startBeat() { require.NoError(b.t, err, "error starting beat process") b.Process = cmd.Process + + b.t.Cleanup(func() { + // If the test failed, print the whole cmd line to help debugging + if b.t.Failed() { + args := strings.Join(cmd.Args, " ") + b.t.Log("CMD line to execute Beat:", cmd.Path, args) + } + }) } // waitBeatToExit blocks until the Beat exits. @@ -383,7 +392,7 @@ func (b *BeatProc) LogContains(s string) bool { defer logFile.Close() var found bool - found, b.logFileOffset = b.searchStrInLogs(logFile, s, b.logFileOffset) + found, b.logFileOffset, _ = b.searchStrInLogs(logFile, s, b.logFileOffset) if found { return found } @@ -393,16 +402,39 @@ func (b *BeatProc) LogContains(s string) bool { return false } defer eventLogFile.Close() - found, b.eventLogFileOffset = b.searchStrInLogs(eventLogFile, s, b.eventLogFileOffset) + found, b.eventLogFileOffset, _ = b.searchStrInLogs(eventLogFile, s, b.eventLogFileOffset) return found } +// GetLogLine search for the string s starting at the beginning +// of the logs, if it is found the whole log line is returned, otherwise +// an empty string is returned. GetLogLine does not keep track of +// any offset +func (b *BeatProc) GetLogLine(s string) string { + logFile := b.openLogFile() + defer logFile.Close() + + found, _, line := b.searchStrInLogs(logFile, s, 0) + if found { + return line + } + + eventLogFile := b.openEventLogFile() + if eventLogFile == nil { + return "" + } + defer eventLogFile.Close() + _, _, line = b.searchStrInLogs(eventLogFile, s, 0) + + return line +} + // searchStrInLogs search for s as a substring of any line in logFile starting // from offset. // // It will close logFile and return the current offset. 
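+// The returned string is the complete line that matched, or an empty string
+// when no match was found.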
-func (b *BeatProc) searchStrInLogs(logFile *os.File, s string, offset int64) (bool, int64) { +func (b *BeatProc) searchStrInLogs(logFile *os.File, s string, offset int64) (bool, int64, string) { t := b.t _, err := logFile.Seek(offset, io.SeekStart) @@ -432,11 +464,11 @@ func (b *BeatProc) searchStrInLogs(logFile *os.File, s string, offset int64) (bo } if strings.Contains(line, s) { - return true, offset + return true, offset, line } } - return false, offset + return false, offset, "" } // WaitForLogs waits for the specified string s to be present in the logs within @@ -891,3 +923,14 @@ func GenerateLogFile(t *testing.T, path string, count int, append bool) { } } } + +func (b *BeatProc) CountFileLines(glob string) int { + file := b.openGlobFile(glob, true) + defer file.Close() + data, err := io.ReadAll(file) + if err != nil { + b.t.Fatalf("could not read file '%s': %s", file.Name(), err) + } + + return bytes.Count(data, []byte{'\n'}) +}
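
Usage sketch (not part of the patch): the snippet below shows how a consumer of the journalctl package is expected to drive the reader after this change. ErrCancelled means the input is shutting down, ErrRestarting means journalctl exited and the Reader has already restarted it from the last known cursor, and any other error is fatal. The argument order of journalctl.New follows the call in reader_test.go; the package main wrapper and log-based output are illustrative assumptions, not part of Filebeat.

```go
package main

import (
	"context"
	"errors"
	"log"

	"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalctl"
	"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
	"github.com/elastic/elastic-agent-libs/logp"
)

func main() {
	ctx := context.Background()

	// Same argument shape as reader_test.go: follow the local journal from
	// the head, no unit/identifier/transport filters, using the real Factory.
	reader, err := journalctl.New(
		logp.L(), ctx,
		nil, nil, nil,
		journalfield.IncludeMatches{},
		journalctl.SeekHead, "", 0, "",
		journalctl.Factory,
	)
	if err != nil {
		log.Fatalf("could not start journal reader: %s", err)
	}
	defer reader.Close()

	for {
		entry, err := reader.Next(ctx)
		switch {
		case errors.Is(err, journalctl.ErrCancelled):
			// The canceler was cancelled: shut down gracefully.
			return
		case errors.Is(err, journalctl.ErrRestarting):
			// journalctl crashed; the Reader has already restarted it from
			// the last known cursor, so just call Next again.
			continue
		case err != nil:
			log.Fatalf("could not read event: %s", err)
		}

		log.Printf("cursor=%s message=%v", entry.Cursor, entry.Fields["MESSAGE"])
	}
}
```

Restarts are rate limited by an exponential backoff (100ms to 2s) that is recreated whenever the previous restart happened more than five seconds earlier; that is why the Backoff interface gains the Last() accessor in this patch.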