Skip to content

Commit

Permalink
Fix buffer option related condition which prevents the proxy block to…
Browse files Browse the repository at this point in the history
… stream a backend response #766
  • Loading branch information
malud committed Nov 22, 2023
1 parent 1185c27 commit a50a0fc
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 9 deletions.
2 changes: 1 addition & 1 deletion eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func newBerespValues(ctx context.Context, readBody bool, beresp *http.Response)
// beresp body is not referenced and can be closed
// prevent resource leak, free connection
_ = beresp.Body.Close()
} else if !ws {
} else if !ws && (bufferOption&BufferNone) != BufferNone {
parseSetRespBody(beresp)
}
}
Expand Down
44 changes: 36 additions & 8 deletions internal/test/test_backend.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package test

import (
"bytes"
"crypto/tls"
"encoding/json"
"io"
Expand Down Expand Up @@ -70,20 +71,21 @@ func NewExpiredBackend() (*Backend, *server.SelfSignedCertificate) {

func registerHTTPHandler(b *Backend) {
// test handler
b.mux.HandleFunc("/anything", createAnythingHandler(http.StatusOK))
b.mux.HandleFunc("/anything/", createAnythingHandler(http.StatusOK))
b.mux.HandleFunc("/", createAnythingHandler(http.StatusNotFound))
b.mux.HandleFunc("/ws", echo)
b.mux.HandleFunc("/redirect", redirect)
b.mux.HandleFunc("/pdf", pdf)
b.mux.HandleFunc("/small", small)
b.mux.HandleFunc("/health", health)
b.mux.HandleFunc("/jwks.json", jwks)
b.mux.HandleFunc("/.well-known/openid-configuration", oidc)
b.mux.HandleFunc("/anything", createAnythingHandler(http.StatusOK))
b.mux.HandleFunc("/anything/", createAnythingHandler(http.StatusOK))
b.mux.HandleFunc("/error", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
})
b.mux.HandleFunc("/health", health)
b.mux.HandleFunc("/jwks.json", jwks)
b.mux.HandleFunc("/pdf", pdf)
b.mux.HandleFunc("/redirect", redirect)
b.mux.HandleFunc("/reflect", reflect)
b.mux.HandleFunc("/reflectDelay", reflectDelay)
b.mux.HandleFunc("/small", small)
b.mux.HandleFunc("/ws", echo)
}

func createAnythingHandler(status int) func(rw http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -278,3 +280,29 @@ func reflect(rw http.ResponseWriter, req *http.Request) {
}
_, _ = io.Copy(rw, req.Body)
}

func reflectDelay(rw http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
return
}
_ = req.Body.Close()

bb := bytes.NewBuffer(body)
for bb.Len() > 0 {
chunk := make([]byte, 1024)
n, readErr := bb.Read(chunk)
if readErr == io.EOF {
if n == 0 {
break
}
}
_, _ = rw.Write(chunk[:n])
if fl, ok := rw.(http.Flusher); ok {
fl.Flush()
}

time.Sleep(time.Millisecond * 100) // related to backend flush writer default interval
}
}
69 changes: 69 additions & 0 deletions server/http_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package server_test

import (
"bytes"
"crypto/rand"
"io"
"net/http"
"testing"
"time"

"github.com/avenga/couper/internal/test"
)

func TestHTTPProxy_Stream(t *testing.T) {
helper := test.New(t)

shutdown, hook := newCouper("testdata/integration/proxy/01_couper.hcl", helper)
defer shutdown()

randomBytes := make([]byte, 64*1024) // doubled amount of the proxy byte buffer (32k)
_, err := rand.Read(randomBytes)
helper.Must(err)

outreq, err := http.NewRequest(http.MethodPost, "http://stream.me:8080/", bytes.NewBuffer(randomBytes))
helper.Must(err)

client := newClient()
time.Sleep(time.Second)
for _, e := range hook.AllEntries() {
t.Log(e.String())
}

res, err := client.Do(outreq)
helper.Must(err)

if res.StatusCode != http.StatusOK {
t.Errorf("expected status OK, got %d", res.StatusCode)
}

lastRead := time.Now()
lastReadAv := time.Duration(0)
totalBytes := 0
readCount := 0
for {
lrd := time.Since(lastRead)
lastReadAv += lrd
bf := make([]byte, 1024)
n, rerr := res.Body.Read(bf)
totalBytes += n
if rerr == io.EOF {
break
}

lastRead = time.Now()
readCount += 1
}
helper.Must(res.Body.Close())

if totalBytes != 65536 {
t.Errorf("expected 64k bytes, got: %d", totalBytes)
}

// lastReadAv is within nanosecond range (<100ns),
// should be greater than 0.3 millisecond range while streaming since the backend delays the response chunks.
if lastReadAv/time.Duration(readCount) < time.Nanosecond*300 {
t.Errorf("expected slower read times with delayed streaming, got an average of: %s", lastReadAv/time.Duration(readCount))
}
t.Log(lastReadAv / time.Duration(readCount))
}
13 changes: 13 additions & 0 deletions server/testdata/integration/proxy/01_couper.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
server {
endpoint "/buffer" {
proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/json"
}
}

endpoint "/" {
proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflectDelay"
}
}
}

0 comments on commit a50a0fc

Please sign in to comment.