diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a45a61e3e41..a632f9a5598 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -289,6 +289,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Make HTTP Endpoint input GA. {issue}38979[38979] {pull}39410[39410] - Update CEL mito extensions to v1.12.2. {pull}39755[39755] - Add support for base64-encoded HMAC headers to HTTP Endpoint. {pull}39655[39655] +- Journald input validates the minimum compatible version of Systemd and will fail to start if the Systemd version in the host < v255. {issue}34077[34077] {pull}39605[39605] *Auditbeat* diff --git a/Vagrantfile b/Vagrantfile index 47bc686d74f..6482271f2f1 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -47,6 +47,7 @@ TEST_BOXES = [ {:name => "ubuntu1604", :box => "ubuntu/xenial64", :platform => "ubuntu"}, {:name => "ubuntu1804", :box => "ubuntu/bionic64", :platform => "ubuntu"}, {:name => "ubuntu2004", :box => "ubuntu/focal64", :platform => "ubuntu"}, + {:name => "ubuntu2204", :box => "ubuntu/jammy64", :platform => "ubuntu"}, {:name => "debian8", :box => "generic/debian8", :platform => "debian"}, {:name => "debian9", :box => "debian/stretch64", :platform => "debian"}, diff --git a/filebeat/cmd/root.go b/filebeat/cmd/root.go index 4a5a2607b18..98c19c1d6c4 100644 --- a/filebeat/cmd/root.go +++ b/filebeat/cmd/root.go @@ -63,6 +63,14 @@ func Filebeat(inputs beater.PluginFactory, settings instance.Settings) *cmd.Beat command.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M")) command.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) command.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules")) + + // main_test.go calls this function before the Journald input is initialised + // to avoid panics, we check whether the flag is defined before calling + // AddGoFlag + if ignoreSystemdFlag := flag.CommandLine.Lookup("ignore-journald-version"); ignoreSystemdFlag != nil { + command.Flags().AddGoFlag(ignoreSystemdFlag) + } + command.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager)) command.AddCommand(genGenerateCmd()) return command diff --git a/filebeat/docs/inputs/input-journald.asciidoc b/filebeat/docs/inputs/input-journald.asciidoc index 5b932b4d133..39a4aae5515 100644 --- a/filebeat/docs/inputs/input-journald.asciidoc +++ b/filebeat/docs/inputs/input-journald.asciidoc @@ -13,6 +13,9 @@ https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html[` is a system service that collects and stores logging data. The `journald` input reads this log data and the metadata associated with it. +WARNING: There is bug in journald < v255 that causes {beatname_uc} to +crash. Only use this input with journald >= v255. + The simplest configuration example is one that reads all logs from the default journal. diff --git a/filebeat/include/list.go b/filebeat/include/list.go index d0c0ea511c4..6e74633e1ee 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -28,6 +28,7 @@ import ( // Import packages that perform 'func init()'. _ "github.com/elastic/beats/v7/filebeat/input" _ "github.com/elastic/beats/v7/filebeat/input/container" + _ "github.com/elastic/beats/v7/filebeat/input/journald" _ "github.com/elastic/beats/v7/filebeat/input/log" _ "github.com/elastic/beats/v7/filebeat/input/mqtt" _ "github.com/elastic/beats/v7/filebeat/input/redis" diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go index fdcf201d365..4cf79e3c1e3 100644 --- a/filebeat/input/journald/environment_test.go +++ b/filebeat/input/journald/environment_test.go @@ -72,7 +72,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) e.t.Helper() e.grp = unison.TaskGroup{} manager := e.getManager() - if err := manager.Init(&e.grp, v2.ModeRun); err != nil { + if err := manager.Init(&e.grp); err != nil { e.t.Fatalf("failed to initialise manager: %+v", err) } diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index c32d677ffa4..33a14bb2579 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -20,8 +20,15 @@ package journald import ( + "context" + "errors" + "flag" + "fmt" + "regexp" + "strconv" "time" + "github.com/coreos/go-systemd/v22/dbus" "github.com/coreos/go-systemd/v22/sdjournal" "github.com/urso/sderr" @@ -37,6 +44,15 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) +var noVersionCheck bool + +func init() { + flag.BoolVar(&noVersionCheck, + "ignore-journald-version", + false, + "Does not check Journald version when starting the Journald input. This might cause Filebeat to crash!") +} + type journald struct { Backoff time.Duration MaxBackoff time.Duration @@ -63,21 +79,54 @@ const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL" const pluginName = "journald" +// ErrSystemdVersionNotSupported is returned by the plugin manager when the +// Systemd version is not supported. +var ErrSystemdVersionNotSupported = errors.New("systemd version must be >= 255") + +// ErrCannotGetSystemdVersion is returned by the plugin manager when it is +// not possible to get the Systemd version via D-Bus. +var ErrCannotGetSystemdVersion = errors.New("cannot get systemd version") + // Plugin creates a new journald input plugin for creating a stateful input. func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin { - return input.Plugin{ + m := &cursor.InputManager{ + Logger: log, + StateStore: store, + Type: pluginName, + Configure: configure, + } + p := input.Plugin{ Name: pluginName, Stability: feature.Experimental, Deprecated: false, Info: "journald input", Doc: "The journald input collects logs from the local journald service", - Manager: &cursor.InputManager{ - Logger: log, - StateStore: store, - Type: pluginName, - Configure: configure, - }, + Manager: m, + } + + if noVersionCheck { + log.Warn("Journald version check has been DISABLED! Filebeat might crash if Journald version is < 255.") + return p } + + version, err := systemdVersion() + if err != nil { + configErr := fmt.Errorf("%w: %s", ErrCannotGetSystemdVersion, err) + m.Configure = func(_ *conf.C) ([]cursor.Source, cursor.Input, error) { + return nil, nil, configErr + } + return p + } + + if version < 255 { + configErr := fmt.Errorf("%w. Systemd version: %d", ErrSystemdVersionNotSupported, version) + m.Configure = func(_ *conf.C) ([]cursor.Source, cursor.Input, error) { + return nil, nil, configErr + } + return p + } + + return p } type pathSource string @@ -303,3 +352,64 @@ func (r *readerAdapter) Next() (reader.Message, error) { return m, nil } + +// parseSystemdVersion parses the string version from Systemd fetched via D-Bus. +// The function will parse and return the 3 digit major version, minor version +// and patch are ignored. +func parseSystemdVersion(ver string) (int, error) { + re := regexp.MustCompile(`(v)?(?P\d\d\d)(\.)?`) + matches := re.FindStringSubmatch(ver) + if len(matches) == 0 { + return 0, fmt.Errorf("unsupported Systemd version format '%s'", ver) + } + + // This should never fail because the regexp ensures we're getting a 3-digt + // integer, however, better safe than sorry. + version, err := strconv.Atoi(matches[2]) + if err != nil { + return 0, fmt.Errorf("could not convert '%s' to int: %w", matches[2], err) + } + + return version, nil +} + +// getSystemdVersionViaDBus gets the Systemd version from sd-bus +// +// The Systemd D-Bus documentation states: +// +// Version encodes the version string of the running systemd +// instance. Note that the version string is purely informational, +// it should not be parsed, one may not assume the version to be +// formatted in any particular way. We take the liberty to change +// the versioning scheme at any time and it is not part of the API. +// Source: https://www.freedesktop.org/wiki/Software/systemd/dbus/ +func getSystemdVersionViaDBus() (string, error) { + // Get a context with timeout just to be on the safe side + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + conn, err := dbus.NewSystemConnectionContext(ctx) + if err != nil { + return "", fmt.Errorf("cannot connect to sd-bus: %w", err) + } + + version, err := conn.GetManagerProperty("Version") + if err != nil { + return "", fmt.Errorf("cannot get version property: %w", err) + } + + return version, nil +} + +func systemdVersion() (int, error) { + versionStr, err := getSystemdVersionViaDBus() + if err != nil { + return 0, fmt.Errorf("caanot get Systemd version: %w", err) + } + + version, err := parseSystemdVersion(versionStr) + if err != nil { + return 0, fmt.Errorf("cannot parse Systemd version: %w", err) + } + + return version, nil +} diff --git a/filebeat/input/journald/input_test.go b/filebeat/input/journald/input_test.go index f42bac174fb..8b4af9dd200 100644 --- a/filebeat/input/journald/input_test.go +++ b/filebeat/input/journald/input_test.go @@ -21,6 +21,7 @@ package journald import ( "context" + "flag" "fmt" "path" "testing" @@ -28,6 +29,17 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) +func TestMain(m *testing.M) { + // We use TestMain because all of our current (May 2024) supported Linux + // distributions ship with a version of Systemd that will cause Filebeat + // input to crash when the journal reached the maximum number of files and + // a new rotation happens. To allow the Journald input to be instantiated + // we need to set a CLI flag and that needs to be done before all tests run. + flag.Parse() + noVersionCheck = true + m.Run() +} + func TestInputFieldsTranslation(t *testing.T) { // A few random keys to verify keysToCheck := map[string]string{ @@ -84,3 +96,55 @@ func TestInputFieldsTranslation(t *testing.T) { }) } } + +func TestParseSystemdVersion(t *testing.T) { + foo := map[string]struct { + data string + expected int + }{ + "Archlinux": { + expected: 255, + data: `255.6-1-arch`, + }, + "AmazonLinux2": { + expected: 252, + data: `252.16-1.amzn2023.0.2`, + }, + "Ubuntu 2204": { + expected: 249, + data: `249.11-0ubuntu3.12`, + }, + "Debain 10": { + expected: 241, + data: "241", + }, + "Red Hat Enterprise Linux 8": { + expected: 239, + data: "239 (239-78.el8)", + }, + } + + for name, tc := range foo { + t.Run(name, func(t *testing.T) { + version, err := parseSystemdVersion(tc.data) + if err != nil { + t.Errorf("did not expect an error: %s", err) + } + + if version != tc.expected { + t.Errorf("expecting version %d, got %d", tc.expected, version) + } + }) + } +} + +func TestGetJounraldVersion(t *testing.T) { + version, err := getSystemdVersionViaDBus() + if err != nil { + t.Fatalf("did not expect an error: %s", err) + } + + if version == "" { + t.Fatal("version must not be an empty string") + } +} diff --git a/filebeat/magefile.go b/filebeat/magefile.go index a8defd10562..e94969ac5fc 100644 --- a/filebeat/magefile.go +++ b/filebeat/magefile.go @@ -28,6 +28,7 @@ import ( devtools "github.com/elastic/beats/v7/dev-tools/mage" "github.com/elastic/beats/v7/dev-tools/mage/target/build" + "github.com/elastic/beats/v7/dev-tools/mage/target/unittest" filebeat "github.com/elastic/beats/v7/filebeat/scripts/mage" //mage:import @@ -45,6 +46,7 @@ import ( func init() { common.RegisterCheckDeps(Update) test.RegisterDeps(IntegTest) + unittest.RegisterGoTestDeps(TestJournaldInput) devtools.BeatDescription = "Filebeat sends log files to Logstash or directly to Elasticsearch." } @@ -214,3 +216,21 @@ func PythonIntegTest(ctx context.Context) error { mg.Deps(Fields, Dashboards, devtools.BuildSystemTestBinary) return devtools.PythonIntegTestFromHost(devtools.DefaultPythonTestIntegrationFromHostArgs()) } + +// TestJournaldInput executes the Journald input unit tests. +// +// It requires Systemd and D-Bus to be installed +// on the host. +// +// Use TEST_COVERAGE=true to enable code coverage profiling. +// Use RACE_DETECTOR=true to enable the race detector. +func TestJournaldInput(ctx context.Context) error { + if devtools.Platform.GOOS == "linux" { + testArgs := devtools.DefaultGoTestUnitArgs() + testArgs.Packages = []string{"./input/journald"} + testArgs.ExtraFlags = append(testArgs.ExtraFlags, "-tags=withjournald") + return devtools.GoTest(ctx, testArgs) + } + + return nil +} diff --git a/libbeat/docs/command-reference.asciidoc b/libbeat/docs/command-reference.asciidoc index 91daaf097be..3574a7144ac 100644 --- a/libbeat/docs/command-reference.asciidoc +++ b/libbeat/docs/command-reference.asciidoc @@ -744,12 +744,19 @@ the end of the file is reached. By default harvesters are closed after The `--once` option is not currently supported with the {filebeat-ref}/filebeat-input-filestream.html[`filestream`] input type. +*`--ignore-journald-version`*:: +When the `--ignore-journald-version` is used, the Journald input +**will not** validate the minimum Systemd version during the input +initialisation. Running the Journald input with an unsupported version +of Systemd might cause {beatname_uc} to crash. endif::[] +ifeval::["{beatname_lc}"=="metricbeat"] *`--system.hostfs MOUNT_POINT`*:: Specifies the mount point of the host's filesystem for use in monitoring a host. This flag is depricated, and an alternate hostfs should be specified via the `hostfs` module config value. +endif::[] ifeval::["{beatname_lc}"=="packetbeat"] diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index aaf1326de35..93b6ce41a75 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -16,6 +16,7 @@ import ( devtools "github.com/elastic/beats/v7/dev-tools/mage" "github.com/elastic/beats/v7/dev-tools/mage/target/build" + "github.com/elastic/beats/v7/dev-tools/mage/target/unittest" filebeat "github.com/elastic/beats/v7/filebeat/scripts/mage" //mage:import @@ -33,6 +34,7 @@ import ( func init() { common.RegisterCheckDeps(Update) test.RegisterDeps(IntegTest) + unittest.RegisterGoTestDeps(TestJournaldInput) devtools.BeatDescription = "Filebeat sends log files to Logstash or directly to Elasticsearch." devtools.BeatLicense = "Elastic License" @@ -187,3 +189,16 @@ func PythonIntegTest(ctx context.Context) error { mg.Deps(Fields, Dashboards, devtools.BuildSystemTestBinary) return devtools.PythonIntegTestFromHost(devtools.DefaultPythonTestIntegrationFromHostArgs()) } + +// TestJournald executes the Journald input tests +// Use TEST_COVERAGE=true to enable code coverage profiling. +// Use RACE_DETECTOR=true to enable the race detector. +func TestJournaldInput(ctx context.Context) error { + utArgs := devtools.DefaultGoTestUnitArgs() + utArgs.Packages = []string{"../../filebeat/input/journald"} + if devtools.Platform.GOOS == "linux" { + utArgs.ExtraFlags = append(utArgs.ExtraFlags, "-tags=withjournald") + } + + return devtools.GoTest(ctx, utArgs) +}