Skip to content

Commit

Permalink
Front-load handling of callback responses from the client.
Browse files Browse the repository at this point in the history
Previously, a notification handler that issues a call back to the client could
block delivery of the reply for its own callback: The barrier we use to
preserve issue order means another batch cannot be issued to the dispatcher
until all previously-issued notifications have completed.

To prevent the handler from deadlocking itself in this case, filter out
response messages from the client when the input is received, rather than
enqueuing them with the handlers. This basically just moves the existing logic
earlier in the transaction, but it means replies can be delivered even if the
barrier is active.

Fixes #78.
  • Loading branch information
creachadair committed Feb 7, 2022
1 parent f2aaf13 commit 7707a04
Showing 1 changed file with 41 additions and 15 deletions.
56 changes: 41 additions & 15 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,20 +282,11 @@ func (s *Server) checkAndAssign(next jmessages) tasks {
dup := make(map[string]*task) // :: id ⇒ first task in batch with id

// Phase 1: Filter out responses from push calls and check for duplicate
// request ID.s
// request IDs.
for _, req := range next {
fid := fixID(req.ID)
id := string(fid)
if !req.isRequestOrNotification() && s.call[id] != nil {
// This is a result or error for a pending push-call.
//
// N.B. It is important to check for this before checking for
// duplicate request IDs, since the ID spaces could overlap.
rsp := s.call[id]
delete(s.call, id)
rsp.ch <- req
continue // don't send a reply for this
} else if req.err != nil {
if req.err != nil {
// keep the existing error
} else if !s.versionOK(req.V) {
req.err = ErrInvalidVersion
Expand Down Expand Up @@ -651,16 +642,51 @@ func (s *Server) read(ch receiver) {
} else if len(in) == 0 {
s.pushError(errEmptyBatch)
} else {
s.log("Received request batch of size %d (qlen=%d)", len(in), s.inq.size())
s.inq.push(in)
if s.inq.size() == 1 { // the queue was empty
s.signal()
// Filter out response messages. It's possible that the entire batch
// was responses, so re-check the length after doing this.
keep := s.filterBatch(in)
if len(keep) != 0 {
s.log("Received request batch of size %d (qlen=%d)", len(keep), s.inq.size())
s.inq.push(keep)
if s.inq.size() == 1 { // the queue was empty
s.signal()
}
}
}
s.mu.Unlock()
}
}

// filterBatch removes and handles any response messages from next, dispatching
// replies to pending callbacks as required. The remainder is returned.
// The caller must hold s.mu, and must re-check that the result is not empty.
func (s *Server) filterBatch(next jmessages) jmessages {
var keep jmessages
for _, req := range next {
if req.isRequestOrNotification() {
keep = append(keep, req)
continue
}

// If this is a response implicating the ID of a pending push-call,
// deliver the result to that call. Do this early to avoid deadlocking on
// the sequencing barrier (see #78).
//
// Note, however, if it does NOT correspond to a known push-call, keep it
// in the batch so it can be serviced as an error.
id := string(fixID(req.ID))
if s.call[id] != nil {
rsp := s.call[id]
delete(s.call, id)
rsp.ch <- req
s.log("Received response for callback %q", id)
} else {
keep = append(keep, req)
}
}
return keep
}

// ServerInfo is the concrete type of responses from the rpc.serverInfo method.
type ServerInfo struct {
// The list of method names exported by this server.
Expand Down

0 comments on commit 7707a04

Please sign in to comment.