diff --git a/config.go b/config.go index 71dfa0a71..04f356771 100644 --- a/config.go +++ b/config.go @@ -759,6 +759,15 @@ func getConfig() *types.Configuration { c.Elasticsearch.NumberOfShards = 3 } + if c.Elasticsearch.Batching.Enabled { + if c.Elasticsearch.Batching.BatchSize <= 0 { + c.Elasticsearch.Batching.BatchSize = types.DefaultBatchSize + } + if c.Elasticsearch.Batching.FlushInterval <= 0 { + c.Elasticsearch.Batching.FlushInterval = types.DefaultFlushInterval + } + } + if c.Prometheus.ExtraLabels != "" { c.Prometheus.ExtraLabelsList = strings.Split(strings.ReplaceAll(c.Prometheus.ExtraLabels, " ", ""), ",") } diff --git a/config_example.yaml b/config_example.yaml index 2aba56997..366296d3d 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -101,6 +101,12 @@ elasticsearch: # numberofreplicas: 3 # number of replicas set by the index template (default: 3) # customHeaders: # Custom headers to add in POST, useful for Authentication # key: value + # enablecompression: false # if true enables gzip compression for http requests (default: false) + # batching: # batching configuration, improves throughput dramatically utilizing _bulk Elasticsearch API + # enabled: true # if true enables batching + # batchsize: 5242880 # batch size in bytes (default: 5 MB) + # flushinterval: 1s # batch fush interval (default: 1s) + # maxconcurrentrequests: 1 # max number of concurrent http requests (default: 1) quickwit: # hostport: "" # http(s)://{domain or ip}:{port}, if not empty, Quickwit output is enabled diff --git a/docs/outputs/elasticsearch.md b/docs/outputs/elasticsearch.md index aa573d1a6..d31d0935c 100644 --- a/docs/outputs/elasticsearch.md +++ b/docs/outputs/elasticsearch.md @@ -15,7 +15,7 @@ | Setting | Env var | Default value | Description | | ----------------------------------- | ----------------------------------- | ---------------- | ----------------------------------------------------------------------------------------------------------------------------------- | -| `elasticsearch.hosport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** | +| `elasticsearch.hostport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** | | `elasticsearch.index` | `ELASTICSEARCH_INDEX` | `falco` | Index | | `elasticsearch.type` | `ELASTICSEARCH_TYPE` | `_doc` | Index | | `elasticsearch.suffix` | `ELASTICSEARCH_SUFFIX` | `daily` | Date suffix for index rotation : `daily`, `monthly`, `annually`, `none` | @@ -29,10 +29,22 @@ | `elasticsearch.mutualtls` | `ELASTICSEARCH_MUTUALTLS` | `false` | Authenticate to the output with TLS, if true, checkcert flag will be ignored (server cert will always be checked) | | `elasticsearch.checkcert` | `ELASTICSEARCH_CHECKCERT` | `true` | Check if ssl certificate of the output is valid | | `elasticsearch.minimumpriority` | `ELASTICSEARCH_MINIMUMPRIORITY` | `""` (= `debug`) | Minimum priority of event for using this output, order is `emergency,alert,critical,error,warning,notice,informational,debug or ""` | +| `elasticsearch.maxconcurrentrequests` | `ELASTICSEARCH_MAXCONCURRENTREQUESTS` | `1` | Max number of concurrent requests | +| `elasticsearch.enablecompression` | `ELASTICSEARCH_ENABLECOMPRESSION` | `false` | Enables gzip compression | +| `elasticsearch.batching.enabled` | | `false` | Enables batching (utilizing Elasticsearch bulk API) +| `elasticsearch.batching.batchsize` | | `5242880` | Batch size in bytes, default 5MB +| `elasticsearch.batching.flushinterval` | | `1s` | Batch flush interval, use valid Go duration string > [!NOTE] The Env var values override the settings from yaml file. +> [!NOTE] +Increasing the default number of concurrent requests is a good way to increase throughput of the http outputs. This also increases the potential number of open connections. Choose wisely. + +> [!NOTE] +Enabling batching for Elasticsearch is invaluable when the expected number of falco alerts is in the hundreds or thousands per second. The batching of data can be fine-tuned for your specific use case. The batch request is sent to Elasticsearch when the pending data size reaches `batchsize` or upon the `flushinterval`. +Enabling gzip compression increases throughput even further. + > [!WARNING] By enabling the creation of the index template with `elasticsearch.createindextemplate=true`, the output fields of the Falco events will be flatten to avoid any mapping conflict. @@ -51,6 +63,12 @@ elasticsearch: # mutualtls: false # if true, checkcert flag will be ignored (server cert will always be checked) # checkcert: true # check if ssl certificate of the output is valid (default: true) # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + # enablecompression: # if true enables gzip compression for http requests (default: false) + # batching: # batching configuration, improves throughput dramatically utilizing _bulk Elasticsearch API + # enabled: true # if true enables batching + # batchsize: 5242880 # batch size in bytes (default: 5 MB) + # flushinterval: 1s # batch fush interval (default: 1s) + # maxconcurrentrequests: # max number of concurrent http requests (default: 1) ``` ## Screenshots diff --git a/docs/outputs/quickwit.md b/docs/outputs/quickwit.md index 480332698..780e28287 100644 --- a/docs/outputs/quickwit.md +++ b/docs/outputs/quickwit.md @@ -15,7 +15,7 @@ | Setting | Env var | Default value | Description | | ------------------------------- | ------------------------------- | ---------------- | -------------------------------------------------------------------------------------------------------------------------------------- | -| `quickwit.hosport` | `QUICKWIT_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Quickwit output is **enabled** | +| `quickwit.hostport` | `QUICKWIT_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Quickwit output is **enabled** | | `quickwit.apiendpoint` | `QUICKWIT_APIENDPOINT` | `api/v1` | API endpoint (containing the API version, overideable in case of quickwit behind a reverse proxy with URL rewriting) | | `quickwit.index` | `QUICKWIT_INDEX` | `falco` | Index | | `quickwit.version` | `QUICKWIT_VERSION` | `0.7` | Version of quickwit | diff --git a/go.mod b/go.mod index fc13c7ce0..34f47a2f4 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/embano1/memlog v0.4.6 github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 github.com/emersion/go-smtp v0.21.3 + github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/googleapis/gax-go/v2 v2.12.5 github.com/jackc/pgx/v5 v5.6.0 diff --git a/internal/pkg/batcher/batcher.go b/internal/pkg/batcher/batcher.go new file mode 100644 index 000000000..bf7eef732 --- /dev/null +++ b/internal/pkg/batcher/batcher.go @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +package batcher + +import ( + "bytes" + "encoding/json" + "sync" + "time" + + "github.com/falcosecurity/falcosidekick/types" +) + +const ( + defaultBatchSize = 5 * 1024 * 1024 // max batch size in bytes, 5MB by default + defaultFlushInterval = time.Second +) + +type CallbackFunc func(falcoPayloads []types.FalcoPayload, serialized []byte) + +type OptionFunc func(b *Batcher) + +// MarshalFunc is a callback that allows the user of the batcher to overwrite the default JSON marshalling +type MarshalFunc func(payload types.FalcoPayload) ([]byte, error) + +// Batcher A simple generic implementation of Falco payloads batching +// Batching can be configured by the batchSize which is a max number of payloads in the batch or the flushInterval. +// The callback function is called when the number of payloads reaches the batchSize or upon the flushInterval +type Batcher struct { + batchSize int + flushInterval time.Duration + + callbackFn CallbackFunc + marshalFn MarshalFunc + + mx sync.Mutex + + pending bytes.Buffer + // Keeping the original payloads for errors resolution + pendingPayloads []types.FalcoPayload + + curTimer *time.Timer +} + +func New(opts ...OptionFunc) *Batcher { + b := &Batcher{ + batchSize: defaultBatchSize, + flushInterval: defaultFlushInterval, + callbackFn: func(falcoPayloads []types.FalcoPayload, batch []byte) {}, + marshalFn: jsonMarshal, + } + + for _, opt := range opts { + opt(b) + } + + return b +} + +func WithBatchSize(sz int) OptionFunc { + return func(b *Batcher) { + b.batchSize = sz + } +} + +func WithFlushInterval(interval time.Duration) OptionFunc { + return func(b *Batcher) { + b.flushInterval = interval + } +} + +func WithCallback(cb CallbackFunc) OptionFunc { + return func(b *Batcher) { + b.callbackFn = cb + } +} + +func WithMarshal(fn MarshalFunc) OptionFunc { + return func(b *Batcher) { + b.marshalFn = fn + } +} + +func (b *Batcher) Push(falcopayload types.FalcoPayload) error { + b.mx.Lock() + defer b.mx.Unlock() + + data, err := b.marshalFn(falcopayload) + if err != nil { + return err + } + if b.pending.Len() == 0 { + b.scheduleFlushInterval() + } else if b.pending.Len()+len(data) > b.batchSize { + b.flush() + b.scheduleFlushInterval() + } + _, _ = b.pending.Write(data) + b.pendingPayloads = append(b.pendingPayloads, falcopayload) + return nil +} + +func (b *Batcher) scheduleFlushInterval() { + if b.curTimer != nil { + b.curTimer.Stop() + } + b.curTimer = time.AfterFunc(b.flushInterval, b.flushOnTimer) +} + +func (b *Batcher) flushOnTimer() { + b.mx.Lock() + defer b.mx.Unlock() + b.flush() +} + +func (b *Batcher) flush() { + if b.pending.Len() == 0 { + return + } + + serialized := b.pending.Bytes() + falcoPayloads := b.pendingPayloads + + b.pending = bytes.Buffer{} + b.pendingPayloads = nil + b.callbackFn(falcoPayloads, serialized) +} + +// jsonMarshal default marshal function +func jsonMarshal(payload types.FalcoPayload) ([]byte, error) { + return json.Marshal(payload) +} diff --git a/internal/pkg/batcher/batcher_test.go b/internal/pkg/batcher/batcher_test.go new file mode 100644 index 000000000..314db15a4 --- /dev/null +++ b/internal/pkg/batcher/batcher_test.go @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: MIT OR Apache-2.0 + +package batcher + +import ( + "encoding/json" + "sync" + "testing" + "time" + + "github.com/falcosecurity/falcosidekick/types" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" +) + +func TestElasticsearchBatcher(t *testing.T) { + const ( + batchSize = 1234 + testCount = 100 + flushInterval = 300 * time.Millisecond + ) + + // Just to emulated similar payload for testing, not strictly needed + type eSPayload struct { + types.FalcoPayload + Timestamp time.Time `json:"@timestamp"` + } + + marshalFunc := func(payload types.FalcoPayload) ([]byte, error) { + return json.Marshal(eSPayload{FalcoPayload: payload, Timestamp: payload.Time}) + } + + var wantBatches, gotBatches [][]byte + + var mx sync.Mutex + batcher := New( + WithBatchSize(batchSize), + WithFlushInterval(500*time.Millisecond), + WithMarshal(marshalFunc), + WithCallback(func(falcoPayloads []types.FalcoPayload, data []byte) { + mx.Lock() + defer mx.Unlock() + gotBatches = append(gotBatches, data) + })) + + var currentBatch []byte + for i := 0; i < testCount; i++ { + payload := types.FalcoPayload{UUID: uuid.Must(uuid.NewV7()).String()} + data, err := marshalFunc(payload) + if err != nil { + t.Fatal(err) + } + + if len(currentBatch)+len(data) > batchSize { + wantBatches = append(wantBatches, currentBatch) + currentBatch = nil + } + + currentBatch = append(currentBatch, data...) + + err = batcher.Push(payload) + if err != nil { + t.Fatal(err) + } + } + wantBatches = append(wantBatches, currentBatch) + + // give it time to flush + time.Sleep(flushInterval * 2) + + mx.Lock() + defer mx.Unlock() + diff := cmp.Diff(wantBatches, gotBatches) + if diff != "" { + t.Fatal(diff) + } + +} diff --git a/main.go b/main.go index cb5431827..47f142449 100644 --- a/main.go +++ b/main.go @@ -229,8 +229,7 @@ func init() { if config.Elasticsearch.HostPort != "" { var err error - endpointUrl := fmt.Sprintf("%s/%s/%s", config.Elasticsearch.HostPort, config.Elasticsearch.Index, config.Elasticsearch.Type) - elasticsearchClient, err = outputs.NewClient("Elasticsearch", endpointUrl, config.Elasticsearch.CommonConfig, *initClientArgs) + elasticsearchClient, err = outputs.NewElasticsearchClient(*initClientArgs) if err != nil { config.Elasticsearch.HostPort = "" } else { diff --git a/outputs/client.go b/outputs/client.go index 4c1dd99b6..acf7959a6 100644 --- a/outputs/client.go +++ b/outputs/client.go @@ -40,6 +40,7 @@ import ( timescaledb "github.com/jackc/pgx/v5/pgxpool" redis "github.com/redis/go-redis/v9" + "github.com/falcosecurity/falcosidekick/internal/pkg/batcher" "github.com/falcosecurity/falcosidekick/types" ) @@ -120,14 +121,25 @@ type Client struct { TimescaleDBClient *timescaledb.Pool RedisClient *redis.Client + // Enable gzip compression + EnableCompression bool + // cached http.Client httpcli *http.Client // lock for http client creation - mx sync.Mutex + mx sync.Mutex + + // common config cfg types.CommonConfig + // init once on first request initOnce sync.Once - sem *semaphore.Weighted + + // maxconcurrent requests limiter + sem *semaphore.Weighted + + // batcher + batcher *batcher.Batcher } // InitClient returns a new output.Client for accessing the different API. @@ -163,22 +175,41 @@ type RequestOptionFunc func(req *http.Request) // Get get a payload from Output with GET http method. func (c *Client) Get(opts ...RequestOptionFunc) error { - return c.sendRequest("GET", nil, opts...) + return c.sendRequest("GET", nil, nil, opts...) } // Post sends event (payload) to Output with POST http method. func (c *Client) Post(payload interface{}, opts ...RequestOptionFunc) error { - return c.sendRequest("POST", payload, opts...) + return c.sendRequest("POST", payload, nil, opts...) +} + +// PostWithResponse sends event (payload) to Output with POST http method and returns a stringified response body +// This is added in order to get the response body and avoid breaking any other code that relies on the Post implmentation +func (c *Client) PostWithResponse(payload interface{}, opts ...RequestOptionFunc) (string, error) { + var responseBody string + + err := c.sendRequest("POST", payload, &responseBody, opts...) + + return responseBody, err } // Put sends event (payload) to Output with PUT http method. func (c *Client) Put(payload interface{}, opts ...RequestOptionFunc) error { - return c.sendRequest("PUT", payload, opts...) + return c.sendRequest("PUT", payload, nil, opts...) } // Get the response body as inlined string -func getInlinedBodyAsString(resp *http.Response) string { +func (c *Client) getInlinedBodyAsString(resp *http.Response) string { body, _ := io.ReadAll(resp.Body) + contentEncoding := resp.Header.Get("Content-Encoding") + if contentEncoding == "gzip" { + dec, err := decompressData(body) + if err != nil { + log.Printf("[INFO] : %v - Failed to decompress response: %v", c.OutputType, err) + return "" + } + body = dec + } contentType := resp.Header.Get("Content-Type") if contentType == "application/json" { var compactedBody bytes.Buffer @@ -191,16 +222,49 @@ func getInlinedBodyAsString(resp *http.Response) string { return string(body) } +func compressData(reader io.Reader) ([]byte, error) { + var compressed bytes.Buffer + gw := gzip.NewWriter(&compressed) + + if _, err := io.Copy(gw, reader); err != nil { + return nil, err + } + if err := gw.Close(); err != nil { + return nil, err + } + return compressed.Bytes(), nil +} + +func decompressData(compressed []byte) (data []byte, err error) { + gr, err := gzip.NewReader(bytes.NewBuffer(compressed)) + if err != nil { + return nil, err + } + defer func() { + err = errors.Join(err, gr.Close()) + }() + + data, err = io.ReadAll(gr) + if err != nil { + return nil, err + } + + return data, nil +} + // Post sends event (payload) to Output. -func (c *Client) sendRequest(method string, payload interface{}, opts ...RequestOptionFunc) error { +// Returns stringified response body or error +func (c *Client) sendRequest(method string, payload interface{}, responseBody *string, opts ...RequestOptionFunc) error { // Initialize the semaphore once here // because currently there are multiple code paths // where the client is created directly without using NewClient constructor c.initOnce.Do(func() { if c.cfg.MaxConcurrentRequests == 0 { c.sem = semaphore.NewWeighted(math.MaxInt64) + log.Printf("[INFO] : %v - Max concurrent requests: unlimited", c.OutputType) } else { c.sem = semaphore.NewWeighted(int64(c.cfg.MaxConcurrentRequests)) + log.Printf("[INFO] : %v - Max concurrent requests: %v", c.OutputType, c.cfg.MaxConcurrentRequests) } }) @@ -213,7 +277,8 @@ func (c *Client) sendRequest(method string, payload interface{}, opts ...Request }(c) body := new(bytes.Buffer) - switch payload.(type) { + var reader io.Reader = body + switch v := payload.(type) { case influxdbPayload: fmt.Fprintf(body, "%v", payload) if c.Config.Debug { @@ -231,6 +296,10 @@ func (c *Client) sendRequest(method string, payload interface{}, opts ...Request log.Printf("[DEBUG] : %v payload : %v\n", c.OutputType, debugBody) } } + case io.Reader: + reader = v + case []byte: + reader = bytes.NewBuffer(v) default: if err := json.NewEncoder(body).Encode(payload); err != nil { log.Printf("[ERROR] : %v - %s", c.OutputType, err) @@ -240,6 +309,15 @@ func (c *Client) sendRequest(method string, payload interface{}, opts ...Request } } + if c.EnableCompression { + data, err := compressData(reader) + if err != nil { + log.Printf("[ERROR] : %v - Failed to compress data: %v\n", c.OutputType, err.Error()) + return err + } + reader = bytes.NewBuffer(data) + } + client := c.httpClient() var req *http.Request @@ -247,15 +325,20 @@ func (c *Client) sendRequest(method string, payload interface{}, opts ...Request if method == "GET" { req, err = http.NewRequest(method, c.EndpointURL.String(), nil) } else { - req, err = http.NewRequest(method, c.EndpointURL.String(), body) + req, err = http.NewRequest(method, c.EndpointURL.String(), reader) } if err != nil { log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error()) return err } - req.Header.Add(ContentTypeHeaderKey, c.ContentType) - req.Header.Add(UserAgentHeaderKey, UserAgentHeaderValue) + req.Header.Set(ContentTypeHeaderKey, c.ContentType) + req.Header.Set(UserAgentHeaderKey, UserAgentHeaderValue) + + if c.EnableCompression { + req.Header.Set("Content-Encoding", "gzip") + req.Header.Set("Accept-Encoding", "gzip") + } // Call request options functions // Allows the clients to adjust request as needed @@ -264,7 +347,7 @@ func (c *Client) sendRequest(method string, payload interface{}, opts ...Request } // Using the background context for now - // TODO: Eventually pass the proper context to sendRequest, and pass it to NewRequest call as well + // TODO: Eventually pass the proper context to sendRequest, add pass it to NewRequest call as well // in order to make the requests cancellable ctx := context.Background() err = c.sem.Acquire(ctx, 1) @@ -288,36 +371,49 @@ func (c *Client) sendRequest(method string, payload interface{}, opts ...Request switch resp.StatusCode { case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent: //200, 201, 202, 204 log.Printf("[INFO] : %v - %v OK (%v)\n", c.OutputType, method, resp.StatusCode) - if ot := c.OutputType; ot == Kubeless || ot == Openfaas || ot == Fission { - log.Printf("[INFO] : %v - Function Response : %s\n", ot, getInlinedBodyAsString(resp)) + ot := c.OutputType + logResponse := ot == Kubeless || ot == Openfaas || ot == Fission + if responseBody != nil || logResponse { + s := c.getInlinedBodyAsString(resp) + if responseBody != nil { + // In some cases now we need to capture the response on 200 + // For example the Elasticsearch output bulk request that returns 200 + // even when some items in the bulk failed + *responseBody = s + } + if logResponse { + log.Printf("[INFO] : %v - Function Response : %s\n", ot, s) + } } return nil case http.StatusBadRequest: //400 - msg := getInlinedBodyAsString(resp) + msg := c.getInlinedBodyAsString(resp) log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrHeaderMissing, resp.StatusCode, msg) if msg != "" { return errors.New(msg) } return ErrHeaderMissing case http.StatusUnauthorized: //401 - log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrClientAuthenticationError, resp.StatusCode, getInlinedBodyAsString(resp)) + log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrClientAuthenticationError, resp.StatusCode, c.getInlinedBodyAsString(resp)) return ErrClientAuthenticationError case http.StatusForbidden: //403 - log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrForbidden, resp.StatusCode, getInlinedBodyAsString(resp)) + log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrForbidden, resp.StatusCode, c.getInlinedBodyAsString(resp)) return ErrForbidden case http.StatusNotFound: //404 - log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrNotFound, resp.StatusCode, getInlinedBodyAsString(resp)) + log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrNotFound, resp.StatusCode, c.getInlinedBodyAsString(resp)) return ErrNotFound case http.StatusUnprocessableEntity: //422 - log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrUnprocessableEntityError, resp.StatusCode, getInlinedBodyAsString(resp)) + log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrUnprocessableEntityError, resp.StatusCode, c.getInlinedBodyAsString(resp)) return ErrUnprocessableEntityError case http.StatusTooManyRequests: //429 - log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrTooManyRequest, resp.StatusCode, getInlinedBodyAsString(resp)) + log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrTooManyRequest, resp.StatusCode, c.getInlinedBodyAsString(resp)) return ErrTooManyRequest case http.StatusInternalServerError: //500 log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode) return ErrInternalServer case http.StatusBadGateway: //502 + msg := c.getInlinedBodyAsString(resp) + fmt.Println(msg) log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode) return ErrBadGateway default: diff --git a/outputs/elasticsearch.go b/outputs/elasticsearch.go index ceaa2555b..0eee4c69f 100644 --- a/outputs/elasticsearch.go +++ b/outputs/elasticsearch.go @@ -3,6 +3,7 @@ package outputs import ( + "bytes" "encoding/json" "fmt" "log" @@ -12,6 +13,7 @@ import ( "strings" "time" + "github.com/falcosecurity/falcosidekick/internal/pkg/batcher" "github.com/falcosecurity/falcosidekick/types" ) @@ -20,52 +22,88 @@ type eSPayload struct { Timestamp time.Time `json:"@timestamp"` } -type mappingError struct { +type esResponse struct { Error struct { - RootCause []struct { - Type string `json:"type"` - Reason string `json:"reason"` - } `json:"root_cause"` Type string `json:"type"` Reason string `json:"reason"` } `json:"error"` Status int `json:"status"` } -// ElasticsearchPost posts event to Elasticsearch +type esBulkResponse struct { + Errors bool `json:"errors"` + Items []esItemResponse `json:"items"` +} + +type esItemResponse struct { + Create esResponse `json:"create"` +} + +func NewElasticsearchClient(params types.InitClientArgs) (*Client, error) { + esCfg := params.Config.Elasticsearch + endpointUrl := fmt.Sprintf("%s/%s/%s", esCfg.HostPort, esCfg.Index, esCfg.Type) + c, err := NewClient("Elasticsearch", endpointUrl, esCfg.CommonConfig, params) + if err != nil { + return nil, err + } + + if esCfg.Batching.Enabled { + log.Printf("[INFO] : %v - Batching enabled: %v max bytes, %v interval\n", c.OutputType, esCfg.Batching.BatchSize, esCfg.Batching.FlushInterval) + callbackFn := func(falcoPayloads []types.FalcoPayload, data []byte) { + go c.elasticsearchPost("", data, falcoPayloads...) + } + c.batcher = batcher.New( + batcher.WithBatchSize(esCfg.Batching.BatchSize), + batcher.WithFlushInterval(esCfg.Batching.FlushInterval), + batcher.WithMarshal(c.marshalESBulkPayload), + batcher.WithCallback(callbackFn), + ) + } + if esCfg.EnableCompression { + c.EnableCompression = true + log.Printf("[INFO] : %v - Compression enabled\n", c.OutputType) + } + + return c, nil +} + func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) { - c.Stats.Elasticsearch.Add(Total, 1) + if c.Config.Elasticsearch.Batching.Enabled { + c.batcher.Push(falcopayload) + return + } + + payload, err := c.marshalESPayload(falcopayload) + if err != nil { + log.Printf("[ERROR] : %v - Failed to marshal payload: %v\n", c.OutputType, err) + } + + c.elasticsearchPost(c.getIndex(), payload, falcopayload) +} + +var esReasonMappingFieldsRegex *regexp.Regexp = regexp.MustCompile(`\[\w+(\.\w+)+\]`) + +// ElasticsearchPost posts event to Elasticsearch +func (c *Client) elasticsearchPost(index string, payload []byte, falcoPayloads ...types.FalcoPayload) { + sz := int64(len(falcoPayloads)) + c.Stats.Elasticsearch.Add(Total, sz) - current := time.Now() var eURL string - switch c.Config.Elasticsearch.Suffix { - case None: - eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "/" + c.Config.Elasticsearch.Type - case "monthly": - eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "-" + current.Format("2006.01") + "/" + c.Config.Elasticsearch.Type - case "annually": - eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "-" + current.Format("2006") + "/" + c.Config.Elasticsearch.Type - default: - eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "-" + current.Format("2006.01.02") + "/" + c.Config.Elasticsearch.Type + if index == "" { + eURL = c.Config.Elasticsearch.HostPort + "/_bulk" + } else { + eURL = c.Config.Elasticsearch.HostPort + "/" + index + "/" + c.Config.Elasticsearch.Type } endpointURL, err := url.Parse(eURL) if err != nil { - c.setElasticSearchErrorMetrics() - log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error()) + c.setElasticSearchErrorMetrics(sz) + log.Printf("[ERROR] : %v - %v\n", c.OutputType, err) return } c.EndpointURL = endpointURL - payload := eSPayload{FalcoPayload: falcopayload, Timestamp: falcopayload.Time} - if c.Config.Elasticsearch.FlattenFields || c.Config.Elasticsearch.CreateIndexTemplate { - for i, j := range payload.OutputFields { - payload.OutputFields[strings.ReplaceAll(i, ".", "_")] = j - delete(payload.OutputFields, i) - } - } - reqOpt := func(req *http.Request) { if c.Config.Elasticsearch.Username != "" && c.Config.Elasticsearch.Password != "" { req.SetBasicAuth(c.Config.Elasticsearch.Username, c.Config.Elasticsearch.Password) @@ -76,51 +114,102 @@ func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) { } } - err = c.Post(payload, reqOpt) - if err != nil { - var mappingErr mappingError - if err2 := json.Unmarshal([]byte(err.Error()), &mappingErr); err2 != nil { - c.setElasticSearchErrorMetrics() - return + var response string + if c.Config.Elasticsearch.Batching.Enabled { + // Use PostWithResponse call when batching is enabled in order to capture response body on 200 + res, err := c.PostWithResponse(payload, reqOpt) + if err != nil { + response = err.Error() + } else { + response = res } - if mappingErr.Error.Type == "document_parsing_exception" { - reg := regexp.MustCompile(`\[\w+(\.\w+)+\]`) - k := reg.FindStringSubmatch(mappingErr.Error.Reason) - if len(k) == 0 { - c.setElasticSearchErrorMetrics() + } else { + // Use regular Post call, this avoid parsing response on http status 200 + err = c.Post(payload, reqOpt) + if err != nil { + response = err.Error() + } + } + + if response != "" { + if c.Config.Elasticsearch.Batching.Enabled { + var resp esBulkResponse + if err2 := json.Unmarshal([]byte(response), &resp); err2 != nil { + c.setElasticSearchErrorMetrics(sz) return } - if !strings.Contains(k[0], "output_fields") { - c.setElasticSearchErrorMetrics() + if len(resp.Items) != len(falcoPayloads) { + log.Printf("[ERROR] : %v - mismatched %v responses with %v request payloads\n", c.OutputType, len(resp.Items), len(falcoPayloads)) + c.setElasticSearchErrorMetrics(sz) return } - s := strings.ReplaceAll(k[0], "[output_fields.", "") - s = strings.ReplaceAll(s, "]", "") - for i := range payload.OutputFields { - if strings.HasPrefix(i, s) { - delete(payload.OutputFields, i) + // Check errors. Not using the mapping errors retry approach for batched/bulk requests + // Only mark set the errors and stats + if resp.Errors { + failed := int64(0) + for _, item := range resp.Items { + switch item.Create.Status { + case http.StatusOK, http.StatusCreated: + default: + failed++ + } } + c.setElasticSearchErrorMetrics(failed) + // Set success sz that is reported at the end of this function + sz -= failed } - log.Printf("[INFO] : %v - %v\n", c.OutputType, "attempt to POST again the payload without the wrong field") - err = c.Post(payload, reqOpt) - if err != nil { - c.setElasticSearchErrorMetrics() + } else { + // Slightly refactored the original approach to mapping errors, but logic is still the same + // The Request is retried only once without the field that can't be mapped. + // One of the problems with this approach is that if the mapping has two "unmappable" fields + // only the first one is returned with the error and removed from the retried request. + // Do we need to retry without the field? Do we need to keep retrying and removing fields until it succeeds? + var resp esResponse + if err2 := json.Unmarshal([]byte(response), &resp); err2 != nil { + c.setElasticSearchErrorMetrics(sz) return } + + payload := falcoPayloads[0] + + if resp.Error.Type == "document_parsing_exception" { + k := esReasonMappingFieldsRegex.FindStringSubmatch(resp.Error.Reason) + if len(k) == 0 { + c.setElasticSearchErrorMetrics(sz) + return + } + if !strings.Contains(k[0], "output_fields") { + c.setElasticSearchErrorMetrics(sz) + return + } + s := strings.ReplaceAll(k[0], "[output_fields.", "") + s = strings.ReplaceAll(s, "]", "") + for i := range payload.OutputFields { + if strings.HasPrefix(i, s) { + delete(payload.OutputFields, i) + } + } + log.Printf("[INFO] : %v - %v\n", c.OutputType, "attempt to POST again the payload without the wrong field") + err = c.Post(payload, reqOpt) + if err != nil { + c.setElasticSearchErrorMetrics(sz) + return + } + } } } // Setting the success status - go c.CountMetric(Outputs, 1, []string{"output:elasticsearch", "status:ok"}) - c.Stats.Elasticsearch.Add(OK, 1) - c.PromStats.Outputs.With(map[string]string{"destination": "elasticsearch", "status": OK}).Inc() + go c.CountMetric(Outputs, sz, []string{"output:elasticsearch", "status:ok"}) + c.Stats.Elasticsearch.Add(OK, sz) + c.PromStats.Outputs.With(map[string]string{"destination": "elasticsearch", "status": OK}).Add(float64(sz)) } func (c *Client) ElasticsearchCreateIndexTemplate(config types.ElasticsearchOutputConfig) error { d := c indexExists, err := c.isIndexTemplateExist(config) if err != nil { - log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error()) + log.Printf("[ERROR] : %v - %v\n", c.OutputType, err) return err } if indexExists { @@ -138,12 +227,12 @@ func (c *Client) ElasticsearchCreateIndexTemplate(config types.ElasticsearchOutp m = strings.ReplaceAll(m, "${REPLICAS}", fmt.Sprintf("%v", config.NumberOfReplicas)) j := make(map[string]interface{}) if err := json.Unmarshal([]byte(m), &j); err != nil { - log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error()) + log.Printf("[ERROR] : %v - %v\n", c.OutputType, err) return err } // create the index template by PUT if d.Put(j) != nil { - log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error()) + log.Printf("[ERROR] : %v - %v\n", c.OutputType, err) return err } @@ -168,8 +257,59 @@ func (c *Client) isIndexTemplateExist(config types.ElasticsearchOutputConfig) (b } // setElasticSearchErrorMetrics set the error stats -func (c *Client) setElasticSearchErrorMetrics() { - go c.CountMetric(Outputs, 1, []string{"output:elasticsearch", "status:error"}) - c.Stats.Elasticsearch.Add(Error, 1) - c.PromStats.Outputs.With(map[string]string{"destination": "elasticsearch", "status": Error}).Inc() +func (c *Client) setElasticSearchErrorMetrics(n int64) { + go c.CountMetric(Outputs, n, []string{"output:elasticsearch", "status:error"}) + c.Stats.Elasticsearch.Add(Error, n) + c.PromStats.Outputs.With(map[string]string{"destination": "elasticsearch", "status": Error}).Add(float64(n)) +} + +func (c *Client) buildESPayload(falcopayload types.FalcoPayload) eSPayload { + payload := eSPayload{FalcoPayload: falcopayload, Timestamp: falcopayload.Time} + + if c.Config.Elasticsearch.FlattenFields || c.Config.Elasticsearch.CreateIndexTemplate { + for i, j := range payload.OutputFields { + payload.OutputFields[strings.ReplaceAll(i, ".", "_")] = j + delete(payload.OutputFields, i) + } + } + return payload +} + +func (c *Client) marshalESPayload(falcopayload types.FalcoPayload) ([]byte, error) { + return json.Marshal(c.buildESPayload(falcopayload)) +} + +func (c *Client) marshalESBulkPayload(falcopayload types.FalcoPayload) ([]byte, error) { + body, err := c.marshalESPayload(falcopayload) + if err != nil { + return nil, err + } + + var buf bytes.Buffer + _, _ = buf.WriteString(`{"create":{`) + _, _ = buf.WriteString(`"_index":"`) + _, _ = buf.WriteString(c.getIndex()) + _, _ = buf.WriteString("\"}}\n") + + _, _ = buf.Write(body) + _, _ = buf.WriteRune('\n') + + return buf.Bytes(), nil +} + +func (c *Client) getIndex() string { + var index string + + current := time.Now() + switch c.Config.Elasticsearch.Suffix { + case None: + index = c.Config.Elasticsearch.Index + case "monthly": + index = c.Config.Elasticsearch.Index + "-" + current.Format("2006.01") + case "annually": + index = c.Config.Elasticsearch.Index + "-" + current.Format("2006") + default: + index = c.Config.Elasticsearch.Index + "-" + current.Format("2006.01.02") + } + return index } diff --git a/outputs/quickwit.go b/outputs/quickwit.go index c06713ff5..dfab69f36 100644 --- a/outputs/quickwit.go +++ b/outputs/quickwit.go @@ -54,7 +54,7 @@ func (c *Client) checkQuickwitIndexAlreadyExists(args types.InitClientArgs) bool return false } - if nil != quickwitCheckClient.sendRequest("GET", "") { + if nil != quickwitCheckClient.Get() { return false } diff --git a/types/types.go b/types/types.go index 8dfc691e4..bc50a9ef3 100644 --- a/types/types.go +++ b/types/types.go @@ -261,6 +261,17 @@ type AlertmanagerOutputConfig struct { CustomHeaders map[string]string } +const ( + DefaultBatchSize = 5 * 1024 * 1024 // 5 MB + DefaultFlushInterval = time.Second +) + +type BatchingConfig struct { + Enabled bool `json:"enabled" yaml:"enabled"` + BatchSize int `json:"batchsize" yaml:"batchsize"` + FlushInterval time.Duration `json:"flushinterval" yaml:"flushinterval"` +} + type ElasticsearchOutputConfig struct { CommonConfig `mapstructure:",squash"` HostPort string @@ -275,6 +286,8 @@ type ElasticsearchOutputConfig struct { NumberOfShards int NumberOfReplicas int CustomHeaders map[string]string + Batching BatchingConfig + EnableCompression bool } type QuickwitOutputConfig struct {