From ad80c0b5c5b439413bf1feaa779315f1512a4df9 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 19 Aug 2022 18:02:16 -0400 Subject: [PATCH] [Filebeat] Add lumberjack input (#32175) Add an input for receiving data over the Lumberjack protocol as defined in https://github.com/elastic/go-lumber. The raw data is written into the lumberjack field which is mapped as flattened. The client's remote address is written to `source.address` And if mTLS is used the client cert's CN is written to `tls.client.subject`. Metrics from the input are published under the `dataset` namespace. They are also available under the http endpoint at `/dataset`. The logging selectors for the input are `input.lumberjack` and `input.lumberjack.go-lumber`. --- NOTICE.txt | 217 ++++++++++++++-- filebeat/docs/fields.asciidoc | 18 ++ go.mod | 4 +- go.sum | 8 +- x-pack/filebeat/include/list.go | 1 + .../input/default-inputs/inputs_aix.go | 2 + .../input/default-inputs/inputs_other.go | 2 + .../input/lumberjack/_meta/fields.yml | 9 + x-pack/filebeat/input/lumberjack/ack.go | 78 ++++++ x-pack/filebeat/input/lumberjack/ack_test.go | 46 ++++ x-pack/filebeat/input/lumberjack/config.go | 39 +++ .../filebeat/input/lumberjack/config_test.go | 74 ++++++ x-pack/filebeat/input/lumberjack/fields.go | 23 ++ .../input/lumberjack/generate_certs_test.go | 153 +++++++++++ x-pack/filebeat/input/lumberjack/input.go | 94 +++++++ x-pack/filebeat/input/lumberjack/logger.go | 40 +++ x-pack/filebeat/input/lumberjack/metrics.go | 47 ++++ x-pack/filebeat/input/lumberjack/server.go | 182 ++++++++++++++ .../filebeat/input/lumberjack/server_test.go | 238 ++++++++++++++++++ 19 files changed, 1255 insertions(+), 20 deletions(-) create mode 100644 x-pack/filebeat/input/lumberjack/_meta/fields.yml create mode 100644 x-pack/filebeat/input/lumberjack/ack.go create mode 100644 x-pack/filebeat/input/lumberjack/ack_test.go create mode 100644 x-pack/filebeat/input/lumberjack/config.go create mode 100644 x-pack/filebeat/input/lumberjack/config_test.go create mode 100644 x-pack/filebeat/input/lumberjack/fields.go create mode 100644 x-pack/filebeat/input/lumberjack/generate_certs_test.go create mode 100644 x-pack/filebeat/input/lumberjack/input.go create mode 100644 x-pack/filebeat/input/lumberjack/logger.go create mode 100644 x-pack/filebeat/input/lumberjack/metrics.go create mode 100644 x-pack/filebeat/input/lumberjack/server.go create mode 100644 x-pack/filebeat/input/lumberjack/server_test.go diff --git a/NOTICE.txt b/NOTICE.txt index 5ccfea34fde9..e23ebb81db43 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -11440,25 +11440,214 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lookslike@v0 -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-lumber -Version: v0.1.0 +Version: v0.1.2-0.20220819171948-335fde24ea0f Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lumber@v0.1.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-lumber@v0.1.2-0.20220819171948-335fde24ea0f/LICENSE: -Copyright (c) 2012–2016 Elasticsearch -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ - http://www.apache.org/licenses/LICENSE-2.0 + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. -------------------------------------------------------------------------------- @@ -39265,11 +39454,11 @@ OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/klauspost/compress -Version: v1.13.6 +Version: v1.15.9 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/klauspost/compress@v1.13.6/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/klauspost/compress@v1.15.9/LICENSE: Copyright (c) 2012 The Go Authors. All rights reserved. Copyright (c) 2019 Klaus Post. All rights reserved. diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 66af34c3469e..51a6eb4ee115 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -56,6 +56,7 @@ grouped in the following categories: * <> * <> * <> +* <> * <> * <> * <> @@ -87575,6 +87576,23 @@ alias to: event.duration -- +[[exported-fields-lumberjack]] +== Lumberjack fields + +Fields from Lumberjack input. + + + +*`lumberjack`*:: ++ +-- +Structured data received in an event sent over the Lumberjack protocol. + + +type: flattened + +-- + [[exported-fields-microsoft]] == Microsoft fields diff --git a/go.mod b/go.mod index 363674896590..97a4c2a4a840 100644 --- a/go.mod +++ b/go.mod @@ -78,7 +78,7 @@ require ( github.com/elastic/go-libaudit/v2 v2.3.2-0.20220729123722-f8f7d5c19e6b github.com/elastic/go-licenser v0.4.0 github.com/elastic/go-lookslike v0.3.0 - github.com/elastic/go-lumber v0.1.0 + github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595 github.com/elastic/go-seccomp-bpf v1.2.0 github.com/elastic/go-structform v0.0.10 @@ -294,7 +294,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/karrick/godirwalk v1.15.8 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.13.6 // indirect + github.com/klauspost/compress v1.15.9 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/markbates/pkger v0.17.0 // indirect github.com/mattn/go-isatty v0.0.14 // indirect diff --git a/go.sum b/go.sum index 1f616d559c68..7d4451b58186 100644 --- a/go.sum +++ b/go.sum @@ -622,8 +622,8 @@ github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRs github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU= github.com/elastic/go-lookslike v0.3.0 h1:HDI/DQ65V85ZqM7D/sbxcK2wFFnh3+7iFvBk2v2FTHs= github.com/elastic/go-lookslike v0.3.0/go.mod h1:AhH+rdJux5RlVjs+6ej4jkvYyoNRkj2crxmqeHlj3hA= -github.com/elastic/go-lumber v0.1.0 h1:HUjpyg36v2HoKtXlEC53EJ3zDFiDRn65d7B8dBHNius= -github.com/elastic/go-lumber v0.1.0/go.mod h1:8YvjMIRYypWuPvpxx7WoijBYdbB7XIh/9FqSYQZTtxQ= +github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f h1:TsPpU5EAwlt7YZoupKlxZ093qTZYdGou3EhfTF1U0B4= +github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f/go.mod h1:HHaWnZamYKWsR9/eZNHqRHob8iQDKnchHmmskT/SKko= github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595 h1:q8n4QjcLa4q39Q3fqHRknTBXBtegjriHFrB42YKgXGI= github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595/go.mod h1:s09U1b4P1ZxnKx2OsqY7KlHdCesqZWIhyq0Gs/QC/Us= github.com/elastic/go-plugins-helpers v0.0.0-20200207104224-bdf17607b79f h1:FvsqAVIFZtJtK+koSvFU+/KoNQo1m14kgV5qJ8ImN+U= @@ -1223,8 +1223,8 @@ github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index 91bf5fee773f..c1382ae36275 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -13,6 +13,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cometd" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/activemq" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/aws" diff --git a/x-pack/filebeat/input/default-inputs/inputs_aix.go b/x-pack/filebeat/input/default-inputs/inputs_aix.go index a1cdf5da43ef..f46d8ed1f254 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_aix.go +++ b/x-pack/filebeat/input/default-inputs/inputs_aix.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" + "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" "github.com/elastic/elastic-agent-libs/logp" ) @@ -21,5 +22,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 httpjson.Plugin(log, store), o365audit.Plugin(log, store), awss3.Plugin(store), + lumberjack.Plugin(), } } diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index b100612d58cd..b87faaed46ac 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" + "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" "github.com/elastic/elastic-agent-libs/logp" ) @@ -28,5 +29,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 o365audit.Plugin(log, store), awss3.Plugin(store), awscloudwatch.Plugin(), + lumberjack.Plugin(), } } diff --git a/x-pack/filebeat/input/lumberjack/_meta/fields.yml b/x-pack/filebeat/input/lumberjack/_meta/fields.yml new file mode 100644 index 000000000000..ee3ef012006e --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/_meta/fields.yml @@ -0,0 +1,9 @@ +- key: lumberjack + title: "Lumberjack" + description: > + Fields from Lumberjack input. + fields: + - name: lumberjack + type: flattened + description: > + Structured data received in an event sent over the Lumberjack protocol. diff --git a/x-pack/filebeat/input/lumberjack/ack.go b/x-pack/filebeat/input/lumberjack/ack.go new file mode 100644 index 000000000000..ab15ad157dc4 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/ack.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" +) + +// batchACKTracker invokes batchACK when all events associated to the batch +// have been published and acknowledged by an output. +type batchACKTracker struct { + batchACK func() + + mutex sync.Mutex // mutex synchronizes access to pendingACKs. + pendingACKs int64 // Number of Beat events in lumberjack batch that are pending ACKs. +} + +// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function +// is invoked after the full batch has been acknowledged. Ready() must be invoked +// after all events in the batch are published. +func newBatchACKTracker(batchACKCallback func()) *batchACKTracker { + return &batchACKTracker{ + batchACK: batchACKCallback, + pendingACKs: 1, // Ready() must be called to consume this "1". + } +} + +// Ready signals that the batch has been fully consumed. Only +// after the batch is marked as "ready" can the lumberjack batch +// be ACKed. This prevents the batch from being ACKed prematurely. +func (t *batchACKTracker) Ready() { + t.ACK() +} + +// Add increments the number of pending ACKs. +func (t *batchACKTracker) Add() { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.pendingACKs++ +} + +// ACK decrements the number of pending event ACKs. When all pending ACKs are +// received then the lumberjack batch is ACKed. +func (t *batchACKTracker) ACK() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if t.pendingACKs <= 0 { + panic("misuse detected: negative ACK counter") + } + + t.pendingACKs-- + if t.pendingACKs == 0 { + t.batchACK() + } +} + +// newEventACKHandler returns a beat ACKer that can receive callbacks when +// an event has been ACKed an output. If the event contains a private metadata +// pointing to a batchACKTracker then it will invoke the tracker's ACK() method +// to decrement the number of pending ACKs. +func newEventACKHandler() beat.ACKer { + return acker.ConnectionOnly( + acker.EventPrivateReporter(func(_ int, privates []interface{}) { + for _, private := range privates { + if ack, ok := private.(*batchACKTracker); ok { + ack.ACK() + } + } + }), + ) +} diff --git a/x-pack/filebeat/input/lumberjack/ack_test.go b/x-pack/filebeat/input/lumberjack/ack_test.go new file mode 100644 index 000000000000..90e03819488d --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/ack_test.go @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/go-lumber/lj" +) + +func TestBatchACKTracker(t *testing.T) { + t.Run("empty", func(t *testing.T) { + batch := lj.NewBatch(nil) + + acker := newBatchACKTracker(batch.ACK) + require.False(t, isACKed(batch)) + + acker.Ready() + require.True(t, isACKed(batch)) + }) + + t.Run("single_event", func(t *testing.T) { + batch := lj.NewBatch(nil) + + acker := newBatchACKTracker(batch.ACK) + acker.Add() + acker.ACK() + require.False(t, isACKed(batch)) + + acker.Ready() + require.True(t, isACKed(batch)) + }) +} + +func isACKed(batch *lj.Batch) bool { + select { + case <-batch.Await(): + return true + default: + return false + } +} diff --git a/x-pack/filebeat/input/lumberjack/config.go b/x-pack/filebeat/input/lumberjack/config.go new file mode 100644 index 000000000000..53ceed2f8ce9 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/config.go @@ -0,0 +1,39 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "fmt" + "strings" + "time" + + "github.com/elastic/elastic-agent-libs/transport/tlscommon" +) + +type config struct { + ListenAddress string `config:"listen_address" validate:"nonzero"` // Bind address for the server (e.g. address:port). Default to localhost:5044. + Versions []string `config:"versions"` // List of Lumberjack version (e.g. v1, v2). + TLS *tlscommon.ServerConfig `config:"ssl"` // TLS options. + Keepalive time.Duration `config:"keepalive" validate:"min=0"` // Keepalive interval for notifying clients that batches that are not yet ACKed. + Timeout time.Duration `config:"timeout" validate:"min=0"` // Read / write timeouts for Lumberjack server. + MaxConnections int `config:"max_connections" validate:"min=0"` // Maximum number of concurrent connections. Default is 0 which means no limit. +} + +func (c *config) InitDefaults() { + c.ListenAddress = "localhost:5044" + c.Versions = []string{"v1", "v2"} +} + +func (c *config) Validate() error { + for _, v := range c.Versions { + switch strings.ToLower(v) { + case "v1", "v2": + default: + return fmt.Errorf("invalid lumberjack version %q: allowed values are v1 and v2", v) + } + } + + return nil +} diff --git a/x-pack/filebeat/input/lumberjack/config_test.go b/x-pack/filebeat/input/lumberjack/config_test.go new file mode 100644 index 000000000000..5b9e73d4d7c4 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/config_test.go @@ -0,0 +1,74 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "testing" + + "github.com/stretchr/testify/require" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func TestConfig(t *testing.T) { + testCases := []struct { + name string + userConfig map[string]interface{} + expected *config + expectedErr string + }{ + { + "defaults", + map[string]interface{}{}, + &config{ + ListenAddress: "localhost:5044", + Versions: []string{"v1", "v2"}, + }, + "", + }, + { + "validate version", + map[string]interface{}{ + "versions": []string{"v3"}, + }, + nil, + `invalid lumberjack version "v3"`, + }, + { + "validate keepalive", + map[string]interface{}{ + "keepalive": "-1s", + }, + nil, + `requires duration >= 0`, + }, + { + "validate max_connections", + map[string]interface{}{ + "max_connections": -1, + }, + nil, + `requires value >= 0 accessing 'max_connections'`, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + c := conf.MustNewConfigFrom(tc.userConfig) + + var ljConf config + err := c.Unpack(&ljConf) + + if tc.expectedErr != "" { + require.Error(t, err, "expected error: %s", tc.expectedErr) + require.Contains(t, err.Error(), tc.expectedErr) + return + } + + require.Equal(t, *tc.expected, ljConf) + }) + } +} diff --git a/x-pack/filebeat/input/lumberjack/fields.go b/x-pack/filebeat/input/lumberjack/fields.go new file mode 100644 index 000000000000..d54be5d16eb4 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/fields.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by beats/dev-tools/cmd/asset/asset.go - DO NOT EDIT. + +package lumberjack + +import ( + "github.com/elastic/beats/v7/libbeat/asset" +) + +func init() { + if err := asset.SetFields("filebeat", "lumberjack", asset.ModuleFieldsPri, AssetLumberjack); err != nil { + panic(err) + } +} + +// AssetLumberjack returns asset data. +// This is the base64 encoded zlib format compressed contents of input/lumberjack. +func AssetLumberjack() string { + return "eJxsjjEOwjAQBHu/YpU+eYALSio6XmDsjTBxbOtyjpTfo0QIEGKLK/ZGmu0xcbNIbb5RHs5PBtCoiRbd5V12BghcvMSqsWSLkwGAc2QKC0YpMz4wYq5NBwOMx98ebI/sZv6Y9uhWaTEmp8rM8Gr/2PZcVZrXJgwITh2EnnFlQMxwGVyZFct+ykqB3vk9rErR4ksazDMAAP//JmxQDQ==" +} diff --git a/x-pack/filebeat/input/lumberjack/generate_certs_test.go b/x-pack/filebeat/input/lumberjack/generate_certs_test.go new file mode 100644 index 000000000000..e66eb1b5b8b2 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/generate_certs_test.go @@ -0,0 +1,153 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "net" + "testing" + "time" +) + +type Cert struct { + signedCertDER []byte // DER encoded certificate from x509.CreateCertificate. + key *rsa.PrivateKey // RSA public / private key pair. +} + +// CertPEM returns the cert encoded as PEM. +func (c Cert) CertPEM(t testing.TB) []byte { return pemEncode(t, c.signedCertDER, "CERTIFICATE") } + +// KeyPEM returns the private key encoded as PEM. +func (c Cert) KeyPEM(t testing.TB) []byte { + return pemEncode(t, x509.MarshalPKCS1PrivateKey(c.key), "RSA PRIVATE KEY") +} + +func (c Cert) TLSCertificate(t testing.TB) tls.Certificate { + pair, err := tls.X509KeyPair(c.CertPEM(t), c.KeyPEM(t)) + if err != nil { + t.Fatal(err) + } + + return pair +} + +// generateCertData creates a root CA, server, and client cert suitable for +// testing mTLS. +func generateCertData(t testing.TB) (rootCA, client, server Cert) { + t.Helper() + + // CA cert + ca := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"Elastic"}, + Country: []string{"US"}, + Locality: []string{"San Francisco"}, + StreetAddress: []string{"West El Camino Real"}, + PostalCode: []string{"94040"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(0, 0, 1), + IsCA: true, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + BasicConstraintsValid: true, + } + + var err error + rootCA.key, err = rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + t.Fatal(err) + } + + rootCA.signedCertDER, err = x509.CreateCertificate(rand.Reader, ca, ca, &rootCA.key.PublicKey, rootCA.key) + if err != nil { + t.Fatal(err) + } + + // Server cert + { + // set up our server certificate + serverCert := &x509.Certificate{ + SerialNumber: big.NewInt(2), + Subject: pkix.Name{ + Organization: []string{"Elastic"}, + Country: []string{"US"}, + Locality: []string{"San Francisco"}, + StreetAddress: []string{"West El Camino Real"}, + PostalCode: []string{"94040"}, + CommonName: "server", + }, + IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback}, + DNSNames: []string{"localhost"}, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(0, 0, 1), + SubjectKeyId: []byte{1, 2, 3, 4, 5}, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + } + + server.key, err = rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + t.Fatal(err) + } + + server.signedCertDER, err = x509.CreateCertificate(rand.Reader, serverCert, ca, &server.key.PublicKey, rootCA.key) + if err != nil { + t.Fatal(err) + } + } + + // Client cert. + { + clientCert := &x509.Certificate{ + SerialNumber: big.NewInt(3), + Subject: pkix.Name{ + Organization: []string{"Elastic"}, + Country: []string{"US"}, + Locality: []string{"San Francisco"}, + StreetAddress: []string{"West El Camino Real"}, + PostalCode: []string{"94040"}, + CommonName: "client", + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(0, 0, 1), + SubjectKeyId: []byte{1, 2, 3, 4, 5}, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + EmailAddresses: []string{"client@example.com"}, + } + + client.key, err = rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + t.Fatal(err) + } + + client.signedCertDER, err = x509.CreateCertificate(rand.Reader, clientCert, ca, &client.key.PublicKey, rootCA.key) + if err != nil { + t.Fatal(err) + } + } + + return rootCA, client, server +} + +func pemEncode(t testing.TB, certBytes []byte, certType string) []byte { + t.Helper() + + pemData := new(bytes.Buffer) + if err := pem.Encode(pemData, &pem.Block{Type: certType, Bytes: certBytes}); err != nil { + t.Fatal(err) + } + + return pemData.Bytes() +} diff --git a/x-pack/filebeat/input/lumberjack/input.go b/x-pack/filebeat/input/lumberjack/input.go new file mode 100644 index 000000000000..9471bb35e92b --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/input.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "fmt" + + inputv2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/monitoring" +) + +const ( + inputName = "lumberjack" +) + +func Plugin() inputv2.Plugin { + return inputv2.Plugin{ + Name: inputName, + Stability: feature.Beta, + Info: "Receives data streamed via the Lumberjack protocol.", + Manager: inputv2.ConfigureWith(configure), + } +} + +func configure(cfg *conf.C) (inputv2.Input, error) { + var lumberjackConfig config + if err := cfg.Unpack(&lumberjackConfig); err != nil { + return nil, err + } + + return newLumberjackInput(lumberjackConfig) +} + +// lumberjackInput implements the Filebeat input V2 interface. The input is stateless. +type lumberjackInput struct { + config config +} + +var _ inputv2.Input = (*lumberjackInput)(nil) + +func newLumberjackInput(lumberjackConfig config) (*lumberjackInput, error) { + return &lumberjackInput{config: lumberjackConfig}, nil +} + +func (i *lumberjackInput) Name() string { return inputName } + +func (i *lumberjackInput) Test(inputCtx inputv2.TestContext) error { + s, err := newServer(i.config, inputCtx.Logger, nil, nil) + if err != nil { + return err + } + return s.Close() +} + +func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline) error { + inputCtx.Logger.Info("Starting " + inputName + " input") + defer inputCtx.Logger.Info(inputName + " input stopped") + + // Create client for publishing events and receive notification of their ACKs. + client, err := pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: inputCtx.Cancelation, + ACKHandler: newEventACKHandler(), + }) + if err != nil { + return fmt.Errorf("failed to create pipeline client: %w", err) + } + defer client.Close() + + setGoLumberLogger(inputCtx.Logger.Named("go-lumber")) + + metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() + metrics := newInputMetrics(metricRegistry, inputCtx.ID) + defer metrics.Close() + + s, err := newServer(i.config, inputCtx.Logger, client.Publish, metrics) + if err != nil { + return err + } + defer s.Close() + + // Shutdown the server when cancellation is signaled. + go func() { + <-inputCtx.Cancelation.Done() + s.Close() + }() + + // Run server until the cancellation signal. + return s.Run() +} diff --git a/x-pack/filebeat/input/lumberjack/logger.go b/x-pack/filebeat/input/lumberjack/logger.go new file mode 100644 index 000000000000..0f15b2b0d114 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/logger.go @@ -0,0 +1,40 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "sync" + + "go.uber.org/zap" + + "github.com/elastic/elastic-agent-libs/logp" + lumberlog "github.com/elastic/go-lumber/log" +) + +var setGoLumberLoggerOnce sync.Once + +func setGoLumberLogger(parent *logp.Logger) { + setGoLumberLoggerOnce.Do(func() { + lumberlog.Logger = &goLumberLogger{parent: parent.WithOptions(zap.AddCallerSkip(2))} + }) +} + +// goLumberLogger implements the go-lumber/log.Logging interface to route +// log message from go-lumber to Beats logp. +type goLumberLogger struct { + parent *logp.Logger +} + +func (l *goLumberLogger) Printf(s string, i ...interface{}) { + l.parent.Debugf(s, i...) +} + +func (l *goLumberLogger) Println(i ...interface{}) { + l.parent.Debug(i...) +} + +func (l *goLumberLogger) Print(i ...interface{}) { + l.parent.Debug(i...) +} diff --git a/x-pack/filebeat/input/lumberjack/metrics.go b/x-pack/filebeat/input/lumberjack/metrics.go new file mode 100644 index 000000000000..ebceeb397b71 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/metrics.go @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "github.com/rcrowley/go-metrics" + + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +type inputMetrics struct { + id string // Input ID. + parent *monitoring.Registry // Parent registry holding this input's ID as a key. + bindAddress *monitoring.String // Bind address of input. + + batchesReceivedTotal *monitoring.Uint // Number of Lumberjack batches received (not necessarily processed fully). + batchesACKedTotal *monitoring.Uint // Number of Lumberjack batches ACKed. + messagesReceivedTotal *monitoring.Uint // Number of Lumberjack messages received (not necessarily processed fully). + batchProcessingTime metrics.Sample // Histogram of the elapsed batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). +} + +// Close removes the metrics from the registry. +func (m *inputMetrics) Close() { + m.parent.Remove(m.id) +} + +func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { + reg := parent.NewRegistry(id) + monitoring.NewString(reg, "input").Set(inputName) + monitoring.NewString(reg, "id").Set(id) + out := &inputMetrics{ + id: id, + parent: reg, + bindAddress: monitoring.NewString(reg, "bind_address"), + batchesReceivedTotal: monitoring.NewUint(reg, "batches_received_total"), + batchesACKedTotal: monitoring.NewUint(reg, "batches_acked_total"), + messagesReceivedTotal: monitoring.NewUint(reg, "messages_received_total"), + batchProcessingTime: metrics.NewUniformSample(1024), + } + adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.batchProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. + + return out +} diff --git a/x-pack/filebeat/input/lumberjack/server.go b/x-pack/filebeat/input/lumberjack/server.go new file mode 100644 index 000000000000..96d0366e2b5f --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/server.go @@ -0,0 +1,182 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "crypto/tls" + "net" + "strings" + "sync" + "time" + + "golang.org/x/net/netutil" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" + lumber "github.com/elastic/go-lumber/server" +) + +type server struct { + config config + log *logp.Logger + publish func(beat.Event) + metrics *inputMetrics + ljSvr lumber.Server + ljSvrCloseOnce sync.Once + bindAddress string +} + +func newServer(c config, log *logp.Logger, pub func(beat.Event), metrics *inputMetrics) (*server, error) { + ljSvr, bindAddress, err := newLumberjack(c) + if err != nil { + return nil, err + } + + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + + bindURI := "tcp://" + bindAddress + if c.TLS.IsEnabled() { + bindURI = "tls://" + bindAddress + } + log.Infof(inputName+" is listening at %v.", bindURI) + metrics.bindAddress.Set(bindURI) + + return &server{ + config: c, + log: log, + publish: pub, + metrics: metrics, + ljSvr: ljSvr, + bindAddress: bindAddress, + }, nil +} + +func (s *server) Close() error { + var err error + s.ljSvrCloseOnce.Do(func() { + err = s.ljSvr.Close() + }) + return err +} + +func (s *server) Run() error { + // Process batches until the input is stopped. + for batch := range s.ljSvr.ReceiveChan() { + s.metrics.batchesReceivedTotal.Inc() + + if len(batch.Events) == 0 { + batch.ACK() + s.metrics.batchesACKedTotal.Inc() + continue + } + s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events))) + + // Track all the Beat events associated to the Lumberjack batch so that + // the batch can be ACKed after the Beat events are delivered successfully. + start := time.Now() + acker := newBatchACKTracker(func() { + batch.ACK() + s.metrics.batchesACKedTotal.Inc() + s.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) + }) + + for _, ljEvent := range batch.Events { + acker.Add() + s.publish(makeEvent(batch.RemoteAddr, batch.TLS, ljEvent, acker)) + } + + // Mark the batch as "ready" after Beat events are generated for each + // Lumberjack event. + acker.Ready() + } + + return nil +} + +func makeEvent(remoteAddr string, tlsState *tls.ConnectionState, lumberjackEvent interface{}, acker *batchACKTracker) beat.Event { + event := beat.Event{ + Timestamp: time.Now().UTC(), + Fields: map[string]interface{}{ + "source": map[string]interface{}{ + "address": remoteAddr, + }, + "lumberjack": lumberjackEvent, + }, + Private: acker, + } + + if tlsState != nil && len(tlsState.PeerCertificates) > 0 { + event.Fields["tls"] = map[string]interface{}{ + "client": map[string]interface{}{ + "subject": tlsState.PeerCertificates[0].Subject.CommonName, + }, + } + } + + return event +} + +func newLumberjack(c config) (lj lumber.Server, bindAddress string, err error) { + // Setup optional TLS. + var tlsConfig *tls.Config + if c.TLS.IsEnabled() { + elasticTLSConfig, err := tlscommon.LoadTLSServerConfig(c.TLS) + if err != nil { + return nil, "", err + } + + // NOTE: Passing an empty string disables checking the client certificate for a + // specific hostname. + tlsConfig = elasticTLSConfig.BuildServerConfig("") + } + + // Start listener. + l, err := net.Listen("tcp", c.ListenAddress) + if err != nil { + return nil, "", err + } + if tlsConfig != nil { + l = tls.NewListener(l, tlsConfig) + } + if c.MaxConnections > 0 { + l = netutil.LimitListener(l, c.MaxConnections) + } + + // Start lumberjack server. + s, err := lumber.NewWithListener(l, makeLumberjackOptions(c)...) + if err != nil { + return nil, "", err + } + + return s, l.Addr().String(), nil +} + +func makeLumberjackOptions(c config) []lumber.Option { + var opts []lumber.Option + + // Versions + for _, p := range c.Versions { + switch strings.ToLower(p) { + case "v1": + opts = append(opts, lumber.V1(true)) + case "v2": + opts = append(opts, lumber.V2(true)) + } + } + + if c.Keepalive > 0 { + opts = append(opts, lumber.Keepalive(c.Keepalive)) + } + + if c.Timeout > 0 { + opts = append(opts, lumber.Timeout(c.Keepalive)) + } + + return opts +} diff --git a/x-pack/filebeat/input/lumberjack/server_test.go b/x-pack/filebeat/input/lumberjack/server_test.go new file mode 100644 index 000000000000..971a37f7255f --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/server_test.go @@ -0,0 +1,238 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" + client "github.com/elastic/go-lumber/client/v2" +) + +const testTimeout = 10 * time.Second + +func TestServer(t *testing.T) { + makeTestConfig := func() config { + var c config + c.InitDefaults() + c.ListenAddress = "localhost:0" + c.MaxConnections = 1 + c.Keepalive = time.Second + c.Timeout = time.Second + return c + } + + t.Run("empty_batch", func(t *testing.T) { + testSendReceive(t, makeTestConfig(), 0, nil) + }) + + t.Run("no tls", func(t *testing.T) { + testSendReceive(t, makeTestConfig(), 10, nil) + }) + + t.Run("tls", func(t *testing.T) { + clientConf, serverConf := tlsSetup(t) + clientConf.Certificates = nil + + c := makeTestConfig() + c.TLS = serverConf + // Disable mTLS requirements in the server. + c.TLS.ClientAuth = 0 // tls.NoClientCert + c.TLS.VerificationMode = tlscommon.VerifyNone + + testSendReceive(t, c, 10, clientConf) + }) + + t.Run("mutual tls", func(t *testing.T) { + clientConf, serverConf := tlsSetup(t) + + c := makeTestConfig() + c.TLS = serverConf + + testSendReceive(t, c, 10, clientConf) + }) +} + +func testSendReceive(t testing.TB, c config, numberOfEvents int, clientTLSConfig *tls.Config) { + require.NoError(t, logp.TestingSetup()) + log := logp.NewLogger(inputName).With("test_name", t.Name()) + + ctx, shutdown := context.WithTimeout(context.Background(), testTimeout) + t.Cleanup(shutdown) + collect := newEventCollector(ctx, numberOfEvents) + + // Start server. + s, err := newServer(c, log, collect.Publish, nil) + require.NoError(t, err) + go func() { + <-ctx.Done() + s.Close() + }() + + // Asynchronously send and receive events. + var wg errgroup.Group + wg.Go(s.Run) + wg.Go(func() error { + // The client returns on error or after an E2E ACK is received. + // In both cases the test should shutdown. + defer shutdown() + + return sendData(ctx, t, s.bindAddress, numberOfEvents, clientTLSConfig) + }) + + // Wait for the expected number of events. + collect.Await(t) + + // Check for errors from client and server. + require.NoError(t, wg.Wait()) +} + +func sendData(ctx context.Context, t testing.TB, bindAddress string, numberOfEvents int, clientTLSConfig *tls.Config) error { + _, port, err := net.SplitHostPort(bindAddress) + if err != nil { + return err + } + + dialFunc := net.Dial + if clientTLSConfig != nil { + dialer := &tls.Dialer{ + Config: clientTLSConfig, + } + dialFunc = dialer.Dial + } + + c, err := client.SyncDialWith(dialFunc, net.JoinHostPort("localhost", port)) + if err != nil { + return fmt.Errorf("client dial error: %w", err) + } + defer c.Close() + go func() { + <-ctx.Done() + c.Close() + }() + t.Log("Lumberjack client connected.") + + var events []interface{} + for i := 0; i < numberOfEvents; i++ { + events = append(events, map[string]interface{}{ + "message": "hello world!", + "index": i, + }) + } + + if _, err = c.Send(events); err != nil { + return fmt.Errorf("failed sending lumberjack events: %w", err) + } + t.Log("Lumberjack client sent", len(events), "events.") + + return nil +} + +type eventCollector struct { + sync.Mutex + events []beat.Event + awaitCtx context.Context // awaitCtx is cancelled when events length is expectedSize. + awaitCancel context.CancelFunc + expectedSize int +} + +func newEventCollector(ctx context.Context, expectedSize int) *eventCollector { + ctx, cancel := context.WithCancel(ctx) + if expectedSize == 0 { + cancel() + } + + return &eventCollector{ + awaitCtx: ctx, + awaitCancel: cancel, + expectedSize: expectedSize, + } +} + +func (c *eventCollector) Publish(evt beat.Event) { + c.Lock() + defer c.Unlock() + + c.events = append(c.events, evt) + evt.Private.(*batchACKTracker).ACK() + + if len(c.events) == c.expectedSize { + c.awaitCancel() + } +} + +func (c *eventCollector) Await(t testing.TB) []beat.Event { + t.Helper() + + <-c.awaitCtx.Done() + if errors.Is(c.awaitCtx.Err(), context.DeadlineExceeded) { + t.Fatal(c.awaitCtx.Err()) + } + + c.Lock() + defer c.Unlock() + + if len(c.events) > c.expectedSize { + t.Fatalf("more events received than expected, got %d, want %d", len(c.events), c.expectedSize) + } + + events := make([]beat.Event, len(c.events)) + copy(events, c.events) + return events +} + +var ( + certDataOnce sync.Once + certData = struct { + ca, client, server Cert + }{} +) + +// tlsSetup return client and server configurations ready to test mutual TLS. +func tlsSetup(t *testing.T) (clientConfig *tls.Config, serverConfig *tlscommon.ServerConfig) { + t.Helper() + + certDataOnce.Do(func() { + certData.ca, certData.client, certData.server = generateCertData(t) + }) + + certPool := x509.NewCertPool() + certPool.AppendCertsFromPEM(certData.ca.CertPEM(t)) + + clientConfig = &tls.Config{ + RootCAs: certPool, + Certificates: []tls.Certificate{certData.client.TLSCertificate(t)}, + MinVersion: tls.VersionTLS12, + } + + serverConfig = &tlscommon.ServerConfig{ + // NOTE: VerifyCertificate is ineffective unless ClientAuth is set to RequireAndVerifyClientCert. + VerificationMode: tlscommon.VerifyCertificate, + // Unfortunately ServerConfig uses an unexported type in an exported field. + ClientAuth: 4, // tls.RequireAndVerifyClientCert + CAs: []string{ + string(certData.ca.CertPEM(t)), + }, + Certificate: tlscommon.CertificateConfig{ + Certificate: string(certData.server.CertPEM(t)), + Key: string(certData.server.KeyPEM(t)), + }, + } + + return clientConfig, serverConfig +}