Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create memhttp package to debug flaky testcases #594

Merged
merged 23 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4c43e4b
Debug flaky testcases with in memory network
emcfarlane Sep 18, 2023
1f1df5d
Fix lint
emcfarlane Sep 18, 2023
225aa0d
Add TODO for flaky test
emcfarlane Sep 18, 2023
27410bd
Use local networking for benchmarks
emcfarlane Sep 18, 2023
facfa53
Merge branch 'main' into ed/memtest
emcfarlane Sep 19, 2023
877b4df
Create memhttp and memhttp test packages
emcfarlane Oct 6, 2023
932533f
Fix race on RegisterShutdown
emcfarlane Oct 10, 2023
c057a62
Fix transport desc
emcfarlane Oct 10, 2023
f6a2bd7
Fix feedback
emcfarlane Oct 11, 2023
40594b7
Fix description
emcfarlane Oct 11, 2023
b9fccc6
Add Cleanup method test servers
emcfarlane Oct 13, 2023
dd98dd4
Revert moving options
emcfarlane Oct 13, 2023
716794b
Ensure response errors are reported consistently
emcfarlane Oct 22, 2023
ccd39d4
Document BlockUntilResponseReady behaviour
emcfarlane Oct 22, 2023
91afa1e
Ensure CloseWrite is called
emcfarlane Oct 23, 2023
ea743b3
Feedback remove changes to duplexHTTPCall
emcfarlane Oct 23, 2023
628915a
Add comment to clarify behaviour
emcfarlane Oct 23, 2023
23dc2da
Restrict log errors to New|Logger|Lshortfile
emcfarlane Oct 23, 2023
2ee0def
Document response close error handling
emcfarlane Oct 25, 2023
00d7f6a
Fix and document Close behaviour for pipe
emcfarlane Oct 25, 2023
758f889
Move responseBodyReady to group what it protects
emcfarlane Oct 25, 2023
fa4a554
Add CloseResponse checks in TestServer
emcfarlane Oct 25, 2023
7b062d1
Merge branch 'main' into ed/memtest
emcfarlane Nov 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions client_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,8 @@ func TestClientPeer(t *testing.T) {
err = clientStream.Send(&pingv1.SumRequest{})
assert.Nil(t, err)
// server streaming
serverStream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{}))
serverStream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{Number: 1}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change relevant? Why would the request data make this flaky or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It causes the server to error with invalid_argument: number must be positive: got 0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. But was that not true before? Why wasn't this a problem before this PR? (I'm trying to understand why change it now.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is checking in the interceptor that the peer address and peer protocol is set. It's testing each request style by sending an empty but valid request, however server stream is the only one that will cause the server handler to error. This can now report the server error instead of success. I don't think this is intended behaviour of the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not setting Number will cause the following only for grpcweb:

--- FAIL: TestClientPeer (0.00s)
    --- FAIL: TestClientPeer/grpcweb (0.07s)
        client_ext_test.go:112:
            assertion:  assert.Nil
            got:        unknown: http2: response body closed

The grpcweb error response is a header only response, need to investigate why this error is returned on a no body.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now flaky with:

            client_ext_test.go:113:
                assertion:      assert.Nil
                got:    unknown: io: read/write on closed pipe

Discard is reading from a closed response which is causing an io.ErrClosedPipe. This is probably due to the new memhttp. Need to investigate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now flaky with:

By "now", do you mean after the last commit you pushed, which removed calls to response.Body.Close()?

