From 62bc2e13f4153e32060d1c797abc149994513353 Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Tue, 23 Jul 2024 07:09:47 -0600 Subject: [PATCH] Fixes from review: * extra word typo in readme * initialize timestamp field in init(), update tests * always use string arrow type for tags --- plugins/outputs/parquet/README.md | 2 +- plugins/outputs/parquet/parquet.go | 19 ++++++++----------- plugins/outputs/parquet/parquet_test.go | 12 ++++++------ 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/plugins/outputs/parquet/README.md b/plugins/outputs/parquet/README.md index 0c69bf9778111..428dcb3a88ade 100644 --- a/plugins/outputs/parquet/README.md +++ b/plugins/outputs/parquet/README.md @@ -1,6 +1,6 @@ # Parquet Output Plugin -This plugin sends writes metrics to parquet files. By default, the parquet +This plugin writes metrics to parquet files. By default, the parquet output groups metrics by metric name and write those metrics all to the same file. If a metric schema does not match then metrics are dropped. diff --git a/plugins/outputs/parquet/parquet.go b/plugins/outputs/parquet/parquet.go index 87d9412248c6c..39e459cb1c4bf 100644 --- a/plugins/outputs/parquet/parquet.go +++ b/plugins/outputs/parquet/parquet.go @@ -22,6 +22,8 @@ import ( //go:embed sample.conf var sampleConfig string +var defaultTimestampFieldName = "timestamp" + type metricGroup struct { filename string builder *array.RecordBuilder @@ -47,11 +49,6 @@ func (p *Parquet) Init() error { p.Directory = "." 
} - if p.TimestampFieldName == nil { - timestampFieldName := "timestamp" - p.TimestampFieldName = &timestampFieldName - } - stat, err := os.Stat(p.Directory) if os.IsNotExist(err) { if err := os.MkdirAll(p.Directory, 0750); err != nil { @@ -255,11 +252,7 @@ func (p *Parquet) createSchema(metrics []telegraf.Metric) (*arrow.Schema, error) } for _, tag := range metric.TagList() { if _, ok := rawFields[tag.Key]; !ok { - arrowType, err := goToArrowType(tag.Value) - if err != nil { - return nil, fmt.Errorf("error converting '%s=%s' tag to arrow type: %w", tag.Key, tag.Value, err) - } - rawFields[tag.Key] = arrowType + rawFields[tag.Key] = arrow.BinaryTypes.String } } } @@ -335,5 +328,9 @@ func goToArrowType(value interface{}) (arrow.DataType, error) { } func init() { - outputs.Add("parquet", func() telegraf.Output { return &Parquet{} }) + outputs.Add("parquet", func() telegraf.Output { + return &Parquet{ + TimestampFieldName: &defaultTimestampFieldName, + } + }) } diff --git a/plugins/outputs/parquet/parquet_test.go b/plugins/outputs/parquet/parquet_test.go index 5f61764fc4540..0c54a068f135b 100644 --- a/plugins/outputs/parquet/parquet_test.go +++ b/plugins/outputs/parquet/parquet_test.go @@ -125,7 +125,8 @@ func TestCases(t *testing.T) { t.Run(tc.name, func(t *testing.T) { testDir := t.TempDir() plugin := &Parquet{ - Directory: testDir, + Directory: testDir, + TimestampFieldName: &defaultTimestampFieldName, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -161,8 +162,9 @@ func TestRotation(t *testing.T) { testDir := t.TempDir() plugin := &Parquet{ - Directory: testDir, - RotationInterval: config.Duration(1 * time.Second), + Directory: testDir, + RotationInterval: config.Duration(1 * time.Second), + TimestampFieldName: &defaultTimestampFieldName, } require.NoError(t, plugin.Init()) @@ -188,11 +190,9 @@ func TestOmitTimestamp(t *testing.T) { ), } - emptyTimestamp := "" testDir := t.TempDir() plugin := &Parquet{ - Directory: testDir, -
TimestampFieldName: &emptyTimestamp, + Directory: testDir, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect())