Skip to content

Commit

Permalink
Fix HEC exporter throwing 400s
Browse files Browse the repository at this point in the history
This commit fixes the HEC exporter sending log payloads with the GZIP footer missing.

gzip.Writer.Flush doesn't write the GZIP footer. As a result, reading the compressed payload fails with "unexpected EOF". This commit changes the gzip.Writer.Flush call to gzip.Writer.Close to make sure that the GZIP footer is always written.
  • Loading branch information
dmitryax committed Apr 9, 2021
1 parent 1638363 commit c51dce2
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 14 deletions.
4 changes: 1 addition & 3 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
gzipBuffer := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthLogs))
gzipWriter.Reset(gzipBuffer)

defer gzipWriter.Close()

// Callback when each batch is to be sent.
send := func(ctx context.Context, buf *bytes.Buffer) (err error) {
shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression
Expand All @@ -146,7 +144,7 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
return fmt.Errorf("failed copying buffer to gzip writer: %v", err)
}

if err = gzipWriter.Flush(); err != nil {
if err = gzipWriter.Close(); err != nil {
return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err)
}

Expand Down
100 changes: 89 additions & 11 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package splunkhecexporter

import (
"bytes"
"compress/gzip"
"context"
"errors"
Expand Down Expand Up @@ -128,7 +129,7 @@ func createLogData(numResources int, numLibraries int, numRecords int) pdata.Log

type CapturingData struct {
testing *testing.T
receivedRequest chan string
receivedRequest chan []byte
statusCode int
checkCompression bool
}
Expand All @@ -146,7 +147,7 @@ func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) {
panic(err)
}
go func() {
c.receivedRequest <- string(body)
c.receivedRequest <- body
}()
w.WriteHeader(c.statusCode)
}
Expand All @@ -163,7 +164,7 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin
cfg.DisableCompression = disableCompression
cfg.Token = "1234-1234"

receivedRequest := make(chan string)
receivedRequest := make(chan []byte)
capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
s := &http.Server{
Handler: &capture,
Expand All @@ -184,7 +185,7 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin
assert.NoError(t, err)
select {
case request := <-receivedRequest:
return request, nil
return string(request), nil
case <-time.After(1 * time.Second):
return "", errors.New("timeout")
}
Expand All @@ -202,7 +203,7 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) (
cfg.DisableCompression = disableCompression
cfg.Token = "1234-1234"

receivedRequest := make(chan string)
receivedRequest := make(chan []byte)
capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
s := &http.Server{
Handler: &capture,
Expand All @@ -223,13 +224,13 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) (
assert.NoError(t, err)
select {
case request := <-receivedRequest:
return request, nil
return string(request), nil
case <-time.After(1 * time.Second):
return "", errors.New("timeout")
}
}

func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) {
func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([][]byte, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
Expand All @@ -238,7 +239,7 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) {
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
cfg.Token = "1234-1234"

receivedRequest := make(chan string)
receivedRequest := make(chan []byte)
capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression}
s := &http.Server{
Handler: &capture,
Expand All @@ -256,7 +257,7 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) {
err = exporter.ConsumeLogs(context.Background(), ld)
assert.NoError(t, err)

var requests []string
var requests [][]byte
for {
select {
case request := <-receivedRequest:
Expand Down Expand Up @@ -286,8 +287,13 @@ func TestReceiveLogs(t *testing.T) {
type wantType struct {
batches []string
numBatches int
compressed bool
}

// The test cases depend on the constant minCompressionLen = 1500.
// If the constant changed, the test cases with want.compressed=true must be updated.
require.Equal(t, minCompressionLen, 1500)

tests := []struct {
name string
conf *Config
Expand Down Expand Up @@ -348,6 +354,62 @@ func TestReceiveLogs(t *testing.T) {
numBatches: 2,
},
},
{
name: "1 compressed batch of 1837 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression",
logs: createLogData(1, 1, 10),
conf: func() *Config {
return NewFactory().CreateDefaultConfig().(*Config)
}(),
want: wantType{
batches: []string{
`{"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.001,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.002,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.003,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.004,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.005,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.006,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.007,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.008,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.009,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n",
},
numBatches: 1,
compressed: true,
},
},
{
name: "2 compressed batches - 1652 bytes each, make sure the log size is more than minCompressionLen=1500 to trigger compression",
logs: createLogData(1, 1, 18), // comes to HEC events payload size - 1837 bytes
conf: func() *Config {
cfg := NewFactory().CreateDefaultConfig().(*Config)
cfg.MaxContentLengthLogs = 1700
return cfg
}(),
want: wantType{
batches: []string{
`{"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.001,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.002,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.003,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.004,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.005,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.006,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.007,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.008,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n",
`{"time":0.009,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.01,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.011,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.012,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.013,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.014,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.015,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.016,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" +
`{"time":0.017,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n",
},
numBatches: 2,
compressed: true,
},
},
}

for _, test := range tests {
Expand All @@ -359,7 +421,11 @@ func TestReceiveLogs(t *testing.T) {

for i := 0; i < test.want.numBatches; i++ {
require.NotZero(t, got[i])
assert.Equal(t, test.want.batches[i], got[i])
if test.want.compressed {
validateCompressedEqual(t, test.want.batches[i], got[i])
} else {
assert.Equal(t, test.want.batches[i], string(got[i]))
}

}
})
Expand Down Expand Up @@ -391,7 +457,7 @@ func TestReceiveMetricsWithCompression(t *testing.T) {
}

func TestErrorReceived(t *testing.T) {
receivedRequest := make(chan string)
receivedRequest := make(chan []byte)
capture := CapturingData{receivedRequest: receivedRequest, statusCode: 500}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down Expand Up @@ -720,3 +786,15 @@ func TestSubLogs(t *testing.T) {
// The name of the sole log record should be 1_1_2.
assert.Equal(t, "1_1_2", got.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Name())
}

// validateCompressedEqual decompresses the gzipped payload in got and
// asserts that the decompressed content equals the expected string.
func validateCompressedEqual(t *testing.T, expected string, got []byte) {
	reader, err := gzip.NewReader(bytes.NewReader(got))
	require.NoError(t, err)
	defer reader.Close()

	decompressed, err := ioutil.ReadAll(reader)
	require.NoError(t, err)

	assert.Equal(t, expected, string(decompressed))
}

0 comments on commit c51dce2

Please sign in to comment.