Skip to content

Commit

Permalink
internal/http3: add request/response body transfer
Browse files Browse the repository at this point in the history
For golang/go#70914

Change-Id: I372458214fe73f8156e0ec291168b043c10221e6
Reviewed-on: https://go-review.googlesource.com/c/net/+/644915
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Damien Neil <dneil@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
neild authored and gopherbot committed Feb 4, 2025
1 parent 145b2d7 commit 938a9fb
Show file tree
Hide file tree
Showing 5 changed files with 707 additions and 12 deletions.
142 changes: 142 additions & 0 deletions internal/http3/body.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.24

package http3

import (
"errors"
"fmt"
"io"
"sync"
)

// A bodyWriter writes a request or response body to a stream
// as a series of DATA frames.
type bodyWriter struct {
st *stream
remain int64 // -1 when content-length is not known
flush bool // flush the stream after every write
name string // "request" or "response"
}

func (w *bodyWriter) Write(p []byte) (n int, err error) {
if w.remain >= 0 && int64(len(p)) > w.remain {
return 0, &streamError{
code: errH3InternalError,
message: w.name + " body longer than specified content length",
}
}
w.st.writeVarint(int64(frameTypeData))
w.st.writeVarint(int64(len(p)))
n, err = w.st.Write(p)
if w.remain >= 0 {
w.remain -= int64(n)
}
if w.flush && err == nil {
err = w.st.Flush()
}
if err != nil {
err = fmt.Errorf("writing %v body: %w", w.name, err)
}
return n, err
}

func (w *bodyWriter) Close() error {
if w.remain > 0 {
return errors.New(w.name + " body shorter than specified content length")
}
return nil
}

// A bodyReader reads a request or response body from a stream.
type bodyReader struct {
st *stream

mu sync.Mutex
remain int64
err error
}

func (r *bodyReader) Read(p []byte) (n int, err error) {
// The HTTP/1 and HTTP/2 implementations both permit concurrent reads from a body,
// in the sense that the race detector won't complain.
// Use a mutex here to provide the same behavior.
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
return 0, r.err
}
defer func() {
if err != nil {
r.err = err
}
}()
if r.st.lim == 0 {
// We've finished reading the previous DATA frame, so end it.
if err := r.st.endFrame(); err != nil {
return 0, err
}
}
// Read the next DATA frame header,
// if we aren't already in the middle of one.
for r.st.lim < 0 {
ftype, err := r.st.readFrameHeader()
if err == io.EOF && r.remain > 0 {
return 0, &streamError{
code: errH3MessageError,
message: "body shorter than content-length",
}
}
if err != nil {
return 0, err
}
switch ftype {
case frameTypeData:
if r.remain >= 0 && r.st.lim > r.remain {
return 0, &streamError{
code: errH3MessageError,
message: "body longer than content-length",
}
}
// Fall out of the loop and process the frame body below.
case frameTypeHeaders:
// This HEADERS frame contains the message trailers.
if r.remain > 0 {
return 0, &streamError{
code: errH3MessageError,
message: "body shorter than content-length",
}
}
// TODO: Fill in Request.Trailer.
if err := r.st.discardFrame(); err != nil {
return 0, err
}
return 0, io.EOF
default:
if err := r.st.discardUnknownFrame(ftype); err != nil {
return 0, err
}
}
}
// We are now reading the content of a DATA frame.
// Fill the read buffer or read to the end of the frame,
// whichever comes first.
if int64(len(p)) > r.st.lim {
p = p[:r.st.lim]
}
n, err = r.st.Read(p)
if r.remain > 0 {
r.remain -= int64(n)
}
return n, err
}

func (r *bodyReader) Close() error {
// Unlike the HTTP/1 and HTTP/2 body readers (at the time of this comment being written),
// calling Close concurrently with Read will interrupt the read.
r.st.stream.CloseRead()
return nil
}
7 changes: 7 additions & 0 deletions internal/http3/http3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,10 @@ func unhex(s string) []byte {
}
return b
}

// testReader implements io.Reader.
type testReader struct {
readFunc func([]byte) (int, error)
}

func (r testReader) Read(p []byte) (n int, err error) { return r.readFunc(p) }
124 changes: 112 additions & 12 deletions internal/http3/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,74 @@
package http3

import (
"errors"
"io"
"net/http"
"strconv"
"sync"

"golang.org/x/net/internal/httpcommon"
)

