Skip to content

Commit

Permalink
Extract CompressWithGzip into the internal package
Browse files Browse the repository at this point in the history
  • Loading branch information
mihaitodor committed Oct 5, 2018
1 parent c5502a0 commit 4a49fee
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 48 deletions.
20 changes: 20 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package internal
import (
"bufio"
"bytes"
"compress/gzip"
"crypto/rand"
"errors"
"io"
"log"
"math/big"
"os"
Expand Down Expand Up @@ -208,3 +210,21 @@ func ExitStatus(err error) (int, bool) {
}
return 0, false
}

// CompressWithGzip takes an io.Reader as input and pipes
// it through a gzip.Writer returning an io.Reader containing
// the gzipped data.
// An error is returned if passing data to the gzip.Writer fails
func CompressWithGzip(data io.Reader) (io.Reader, error) {
pipeReader, pipeWriter := io.Pipe()
gzipWriter := gzip.NewWriter(pipeWriter)

var err error
go func() {
_, err = io.Copy(gzipWriter, data)
gzipWriter.Close()
pipeWriter.Close()
}()

return pipeReader, err
}
20 changes: 20 additions & 0 deletions internal/internal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package internal

import (
"bytes"
"compress/gzip"
"io/ioutil"
"os/exec"
"testing"
"time"
Expand Down Expand Up @@ -162,3 +165,20 @@ func TestDuration(t *testing.T) {
d.UnmarshalTOML([]byte(`1.5`))
assert.Equal(t, time.Second, d.Duration)
}

func TestCompressWithGzip(t *testing.T) {
testData := "the quick brown fox jumps over the lazy dog"
inputBuffer := bytes.NewBuffer([]byte(testData))

outputBuffer, err := CompressWithGzip(inputBuffer)
assert.NoError(t, err)

gzipReader, err := gzip.NewReader(outputBuffer)
assert.NoError(t, err)
defer gzipReader.Close()

output, err := ioutil.ReadAll(gzipReader)
assert.NoError(t, err)

assert.Equal(t, testData, string(output))
}
17 changes: 1 addition & 16 deletions plugins/outputs/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package http

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -173,7 +172,7 @@ func (h *HTTP) write(reqBody []byte) error {
var err error

if h.ContentEncoding == "gzip" && h.Method == http.MethodPost {
reqBodyBuffer, err = compressWithGzip(reqBodyBuffer)
reqBodyBuffer, err = internal.CompressWithGzip(reqBodyBuffer)
if err != nil {
return err
}
Expand Down Expand Up @@ -210,20 +209,6 @@ func (h *HTTP) write(reqBody []byte) error {
return nil
}

func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error

go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()

return pr, err
}

func init() {
outputs.Add("http", func() telegraf.Output {
return &HTTP{
Expand Down
18 changes: 2 additions & 16 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package influxdb

import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
Expand All @@ -16,6 +15,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)

Expand Down Expand Up @@ -360,7 +360,7 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
var err error
if c.ContentEncoding == "gzip" {
body, err = compressWithGzip(body)
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
Expand All @@ -381,20 +381,6 @@ func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
return req, nil
}

func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error

go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()

return pr, err
}

func (c *httpClient) addHeaders(req *http.Request) {
if c.Username != "" || c.Password != "" {
req.SetBasicAuth(c.Username, c.Password)
Expand Down
18 changes: 2 additions & 16 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package influxdb_v2

import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
Expand All @@ -17,6 +16,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)

Expand Down Expand Up @@ -231,7 +231,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) {
var err error
if c.ContentEncoding == "gzip" {
body, err = compressWithGzip(body)
body, err = internal.CompressWithGzip(body)
if err != nil {
return nil, err
}
Expand All @@ -258,20 +258,6 @@ func (c *httpClient) addHeaders(req *http.Request) {
}
}

func compressWithGzip(data io.Reader) (io.Reader, error) {
pipeReader, pipeWriter := io.Pipe()
gzipWriter := gzip.NewWriter(pipeWriter)
var err error

go func() {
_, err = io.Copy(gzipWriter, data)
gzipWriter.Close()
pipeWriter.Close()
}()

return pipeReader, err
}

func makeWriteURL(loc url.URL, org, bucket string) (string, error) {
params := url.Values{}
params.Set("bucket", bucket)
Expand Down

0 comments on commit 4a49fee

Please sign in to comment.