From b0c805e6cd51a7fc3e5fe42cb57fc87a1cb74dbe Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Tue, 12 Mar 2024 07:08:27 -0600 Subject: [PATCH] feat(outputs.parquet): Introduce Parquet output fixes: #14786 --- go.mod | 12 +- go.sum | 23 +- plugins/inputs/sql/drivers.go | 2 +- plugins/outputs/all/parquet.go | 5 + plugins/outputs/parquet/README.md | 106 ++++++++ plugins/outputs/parquet/parquet.go | 338 ++++++++++++++++++++++++ plugins/outputs/parquet/parquet_test.go | 208 +++++++++++++++ plugins/outputs/parquet/sample.conf | 14 + 8 files changed, 690 insertions(+), 18 deletions(-) create mode 100644 plugins/outputs/all/parquet.go create mode 100644 plugins/outputs/parquet/README.md create mode 100644 plugins/outputs/parquet/parquet.go create mode 100644 plugins/outputs/parquet/parquet_test.go create mode 100644 plugins/outputs/parquet/sample.conf diff --git a/go.mod b/go.mod index a6a316177f75a..56f346fc05847 100644 --- a/go.mod +++ b/go.mod @@ -35,10 +35,10 @@ require ( github.com/antchfx/jsonquery v1.3.3 github.com/antchfx/xmlquery v1.4.0 github.com/antchfx/xpath v1.3.0 - github.com/apache/arrow/go/v13 v13.0.0 - github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0 + github.com/apache/arrow/go/v16 v16.1.0 + github.com/apache/arrow/go/v17 v17.0.0 github.com/apache/iotdb-client-go v1.2.0-tsbs - github.com/apache/thrift v0.19.0 + github.com/apache/thrift v0.20.0 github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/awnumar/memguard v0.22.5 @@ -344,7 +344,7 @@ require ( github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/flatbuffers v24.3.7+incompatible // indirect + github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-querystring v1.1.0 // indirect 
github.com/google/gofuzz v1.2.0 // indirect @@ -367,7 +367,7 @@ require ( github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/packer-plugin-sdk v0.3.2 // indirect github.com/hashicorp/serf v0.10.1 // indirect - github.com/huandu/xstrings v1.3.3 // indirect + github.com/huandu/xstrings v1.4.0 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -388,7 +388,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/jzelinskie/whirlpool v0.0.0-20201016144138-0675e54bb004 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect diff --git a/go.sum b/go.sum index 9b163ee2e6213..1d0c7dbfaf368 100644 --- a/go.sum +++ b/go.sum @@ -827,18 +827,18 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8 github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= -github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk= -github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= -github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0 h1:XbC214lVvnAnDzowGV7dYiv4f4Aa6jhtIby08OgbcUg= -github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0/go.mod 
h1:VVbdJivCXZAJ6IhOSCSzk/RVQ/PlcitjskAWEST3Sc0= +github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9p7FXI= +github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= +github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54= +github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/apache/iotdb-client-go v1.2.0-tsbs h1:hezGUydAkDSceCvsetYorI87S2e8HZ4hTQHmGZgOGDY= github.com/apache/iotdb-client-go v1.2.0-tsbs/go.mod h1:3D6QYkqRmASS/4HsjU+U/3fscyc5M9xKRfywZsKuoZY= github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= -github.com/apache/thrift v0.19.0 h1:sOqkWPzMj7w6XaYbJQG7m4sGqVolaW/0D28Ln7yPzMk= -github.com/apache/thrift v0.19.0/go.mod h1:SUALL216IiaOw2Oy+5Vs9lboJ/t9g40C+G07Dc0QC1I= +github.com/apache/thrift v0.20.0 h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI= +github.com/apache/thrift v0.20.0/go.mod h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8= github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY= github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo= github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= @@ -1382,8 +1382,8 @@ github.com/google/cel-go v0.20.1 h1:nDx9r8S3L4pE61eDdt8igGj8rf5kjYR3ILxWIpWNi84= github.com/google/cel-go v0.20.1/go.mod h1:kWcIzTsPX0zmQ+H3TirHstLLf9ep5QTsZBN9u4dOYLg= github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/flatbuffers v24.3.7+incompatible h1:BxGUkIQnOciBu33bd5BdvqY8Qvo0O/GR4SPhh7x9Ed0= -github.com/google/flatbuffers v24.3.7+incompatible/go.mod 
h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/gnxi v0.0.0-20231026134436-d82d9936af15 h1:EETGSLGKBReUUYZdztSp45EzTE6CHw2qMKIfyPrgp6c= @@ -1593,8 +1593,9 @@ github.com/henrybear327/go-proton-api v1.0.0/go.mod h1:w63MZuzufKcIZ93pwRgiOtxMX github.com/hetznercloud/hcloud-go/v2 v2.4.0 h1:MqlAE+w125PLvJRCpAJmEwrIxoVdUdOyuFUhE/Ukbok= github.com/hetznercloud/hcloud-go/v2 v2.4.0/go.mod h1:l7fA5xsncFBzQTyw29/dw5Yr88yEGKKdc6BHf24ONS0= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4= github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= +github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -1766,8 +1767,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2 github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= 
-github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
-github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
+github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
 github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
 github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
 github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
