From 7707a04306b806f676a898b2698ee05392ce76e1 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Mon, 7 Feb 2022 12:00:47 -0800 Subject: [PATCH] Front-load handling of callback responses from the client. Previously, a notification handler that issues a call back to the client could block delivery of the reply for its own callback: The barrier we use to preserve issue order means another batch cannot be issued to the dispatcher until all previously-issued notifications have completed. To prevent the handler from deadlocking itself in this case, filter out response messages from the client when the input is received, rather than enqueuing them with the handlers. This basically just moves the existing logic earlier in the transaction, but it means replies can be delivered even if the barrier is active. Fixes #78. --- server.go | 56 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/server.go b/server.go index 735557c..670a8ed 100644 --- a/server.go +++ b/server.go @@ -282,20 +282,11 @@ func (s *Server) checkAndAssign(next jmessages) tasks { dup := make(map[string]*task) // :: id ⇒ first task in batch with id // Phase 1: Filter out responses from push calls and check for duplicate - // request ID.s + // request IDs. for _, req := range next { fid := fixID(req.ID) id := string(fid) - if !req.isRequestOrNotification() && s.call[id] != nil { - // This is a result or error for a pending push-call. - // - // N.B. It is important to check for this before checking for - // duplicate request IDs, since the ID spaces could overlap. - rsp := s.call[id] - delete(s.call, id) - rsp.ch <- req - continue // don't send a reply for this - } else if req.err != nil { + if req.err != nil { // keep the existing error } else if !s.versionOK(req.V) { req.err = ErrInvalidVersion @@ -651,16 +642,51 @@ func (s *Server) read(ch receiver) { } else if len(in) == 0 { s.pushError(errEmptyBatch) } else { - s.log("Received request batch of size %d (qlen=%d)", len(in), s.inq.size()) - s.inq.push(in) - if s.inq.size() == 1 { // the queue was empty - s.signal() + // Filter out response messages. It's possible that the entire batch + // was responses, so re-check the length after doing this. + keep := s.filterBatch(in) + if len(keep) != 0 { + s.log("Received request batch of size %d (qlen=%d)", len(keep), s.inq.size()) + s.inq.push(keep) + if s.inq.size() == 1 { // the queue was empty + s.signal() + } } } s.mu.Unlock() } } +// filterBatch removes and handles any response messages from next, dispatching +// replies to pending callbacks as required. The remainder is returned. +// The caller must hold s.mu, and must re-check that the result is not empty. +func (s *Server) filterBatch(next jmessages) jmessages { + var keep jmessages + for _, req := range next { + if req.isRequestOrNotification() { + keep = append(keep, req) + continue + } + + // If this is a response implicating the ID of a pending push-call, + // deliver the result to that call. Do this early to avoid deadlocking on + // the sequencing barrier (see #78). + // + // Note, however, if it does NOT correspond to a known push-call, keep it + // in the batch so it can be serviced as an error. + id := string(fixID(req.ID)) + if s.call[id] != nil { + rsp := s.call[id] + delete(s.call, id) + rsp.ch <- req + s.log("Received response for callback %q", id) + } else { + keep = append(keep, req) + } + } + return keep +} + // ServerInfo is the concrete type of responses from the rpc.serverInfo method. type ServerInfo struct { // The list of method names exported by this server.