Skip to content

Commit

Permalink
Allow to forget old serializers
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Oct 2, 2024
1 parent e66ddcf commit 3dee6f3
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 0 deletions.
9 changes: 9 additions & 0 deletions plugins/outputs/remotefile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ to use them.
## Maximum size of the cache on disk (infinite by default)
# cache_max_size = -1

## Forget files after not being touched for longer than the given time
## This is useful to prevent memory leaks when using time-based filenames
## as it allows internal structures to be cleaned up.
## Note: When writing to a file after is has been forgotten, the file is
## treated as a new file which might cause file-headers to be appended
## again by certain serializers like CSV.
## By default files will be kept indefinitely.
# forget_files_after = "0s"

## Output log messages of the underlying library as debug messages
## NOTE: You need to enable this option AND run Telegraf in debug mode!
# trace = false
Expand Down
16 changes: 16 additions & 0 deletions plugins/outputs/remotefile/remotefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type File struct {
MaxCacheSize config.Size `toml:"cache_max_size"`
UseBatchFormat bool `toml:"use_batch_format"`
Trace bool `toml:"trace"`
ForgetFiles config.Duration `toml:"forget_files_after"`
Log telegraf.Logger `toml:"-"`

root *vfs.VFS
Expand All @@ -43,6 +44,7 @@ type File struct {
templates []*template.Template
serializerFunc telegraf.SerializerFunc
serializers map[string]telegraf.Serializer
modified map[string]time.Time
}

func (*File) SampleConfig() string {
Expand Down Expand Up @@ -101,6 +103,7 @@ func (f *File) Init() error {
}

f.serializers = make(map[string]telegraf.Serializer)
f.modified = make(map[string]time.Time)

return nil
}
Expand Down Expand Up @@ -216,6 +219,7 @@ func (f *File) Write(metrics []telegraf.Metric) error {
}

// Write the files
t := time.Now()
for fn, serialized := range groupBuffer {
// Make sure the directory exists
dir := filepath.Dir(filepath.ToSlash(fn))
Expand All @@ -241,6 +245,18 @@ func (f *File) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("writing metrics to file %q failed: %w", fn, err)
}
file.Close()

f.modified[fn] = t
}

// Cleanup internal structures for old files
if f.ForgetFiles > 0 {
for fn, tmod := range f.modified {
if t.Sub(tmod) > time.Duration(f.ForgetFiles) {
delete(f.serializers, fn)
delete(f.modified, fn)
}
}
}

return nil
Expand Down
61 changes: 61 additions & 0 deletions plugins/outputs/remotefile/remotefile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,65 @@ func TestCSVSerialization(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedContent, string(actual))
}

require.Len(t, plugin.modified, 2)
require.Contains(t, plugin.modified, "test-a.csv")
require.Contains(t, plugin.modified, "test-b.csv")
require.Len(t, plugin.serializers, 2)
require.Contains(t, plugin.serializers, "test-a.csv")
require.Contains(t, plugin.serializers, "test-b.csv")
}

func TestForgettingFiles(t *testing.T) {
input := []telegraf.Metric{
metric.New(
"test",
map[string]string{"source": "a"},
map[string]interface{}{"value": 42},
time.Unix(1587686400, 0),
),
metric.New(
"test",
map[string]string{"source": "b"},
map[string]interface{}{"value": 23},
time.Unix(1587686400, 0),
),
}

tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

// Setup the plugin including the serializer
plugin := &File{
Remote: config.NewSecret([]byte("local:" + tmpdir)),
Files: []string{`test-{{.Tag "source"}}.csv`},
WriteBackInterval: config.Duration(100 * time.Millisecond),
ForgetFiles: config.Duration(10 * time.Millisecond),
Log: &testutil.Logger{},
}

plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &csv.Serializer{Header: true}
err := serializer.Init()
return serializer, err
})

require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()

// Write the input metrics and close the plugin. This is required to
// actually flush the data to disk
require.NoError(t, plugin.Write(input[:1]))
time.Sleep(100 * time.Millisecond)
require.NoError(t, plugin.Write(input[1:]))

plugin.Close()

// Check the result
require.Len(t, plugin.modified, 1)
require.Contains(t, plugin.modified, "test-b.csv")
require.Len(t, plugin.serializers, 1)
require.Contains(t, plugin.serializers, "test-b.csv")
}
9 changes: 9 additions & 0 deletions plugins/outputs/remotefile/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@
## Maximum size of the cache on disk (infinite by default)
# cache_max_size = -1

## Forget files after not being touched for longer than the given time
## This is useful to prevent memory leaks when using time-based filenames
## as it allows internal structures to be cleaned up.
## Note: When writing to a file after is has been forgotten, the file is
## treated as a new file which might cause file-headers to be appended
## again by certain serializers like CSV.
## By default files will be kept indefinitely.
# forget_files_after = "0s"

## Output log messages of the underlying library as debug messages
## NOTE: You need to enable this option AND run Telegraf in debug mode!
# trace = false
Expand Down

0 comments on commit 3dee6f3

Please sign in to comment.