diff --git a/plugins/inputs/sql/drivers.go b/plugins/inputs/sql/drivers.go
index 372325591275d..1c902eace2a5a 100644
--- a/plugins/inputs/sql/drivers.go
+++ b/plugins/inputs/sql/drivers.go
@@ -5,7 +5,7 @@ import (
 	_ "github.com/ClickHouse/clickhouse-go"
 	_ "github.com/IBM/nzgo/v12"
 	_ "github.com/SAP/go-hdb/driver"
-	_ "github.com/apache/arrow/go/v13/arrow/flight/flightsql/driver"
+	_ "github.com/apache/arrow/go/v17/arrow/flight/flightsql/driver"
 	_ "github.com/go-sql-driver/mysql"
 	_ "github.com/jackc/pgx/v4/stdlib"
 	_ "github.com/microsoft/go-mssqldb"
diff --git a/plugins/outputs/all/parquet.go b/plugins/outputs/all/parquet.go
new file mode 100644
index 0000000000000..195a7fe0983bf
--- /dev/null
+++ b/plugins/outputs/all/parquet.go
@@ -0,0 +1,5 @@
+//go:build !custom || outputs || outputs.parquet
+
+package all
+
+import _ "github.com/influxdata/telegraf/plugins/outputs/parquet" // register plugin
diff --git a/plugins/outputs/parquet/README.md b/plugins/outputs/parquet/README.md
new file mode 100644
index 0000000000000..fe22ec71d81c3
--- /dev/null
+++ b/plugins/outputs/parquet/README.md
@@ -0,0 +1,106 @@
+# Parquet Output Plugin
+
+This plugin writes metrics to parquet files. By default, the parquet
+output will group metrics by metric name and write those metrics all to the
+same file. If a metric schema does not match then metrics are dropped.
+
+To learn more about Parquet check out the [Parquet docs][] as well as a blog
+post on [Querying Parquet][].
+
+[Parquet docs]: https://parquet.apache.org/docs/
+[Querying Parquet]: https://www.influxdata.com/blog/querying-parquet-millisecond-latency/
+
+## Global configuration options
+
+In addition to the plugin-specific configuration settings, plugins support
+additional global and plugin configuration settings. These settings are used to
+modify metrics, tags, and field or create aliases and configure ordering, etc.
+See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
+
+[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
+
+## Configuration
+
+```toml @sample.conf
+# A plugin that writes metrics to parquet files
+[[outputs.parquet]]
+  ## Directory to write parquet files in. If a file already exists the output
+  ## will attempt to continue using the existing file.
+  # directory = "."
+
+  ## Files are rotated after the time interval specified. When set to 0 no time
+  ## based rotation is performed.
+  # rotation_interval = "0h"
+
+  ## Timestamp field name
+  ## Field name to use to store the timestamp. If set to an empty string, then
+  ## the timestamp is omitted.
+  # timestamp_field_name = "timestamp"
+```
+
+## Building Parquet Files
+
+### Schema
+
+Parquet files require a schema when writing files. To generate a schema,
+Telegraf will go through all grouped metrics and generate an Apache Arrow schema
+based on the union of all fields and tags. If a field and tag have the same name
+then the field takes precedence.
+
+The consequence of schema generation is that the very first flush sequence a
+metric is seen takes much longer due to the additional looping through the
+metrics to generate the schema. Subsequent flush intervals are significantly
+faster.
+
+When writing to a file, the schema is used to look for each value and if it is
+not present a null value is added. 
The result is that if additional fields are
+present after the first metric flush those fields are omitted.
+
+### Write
+
+The plugin makes use of the buffered writer. This may buffer some metrics into
+memory before writing them to disk. This method is used as it can more compactly
+write multiple flushes of metrics into a single Parquet row group.
+
+Additionally, the Parquet format requires a proper footer, so close must be
+called on the file to ensure it is properly formatted.
+
+## File Rotation
+
+If a file with the same target name exists at start, the existing file is
+rotated to avoid over-writing it or a conflicting schema.
+
+File rotation is available via a time based interval that a user can optionally
+set. Due to the usage of a buffered writer, a size based rotation is not
+possible as the file may not actually get data at each interval.
+
+## Explore Parquet Files
+
+If a user wishes to explore a schema or data in a Parquet file quickly, then
+look at the options below.
+
+### CLI
+
+The Arrow repo contains a Go CLI tool to read and parse Parquet files:
+
+```s
+go install github.com/apache/arrow/go/v16/parquet/cmd/parquet_reader@latest
+parquet_reader
+```
+
+### Python
+
+Users can also use the [pyarrow][] library to quickly open and explore Parquet
+files:
+
+```python
+import pyarrow.parquet as pq
+
+table = pq.read_table('example.parquet')
+```
+
+Once created, a user can look at the various [pyarrow.Table][] functions to
+further explore the data.
+ +[pyarrow]: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html +[pyarrow.Table]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table diff --git a/plugins/outputs/parquet/parquet.go b/plugins/outputs/parquet/parquet.go new file mode 100644 index 0000000000000..328d3797e19d5 --- /dev/null +++ b/plugins/outputs/parquet/parquet.go @@ -0,0 +1,338 @@ +//go:generate ../../../tools/readme_config_includer/generator +package parquet + +import ( + _ "embed" + "errors" + "fmt" + "os" + "strconv" + "time" + + "github.com/apache/arrow/go/v16/arrow" + "github.com/apache/arrow/go/v16/arrow/array" + "github.com/apache/arrow/go/v16/arrow/memory" + "github.com/apache/arrow/go/v16/parquet" + "github.com/apache/arrow/go/v16/parquet/pqarrow" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/outputs" +) + +//go:embed sample.conf +var sampleConfig string + +type metricGroup struct { + filename string + builder *array.RecordBuilder + schema *arrow.Schema + writer *pqarrow.FileWriter +} + +type Parquet struct { + Directory string `toml:"directory"` + RotationInterval config.Duration `toml:"rotation_interval"` + TimestampFieldName *string `toml:"timestamp_field_name"` + Log telegraf.Logger `toml:"-"` + + metricGroups map[string]*metricGroup +} + +func (*Parquet) SampleConfig() string { + return sampleConfig +} + +func (p *Parquet) Init() error { + if p.Directory == "" { + p.Directory = "." 
+ } + + if p.TimestampFieldName == nil { + timestampFieldName := "timestamp" + p.TimestampFieldName = ×tampFieldName + } + + stat, err := os.Stat(p.Directory) + if os.IsNotExist(err) { + if err := os.MkdirAll(p.Directory, 0750); err != nil { + return fmt.Errorf("failed to create directory %q: %w", p.Directory, err) + } + } else if !stat.IsDir() { + return fmt.Errorf("provided directory %q is not a directory", p.Directory) + } + + p.metricGroups = make(map[string]*metricGroup) + + return nil +} + +func (p *Parquet) Connect() error { + return nil +} + +func (p *Parquet) Close() error { + var errorOccurred bool + + for _, metrics := range p.metricGroups { + if err := metrics.writer.Close(); err != nil { + p.Log.Errorf("failed to close file %q: %v", metrics.filename, err) + errorOccurred = true + } + } + + if errorOccurred { + return errors.New("failed to closing parquet files") + } + + return nil +} + +func (p *Parquet) Write(metrics []telegraf.Metric) error { + groupedMetrics := make(map[string][]telegraf.Metric) + for _, metric := range metrics { + groupedMetrics[metric.Name()] = append(groupedMetrics[metric.Name()], metric) + } + + for name, metrics := range groupedMetrics { + if _, ok := p.metricGroups[name]; !ok { + filename := fmt.Sprintf("%s/%s.parquet", p.Directory, name) + schema, err := p.createSchema(metrics) + if err != nil { + return fmt.Errorf("failed to create schema for file %q: %w", name, err) + } + writer, err := p.createWriter(name, filename, schema) + if err != nil { + return fmt.Errorf("failed to create writer for file %q: %w", name, err) + } + p.metricGroups[name] = &metricGroup{ + builder: array.NewRecordBuilder(memory.DefaultAllocator, schema), + filename: filename, + schema: schema, + writer: writer, + } + } + + record, err := p.createRecord(metrics, p.metricGroups[name].builder, p.metricGroups[name].schema) + if err != nil { + return fmt.Errorf("failed to create record for file %q: %w", p.metricGroups[name].filename, err) + } + if err = 
p.metricGroups[name].writer.WriteBuffered(record); err != nil { + return fmt.Errorf("failed to write to file %q: %w", p.metricGroups[name].filename, err) + } + record.Release() + + if p.RotationInterval != 0 { + if err := p.rotateIfNeeded(name); err != nil { + return fmt.Errorf("failed to rotate file %q: %w", p.metricGroups[name].filename, err) + } + } + } + + return nil +} + +func (p *Parquet) rotateIfNeeded(name string) error { + fileInfo, err := os.Stat(p.metricGroups[name].filename) + if err != nil { + return fmt.Errorf("failed to stat file %q: %w", p.metricGroups[name].filename, err) + } + + expireTime := fileInfo.ModTime().Add(time.Duration(p.RotationInterval)) + if time.Now().Before(expireTime) { + return nil + } + + if err := p.metricGroups[name].writer.Close(); err != nil { + return fmt.Errorf("failed to close file for rotation %q: %w", p.metricGroups[name].filename, err) + } + + writer, err := p.createWriter(name, p.metricGroups[name].filename, p.metricGroups[name].schema) + if err != nil { + return fmt.Errorf("failed to create new writer for file %q: %w", p.metricGroups[name].filename, err) + } + p.metricGroups[name].writer = writer + + return nil +} + +func (p *Parquet) createRecord(metrics []telegraf.Metric, builder *array.RecordBuilder, schema *arrow.Schema) (arrow.Record, error) { + for index, col := range schema.Fields() { + for _, m := range metrics { + if p.TimestampFieldName != nil && *p.TimestampFieldName != "" && col.Name == *p.TimestampFieldName { + builder.Field(index).(*array.Int64Builder).Append(m.Time().UnixNano()) + continue + } + + // Try to get the value from a field first, then from a tag. 
+ var value any + var ok bool + value, ok = m.GetField(col.Name) + if !ok { + value, ok = m.GetTag(col.Name) + } + + // if neither field nor tag exists, append a null value + if !ok { + switch col.Type { + case arrow.PrimitiveTypes.Int8: + builder.Field(index).(*array.Int8Builder).AppendNull() + case arrow.PrimitiveTypes.Int16: + builder.Field(index).(*array.Int16Builder).AppendNull() + case arrow.PrimitiveTypes.Int32: + builder.Field(index).(*array.Int32Builder).AppendNull() + case arrow.PrimitiveTypes.Int64: + builder.Field(index).(*array.Int64Builder).AppendNull() + case arrow.PrimitiveTypes.Uint8: + builder.Field(index).(*array.Uint8Builder).AppendNull() + case arrow.PrimitiveTypes.Uint16: + builder.Field(index).(*array.Uint16Builder).AppendNull() + case arrow.PrimitiveTypes.Uint32: + builder.Field(index).(*array.Uint32Builder).AppendNull() + case arrow.PrimitiveTypes.Uint64: + builder.Field(index).(*array.Uint64Builder).AppendNull() + case arrow.PrimitiveTypes.Float32: + builder.Field(index).(*array.Float32Builder).AppendNull() + case arrow.PrimitiveTypes.Float64: + builder.Field(index).(*array.Float64Builder).AppendNull() + case arrow.BinaryTypes.String: + builder.Field(index).(*array.StringBuilder).AppendNull() + case arrow.FixedWidthTypes.Boolean: + builder.Field(index).(*array.BooleanBuilder).AppendNull() + default: + return nil, fmt.Errorf("unsupported type: %T", value) + } + + continue + } + + switch col.Type { + case arrow.PrimitiveTypes.Int8: + builder.Field(index).(*array.Int8Builder).Append(value.(int8)) + case arrow.PrimitiveTypes.Int16: + builder.Field(index).(*array.Int16Builder).Append(value.(int16)) + case arrow.PrimitiveTypes.Int32: + builder.Field(index).(*array.Int32Builder).Append(value.(int32)) + case arrow.PrimitiveTypes.Int64: + builder.Field(index).(*array.Int64Builder).Append(value.(int64)) + case arrow.PrimitiveTypes.Uint8: + builder.Field(index).(*array.Uint8Builder).Append(value.(uint8)) + case arrow.PrimitiveTypes.Uint16: + 
builder.Field(index).(*array.Uint16Builder).Append(value.(uint16)) + case arrow.PrimitiveTypes.Uint32: + builder.Field(index).(*array.Uint32Builder).Append(value.(uint32)) + case arrow.PrimitiveTypes.Uint64: + builder.Field(index).(*array.Uint64Builder).Append(value.(uint64)) + case arrow.PrimitiveTypes.Float32: + builder.Field(index).(*array.Float32Builder).Append(value.(float32)) + case arrow.PrimitiveTypes.Float64: + builder.Field(index).(*array.Float64Builder).Append(value.(float64)) + case arrow.BinaryTypes.String: + builder.Field(index).(*array.StringBuilder).Append(value.(string)) + case arrow.FixedWidthTypes.Boolean: + builder.Field(index).(*array.BooleanBuilder).Append(value.(bool)) + default: + return nil, fmt.Errorf("unsupported type: %T", value) + } + } + } + + record := builder.NewRecord() + return record, nil +} + +func (p *Parquet) createSchema(metrics []telegraf.Metric) (*arrow.Schema, error) { + rawFields := make(map[string]arrow.DataType, 0) + for _, metric := range metrics { + for _, field := range metric.FieldList() { + if _, ok := rawFields[field.Key]; !ok { + arrowType, err := goToArrowType(field.Value) + if err != nil { + return nil, fmt.Errorf("error converting '%s=%s' field to arrow type: %w", field.Key, field.Value, err) + } + rawFields[field.Key] = arrowType + } + } + for _, tag := range metric.TagList() { + if _, ok := rawFields[tag.Key]; !ok { + arrowType, err := goToArrowType(tag.Value) + if err != nil { + return nil, fmt.Errorf("error converting '%s=%s' tag to arrow type: %w", tag.Key, tag.Value, err) + } + rawFields[tag.Key] = arrowType + } + } + } + + fields := make([]arrow.Field, 0) + for key, value := range rawFields { + fields = append(fields, arrow.Field{ + Name: key, + Type: value, + }) + } + + if p.TimestampFieldName != nil && *p.TimestampFieldName != "" { + fields = append(fields, arrow.Field{ + Name: *p.TimestampFieldName, + Type: arrow.PrimitiveTypes.Int64, + }) + } + + return arrow.NewSchema(fields, nil), nil +} + +func (p 
*Parquet) createWriter(name string, filename string, schema *arrow.Schema) (*pqarrow.FileWriter, error) { + if _, err := os.Stat(filename); err == nil { + now := time.Now() + rotatedFilename := fmt.Sprintf("%s/%s-%s-%s.parquet", p.Directory, name, now.Format("2006-01-02"), strconv.FormatInt(now.Unix(), 10)) + if err := os.Rename(filename, rotatedFilename); err != nil { + return nil, fmt.Errorf("failed to rename file %q: %w", filename, err) + } + } + file, err := os.Create(filename) + if err != nil { + return nil, fmt.Errorf("failed to create file %q: %w", filename, err) + } + + writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()) + if err != nil { + return nil, fmt.Errorf("failed to create parquet writer for file %q: %w", filename, err) + } + + return writer, nil +} + +func goToArrowType(value interface{}) (arrow.DataType, error) { + switch value.(type) { + case int8: + return arrow.PrimitiveTypes.Int8, nil + case int16: + return arrow.PrimitiveTypes.Int16, nil + case int32: + return arrow.PrimitiveTypes.Int32, nil + case int64, int: + return arrow.PrimitiveTypes.Int64, nil + case uint8: + return arrow.PrimitiveTypes.Uint8, nil + case uint16: + return arrow.PrimitiveTypes.Uint16, nil + case uint32: + return arrow.PrimitiveTypes.Uint32, nil + case uint64, uint: + return arrow.PrimitiveTypes.Uint64, nil + case float32: + return arrow.PrimitiveTypes.Float32, nil + case float64: + return arrow.PrimitiveTypes.Float64, nil + case string: + return arrow.BinaryTypes.String, nil + case bool: + return arrow.FixedWidthTypes.Boolean, nil + default: + return nil, fmt.Errorf("unsupported type: %T", value) + } +} + +func init() { + outputs.Add("parquet", func() telegraf.Output { return &Parquet{} }) +} diff --git a/plugins/outputs/parquet/parquet_test.go b/plugins/outputs/parquet/parquet_test.go new file mode 100644 index 0000000000000..61ec4a3cc49d9 --- /dev/null +++ b/plugins/outputs/parquet/parquet_test.go @@ -0,0 
+1,208 @@ +package parquet + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/apache/arrow/go/v16/parquet/file" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestCases(t *testing.T) { + type testcase struct { + name string + metrics []telegraf.Metric + numRows int + numColumns int + } + + var testcases = []testcase{ + { + name: "basic single metric", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{}, + map[string]interface{}{ + "value": 1.0, + }, + time.Now(), + ), + }, + numRows: 1, + numColumns: 2, + }, + { + name: "mix of tags and fields", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "tag": "tag", + }, + map[string]interface{}{ + "value": 1.0, + }, + time.Now(), + ), + testutil.MustMetric( + "test", + map[string]string{ + "tag": "tag2", + }, + map[string]interface{}{ + "value": 2.0, + }, + time.Now(), + ), + }, + numRows: 2, + numColumns: 3, + }, + { + name: "null values", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "host": "tag", + }, + map[string]interface{}{ + "value_old": 1.0, + }, + time.Now(), + ), + testutil.MustMetric( + "test", + map[string]string{ + "tag": "tag2", + }, + map[string]interface{}{ + "value_new": 2.0, + }, + time.Now(), + ), + }, + numRows: 2, + numColumns: 5, + }, + { + name: "data types", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{}, + map[string]interface{}{ + "int": int(0), + "int8": int8(1), + "int16": int16(2), + "int32": int32(3), + "int64": int64(4), + "uint": uint(5), + "uint8": uint8(6), + "uint16": uint16(7), + "uint32": uint32(8), + "uint64": uint64(9), + "float32": float32(10.0), + "float64": float64(11.0), + "string": "string", + "bool": true, + }, + time.Now(), + ), + }, + numRows: 1, + numColumns: 15, + }, + } + + for _, 
tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + testDir := t.TempDir() + plugin := &Parquet{ + Directory: testDir, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + require.NoError(t, plugin.Write(tc.metrics)) + require.NoError(t, plugin.Close()) + + // Read metrics from parquet file + result := filepath.Join(testDir, "test.parquet") + reader, err := file.OpenParquetFile(result, true) + require.NoError(t, err) + defer reader.Close() + + metadata := reader.MetaData() + require.Equal(t, tc.numRows, int(metadata.NumRows)) + require.Equal(t, tc.numColumns, metadata.Schema.NumColumns()) + }) + } +} + +func TestRotation(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{}, + map[string]interface{}{ + "value": 1.0, + }, + time.Now(), + ), + } + + testDir := t.TempDir() + plugin := &Parquet{ + Directory: testDir, + RotationInterval: config.Duration(1 * time.Second), + } + + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + require.Eventually(t, func() bool { + require.NoError(t, plugin.Write(metrics)) + files, err := os.ReadDir(testDir) + require.NoError(t, err) + return len(files) == 2 + }, 5*time.Second, time.Second) + require.NoError(t, plugin.Close()) +} + +func TestOmitTimestamp(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{}, + map[string]interface{}{ + "value": 1.0, + }, + time.Now(), + ), + } + + emptyTimestamp := "" + testDir := t.TempDir() + plugin := &Parquet{ + Directory: testDir, + TimestampFieldName: &emptyTimestamp, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + require.NoError(t, plugin.Write(metrics)) + require.NoError(t, plugin.Close()) + + result := filepath.Join(testDir, "test.parquet") + reader, err := file.OpenParquetFile(result, true) + require.NoError(t, err) + defer reader.Close() + + metadata := reader.MetaData() + require.Equal(t, 1, 
int(metadata.NumRows)) + require.Equal(t, 1, metadata.Schema.NumColumns()) +} diff --git a/plugins/outputs/parquet/sample.conf b/plugins/outputs/parquet/sample.conf new file mode 100644 index 0000000000000..5e7e16ae5a89f --- /dev/null +++ b/plugins/outputs/parquet/sample.conf @@ -0,0 +1,14 @@ +# A plugin that writes metrics to parquet files +[[outputs.parquet]] + ## Directory to write parquet files in. If a file already exists the output + ## will attempt to continue using the existing file. + # directory = "." + + ## Files are rotated after the time interval specified. When set to 0 no time + ## based rotation is performed. + # rotation_interval = "0h" + + ## Timestamp field name + ## Field name to use to store the timestamp. If set to an empty string, then + ## the timestamp is omitted. + # timestamp_field_name = "timestamp"