Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelined calls on snapshots #529

Merged
merged 3 commits into from
Jun 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,11 +596,18 @@ func (cs ClientSnapshot) IsPromise() bool {

// Send implements ClientHook.Send
func (cs ClientSnapshot) Send(ctx context.Context, s Send) (*Answer, ReleaseFunc) {
if cs.hook == nil {
return ErrorAnswer(s.Method, errors.New("call on null client")), func() {}
}
return cs.hook.Value().Send(ctx, s)
}

// Recv implements ClientHook.Recv
func (cs ClientSnapshot) Recv(ctx context.Context, r Recv) PipelineCaller {
if cs.hook == nil {
r.Reject(errors.New("call on null client"))
return nil
}
return cs.hook.Value().Recv(ctx, r)
}

Expand Down
23 changes: 20 additions & 3 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ type ansReturner struct {
// Set by AllocResults and setBootstrap, but contents can only be read
// if flags has resultsReady but not finishReceived set.
results rpccp.Payload

// Snapshots of the clients results's capTable, at the time of PrepareReturn().
// Pipelined calls are invoked on the snapshots, rather than the clients,
// to respect the invariant regarding the 4-way race condition that table
// entries must not path-shorten.
resultsCapTable []capnp.ClientSnapshot
}

type answerFlags uint8
Expand Down Expand Up @@ -266,6 +272,15 @@ func (ans *ansent) prepareSendReturn(dq *deferred.Queue) {
c.er.ReportError(rpcerr.Annotate(err, "send return"))
}
// Continue. Don't fail to send return if cap table isn't fully filled.
results := ans.returner.results
if results.IsValid() {
capTable := results.Message().CapTable()
snapshots := make([]capnp.ClientSnapshot, capTable.Len())
for i := range snapshots {
snapshots[i] = capTable.At(i).Snapshot()
}
ans.returner.resultsCapTable = snapshots
}

select {
case <-c.bgctx.Done():
Expand Down Expand Up @@ -353,15 +368,17 @@ func (ans *ansent) completeSendException(dq *deferred.Queue) {
}
}

// destroy removes the answer from the table and returns ReleaseFuncs to
// run. The answer must have sent a return and received a finish.
// The caller must be holding onto ans.c.lk.
// destroy removes the answer from the table and schedule ReleaseFuncs to
// run using dq. The answer must have sent a return and received a finish.
//
// shutdown has its own strategy for cleaning up an answer.
func (ans *ansent) destroy(dq *deferred.Queue) error {
dq.Defer(ans.returner.msgReleaser.Decr)
c := ans.lockedConn()
delete(c.lk.answers, ans.returner.id)
for _, s := range ans.returner.resultsCapTable {
dq.Defer(s.Release)
}
if !ans.flags.Contains(releaseResultCapsFlag) || len(ans.exportRefs) == 0 {
return nil

Expand Down
22 changes: 16 additions & 6 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,13 @@ func (c *lockedConn) liftEmbargoes(dq *deferred.Queue, embargoes []*embargo) {

func (c *lockedConn) releaseAnswers(dq *deferred.Queue, answers map[answerID]*ansent) {
for _, a := range answers {
if a != nil && a.returner.msgReleaser != nil {
dq.Defer(a.returner.msgReleaser.Decr)
if a != nil {
for _, s := range a.returner.resultsCapTable {
dq.Defer(s.Release)
}
if a.returner.msgReleaser != nil {
dq.Defer(a.returner.msgReleaser.Decr)
}
}
}
}
Expand Down Expand Up @@ -899,11 +904,15 @@ func (c *Conn) handleCall(ctx context.Context, in transport.IncomingMessage) err
return nil
}
iface := sub.Interface()
var tgt capnp.Client
var tgt capnp.ClientSnapshot
if sub.IsValid() && !iface.IsValid() {
tgt = capnp.ErrorClient(rpcerr.Failed(ErrNotACapability))
tgt = capnp.ErrorClient(rpcerr.Failed(ErrNotACapability)).Snapshot()
} else {
tgt = tgtAns.returner.results.Message().CapTable().Get(iface)
capID := iface.Capability()
capTable := tgtAns.returner.resultsCapTable
if int(capID) < len(capTable) {
tgt = capTable[capID].AddRef()
}
}

c.tasks.Add(1) // will be finished by answer.Return
Expand All @@ -912,7 +921,8 @@ func (c *Conn) handleCall(ctx context.Context, in transport.IncomingMessage) err
pcall := newPromisedPipelineCaller()
ans.setPipelineCaller(p.method, pcall)
dq.Defer(func() {
pcall.resolve(tgt.RecvCall(callCtx, recv))
defer tgt.Release()
pcall.resolve(tgt.Recv(callCtx, recv))
})
} else {
// Results not ready, use pipeline caller.
Expand Down