Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(outputs.remotefile): Create a new serializer instance per output file #15968

Merged
merged 4 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,25 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
return running, err
}

// probeSerializer reports whether a serializer creator is registered for the
// data format configured in the given table. An unset "data_format" option is
// treated as "influx".
//
// When a creator is found, a throw-away serializer instance is unmarshalled
// from the table so that misspelled serializer options can be detected. The
// instance is discarded afterwards.
func (c *Config) probeSerializer(table *ast.Table) bool {
	format := c.getFieldString(table, "data_format")
	if format == "" {
		format = "influx"
	}

	newSerializer, found := serializers.Serializers[format]
	if found {
		// Parse the options into a temporary instance to detect misspelled ones.
		//nolint:errcheck // The serializer instance is discarded, so the error is of no interest.
		c.toml.UnmarshalTable(table, newSerializer())
	}

	return found
}

func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) {
conf := &models.SerializerConfig{
Parent: parentname,
Expand Down Expand Up @@ -1140,6 +1159,15 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator
t.SetSerializer(serializer)
optionTestCount++
}
if t, ok := processor.(telegraf.SerializerFuncPlugin); ok {
if !c.probeSerializer(table) {
return nil, 0, errors.New("serializer not found")
}
t.SetSerializerFunc(func() (telegraf.Serializer, error) {
return c.addSerializer(name, table)
})
optionTestCount++
}

if err := c.toml.UnmarshalTable(table, processor); err != nil {
return nil, 0, fmt.Errorf("unmarshalling failed: %w", err)
Expand All @@ -1154,8 +1182,8 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return nil
}

// For inputs with parsers we need to compute the set of
// options that is not covered by both, the parser and the input.
// For outputs with serializers we need to compute the set of
// options that is not covered by both, the serializer and the output.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
Expand Down Expand Up @@ -1196,6 +1224,16 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
t.SetSerializer(serializer)
}

if t, ok := output.(telegraf.SerializerFuncPlugin); ok {
missThreshold = 1
if !c.probeSerializer(table) {
return errors.New("serializer not found")
}
t.SetSerializerFunc(func() (telegraf.Serializer, error) {
return c.addSerializer(name, table)
})
}

outputConfig, err := c.buildOutput(name, table)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Parser interface {
SetDefaultTags(tags map[string]string)
}

// ParserFunc is a function to create a new instance of a parser. It returns
// the created Parser or an error.
type ParserFunc func() (Parser, error)

// ParserPlugin is an interface for plugins that are able to parse
Expand Down
9 changes: 9 additions & 0 deletions plugins/outputs/remotefile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ to use them.
## Maximum size of the cache on disk (infinite by default)
# cache_max_size = -1

## Forget files after not being touched for longer than the given time
## This is useful to prevent memory leaks when using time-based filenames
## as it allows internal structures to be cleaned up.
## Note: When writing to a file after it has been forgotten, the file is
## treated as a new file which might cause file-headers to be appended
## again by certain serializers like CSV.
## By default files will be kept indefinitely.
# forget_files_after = "0s"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down
40 changes: 33 additions & 7 deletions plugins/outputs/remotefile/remotefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

//go:embed sample.conf
Expand All @@ -35,22 +34,25 @@ type File struct {
MaxCacheSize config.Size `toml:"cache_max_size"`
UseBatchFormat bool `toml:"use_batch_format"`
Trace bool `toml:"trace" deprecated:"1.33.0;1.35.0;use 'log_level = \"trace\"' instead"`
ForgetFiles config.Duration `toml:"forget_files_after"`
Log telegraf.Logger `toml:"-"`

root *vfs.VFS
fscancel context.CancelFunc
vfsopts vfscommon.Options

templates []*template.Template
serializer serializers.Serializer
templates []*template.Template
serializerFunc telegraf.SerializerFunc
serializers map[string]telegraf.Serializer
modified map[string]time.Time
}

// SampleConfig returns sampleConfig, the sample configuration of the plugin.
func (*File) SampleConfig() string {
return sampleConfig
}

