diff --git a/pkg/acquisition/modules/http/http.go b/pkg/acquisition/modules/http/http.go index 9f2a6197f57..2cdde786f44 100644 --- a/pkg/acquisition/modules/http/http.go +++ b/pkg/acquisition/modules/http/http.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "encoding/json" "errors" "fmt" "io" @@ -258,7 +259,19 @@ func ReadBody(r *http.Request) ([]byte, error) { return io.ReadAll(gReader) } - return io.ReadAll(r.Body) + decoder := json.NewDecoder(r.Body) + var body []byte + for { + var message json.RawMessage + if err := decoder.Decode(&message); err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("failed to decode: %w", err) + } + body = append(body, message...) + } + return body, nil } func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc *HttpConfiguration, out chan types.Event, t *tomb.Tomb) error { @@ -267,22 +280,37 @@ func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc * return fmt.Errorf("body size exceeds max body size: %d > %d", r.ContentLength, *hc.MaxBodySize) } - body, err := ReadBody(r) - defer r.Body.Close() - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return fmt.Errorf("failed to read body: %w", err) - } - h.logger.Tracef("body received: %+v", string(body)) - srcHost, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { return err } - t.Go(func() error { + defer r.Body.Close() + + reader := r.Body + + if r.Header.Get("Content-Encoding") == "gzip" { + reader, err = gzip.NewReader(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return fmt.Errorf("failed to create gzip reader: %w", err) + } + defer reader.Close() + } + + decoder := json.NewDecoder(reader) + for { + var message json.RawMessage + if err := decoder.Decode(&message); err != nil { + if err == io.EOF { + break + } + w.WriteHeader(http.StatusBadRequest) + return fmt.Errorf("failed to decode: %w", err) + } + line := types.Line{ - Raw: string(body), + Raw: string(message), Src: srcHost, Time: time.Now().UTC(), Labels: hc.Labels, @@ -306,9 +334,21 @@ func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc * linesRead.With(prometheus.Labels{"path": hc.Path}).Inc() } + h.logger.Tracef("line to send: %+v", line) out <- evt - return nil - }) + } + + //body, err := ReadBody(r) + //defer r.Body.Close() + //if err != nil { + // w.WriteHeader(http.StatusBadRequest) + // return fmt.Errorf("failed to read body: %w", err) + //} + //h.logger.Tracef("body received: %+v", string(body)) + // + //t.Go(func() error { + // return nil + //}) return nil } diff --git a/pkg/acquisition/modules/http/http_test.go b/pkg/acquisition/modules/http/http_test.go index e7bc162b9e0..b2efebf24b6 100644 --- a/pkg/acquisition/modules/http/http_test.go +++ b/pkg/acquisition/modules/http/http_test.go @@ -422,6 +422,9 @@ headers: time.Sleep(1 * time.Second) rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvent(out, rawEvt, errChan) + client := &http.Client{} req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt)) if err != nil { @@ -436,7 +439,10 @@ headers: t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) } - assertFirstEvent(out, t, rawEvt) + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } h.Server.Close() tomb.Kill(nil) @@ -459,6 +465,9 @@ custom_headers: time.Sleep(1 * time.Second) rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvent(out, rawEvt, errChan) + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt)) if err != nil { t.Fatalf("unable to create http request: %s", err) @@ -477,7 +486,10 @@ custom_headers: t.Fatalf("expected header 'success' to be 'true', got '%s'", resp.Header.Get("Success")) } - assertFirstEvent(out, t, rawEvt) + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } h.Server.Close() tomb.Kill(nil) @@ -500,28 +512,34 @@ func (sr *slowReader) Read(p []byte) (int, error) { return n, nil } -func assertFirstEvent(out chan types.Event, t *testing.T, expected string) { +func assertEvent(out chan types.Event, expected string, errChan chan error) { readLines := []types.Event{} select { case event := <-out: readLines = append(readLines, event) case <-time.After(2 * time.Second): - break + errChan <- fmt.Errorf("timeout waiting for event") + return } if len(readLines) != 1 { - t.Fatalf("expected 1 line, got %d", len(readLines)) + errChan <- fmt.Errorf("expected 1 line, got %d", len(readLines)) + return } if readLines[0].Line.Raw != expected { - t.Fatalf(`expected %s, got '%+v'`, expected, readLines[0].Line) + errChan <- fmt.Errorf(`expected %s, got '%+v'`, expected, readLines[0].Line.Raw) + return } if readLines[0].Line.Src != "127.0.0.1" { - t.Fatalf("expected '127.0.0.1', got '%s'", readLines[0].Line.Src) + errChan <- fmt.Errorf("expected '127.0.0.1', got '%s'", readLines[0].Line.Src) + return } if readLines[0].Line.Module != "http" { - t.Fatalf("expected 'http', got '%s'", readLines[0].Line.Module) + errChan <- fmt.Errorf("expected 'http', got '%s'", readLines[0].Line.Module) + return } + errChan <- nil } func TestStreamingAcquisitionTimeout(t *testing.T) { @@ -624,6 +642,8 @@ tls: } rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvent(out, rawEvt, errChan) req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt)) if err != nil { @@ -638,7 +658,10 @@ tls: t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) } - assertFirstEvent(out, t, rawEvt) + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } h.Server.Close() tomb.Kill(nil) @@ -685,6 +708,9 @@ tls: } rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvent(out, rawEvt, errChan) + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt)) if err != nil { t.Fatalf("unable to create http request: %s", err) @@ -699,7 +725,10 @@ tls: t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) } - assertFirstEvent(out, t, rawEvt) + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } h.Server.Close() tomb.Kill(nil) @@ -719,10 +748,13 @@ headers: time.Sleep(1 * time.Second) rawEvt := `{"test": "test"}` + errChan := make(chan error) + go assertEvent(out, rawEvt, errChan) + go assertEvent(out, rawEvt, errChan) // send gzipped compressed data client := &http.Client{} - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt)) + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(fmt.Sprintf("%s\n%s", rawEvt, rawEvt))) if err != nil { t.Fatalf("unable to create http request: %s", err) } @@ -749,7 +781,55 @@ headers: t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) } - assertFirstEvent(out, t, rawEvt) + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } + + h.Server.Close() + tomb.Kill(nil) + tomb.Wait() +} + +func TestStreamingAcquisitionNDJson(t *testing.T) { + h := &HTTPSource{} + out, tomb := SetupAndRunHTTPSource(t, h, []byte(` +source: http +port: 8080 +path: /test +auth_type: headers +headers: + key: test`)) + + time.Sleep(1 * time.Second) + rawEvt := `{"test": "test"}` + + errChan := make(chan error) + go assertEvent(out, rawEvt, errChan) + go assertEvent(out, rawEvt, errChan) + + client := &http.Client{} + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(fmt.Sprintf("%s\n%s\n", rawEvt, rawEvt))) + + if err != nil { + t.Fatalf("unable to create http request: %s", err) + } + + req.Header.Add("Key", "test") + req.Header.Add("Content-Type", "application/x-ndjson") + + resp, err := client.Do(req) + if err != nil { + t.Fatalf("unable to post http request: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + err = <-errChan + if err != nil { + t.Fatalf("error: %s", err) + } h.Server.Close() tomb.Kill(nil)