Skip to content

Commit

Permalink
handle ndjson + adapt tests to code and not the inverse
Browse files Browse the repository at this point in the history
  • Loading branch information
he2ss committed Oct 25, 2024
1 parent 075bb92 commit bdb0a40
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 25 deletions.
66 changes: 53 additions & 13 deletions pkg/acquisition/modules/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -258,7 +259,19 @@ func ReadBody(r *http.Request) ([]byte, error) {
return io.ReadAll(gReader)
}

return io.ReadAll(r.Body)
decoder := json.NewDecoder(r.Body)
var body []byte
for {
var message json.RawMessage
if err := decoder.Decode(&message); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("failed to decode: %w", err)
}
body = append(body, message...)
}
return body, nil
}

func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc *HttpConfiguration, out chan types.Event, t *tomb.Tomb) error {
Expand All @@ -267,22 +280,37 @@ func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc *
return fmt.Errorf("body size exceeds max body size: %d > %d", r.ContentLength, *hc.MaxBodySize)
}

body, err := ReadBody(r)
defer r.Body.Close()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return fmt.Errorf("failed to read body: %w", err)
}
h.logger.Tracef("body received: %+v", string(body))

srcHost, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return err
}

t.Go(func() error {
defer r.Body.Close()

reader := r.Body

if r.Header.Get("Content-Encoding") == "gzip" {
reader, err = gzip.NewReader(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return fmt.Errorf("failed to create gzip reader: %w", err)
}
defer reader.Close()
}

decoder := json.NewDecoder(reader)
for {
var message json.RawMessage
if err := decoder.Decode(&message); err != nil {
if err == io.EOF {
break
}
w.WriteHeader(http.StatusBadRequest)
return fmt.Errorf("failed to decode: %w", err)
}

line := types.Line{
Raw: string(body),
Raw: string(message),
Src: srcHost,
Time: time.Now().UTC(),
Labels: hc.Labels,
Expand All @@ -306,9 +334,21 @@ func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc *
linesRead.With(prometheus.Labels{"path": hc.Path}).Inc()
}

h.logger.Tracef("line to send: %+v", line)
out <- evt
return nil
})
}

//body, err := ReadBody(r)
//defer r.Body.Close()
//if err != nil {
// w.WriteHeader(http.StatusBadRequest)
// return fmt.Errorf("failed to read body: %w", err)
//}
//h.logger.Tracef("body received: %+v", string(body))
//
//t.Go(func() error {
// return nil
//})

return nil
}
Expand Down
104 changes: 92 additions & 12 deletions pkg/acquisition/modules/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,9 @@ headers:
time.Sleep(1 * time.Second)
rawEvt := `{"test": "test"}`

errChan := make(chan error)
go assertEvent(out, rawEvt, errChan)

client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt))
if err != nil {
Expand All @@ -436,7 +439,10 @@ headers:
t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}

assertFirstEvent(out, t, rawEvt)
err = <-errChan
if err != nil {
t.Fatalf("error: %s", err)
}

h.Server.Close()
tomb.Kill(nil)
Expand All @@ -459,6 +465,9 @@ custom_headers:
time.Sleep(1 * time.Second)

rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvent(out, rawEvt, errChan)

req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt))
if err != nil {
t.Fatalf("unable to create http request: %s", err)
Expand All @@ -477,7 +486,10 @@ custom_headers:
t.Fatalf("expected header 'success' to be 'true', got '%s'", resp.Header.Get("Success"))
}

assertFirstEvent(out, t, rawEvt)
err = <-errChan
if err != nil {
t.Fatalf("error: %s", err)
}

h.Server.Close()
tomb.Kill(nil)
Expand All @@ -500,28 +512,34 @@ func (sr *slowReader) Read(p []byte) (int, error) {
return n, nil
}

