Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up/fix locking in handleReturn #395

Merged
merged 2 commits into from
Dec 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 48 additions & 56 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,71 +978,63 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, release capnp
c.er.ReportError(rpcerr.Annotate(pr.err, "incoming return"))
}

if q.bootstrapPromise == nil && pr.err == nil {
// The result of the message contains actual data (not just a
// client or an error), so we save the ReleaseFunc for later:
q.release = release
}
c.lk.Unlock()
// We're going to potentially block fulfilling some promises so fork
// off a goroutine to avoid blocking the receive loop.
//
// TODO(cleanup): This is a bit weird in that we hold the lock across
// the go statement, and do the unlock in the new goroutine, but before
// we actually block. This was less weird when the go statement wasn't
// there, and we should rework this so it's easier to understand what's
// going on.
go func() {
switch {
case q.bootstrapPromise != nil:
syncutil.Without(&c.lk, func() {
q.p.Resolve(pr.result, pr.err)
q.bootstrapPromise.Fulfill(q.p.Answer().Client())
q.p.ReleaseClients()
release()
})
case pr.err != nil:
// TODO(someday): send unimplemented message back to remote if
// pr.unimplemented == true.
lthibault marked this conversation as resolved.
Show resolved Hide resolved
syncutil.Without(&c.lk, func() {
q.p.Reject(pr.err)
release()
})
default:
q.release = release
syncutil.Without(&c.lk, func() {
q.p.Fulfill(pr.result)
})
q.p.Resolve(pr.result, pr.err)
if q.bootstrapPromise != nil {
q.bootstrapPromise.Fulfill(q.p.Answer().Client())
q.p.ReleaseClients()
// We can release now; root pointer of the result is a client, so the
// message won't be accessed:
release()
} else if pr.err != nil {
// We can release now; the result is an error, so data from the message
// won't be accessed:
release()
}
c.lk.Unlock()

// Send disembargoes. Failing to send one of these just never lifts
// the embargo on our side, but doesn't cause a leak.
//
// TODO(soon): make embargo resolve to error client.
for _, s := range pr.disembargoes {
c.sendMessage(ctx, s.buildDisembargo, func(err error) {
syncutil.With(&c.lk, func() {
// Send disembargoes. Failing to send one of these just never lifts
// the embargo on our side, but doesn't cause a leak.
//
// TODO(soon): make embargo resolve to error client.
for _, s := range pr.disembargoes {
c.sendMessage(ctx, s.buildDisembargo, func(err error) {
if err != nil {
err = fmt.Errorf("incoming return: send disembargo: %w", err)
c.er.ReportError(err)
}
})
}

// Send finish.
c.sendMessage(ctx, func(m rpccp.Message) error {
fin, err := m.NewFinish()
if err == nil {
fin.SetQuestionId(uint32(qid))
fin.SetReleaseResultCaps(false)
}
return err
}, func(err error) {
c.lk.Lock()
defer c.lk.Unlock()
defer close(q.finishMsgSend)

if err != nil {
err = fmt.Errorf("incoming return: send disembargo: %w", err)
err = fmt.Errorf("incoming return: send finish: build message: %w", err)
c.er.ReportError(err)
} else {
q.flags |= finishSent
c.lk.questionID.remove(uint32(qid))
}
})
}

// Send finish.
c.sendMessage(ctx, func(m rpccp.Message) error {
fin, err := m.NewFinish()
if err == nil {
fin.SetQuestionId(uint32(qid))
fin.SetReleaseResultCaps(false)
}
return err
}, func(err error) {
c.lk.Lock()
defer c.lk.Unlock()
defer close(q.finishMsgSend)

if err != nil {
err = fmt.Errorf("incoming return: send finish: build message: %w", err)
c.er.ReportError(err)
} else {
q.flags |= finishSent
c.lk.questionID.remove(uint32(qid))
}
})
}()

Expand Down