func (f *File) SetSerializer(serializer serializers.Serializer) {
f.serializer = serializer
func (f *File) SetSerializerFunc(sf telegraf.SerializerFunc) {
f.serializerFunc = sf
}

func (f *File) Init() error {
Expand Down Expand Up @@ -101,6 +103,9 @@ func (f *File) Init() error {
f.templates = append(f.templates, tmpl)
}

f.serializers = make(map[string]telegraf.Serializer)
f.modified = make(map[string]time.Time)

return nil
}

Expand Down Expand Up @@ -187,16 +192,24 @@ func (f *File) Write(metrics []telegraf.Metric) error {
// Serialize the metric groups
groupBuffer := make(map[string][]byte, len(groups))
for fn, fnMetrics := range groups {
if _, found := f.serializers[fn]; !found {
var err error
if f.serializers[fn], err = f.serializerFunc(); err != nil {
return fmt.Errorf("creating serializer failed: %w", err)
}
}
serializer := f.serializers[fn]

if f.UseBatchFormat {
serialized, err := f.serializer.SerializeBatch(fnMetrics)
serialized, err := serializer.SerializeBatch(fnMetrics)
if err != nil {
f.Log.Errorf("Could not serialize metrics: %v", err)
continue
}
groupBuffer[fn] = serialized
} else {
for _, m := range fnMetrics {
serialized, err := f.serializer.Serialize(m)
serialized, err := serializer.Serialize(m)
if err != nil {
f.Log.Debugf("Could not serialize metric: %v", err)
continue
Expand All @@ -207,6 +220,7 @@ func (f *File) Write(metrics []telegraf.Metric) error {
}

// Write the files
t := time.Now()
for fn, serialized := range groupBuffer {
// Make sure the directory exists
dir := filepath.Dir(filepath.ToSlash(fn))
Expand All @@ -232,6 +246,18 @@ func (f *File) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("writing metrics to file %q failed: %w", fn, err)
}
file.Close()

f.modified[fn] = t
}

// Cleanup internal structures for old files
if f.ForgetFiles > 0 {
for fn, tmod := range f.modified {
if t.Sub(tmod) > time.Duration(f.ForgetFiles) {
delete(f.serializers, fn)
delete(f.modified, fn)
}
}
}

return nil
Expand Down
151 changes: 139 additions & 12 deletions plugins/outputs/remotefile/remotefile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers/csv"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
Expand Down Expand Up @@ -40,9 +41,11 @@ func TestStaticFileCreation(t *testing.T) {
Log: &testutil.Logger{},
}

serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
err := serializer.Init()
return serializer, err
})

require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
Expand Down Expand Up @@ -93,9 +96,11 @@ func TestStaticFileAppend(t *testing.T) {
Log: &testutil.Logger{},
}

serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
err := serializer.Init()
return serializer, err
})

require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
Expand Down Expand Up @@ -180,9 +185,11 @@ func TestDynamicFiles(t *testing.T) {
Log: &testutil.Logger{},
}

serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
err := serializer.Init()
return serializer, err
})

require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
Expand Down Expand Up @@ -246,9 +253,11 @@ func TestCustomTemplateFunctions(t *testing.T) {
Log: &testutil.Logger{},
}

serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
err := serializer.Init()
return serializer, err
})

require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
Expand All @@ -266,3 +275,121 @@ func TestCustomTemplateFunctions(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expected, string(actual))
}