func assertFirstEvent(out chan types.Event, t *testing.T, expected string) {
func assertEvent(out chan types.Event, expected string, errChan chan error) {
readLines := []types.Event{}

select {
case event := <-out:
readLines = append(readLines, event)
case <-time.After(2 * time.Second):
break
errChan <- fmt.Errorf("timeout waiting for event")

Check failure on line 522 in pkg/acquisition/modules/http/http_test.go

View workflow job for this annotation

GitHub Actions / Build + tests

fmt.Errorf can be replaced with errors.New (perfsprint)

Check failure on line 522 in pkg/acquisition/modules/http/http_test.go

View workflow job for this annotation

GitHub Actions / Build + tests

fmt.Errorf can be replaced with errors.New (perfsprint)
return
}

if len(readLines) != 1 {
t.Fatalf("expected 1 line, got %d", len(readLines))
errChan <- fmt.Errorf("expected 1 line, got %d", len(readLines))
return
}
if readLines[0].Line.Raw != expected {
t.Fatalf(`expected %s, got '%+v'`, expected, readLines[0].Line)
errChan <- fmt.Errorf(`expected %s, got '%+v'`, expected, readLines[0].Line.Raw)
return
}
if readLines[0].Line.Src != "127.0.0.1" {
t.Fatalf("expected '127.0.0.1', got '%s'", readLines[0].Line.Src)
errChan <- fmt.Errorf("expected '127.0.0.1', got '%s'", readLines[0].Line.Src)
return
}
if readLines[0].Line.Module != "http" {
t.Fatalf("expected 'http', got '%s'", readLines[0].Line.Module)
errChan <- fmt.Errorf("expected 'http', got '%s'", readLines[0].Line.Module)
return
}
errChan <- nil
}

func TestStreamingAcquisitionTimeout(t *testing.T) {
Expand Down Expand Up @@ -624,6 +642,8 @@ tls:
}

rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvent(out, rawEvt, errChan)

req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt))
if err != nil {
Expand All @@ -638,7 +658,10 @@ tls:
t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}

assertFirstEvent(out, t, rawEvt)
err = <-errChan
if err != nil {
t.Fatalf("error: %s", err)
}

h.Server.Close()
tomb.Kill(nil)
Expand Down Expand Up @@ -685,6 +708,9 @@ tls:
}

rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvent(out, rawEvt, errChan)

req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt))
if err != nil {
t.Fatalf("unable to create http request: %s", err)
Expand All @@ -699,7 +725,10 @@ tls:
t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}

assertFirstEvent(out, t, rawEvt)
err = <-errChan
if err != nil {
t.Fatalf("error: %s", err)
}

h.Server.Close()
tomb.Kill(nil)
Expand All @@ -719,10 +748,13 @@ headers:
time.Sleep(1 * time.Second)

rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvent(out, rawEvt, errChan)
go assertEvent(out, rawEvt, errChan)

// send gzipped compressed data
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt))
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(fmt.Sprintf("%s\n%s", rawEvt, rawEvt)))
if err != nil {
t.Fatalf("unable to create http request: %s", err)
}
Expand All @@ -749,7 +781,55 @@ headers:
t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}

assertFirstEvent(out, t, rawEvt)
err = <-errChan
if err != nil {
t.Fatalf("error: %s", err)
}

h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}

func TestStreamingAcquisitionNDJson(t *testing.T) {
h := &HTTPSource{}
out, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
port: 8080
path: /test
auth_type: headers
headers:
key: test`))

time.Sleep(1 * time.Second)
rawEvt := `{"test": "test"}`

errChan := make(chan error)
go assertEvent(out, rawEvt, errChan)
go assertEvent(out, rawEvt, errChan)

client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(fmt.Sprintf("%s\n%s\n", rawEvt, rawEvt)))

if err != nil {
t.Fatalf("unable to create http request: %s", err)
}

req.Header.Add("Key", "test")
req.Header.Add("Content-Type", "application/x-ndjson")

resp, err := client.Do(req)
if err != nil {
t.Fatalf("unable to post http request: %s", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}

err = <-errChan
if err != nil {
t.Fatalf("error: %s", err)
}

h.Server.Close()
tomb.Kill(nil)
Expand Down

0 comments on commit bdb0a40

Please sign in to comment.