Skip to content

Commit

Permalink
simplify connect process
Browse files Browse the repository at this point in the history
  • Loading branch information
neelayu committed Jan 5, 2024
1 parent 47e16a0 commit 7b876c6
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 61 deletions.
19 changes: 7 additions & 12 deletions plugins/outputs/nats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,16 @@ to use them.
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"

## If the value is non-empty, enable jetstream based publishing.
## Name of the stream where nats jetstream will publish the messages.
## If the stream already exists, it will Update it using the fields specified in the jetstream section.
## Else it will create it.
# jetstream_stream = "telegraf-metrics-stream"

## Jetstream specific configuration
## If this section is empty, and jetstream_stream is specified, the stream_create config would have
## just the two fields- Name(jetstream_stream) and Subjects([]string{subject})
## Jetstream specific configuration. If not nil, it will assume Jetstream context.
## Since this is a table, it should be present at the end of the plugin section. Else you can use inline table format.
# [outputs.nats.jetstream]
## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
## The `name` and `subjects` fields from configuration will be ignored, and the values will be determined as follows:
## The stream name (`name`) will be taken from the `jetstream_stream` field in the `outputs.nats` section of the Telegraf configuration.
## The subjects (`Subjects`) for the stream will be derived from the `subject` field in the `outputs.nats` section of the Telegraf configuration.
## Telegraf will use the subject present in the outer nats configuration.
## If the subjects field is not present, subjects will be set to subject
## If the subjects field is present, subject will be appended to this list, only if subject is not present to avoid duplicates.
## name of the stream. Required
# name = ""
# subjects = []
# retention = "limits"
# max_consumers = -1
# max_msgs_per_subject = -1
Expand Down
44 changes: 25 additions & 19 deletions plugins/outputs/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"strings"