Also, in that latest commit, who is the caller that is now responsible for calling Close? Is this done when StreamClientConn.CloseResponse() is called? Is there a chance some code path is failing to call that? (In particular, I'm suspicious why this would only happen for gRPC-Web and not others.)

Discard is reading from a closed response which is causing an io.ErrClosedPipe.

It seems like discard should be resilient against io.ErrClosedPipe and replace it with io.EOF when observed, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's due to the change in closing the pipe writer on Reader instead of Writer. net.Pipe will close both halves but the error is given on the opposite end. So closing the Reader made the Writer return nice io.EOF errors but the Reader to receive io.ErrClosedPipe. HTTP2 libraries set the request error to the response here

I'll need to write more testing around closing, and create an issue for testing with different latencies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP2 sets the error on the response then here

Copy link
Contributor Author

@emcfarlane emcfarlane Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discard should never see io.ErrClosedPipe, however I've added context error checks to avoid issues with cancelation errors bubbling up to CloseResponse() errors. These are tested in TestServer tests.

Working and passing with -count=100 locally, can't see any more flaky test issues. Could you please re-review.

t.Cleanup(func() {
// TODO(emcfarlane): debug flaky test close with error:
// "unknown: io: read/write on closed pipe"
assert.Nil(t, serverStream.Close())
})
assert.Nil(t, err)
Expand Down
14 changes: 8 additions & 6 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,15 +774,15 @@ func TestBidiRequiresHTTP2(t *testing.T) {
server.URL(),
)
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
err := stream.Send(&pingv1.CumSumRequest{})
if err == nil {
assert.Nil(t, stream.CloseRequest())
_, err = stream.Receive()
if err := stream.Send(&pingv1.CumSumRequest{}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}
assert.Nil(t, stream.CloseRequest())
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
_, err := stream.Receive()
assert.NotNil(t, err)
var connectErr *connect.Error
assert.True(t, errors.As(err, &connectErr))
t.Log(err)
assert.Equal(t, connectErr.Code(), connect.CodeUnimplemented)
assert.True(
t,
Expand Down Expand Up @@ -1988,13 +1988,14 @@ func TestBidiOverHTTP1(t *testing.T) {
server.URL(),
)
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}
_, err := stream.Receive()
assert.NotNil(t, err)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported")
assert.True(t, strings.HasSuffix(err.Error(), "HTTP status 505 HTTP Version Not Supported"))
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
}
Expand Down Expand Up @@ -2342,6 +2343,7 @@ func (p *pluggablePingServer) CumSum(

func failNoHTTP2(tb testing.TB, stream *connect.BidiStreamForClient[pingv1.CumSumRequest, pingv1.CumSumResponse]) {
tb.Helper()

if err := stream.Send(&pingv1.CumSumRequest{}); err != nil {
assert.ErrorIs(tb, err, io.EOF)
assert.Equal(tb, connect.CodeOf(err), connect.CodeUnknown)
Expand Down
95 changes: 37 additions & 58 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,11 @@ type duplexHTTPCall struct {
requestBodyReader *io.PipeReader
requestBodyWriter *io.PipeWriter

sendRequestOnce sync.Once
responseReady chan struct{}
request *http.Request
response *http.Response

errMu sync.Mutex
err error
requestSent bool
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
responseReady sync.WaitGroup
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
request *http.Request
response *http.Response
responseErr error
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
}

func newDuplexHTTPCall(
Expand Down Expand Up @@ -80,24 +78,23 @@ func newDuplexHTTPCall(
Body: pipeReader,
Host: url.Host,
}).WithContext(ctx)
return &duplexHTTPCall{
call := &duplexHTTPCall{
ctx: ctx,
httpClient: httpClient,
streamType: spec.StreamType,
requestBodyReader: pipeReader,
requestBodyWriter: pipeWriter,
request: request,
responseReady: make(chan struct{}),
}
call.responseReady.Add(1)
return call
}

// Write to the request body. Returns an error wrapping io.EOF after SetError
// is called.
// Write to the request body.
func (d *duplexHTTPCall) Write(data []byte) (int, error) {
d.ensureRequestMade()
// Before we send any data, check if the context has been canceled.
if err := d.ctx.Err(); err != nil {
d.SetError(err)
return 0, wrapIfContextError(err)
}
// It's safe to write to this side of the pipe while net/http concurrently
Expand Down Expand Up @@ -157,14 +154,12 @@ func (d *duplexHTTPCall) SetMethod(method string) {
func (d *duplexHTTPCall) Read(data []byte) (int, error) {
// First, we wait until we've gotten the response headers and established the
// server-to-client side of the stream.
d.BlockUntilResponseReady()
if err := d.getError(); err != nil {
if err := d.BlockUntilResponseReady(); err != nil {
// The stream is already closed or corrupted.
return 0, err
}
// Before we read, check if the context has been canceled.
if err := d.ctx.Err(); err != nil {
d.SetError(err)
return 0, wrapIfContextError(err)
}
if d.response == nil {
Expand All @@ -175,7 +170,7 @@ func (d *duplexHTTPCall) Read(data []byte) (int, error) {
}

func (d *duplexHTTPCall) CloseRead() error {
d.BlockUntilResponseReady()
d.responseReady.Wait()
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
if d.response == nil {
return nil
}
Expand All @@ -188,7 +183,9 @@ func (d *duplexHTTPCall) CloseRead() error {

// ResponseStatusCode is the response's HTTP status code.
func (d *duplexHTTPCall) ResponseStatusCode() (int, error) {
d.BlockUntilResponseReady()
if err := d.BlockUntilResponseReady(); err != nil {
return 0, err
}
if d.response == nil {
return 0, fmt.Errorf("nil response from %v", d.request.URL)
}
Expand All @@ -197,7 +194,7 @@ func (d *duplexHTTPCall) ResponseStatusCode() (int, error) {

// ResponseHeader returns the response HTTP headers.
func (d *duplexHTTPCall) ResponseHeader() http.Header {
d.BlockUntilResponseReady()
_ = d.BlockUntilResponseReady()
if d.response != nil {
return d.response.Header
}
Expand All @@ -206,56 +203,39 @@ func (d *duplexHTTPCall) ResponseHeader() http.Header {

// ResponseTrailer returns the response HTTP trailers.
func (d *duplexHTTPCall) ResponseTrailer() http.Header {
d.BlockUntilResponseReady()
_ = d.BlockUntilResponseReady()
if d.response != nil {
return d.response.Trailer
}
return make(http.Header)
}

// SetError stores any error encountered processing the response. All
// subsequent calls to Read return this error, and all subsequent calls to
// Write return an error wrapping io.EOF. It's safe to call concurrently with
// any other method.
func (d *duplexHTTPCall) SetError(err error) {
d.errMu.Lock()
if d.err == nil {
d.err = wrapIfContextError(err)
}
// Closing the read side of the request body pipe acquires an internal lock,
// so we want to scope errMu's usage narrowly and avoid defer.
d.errMu.Unlock()

// We've already hit an error, so we should stop writing to the request body.
// It's safe to call Close more than once and/or concurrently (calls after
// the first are no-ops), so it's okay for us to call this even though
// net/http sometimes closes the reader too.
//
// It's safe to ignore the returned error here. Under the hood, Close calls
// CloseWithError, which is documented to always return nil.
_ = d.requestBodyReader.Close()
}

// SetValidateResponse sets the response validation function. The function runs
// in a background goroutine.
func (d *duplexHTTPCall) SetValidateResponse(validate func(*http.Response) *Error) {
d.validateResponse = validate
}

func (d *duplexHTTPCall) BlockUntilResponseReady() {
<-d.responseReady
func (d *duplexHTTPCall) BlockUntilResponseReady() error {
d.responseReady.Wait()
return d.responseErr
}

// ensureRequestMade sends the request headers and starts the response stream.
// It is not safe to call this concurrently. Write and CloseWrite call this but
// ensure that they're not called concurrently.
func (d *duplexHTTPCall) ensureRequestMade() {
d.sendRequestOnce.Do(func() {
go d.makeRequest()
})
if d.requestSent {
return // already sent
}
d.requestSent = true
go d.makeRequest()
}

func (d *duplexHTTPCall) makeRequest() {
// This runs concurrently with Write and CloseWrite. Read and CloseRead wait
// on d.responseReady, so we can't race with them.
defer close(d.responseReady)
defer d.responseReady.Done()

// Promote the header Host to the request object.
if host := d.request.Header.Get(headerHost); len(host) > 0 {
Expand All @@ -276,33 +256,32 @@ func (d *duplexHTTPCall) makeRequest() {
if _, ok := asError(err); !ok {
err = NewError(CodeUnavailable, err)
}
d.SetError(err)
d.responseErr = err
d.requestBodyReader.CloseWithError(io.EOF)
return
}
d.response = response
if err := d.validateResponse(response); err != nil {
d.SetError(err)
d.responseErr = err
d.response.Body.Close()
d.requestBodyReader.CloseWithError(io.EOF)
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
// If we somehow dialed an HTTP/1.x server, fail with an explicit message
// rather than returning a more cryptic error later on.
d.SetError(errorf(
d.responseErr = errorf(
CodeUnimplemented,
"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
d.request.URL,
response.ProtoMajor,
response.ProtoMinor,
))
)
d.response.Body.Close()
d.requestBodyReader.CloseWithError(io.EOF)
}
}

func (d *duplexHTTPCall) getError() error {
d.errMu.Lock()
defer d.errMu.Unlock()
return d.err
}

// See: https://cs.opensource.google/go/go/+/refs/tags/go1.20.1:src/net/http/clone.go;l=22-33
func cloneURL(oldURL *url.URL) *url.URL {
if oldURL == nil {
Expand Down
9 changes: 5 additions & 4 deletions protocol_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@
func (cc *connectUnaryClientConn) CloseRequest() error {
return cc.duplexCall.CloseWrite()
}

Check failure on line 479 in protocol_connect.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

Error return value of `cc.duplexCall.BlockUntilResponseReady` is not checked (errcheck)
func (cc *connectUnaryClientConn) Receive(msg any) error {
cc.duplexCall.BlockUntilResponseReady()
if err := cc.unmarshaler.Unmarshal(msg); err != nil {
Expand All @@ -484,12 +484,12 @@
}
return nil // must be a literal nil: nil *Error is a non-nil error
}

Check failure on line 487 in protocol_connect.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

Error return value of `cc.duplexCall.BlockUntilResponseReady` is not checked (errcheck)
func (cc *connectUnaryClientConn) ResponseHeader() http.Header {
cc.duplexCall.BlockUntilResponseReady()
return cc.responseHeader
}

Check failure on line 492 in protocol_connect.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

Error return value of `cc.duplexCall.BlockUntilResponseReady` is not checked (errcheck)
func (cc *connectUnaryClientConn) ResponseTrailer() http.Header {
cc.duplexCall.BlockUntilResponseReady()
return cc.responseTrailer
Expand Down Expand Up @@ -589,7 +589,9 @@
}

func (cc *connectStreamingClientConn) Receive(msg any) error {
cc.duplexCall.BlockUntilResponseReady()
if err := cc.duplexCall.BlockUntilResponseReady(); err != nil {
return err
}
err := cc.unmarshaler.Unmarshal(msg)
if err == nil {
return nil
Expand All @@ -603,7 +605,6 @@
// error.
serverErr.meta = cc.responseHeader.Clone()
mergeHeaders(serverErr.meta, cc.responseTrailer)
cc.duplexCall.SetError(serverErr)
return serverErr
}
// If the error is EOF but not from a last message, we want to return
Expand All @@ -614,8 +615,8 @@
// There's no error in the trailers, so this was probably an error
// converting the bytes to a message, an error reading from the network, or
// just an EOF. We're going to return it to the user, but we also want to
// setResponseError so Send errors out.
cc.duplexCall.SetError(err)
// close the writer so Send errors out.
_ = cc.duplexCall.CloseWrite()
return err
}

Expand Down
13 changes: 7 additions & 6 deletions protocol_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ func (cc *grpcClientConn) CloseRequest() error {
}

func (cc *grpcClientConn) Receive(msg any) error {
cc.duplexCall.BlockUntilResponseReady()
if err := cc.duplexCall.BlockUntilResponseReady(); err != nil {
return err
}
err := cc.unmarshaler.Unmarshal(msg)
if err == nil {
return nil
Expand Down Expand Up @@ -409,23 +411,22 @@ func (cc *grpcClientConn) Receive(msg any) error {
// the stream has ended, Receive must return an error.
serverErr.meta = cc.responseHeader.Clone()
mergeHeaders(serverErr.meta, cc.responseTrailer)
cc.duplexCall.SetError(serverErr)
return serverErr
}
// This was probably an error converting the bytes to a message or an error
// reading from the network. We're going to return it to the
// user, but we also want to setResponseError so Send errors out.
cc.duplexCall.SetError(err)
// user, but we also want to close writes so Send errors out.
_ = cc.duplexCall.CloseWrite()
return err
}

func (cc *grpcClientConn) ResponseHeader() http.Header {
cc.duplexCall.BlockUntilResponseReady()
_ = cc.duplexCall.BlockUntilResponseReady()
return cc.responseHeader
}

func (cc *grpcClientConn) ResponseTrailer() http.Header {
cc.duplexCall.BlockUntilResponseReady()
_ = cc.duplexCall.BlockUntilResponseReady()
return cc.responseTrailer
}

Expand Down
Loading