Skip to content

Commit

Permalink
Fix control flow in server-to-client callback handlers. (#42)
Browse files Browse the repository at this point in the history
Prior to this change, a client callback handler that panics could cause a
server running in the same process to stall waiting for a reply, if the panic
was absorbed at a higher level without cleaning up the client.

Although this combination of conditions is less common, it can be tricky to
debug. To avert this, recover a panic from the callback handler and convert it
into errors back to the server.

In addition, the server was not cleaning up pending push-calls when shutting
down in response to a client connection termination. This also fixes that
problem on the server side.

Add a regression test for callback handler panics.
  • Loading branch information
creachadair authored Feb 9, 2021
1 parent 8fb19ae commit 5f5e1a3
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 13 deletions.
11 changes: 6 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Client struct {
log func(string, ...interface{}) // write debug logs here
enctx encoder
snote func(*jmessage)
scall func(*jmessage) ([]byte, error)
scall func(*jmessage) []byte
chook func(*Client, *Response)

allow1 bool // tolerate v1 replies with no version marker
Expand Down Expand Up @@ -109,10 +109,11 @@ func (c *Client) handleRequest(msg *jmessage) {
}
} else if c.scall == nil {
c.log("Discarding callback request: %v", msg)
} else if bits, err := c.scall(msg); err != nil {
c.log("Callback for %v failed: %v", msg, err)
} else if err := c.ch.Send(bits); err != nil {
c.log("Sending reply for callback %v failed: %v", msg, err)
} else {
bits := c.scall(msg)
if err := c.ch.Send(bits); err != nil {
c.log("Sending reply for callback %v failed: %v", msg, err)
}
}
}

Expand Down
38 changes: 31 additions & 7 deletions opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ type ClientOptions struct {
// If unset, server requests are logged and discarded. At most one
// invocation of this callback will be active at a time.
// Server callbacks are a non-standard extension of JSON-RPC.
//
// If a callback handler panics, the client will recover the panic and
// report a system error back to the server describing the error.
OnCallback func(context.Context, *Request) (interface{}, error)

// If set, this function is called when the context for a request terminates.
Expand Down Expand Up @@ -205,20 +208,31 @@ func (c *ClientOptions) handleCancel() func(*Client, *Response) {
return c.OnCancel
}

func (c *ClientOptions) handleCallback() func(*jmessage) ([]byte, error) {
func (c *ClientOptions) handleCallback() func(*jmessage) []byte {
if c == nil || c.OnCallback == nil {
return nil
}
cb := c.OnCallback
return func(req *jmessage) ([]byte, error) {
return func(req *jmessage) []byte {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Recover panics from the callback handler to ensure the server gets a
// response even if the callback fails without a result.
//
// Otherwise, a client and a server (a) running in the same process, and
// (b) where panics are recovered at a higher level, and (c) without
// cleaning up the client, can cause the server to stall in a manner that
// is difficult to debug.
//
// See https://github.com/creachadair/jrpc2/issues/41.
rsp := &jmessage{V: Version, ID: req.ID}
v, err := cb(ctx, &Request{
id: req.ID,
method: req.M,
params: req.P,
v, err := panicToError(func() (interface{}, error) {
return cb(ctx, &Request{
id: req.ID,
method: req.M,
params: req.P,
})
})
if err == nil {
rsp.R, err = json.Marshal(v)
Expand All @@ -231,10 +245,20 @@ func (c *ClientOptions) handleCallback() func(*jmessage) ([]byte, error) {
rsp.E = &Error{code: code.FromError(err), message: err.Error()}
}
}
return json.Marshal(rsp)
bits, _ := json.Marshal(rsp)
return bits
}
}

func panicToError(f func() (interface{}, error)) (v interface{}, err error) {
defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("panic in callback handler: %v", p)
}
}()
return f()
}

// An RPCLogger receives callbacks from a server to record the receipt of
// requests and the delivery of responses. These callbacks are invoked
// synchronously with the processing of the request.
Expand Down
36 changes: 36 additions & 0 deletions regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jrpc2_test

import (
"context"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -48,3 +49,38 @@ func TestLockRaceRegression(t *testing.T) {
t.Log("Notification handler completed successfully")
}
}

// Verify that if a callback handler panics, the client will report an error
// back to the server. See https://github.com/creachadair/jrpc2/issues/41.
func TestOnCallbackPanicRegression(t *testing.T) {
const panicString = "the devil you say"

loc := server.NewLocal(handler.Map{
"Test": handler.New(func(ctx context.Context) error {
rsp, err := jrpc2.PushCall(ctx, "Poke", nil)
if err == nil {
t.Errorf("Callback unexpectedly succeeded: %#q", rsp.ResultString())
} else if !strings.HasSuffix(err.Error(), panicString) {
t.Errorf("Callback reported unexpected error: %v", err)
} else {
t.Logf("Callback reported expected error: %v", err)
}
return nil
}),
}, &server.LocalOptions{
Server: &jrpc2.ServerOptions{
AllowPush: true,
},
Client: &jrpc2.ClientOptions{
OnCallback: func(ctx context.Context, req *jrpc2.Request) (interface{}, error) {
t.Log("Entering callback handler; about to panic")
panic(panicString)
},
},
})
defer loc.Close()

if _, err := loc.Client.Call(context.Background(), "Test", nil); err != nil {
t.Errorf("Call unexpectedly failed: %v", err)
}
}
10 changes: 9 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,15 @@ func (s *Server) stop(err error) {
}
s.work.Broadcast()

// Cancel any in-flight requests that made it out of the queue.
// Cancel any in-flight requests that made it out of the queue, and
// terminate any pending callback invocations.
for id, rsp := range s.call {
rsp.ch <- &jmessage{
ID: json.RawMessage(id),
E: &Error{message: "client channel terminated", code: code.Cancelled},
}
delete(s.call, id)
}
for id, cancel := range s.used {
cancel()
delete(s.used, id)
Expand Down

0 comments on commit 5f5e1a3

Please sign in to comment.