Skip to content

Commit

Permalink
Add gzip to elastic & splunk output plugins (#650)
Browse files Browse the repository at this point in the history
* Add gzip to elastic output

* Splunk output fasthttp + gzip

* Beautify elastic output

* Fallback to raw after gzip error

* Add gzip compression level

* Add doc

* Fix docker compose
  • Loading branch information
kirillov6 authored Aug 5, 2024
1 parent 3f2080c commit 42d0568
Show file tree
Hide file tree
Showing 7 changed files with 456 additions and 104 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ test-e2e:
.PHONY: test-e2e-docker-up
test-e2e-docker-up:
for dc in $(shell find e2e -name 'docker-compose.yml') ; do \
docker-compose -f $$dc up -d ; \
docker compose -f $$dc up -d ; \
done

.PHONY: test-e2e-docker-down
test-e2e-docker-down:
for dc in $(shell find e2e -name 'docker-compose.yml') ; do \
docker-compose -f $$dc down ; \
docker compose -f $$dc down ; \
done

.PHONY: bench-file
Expand Down
12 changes: 12 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ The list of elasticsearch endpoints in the following format: `SCHEMA://HOST:PORT

<br>

**`use_gzip`** *`bool`* *`default=false`*

If set, the plugin will use gzip encoding.

<br>

**`gzip_compression_level`** *`string`* *`default=default`* *`options=default|no|best-speed|best-compression|huffman-only`*

Gzip compression level. Used if `use_gzip=true`.

<br>

**`username`** *`string`*

Username for HTTP Basic Authentication.
Expand Down
158 changes: 107 additions & 51 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,57 @@ If a network error occurs, the batch will infinitely try to be delivered to the
}*/

const (
outPluginType = "elasticsearch"
NDJSONContentType = "application/x-ndjson"
outPluginType = "elasticsearch"

NDJSONContentType = "application/x-ndjson"
gzipContentEncoding = "gzip"
)

// gzipCompressionLevel is the parsed enum form of the
// `gzip_compression_level` config option.
type gzipCompressionLevel int

const (
	// Order matches the config options `default|no|best-speed|best-compression|huffman-only`.
	gzipCompressionLevelDefault gzipCompressionLevel = iota
	gzipCompressionLevelNo
	gzipCompressionLevelBestSpeed
	gzipCompressionLevelBestCompression
	gzipCompressionLevelHuffmanOnly
)

// toFastHTTP maps the plugin's compression-level enum onto the
// corresponding fasthttp compression constant. Any unrecognized value
// falls back to fasthttp's default compression level.
func (l gzipCompressionLevel) toFastHTTP() int {
	switch l {
	case gzipCompressionLevelBestSpeed:
		return fasthttp.CompressBestSpeed
	case gzipCompressionLevelBestCompression:
		return fasthttp.CompressBestCompression
	case gzipCompressionLevelHuffmanOnly:
		return fasthttp.CompressHuffmanOnly
	case gzipCompressionLevelNo:
		return fasthttp.CompressNoCompression
	default:
		// gzipCompressionLevelDefault and any unknown value
		return fasthttp.CompressDefaultCompression
	}
}

var (
	// strAuthorization is the Authorization header name as a byte slice,
	// precomputed once to avoid a string-to-bytes conversion per request.
	strAuthorization = []byte(fasthttp.HeaderAuthorization)
)

type Plugin struct {
logger *zap.Logger
client *fasthttp.Client
endpoints []*fasthttp.URI
cancel context.CancelFunc
config *Config
authHeader []byte
config *Config

client *fasthttp.Client
endpoints []*fasthttp.URI
authHeader []byte

logger *zap.Logger
controller pipeline.OutputPluginController

batcher *pipeline.RetriableBatcher
avgEventSize int

time string
headerPrefix string
batcher *pipeline.RetriableBatcher
controller pipeline.OutputPluginController
cancel context.CancelFunc
mu *sync.Mutex

// plugin metrics
Expand All @@ -63,6 +94,17 @@ type Config struct {
// > The list of elasticsearch endpoints in the following format: `SCHEMA://HOST:PORT`
Endpoints []string `json:"endpoints" required:"true"` // *

// > @3@4@5@6
// >
// > If set, the plugin will use gzip encoding.
UseGzip bool `json:"use_gzip" default:"false"` // *

// > @3@4@5@6
// >
// > Gzip compression level. Used if `use_gzip=true`.
GzipCompressionLevel string `json:"gzip_compression_level" default:"default" options:"default|no|best-speed|best-compression|huffman-only"` // *
GzipCompressionLevel_ gzipCompressionLevel

// > @3@4@5@6
// >
// > Username for HTTP Basic Authentication.
Expand Down Expand Up @@ -192,36 +234,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config.IndexValues = append(p.config.IndexValues, "@time")
}

