diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1c9642b83cf..c3d2e047075 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -121,6 +121,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624] - Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753] - Fix tags_filter for cloudwatch metricset in aws. {pull}18524[18524] +- Fix panic on `metricbeat test modules` when modules are configured in `metricbeat.modules`. {issue}18789[18789] {pull}18797[18797] - Add missing network.sent_packets_count metric into compute metricset in googlecloud module. {pull}18802[18802] *Packetbeat* diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go new file mode 100644 index 00000000000..f32785a8d22 --- /dev/null +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import "github.com/elastic/beats/v7/libbeat/beat" + +type nilPipeline struct{} + +type nilClient struct { + eventer beat.ClientEventer + ackCount func(int) + ackEvents func([]interface{}) + ackLastEvent func(interface{}) +} + +var _nilPipeline = (*nilPipeline)(nil) + +// NewNilPipeline returns a new pipeline that is compatible with +// beats.PipelineConnector. The pipeline will discard all events that have been +// published. Client ACK handlers will still be executed, but the callbacks +// will be executed immediately when the event is published. +func NewNilPipeline() beat.PipelineConnector { return _nilPipeline } + +func (p *nilPipeline) Connect() (beat.Client, error) { + return p.ConnectWith(beat.ClientConfig{}) +} + +func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + return &nilClient{ + eventer: cfg.Events, + ackCount: cfg.ACKCount, + ackEvents: cfg.ACKEvents, + ackLastEvent: cfg.ACKLastEvent, + }, nil +} + +func (c *nilClient) Publish(event beat.Event) { + c.PublishAll([]beat.Event{event}) +} + +func (c *nilClient) PublishAll(events []beat.Event) { + L := len(events) + if L == 0 { + return + } + + if c.ackLastEvent != nil { + c.ackLastEvent(events[L-1].Private) + } + if c.ackEvents != nil { + tmp := make([]interface{}, L) + for i := range events { + tmp[i] = events[i].Private + } + c.ackEvents(tmp) + } + if c.ackCount != nil { + c.ackCount(L) + } +} + +func (c *nilClient) Close() error { + if c.eventer != nil { + c.eventer.Closing() + c.eventer.Closed() + } + return nil +} diff --git a/metricbeat/cmd/test/modules.go b/metricbeat/cmd/test/modules.go index 93950973a5d..6bf312d17b9 100644 --- a/metricbeat/cmd/test/modules.go +++ b/metricbeat/cmd/test/modules.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cmd/instance" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/testing" "github.com/elastic/beats/v7/metricbeat/beater" ) @@ -49,6 +50,8 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com os.Exit(1) } + // A publisher is needed for modules that add their own pipelines + b.Beat.Publisher = newPublisher() mb, err := create(&b.Beat, b.Beat.BeatConfig) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing metricbeat: %s\n", err) @@ -78,3 +81,17 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com }, } } + +type publisher struct { + beat.PipelineConnector +} + +// newPublisher returns a functional publisher that does nothing. +func newPublisher() *publisher { + return &publisher{pipeline.NewNilPipeline()} +} + +// SetACKHandler is a dummy implementation of the ack handler for the test publisher. +func (*publisher) SetACKHandler(beat.PipelineACKHandler) error { + return nil +} diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go index 8b0701c0ae9..d5049690b48 100644 --- a/metricbeat/mb/module/configuration.go +++ b/metricbeat/mb/module/configuration.go @@ -30,7 +30,7 @@ func ConfiguredModules(modulesData []*common.Config, configModulesData *common.C var modules []*Wrapper for _, moduleCfg := range modulesData { - module, err := NewWrapper(moduleCfg, mb.Registry, nil) + module, err := NewWrapper(moduleCfg, mb.Registry, moduleOptions...) if err != nil { return nil, err } diff --git a/metricbeat/tests/system/test_cmd.py b/metricbeat/tests/system/test_cmd.py index 740beedb97a..ad9a507d08c 100644 --- a/metricbeat/tests/system/test_cmd.py +++ b/metricbeat/tests/system/test_cmd.py @@ -124,6 +124,21 @@ def test_modules_test(self): assert self.log_contains("cpu...OK") assert self.log_contains("memory...OK") + def test_modules_test_with_module_in_main_config(self): + self.render_config_template(reload=False, modules=[{ + "name": "system", + "metricsets": ["cpu", "memory"], + "period": "10s", + }]) + + exit_code = self.run_beat( + logging_args=None, + extra_args=["test", "modules"]) + + assert exit_code == 0 + assert self.log_contains("cpu...OK") + assert self.log_contains("memory...OK") + def test_modules_test_error(self): """ Test test modules command with an error result