From 41762278c34d0ada3b0d4a790580735e9b17c00a Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 2 Nov 2023 10:27:09 -0400 Subject: [PATCH] [filelogreceiver]: Add ability to sort by mtime (#28850) **Description:** * Adds a new `mtime` sort type, which will sort files by their modified time * Add a feature gate for `mtime` sort type An optional follow-up performance improvement may be made here, to have the finder return fs.DirEntry directly to query the mtime without making an extra call to os.Stat for each file. **Link to tracking Issue:** #27812 **Testing:** * Added unit tests for new functionality **Documentation:** * Added new `mode` parameter to filelogreceiver docs --- .../filelogreceiver_sort-by-mtime-rework.yaml | 27 +++++++ .../matcher/internal/filter/filter.go | 8 ++- .../matcher/internal/filter/sort.go | 58 +++++++++++++-- .../matcher/internal/filter/sort_test.go | 71 +++++++++++++++++++ pkg/stanza/fileconsumer/matcher/matcher.go | 44 ++++++++++-- .../fileconsumer/matcher/matcher_test.go | 49 ++++++++++++- receiver/filelogreceiver/README.md | 2 +- 7 files changed, 240 insertions(+), 19 deletions(-) create mode 100755 .chloggen/filelogreceiver_sort-by-mtime-rework.yaml diff --git a/.chloggen/filelogreceiver_sort-by-mtime-rework.yaml b/.chloggen/filelogreceiver_sort-by-mtime-rework.yaml new file mode 100755 index 000000000000..3b5b2ab4c94c --- /dev/null +++ b/.chloggen/filelogreceiver_sort-by-mtime-rework.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add the ability to order files by mtime, to only read the most recently modified files + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27812] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/stanza/fileconsumer/matcher/internal/filter/filter.go b/pkg/stanza/fileconsumer/matcher/internal/filter/filter.go index ec1d31b15cb6..686ddb78a183 100644 --- a/pkg/stanza/fileconsumer/matcher/internal/filter/filter.go +++ b/pkg/stanza/fileconsumer/matcher/internal/filter/filter.go @@ -11,7 +11,7 @@ import ( ) type Option interface { - // Returned error is for explanitory purposes only. + // Returned error is for explanatory purposes only. // All options will be called regardless of error. apply([]*item) ([]*item, error) } @@ -49,6 +49,12 @@ type item struct { } func newItem(value string, regex *regexp.Regexp) (*item, error) { + if regex == nil { + return &item{ + value: value, + }, nil + } + match := regex.FindStringSubmatch(value) if match == nil { return nil, fmt.Errorf("'%s' does not match regex", value) diff --git a/pkg/stanza/fileconsumer/matcher/internal/filter/sort.go b/pkg/stanza/fileconsumer/matcher/internal/filter/sort.go index 93c869b67f09..67002f7e3405 100644 --- a/pkg/stanza/fileconsumer/matcher/internal/filter/sort.go +++ b/pkg/stanza/fileconsumer/matcher/internal/filter/sort.go @@ -5,6 +5,7 @@ package filter // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "fmt" + "os" "sort" "strconv" "time" @@ -18,24 +19,24 @@ type parseFunc func(string) (any, error) type compareFunc func(a, b any) bool -type sortOption struct { +type regexSortOption struct { regexKey string parseFunc compareFunc } -func newSortOption(regexKey string, parseFunc parseFunc, compareFunc compareFunc) (Option, error) { +func newRegexSortOption(regexKey string, parseFunc parseFunc, compareFunc compareFunc) (Option, error) { if regexKey == "" { return nil, fmt.Errorf("regex key must be specified") } - return sortOption{ + return regexSortOption{ regexKey: regexKey, parseFunc: parseFunc, compareFunc: compareFunc, }, nil } -func (o sortOption) apply(items []*item) ([]*item, error) { +func (o regexSortOption) apply(items []*item) ([]*item, error) { // Special case where sort.Slice will not run the 'less' func. // We still need to ensure it parses in order to ensure the file should be included. if len(items) == 1 { @@ -80,7 +81,7 @@ func (o sortOption) apply(items []*item) ([]*item, error) { } func SortNumeric(regexKey string, ascending bool) (Option, error) { - return newSortOption(regexKey, + return newRegexSortOption(regexKey, func(s string) (any, error) { return strconv.Atoi(s) }, @@ -94,7 +95,7 @@ func SortNumeric(regexKey string, ascending bool) (Option, error) { } func SortAlphabetical(regexKey string, ascending bool) (Option, error) { - return newSortOption(regexKey, + return newRegexSortOption(regexKey, func(s string) (any, error) { return s, nil }, @@ -118,7 +119,7 @@ func SortTemporal(regexKey string, ascending bool, layout string, location strin if err != nil { return nil, fmt.Errorf("load location %s: %w", loc, err) } - return newSortOption(regexKey, + return newRegexSortOption(regexKey, func(s string) (any, error) { return timeutils.ParseStrptime(layout, s, loc) }, @@ -130,3 +131,46 @@ func SortTemporal(regexKey string, ascending bool, layout string, location strin }, ) } + +type mtimeSortOption struct{} + +type mtimeItem struct { + mtime time.Time + path string + item *item +} + +func (m mtimeSortOption) apply(items []*item) ([]*item, error) { + mtimeItems := make([]mtimeItem, 0, len(items)) + var errs error + for _, item := range items { + path := item.value + fi, err := os.Stat(path) + if err != nil { + errs = multierr.Append(errs, err) + continue + } + + mtimeItems = append(mtimeItems, mtimeItem{ + mtime: fi.ModTime(), + path: path, + item: item, + }) + } + + sort.SliceStable(mtimeItems, func(i, j int) bool { + // This checks if item i > j, in order to reverse the sort (most recently modified file is first in the list) + return mtimeItems[i].mtime.After(mtimeItems[j].mtime) + }) + + filteredValues := make([]*item, 0, len(items)) + for _, mtimeItem := range mtimeItems { + filteredValues = append(filteredValues, mtimeItem.item) + } + + return filteredValues, errs +} + +func SortMtime() Option { + return mtimeSortOption{} +} diff --git a/pkg/stanza/fileconsumer/matcher/internal/filter/sort_test.go b/pkg/stanza/fileconsumer/matcher/internal/filter/sort_test.go index 9790b3622323..9257d8b12c18 100644 --- a/pkg/stanza/fileconsumer/matcher/internal/filter/sort_test.go +++ b/pkg/stanza/fileconsumer/matcher/internal/filter/sort_test.go @@ -5,8 +5,11 @@ package filter import ( "fmt" + "os" + "path/filepath" "regexp" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -169,3 +172,71 @@ func TestSort(t *testing.T) { } } } + +func TestMTimeFilter(t *testing.T) { + epoch := time.Unix(0, 0) + cases := []struct { + name string + files []string + fileMTimes []time.Time + expectedErr string + expect []string + }{ + { + name: "No files", + files: []string{}, + fileMTimes: []time.Time{}, + expect: []string{}, + }, + { + name: "Single file", + files: []string{"a.log"}, + fileMTimes: []time.Time{epoch}, + expect: []string{"a.log"}, + }, + { + name: "Multiple files", + files: []string{"a.log", "b.log"}, + fileMTimes: []time.Time{epoch, epoch.Add(time.Hour)}, + expect: []string{"b.log", "a.log"}, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tmpDir := t.TempDir() + items := []*item{} + // Create files with specified mtime + for i, file := range tc.files { + mtime := tc.fileMTimes[i] + fullPath := filepath.Join(tmpDir, file) + + f, err := os.Create(fullPath) + require.NoError(t, err) + require.NoError(t, f.Close()) + require.NoError(t, os.Chtimes(fullPath, epoch, mtime)) + + it, err := newItem(fullPath, nil) + require.NoError(t, err) + + items = append(items, it) + } + + f := SortMtime() + result, err := f.apply(items) + if tc.expectedErr != "" { + require.EqualError(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + } + + relativeResult := []string{} + for _, r := range result { + rel, err := filepath.Rel(tmpDir, r.value) + require.NoError(t, err) + relativeResult = append(relativeResult, rel) + } + + require.Equal(t, tc.expect, relativeResult) + }) + } +} diff --git a/pkg/stanza/fileconsumer/matcher/matcher.go b/pkg/stanza/fileconsumer/matcher/matcher.go index 76cdd1bd4feb..a1fc7109a19e 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher.go +++ b/pkg/stanza/fileconsumer/matcher/matcher.go @@ -8,6 +8,8 @@ import ( "fmt" "regexp" + "go.opentelemetry.io/collector/featuregate" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/filter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/finder" ) @@ -16,12 +18,20 @@ const ( sortTypeNumeric = "numeric" sortTypeTimestamp = "timestamp" sortTypeAlphabetical = "alphabetical" + sortTypeMtime = "mtime" ) const ( defaultOrderingCriteriaTopN = 1 ) +var mtimeSortTypeFeatureGate = featuregate.GlobalRegistry().MustRegister( + "filelog.mtimeSortType", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, allows usage of `ordering_criteria.mode` = `mtime`."), + featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27812"), +) + type Criteria struct { Include []string `mapstructure:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty"` @@ -63,10 +73,6 @@ func New(c Criteria) (*Matcher, error) { }, nil } - if c.OrderingCriteria.Regex == "" { - return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified") - } - if c.OrderingCriteria.TopN < 0 { return nil, fmt.Errorf("'top_n' must be a positive integer") } @@ -75,9 +81,17 @@ func New(c Criteria) (*Matcher, error) { c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN } - regex, err := regexp.Compile(c.OrderingCriteria.Regex) - if err != nil { - return nil, fmt.Errorf("compile regex: %w", err) + var regex *regexp.Regexp + if orderingCriteriaNeedsRegex(c.OrderingCriteria.SortBy) { + if c.OrderingCriteria.Regex == "" { + return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified") + } + + var err error + regex, err = regexp.Compile(c.OrderingCriteria.Regex) + if err != nil { + return nil, fmt.Errorf("compile regex: %w", err) + } } var filterOpts []filter.Option @@ -101,6 +115,11 @@ func New(c Criteria) (*Matcher, error) { return nil, fmt.Errorf("timestamp sort: %w", err) } filterOpts = append(filterOpts, f) + case sortTypeMtime: + if !mtimeSortTypeFeatureGate.IsEnabled() { + return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime) + } + filterOpts = append(filterOpts, filter.SortMtime()) default: return nil, fmt.Errorf("'sort_type' must be specified") } @@ -115,6 +134,17 @@ func New(c Criteria) (*Matcher, error) { }, nil } +// orderingCriteriaNeedsRegex returns true if any of the sort options require a regex to be set. +func orderingCriteriaNeedsRegex(sorts []Sort) bool { + for _, s := range sorts { + switch s.SortType { + case sortTypeNumeric, sortTypeAlphabetical, sortTypeTimestamp: + return true + } + } + return false +} + type Matcher struct { include []string exclude []string diff --git a/pkg/stanza/fileconsumer/matcher/matcher_test.go b/pkg/stanza/fileconsumer/matcher/matcher_test.go index 1d9de6f17f87..2c8f799ac915 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher_test.go +++ b/pkg/stanza/fileconsumer/matcher/matcher_test.go @@ -10,13 +10,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/featuregate" ) func TestNew(t *testing.T) { cases := []struct { - name string - criteria Criteria - expectedErr string + name string + criteria Criteria + expectedErr string + enableMtimeFeatureGate bool }{ { name: "IncludeEmpty", @@ -179,9 +181,41 @@ func TestNew(t *testing.T) { }, expectedErr: "timestamp sort: regex key must be specified", }, + { + name: "SortByMtime", + criteria: Criteria{ + Include: []string{"*.log"}, + OrderingCriteria: OrderingCriteria{ + SortBy: []Sort{ + { + SortType: "mtime", + }, + }, + }, + }, + enableMtimeFeatureGate: true, + }, + { + name: "SortByMtimeGateDisabled", + criteria: Criteria{ + Include: []string{"*.log"}, + OrderingCriteria: OrderingCriteria{ + SortBy: []Sort{ + { + SortType: "mtime", + }, + }, + }, + }, + expectedErr: `the "filelog.mtimeSortType" feature gate must be enabled to use "mtime" sort type`, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + if tc.enableMtimeFeatureGate { + enableSortByMTimeFeature(t) + } + matcher, err := New(tc.criteria) if tc.expectedErr != "" { assert.EqualError(t, err, tc.expectedErr) @@ -714,3 +748,12 @@ func TestMatcher(t *testing.T) { }) } } + +func enableSortByMTimeFeature(t *testing.T) { + if !mtimeSortTypeFeatureGate.IsEnabled() { + require.NoError(t, featuregate.GlobalRegistry().Set(mtimeSortTypeFeatureGate.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(mtimeSortTypeFeatureGate.ID(), false)) + }) + } +} diff --git a/receiver/filelogreceiver/README.md b/receiver/filelogreceiver/README.md index 2b23516953af..8f0795d4d7ed 100644 --- a/receiver/filelogreceiver/README.md +++ b/receiver/filelogreceiver/README.md @@ -52,7 +52,7 @@ Tails and parses logs from files. | `retry_on_failure.max_elapsed_time` | `5m` | Maximum amount of [time](#time-parameters) (including retries) spent trying to send a logs batch to a downstream consumer. Once this value is reached, the data is discarded. Retrying never stops if set to `0`. | `ordering_criteria.regex` | | Regular expression used for sorting, should contain a named capture groups that are to be used in `regex_key`. | | `ordering_criteria.top_n` | 1 | The number of files to track when using file ordering. The top N files are tracked after applying the ordering criteria. | -| `ordering_criteria.sort_by.sort_type` | | Type of sorting to be performed (e.g., `numeric`, `alphabetical`, `timestamp`) | +| `ordering_criteria.sort_by.sort_type` | | Type of sorting to be performed (e.g., `numeric`, `alphabetical`, `timestamp`, `mtime`) | | `ordering_criteria.sort_by.location` | | Relevant if `sort_type` is set to `timestamp`. Defines the location of the timestamp of the file. | | `ordering_criteria.sort_by.format` | | Relevant if `sort_type` is set to `timestamp`. Defines the strptime format of the timestamp being sorted. | | `ordering_criteria.sort_by.ascending` | | Sort direction |