From 08b72b24b114d0edcedfa1b2dc3df43153272664 Mon Sep 17 00:00:00 2001 From: Kirk Stewart Date: Thu, 22 Aug 2024 01:08:57 -0700 Subject: [PATCH] Add an option to respect filesystem locks to the filelog receiver. (#34801) --- .chloggen/filelog-receiver-fs-lock.yaml | 27 +++++++++++++++++ pkg/stanza/docs/operators/file_input.md | 1 + pkg/stanza/fileconsumer/config.go | 2 ++ pkg/stanza/fileconsumer/config_test.go | 1 + .../fileconsumer/internal/reader/factory.go | 2 ++ .../fileconsumer/internal/reader/reader.go | 8 +++++ .../internal/reader/reader_other.go | 13 ++++++++ .../internal/reader/reader_unix.go | 30 +++++++++++++++++++ receiver/filelogreceiver/README.md | 1 + 9 files changed, 85 insertions(+) create mode 100644 .chloggen/filelog-receiver-fs-lock.yaml create mode 100644 pkg/stanza/fileconsumer/internal/reader/reader_other.go create mode 100644 pkg/stanza/fileconsumer/internal/reader/reader_unix.go diff --git a/.chloggen/filelog-receiver-fs-lock.yaml b/.chloggen/filelog-receiver-fs-lock.yaml new file mode 100644 index 0000000000000..614467ecdb367 --- /dev/null +++ b/.chloggen/filelog-receiver-fs-lock.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: If acquire_fs_lock is true, attempt to acquire a shared lock before reading a file. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34801] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. 
+# Use pipe (|) for multiline entries. +subtext: Unix only. If a lock cannot be acquired then the file will be ignored until the next poll cycle. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/stanza/docs/operators/file_input.md b/pkg/stanza/docs/operators/file_input.md index efd77ec34504a..77be99e986f50 100644 --- a/pkg/stanza/docs/operators/file_input.md +++ b/pkg/stanza/docs/operators/file_input.md @@ -29,6 +29,7 @@ The `file_input` operator reads logs from files. It will place the lines read in | `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. | | `max_batches` | 0 | Only applicable when files must be batched in order to respect `max_concurrent_files`. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. | | `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. | +| `acquire_fs_lock` | `false` | Whether to attempt to acquire a shared filesystem lock before reading a file (Unix only). If the lock cannot be acquired, the file is skipped until the next poll cycle. | | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | | `header` | nil | Specifies options for parsing header metadata. 
Requires that the `filelog.allowHeaderMetadataParsing` feature gate is enabled. See below for details. | diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 03c481cacc1f8..967e4d683daf6 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -87,6 +87,7 @@ type Config struct { DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"` Compression string `mapstructure:"compression,omitempty"` + AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"` } type HeaderConfig struct { @@ -170,6 +171,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts DeleteAtEOF: c.DeleteAfterRead, IncludeFileRecordNumber: c.IncludeFileRecordNumber, Compression: c.Compression, + AcquireFSLock: c.AcquireFSLock, } var t tracker.Tracker diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 340d7f7f5ee0a..eeb43c67cd831 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -41,6 +41,7 @@ func TestNewConfig(t *testing.T) { assert.False(t, cfg.IncludeFileOwnerName) assert.False(t, cfg.IncludeFileOwnerGroupName) assert.False(t, cfg.IncludeFileRecordNumber) + assert.False(t, cfg.AcquireFSLock) } func TestUnmarshal(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index 646aebae3be69..7287ca40dae7f 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -44,6 +44,7 @@ type Factory struct { DeleteAtEOF bool IncludeFileRecordNumber bool Compression string + AcquireFSLock bool } func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) { @@ -77,6 +78,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, 
deleteAtEOF: f.DeleteAtEOF, includeFileRecordNum: f.IncludeFileRecordNumber, compression: f.Compression, + acquireFSLock: f.AcquireFSLock, } r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName)) diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index df4c03498e92e..a0c93a63a0f93 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -52,10 +52,18 @@ type Reader struct { needsUpdateFingerprint bool includeFileRecordNum bool compression string + acquireFSLock bool } // ReadToEnd will read until the end of the file func (r *Reader) ReadToEnd(ctx context.Context) { + if r.acquireFSLock { + if !r.tryLockFile() { + return + } + defer r.unlockFile() + } + switch r.compression { case "gzip": // We need to create a gzip reader each time ReadToEnd is called because the underlying diff --git a/pkg/stanza/fileconsumer/internal/reader/reader_other.go b/pkg/stanza/fileconsumer/internal/reader/reader_other.go new file mode 100644 index 0000000000000..56f28e64c4467 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/reader/reader_other.go @@ -0,0 +1,13 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !unix + +package reader // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + +func (r *Reader) tryLockFile() bool { + return true +} + +func (r *Reader) unlockFile() { +} diff --git a/pkg/stanza/fileconsumer/internal/reader/reader_unix.go b/pkg/stanza/fileconsumer/internal/reader/reader_unix.go new file mode 100644 index 0000000000000..1fb7d6d982015 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/reader/reader_unix.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build unix + +package reader // import 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + +import ( + "errors" + + "go.uber.org/zap" + "golang.org/x/sys/unix" +) + +func (r *Reader) tryLockFile() bool { + if err := unix.Flock(int(r.file.Fd()), unix.LOCK_SH|unix.LOCK_NB); err != nil { + if !errors.Is(err, unix.EWOULDBLOCK) { + r.set.Logger.Error("Failed to lock", zap.Error(err)) + } + return false + } + + return true +} + +func (r *Reader) unlockFile() { + if err := unix.Flock(int(r.file.Fd()), unix.LOCK_UN); err != nil { + r.set.Logger.Error("Failed to unlock", zap.Error(err)) + } +} diff --git a/receiver/filelogreceiver/README.md b/receiver/filelogreceiver/README.md index 5edb62ba96bc8..cb33d45dc1a9f 100644 --- a/receiver/filelogreceiver/README.md +++ b/receiver/filelogreceiver/README.md @@ -40,6 +40,7 @@ Tails and parses logs from files. | `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. | | `max_batches` | 0 | Only applicable when files must be batched in order to respect `max_concurrent_files`. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. | | `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. Must be `false` when `start_at` is set to `end`. | +| `acquire_fs_lock` | `false` | Whether to attempt to acquire a filesystem lock before reading a file (Unix only). | | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | | `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details. |