type roundTripState struct {
cc *ClientConn
st *stream

// Request body, provided by the caller.
onceCloseReqBody sync.Once
reqBody io.ReadCloser

reqBodyWriter bodyWriter

// Response.Body, provided to the caller.
respBody bodyReader

errOnce sync.Once
err error
}

// abort terminates the RoundTrip.
// It returns the first fatal error encountered by the RoundTrip call.
func (rt *roundTripState) abort(err error) error {
rt.errOnce.Do(func() {
rt.err = err
switch e := err.(type) {
case *connectionError:
rt.cc.abort(e)
case *streamError:
rt.st.stream.CloseRead()
rt.st.stream.Reset(uint64(e.code))
default:
rt.st.stream.CloseRead()
rt.st.stream.Reset(uint64(errH3NoError))
}
})
return rt.err
}

// closeReqBody closes the Request.Body, at most once.
func (rt *roundTripState) closeReqBody() {
if rt.reqBody != nil {
rt.onceCloseReqBody.Do(func() {
rt.reqBody.Close()
})
}
}

// RoundTrip sends a request on the connection.
func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error) {
// Each request gets its own QUIC stream.
st, err := newConnStream(req.Context(), cc.qconn, streamTypeRequest)
if err != nil {
return nil, err
}
rt := &roundTripState{
cc: cc,
st: st,
}
defer func() {
switch e := err.(type) {
case nil:
case *connectionError:
cc.abort(e)
case *streamError:
st.stream.CloseRead()
st.stream.Reset(uint64(e.code))
default:
st.stream.CloseRead()
st.stream.Reset(uint64(errH3NoError))
if err != nil {
err = rt.abort(err)
}
}()

Expand Down Expand Up @@ -64,7 +107,13 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error)
}

if encr.HasBody {
// TODO: Send the request body.
// TODO: Defer sending the request body when "Expect: 100-continue" is set.
rt.reqBody = req.Body
rt.reqBodyWriter.st = st
rt.reqBodyWriter.remain = httpcommon.ActualContentLength(req)
rt.reqBodyWriter.flush = true
rt.reqBodyWriter.name = "request"
go copyRequestBody(rt)
}

// Read the response headers.
Expand All @@ -91,14 +140,16 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error)
if err != nil {
return nil, err
}
rt.respBody.st = st
rt.respBody.remain = contentLength
resp := &http.Response{
Proto: "HTTP/3.0",
ProtoMajor: 3,
Header: h,
StatusCode: statusCode,
Status: strconv.Itoa(statusCode) + " " + http.StatusText(statusCode),
ContentLength: contentLength,
Body: io.NopCloser(nil), // TODO: read the response body
Body: (*transportResponseBody)(rt),
}
// TODO: Automatic Content-Type: gzip decoding.
return resp, nil
Expand All @@ -114,6 +165,55 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (_ *http.Response, err error)
}
}

func copyRequestBody(rt *roundTripState) {
defer rt.closeReqBody()
_, err := io.Copy(&rt.reqBodyWriter, rt.reqBody)
if closeErr := rt.reqBodyWriter.Close(); err == nil {
err = closeErr
}
if err != nil {
// Something went wrong writing the body.
rt.abort(err)
} else {
// We wrote the whole body.
rt.st.stream.CloseWrite()
}
}

// transportResponseBody is the Response.Body returned by RoundTrip.
type transportResponseBody roundTripState

// Read is Response.Body.Read.
func (b *transportResponseBody) Read(p []byte) (n int, err error) {
return b.respBody.Read(p)
}

var errRespBodyClosed = errors.New("response body closed")

// Close is Response.Body.Close.
// Closing the response body is how the caller signals that they're done with a request.
func (b *transportResponseBody) Close() error {
rt := (*roundTripState)(b)
// Close the request body, which should wake up copyRequestBody if it's
// currently blocked reading the body.
rt.closeReqBody()
// Close the request stream, since we're done with the request.
// Reset closes the sending half of the stream.
rt.st.stream.Reset(uint64(errH3NoError))
// respBody.Close is responsible for closing the receiving half.
err := rt.respBody.Close()
if err == nil {
err = errRespBodyClosed
}
err = rt.abort(err)
if err == errRespBodyClosed {
// No other errors occurred before closing Response.Body,
// so consider this a successful request.
return nil
}
return err
}

func parseResponseContentLength(method string, statusCode int, h http.Header) (int64, error) {
clens := h["Content-Length"]
if len(clens) == 0 {
Expand Down
Loading

0 comments on commit 938a9fb

Please sign in to comment.