Skip to content

Commit

Permalink
define streamconfig struct in telegraf
Browse files Browse the repository at this point in the history
  • Loading branch information
neelayu committed Jan 13, 2024
1 parent 59547ec commit 2a6a552
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 139 deletions.
12 changes: 9 additions & 3 deletions plugins/outputs/nats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,21 @@ to use them.
# subjects = []

## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
# retention = "limits"
## Some of the configuration fields such as `retention`, `storage`, `discard` accept integer values.
## For ex- storage = 1 refers to memory(1) storage.

## 0: limits(default), 1: interest, 2: workqueue
# retention = 0
# max_consumers = -1
# max_msgs_per_subject = -1
# max_msgs = -1
# max_bytes = -1
# max_age = 0
# max_msg_size = -1
# storage = "file"
# discard = "old"
## 0: file(default), 1: memory
# storage = 0
## 0: old(default), 1: new
# discard = 0
# num_replicas = 1
# duplicate_window = 120000000000
# sealed = false
Expand Down
118 changes: 62 additions & 56 deletions plugins/outputs/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ package nats
import (
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"time"

"github.com/influxdata/toml"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"

Expand All @@ -25,14 +25,14 @@ import (
var sampleConfig string

type NATS struct {
Servers []string `toml:"servers"`
Secure bool `toml:"secure"`
Name string `toml:"name"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
Credentials string `toml:"credentials"`
Subject string `toml:"subject"`
Jetstream *JetstreamConfigWrapper `toml:"jetstream"`
Servers []string `toml:"servers"`
Secure bool `toml:"secure"`
Name string `toml:"name"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
Credentials string `toml:"credentials"`
Subject string `toml:"subject"`
Jetstream *StreamConfig `toml:"jetstream"`
tls.ClientConfig

Log telegraf.Logger `toml:"-"`
Expand All @@ -42,51 +42,44 @@ type NATS struct {
serializer serializers.Serializer
}

type JetstreamConfigWrapper struct {
jetstream.StreamConfig
}

func (jw *JetstreamConfigWrapper) UnmarshalTOML(data []byte) error {
var tomlMap map[string]interface{}

if err := toml.Unmarshal(data, &tomlMap); err != nil {
return err
}

// Extract the deeply nested table by specifying the keys(in order)
keys := []string{"outputs", "nats", "jetstream"}

nestedTable, err := extractNestedTable(tomlMap, keys...)
if err != nil {
return err
}
jsonBytes, err := json.Marshal(nestedTable)
if err != nil {
return err
}
return json.Unmarshal(jsonBytes, &jw.StreamConfig)
}

// recursive function to extract a nested table
func extractNestedTable(tomlMap map[string]interface{}, keys ...string) (map[string]interface{}, error) {
if len(keys) == 0 {
return tomlMap, nil
}

key := keys[0]
remainingKeys := keys[1:]

value, ok := tomlMap[key]
if !ok {
return nil, fmt.Errorf("key '%s' not found in TOML data", key)
}

innerMap, ok := value.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("value of key '%s' is not a table", key)
}

return extractNestedTable(innerMap, remainingKeys...)
// StreamConfig is the configuration for creating stream
// Almost a mirror of https://pkg.go.dev/github.com/nats-io/nats.go/jetstream#StreamConfig but with
// TOML tags.
//
// Some custom types such as RetentionPolicy still point to the source to reuse Stringer interface.
type StreamConfig struct {
Name string `toml:"name"`
Description string `toml:"description,omitempty"`
Subjects []string `toml:"subjects,omitempty"`
Retention jetstream.RetentionPolicy `toml:"retention"`
MaxConsumers int `toml:"max_consumers"`
MaxMsgs int64 `toml:"max_msgs"`
MaxBytes int64 `toml:"max_bytes"`
Discard jetstream.DiscardPolicy `toml:"discard"`
DiscardNewPerSubject bool `toml:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `toml:"max_age"`
MaxMsgsPerSubject int64 `toml:"max_msgs_per_subject"`
MaxMsgSize int32 `toml:"max_msg_size,omitempty"`
Storage jetstream.StorageType `toml:"storage"`
Replicas int `toml:"num_replicas"`
NoAck bool `toml:"no_ack,omitempty"`
Template string `toml:"template_owner,omitempty"`
Duplicates time.Duration `toml:"duplicate_window,omitempty"`
Placement *jetstream.Placement `toml:"placement,omitempty"`
Mirror *jetstream.StreamSource `toml:"mirror,omitempty"`
Sources []*jetstream.StreamSource `toml:"sources,omitempty"`
Sealed bool `toml:"sealed,omitempty"`
DenyDelete bool `toml:"deny_delete,omitempty"`
DenyPurge bool `toml:"deny_purge,omitempty"`
AllowRollup bool `toml:"allow_rollup_hdrs,omitempty"`
Compression jetstream.StoreCompression `toml:"compression"`
FirstSeq uint64 `toml:"first_seq,omitempty"`
SubjectTransform *jetstream.SubjectTransformConfig `toml:"subject_transform,omitempty"`
RePublish *jetstream.RePublish `toml:"republish,omitempty"`
AllowDirect bool `toml:"allow_direct"`
MirrorDirect bool `toml:"mirror_direct"`
ConsumerLimits jetstream.StreamConsumerLimits `toml:"consumer_limits,omitempty"`
Metadata map[string]string `toml:"metadata,omitempty"`
}

func (*NATS) SampleConfig() string {
Expand Down Expand Up @@ -155,7 +148,9 @@ func (n *NATS) Connect() error {
if !choice.Contains(n.Subject, n.Jetstream.Subjects) {
n.Jetstream.Subjects = append(n.Jetstream.Subjects, n.Subject)
}
_, err = n.jetstreamClient.CreateOrUpdateStream(context.Background(), n.Jetstream.StreamConfig)
var streamConfig jetstream.StreamConfig
n.convertToJetstreamConfig(&streamConfig)
_, err = n.jetstreamClient.CreateOrUpdateStream(context.Background(), streamConfig)
if err != nil {
return fmt.Errorf("failed to create or update stream: %w", err)
}
Expand All @@ -164,6 +159,17 @@ func (n *NATS) Connect() error {
return nil
}

func (n *NATS) convertToJetstreamConfig(streamConfig *jetstream.StreamConfig) {
telegrafStreamConfig := reflect.ValueOf(n.Jetstream).Elem()
natsStreamConfig := reflect.ValueOf(streamConfig).Elem()
for i := 0; i < telegrafStreamConfig.NumField(); i++ {
destField := natsStreamConfig.FieldByName(telegrafStreamConfig.Type().Field(i).Name)
if destField.IsValid() && destField.CanSet() {
destField.Set(telegrafStreamConfig.Field(i))
}
}
}

func (n *NATS) Init() error {
if n.Jetstream != nil {
if strings.TrimSpace(n.Jetstream.Name) == "" {
Expand Down
91 changes: 17 additions & 74 deletions plugins/outputs/nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ func TestConnectAndWriteNATSIntegration(t *testing.T) {
nats: &NATS{
Name: "telegraf",
Subject: "telegraf",
Jetstream: &JetstreamConfigWrapper{
StreamConfig: jetstream.StreamConfig{
Name: "my-telegraf-stream",
},
Jetstream: &StreamConfig{
Name: "my-telegraf-stream",
},
serializer: &influx.Serializer{},
Log: testutil.Logger{},
Expand All @@ -79,22 +77,20 @@ func TestConnectAndWriteNATSIntegration(t *testing.T) {
nats: &NATS{
Name: "telegraf",
Subject: "my-tel-sub2",
Jetstream: &JetstreamConfigWrapper{
StreamConfig: jetstream.StreamConfig{
Name: "telegraf-stream-with-cfg",
Subjects: []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2"},
Retention: jetstream.WorkQueuePolicy,
MaxConsumers: 10,
Discard: jetstream.DiscardOld,
Storage: jetstream.FileStorage,
MaxMsgs: 100000,
MaxBytes: 104857600,
MaxAge: 86400000000000,
Replicas: 1,
Duplicates: 180000000000,
MaxMsgSize: 120,
MaxMsgsPerSubject: 500,
},
Jetstream: &StreamConfig{
Name: "telegraf-stream-with-cfg",
Subjects: []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2"},
Retention: jetstream.WorkQueuePolicy,
MaxConsumers: 10,
Discard: jetstream.DiscardOld,
Storage: jetstream.FileStorage,
MaxMsgs: 100000,
MaxBytes: 104857600,
MaxAge: 86400000000000,
Replicas: 1,
Duplicates: 180000000000,
MaxMsgSize: 120,
MaxMsgsPerSubject: 500,
},
serializer: &influx.Serializer{},
Log: testutil.Logger{},
Expand Down Expand Up @@ -138,7 +134,7 @@ func TestConnectAndWriteNATSIntegration(t *testing.T) {
si, err := stream.Info(context.Background())
require.NoError(t, err)
// compare only relevant fields, since defaults for fields like max_bytes is not 0
fieldsEqualHelper(t, tc.nats.Jetstream.StreamConfig, si.Config, tc.streamConfigCompareFields...)
fieldsEqualHelper(t, *tc.nats.Jetstream, si.Config, tc.streamConfigCompareFields...)
}
// Verify that we can successfully write data to the NATS daemon
err = tc.nats.Write(testutil.MockMetrics())
Expand All @@ -156,11 +152,6 @@ func fieldsEqualHelper(t *testing.T, a, b interface{}, fieldNames ...string) {
return
}

if valA.Type() != valB.Type() {
t.Error("Both parameters must be of the same type")
return
}

for _, fieldName := range fieldNames {
fieldA := valA.FieldByName(fieldName)
fieldB := valB.FieldByName(fieldName)
Expand All @@ -169,54 +160,6 @@ func fieldsEqualHelper(t *testing.T, a, b interface{}, fieldNames ...string) {
}
}

func Test_extractNestedTable(t *testing.T) {
tests := []struct {
name string
tomlMap map[string]interface{}
keys []string
want map[string]interface{}
wantErr bool
}{
{
name: "Valid Nested Table",
tomlMap: map[string]interface{}{
"outer": map[string]interface{}{
"name": "outer",
"inner": map[string]interface{}{
"field1": "abc",
"field2": "pqr",
},
},
},
keys: []string{"outer", "inner"},
want: map[string]interface{}{"field1": "abc", "field2": "pqr"},
},
{
name: "Invalid Key",
tomlMap: map[string]interface{}{"key": "value"},
keys: []string{"nonexistent"},
wantErr: true,
},
{
name: "Non-Table Value",
tomlMap: map[string]interface{}{"key": "value"},
keys: []string{"key"},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, err := extractNestedTable(tt.tomlMap, tt.keys...)
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tt.want, actual)
})
}
}

func TestConfigParsing(t *testing.T) {

// Define test cases
Expand Down
12 changes: 9 additions & 3 deletions plugins/outputs/nats/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@
# subjects = []

## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
# retention = "limits"
## Some of the configuration fields such as `retention`, `storage`, `discard` accept integer values.
## For ex- storage = 1 refers to memory(1) storage.

## 0: limits(default), 1: interest, 2: workqueue
# retention = 0
# max_consumers = -1
# max_msgs_per_subject = -1
# max_msgs = -1
# max_bytes = -1
# max_age = 0
# max_msg_size = -1
# storage = "file"
# discard = "old"
## 0: file(default), 1: memory
# storage = 0
## 0: old(default), 1: new
# discard = 0
# num_replicas = 1
# duplicate_window = 120000000000
# sealed = false
Expand Down
6 changes: 3 additions & 3 deletions plugins/outputs/nats/testcases/js-config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
data_format = "influx"
[outputs.nats.jetstream]
name = "my-telegraf-stream"
retention = "workqueue"
retention = 1
max_consumers = 10
discard = "old"
storage = "file"
discard = 0
storage = 1
max_msgs = 100000
max_bytes = 104857600 # 100 MB
max_age = 86400000000000 # in the int64 format
Expand Down

0 comments on commit 2a6a552

Please sign in to comment.