From 039b24b6251ccdce1094cd8e581fd38b357aa38e Mon Sep 17 00:00:00 2001 From: "Bryan C. Mills" Date: Tue, 1 Nov 2022 12:38:53 -0400 Subject: [PATCH] internal/jsonrpc2_v2: initiate shutdown when the Writer breaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prior to this CL we already shut down a jsonrpc2_v2.Conn when its Reader breaks, which we expect to be the common shutdown path. However, with certain kinds of connections (notably those over stdin+stdout), it is possible for the Writer side to fail while the Reader remains working. If the Writer has failed, we have no way to return the required Response messages for incoming calls, nor to write new Request messages of our own. Since we have no way to return a response, we will now mark those incoming calls as canceled. However, even if the Writer has failed we may still be able to read the responses for any outgoing calls that are already in flight. When our in-flight calls complete, we could in theory even continue to process Notification messages from the Reader; however, those are unlikely to be useful with half the connection broken. It seems more helpful — and less surprising — to go ahead and shut down the connection completely when it becomes idle. This is a redo of CL 446315, with additional fixes for bugs exposed on the -race builders and some extra code cleanup from the process of diagnosing those bugs. Updates golang/go#46520. Updates golang/go#49387. 
Change-Id: I746409a7aa2c22d5651448ed0135b5ac21a9808e Reviewed-on: https://go-review.googlesource.com/c/tools/+/447035 Auto-Submit: Bryan Mills Run-TryBot: Bryan Mills TryBot-Result: Gopher Robot gopls-CI: kokoro Reviewed-by: Alan Donovan --- internal/jsonrpc2_v2/conn.go | 267 ++++++++++++++++++++++++---------- internal/jsonrpc2_v2/frame.go | 6 + 2 files changed, 196 insertions(+), 77 deletions(-) diff --git a/internal/jsonrpc2_v2/conn.go b/internal/jsonrpc2_v2/conn.go index 7c48e2ec616..085e775a741 100644 --- a/internal/jsonrpc2_v2/conn.go +++ b/internal/jsonrpc2_v2/conn.go @@ -68,10 +68,9 @@ type Connection struct { stateMu sync.Mutex state inFlightState // accessed only in updateInFlight + done chan struct{} // closed (under stateMu) when state.closed is true and all goroutines have completed - closer io.Closer // shuts down connection when Close has been called or the reader fails - closeErr chan error // 1-buffered; stores the error from closer.Close - writer chan Writer // 1-buffered; stores the writer when not in use + writer chan Writer // 1-buffered; stores the writer when not in use handler Handler @@ -82,10 +81,22 @@ type Connection struct { // inFlightState records the state of the incoming and outgoing calls on a // Connection. type inFlightState struct { - closing bool // disallow enqueuing further requests, and close the Closer when transitioning to idle - readErr error + connClosing bool // true when the Connection's Close method has been called + readErr error // non-nil when the readIncoming goroutine exits (typically io.EOF) + writeErr error // non-nil if a call to the Writer has failed with a non-canceled Context + + // closer shuts down and cleans up the Reader and Writer state, ideally + // interrupting any Read or Write call that is currently blocked. It is closed + // when the state is idle and one of: connClosing is true, readErr is non-nil, + // or writeErr is non-nil. 
+ // + // After the closer has been invoked, the closer field is set to nil + // and the closeErr field is simultaneously set to its result. + closer io.Closer + closeErr error // error returned from closer.Close - outgoing map[ID]*AsyncCall // calls only + outgoingCalls map[ID]*AsyncCall // calls only + outgoingNotifications int // # of notifications awaiting "write" // incoming stores the total number of incoming calls and notifications // that have not yet written or processed a result. @@ -98,13 +109,11 @@ type inFlightState struct { // The queue does not include the request currently being handled (if any). handlerQueue []*incomingRequest handlerRunning bool - - closed bool // true after the closer has been invoked } // updateInFlight locks the state of the connection's in-flight requests, allows // f to mutate that state, and closes the connection if it is idle and either -// is closing or has a read error. +// is closing or has a read or write error. func (c *Connection) updateInFlight(f func(*inFlightState)) { c.stateMu.Lock() defer c.stateMu.Unlock() @@ -113,14 +122,70 @@ func (c *Connection) updateInFlight(f func(*inFlightState)) { f(s) - idle := s.incoming == 0 && len(s.outgoing) == 0 && !s.handlerRunning - if idle && (s.closing || s.readErr != nil) && !s.closed { - c.closeErr <- c.closer.Close() - if c.onDone != nil { - c.onDone() + select { + case <-c.done: + // The connection was already completely done at the start of this call to + // updateInFlight, so it must remain so. (The call to f should have noticed + // that and avoided making any updates that would cause the state to be + // non-idle.) 
+ if !s.idle() { + panic("jsonrpc2_v2: updateInFlight transitioned to non-idle when already done") + } + return + default: + } + + if s.idle() && s.shuttingDown(ErrUnknown) != nil { + if s.closer != nil { + s.closeErr = s.closer.Close() + s.closer = nil // prevent duplicate Close calls } - s.closed = true + if s.readErr == nil { + // The readIncoming goroutine is still running. Our call to Close should + // cause it to exit soon, at which point it will make another call to + // updateInFlight, set s.readErr to a non-nil error, and mark the + // Connection done. + } else { + // The readIncoming goroutine has exited. Since everything else is idle, + // we're completely done. + if c.onDone != nil { + c.onDone() + } + close(c.done) + } + } +} + +// idle reports whether the connection is in a state with no pending calls or +// notifications. +// +// If idle returns true, the readIncoming goroutine may still be running, +// but no other goroutines are doing work on behalf of the connection. +func (s *inFlightState) idle() bool { + return len(s.outgoingCalls) == 0 && s.outgoingNotifications == 0 && s.incoming == 0 && !s.handlerRunning +} + +// shuttingDown reports whether the connection is in a state that should +// disallow new (incoming and outgoing) calls. It returns either nil or +// an error that is or wraps the provided errClosing. +func (s *inFlightState) shuttingDown(errClosing error) error { + if s.connClosing { + // If Close has been called explicitly, it doesn't matter what state the + // Reader and Writer are in: we shouldn't be starting new work because the + // caller told us not to start new work. + return errClosing + } + if s.readErr != nil { + // If the read side of the connection is broken, we cannot read new call + // requests, and cannot read responses to our outgoing calls. 
+ return fmt.Errorf("%w: %v", errClosing, s.readErr) } + if s.writeErr != nil { + // If the write side of the connection is broken, we cannot write responses + // for incoming calls, and cannot write requests for outgoing calls. + return fmt.Errorf("%w: %v", errClosing, s.writeErr) + } + return nil } // incomingRequest is used to track an incoming request as it is being handled @@ -149,11 +214,16 @@ func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binde ctx := notDone{bindCtx} c := &Connection{ - closer: rwc, - closeErr: make(chan error, 1), - writer: make(chan Writer, 1), - onDone: onDone, + state: inFlightState{closer: rwc}, + done: make(chan struct{}), + writer: make(chan Writer, 1), + onDone: onDone, } + // It's tempting to set a finalizer on c to verify that the state has gone + // idle when the connection becomes unreachable. Unfortunately, the Binder + // interface makes that unsafe: it allows the Handler to close over the + // Connection, which could create a reference cycle that would cause the + // Connection to become uncollectable. options := binder.Bind(bindCtx, c) framer := options.Framer @@ -170,10 +240,11 @@ func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binde reader := framer.Reader(rwc) c.updateInFlight(func(s *inFlightState) { - if !s.closed { - // The goroutine started here will continue until the underlying stream is closed. - go c.readIncoming(ctx, reader, options.Preempter) - } + // The goroutine started here will continue until the underlying stream is closed. + // + // (If the Binder closed the Connection already, this should error out and + // return almost immediately.) + go c.readIncoming(ctx, reader, options.Preempter) }) return c } @@ -181,20 +252,48 @@ func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binde // Notify invokes the target method but does not wait for a response. 
// The params will be marshaled to JSON before sending over the wire, and will // be handed to the method invoked. -func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error { - notify, err := NewNotification(method, params) - if err != nil { - return fmt.Errorf("marshaling notify parameters: %v", err) - } +func (c *Connection) Notify(ctx context.Context, method string, params interface{}) (err error) { ctx, done := event.Start(ctx, method, tag.Method.Of(method), tag.RPCDirection.Of(tag.Outbound), ) + attempted := false + + defer func() { + labelStatus(ctx, err) + done() + if attempted { + c.updateInFlight(func(s *inFlightState) { + s.outgoingNotifications-- + }) + } + }() + + c.updateInFlight(func(s *inFlightState) { + // If the connection is shutting down, allow outgoing notifications only if + // there is at least one call still in flight. The number of calls in flight + // cannot increase once shutdown begins, and allowing outgoing notifications + // may permit notifications that will cancel in-flight calls. + if len(s.outgoingCalls) == 0 && len(s.incomingByID) == 0 { + err = s.shuttingDown(ErrClientClosing) + if err != nil { + return + } + } + s.outgoingNotifications++ + attempted = true + }) + if err != nil { + return err + } + + notify, err := NewNotification(method, params) + if err != nil { + return fmt.Errorf("marshaling notify parameters: %v", err) + } + event.Metric(ctx, tag.Started.Of(1)) - err = c.write(ctx, notify) - labelStatus(ctx, err) - done() - return err + return c.write(ctx, notify) } // Call invokes the target method and returns an object that can be used to await the response. 
@@ -228,21 +327,14 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{} } c.updateInFlight(func(s *inFlightState) { - if s.closing { - err = ErrClientClosing - return - } - if s.readErr != nil { - // We must not start a new Call request if the read end of the connection - // has already failed: a Call request requires a response, but with the - // read side broken we have no way to receive that response. - err = fmt.Errorf("%w: %v", ErrClientClosing, s.readErr) + err = s.shuttingDown(ErrClientClosing) + if err != nil { return } - if s.outgoing == nil { - s.outgoing = make(map[ID]*AsyncCall) + if s.outgoingCalls == nil { + s.outgoingCalls = make(map[ID]*AsyncCall) } - s.outgoing[ac.id] = ac + s.outgoingCalls[ac.id] = ac }) if err != nil { ac.retire(&Response{ID: id, Error: err}) @@ -254,8 +346,8 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{} // Sending failed. We will never get a response, so deliver a fake one if it // wasn't already retired by the connection breaking. c.updateInFlight(func(s *inFlightState) { - if s.outgoing[ac.id] == ac { - delete(s.outgoing, ac.id) + if s.outgoingCalls[ac.id] == ac { + delete(s.outgoingCalls, ac.id) ac.retire(&Response{ID: id, Error: err}) } else { // ac was already retired by the readIncoming goroutine: @@ -365,8 +457,11 @@ func (c *Connection) Cancel(id ID) { // Wait blocks until the connection is fully closed, but does not close it. func (c *Connection) Wait() error { - err := <-c.closeErr - c.closeErr <- err + var err error + <-c.done + c.updateInFlight(func(s *inFlightState) { + err = s.closeErr + }) return err } @@ -380,7 +475,7 @@ func (c *Connection) Wait() error { func (c *Connection) Close() error { // Stop handling new requests, and interrupt the reader (by closing the // connection) as soon as the active requests finish. 
- c.updateInFlight(func(s *inFlightState) { s.closing = true }) + c.updateInFlight(func(s *inFlightState) { s.connClosing = true }) return c.Wait() } @@ -405,8 +500,8 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter case *Response: c.updateInFlight(func(s *inFlightState) { - if ac, ok := s.outgoing[msg.ID]; ok { - delete(s.outgoing, msg.ID) + if ac, ok := s.outgoingCalls[msg.ID]; ok { + delete(s.outgoingCalls, msg.ID) ac.retire(msg) } else { // TODO: How should we report unexpected responses? @@ -423,10 +518,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter // Retire any outgoing requests that were still in flight: with the Reader no // longer being processed, they necessarily cannot receive a response. - for id, ac := range s.outgoing { + for id, ac := range s.outgoingCalls { ac.retire(&Response{ID: id, Error: err}) } - s.outgoing = nil + s.outgoingCalls = nil }) } @@ -474,14 +569,11 @@ func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes i } s.incomingByID[req.ID] = req - if s.closing { - // When closing, reject all new Call requests, even if they could - // theoretically be handled by the preempter. The preempter could return - // ErrAsyncResponse, which would increase the amount of work in flight - // when we're trying to ensure that it strictly decreases. - err = ErrServerClosing - return - } + // When shutting down, reject all new Call requests, even if they could + // theoretically be handled by the preempter. The preempter could return + // ErrAsyncResponse, which would increase the amount of work in flight + // when we're trying to ensure that it strictly decreases. 
+ err = s.shuttingDown(ErrServerClosing) } }) if err != nil { @@ -504,11 +596,12 @@ func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes i } c.updateInFlight(func(s *inFlightState) { - if s.closing { - // If the connection is closing, don't enqueue anything to the handler — not - // even notifications. That ensures that if the handler continues to make - // progress, it will eventually become idle and close the connection. - err = ErrServerClosing + // If the connection is shutting down, don't enqueue anything to the + // handler — not even notifications. That ensures that if the handler + // continues to make progress, it will eventually become idle and + // close the connection. + err = s.shuttingDown(ErrServerClosing) + if err != nil { return } @@ -557,12 +650,20 @@ func (c *Connection) handleAsync() { return } - var result interface{} - err := req.ctx.Err() - if err == nil { - // Only deliver to the Handler if not already cancelled. - result, err = c.handler.Handle(req.ctx, req.Request) + // Only deliver to the Handler if not already canceled. + if err := req.ctx.Err(); err != nil { + c.updateInFlight(func(s *inFlightState) { + if s.writeErr != nil { + // Assume that req.ctx was canceled due to s.writeErr. + // TODO(#51365): use a Context API to plumb this through req.ctx. + err = fmt.Errorf("%w: %v", ErrServerClosing, s.writeErr) + } + }) + c.processResult("handleAsync", req, nil, err) + continue } + + result, err := c.handler.Handle(req.ctx, req.Request) c.processResult(c.handler, req, result, err) } } @@ -646,12 +747,24 @@ func (c *Connection) write(ctx context.Context, msg Message) error { n, err := writer.Write(ctx, msg) event.Metric(ctx, tag.SentBytes.Of(n)) - // TODO: if err != nil, that suggests that future writes will not succeed, - // so we cannot possibly write the results of incoming Call requests. 
- // If the read side of the connection is also broken, we also might not have - // a way to receive cancellation notifications. - // - // Should we cancel the pending calls implicitly? + if err != nil && ctx.Err() == nil { + // The call to Write failed, and since ctx.Err() is nil we can't attribute + // the failure (even indirectly) to Context cancellation. The writer appears + // to be broken, and future writes are likely to also fail. + // + // If the read side of the connection is also broken, we might not even be + // able to receive cancellation notifications. Since we can't reliably write + // the results of incoming calls and can't receive explicit cancellations, + // cancel the calls now. + c.updateInFlight(func(s *inFlightState) { + if s.writeErr == nil { + s.writeErr = err + for _, r := range s.incomingByID { + r.cancel() + } + } + }) + } return err } diff --git a/internal/jsonrpc2_v2/frame.go b/internal/jsonrpc2_v2/frame.go index b2b7dc1a172..e4248328132 100644 --- a/internal/jsonrpc2_v2/frame.go +++ b/internal/jsonrpc2_v2/frame.go @@ -120,6 +120,12 @@ func (r *headerReader) Read(ctx context.Context) (Message, int64, error) { line, err := r.in.ReadString('\n') total += int64(len(line)) if err != nil { + if err == io.EOF { + if total == 0 { + return nil, 0, io.EOF + } + err = io.ErrUnexpectedEOF + } return nil, total, fmt.Errorf("failed reading header line: %w", err) } line = strings.TrimSpace(line)