for _, endpoint := range p.config.Endpoints {
if endpoint[len(endpoint)-1] == '/' {
endpoint = endpoint[:len(endpoint)-1]
}

uri := &fasthttp.URI{}
if err := uri.Parse(nil, []byte(endpoint+"/_bulk?_source=false")); err != nil {
logger.Fatalf("can't parse ES endpoint %s: %s", endpoint, err.Error())
}

p.endpoints = append(p.endpoints, uri)
}

p.client = &fasthttp.Client{
ReadTimeout: p.config.ConnectionTimeout_ * 2,
WriteTimeout: p.config.ConnectionTimeout_ * 2,
MaxConnDuration: time.Minute * 5,
}

if p.config.CACert != "" {
b := xtls.NewConfigBuilder()
err := b.AppendCARoot(p.config.CACert)
if err != nil {
p.logger.Fatal("can't append CA root", zap.Error(err))
}

p.client.TLSConfig = b.Build()
}

p.authHeader = p.getAuthHeader()
p.prepareClient()

p.maintenance(nil)

Expand Down Expand Up @@ -286,6 +299,38 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors")
}

// prepareClient initializes the fasthttp client (with optional TLS CA
// root), resolves the configured endpoints into bulk-request URIs and
// caches the authorization header. It must be called before the first
// send; any configuration error is fatal.
func (p *Plugin) prepareClient() {
	p.client = &fasthttp.Client{
		ReadTimeout:     p.config.ConnectionTimeout_ * 2,
		WriteTimeout:    p.config.ConnectionTimeout_ * 2,
		MaxConnDuration: time.Minute * 5,
	}
	if p.config.CACert != "" {
		b := xtls.NewConfigBuilder()
		if err := b.AppendCARoot(p.config.CACert); err != nil {
			p.logger.Fatal("can't append CA root", zap.Error(err))
		}

		p.client.TLSConfig = b.Build()
	}

	// rebuild (rather than append to) the endpoint list so a repeated
	// call cannot accumulate duplicates; size is known up front
	p.endpoints = make([]*fasthttp.URI, 0, len(p.config.Endpoints))
	for _, endpoint := range p.config.Endpoints {
		// the bulk path is appended below, so drop a trailing slash
		if endpoint[len(endpoint)-1] == '/' {
			endpoint = endpoint[:len(endpoint)-1]
		}

		uri := &fasthttp.URI{}
		if err := uri.Parse(nil, []byte(endpoint+"/_bulk?_source=false")); err != nil {
			// use the plugin logger for consistency with the CA-root error above
			p.logger.Fatal("can't parse ES endpoint", zap.String("endpoint", endpoint), zap.Error(err))
		}

		p.endpoints = append(p.endpoints, uri)
	}

	p.authHeader = p.getAuthHeader()
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
*workerData = &data{
Expand Down Expand Up @@ -319,11 +364,7 @@ func (p *Plugin) send(body []byte) error {
defer fasthttp.ReleaseResponse(resp)

endpoint := p.endpoints[rand.Int()%len(p.endpoints)]
req.SetURI(endpoint)
req.SetBodyRaw(body)
req.Header.SetMethod(fasthttp.MethodPost)
req.Header.SetContentType(NDJSONContentType)
p.setAuthHeader(req)
p.prepareRequest(req, endpoint, body)

if err := p.client.DoTimeout(req, resp, p.config.ConnectionTimeout_); err != nil {
return fmt.Errorf("can't send batch to %s: %s", endpoint.String(), err.Error())
Expand All @@ -346,6 +387,27 @@ func (p *Plugin) send(body []byte) error {
return nil
}

// prepareRequest fills req for a single bulk request: target URI, POST
// method, NDJSON content type, the cached authorization header (when
// present) and the body. With gzip enabled the body is compressed; if
// compression fails, the raw body is sent without a gzip header.
func (p *Plugin) prepareRequest(req *fasthttp.Request, endpoint *fasthttp.URI, body []byte) {
	req.Header.SetMethod(fasthttp.MethodPost)
	req.Header.SetContentType(NDJSONContentType)
	req.SetURI(endpoint)

	if auth := p.authHeader; auth != nil {
		req.Header.SetBytesKV(strAuthorization, auth)
	}

	if !p.config.UseGzip {
		req.SetBodyRaw(body)
		return
	}

	level := p.config.GzipCompressionLevel_.toFastHTTP()
	if _, err := fasthttp.WriteGzipLevel(req.BodyWriter(), body, level); err != nil {
		// fallback: compression failed, ship the uncompressed payload
		req.SetBodyRaw(body)
		return
	}
	req.Header.SetContentEncoding(gzipContentEncoding)
}

func (p *Plugin) appendEvent(outBuf []byte, event *pipeline.Event) []byte {
// index command
outBuf = p.appendIndexName(outBuf, event)
Expand Down Expand Up @@ -406,12 +468,6 @@ func (p *Plugin) getAuthHeader() []byte {
return nil
}

func (p *Plugin) setAuthHeader(req *fasthttp.Request) {
if p.authHeader != nil {
req.Header.SetBytesKV(strAuthorization, p.authHeader)
}
}

// example of an ElasticSearch response that returned an indexing error for the first log:
//
// {
Expand Down
82 changes: 82 additions & 0 deletions plugin/output/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/valyala/fasthttp"
insaneJSON "github.com/vitkovskii/insane-json"

"github.com/ozontech/file.d/pipeline"
Expand Down Expand Up @@ -108,3 +109,84 @@ func TestConfig(t *testing.T) {
assert.Equal(t, results[i], p.endpoints[i].String())
}
}

// TestPrepareRequest checks that prepareRequest sets URI, method,
// headers, auth and body correctly for both raw and gzip configs.
func TestPrepareRequest(t *testing.T) {
	type wantData struct {
		uri             string
		method          []byte
		contentType     []byte
		contentEncoding []byte
		auth            []byte
		body            []byte
	}

	tests := []struct {
		name   string
		config *Config

		body string
		want wantData
	}{
		{
			name: "raw",
			config: &Config{
				Endpoints: []string{"http://endpoint:9000"},
				APIKey:    "test",
			},
			body: "test",
			want: wantData{
				uri:         "http://endpoint:9000/_bulk?_source=false",
				method:      []byte(fasthttp.MethodPost),
				contentType: []byte(NDJSONContentType),
				auth:        []byte("ApiKey test"),
				body:        []byte("test"),
			},
		},
		{
			name: "gzip",
			config: &Config{
				Endpoints:             []string{"http://endpoint:9000"},
				UseGzip:               true,
				GzipCompressionLevel_: gzipCompressionLevelBestSpeed,
			},
			body: "test",
			want: wantData{
				uri:             "http://endpoint:9000/_bulk?_source=false",
				method:          []byte(fasthttp.MethodPost),
				contentType:     []byte(NDJSONContentType),
				contentEncoding: []byte(gzipContentEncoding),
				body:            []byte("test"),
			},
		},
	}
	for _, tc := range tests {
		tc := tc // capture range variable for the parallel subtest
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			p := Plugin{
				config: tc.config,
			}
			p.prepareClient()

			req := fasthttp.AcquireRequest()
			defer fasthttp.ReleaseRequest(req)

			p.prepareRequest(req, p.endpoints[0], []byte(tc.body))

			require.Equal(t, tc.want.uri, req.URI().String(), "wrong uri")
			require.Equal(t, tc.want.method, req.Header.Method(), "wrong method")
			require.Equal(t, tc.want.contentType, req.Header.ContentType(), "wrong content type")
			require.Equal(t, tc.want.contentEncoding, req.Header.ContentEncoding(), "wrong content encoding")
			require.Equal(t, tc.want.auth, req.Header.PeekBytes(strAuthorization), "wrong auth")

			// gzip bodies must be decompressed before comparison
			var body []byte
			if tc.config.UseGzip {
				body, _ = req.BodyUncompressed()
			} else {
				body = req.Body()
			}
			require.Equal(t, tc.want.body, body, "wrong body")
		})
	}
}
12 changes: 12 additions & 0 deletions plugin/output/splunk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ A full URI address of splunk HEC endpoint. Format: `http://127.0.0.1:8088/servic

<br>

**`use_gzip`** *`bool`* *`default=false`*

If set, the plugin will use gzip encoding.

<br>

**`gzip_compression_level`** *`string`* *`default=default`* *`options=default|no|best-speed|best-compression|huffman-only`*

Gzip compression level. Used if `use_gzip=true`.

<br>

**`token`** *`string`* *`required`*

Token for an authentication for a HEC endpoint.
Expand Down
Loading

0 comments on commit 42d0568

Please sign in to comment.