Expand All @@ -14,6 +15,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
Expand All @@ -30,7 +32,6 @@ type NATS struct {
Password config.Secret `toml:"password"`
Credentials string `toml:"credentials"`
Subject string `toml:"subject"`
Stream string `toml:"jetstream_stream"`
Jetstream *JetstreamConfigWrapper `toml:"jetstream"`
tls.ClientConfig

Expand Down Expand Up @@ -141,27 +142,35 @@ func (n *NATS) Connect() error {
if err != nil {
return err
}
if n.Stream != "" {
n.Log.Info("Jetstream enabled for this plugin")
// connect to jetstream

if n.Jetstream != nil {
n.jetstreamClient, err = jetstream.New(n.conn)
if err != nil {
return fmt.Errorf("failed to connect to jetstream: %w", err)
}
_, err := n.jetstreamClient.Stream(context.Background(), n.Stream)
if err == nil {
return nil

if len(n.Jetstream.Subjects) == 0 {
n.Jetstream.Subjects = []string{n.Subject}
}
n.Log.Infof("stream %s does not exist. creating stream", n.Stream)
if n.Jetstream == nil {
n.Jetstream = &JetstreamConfigWrapper{}
if !choice.Contains(n.Subject, n.Jetstream.Subjects) {
n.Jetstream.Subjects = append(n.Jetstream.Subjects, n.Subject)
}
n.Jetstream.StreamConfig.Name = n.Stream
n.Jetstream.StreamConfig.Subjects = []string{n.Subject}
_, err = n.jetstreamClient.CreateStream(context.Background(), n.Jetstream.StreamConfig)
_, err = n.jetstreamClient.CreateOrUpdateStream(context.Background(), n.Jetstream.StreamConfig)
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
return fmt.Errorf("failed to create or update stream: %w", err)
}
n.Log.Infof("stream (%s) successfully created or updated", n.Jetstream.Name)
}
return nil
}

func (n *NATS) Init() error {
// validation currently only for jetstream
if n.Jetstream == nil {
return nil
}
if strings.TrimSpace(n.Jetstream.Name) == "" {
return errors.New("stream cannot be empty")
}
return nil
}
Expand All @@ -181,11 +190,8 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
n.Log.Debugf("Could not serialize metric: %v", err)
continue
}
if n.Jetstream != nil {
_, err = n.jetstreamClient.Publish(context.Background(), n.Subject, buf)
} else {
err = n.conn.Publish(n.Subject, buf)
}
// use the same Publish API for nats core and jetstream
err = n.conn.Publish(n.Subject, buf)
if err != nil {
return fmt.Errorf("FAILED to send NATS message: %w", err)
}
Expand Down
47 changes: 31 additions & 16 deletions plugins/outputs/nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
_ "embed"
"fmt"
"os"
"path/filepath"
"reflect"
"testing"
Expand Down Expand Up @@ -49,7 +48,7 @@ func TestConnectAndWriteNATSIntegration(t *testing.T) {
},
},
{
name: "valid with jetstream(stream created)",
name: "valid with jetstream",
container: testutil.Container{
Image: "nats:latest",
ExposedPorts: []string{natsServicePort},
Expand All @@ -59,10 +58,9 @@ func TestConnectAndWriteNATSIntegration(t *testing.T) {
nats: &NATS{
Name: "telegraf",
Subject: "telegraf",
Stream: "telegraf-stream",
Jetstream: &JetstreamConfigWrapper{
StreamConfig: jetstream.StreamConfig{
Name: "this will be ignored",
Name: "my-telegraf-stream",
},
},
serializer: &influx.Serializer{},
Expand All @@ -80,10 +78,11 @@ func TestConnectAndWriteNATSIntegration(t *testing.T) {
},
nats: &NATS{
Name: "telegraf",
Subject: "my-tel-sub",
Stream: "telegraf-stream-with-cfg",
Subject: "my-tel-sub2",
Jetstream: &JetstreamConfigWrapper{
StreamConfig: jetstream.StreamConfig{
Name: "telegraf-stream-with-cfg",
Subjects: []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2"},
Retention: jetstream.WorkQueuePolicy,
MaxConsumers: 10,
Discard: jetstream.DiscardOld,
Expand Down Expand Up @@ -133,12 +132,11 @@ func TestConnectAndWriteNATSIntegration(t *testing.T) {
}
require.NoError(t, err)

if tc.nats.Stream != "" {
stream, err := tc.nats.jetstreamClient.Stream(context.Background(), tc.nats.Stream)
if tc.nats.Jetstream != nil {
stream, err := tc.nats.jetstreamClient.Stream(context.Background(), tc.nats.Jetstream.Name)
require.NoError(t, err)
si, err := stream.Info(context.Background())
require.NoError(t, err)
require.Equal(t, tc.nats.Stream, tc.nats.Jetstream.Name)
// compare only relevant fields, since defaults for fields like max_bytes is not 0
fieldsEqualHelper(t, tc.nats.Jetstream.StreamConfig, si.Config, tc.streamConfigCompareFields...)
}
Expand Down Expand Up @@ -224,22 +222,39 @@ func TestConfigParsing(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

// Get all testcase directories
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
// Define test cases
testCases := []struct {
name string
path string
wantErr bool
}{
{name: "Valid Default", path: filepath.Join("testcases", "no-js.conf")},
{name: "Valid JS", path: filepath.Join("testcases", "js-default.conf")},
{name: "Valid JS Config", path: filepath.Join("testcases", "js-config.conf")},
{name: "Subjects warning", path: filepath.Join("testcases", "js-subjects.conf")},
{name: "Invalid JS", path: filepath.Join("testcases", "js-no-stream.conf"), wantErr: true},
}

// Register the plugin
outputs.Add("nats", func() telegraf.Output {
return &NATS{}
})
srl := &influx.Serializer{}
require.NoError(t, srl.Init())

for _, f := range folders {
t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
// Run tests using the table-driven approach
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Configure the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfig(testcasePath))
require.NoError(t, cfg.LoadConfig(tc.path))
require.Len(t, cfg.Outputs, 1)
err := cfg.Outputs[0].Init()
if tc.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
19 changes: 7 additions & 12 deletions plugins/outputs/nats/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,16 @@
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"

## If the value is non-empty, enable jetstream based publishing.
## Name of the stream where nats jetstream will publish the messages.
## If the stream already exists, it will Update it using the fields specified in the jetstream section.
## Else it will create it.
# jetstream_stream = "telegraf-metrics-stream"

## Jetstream specific configuration
## If this section is empty, and jetstream_stream is specified, the stream_create config would have
## just the two fields- Name(jetstream_stream) and Subjects([]string{subject})
## Jetstream specific configuration. If not nil, it will assume Jetstream context.
## Since this is a table, it should be present at the end of the plugin section. Else you can use inline table format.
# [outputs.nats.jetstream]
## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
## The `name` and `subjects` fields from configuration will be ignored, and the values will be determined as follows:
## The stream name (`name`) will be taken from the `jetstream_stream` field in the `outputs.nats` section of the Telegraf configuration.
## The subjects (`Subjects`) for the stream will be derived from the `subject` field in the `outputs.nats` section of the Telegraf configuration.
## Telegraf will use the subject present in the outer nats configuration.
## If the subjects field is not present, subjects will be set to subject
## If the subjects field is present, subject will be appended to this list, only if subject is not present to avoid duplicates.
## name of the stream. Required
# name = ""
# subjects = []
# retention = "limits"
# max_consumers = -1
# max_msgs_per_subject = -1
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/nats/testcases/js-config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
jetstream_stream = "my-telegraf-stream"
[outputs.nats.jetstream]
name = "my-telegraf-stream"
retention = "workqueue"
max_consumers = 10
discard = "old"
Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/nats/testcases/js-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
jetstream_stream = "my-telegraf-stream"
[outputs.nats.jetstream]
name = "my-telegraf-stream"
7 changes: 7 additions & 0 deletions plugins/outputs/nats/testcases/js-no-stream.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## NATS output with jetstream, but empty stream
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
[outputs.nats.jetstream]
9 changes: 9 additions & 0 deletions plugins/outputs/nats/testcases/js-subjects.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## NATS output with jetstream, but subject mismatch
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
[outputs.nats.jetstream]
name = "my-stream"
subjects = ["not", "here"]

0 comments on commit 7b876c6

Please sign in to comment.