// TestCSVSerialization writes two metrics that are routed to different files
// by a templated filename and checks that each file receives its own CSV
// header followed by its data row. It also checks that the plugin keeps one
// serializer and one modification timestamp per written file.
func TestCSVSerialization(t *testing.T) {
	ts := time.Unix(1587686400, 0)
	input := []telegraf.Metric{
		metric.New("test", map[string]string{"source": "a"}, map[string]interface{}{"value": 42}, ts),
		metric.New("test", map[string]string{"source": "b"}, map[string]interface{}{"value": 23}, ts),
	}
	expected := map[string]string{
		"test-a.csv": "timestamp,measurement,source,value\n1587686400,test,a,42\n",
		"test-b.csv": "timestamp,measurement,source,value\n1587686400,test,b,23\n",
	}
	trackedFiles := []string{"test-a.csv", "test-b.csv"}

	tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
	require.NoError(t, err)
	defer os.RemoveAll(tmpdir)

	// Every call hands out a fresh, initialized CSV serializer with headers
	// enabled.
	newCSV := func() (telegraf.Serializer, error) {
		s := &csv.Serializer{Header: true}
		return s, s.Init()
	}

	// Setup the plugin including the serializer factory
	plugin := &File{
		Remote:            config.NewSecret([]byte("local:" + tmpdir)),
		Files:             []string{`test-{{.Tag "source"}}.csv`},
		WriteBackInterval: config.Duration(100 * time.Millisecond),
		Log:               &testutil.Logger{},
	}
	plugin.SetSerializerFunc(newCSV)

	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Connect())
	defer plugin.Close()

	// Closing the plugin after writing is required to actually flush the
	// data to disk.
	require.NoError(t, plugin.Write(input))
	plugin.Close()

	// Compare the content of each file, normalizing line endings first
	for name, want := range expected {
		path := filepath.Join(tmpdir, name)
		require.FileExists(t, path)
		raw, err := os.ReadFile(path)
		require.NoError(t, err)
		got := strings.ReplaceAll(string(raw), "\r\n", "\n")
		require.Equal(t, want, got)
	}

	// Both files must be tracked in the internal bookkeeping
	require.Len(t, plugin.modified, 2)
	for _, name := range trackedFiles {
		require.Contains(t, plugin.modified, name)
	}
	require.Len(t, plugin.serializers, 2)
	for _, name := range trackedFiles {
		require.Contains(t, plugin.serializers, name)
	}
}

// TestForgettingFiles checks that the plugin drops its bookkeeping (the
// serializer and the modification timestamp) for a file that has not been
// written for longer than the configured ForgetFiles duration.
func TestForgettingFiles(t *testing.T) {
	ts := time.Unix(1587686400, 0)
	input := []telegraf.Metric{
		metric.New("test", map[string]string{"source": "a"}, map[string]interface{}{"value": 42}, ts),
		metric.New("test", map[string]string{"source": "b"}, map[string]interface{}{"value": 23}, ts),
	}
	staleBatch, freshBatch := input[:1], input[1:]

	tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
	require.NoError(t, err)
	defer os.RemoveAll(tmpdir)

	// Every call hands out a fresh, initialized CSV serializer with headers
	// enabled.
	newCSV := func() (telegraf.Serializer, error) {
		s := &csv.Serializer{Header: true}
		return s, s.Init()
	}

	// Setup the plugin with a forget time much shorter than the pause
	// between the two writes below.
	plugin := &File{
		Remote:            config.NewSecret([]byte("local:" + tmpdir)),
		Files:             []string{`test-{{.Tag "source"}}.csv`},
		WriteBackInterval: config.Duration(100 * time.Millisecond),
		ForgetFiles:       config.Duration(10 * time.Millisecond),
		Log:               &testutil.Logger{},
	}
	plugin.SetSerializerFunc(newCSV)

	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Connect())
	defer plugin.Close()

	// Write to the first file, wait longer than the forget time and then
	// write to the second file so the first one becomes outdated.
	require.NoError(t, plugin.Write(staleBatch))
	time.Sleep(100 * time.Millisecond)
	require.NoError(t, plugin.Write(freshBatch))

	plugin.Close()

	// Only the recently written file must remain in the bookkeeping
	require.Len(t, plugin.modified, 1)
	require.Contains(t, plugin.modified, "test-b.csv")
	require.Len(t, plugin.serializers, 1)
	require.Contains(t, plugin.serializers, "test-b.csv")
}
Loading
Loading