Skip to content

Commit

Permalink
Fix gzip support in socket_listener with tcp sockets (influxdata#7446)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent 38ee57d commit fcecf4e
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 24 deletions.
62 changes: 61 additions & 1 deletion internal/content_coding.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,78 @@
package internal

import (
"bufio"
"bytes"
"compress/gzip"
"errors"
"io"
)

// NewStreamContentDecoder wraps r in a reader that undoes the named content
// encoding.
//
// "identity" and the empty string hand r back untouched, "gzip" wraps r with
// NewGzipReader, and any other value is rejected with an error.
func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
	// Pass-through encodings need no wrapper at all.
	if encoding == "identity" || encoding == "" {
		return r, nil
	}

	if encoding == "gzip" {
		return NewGzipReader(r)
	}

	return nil, errors.New("invalid value for content_encoding")
}

// GzipReader is similar to gzip.Reader but reads only a single gzip stream
// per Read call: data from two consecutive gzip streams is never returned by
// the same call.
type GzipReader struct {
	// r is the buffered source shared by every gzip stream in the sequence.
	r io.Reader
	// z decodes the current gzip stream; it is Reset onto r for each new one.
	z *gzip.Reader
	// endOfStream records that the current gzip stream is exhausted and that
	// the next gzip header must be read before more data can be produced.
	endOfStream bool
}

// NewGzipReader returns a reader that decodes a sequence of concatenated gzip
// streams read from r.
//
// The header of the first gzip stream is read before returning, so an error is
// returned if r does not start with a valid gzip header.
func NewGzipReader(r io.Reader) (io.Reader, error) {
	// We need a reader that implements io.ByteReader so that gzip does not
	// add its own buffering and read past the end of the current stream; that
	// keeps the source lined up on the next stream's header.
	br := bufio.NewReader(r)

	// Reads the first gzip stream header.
	z, err := gzip.NewReader(br)
	if err != nil {
		return nil, err
	}

	// Prevent future calls to Read from reading the following gzip header.
	z.Multistream(false)

	return &GzipReader{r: br, z: z}, nil
}

// Read decodes data from the current gzip stream into b.
//
// When the current stream is exhausted, the next call reads the following
// gzip header and continues with that stream. The end of an individual gzip
// stream is not reported to the caller; io.EOF is returned only when the
// underlying reader has no further gzip stream. Read does not return (0, nil)
// unless len(b) == 0.
func (r *GzipReader) Read(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}

	for {
		if r.endOfStream {
			// Reads the next gzip header and prepares for the next stream.
			// At the end of the input this returns io.EOF, which ends the
			// whole sequence.
			if err := r.z.Reset(r.r); err != nil {
				return 0, err
			}
			// Reset re-enables multistream mode, so turn it off again.
			r.z.Multistream(false)
			r.endOfStream = false
		}

		n, err := r.z.Read(b)
		if err != io.EOF {
			return n, err
		}

		// Since multistream is disabled, io.EOF indicates the end of the
		// current gzip stream, not of the input. The next header is read
		// lazily on the following pass or call.
		r.endOfStream = true
		if n > 0 {
			return n, nil
		}

		// The stream ended without yielding any bytes on this call. Returning
		// (0, nil) is discouraged by the io.Reader contract, so move straight
		// on to the next stream instead.
	}
}

// NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()

case "identity", "":
return NewIdentityEncoder(), nil
default:
Expand Down
36 changes: 36 additions & 0 deletions internal/content_coding_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package internal

