diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 1ad385105d3..ef5a413b91f 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -337,6 +337,20 @@ func GetMetrics(sources []DataSource, aggregated bool) error { return nil } +// There's no need for an actual deep copy +// The event is almost empty, we are mostly interested in allocating new maps for Parsed/Meta/... +func copyEvent(evt types.Event, line string) types.Event { + evtCopy := types.MakeEvent(evt.ExpectMode == types.TIMEMACHINE, evt.Type, evt.Process) + evtCopy.Line = evt.Line + evtCopy.Line.Raw = line + evtCopy.Line.Labels = make(map[string]string) + for k, v := range evt.Line.Labels { + evtCopy.Line.Labels[k] = v + } + + return evtCopy +} + func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) { defer trace.CatchPanic("crowdsec/acquis") logger.Infof("transformer started") @@ -363,8 +377,7 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo switch v := out.(type) { case string: logger.Tracef("transform expression returned %s", v) - evt.Line.Raw = v - output <- evt + output <- copyEvent(evt, v) case []interface{}: logger.Tracef("transform expression returned %v", v) //nolint:asasalint // We actually want to log the slice content @@ -373,19 +386,16 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo if !ok { logger.Errorf("transform expression returned []interface{}, but cannot assert an element to string") output <- evt - continue } - evt.Line.Raw = l - output <- evt + output <- copyEvent(evt, l) } case []string: logger.Tracef("transform expression returned %v", v) for _, line := range v { - evt.Line.Raw = line - output <- evt + output <- copyEvent(evt, line) } default: logger.Errorf("transform expression returned an invalid type %T, sending event as-is", out) diff --git a/pkg/acquisition/http.go b/pkg/acquisition/http.go new file mode 100644 index 00000000000..59745772b62 --- /dev/null +++ b/pkg/acquisition/http.go @@ -0,0 +1,12 @@ +//go:build !no_datasource_http + +package acquisition + +import ( + httpacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/http" +) + +//nolint:gochecknoinits +func init() { + registerDataSource("http", func() DataSource { return &httpacquisition.HTTPSource{} }) +} diff --git a/pkg/acquisition/modules/appsec/utils.go b/pkg/acquisition/modules/appsec/utils.go index 4fb1a979d14..4f890dd513c 100644 --- a/pkg/acquisition/modules/appsec/utils.go +++ b/pkg/acquisition/modules/appsec/utils.go @@ -195,10 +195,7 @@ func AppsecEventGeneration(inEvt types.Event) (*types.Event, error) { } func EventFromRequest(r *appsec.ParsedRequest, labels map[string]string) (types.Event, error) { - evt := types.Event{} - // we might want to change this based on in-band vs out-of-band ? - evt.Type = types.LOG - evt.ExpectMode = types.LIVE + evt := types.MakeEvent(false, types.LOG, true) // def needs fixing evt.Stage = "s00-raw" evt.Parsed = map[string]string{ diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch.go b/pkg/acquisition/modules/cloudwatch/cloudwatch.go index 2df70b3312b..ba267c9050b 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch.go @@ -710,7 +710,7 @@ func (cw *CloudwatchSource) CatLogStream(ctx context.Context, cfg *LogStreamTail func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig) (types.Event, error) { l := types.Line{} - evt := types.Event{} + evt := types.MakeEvent(cfg.ExpectMode == types.TIMEMACHINE, types.LOG, true) if log.Message == nil { return evt, errors.New("nil message") } @@ -726,9 +726,6 @@ func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig) l.Process = true l.Module = "cloudwatch" evt.Line = l - evt.Process = true - evt.Type = types.LOG - evt.ExpectMode = cfg.ExpectMode cfg.logger.Debugf("returned event labels : %+v", evt.Line.Labels) return evt, nil } diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 2f79d4dcee6..b27255ec13f 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -334,7 +334,10 @@ func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Ev if d.metricsLevel != configuration.METRICS_NONE { linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc() } - evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + evt := types.MakeEvent(true, types.LOG, true) + evt.Line = l + evt.Process = true + evt.Type = types.LOG out <- evt d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) } @@ -579,12 +582,8 @@ func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfi l.Src = container.Name l.Process = true l.Module = d.GetName() - var evt types.Event - if !d.Config.UseTimeMachine { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} - } else { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} - } + evt := types.MakeEvent(d.Config.UseTimeMachine, types.LOG, true) + evt.Line = l linesRead.With(prometheus.Labels{"source": container.Name}).Inc() outChan <- evt d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index f752d04aada..9f439b0c82e 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -621,11 +621,9 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai // we're tailing, it must be real time logs logger.Debugf("pushing %+v", l) - expectMode := types.LIVE - if f.config.UseTimeMachine { - expectMode = types.TIMEMACHINE - } - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode} + evt := types.MakeEvent(f.config.UseTimeMachine, types.LOG, true) + evt.Line = l + out <- evt } } } @@ -684,7 +682,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom linesRead.With(prometheus.Labels{"source": filename}).Inc() // we're reading logs at once, it must be time-machine buckets - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE, Unmarshaled: make(map[string]interface{})} } } diff --git a/pkg/acquisition/modules/http/http.go b/pkg/acquisition/modules/http/http.go new file mode 100644 index 00000000000..98af134c84e --- /dev/null +++ b/pkg/acquisition/modules/http/http.go @@ -0,0 +1,416 @@ +package httpacquisition + +import ( + "compress/gzip" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "time" + + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + + "gopkg.in/tomb.v2" + "gopkg.in/yaml.v3" + + "github.com/crowdsecurity/go-cs-lib/trace" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/types" +) + +var ( + dataSourceName = "http" +) + +var linesRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cs_httpsource_hits_total", + Help: "Total lines that were read from http source", + }, + []string{"path", "src"}) + +type HttpConfiguration struct { + //IPFilter []string `yaml:"ip_filter"` + //ChunkSize *int64 `yaml:"chunk_size"` + ListenAddr string `yaml:"listen_addr"` + Path string `yaml:"path"` + AuthType string `yaml:"auth_type"` + BasicAuth *BasicAuthConfig `yaml:"basic_auth"` + Headers *map[string]string `yaml:"headers"` + TLS *TLSConfig `yaml:"tls"` + CustomStatusCode *int `yaml:"custom_status_code"` + CustomHeaders *map[string]string `yaml:"custom_headers"` + MaxBodySize *int64 `yaml:"max_body_size"` + Timeout *time.Duration `yaml:"timeout"` + configuration.DataSourceCommonCfg `yaml:",inline"` +} + +type BasicAuthConfig struct { + Username string `yaml:"username"` + Password string `yaml:"password"` +} + +type TLSConfig struct { + InsecureSkipVerify bool `yaml:"insecure_skip_verify"` + ServerCert string `yaml:"server_cert"` + ServerKey string `yaml:"server_key"` + CaCert string `yaml:"ca_cert"` +} + +type HTTPSource struct { + metricsLevel int + Config HttpConfiguration + logger *log.Entry + Server *http.Server +} + +func (h *HTTPSource) GetUuid() string { + return h.Config.UniqueId +} + +func (h *HTTPSource) UnmarshalConfig(yamlConfig []byte) error { + h.Config = HttpConfiguration{} + err := yaml.Unmarshal(yamlConfig, &h.Config) + if err != nil { + return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err) + } + + if h.Config.Mode == "" { + h.Config.Mode = configuration.TAIL_MODE + } + + return nil +} + +func (hc *HttpConfiguration) Validate() error { + if hc.ListenAddr == "" { + return errors.New("listen_addr is required") + } + + if hc.Path == "" { + hc.Path = "/" + } + if hc.Path[0] != '/' { + return errors.New("path must start with /") + } + + switch hc.AuthType { + case "basic_auth": + baseErr := "basic_auth is selected, but" + if hc.BasicAuth == nil { + return errors.New(baseErr + " basic_auth is not provided") + } + if hc.BasicAuth.Username == "" { + return errors.New(baseErr + " username is not provided") + } + if hc.BasicAuth.Password == "" { + return errors.New(baseErr + " password is not provided") + } + case "headers": + if hc.Headers == nil { + return errors.New("headers is selected, but headers is not provided") + } + case "mtls": + if hc.TLS == nil || hc.TLS.CaCert == "" { + return errors.New("mtls is selected, but ca_cert is not provided") + } + default: + return errors.New("invalid auth_type: must be one of basic_auth, headers, mtls") + } + + if hc.TLS != nil { + if hc.TLS.ServerCert == "" { + return errors.New("server_cert is required") + } + if hc.TLS.ServerKey == "" { + return errors.New("server_key is required") + } + } + + if hc.MaxBodySize != nil && *hc.MaxBodySize <= 0 { + return errors.New("max_body_size must be positive") + } + + /* + if hc.ChunkSize != nil && *hc.ChunkSize <= 0 { + return errors.New("chunk_size must be positive") + } + */ + + if hc.CustomStatusCode != nil { + statusText := http.StatusText(*hc.CustomStatusCode) + if statusText == "" { + return errors.New("invalid HTTP status code") + } + } + + return nil +} + +func (h *HTTPSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { + h.logger = logger + h.metricsLevel = MetricsLevel + err := h.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + + if err := h.Config.Validate(); err != nil { + return fmt.Errorf("invalid configuration: %w", err) + } + + return nil +} + +func (h *HTTPSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error { + return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName) +} + +func (h *HTTPSource) GetMode() string { + return h.Config.Mode +} + +func (h *HTTPSource) GetName() string { + return dataSourceName +} + +func (h *HTTPSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { + return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName) +} + +func (h *HTTPSource) CanRun() error { + return nil +} + +func (h *HTTPSource) GetMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (h *HTTPSource) GetAggregMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (h *HTTPSource) Dump() interface{} { + return h +} + +func (hc *HttpConfiguration) NewTLSConfig() (*tls.Config, error) { + tlsConfig := tls.Config{ + InsecureSkipVerify: hc.TLS.InsecureSkipVerify, + } + + if hc.TLS.ServerCert != "" && hc.TLS.ServerKey != "" { + cert, err := tls.LoadX509KeyPair(hc.TLS.ServerCert, hc.TLS.ServerKey) + if err != nil { + return nil, fmt.Errorf("failed to load server cert/key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if hc.AuthType == "mtls" && hc.TLS.CaCert != "" { + caCert, err := os.ReadFile(hc.TLS.CaCert) + if err != nil { + return nil, fmt.Errorf("failed to read ca cert: %w", err) + } + + caCertPool, err := x509.SystemCertPool() + if err != nil { + return nil, fmt.Errorf("failed to load system cert pool: %w", err) + } + + if caCertPool == nil { + caCertPool = x509.NewCertPool() + } + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.ClientCAs = caCertPool + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + + return &tlsConfig, nil +} + +func authorizeRequest(r *http.Request, hc *HttpConfiguration) error { + if hc.AuthType == "basic_auth" { + username, password, ok := r.BasicAuth() + if !ok { + return errors.New("missing basic auth") + } + if username != hc.BasicAuth.Username || password != hc.BasicAuth.Password { + return errors.New("invalid basic auth") + } + } + if hc.AuthType == "headers" { + for key, value := range *hc.Headers { + if r.Header.Get(key) != value { + return errors.New("invalid headers") + } + } + } + return nil +} + +func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc *HttpConfiguration, out chan types.Event) error { + if hc.MaxBodySize != nil && r.ContentLength > *hc.MaxBodySize { + w.WriteHeader(http.StatusRequestEntityTooLarge) + return fmt.Errorf("body size exceeds max body size: %d > %d", r.ContentLength, *hc.MaxBodySize) + } + + srcHost, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return err + } + + defer r.Body.Close() + + reader := r.Body + + if r.Header.Get("Content-Encoding") == "gzip" { + reader, err = gzip.NewReader(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return fmt.Errorf("failed to create gzip reader: %w", err) + } + defer reader.Close() + } + + decoder := json.NewDecoder(reader) + for { + var message json.RawMessage + + if err := decoder.Decode(&message); err != nil { + if err == io.EOF { + break + } + w.WriteHeader(http.StatusBadRequest) + return fmt.Errorf("failed to decode: %w", err) + } + + line := types.Line{ + Raw: string(message), + Src: srcHost, + Time: time.Now().UTC(), + Labels: hc.Labels, + Process: true, + Module: h.GetName(), + } + + if h.metricsLevel == configuration.METRICS_AGGREGATE { + line.Src = hc.Path + } + + evt := types.MakeEvent(h.Config.UseTimeMachine, types.LOG, true) + evt.Line = line + + if h.metricsLevel == configuration.METRICS_AGGREGATE { + linesRead.With(prometheus.Labels{"path": hc.Path, "src": ""}).Inc() + } else if h.metricsLevel == configuration.METRICS_FULL { + linesRead.With(prometheus.Labels{"path": hc.Path, "src": srcHost}).Inc() + } + + h.logger.Tracef("line to send: %+v", line) + out <- evt + } + + return nil +} + +func (h *HTTPSource) RunServer(out chan types.Event, t *tomb.Tomb) error { + mux := http.NewServeMux() + mux.HandleFunc(h.Config.Path, func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + h.logger.Errorf("method not allowed: %s", r.Method) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + if err := authorizeRequest(r, &h.Config); err != nil { + h.logger.Errorf("failed to authorize request from '%s': %s", r.RemoteAddr, err) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + err := h.processRequest(w, r, &h.Config, out) + if err != nil { + h.logger.Errorf("failed to process request from '%s': %s", r.RemoteAddr, err) + return + } + + if h.Config.CustomHeaders != nil { + for key, value := range *h.Config.CustomHeaders { + w.Header().Set(key, value) + } + } + if h.Config.CustomStatusCode != nil { + w.WriteHeader(*h.Config.CustomStatusCode) + } else { + w.WriteHeader(http.StatusOK) + } + + w.Write([]byte("OK")) + }) + + h.Server = &http.Server{ + Addr: h.Config.ListenAddr, + Handler: mux, + } + + if h.Config.Timeout != nil { + h.Server.ReadTimeout = *h.Config.Timeout + } + + if h.Config.TLS != nil { + tlsConfig, err := h.Config.NewTLSConfig() + if err != nil { + return fmt.Errorf("failed to create tls config: %w", err) + } + h.logger.Tracef("tls config: %+v", tlsConfig) + h.Server.TLSConfig = tlsConfig + } + + t.Go(func() error { + defer trace.CatchPanic("crowdsec/acquis/http/server") + if h.Config.TLS != nil { + h.logger.Infof("start https server on %s", h.Config.ListenAddr) + err := h.Server.ListenAndServeTLS(h.Config.TLS.ServerCert, h.Config.TLS.ServerKey) + if err != nil && err != http.ErrServerClosed { + return fmt.Errorf("https server failed: %w", err) + } + } else { + h.logger.Infof("start http server on %s", h.Config.ListenAddr) + err := h.Server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + return fmt.Errorf("http server failed: %w", err) + } + } + return nil + }) + + //nolint //fp + for { + select { + case <-t.Dying(): + h.logger.Infof("%s datasource stopping", dataSourceName) + if err := h.Server.Close(); err != nil { + return fmt.Errorf("while closing %s server: %w", dataSourceName, err) + } + return nil + } + } +} + +func (h *HTTPSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { + h.logger.Debugf("start http server on %s", h.Config.ListenAddr) + + t.Go(func() error { + defer trace.CatchPanic("crowdsec/acquis/http/live") + return h.RunServer(out, t) + }) + + return nil +} diff --git a/pkg/acquisition/modules/http/http_test.go b/pkg/acquisition/modules/http/http_test.go new file mode 100644 index 00000000000..d79ef5d3869 --- /dev/null +++ b/pkg/acquisition/modules/http/http_test.go @@ -0,0 +1,887 @@ +package httpacquisition + +import ( + "compress/gzip" + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/crowdsecurity/go-cs-lib/cstest" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" +) + +const ( + testHTTPServerAddr = "http://127.0.0.1:8080" + testHTTPServerAddrTLS = "https://127.0.0.1:8080" +) + +func TestConfigure(t *testing.T) { + tests := []struct { + config string + expectedErr string + }{ + { + config: ` +foobar: bla`, + expectedErr: "invalid configuration: listen_addr is required", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: wrongpath`, + expectedErr: "invalid configuration: path must start with /", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: basic_auth`, + expectedErr: "invalid configuration: basic_auth is selected, but basic_auth is not provided", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers`, + expectedErr: "invalid configuration: headers is selected, but headers is not provided", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: basic_auth +basic_auth: + username: 132`, + expectedErr: "invalid configuration: basic_auth is selected, but password is not provided", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: basic_auth +basic_auth: + password: 132`, + expectedErr: "invalid configuration: basic_auth is selected, but username is not provided", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers:`, + expectedErr: "invalid configuration: headers is selected, but headers is not provided", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: toto`, + expectedErr: "invalid configuration: invalid auth_type: must be one of basic_auth, headers, mtls", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: value +tls: + server_key: key`, + expectedErr: "invalid configuration: server_cert is required", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: value +tls: + server_cert: cert`, + expectedErr: "invalid configuration: server_key is required", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: mtls +tls: + server_cert: cert + server_key: key`, + expectedErr: "invalid configuration: mtls is selected, but ca_cert is not provided", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: value +max_body_size: 0`, + expectedErr: "invalid configuration: max_body_size must be positive", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: value +timeout: toto`, + expectedErr: "cannot parse http datasource configuration: yaml: unmarshal errors:\n line 8: cannot unmarshal !!str `toto` into time.Duration", + }, + { + config: ` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: value +custom_status_code: 999`, + expectedErr: "invalid configuration: invalid HTTP status code", + }, + } + + subLogger := log.WithFields(log.Fields{ + "type": "http", + }) + + for _, test := range tests { + h := HTTPSource{} + err := h.Configure([]byte(test.config), subLogger, 0) + cstest.AssertErrorContains(t, err, test.expectedErr) + } +} + +func TestGetUuid(t *testing.T) { + h := HTTPSource{} + h.Config.UniqueId = "test" + if h.GetUuid() != "test" { + t.Fatalf("expected 'test', got '%s'", h.GetUuid()) + } +} + +func TestUnmarshalConfig(t *testing.T) { + h := HTTPSource{} + err := h.UnmarshalConfig([]byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: 15 + auth_type: headers`)) + cstest.AssertErrorMessage(t, err, "cannot parse http datasource configuration: yaml: line 4: found a tab character that violates indentation") +} + +func TestConfigureByDSN(t *testing.T) { + h := HTTPSource{} + err := h.ConfigureByDSN("http://localhost:8080/test", map[string]string{}, log.WithFields(log.Fields{ + "type": "http", + }), "test") + cstest.AssertErrorMessage( + t, + err, + "http datasource does not support command-line acquisition", + ) +} + +func TestGetMode(t *testing.T) { + h := HTTPSource{} + h.Config.Mode = "test" + if h.GetMode() != "test" { + t.Fatalf("expected 'test', got '%s'", h.GetMode()) + } +} + +func TestGetName(t *testing.T) { + h := HTTPSource{} + if h.GetName() != "http" { + t.Fatalf("expected 'http', got '%s'", h.GetName()) + } +} + +func SetupAndRunHTTPSource(t *testing.T, h *HTTPSource, config []byte, metricLevel int) (chan types.Event, *tomb.Tomb) { + ctx := context.Background() + subLogger := log.WithFields(log.Fields{ + "type": "http", + }) + err := h.Configure(config, subLogger, metricLevel) + if err != nil { + t.Fatalf("unable to configure http source: %s", err) + } + tomb := tomb.Tomb{} + out := make(chan types.Event) + err = h.StreamingAcquisition(ctx, out, &tomb) + if err != nil { + t.Fatalf("unable to start streaming acquisition: %s", err) + } + + for _, metric := range h.GetMetrics() { + prometheus.Register(metric) + } + + return out, &tomb +} + +func TestStreamingAcquisitionWrongHTTPMethod(t *testing.T) { + h := &HTTPSource{} + _, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: basic_auth +basic_auth: + username: test + password: test`), 0) + + time.Sleep(1 * time.Second) + + res, err := http.Get(fmt.Sprintf("%s/test", testHTTPServerAddr)) + if err != nil { + t.Fatalf("unable to get http response: %s", err) + } + if res.StatusCode != http.StatusMethodNotAllowed { + t.Fatalf("expected status code %d, got %d", http.StatusMethodNotAllowed, res.StatusCode) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() + +} + +func TestStreamingAcquisitionUnknownPath(t *testing.T) { + h := &HTTPSource{} + _, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: basic_auth +basic_auth: + username: test + password: test`), 0) + + time.Sleep(1 * time.Second) + + res, err := http.Get(fmt.Sprintf("%s/unknown", testHTTPServerAddr)) + if err != nil { + t.Fatalf("unable to get http response: %s", err) + } + + if res.StatusCode != http.StatusNotFound { + t.Fatalf("expected status code %d, got %d", http.StatusNotFound, res.StatusCode) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionBasicAuth(t *testing.T) { + h := &HTTPSource{} + _, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: basic_auth +basic_auth: + username: test + password: test`), 0) + + time.Sleep(1 * time.Second) + + client := &http.Client{} + + resp, err := http.Post(fmt.Sprintf("%s/test", testHTTPServerAddr), "application/json", strings.NewReader("test")) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusUnauthorized { + t.Fatalf("expected status code %d, got %d", http.StatusUnauthorized, resp.StatusCode) + } + + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader("test")) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + req.SetBasicAuth("test", "WrongPassword") + resp, err = client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusUnauthorized { + t.Fatalf("expected status code %d, got %d", http.StatusUnauthorized, resp.StatusCode) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionBadHeaders(t *testing.T) { + h := &HTTPSource{} + _, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test`), 0) + + time.Sleep(1 * time.Second) + + client := &http.Client{} + + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader("test")) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + req.Header.Add("Key", "wrong") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusUnauthorized { + t.Fatalf("expected status code %d, got %d", http.StatusUnauthorized, resp.StatusCode) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionMaxBodySize(t *testing.T) { + h := &HTTPSource{} + _, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test +max_body_size: 5`), 0) + + time.Sleep(1 * time.Second) + + client := &http.Client{} + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader("testtest")) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + req.Header.Add("Key", "test") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusRequestEntityTooLarge { + t.Fatalf("expected status code %d, got %d", http.StatusRequestEntityTooLarge, resp.StatusCode) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionSuccess(t *testing.T) { + h := &HTTPSource{} + out, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test`), 2) + + time.Sleep(1 * time.Second) + rawEvt := `{"test": "test"}` + + errChan := make(chan error) + go assertEvents(out, []string{rawEvt}, errChan) + + client := &http.Client{} + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt)) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + req.Header.Add("Key", "test") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } + + assertMetrics(t, h.GetMetrics(), 1) + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionCustomStatusCodeAndCustomHeaders(t *testing.T) { + h := &HTTPSource{} + out, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test +custom_status_code: 201 +custom_headers: + success: true`), 2) + + time.Sleep(1 * time.Second) + + rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvents(out, []string{rawEvt}, errChan) + + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt)) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + req.Header.Add("Key", "test") + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + + if resp.StatusCode != http.StatusCreated { + t.Fatalf("expected status code %d, got %d", http.StatusCreated, resp.StatusCode) + } + + if resp.Header.Get("Success") != "true" { + t.Fatalf("expected header 'success' to be 'true', got '%s'", resp.Header.Get("Success")) + } + + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } + + assertMetrics(t, h.GetMetrics(), 1) + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +type slowReader struct { + delay time.Duration + body []byte + index int +} + +func (sr *slowReader) Read(p []byte) (int, error) { + if sr.index >= len(sr.body) { + return 0, io.EOF + } + time.Sleep(sr.delay) // Simulate a delay in reading + n := copy(p, sr.body[sr.index:]) + sr.index += n + return n, nil +} + +func assertEvents(out chan types.Event, expected []string, errChan chan error) { + readLines := []types.Event{} + + for i := 0; i < len(expected); i++ { + select { + case event := <-out: + readLines = append(readLines, event) + case <-time.After(2 * time.Second): + errChan <- errors.New("timeout waiting for event") + return + } + } + + if len(readLines) != len(expected) { + errChan <- fmt.Errorf("expected %d lines, got %d", len(expected), len(readLines)) + return + } + + for i, evt := range readLines { + if evt.Line.Raw != expected[i] { + errChan <- fmt.Errorf(`expected %s, got '%+v'`, expected, evt.Line.Raw) + return + } + if evt.Line.Src != "127.0.0.1" { + errChan <- fmt.Errorf("expected '127.0.0.1', got '%s'", evt.Line.Src) + return + } + if evt.Line.Module != "http" { + errChan <- fmt.Errorf("expected 'http', got '%s'", evt.Line.Module) + return + } + } + errChan <- nil +} + +func TestStreamingAcquisitionTimeout(t *testing.T) { + h := &HTTPSource{} + _, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test +timeout: 1s`), 0) + + time.Sleep(1 * time.Second) + + slow := &slowReader{ + delay: 2 * time.Second, + body: []byte(`{"test": "delayed_payload"}`), + } + + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), slow) + if err != nil { + t.Fatalf("Error creating request: %v", err) + } + req.Header.Add("Key", "test") + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Error sending request: %v", err) + } + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("expected status code %d, got %d", http.StatusBadRequest, resp.StatusCode) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionTLSHTTPRequest(t *testing.T) { + h := &HTTPSource{} + _, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +auth_type: mtls +path: /test +tls: + server_cert: testdata/server.crt + server_key: testdata/server.key + ca_cert: testdata/ca.crt`), 0) + + time.Sleep(1 * time.Second) + + resp, err := http.Post(fmt.Sprintf("%s/test", testHTTPServerAddr), "application/json", strings.NewReader("test")) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("expected status code %d, got %d", http.StatusBadRequest, resp.StatusCode) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionTLSWithHeadersAuthSuccess(t *testing.T) { + h := &HTTPSource{} + out, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test +tls: + server_cert: testdata/server.crt + server_key: testdata/server.key +`), 0) + + time.Sleep(1 * time.Second) + + caCert, err := os.ReadFile("testdata/server.crt") + if err != nil { + t.Fatalf("unable to read ca cert: %s", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + tlsConfig := &tls.Config{ + RootCAs: caCertPool, + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } + + rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvents(out, []string{rawEvt}, errChan) + + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt)) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + req.Header.Add("Key", "test") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } + + assertMetrics(t, h.GetMetrics(), 0) + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionMTLS(t *testing.T) { + h := &HTTPSource{} + out, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: mtls +tls: + server_cert: testdata/server.crt + server_key: testdata/server.key + ca_cert: testdata/ca.crt`), 0) + + time.Sleep(1 * time.Second) + + // init client cert + cert, err := tls.LoadX509KeyPair("testdata/client.crt", "testdata/client.key") + if err != nil { + t.Fatalf("unable to load client cert: %s", err) + } + + caCert, err := os.ReadFile("testdata/ca.crt") + if err != nil { + t.Fatalf("unable to read ca cert: %s", err) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } + + rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvents(out, []string{rawEvt}, errChan) + + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt)) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } + + assertMetrics(t, h.GetMetrics(), 0) + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionGzipData(t *testing.T) { + h := &HTTPSource{} + out, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test`), 2) + + time.Sleep(1 * time.Second) + + rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvents(out, []string{rawEvt, rawEvt}, errChan) + + var b strings.Builder + gz := gzip.NewWriter(&b) + if _, err := gz.Write([]byte(rawEvt)); err != nil { + t.Fatalf("unable to write gzipped data: %s", err) + } + if _, err := gz.Write([]byte(rawEvt)); err != nil { + t.Fatalf("unable to write gzipped data: %s", err) + } + if err := gz.Close(); err != nil { + t.Fatalf("unable to close gzip writer: %s", err) + } + + // send gzipped compressed data + client := &http.Client{} + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(b.String())) + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + req.Header.Add("Key", "test") + req.Header.Add("Content-Encoding", "gzip") + req.Header.Add("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } + + assertMetrics(t, h.GetMetrics(), 2) + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionNDJson(t *testing.T) { + h := &HTTPSource{} + out, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +listen_addr: 127.0.0.1:8080 +path: /test +auth_type: headers +headers: + key: test`), 2) + + time.Sleep(1 * time.Second) + rawEvt := `{"test": "test"}` + + errChan := make(chan error) + go assertEvents(out, []string{rawEvt, rawEvt}, errChan) + + client := &http.Client{} + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(fmt.Sprintf("%s\n%s\n", rawEvt, rawEvt))) + + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + + req.Header.Add("Key", "test") + req.Header.Add("Content-Type", "application/x-ndjson") + + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } + + assertMetrics(t, h.GetMetrics(), 2) + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func assertMetrics(t *testing.T, metrics []prometheus.Collector, expected int) { + promMetrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatalf("unable to gather metrics: %s", err) + } + isExist := false + for _, metricFamily := range promMetrics { + if metricFamily.GetName() == "cs_httpsource_hits_total" { + isExist = true + if len(metricFamily.GetMetric()) != 1 { + t.Fatalf("expected 1 metricFamily, got %d", len(metricFamily.GetMetric())) + } + for _, metric := range metricFamily.GetMetric() { + if metric.GetCounter().GetValue() != float64(expected) { + t.Fatalf("expected %d, got %f", expected, metric.GetCounter().GetValue()) + } + labels := metric.GetLabel() + if len(labels) != 2 { + t.Fatalf("expected 2 label, got %d", len(labels)) + } + if labels[0].GetName() != "path" || labels[0].GetValue() != "/test" { + t.Fatalf("expected label path:/test, got %s:%s", labels[0].GetName(), labels[0].GetValue()) + } + if labels[1].GetName() != "src" || labels[1].GetValue() != "127.0.0.1" { + t.Fatalf("expected label src:127.0.0.1, got %s:%s", labels[1].GetName(), labels[1].GetValue()) + } + } + } + } + if !isExist && expected > 0 { + t.Fatalf("expected metric cs_httpsource_hits_total not found") + } + + for _, metric := range metrics { + metric.(*prometheus.CounterVec).Reset() + } +} diff --git a/pkg/acquisition/modules/http/testdata/ca.crt b/pkg/acquisition/modules/http/testdata/ca.crt new file mode 100644 index 00000000000..ac81b9db8a6 --- /dev/null +++ b/pkg/acquisition/modules/http/testdata/ca.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIIDvzCCAqegAwIBAgIUHQfsFpWkCy7gAmDa3A6O+y5CvAswDQYJKoZIhvcNAQEL +BQAwbzELMAkGA1UEBhMCRlIxFjAUBgNVBAgTDUlsZS1kZS1GcmFuY2UxDjAMBgNV +BAcTBVBhcmlzMREwDwYDVQQKEwhDcm93ZHNlYzERMA8GA1UECxMIQ3Jvd2RzZWMx +EjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0yNDEwMjMxMDAxMDBaFw0yOTEwMjIxMDAx +MDBaMG8xCzAJBgNVBAYTAkZSMRYwFAYDVQQIEw1JbGUtZGUtRnJhbmNlMQ4wDAYD +VQQHEwVQYXJpczERMA8GA1UEChMIQ3Jvd2RzZWMxETAPBgNVBAsTCENyb3dkc2Vj +MRIwEAYDVQQDEwlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQCZSR2/A24bpVHSiEeSlelfdA32uhk9wHkauwy2qxos/G/UmKG/dgWrHzRh +LawlFVHtVn4u7Hjqz2y2EsH3bX42jC5NMVARgXIOBr1dE6F5/bPqA6SoVgkDm9wh +ZBigyAMxYsR4+3ahuf0pQflBShKrLZ1UYoe6tQXob7l3x5vThEhNkBawBkLfWpj7 +7Imm1tGyEZdxCMkT400KRtSmJRrnpiOCUosnacwgp7MCbKWOIOng07Eh16cVUiuI +BthWU/LycIuac2xaD9PFpeK/MpwASRRPXZgPUhiZuaa7vttD0phCdDaS46Oln5/7 +tFRZH0alBZmkpVZJCWAP4ujIA3vLAgMBAAGjUzBRMA4GA1UdDwEB/wQEAwIBBjAP +BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBTwpg+WN1nZJs4gj5hfoa+fMSZjGTAP +BgNVHREECDAGhwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQAZuOWT8zHcwbWvC6Jm +/ccgB/U7SbeIYFJrCZd9mTyqsgnkFNH8yJ5F4dXXtPXr+SO/uWWa3G5hams3qVFf +zWzzPDQdyhUhfh5fjUHR2RsSGBmCxcapYHpVvAP5aY1/ujYrXMvAJV0hfDO2tGHb +rveuJxhe8ymQ1Yb2u9NcmI1HG9IVt3Airz4gAIUJWbFvRigky0bukfddOkfiUiaF +DMPJQO6HAj8d8ctSHHVZWzhAInZ1pDg6HIHYF44m1tT27pSQoi0ZFocskDi/fC2f +EIF0nu5fRLUS6BZEfpnDi9U0lbJ/kUrgT5IFHMFqXdRpDqcnXpJZhYtp5l6GoqjM +gT33 +-----END CERTIFICATE----- diff --git a/pkg/acquisition/modules/http/testdata/client.crt b/pkg/acquisition/modules/http/testdata/client.crt new file mode 100644 index 00000000000..55efdddad09 --- /dev/null +++ b/pkg/acquisition/modules/http/testdata/client.crt @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIID7jCCAtagAwIBAgIUJMTPh3oPJLPgsnb9T85ieb4EuOQwDQYJKoZIhvcNAQEL +BQAwbzELMAkGA1UEBhMCRlIxFjAUBgNVBAgTDUlsZS1kZS1GcmFuY2UxDjAMBgNV +BAcTBVBhcmlzMREwDwYDVQQKEwhDcm93ZHNlYzERMA8GA1UECxMIQ3Jvd2RzZWMx +EjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0yNDEwMjMxMDQ2MDBaFw0yNTEwMjMxMDQ2 +MDBaMHIxCzAJBgNVBAYTAkZSMRYwFAYDVQQIEw1JbGUtZGUtRnJhbmNlMQ4wDAYD +VQQHEwVQYXJpczERMA8GA1UEChMIQ3Jvd2RzZWMxFzAVBgNVBAsTDlRlc3Rpbmcg +Y2xpZW50MQ8wDQYDVQQDEwZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQDAUOdpRieRrrH6krUjgcjLgJg6TzoWAb/iv6rfcioX1L9bj9fZSkwu +GqKzXX/PceIXElzQgiGJZErbJtnTzhGS80QgtAB8BwWQIT2zgoGcYJf7pPFvmcMM +qMGFwK0dMC+LHPk+ePtFz8dskI2XJ8jgBdtuZcnDblMuVGtjYT6n0rszvRdo118+ +mlGCLPzOfsO1JdOqLWAR88yZfqCFt1TrwmzpRT1crJQeM6i7muw4aO0L7uSek9QM +6APHz0QexSq7/zHOtRjA4jnJbDzZJHRlwOdlsNU9cmTz6uWIQXlg+2ovD55YurNy ++jYfmfDYpimhoeGf54zaETp1fTuTJYpxAgMBAAGjfzB9MA4GA1UdDwEB/wQEAwIF +oDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAd +BgNVHQ4EFgQUmH0/7RuKnoW7sEK4Cr8eVNGbb8swHwYDVR0jBBgwFoAU8KYPljdZ +2SbOII+YX6GvnzEmYxkwDQYJKoZIhvcNAQELBQADggEBAHVn9Zuoyxu9iTFoyJ50 +e/XKcmt2uK2M1x+ap2Av7Wb/Omikx/R2YPq7994BfiUCAezY2YtreZzkE6Io1wNM +qApijEJnlqEmOXiYJqlF89QrCcsAsz6lfaqitYBZSL3o4KT+7/uUDVxgNEjEksRz +9qy6DFBLvyhxbOM2zDEV+MVfemBWSvNiojHqXzDBkZnBHHclJLuIKsXDZDGhKbNd +hsoGU00RLevvcUpUJ3a68ekgwiYFJifm0uyfmao9lmiB3i+8ZW3Q4rbwHtD+U7U2 +3n+U5PkhiUAveuMfrvUMzsTolZiop9ZLtcALDUFaqyr4tjfVOf5+CGjiluio7oE1 +UYg= +-----END CERTIFICATE----- diff --git a/pkg/acquisition/modules/http/testdata/client.key b/pkg/acquisition/modules/http/testdata/client.key new file mode 100644 index 00000000000..f8ef2efbd58 --- /dev/null +++ b/pkg/acquisition/modules/http/testdata/client.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAwFDnaUYnka6x+pK1I4HIy4CYOk86FgG/4r+q33IqF9S/W4/X +2UpMLhqis11/z3HiFxJc0IIhiWRK2ybZ084RkvNEILQAfAcFkCE9s4KBnGCX+6Tx +b5nDDKjBhcCtHTAvixz5Pnj7Rc/HbJCNlyfI4AXbbmXJw25TLlRrY2E+p9K7M70X +aNdfPppRgiz8zn7DtSXTqi1gEfPMmX6ghbdU68Js6UU9XKyUHjOou5rsOGjtC+7k +npPUDOgDx89EHsUqu/8xzrUYwOI5yWw82SR0ZcDnZbDVPXJk8+rliEF5YPtqLw+e +WLqzcvo2H5nw2KYpoaHhn+eM2hE6dX07kyWKcQIDAQABAoIBAQChriKuza0MfBri +9x3UCRN/is/wDZVe1P+2KL8F9ZvPxytNVeP4qM7c38WzF8MQ6sRR8z0WiqCZOjj4 +f3QX7iG2MlAvUkUqAFk778ZIuUov5sE/bU8RLOrfJKz1vqOLa2w8/xHH5LwS1/jn +m6t9zZPCSwpMiMSUSZci1xQlS6b6POZMjeqLPqv9cP8PJNv9UNrHFcQnQi1iwKJH +MJ7CQI3R8FSeGad3P7tB9YDaBm7hHmd/TevuFkymcKNT44XBSgddPDfgKui6sHTY +QQWgWI9VGVO350ZBLRLkrk8wboY4vc15qbBzYFG66WiR/tNdLt3rDYxpcXaDvcQy +e47mYNVxAoGBAMFsUmPDssqzmOkmZxHDM+VmgHYPXjDqQdE299FtuClobUW4iU4g +By7o84aCIBQz2sp9f1KM+10lr+Bqw3s7QBbR5M67PA8Zm45DL9t70NR/NZHGzFRD +BR/NMbwzCqNtY2UGDhYQLGhW8heAwsYwir8ZqmOfKTd9aY1pu/S8m9AlAoGBAP6I +483EIN8R5y+beGcGynYeIrH5Gc+W2FxWIW9jh/G7vRbhMlW4z0GxV3uEAYmOlBH2 +AqUkV6+uzU0P4B/m3vCYqLycBVDwifJazDj9nskVL5kGMxia62iwDMXs5nqNS4WJ +ZM5Gl2xIiwmgWnYnujM3eKF2wbm439wj4na80SldAoGANdIqatA9o+GtntKsw2iJ +vD91Z2SHVR0aC1k8Q+4/3GXOYiQjMLYAybDQcpEq0/RJ4SZik1nfZ9/gvJV4p4Wp +I7Br9opq/9ikTEWtv2kIhtiO02151ciAWIUEXdXmE+uQSMASk1kUwkPPQXL2v6cq +NFqz6tyS33nqMQtG3abNxHECgYA4AEA2nmcpDRRTSh50dG8JC9pQU+EU5jhWIHEc +w8Y+LjMNHKDpcU7QQkdgGowICsGTLhAo61ULhycORGboPfBg+QVu8djNlQ6Urttt +0ocj8LBXN6D4UeVnVAyLY3LWFc4+5Bq0s51PKqrEhG5Cvrzd1d+JjspSpVVDZvXF +cAeI1QKBgC/cMN3+2Sc+2biu46DnkdYpdF/N0VGMOgzz+unSVD4RA2mEJ9UdwGga +feshtrtcroHtEmc+WDYgTTnAq1MbsVFQYIwZ5fL/GJ1R8ccaWiPuX2HrKALKG4Y3 +CMFpDUWhRgtaBsmuOpUq3FeS5cyPNMHk6axL1KyFoJk9AgfhqhTp +-----END RSA PRIVATE KEY----- diff --git a/pkg/acquisition/modules/http/testdata/server.crt b/pkg/acquisition/modules/http/testdata/server.crt new file mode 100644 index 00000000000..7a02c606c9d --- /dev/null +++ b/pkg/acquisition/modules/http/testdata/server.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID5jCCAs6gAwIBAgIUU3F6URi0oTe9ontkf7JqXOo89QYwDQYJKoZIhvcNAQEL +BQAwbzELMAkGA1UEBhMCRlIxFjAUBgNVBAgTDUlsZS1kZS1GcmFuY2UxDjAMBgNV +BAcTBVBhcmlzMREwDwYDVQQKEwhDcm93ZHNlYzERMA8GA1UECxMIQ3Jvd2RzZWMx +EjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0yNDEwMjMxMDAzMDBaFw0yNTEwMjMxMDAz +MDBaMG8xCzAJBgNVBAYTAkZSMRYwFAYDVQQIEw1JbGUtZGUtRnJhbmNlMQ4wDAYD +VQQHEwVQYXJpczERMA8GA1UEChMIQ3Jvd2RzZWMxETAPBgNVBAsTCENyb3dkc2Vj +MRIwEAYDVQQDEwlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQC/lnUubjBGe5x0LgIE5GeG52LRzj99iLWuvey4qbSwFZ07ECgv+JttVwDm +AjEeakj2ZR46WHvHAR9eBNkRCORyWX0iKVIzm09PXYi80KtwGLaA8YMEio9/08Cc ++LS0TuP0yiOcw+btrhmvvauDzcQhA6u55q8anCZiF2BlHfX9Sh6QKewA3NhOkzbU +VTxqrOqfcRsGNub7dheqfP5bfrPkF6Y6l/0Fhyx0NMsu1zaQ0hCls2hkTf0Y3XGt +IQNWoN22seexR3qRmPf0j3jBa0qOmGgd6kAd+YpsjDblgCNUIJZiVj51fVb0sGRx +ShkfKGU6t0eznTWPCqswujO/sn+pAgMBAAGjejB4MA4GA1UdDwEB/wQEAwIFoDAd +BgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNV +HQ4EFgQUOiIF+7Wzx1J8Ki3DiBfx+E6zlSUwGgYDVR0RBBMwEYIJbG9jYWxob3N0 +hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQA0dzlhBr/0wXPyj/iWxMOXxZ1FNJ9f +lxBMhLAgX0WrT2ys+284J7Hcn0lJeqelluYpmeKn9vmCAEj3MmUmHzZyf//lhuUJ +0DlYWIHUsGaJHJ7A+1hQqrcXHhkcRy5WGIM9VoddKbBbg2b6qzTSvxn8EnuD7H4h +28wLyGLCzsSXoVcAB8u+svYt29TPuy6xmMAokyIShV8FsE77fjVTgtCuxmx1PKv3 +zd6+uEae7bbZ+GJH1zKF0vokejQvmByt+YuIXlNbMseaMUeDdpy+6qlRvbbN1dyp +rkQXfWvidMfSue5nH/akAn83v/CdKxG6tfW83d9Rud3naabUkywALDng +-----END CERTIFICATE----- diff --git a/pkg/acquisition/modules/http/testdata/server.key b/pkg/acquisition/modules/http/testdata/server.key new file mode 100644 index 00000000000..4d0ee53b4c2 --- /dev/null +++ b/pkg/acquisition/modules/http/testdata/server.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEAv5Z1Lm4wRnucdC4CBORnhudi0c4/fYi1rr3suKm0sBWdOxAo +L/ibbVcA5gIxHmpI9mUeOlh7xwEfXgTZEQjkcll9IilSM5tPT12IvNCrcBi2gPGD +BIqPf9PAnPi0tE7j9MojnMPm7a4Zr72rg83EIQOrueavGpwmYhdgZR31/UoekCns +ANzYTpM21FU8aqzqn3EbBjbm+3YXqnz+W36z5BemOpf9BYcsdDTLLtc2kNIQpbNo +ZE39GN1xrSEDVqDdtrHnsUd6kZj39I94wWtKjphoHepAHfmKbIw25YAjVCCWYlY+ +dX1W9LBkcUoZHyhlOrdHs501jwqrMLozv7J/qQIDAQABAoIBAF1Vd/rJlV0Q5RQ4 +QaWOe9zdpmedeZK3YgMh5UvE6RCLRxC5+0n7bASlSPvEf5dYofjfJA26g3pcUqKj +6/d/hIMsk2hsBu67L7TzVSTe51XxxB8nCPPSaLwWNZSDGM1qTWU4gIbjbQHHOh5C +YWcRfAW1WxhyiEWHYq+QwdYg9XCRrSg1UzvVvW1Yt2wDGcSZP5whbXipfw3BITDs +XU7ODYNkU1sjIzQZwzVGxOf9qKdhZFZ26Vhoz8OJNMLyJxY7EspuwR7HbDGt11Pb +CxOt/BV44LwdVYeqh57oIKtckQW33W/6EeaWr7GfMzyH5WSrsOJoK5IJVrZaPTcS +QiMYLA0CgYEA9vMVsGshBl3TeRGaU3XLHqooXD4kszbdnjfPrwGlfCO/iybhDqo5 +WFypM/bYcIWzbTez/ihufHEHPSCUbFEcN4B+oczGcuxTcZjFyvJYvq2ycxPUiDIi +JnVUcVxgh1Yn39+CsQ/b6meP7MumTD2P3I87CeQGlWTO5Ys9mdw0BjcCgYEAxpv1 +64l5UoFJGr4yElNKDIKnhEFbJZsLGKiiuVXcS1QVHW5Az5ar9fPxuepyHpz416l3 +ppncuhJiUIP+jbu5e0s0LsN46mLS3wkHLgYJj06CNT3uOSLSg1iFl7DusdbyiaA7 +wEJ/aotS1NZ4XaeryAWHwYJ6Kag3nz6NV3ZYuR8CgYEAxAFCuMj+6F+2RsTa+d1n +v8oMyNImLPyiQD9KHzyuTW7OTDMqtIoVg/Xf8re9KOpl9I0e1t7eevT3auQeCi8C +t2bMm7290V+UB3jbnO5n08hn+ADIUuV/x4ie4m8QyrpuYbm0sLbGtTFHwgoNzzuZ +oNUqZfpP42mk8fpnhWSLAlcCgYEAgpY7XRI4HkJ5ocbav2faMV2a7X/XgWNvKViA +HeJRhYoUlBRRMuz7xi0OjFKVlIFbsNlxna5fDk1WLWCMd/6tl168Qd8u2tX9lr6l +5OH9WSeiv4Un5JN73PbQaAvi9jXBpTIg92oBwzk2TlFyNQoxDcRtHZQ/5LIBWIhV +gOOEtLsCgYEA1wbGc4XlH+/nXVsvx7gmfK8pZG8XA4/ToeIEURwPYrxtQZLB4iZs +aqWGgIwiB4F4UkuKZIjMrgInU9y0fG6EL96Qty7Yjh7dGy1vJTZl6C+QU6o4sEwl +r5Id5BNLEaqISWQ0LvzfwdfABYlvFfBdaGbzUzLEitD79eyhxuNEOBw= +-----END RSA PRIVATE KEY----- diff --git a/pkg/acquisition/modules/journalctl/journalctl.go b/pkg/acquisition/modules/journalctl/journalctl.go index e7a35d5a3ba..27f20b9f446 100644 --- a/pkg/acquisition/modules/journalctl/journalctl.go +++ b/pkg/acquisition/modules/journalctl/journalctl.go @@ -136,12 +136,9 @@ func (j *JournalCtlSource) runJournalCtl(ctx context.Context, out chan types.Eve if j.metricsLevel != configuration.METRICS_NONE { linesRead.With(prometheus.Labels{"source": j.src}).Inc() } - var evt types.Event - if !j.config.UseTimeMachine { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} - } else { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} - } + + evt := types.MakeEvent(j.config.UseTimeMachine, types.LOG, true) + evt.Line = l out <- evt case stderrLine := <-stderrChan: logger.Warnf("Got stderr message : %s", stderrLine) diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index a9a5e13e958..77fc44e310d 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -173,13 +173,8 @@ func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) err if k.metricsLevel != configuration.METRICS_NONE { linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc() } - var evt types.Event - - if !k.Config.UseTimeMachine { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} - } else { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} - } + evt := types.MakeEvent(k.Config.UseTimeMachine, types.LOG, true) + evt.Line = l out <- evt } } diff --git a/pkg/acquisition/modules/kinesis/kinesis.go b/pkg/acquisition/modules/kinesis/kinesis.go index 3cfc224aa25..3744e43f38d 100644 --- a/pkg/acquisition/modules/kinesis/kinesis.go +++ b/pkg/acquisition/modules/kinesis/kinesis.go @@ -322,12 +322,8 @@ func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan } else { l.Src = k.Config.StreamName } - var evt types.Event - if !k.Config.UseTimeMachine { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} - } else { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} - } + evt := types.MakeEvent(k.Config.UseTimeMachine, types.LOG, true) + evt.Line = l out <- evt } } diff --git a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go index 30fc5c467ea..1fa6c894a32 100644 --- a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go +++ b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go @@ -207,11 +207,8 @@ func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.R Process: true, Module: ka.GetName(), } - ka.outChan <- types.Event{ - Line: l, - Process: true, - Type: types.LOG, - ExpectMode: types.LIVE, - } + evt := types.MakeEvent(ka.config.UseTimeMachine, types.LOG, true) + evt.Line = l + ka.outChan <- evt } } diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index e39c76af22c..d50787b652b 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -307,16 +307,9 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri if l.metricsLevel != configuration.METRICS_NONE { linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc() } - expectMode := types.LIVE - if l.Config.UseTimeMachine { - expectMode = types.TIMEMACHINE - } - out <- types.Event{ - Line: ll, - Process: true, - Type: types.LOG, - ExpectMode: expectMode, - } + evt := types.MakeEvent(l.Config.UseTimeMachine, types.LOG, true) + evt.Line = ll + out <- evt } func (l *LokiSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error { diff --git a/pkg/acquisition/modules/s3/s3.go b/pkg/acquisition/modules/s3/s3.go index acd78ceba8f..cdc84a8a3ca 100644 --- a/pkg/acquisition/modules/s3/s3.go +++ b/pkg/acquisition/modules/s3/s3.go @@ -443,12 +443,8 @@ func (s *S3Source) readFile(bucket string, key string) error { } else if s.MetricsLevel == configuration.METRICS_AGGREGATE { l.Src = bucket } - var evt types.Event - if !s.Config.UseTimeMachine { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} - } else { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} - } + evt := types.MakeEvent(s.Config.UseTimeMachine, types.LOG, true) + evt.Line = l s.out <- evt } } diff --git a/pkg/acquisition/modules/syslog/syslog.go b/pkg/acquisition/modules/syslog/syslog.go index 33a2f1542db..fb6a04600c1 100644 --- a/pkg/acquisition/modules/syslog/syslog.go +++ b/pkg/acquisition/modules/syslog/syslog.go @@ -235,11 +235,9 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c cha l.Time = ts l.Src = syslogLine.Client l.Process = true - if !s.config.UseTimeMachine { - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} - } else { - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} - } + evt := types.MakeEvent(s.config.UseTimeMachine, types.LOG, true) + evt.Line = l + out <- evt } } } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go index 887be8b7dd3..aa2a89a7754 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go @@ -206,9 +206,9 @@ func (w *WinEventLogSource) getEvents(out chan types.Event, t *tomb.Tomb) error l.Src = w.name l.Process = true if !w.config.UseTimeMachine { - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} + out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE, Unmarshaled: make(map[string]interface{})} } else { - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE, Unmarshaled: make(map[string]interface{})} } } } @@ -430,7 +430,9 @@ OUTER_LOOP: l.Time = time.Now() l.Src = w.name l.Process = true - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + csevt := types.MakeEvent(w.config.UseTimeMachine, types.LOG, true) + csevt.Line = l + out <- csevt } } } diff --git a/pkg/cwversion/component/component.go b/pkg/cwversion/component/component.go index 4036b63cf00..7ed596525e0 100644 --- a/pkg/cwversion/component/component.go +++ b/pkg/cwversion/component/component.go @@ -7,20 +7,21 @@ package component // Built is a map of all the known components, and whether they are built-in or not. // This is populated as soon as possible by the respective init() functions -var Built = map[string]bool { - "datasource_appsec": false, - "datasource_cloudwatch": false, - "datasource_docker": false, - "datasource_file": false, - "datasource_journalctl": false, - "datasource_k8s-audit": false, - "datasource_kafka": false, - "datasource_kinesis": false, - "datasource_loki": false, - "datasource_s3": false, - "datasource_syslog": false, - "datasource_wineventlog":false, - "cscli_setup": false, +var Built = map[string]bool{ + "datasource_appsec": false, + "datasource_cloudwatch": false, + "datasource_docker": false, + "datasource_file": false, + "datasource_journalctl": false, + "datasource_k8s-audit": false, + "datasource_kafka": false, + "datasource_kinesis": false, + "datasource_loki": false, + "datasource_s3": false, + "datasource_syslog": false, + "datasource_wineventlog": false, + "datasource_http": false, + "cscli_setup": false, } func Register(name string) { diff --git a/pkg/types/event.go b/pkg/types/event.go index e016d0294c4..9300626b927 100644 --- a/pkg/types/event.go +++ b/pkg/types/event.go @@ -47,6 +47,22 @@ type Event struct { Meta map[string]string `yaml:"Meta,omitempty" json:"Meta,omitempty"` } +func MakeEvent(timeMachine bool, evtType int, process bool) Event { + evt := Event{ + Parsed: make(map[string]string), + Meta: make(map[string]string), + Unmarshaled: make(map[string]interface{}), + Enriched: make(map[string]string), + ExpectMode: LIVE, + Process: process, + Type: evtType, + } + if timeMachine { + evt.ExpectMode = TIMEMACHINE + } + return evt +} + func (e *Event) SetMeta(key string, value string) bool { if e.Meta == nil { e.Meta = make(map[string]string)