Skip to content

Commit

Permalink
Add decoding and tests to socket_listener (influxdata#6660)
Browse files Browse the repository at this point in the history
  • Loading branch information
nwneisen authored and idohalevi committed Sep 23, 2020
1 parent 24112e4 commit 73c11c2
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 7 deletions.
5 changes: 5 additions & 0 deletions plugins/inputs/socket_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ This is a sample configuration for the plugin.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"
```

## A Note on UDP OS Buffer Sizes
Expand All @@ -84,6 +88,7 @@ at least 8MB before trying to run large amounts of UDP traffic to your instance.
8MB is just a recommendation, and can be adjusted higher.

### Linux

Check the current UDP/IP receive buffer limit & default by typing the following
commands:

Expand Down
28 changes: 26 additions & 2 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,14 @@ func (ssl *streamSocketListener) read(c net.Conn) {
if !scnr.Scan() {
break
}
metrics, err := ssl.Parse(scnr.Bytes())

body, err := ssl.decoder.Decode(scnr.Bytes())
if err != nil {
ssl.Log.Errorf("Unable to decode incoming line: %s", err.Error())
continue
}

metrics, err := ssl.Parse(body)
if err != nil {
ssl.Log.Errorf("Unable to parse incoming line: %s", err.Error())
// TODO rate limit
Expand Down Expand Up @@ -155,7 +162,12 @@ func (psl *packetSocketListener) listen() {
break
}

metrics, err := psl.Parse(buf[:n])
body, err := psl.decoder.Decode(buf[:n])
if err != nil {
psl.Log.Errorf("Unable to decode incoming packet: %s", err.Error())
}

metrics, err := psl.Parse(body)
if err != nil {
psl.Log.Errorf("Unable to parse incoming packet: %s", err.Error())
// TODO rate limit
Expand All @@ -174,6 +186,7 @@ type SocketListener struct {
ReadTimeout *internal.Duration `toml:"read_timeout"`
KeepAlivePeriod *internal.Duration `toml:"keep_alive_period"`
SocketMode string `toml:"socket_mode"`
ContentEncoding string `toml:"content_encoding"`
tlsint.ServerConfig

wg sync.WaitGroup
Expand All @@ -183,6 +196,7 @@ type SocketListener struct {
parsers.Parser
telegraf.Accumulator
io.Closer
decoder internal.ContentDecoder
}

func (sl *SocketListener) Description() string {
Expand Down Expand Up @@ -244,6 +258,10 @@ func (sl *SocketListener) SampleConfig() string {
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"
`
}

Expand All @@ -265,6 +283,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
protocol := spl[0]
addr := spl[1]

var err error
sl.decoder, err = internal.NewContentDecoder(sl.ContentEncoding)
if err != nil {
return err
}

if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" {
// no good way of testing for "file does not exist".
// Instead just ignore error and blow up when we try to listen, which will
Expand Down
63 changes: 58 additions & 5 deletions plugins/inputs/socket_listener/socket_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,65 @@ func TestSocketListener_unixgram(t *testing.T) {
testSocketListener(t, sl, client)
}

func TestSocketListenerDecode_tcp(t *testing.T) {
defer testEmptyLog(t)()

sl := newSocketListener()
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ReadBufferSize = internal.Size{Size: 1024}
sl.ContentEncoding = "gzip"

acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()

client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String())
require.NoError(t, err)

testSocketListener(t, sl, client)
}

func TestSocketListenerDecode_udp(t *testing.T) {
defer testEmptyLog(t)()

sl := newSocketListener()
sl.Log = testutil.Logger{}
sl.ServiceAddress = "udp://127.0.0.1:0"
sl.ReadBufferSize = internal.Size{Size: 1024}
sl.ContentEncoding = "gzip"

acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()

client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
require.NoError(t, err)

testSocketListener(t, sl, client)
}

func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
mstr12 := "test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"
mstr3 := "test,foo=zab v=3i 123456791"
client.Write([]byte(mstr12))
client.Write([]byte(mstr3))
if _, ok := client.(net.Conn); ok {
mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n")
mstr3 := []byte("test,foo=zab v=3i 123456791")

if sl.ContentEncoding == "gzip" {
encoder, err := internal.NewContentEncoder(sl.ContentEncoding)
require.NoError(t, err)
mstr12, err = encoder.Encode(mstr12)
require.NoError(t, err)

encoder, err = internal.NewContentEncoder(sl.ContentEncoding)
require.NoError(t, err)
mstr3, err = encoder.Encode(mstr3)
require.NoError(t, err)
}

client.Write(mstr12)
client.Write(mstr3)
if client.LocalAddr().Network() != "udp" {
// stream connection. needs trailing newline to terminate mstr3
client.Write([]byte{'\n'})
}
Expand Down

0 comments on commit 73c11c2

Please sign in to comment.