Skip to content

Commit

Permalink
Add xhttp pkg (#674)
Browse files Browse the repository at this point in the history
* Add xhttp pkg

* Fixik

* Fix test

* Fix test
  • Loading branch information
kirillov6 committed Sep 12, 2024
1 parent cfa0fd2 commit bbc5cd0
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 405 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ tests-offsets
testdata

.idea/
.vscode
.DS_Store
182 changes: 53 additions & 129 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"net/http"
"sync"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/xtls"
"github.com/ozontech/file.d/xhttp"
"github.com/prometheus/client_golang/prometheus"
"github.com/valyala/fasthttp"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -30,45 +27,13 @@ If a network error occurs, the batch will infinitely try to be delivered to the
const (
outPluginType = "elasticsearch"

NDJSONContentType = "application/x-ndjson"
gzipContentEncoding = "gzip"
)

type gzipCompressionLevel int

const (
gzipCompressionLevelDefault gzipCompressionLevel = iota
gzipCompressionLevelNo
gzipCompressionLevelBestSpeed
gzipCompressionLevelBestCompression
gzipCompressionLevelHuffmanOnly
)

func (l gzipCompressionLevel) toFastHTTP() int {
switch l {
case gzipCompressionLevelNo:
return fasthttp.CompressNoCompression
case gzipCompressionLevelBestSpeed:
return fasthttp.CompressBestSpeed
case gzipCompressionLevelBestCompression:
return fasthttp.CompressBestCompression
case gzipCompressionLevelHuffmanOnly:
return fasthttp.CompressHuffmanOnly
default:
return fasthttp.CompressDefaultCompression
}
}

var (
strAuthorization = []byte(fasthttp.HeaderAuthorization)
NDJSONContentType = "application/x-ndjson"
)

type Plugin struct {
config *Config

client *fasthttp.Client
endpoints []*fasthttp.URI
authHeader []byte
client *xhttp.Client

logger *zap.Logger
controller pipeline.OutputPluginController
Expand Down Expand Up @@ -102,8 +67,7 @@ type Config struct {
// > @3@4@5@6
// >
// > Gzip compression level. Used if `use_gzip=true`.
GzipCompressionLevel string `json:"gzip_compression_level" default:"default" options:"default|no|best-speed|best-compression|huffman-only"` // *
GzipCompressionLevel_ gzipCompressionLevel
GzipCompressionLevel string `json:"gzip_compression_level" default:"default" options:"default|no|best-speed|best-compression|huffman-only"` // *

// > @3@4@5@6
// >
Expand Down Expand Up @@ -322,37 +286,46 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
}

func (p *Plugin) prepareClient() {
p.client = &fasthttp.Client{
ReadTimeout: p.config.ConnectionTimeout_ * 2,
WriteTimeout: p.config.ConnectionTimeout_ * 2,

MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
config := &xhttp.ClientConfig{
Endpoints: prepareEndpoints(p.config.Endpoints),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
},
}
if p.config.CACert != "" {
b := xtls.NewConfigBuilder()
err := b.AppendCARoot(p.config.CACert)
if err != nil {
p.logger.Fatal("can't append CA root", zap.Error(err))
config.TLS = &xhttp.ClientTLSConfig{
CACert: p.config.CACert,
}

p.client.TLSConfig = b.Build()
}
if p.config.UseGzip {
config.GzipCompressionLevel = p.config.GzipCompressionLevel
}

for _, endpoint := range p.config.Endpoints {
if endpoint[len(endpoint)-1] == '/' {
endpoint = endpoint[:len(endpoint)-1]
}
var err error
p.client, err = xhttp.NewClient(config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
}

uri := &fasthttp.URI{}
if err := uri.Parse(nil, []byte(endpoint+"/_bulk?_source=false")); err != nil {
logger.Fatalf("can't parse ES endpoint %s: %s", endpoint, err.Error())
func prepareEndpoints(endpoints []string) []string {
res := make([]string, 0, len(endpoints))
for _, e := range endpoints {
if e[len(e)-1] == '/' {
e = e[:len(e)-1]
}

p.endpoints = append(p.endpoints, uri)
res = append(res, e+"/_bulk?_source=false")
}
return res
}

p.authHeader = p.getAuthHeader()
func (p *Plugin) maintenance(_ *pipeline.WorkerData) {
p.mu.Lock()
p.time = time.Now().Format(p.config.TimeFormat)
p.mu.Unlock()
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
Expand All @@ -373,63 +346,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
data.outBuf = p.appendEvent(data.outBuf, event)
})

err := p.send(data.outBuf)
_, err := p.client.DoTimeout(http.MethodPost, NDJSONContentType, data.outBuf,
p.config.ConnectionTimeout_, p.reportESErrors)

if err != nil {
p.sendErrorMetric.Inc()
p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err))
}
return err
}

func (p *Plugin) send(body []byte) error {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)

endpoint := p.endpoints[rand.Int()%len(p.endpoints)]
p.prepareRequest(req, endpoint, body)

if err := p.client.DoTimeout(req, resp, p.config.ConnectionTimeout_); err != nil {
return fmt.Errorf("can't send batch to %s: %s", endpoint.String(), err.Error())
}

respContent := resp.Body()

if statusCode := resp.Header.StatusCode(); statusCode < http.StatusOK || statusCode > http.StatusAccepted {
return fmt.Errorf("response status from %s isn't OK: status=%d, body=%s", endpoint.String(), statusCode, string(respContent))
}

root, err := insaneJSON.DecodeBytes(respContent)
if err != nil {
return fmt.Errorf("wrong response from %s: %s", endpoint.String(), err.Error())
}
defer insaneJSON.Release(root)

p.reportESErrors(root)

return nil
}

func (p *Plugin) prepareRequest(req *fasthttp.Request, endpoint *fasthttp.URI, body []byte) {
req.SetURI(endpoint)

req.Header.SetMethod(fasthttp.MethodPost)
req.Header.SetContentType(NDJSONContentType)

if p.authHeader != nil {
req.Header.SetBytesKV(strAuthorization, p.authHeader)
}

if p.config.UseGzip {
if _, err := fasthttp.WriteGzipLevel(req.BodyWriter(), body, p.config.GzipCompressionLevel_.toFastHTTP()); err != nil {
req.SetBodyRaw(body)
} else {
req.Header.SetContentEncoding(gzipContentEncoding)
}
} else {
req.SetBodyRaw(body)
}
return err
}

func (p *Plugin) appendEvent(outBuf []byte, event *pipeline.Event) []byte {
Expand Down Expand Up @@ -473,23 +398,15 @@ func (p *Plugin) appendIndexName(outBuf []byte, event *pipeline.Event) []byte {
return outBuf
}

func (p *Plugin) maintenance(_ *pipeline.WorkerData) {
p.mu.Lock()
p.time = time.Now().Format(p.config.TimeFormat)
p.mu.Unlock()
}

func (p *Plugin) getAuthHeader() []byte {
func (p *Plugin) getAuthHeader() string {
if p.config.APIKey != "" {
return []byte("ApiKey " + p.config.APIKey)
return "ApiKey " + p.config.APIKey
}
if p.config.Username != "" && p.config.Password != "" {
credentials := []byte(p.config.Username + ":" + p.config.Password)
buf := make([]byte, base64.StdEncoding.EncodedLen(len(credentials)))
base64.StdEncoding.Encode(buf, credentials)
return append([]byte("Basic "), buf...)
return "Basic " + base64.StdEncoding.EncodeToString(credentials)
}
return nil
return ""
}

// example of an ElasticSearch response that returned an indexing error for the first log:
Expand Down Expand Up @@ -533,17 +450,23 @@ func (p *Plugin) getAuthHeader() []byte {
// }
// ]
// }
func (p *Plugin) reportESErrors(root *insaneJSON.Root) {
func (p *Plugin) reportESErrors(data []byte) error {
root, err := insaneJSON.DecodeBytes(data)
defer insaneJSON.Release(root)
if err != nil {
return fmt.Errorf("can't decode response: %w", err)
}

if !root.Dig("errors").AsBool() {
return
return nil
}

items := root.Dig("items").AsArray()
if len(items) == 0 {
p.logger.Error("unknown elasticsearch error, 'items' field in the response is empty",
zap.String("response", root.EncodeToString()),
)
return
return nil
}

indexingErrors := 0
Expand Down Expand Up @@ -575,4 +498,5 @@ func (p *Plugin) reportESErrors(root *insaneJSON.Root) {
}

p.logger.Error("some events from batch aren't written, check previous logs for more information")
return nil
}
Loading

0 comments on commit bbc5cd0

Please sign in to comment.