Skip to content

Commit

Permalink
feat(migrations): Add migration for inputs.udp_listener (#14120)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Oct 16, 2023
1 parent 2e89bb5 commit 8583f22
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 0 deletions.
5 changes: 5 additions & 0 deletions migrations/all/inputs_udp_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || (migrations && (inputs || inputs.udp_listener))

package all

import _ "github.com/influxdata/telegraf/migrations/inputs_udp_listener" // register migration
Empty file.
73 changes: 73 additions & 0 deletions migrations/inputs_udp_listener/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package inputs_udp_listener

import (
"fmt"

"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"

"github.com/influxdata/telegraf/migrations"
)

const allowPendingMessagesMsg = `
Replacement 'inputs.socket_listener' does not allow to configure
'allowed_pending_messages' and thus the setting will be dropped.
`

const udpPacketSizeMsg = `
The deprecated 'udp_buffer_size' setting will be dropped.
`

// Define "old" data structure
type udpListener map[string]interface{}

// Migration function
func migrate(tbl *ast.Table) ([]byte, string, error) {
// Decode the old data structure
var old udpListener
if err := toml.UnmarshalTable(tbl, &old); err != nil {
return nil, "", err
}

// Copy the setting except the special plugin ones to preserve
// all parser settings of the existing (deprecated) config.
var msg string
plugin := make(map[string]interface{}, len(old))
for k, v := range old {
switch k {
case "service_address":
addr, ok := v.(string)
if !ok {
return nil, "", fmt.Errorf("service_address is not a string but %T", v)
}
plugin["service_address"] = "udp://" + addr
case "allowed_pending_messages":
msg += allowPendingMessagesMsg
case "udp_packet_size":
msg += udpPacketSizeMsg
case "udp_buffer_size":
plugin["read_buffer_size"] = v
default:
plugin[k] = v
}
}

// Create the corresponding metric configurations
cfg := migrations.CreateTOMLStruct("inputs", "socket_listener")
cfg.Add("inputs", "socket_listener", plugin)

// Marshal the new configuration
buf, err := toml.Marshal(cfg)
if err != nil {
return nil, "", err
}
buf = append(buf, []byte("\n")...)

// Create the new content to output
return buf, msg, nil
}

// Register the migration function for the plugin type
func init() {
migrations.AddPluginMigration("inputs.udp_listener", migrate)
}
62 changes: 62 additions & 0 deletions migrations/inputs_udp_listener/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package inputs_udp_listener_test

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
_ "github.com/influxdata/telegraf/migrations/inputs_udp_listener" // register migration
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener" // register plugin
_ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers
)

func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")
require.NoError(t, err)

for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}

t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
inputFile := filepath.Join(testcasePath, "telegraf.conf")
expectedFile := filepath.Join(testcasePath, "expected.conf")

// Read the expected output
expected := config.NewConfig()
require.NoError(t, expected.LoadConfig(expectedFile))
require.NotEmpty(t, expected.Inputs)

// Read the input data
input, remote, err := config.LoadConfigFile(inputFile)
require.NoError(t, err)
require.False(t, remote)
require.NotEmpty(t, input)

// Migrate
output, n, err := config.ApplyMigrations(input)
require.NoError(t, err)
require.NotEmpty(t, output)
require.GreaterOrEqual(t, n, uint64(1))
actual := config.NewConfig()
require.NoError(t, actual.LoadConfigData(output))

// Test the output
require.Len(t, actual.Inputs, len(expected.Inputs))
actualIDs := make([]string, 0, len(expected.Inputs))
expectedIDs := make([]string, 0, len(expected.Inputs))
for i := range actual.Inputs {
actualIDs = append(actualIDs, actual.Inputs[i].ID())
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
}
require.ElementsMatch(t, expectedIDs, actualIDs)
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[[inputs.socket_listener]]
service_address = "udp://127.0.0.1:8000"
data_format = "influx"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[[inputs.udp_listener]]
service_address = "127.0.0.1:8000"
allowed_pending_messages = 1000
udp_packet_size = 1024
data_format = "influx"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[[inputs.socket_listener]]
service_address = "udp://127.0.0.1:8000"
data_format = "influx"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[[inputs.udp_listener]]
service_address = "127.0.0.1:8000"
allowed_pending_messages = 1000

data_format = "influx"
10 changes: 10 additions & 0 deletions migrations/inputs_udp_listener/testcases/parser/expected.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[[inputs.socket_listener]]
service_address = "udp://127.0.0.1:8000"
data_format = "xpath_json"
xpath_native_types = true
[[inputs.socket_listener.xpath]]
metric_name = "/name"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
field_selection = "/fields/*"
tag_selection = "/tags/*"
13 changes: 13 additions & 0 deletions migrations/inputs_udp_listener/testcases/parser/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[[inputs.udp_listener]]
service_address = "127.0.0.1:8000"

data_format = "xpath_json"
xpath_native_types = true

# Configuration matching the first (ENERGY) message
[[inputs.udp_listener.xpath]]
metric_name = "/name"
timestamp = "/timestamp"
timestamp_format = "unix_ms"
field_selection = "/fields/*"
tag_selection = "/tags/*"
3 changes: 3 additions & 0 deletions migrations/inputs_udp_listener/testcases/simple/expected.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[[inputs.socket_listener]]
service_address = "udp://127.0.0.1:8000"
data_format = "influx"
4 changes: 4 additions & 0 deletions migrations/inputs_udp_listener/testcases/simple/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[[inputs.udp_listener]]
service_address = "127.0.0.1:8000"

data_format = "influx"

0 comments on commit 8583f22

Please sign in to comment.