Skip to content

Commit

Permalink
move to init
Browse files Browse the repository at this point in the history
  • Loading branch information
neelayu committed Jan 23, 2024
1 parent e3adc7d commit fb528eb
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
36 changes: 19 additions & 17 deletions plugins/outputs/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type NATS struct {

Log telegraf.Logger `toml:"-"`

conn *nats.Conn
jetstreamClient jetstream.JetStream
serializer serializers.Serializer
conn *nats.Conn
jetstreamClient jetstream.JetStream
jetstreamStreamConfig *jetstream.StreamConfig
serializer serializers.Serializer
}

// StreamConfig is the configuration for creating stream
Expand Down Expand Up @@ -137,20 +138,7 @@ func (n *NATS) Connect() error {
if err != nil {
return fmt.Errorf("failed to connect to jetstream: %w", err)
}

if len(n.Jetstream.Subjects) == 0 {
n.Jetstream.Subjects = []string{n.Subject}
}
// If the overall-subject is already present anywhere in the Jetstream subject we go from there,
// otherwise we should append the overall-subject as the last element.
if !choice.Contains(n.Subject, n.Jetstream.Subjects) {
n.Jetstream.Subjects = append(n.Jetstream.Subjects, n.Subject)
}
streamConfig, err := n.getJetstreamConfig()
if err != nil {
return fmt.Errorf("failed to parse jetstream config: %w", err)
}
_, err = n.jetstreamClient.CreateOrUpdateStream(context.Background(), *streamConfig)
_, err = n.jetstreamClient.CreateOrUpdateStream(context.Background(), *n.jetstreamStreamConfig)
if err != nil {
return fmt.Errorf("failed to create or update stream: %w", err)
}
Expand Down Expand Up @@ -244,6 +232,20 @@ func (n *NATS) Init() error {
if strings.TrimSpace(n.Jetstream.Name) == "" {
return errors.New("stream cannot be empty")
}

if len(n.Jetstream.Subjects) == 0 {
n.Jetstream.Subjects = []string{n.Subject}
}
// If the overall-subject is already present anywhere in the Jetstream subject we go from there,
// otherwise we should append the overall-subject as the last element.
if !choice.Contains(n.Subject, n.Jetstream.Subjects) {
n.Jetstream.Subjects = append(n.Jetstream.Subjects, n.Subject)
}
var err error
n.jetstreamStreamConfig, err = n.getJetstreamConfig()
if err != nil {
return fmt.Errorf("failed to parse jetstream config: %w", err)
}
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
server := []string{fmt.Sprintf("nats://%s:%s", tc.container.Address, tc.container.Ports[natsServicePort])}
tc.nats.Servers = server
// Verify that we can connect to the NATS daemon
require.NoError(t, tc.nats.Init())
err = tc.nats.Connect()
if tc.wantErr {
require.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/nats/testcases/js-config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
data_format = "influx"
[outputs.nats.jetstream]
name = "my-telegraf-stream"
retention = "new"
retention = "workqueue"
max_consumers = 10
discard = "old"
storage = "memory"
Expand Down

0 comments on commit fb528eb

Please sign in to comment.