import (
"bytes"
"io/ioutil"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,3 +58,37 @@ func TestIdentityEncodeDecode(t *testing.T) {

require.Equal(t, "howdy", string(actual))
}

// TestStreamIdentityDecode checks that the "identity" stream decoder passes
// the input bytes through unchanged.
func TestStreamIdentityDecode(t *testing.T) {
	var input bytes.Buffer
	written, err := input.Write([]byte("howdy"))
	require.NoError(t, err)
	require.Equal(t, 5, written)

	decoder, err := NewStreamContentDecoder("identity", &input)
	require.NoError(t, err)

	// Drain the decoder completely and compare against the original payload.
	decoded, err := ioutil.ReadAll(decoder)
	require.NoError(t, err)

	require.Equal(t, []byte("howdy"), decoded)
}

// TestStreamGzipDecode checks that a payload produced by the gzip encoder is
// recovered by a single Read on the "gzip" stream decoder.
func TestStreamGzipDecode(t *testing.T) {
	encoder, err := NewGzipEncoder()
	require.NoError(t, err)
	compressed, err := encoder.Encode([]byte("howdy"))
	require.NoError(t, err)

	source := bytes.NewBuffer(compressed)

	decoder, err := NewStreamContentDecoder("gzip", source)
	require.NoError(t, err)

	// The output buffer is larger than the payload, so one Read suffices.
	out := make([]byte, 10)
	count, err := decoder.Read(out)
	require.NoError(t, err)
	require.Equal(t, 5, count)

	require.Equal(t, []byte("howdy"), out[:count])
}
33 changes: 15 additions & 18 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ func (ssl *streamSocketListener) read(c net.Conn) {
defer ssl.removeConnection(c)
defer c.Close()

scnr := bufio.NewScanner(c)
decoder, err := internal.NewStreamContentDecoder(ssl.ContentEncoding, c)
if err != nil {
ssl.Log.Error("Read error: %v", err)
}

scnr := bufio.NewScanner(decoder)
for {
if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 {
c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration))
Expand All @@ -120,11 +125,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
break
}

body, err := ssl.decoder.Decode(scnr.Bytes())
if err != nil {
ssl.Log.Errorf("Unable to decode incoming line: %s", err.Error())
continue
}
body := scnr.Bytes()

metrics, err := ssl.Parse(body)
if err != nil {
Expand All @@ -149,6 +150,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
// packetSocketListener is the listener used for packet-oriented protocols.
// It embeds the net.PacketConn it receives on and the SocketListener that
// owns it.
type packetSocketListener struct {
net.PacketConn
*SocketListener
// decoder is the content decoder for incoming data; SocketListener.Start
// builds it from ContentEncoding when it creates this listener.
decoder internal.ContentDecoder
}

func (psl *packetSocketListener) listen() {
Expand Down Expand Up @@ -196,7 +198,6 @@ type SocketListener struct {
parsers.Parser
telegraf.Accumulator
io.Closer
decoder internal.ContentDecoder
}

func (sl *SocketListener) Description() string {
Expand Down Expand Up @@ -283,12 +284,6 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
protocol := spl[0]
addr := spl[1]

var err error
sl.decoder, err = internal.NewContentDecoder(sl.ContentEncoding)
if err != nil {
return err
}

if protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram" {
// no good way of testing for "file does not exist".
// Instead just ignore error and blow up when we try to listen, which will
Expand All @@ -298,16 +293,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {

switch protocol {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
var (
err error
l net.Listener
)

tlsCfg, err := sl.ServerConfig.TLSConfig()
if err != nil {
return err
}

var l net.Listener
if tlsCfg == nil {
l, err = net.Listen(protocol, addr)
} else {
Expand Down Expand Up @@ -344,6 +335,11 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
ssl.listen()
}()
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
decoder, err := internal.NewContentDecoder(sl.ContentEncoding)
if err != nil {
return err
}

pc, err := udpListen(protocol, addr)
if err != nil {
return err
Expand Down Expand Up @@ -373,6 +369,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
psl := &packetSocketListener{
PacketConn: pc,
SocketListener: sl,
decoder: decoder,
}

sl.Closer = psl
Expand Down
6 changes: 1 addition & 5 deletions plugins/inputs/socket_listener/socket_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestSocketListenerDecode_udp(t *testing.T) {

func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
mstr12 := []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n")
mstr3 := []byte("test,foo=zab v=3i 123456791")
mstr3 := []byte("test,foo=zab v=3i 123456791\n")

if sl.ContentEncoding == "gzip" {
encoder, err := internal.NewContentEncoder(sl.ContentEncoding)
Expand All @@ -238,10 +238,6 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {

client.Write(mstr12)
client.Write(mstr3)
if client.LocalAddr().Network() != "udp" {
// stream connection. needs trailing newline to terminate mstr3
client.Write([]byte{'\n'})
}

acc := sl.Accumulator.(*testutil.Accumulator)

Expand Down

0 comments on commit fcecf4e

Please sign in to comment.