Skip to content

Commit

Permalink
Merge pull request capnproto#317 from zenhack/fix-315
Browse files Browse the repository at this point in the history
Fix 315
  • Loading branch information
zenhack authored Oct 9, 2022
2 parents bb2e650 + 0fe712f commit 066ffec
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
7 changes: 7 additions & 0 deletions rpc/question.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ func (q *question) handleCancel(ctx context.Context) {
}
close(q.finishMsgSend)

// First remove the question from the table, so we don't
// double-resolve the promise. We can't free the id though,
// since the finish failed.
syncutil.With(&q.c.mu, func() {
q.c.questions[q.id] = nil
})

q.p.Reject(rejectErr)
if q.bootstrapPromise != nil {
q.bootstrapPromise.Fulfill(q.p.Answer().Client())
Expand Down
8 changes: 8 additions & 0 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (c *Conn) release() {
exports := c.exports
embargoes := c.embargoes
answers := c.answers
questions := c.questions
c.imports = nil
c.exports = nil
c.embargoes = nil
Expand All @@ -357,6 +358,7 @@ func (c *Conn) release() {
c.releaseExports(exports)
c.liftEmbargoes(embargoes)
c.releaseAnswers(answers)
c.releaseQuestions(questions)

}

Expand Down Expand Up @@ -395,6 +397,12 @@ func (c *Conn) releaseAnswers(answers map[answerID]*answer) {
}
}

func (c *Conn) releaseQuestions(questions []*question) {
for _, q := range questions {
q.Reject(ExcClosed)
}
}

// If abortErr != nil, send abort message. IO and alloc errors are ignored.
// Called by 'shutdown'. Callers MUST hold c.mu.
func (c *Conn) abort(abortErr error) {
Expand Down
60 changes: 60 additions & 0 deletions rpc/shutdown_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package rpc_test

import (
"context"
"net"
"testing"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"capnproto.org/go/capnp/v3/rpc/internal/testcapnp"
"capnproto.org/go/capnp/v3/rpc/transport"
)

// TestRejectOnDisconnect verifies that, when a connection is dropped, outstanding calls
// fail with a "disconnected" exception.
func TestRejectOnDisconnect(t *testing.T) {
t.Parallel()

ctx := context.Background()

serverNetConn, clientNetConn := net.Pipe()

// The remote server will close this to acknowledge that the call has reached the other
// side; this makes sure we are testing the right logic, not the scenario where the
// connection is dropped before the message is sent, which could fail with an IO error,
// as opposed to being rejected on shutdown.
readyCh := make(chan struct{})

serverRpcConn := rpc.NewConn(transport.NewStream(serverNetConn), &rpc.Options{
BootstrapClient: capnp.Client(testcapnp.PingPong_ServerToClient(dropPingServer{
readyCh: readyCh,
})),
})

clientRpcConn := rpc.NewConn(transport.NewStream(clientNetConn), nil)

client := testcapnp.PingPong(clientRpcConn.Bootstrap(ctx))
defer client.Release()
future, release := client.EchoNum(ctx, nil)
defer release()
<-readyCh
serverNetConn.Close()
_, err := future.Struct()
if !capnp.IsDisconnected(err) {
t.Fatalf("Wanted disconnected error but got %v", err)
}
<-serverRpcConn.Done()
<-clientRpcConn.Done()
}

type dropPingServer struct {
readyCh chan<- struct{}
}

func (s dropPingServer) EchoNum(ctx context.Context, p testcapnp.PingPong_echoNum) error {
p.Ack()
close(s.readyCh)
<-ctx.Done()
return nil
}

0 comments on commit 066ffec

Please sign in to comment.