From 5f1e184e15cea43c72f911f012b087756e920e41 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 23 May 2024 16:22:02 -0400 Subject: [PATCH] Refactoring and small code improvements --- filebeat/cmd/root.go | 9 +++++- filebeat/include/list.go | 1 + filebeat/input/journald/input.go | 43 ++++++++++++++++----------- filebeat/input/journald/input_test.go | 18 +++++++++-- x-pack/filebeat/magefile.go | 6 ++-- 5 files changed, 54 insertions(+), 23 deletions(-) diff --git a/filebeat/cmd/root.go b/filebeat/cmd/root.go index 81d87807fcf1..98c19c1d6c47 100644 --- a/filebeat/cmd/root.go +++ b/filebeat/cmd/root.go @@ -63,7 +63,14 @@ func Filebeat(inputs beater.PluginFactory, settings instance.Settings) *cmd.Beat command.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M")) command.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) command.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) - command.Flags().AddGoFlag(flag.CommandLine.Lookup("ignore-journald-version")) + + // main_test.go calls this function before the Journald input is initialised + // to avoid panics, we check whether the flag is defined before calling + // AddGoFlag + if ignoreSystemdFlag := flag.CommandLine.Lookup("ignore-journald-version"); ignoreSystemdFlag != nil { + command.Flags().AddGoFlag(ignoreSystemdFlag) + } + command.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager)) command.AddCommand(genGenerateCmd()) return command diff --git a/filebeat/include/list.go b/filebeat/include/list.go index d0c0ea511c4e..6e74633e1ee4 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -28,6 +28,7 @@ import ( // Import packages that perform 'func init()'. _ "github.com/elastic/beats/v7/filebeat/input" _ "github.com/elastic/beats/v7/filebeat/input/container" + _ "github.com/elastic/beats/v7/filebeat/input/journald" _ "github.com/elastic/beats/v7/filebeat/input/log" _ "github.com/elastic/beats/v7/filebeat/input/mqtt" _ "github.com/elastic/beats/v7/filebeat/input/redis" diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index c633154a976f..161133a85e26 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -73,16 +73,17 @@ type checkpoint struct { MonotonicTimestamp uint64 } -// errCannotConnectToDBus is returned when the connection to D-Bus -// cannot be established. -var errCannotConnectToDBus = errors.New("cannot connect to D-Bus") - // LocalSystemJournalID is the ID of the local system journal. const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL" const pluginName = "journald" +// ErrSystemdVersionNotSupported is returned by the plugin manager when the +// Systemd version is not supported. var ErrSystemdVersionNotSupported = errors.New("systemd version must be >= 255") + +// ErrCannotGetSystemdVersion is returned by the plugin manager when it is +// not possible to get the Systemd version via D-Bus. var ErrCannotGetSystemdVersion = errors.New("cannot get systemd version") // Plugin creates a new journald input plugin for creating a stateful input. @@ -359,26 +360,34 @@ func (r *readerAdapter) Next() (reader.Message, error) { // - 252.16-1.amzn2023.0.2 // // The function will parse and return the integer before the full stop. -func parseSystemdVersion(output string) (int, error) { - parts := strings.Split(output, ".") - if len(parts) < 2 { - return 0, errors.New("unexpected format for version.") - } - - version, err := strconv.Atoi(parts[0]) - if err != nil { - return 0, fmt.Errorf("cannot parse Systemd version: %s", err) +func parseSystemdVersion(ver string) (int, error) { + // First try, it's just the version number + version, err := strconv.Atoi(ver) + if err == nil { + return version, nil + } + + separators := []string{" ", "."} + // Second try, it's separated by '.' like: 255.6-1-arch + for _, sep := range separators { + parts := strings.Split(ver, sep) + if len(parts) >= 2 { + version, err := strconv.Atoi(parts[0]) + if err == nil { + return version, nil + } + } } - return version, err + return 0, fmt.Errorf("unknown format for Systemd version: '%s'", ver) } // getSystemdVersionViaDBus gets the Systemd version from D-Bus // // We get the version by reading the property // `org.freedesktop.systemd1.Manager.Version`. Even though this property is -// is documented as not being part of the official API and having an unstable -// scheme, on our tests it proved to be stable enough. +// documented as not being part of the official API and having an unstable +// scheme, on our tests it proved to be stable enough for this use. // // The Systemd D-Bus documentation states: // @@ -391,7 +400,7 @@ func parseSystemdVersion(output string) (int, error) { func getSystemdVersionViaDBus() (string, error) { conn, err := dbus.ConnectSessionBus() if err != nil { - return "", fmt.Errorf("%w: %w", errCannotConnectToDBus, err) + return "", fmt.Errorf("cannot connect to D-Bus: %w", err) } defer conn.Close() diff --git a/filebeat/input/journald/input_test.go b/filebeat/input/journald/input_test.go index fc4c0796a820..8b4af9dd2003 100644 --- a/filebeat/input/journald/input_test.go +++ b/filebeat/input/journald/input_test.go @@ -23,7 +23,6 @@ import ( "context" "flag" "fmt" - "os" "path" "testing" @@ -31,9 +30,14 @@ import ( ) func TestMain(m *testing.M) { + // We use TestMain because all of our current (May 2024) supported Linux + // distributions ship with a version of Systemd that will cause Filebeat + // input to crash when the journal reached the maximum number of files and + // a new rotation happens. To allow the Journald input to be instantiated + // we need to set a CLI flag and that needs to be done before all tests run. flag.Parse() noVersionCheck = true - os.Exit(m.Run()) + m.Run() } func TestInputFieldsTranslation(t *testing.T) { @@ -93,7 +97,7 @@ func TestInputFieldsTranslation(t *testing.T) { } } -func TestParseJournaldVersion(t *testing.T) { +func TestParseSystemdVersion(t *testing.T) { foo := map[string]struct { data string expected int @@ -110,6 +114,14 @@ func TestParseJournaldVersion(t *testing.T) { expected: 249, data: `249.11-0ubuntu3.12`, }, + "Debain 10": { + expected: 241, + data: "241", + }, + "Red Hat Enterprise Linux 8": { + expected: 239, + data: "239 (239-78.el8)", + }, } for name, tc := range foo { diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index e5b809f3abfa..93b6ce41a754 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -16,6 +16,7 @@ import ( devtools "github.com/elastic/beats/v7/dev-tools/mage" "github.com/elastic/beats/v7/dev-tools/mage/target/build" + "github.com/elastic/beats/v7/dev-tools/mage/target/unittest" filebeat "github.com/elastic/beats/v7/filebeat/scripts/mage" //mage:import @@ -33,6 +34,7 @@ import ( func init() { common.RegisterCheckDeps(Update) test.RegisterDeps(IntegTest) + unittest.RegisterGoTestDeps(TestJournaldInput) devtools.BeatDescription = "Filebeat sends log files to Logstash or directly to Elasticsearch." devtools.BeatLicense = "Elastic License" @@ -191,9 +193,9 @@ func PythonIntegTest(ctx context.Context) error { // TestJournald executes the Journald input tests // Use TEST_COVERAGE=true to enable code coverage profiling. // Use RACE_DETECTOR=true to enable the race detector. -func TestJournald(ctx context.Context) error { +func TestJournaldInput(ctx context.Context) error { utArgs := devtools.DefaultGoTestUnitArgs() - utArgs.Packages = []string{"./input/journald"} + utArgs.Packages = []string{"../../filebeat/input/journald"} if devtools.Platform.GOOS == "linux" { utArgs.ExtraFlags = append(utArgs.ExtraFlags, "-tags=withjournald") }