diff --git a/base.go b/base.go index 3667dce..7d0091b 100644 --- a/base.go +++ b/base.go @@ -10,10 +10,11 @@ import ( "strings" ) -// An Assigner assigns a Handler to handle the specified method name, or nil if -// no method is available to handle the request. +// An Assigner maps method names to Handler functions. type Assigner interface { - // Assign returns the handler for the named method, or nil. + // Assign returns the handler for the named method, or returns nil to + // indicate that the method is not known. + // // The implementation can obtain the complete request from ctx using the // jrpc2.InboundRequest function. Assign(ctx context.Context, method string) Handler @@ -26,10 +27,11 @@ type Namer interface { Names() []string } -// A Handler function implements a method given a request. The response value -// must be JSON-marshalable or nil. In case of error, the handler can return a -// value of type *jrpc2.Error to control the response code sent back to the -// caller; otherwise the server will wrap the resulting value. +// A Handler function implements a method. The request contains the method +// name, request ID, and parameters sent by the client. The result value must +// be JSON-marshalable or nil. In case of error, the handler can return a value +// of type *jrpc2.Error to control the response code sent back to the caller; +// otherwise the server will wrap the resulting value. // // The context passed to the handler by a *jrpc2.Server includes two special // values that the handler may extract. @@ -153,7 +155,7 @@ func (r *Response) UnmarshalResult(v any) error { return json.Unmarshal(r.result, v) } -// ResultString returns the encoded result message of r as a string. +// ResultString returns the encoded result value of r as a string. // If r has no result, for example if r is an error response, it returns "". func (r *Response) ResultString() string { return string(r.result) } diff --git a/channel/channel.go b/channel/channel.go index 964a6e4..aa47583 100644 --- a/channel/channel.go +++ b/channel/channel.go @@ -1,10 +1,10 @@ // Copyright (C) 2017 Michael J. Fromberger. All Rights Reserved. -// Package channel defines a basic communications channel. +// Package channel defines a communications channel. // -// A Channel encodes/transmits and decodes/receives data records over an -// unstructured stream, using a configurable framing discipline. This package -// provides some basic framing implementations. +// A Channel encodes/transmits and decodes/receives data records. The types in +// this package support sending and receiving over an unstructured stream using +// a configurable framing discipline. // // # Channels // @@ -42,7 +42,7 @@ import ( // A Channel represents the ability to transmit and receive data records. A // channel does not interpret the contents of a record, but may add and remove -// framing so that records can be embedded in higher-level protocols. +// framing so that records can be embedded in lower-level protocols. // // One sender and one receiver may use a Channel concurrently, but the methods // of a Channel are not otherwise required to be safe for concurrent use. The diff --git a/channel/hdr.go b/channel/hdr.go index 9268b6c..660fa30 100644 --- a/channel/hdr.go +++ b/channel/hdr.go @@ -12,7 +12,7 @@ import ( "strings" ) -// StrictHeader defines a framing that transmits and receives messages using a +// StrictHeader defines a Framing that transmits and receives messages using a // header prefix similar to HTTP, in which mimeType describes the content type. // // Specifically, each message is sent in the format: diff --git a/client.go b/client.go index 8ff8a26..aeaa362 100644 --- a/client.go +++ b/client.go @@ -14,7 +14,7 @@ import ( ) // A Client is a JSON-RPC 2.0 client. The client sends requests and receives -// responses on a channel.Channel provided by the caller. +// responses on a channel.Channel provided by the constructor. type Client struct { done *sync.WaitGroup // done when the reader is finished at shutdown time @@ -23,8 +23,8 @@ type Client struct { scall func(context.Context, *jmessage) []byte chook func(*Client, *Response) - cbctx context.Context // terminates when the client is closed - cbcancel func() // cancels cbctx + cbctx context.Context // terminates when the client is closed + cbcancel context.CancelFunc // cancels cbctx mu sync.Mutex // protects the fields below ch channel.Channel // channel to the server @@ -82,7 +82,7 @@ func (c *Client) accept(ch receiver) error { c.log("Decoding error: %v", err) } c.mu.Lock() - c.stop(err) + c.stopLocked(err) c.mu.Unlock() return err } @@ -94,16 +94,16 @@ func (c *Client) accept(ch receiver) error { c.mu.Lock() defer c.mu.Unlock() for _, rsp := range in { - c.deliver(rsp) + c.deliverLocked(rsp) } }() return nil } -// handleRequest handles a callback or notification from the server. The +// handleRequestLocked handles a callback or notification from the server. The // caller must hold c.mu. This function does not block for the handler. // Precondition: msg is a request or notification, not a response or error. -func (c *Client) handleRequest(msg *jmessage) { +func (c *Client) handleRequestLocked(msg *jmessage) { if msg.isNotification() { if c.snote == nil { c.log("Discarding notification: %v", msg) @@ -134,14 +134,14 @@ func (c *Client) handleRequest(msg *jmessage) { } } -// For each response, find the request pending on its ID and deliver it. The -// caller must hold c.mu. Unknown response IDs are logged and discarded. As -// we are under the lock, we do not wait for the pending receiver to pick up -// the response; we just drop it in their channel. The channel is buffered so -// we don't need to rendezvous. -func (c *Client) deliver(rsp *jmessage) { +// deliverLocked delivers rsp to the request pending on its ID. The caller +// must hold c.mu. Unknown response IDs are logged and discarded. As we are +// under the lock, we do not wait for the pending receiver to pick up the +// response; we just drop it in their channel. The channel is buffered so we +// don't need to rendezvous. +func (c *Client) deliverLocked(rsp *jmessage) { if rsp.isRequestOrNotification() { - c.handleRequest(rsp) + c.handleRequestLocked(rsp) return } @@ -285,9 +285,9 @@ func (c *Client) waitComplete(pctx context.Context, id string, p *Response) { } } -// Call initiates a single request and blocks until the response returns. -// A successful call reports a nil error and a non-nil response. Errors from -// the server have concrete type *jrpc2.Error. +// Call initiates a single request and blocks until the response returns or ctx +// ends. A successful call reports a nil error and a non-nil response. Errors +// from the server have concrete type *jrpc2.Error. // // rsp, err := c.Call(ctx, method, params) // if e, ok := err.(*jrpc2.Error); ok { @@ -324,8 +324,8 @@ func (c *Client) CallResult(ctx context.Context, method string, params, result a } // Batch initiates a batch of concurrent requests, and blocks until all the -// responses return. The responses are returned in the same order as the -// original specs, omitting notifications. +// responses return or ctx ends. The responses are returned in the same order +// as the original specs, omitting notifications. // // Any error reported by Batch represents an error in encoding or sending the // batch to the server. Errors reported by the server in response to requests @@ -356,7 +356,7 @@ func (c *Client) Batch(ctx context.Context, specs []Spec) ([]*Response, error) { } // A Spec combines a method name and parameter value as part of a Batch. If -// the Notify field is true, the request is sent as a notification. +// the Notify flag is true, the request is sent as a notification. type Spec struct { Method string Params any @@ -364,7 +364,7 @@ type Spec struct { } // Notify transmits a notification to the specified method and parameters. It -// blocks until the notification has been sent. +// blocks until the notification has been sent or ctx ends. func (c *Client) Notify(ctx context.Context, method string, params any) error { req, err := c.note(ctx, method, params) if err != nil { @@ -377,7 +377,7 @@ func (c *Client) Notify(ctx context.Context, method string, params any) error { // Close shuts down the client, terminating any pending in-flight requests. func (c *Client) Close() error { c.mu.Lock() - c.stop(errClientStopped) + c.stopLocked(errClientStopped) c.mu.Unlock() c.done.Wait() @@ -392,10 +392,10 @@ func isUninteresting(err error) bool { return err == io.EOF || channel.IsErrClosing(err) || err == errClientStopped } -// stop closes down the reader for c and records err as its final state. The -// caller must hold c.mu. If multiple callers invoke stop, only the first will -// successfully record its error status. -func (c *Client) stop(err error) { +// stopLocked closes down the reader for c and records err as its final state. +// The caller must hold c.mu. If multiple callers invoke stop, only the first +// will successfully record its error status. +func (c *Client) stopLocked(err error) { if c.ch == nil { return // nothing is running } diff --git a/code.go b/code.go index 955ba39..dd3aa71 100644 --- a/code.go +++ b/code.go @@ -1,6 +1,5 @@ // Copyright (C) 2017 Michael J. Fromberger. All Rights Reserved. -// Package code defines error code values used by the jrpc2 package. package jrpc2 import ( @@ -9,9 +8,9 @@ import ( "fmt" ) -// A Code is an error response. +// A Code is an error code included in the JSON-RPC error object. // -// Code values from and including -32768 to -32000 are reserved for pre-defined +// Code values from and including -32768 to -32000 are reserved for predefined // JSON-RPC errors. Any code within this range, but not defined explicitly // below is reserved for future use. The remainder of the space is available // for application defined errors. @@ -89,11 +88,12 @@ var stdError = map[Code]string{ } // ErrorCode returns a Code to categorize the specified error. -// If err == nil, it returns jrpc2.NoError. -// If err is (or wraps) an ErrCoder, it returns the reported code value. -// If err is context.Canceled, it returns jrpc2.Cancelled. -// If err is context.DeadlineExceeded, it returns jrpc2.DeadlineExceeded. -// Otherwise it returns jrpc2.SystemError. +// +// - If err == nil, it returns jrpc2.NoError. +// - If err is (or wraps) an ErrCoder, it returns the reported code value. +// - If err is context.Canceled, it returns jrpc2.Cancelled. +// - If err is context.DeadlineExceeded, it returns jrpc2.DeadlineExceeded. +// - Otherwise it returns jrpc2.SystemError. func ErrorCode(err error) Code { if err == nil { return NoError diff --git a/ctx.go b/ctx.go index f27d3f8..bac2b35 100644 --- a/ctx.go +++ b/ctx.go @@ -6,9 +6,9 @@ import ( "context" ) -// InboundRequest returns the inbound request associated with the given -// context, or nil if ctx does not have an inbound request. The context passed -// to the handler by *jrpc2.Server will include this value. +// InboundRequest returns the inbound request associated with the context +// passed to a Handler, or nil if ctx does not have an inbound request. +// A *jrpc2.Server populates this value for handler contexts. // // This is mainly useful to wrapped server methods that do not have the request // as an explicit parameter; for direct implementations of the Handler type the @@ -23,9 +23,8 @@ func InboundRequest(ctx context.Context) *Request { type inboundRequestKey struct{} -// ServerFromContext returns the server associated with the given context. -// This will be populated on the context passed to request handlers. -// This function is for use by handlers, and will panic for a non-handler context. +// ServerFromContext returns the server associated with the context passed to a +// Handler by a *jrpc2.Server. It will panic for a non-handler context. // // It is safe to retain the server and invoke its methods beyond the lifetime // of the context from which it was extracted; however, a handler must not @@ -36,9 +35,10 @@ func ServerFromContext(ctx context.Context) *Server { return ctx.Value(serverKey type serverKey struct{} // ClientFromContext returns the client associated with the given context. -// This will be populated on the context passed to callback handlers. +// This will be populated on the context passed by a *jrpc2.Client to a +// client-side callback handler. // -// A callback handler must not close the client, as the close will deadlock +// A callback handler MUST NOT close the client, as the close will deadlock // waiting for the callback to return. func ClientFromContext(ctx context.Context) *Client { return ctx.Value(clientKey{}).(*Client) } diff --git a/error.go b/error.go index 1eac43a..ce238d7 100644 --- a/error.go +++ b/error.go @@ -9,13 +9,14 @@ import ( ) // Error is the concrete type of errors returned from RPC calls. +// It also represents the JSON encoding of the JSON-RPC error object. type Error struct { Code Code `json:"code"` // the machine-readable error code Message string `json:"message,omitempty"` // the human-readable error message Data json.RawMessage `json:"data,omitempty"` // optional ancillary error data } -// Error renders e to a human-readable string for the error interface. +// Error returns a human-readable description of e. func (e Error) Error() string { return fmt.Sprintf("[%d] %s", e.Code, e.Message) } // ErrCode trivially satisfies the ErrCoder interface for an *Error. diff --git a/internal_test.go b/internal_test.go index ce31c85..1754dbb 100644 --- a/internal_test.go +++ b/internal_test.go @@ -227,12 +227,12 @@ func TestServer_specialMethods(t *testing.T) { }, nil) ctx := context.Background() for _, name := range []string{rpcServerInfo, "donkeybait"} { - if got := s.assign(ctx, name); got == nil { - t.Errorf("s.assign(%s): no method assigned", name) + if got := s.assignLocked(ctx, name); got == nil { + t.Errorf("s.assignLocked(%s): no method assigned", name) } } - if got := s.assign(ctx, "rpc.nonesuch"); got != nil { - t.Errorf("s.assign(rpc.nonesuch): got %p, want nil", got) + if got := s.assignLocked(ctx, "rpc.nonesuch"); got != nil { + t.Errorf("s.assignLocked(rpc.nonesuch): got %p, want nil", got) } } @@ -250,14 +250,14 @@ func TestServer_disableBuiltinHook(t *testing.T) { // With builtins disabled, the default rpc.* methods should not get assigned. for _, name := range []string{rpcServerInfo} { - if got := s.assign(ctx, name); got != nil { - t.Errorf("s.assign(%s): got %p, wanted nil", name, got) + if got := s.assignLocked(ctx, name); got != nil { + t.Errorf("s.assignLocked(%s): got %p, wanted nil", name, got) } } // However, user-assigned methods with this prefix should now work. - if got := s.assign(ctx, "rpc.nonesuch"); got == nil { - t.Error("s.assign(rpc.nonesuch): missing assignment") + if got := s.assignLocked(ctx, "rpc.nonesuch"); got == nil { + t.Error("s.assignLocked(rpc.nonesuch): missing assignment") } } diff --git a/json.go b/json.go index e04ed59..fd17591 100644 --- a/json.go +++ b/json.go @@ -7,9 +7,10 @@ import ( "encoding/json" ) -// ParseRequests parses a single request or a batch of requests from JSON. -// This function reports an error only if msg is not valid JSON. The caller -// must check the individual results for their validity. +// ParseRequests parses either a single request or a batch of requests from +// JSON. It reports an error only if msg is not valid JSON. The caller must +// check the Error field results to determine whether the individual requests +// are valid. func ParseRequests(msg []byte) ([]*ParsedRequest, error) { var reqs jmessages if err := reqs.parseJSON(msg); err != nil { diff --git a/opts.go b/opts.go index 6b29cb7..45dd995 100644 --- a/opts.go +++ b/opts.go @@ -12,7 +12,7 @@ import ( ) // ServerOptions control the behaviour of a server created by NewServer. -// A nil *ServerOptions provides sensible defaults. +// A nil *ServerOptions is valid and provides sensible defaults. // It is safe to share server options among multiple server instances. type ServerOptions struct { // If not nil, send debug text logs here. @@ -88,7 +88,7 @@ func (s *ServerOptions) rpcLog() RPCLogger { } // ClientOptions control the behaviour of a client created by NewClient. -// A nil *ClientOptions provides sensible defaults. +// A nil *ClientOptions is valid and provides sensible defaults. type ClientOptions struct { // If not nil, send debug text logs here. Logger Logger diff --git a/server.go b/server.go index 1a15b01..fc5b887 100644 --- a/server.go +++ b/server.go @@ -53,8 +53,8 @@ func init() { // expvar.Publish or similar. func ServerMetrics() *expvar.Map { return serverMetrics } -// A Server is a JSON-RPC 2.0 server. The server receives requests and sends -// responses on a channel.Channel provided by the caller, and dispatches +// Server implements a JSON-RPC 2.0 server. The server receives requests and +// sends responses on a channel.Channel provided by the caller, and dispatches // requests to user-defined Handlers. type Server struct { wg sync.WaitGroup // ready when workers are done at shutdown time @@ -90,10 +90,9 @@ type Server struct { // NewServer returns a new unstarted server that will dispatch incoming // JSON-RPC requests according to mux. To start serving, call Start. // -// N.B. It is only safe to modify mux after the server has been started if mux -// itself is safe for concurrent use by multiple goroutines. -// -// This function will panic if mux == nil. +// This function will panic if mux == nil. It is not safe to modify mux after +// the server has been started unless mux itself is safe for concurrent use by +// multiple goroutines. func NewServer(mux Assigner, opts *ServerOptions) *Server { if mux == nil { panic("nil assigner") @@ -115,9 +114,9 @@ func NewServer(mux Assigner, opts *ServerOptions) *Server { return s } -// Start enables processing of requests from c and returns. Start does not -// block while the server runs. This function will panic if the server is -// already running. It returns s to allow chaining with construction. +// Start initiates processing of requests from c and returns. Start does not +// block while the server runs. Start will panic if the server is already +// running. It returns s to allow chaining with construction. func (s *Server) Start(c channel.Channel) *Server { s.mu.Lock() defer s.mu.Unlock() @@ -213,7 +212,7 @@ func (s *Server) nextRequest() (func() error, error) { s.log("Dequeued request batch of length %d (qlen=%d)", len(next), s.inq.Len()) // Construct a dispatcher to run the handlers outside the lock. - return s.dispatch(next, ch), nil + return s.dispatchLocked(next, ch), nil } // waitForBarrier blocks until all notification handlers that have been issued @@ -230,18 +229,19 @@ func (s *Server) waitForBarrier(n int) { s.nbar.Add(n) } -// dispatch constructs a function that invokes each of the specified tasks. -// The caller must hold s.mu when calling dispatch, but the returned function -// should be executed outside the lock to wait for the handlers to return. +// dispatchLocked constructs a function that invokes each of the specified +// tasks. The caller must hold s.mu when calling dispatchLocked, but the +// returned function should be executed outside the lock to wait for the +// handlers to return. // -// dispatch blocks until any notification received prior to this batch has -// completed, to ensure that notifications are processed in a partial order +// dispatchLocked blocks until any notification received prior to this batch +// has completed, to ensure that notifications are processed in a partial order // that respects order of receipt. Notifications within a batch are handled // concurrently. -func (s *Server) dispatch(next jmessages, ch sender) func() error { +func (s *Server) dispatchLocked(next jmessages, ch sender) func() error { // Resolve all the task handlers or record errors. start := time.Now() - tasks := s.checkAndAssign(next) + tasks := s.checkAndAssignLocked(next) // Ensure all notifications already issued have completed; see #24. todo, notes := tasks.numToDo() @@ -294,7 +294,7 @@ func (s *Server) deliver(rsps jmessages, ch sender, elapsed time.Duration) error // cancelling its valid predecessor in that ID. for _, rsp := range rsps { if rsp.err == nil { - s.cancel(string(rsp.ID)) + s.cancelLocked(string(rsp.ID)) } } @@ -303,9 +303,9 @@ func (s *Server) deliver(rsps jmessages, ch sender, elapsed time.Duration) error return err } -// checkAndAssign resolves all the task handlers for the given batch, or +// checkAndAssignLocked resolves all the task handlers for the given batch, or // records errors for them as appropriate. The caller must hold s.mu. -func (s *Server) checkAndAssign(next jmessages) tasks { +func (s *Server) checkAndAssignLocked(next jmessages) tasks { var ts tasks var ids []string dup := make(map[string]*task) // :: id ⇒ first task in batch with id @@ -345,7 +345,7 @@ func (s *Server) checkAndAssign(next jmessages) tasks { t.err = errEmptyMethod } else { s.setContext(t, id) - t.m = s.assign(t.ctx, t.hreq.method) + t.m = s.assignLocked(t.ctx, t.hreq.method) if t.m == nil { t.err = errNoSuchMethod.WithData(t.hreq.method) } @@ -530,7 +530,7 @@ func (s *Server) pushReq(ctx context.Context, wantID bool, method string, params func (s *Server) Stop() { s.mu.Lock() defer s.mu.Unlock() - s.stop(errServerStopped) + s.stopLocked(errServerStopped) } // ServerStatus describes the status of a stopped server. @@ -575,10 +575,10 @@ func (s *Server) WaitStatus() ServerStatus { // It is equivalent to s.WaitStatus().Err. func (s *Server) Wait() error { return s.WaitStatus().Err } -// stop shuts down the connection and records err as its final state. The -// caller must hold s.mu. If multiple callers invoke stop, only the first will -// successfully record its error status. -func (s *Server) stop(err error) { +// stopLocked shuts down the connection and records err as its final state. +// The caller must hold s.mu. If multiple callers invoke stop, only the first +// will successfully record its error status. +func (s *Server) stopLocked(err error) { if s.ch == nil { return // nothing is running } @@ -596,7 +596,7 @@ func (s *Server) stop(err error) { keep = append(keep, req) s.log("Retaining notification %p", req) } else { - s.cancel(string(req.ID)) + s.cancelLocked(string(req.ID)) } } return true @@ -646,17 +646,17 @@ func (s *Server) read(ch receiver) { } s.mu.Lock() if err != nil { // receive failure; shut down - s.stop(err) + s.stopLocked(err) s.mu.Unlock() return } else if derr != nil { // parse failure; report and continue - s.pushError(derr) + s.pushErrorLocked(derr) } else if len(in) == 0 { - s.pushError(errEmptyBatch) + s.pushErrorLocked(errEmptyBatch) } else { // Filter out response messages. It's possible that the entire batch // was responses, so re-check the length after doing this. - keep := s.filterBatch(in) + keep := s.filterBatchLocked(in) if len(keep) != 0 { s.log("Received request batch of size %d (qlen=%d)", len(keep), s.inq.Len()) s.inq.Add(keep) @@ -669,10 +669,11 @@ func (s *Server) read(ch receiver) { } } -// filterBatch removes and handles any response messages from next, dispatching -// replies to pending callbacks as required. The remainder is returned. -// The caller must hold s.mu, and must re-check that the result is not empty. -func (s *Server) filterBatch(next jmessages) jmessages { +// filterBatchLocked removes and handles any response messages from next, +// dispatching replies to pending callbacks as required. The remainder is +// returned. The caller must hold s.mu, and must re-check that the result is +// not empty. +func (s *Server) filterBatchLocked(next jmessages) jmessages { keep := make(jmessages, 0, len(next)) for _, req := range next { if req.isRequestOrNotification() { @@ -711,9 +712,9 @@ type ServerInfo struct { StartTime time.Time `json:"startTime,omitempty"` } -// assign returns a Handler to handle the specified name, or nil. -// The caller must hold s.mu. -func (s *Server) assign(ctx context.Context, name string) Handler { +// assignLocked returns a Handler to handle the specified name, or nil. The +// caller must hold s.mu. +func (s *Server) assignLocked(ctx context.Context, name string) Handler { if s.builtin && strings.HasPrefix(name, "rpc.") { switch name { case rpcServerInfo: @@ -727,10 +728,10 @@ func (s *Server) assign(ctx context.Context, name string) Handler { return s.mux.Assign(ctx, name) } -// pushError reports an error for the given request ID directly back to the -// client, bypassing the normal request handling mechanism. The caller must -// hold s.mu when calling this method. -func (s *Server) pushError(err error) { +// pushErrorLocked reports an error for the given request ID directly back to +// the client, bypassing the normal request handling mechanism. The caller +// must hold s.mu when calling this method. +func (s *Server) pushErrorLocked(err error) { s.log("Invalid request: %v", err) var jerr *Error if e, ok := err.(*Error); ok { @@ -750,10 +751,10 @@ func (s *Server) pushError(err error) { } } -// cancel reports whether id is an active call. If so, it also calls the +// cancelLocked reports whether id is an active call. If so, it also calls the // cancellation function associated with id and removes it from the // reservations. The caller must hold s.mu. -func (s *Server) cancel(id string) bool { +func (s *Server) cancelLocked(id string) bool { cancel, ok := s.used[id] if ok { cancel() @@ -838,7 +839,7 @@ func (ts tasks) numToDo() (todo, notes int) { func (s *Server) CancelRequest(id string) { s.mu.Lock() defer s.mu.Unlock() - if s.cancel(id) { + if s.cancelLocked(id) { s.log("Cancelled request %s by client order", id) } } diff --git a/server/loop.go b/server/loop.go index e9d9d4b..d9ea530 100644 --- a/server/loop.go +++ b/server/loop.go @@ -12,11 +12,13 @@ import ( "github.com/creachadair/jrpc2/channel" ) -// Service is the interface used by the Loop function to start up a -// server. The methods of this interface allow the instance to manage its -// state: The Assigner method is called before the server is started, and can -// be used to initialize the service. The Finish method is called after the -// server exits, and can be used to clean up. +// Service is the interface used by the Loop function to start up a server. +// The methods of this interface allow the instance to manage its state: +// +// - The Assigner method is called before the server is started, to to +// initialize the service. +// +// - The Finish method is called after the server exits to clean up. type Service interface { // This method is called to create an assigner and initialize the service // for use. If it reports an error, the server is not started.