From 9551d9544da2283eff0eec0be127daa435b2aa6f Mon Sep 17 00:00:00 2001 From: Vladimir Shteinman Date: Sat, 18 May 2019 17:04:30 +0300 Subject: [PATCH 1/2] Option for immediate header flush --- http.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/http.go b/http.go index 79241db19c..1557f6acaa 100644 --- a/http.go +++ b/http.go @@ -62,7 +62,8 @@ type Response struct { // Response header // // Copying Header by value is forbidden. Use pointer to Header instead. - Header ResponseHeader + Header ResponseHeader + immediateHeaderFlush bool bodyStream io.Reader w responseBodyWriter @@ -150,6 +151,16 @@ func (req *Request) SetConnectionClose() { req.Header.SetConnectionClose() } +// ImmediateHeaderFlush returns whether headers will be flushed without waiting for body bytes +func (resp *Response) ImmediateHeaderFlush() bool { + return resp.immediateHeaderFlush +} + +// SetImmediateHeaderFlush causes headers to be flushed without waiting for body bytes +func (resp *Response) SetImmediateHeaderFlush() { + resp.immediateHeaderFlush = true +} + // SendFile registers file on the given path to be used as response body // when Write is called. // @@ -870,6 +881,7 @@ func (resp *Response) Reset() { resp.SkipBody = false resp.raddr = nil resp.laddr = nil + resp.immediateHeaderFlush = false } func (resp *Response) resetSkipHeader() { @@ -1455,12 +1467,22 @@ func (resp *Response) writeBodyStream(w *bufio.Writer, sendBody bool) error { } if contentLength >= 0 { if err = resp.Header.Write(w); err == nil && sendBody { - err = writeBodyFixedSize(w, resp.bodyStream, int64(contentLength)) + if resp.immediateHeaderFlush { + err = w.Flush() + } + if err == nil { + err = writeBodyFixedSize(w, resp.bodyStream, int64(contentLength)) + } } } else { resp.Header.SetContentLength(-1) if err = resp.Header.Write(w); err == nil && sendBody { - err = writeBodyChunked(w, resp.bodyStream) + if resp.immediateHeaderFlush { + err = w.Flush() + } + if err == nil { + err = writeBodyChunked(w, resp.bodyStream) + } } } err1 := resp.closeBodyStream() From 74ad0f8e9b9d8e36d022b5d73173430f93545192 Mon Sep 17 00:00:00 2001 From: Vladimir Shteinman Date: Wed, 22 May 2019 23:23:02 +0300 Subject: [PATCH 2/2] Export struct var and add specs --- http.go | 23 ++++----- http_test.go | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 15 deletions(-) diff --git a/http.go b/http.go index 1557f6acaa..ce30f8c977 100644 --- a/http.go +++ b/http.go @@ -62,8 +62,11 @@ type Response struct { // Response header // // Copying Header by value is forbidden. Use pointer to Header instead. - Header ResponseHeader - immediateHeaderFlush bool + Header ResponseHeader + + // Flush headers as soon as possible without waiting for first body bytes. + // Relevant for bodyStream only. + ImmediateHeaderFlush bool bodyStream io.Reader w responseBodyWriter @@ -151,16 +154,6 @@ func (req *Request) SetConnectionClose() { req.Header.SetConnectionClose() } -// ImmediateHeaderFlush returns whether headers will be flushed without waiting for body bytes -func (resp *Response) ImmediateHeaderFlush() bool { - return resp.immediateHeaderFlush -} - -// SetImmediateHeaderFlush causes headers to be flushed without waiting for body bytes -func (resp *Response) SetImmediateHeaderFlush() { - resp.immediateHeaderFlush = true -} - // SendFile registers file on the given path to be used as response body // when Write is called. // @@ -881,7 +874,7 @@ func (resp *Response) Reset() { resp.SkipBody = false resp.raddr = nil resp.laddr = nil - resp.immediateHeaderFlush = false + resp.ImmediateHeaderFlush = false } func (resp *Response) resetSkipHeader() { @@ -1467,7 +1460,7 @@ func (resp *Response) writeBodyStream(w *bufio.Writer, sendBody bool) error { } if contentLength >= 0 { if err = resp.Header.Write(w); err == nil && sendBody { - if resp.immediateHeaderFlush { + if resp.ImmediateHeaderFlush { err = w.Flush() } if err == nil { @@ -1477,7 +1470,7 @@ func (resp *Response) writeBodyStream(w *bufio.Writer, sendBody bool) error { } else { resp.Header.SetContentLength(-1) if err = resp.Header.Write(w); err == nil && sendBody { - if resp.immediateHeaderFlush { + if resp.ImmediateHeaderFlush { err = w.Flush() } if err == nil { diff --git a/http_test.go b/http_test.go index cd31f87cb1..03aa52d36c 100644 --- a/http_test.go +++ b/http_test.go @@ -1921,3 +1921,133 @@ func TestResponseRawBodyCopyTo(t *testing.T) { testResponseCopyTo(t, &resp) } + +type testReader struct { + read chan (int) + cb chan (struct{}) +} + +func (r *testReader) Read(b []byte) (int, error) { + read := <-r.read + + if read == -1 { + return 0, io.EOF + } + + r.cb <- struct{}{} + + for i := 0; i < read; i++ { + b[i] = 'x' + } + + return read, nil +} + +func TestResponseImmediateHeaderFlushRegressionFixedLength(t *testing.T) { + var r Response + + expectedS := "aaabbbccc" + buf := bytes.NewBufferString(expectedS) + r.SetBodyStream(buf, len(expectedS)) + r.ImmediateHeaderFlush = true + + testBodyWriteTo(t, &r, expectedS, false) +} + +func TestResponseImmediateHeaderFlushRegressionChunked(t *testing.T) { + var r Response + + expectedS := "aaabbbccc" + buf := bytes.NewBufferString(expectedS) + r.SetBodyStream(buf, -1) + r.ImmediateHeaderFlush = true + + testBodyWriteTo(t, &r, expectedS, false) +} + +func TestResponseImmediateHeaderFlushFixedLength(t *testing.T) { + var r Response + + r.ImmediateHeaderFlush = true + + ch := make(chan int) + cb := make(chan struct{}) + + buf := &testReader{read: ch, cb: cb} + + r.SetBodyStream(buf, 3) + + b := []byte{} + w := bytes.NewBuffer(b) + bb := bufio.NewWriter(w) + + bw := &r + + waitForIt := make(chan struct{}) + + go func() { + if err := bw.Write(bb); err != nil { + t.Fatalf("unexpected error: %s", err) + } + waitForIt <- struct{}{} + }() + + ch <- 3 + + if !strings.Contains(w.String(), "Content-Length: 3") { + t.Fatalf("Expected headers to be flushed") + } + + if strings.Contains(w.String(), "xxx") { + t.Fatalf("Did not expext body to be written yet") + } + + <-cb + ch <- -1 + + <-waitForIt +} + +func TestResponseImmediateHeaderFlushChunked(t *testing.T) { + var r Response + + r.ImmediateHeaderFlush = true + + ch := make(chan int) + cb := make(chan struct{}) + + buf := &testReader{read: ch, cb: cb} + + r.SetBodyStream(buf, -1) + + b := []byte{} + w := bytes.NewBuffer(b) + bb := bufio.NewWriter(w) + + bw := &r + + waitForIt := make(chan struct{}) + + go func() { + if err := bw.Write(bb); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + waitForIt <- struct{}{} + }() + + ch <- 3 + + if !strings.Contains(w.String(), "Transfer-Encoding: chunked") { + t.Fatalf("Expected headers to be flushed") + } + + if strings.Contains(w.String(), "xxx") { + t.Fatalf("Did not expext body to be written yet") + } + + <-cb + ch <- -1 + + <-waitForIt +}