diff --git a/rpc/import.go b/rpc/import.go index 8ff817f1..1bca6187 100644 --- a/rpc/import.go +++ b/rpc/import.go @@ -98,8 +98,9 @@ func (ic *importClient) Send(ctx context.Context, s capnp.Send) (*capnp.Answer, q := ic.c.newQuestion(s.Method) // Send call message. + var err error syncutil.Without(&ic.c.mu, func() { - ic.c.sendMessage(ctx, func(m rpccp.Message) error { + err = ic.c.sendMessage(ctx, func(m rpccp.Message) error { return ic.c.newImportCallMessage(m, ic.id, q.id, s) }, func(err error) { ic.c.mu.Lock() @@ -120,6 +121,10 @@ func (ic *importClient) Send(ctx context.Context, s capnp.Send) (*capnp.Answer, }) }) + if err != nil { + return capnp.ErrorAnswer(s.Method, err), func() {} + } + ans := q.p.Answer() return ans, func() { <-ans.Done() diff --git a/rpc/question.go b/rpc/question.go index 97725a4e..01997d69 100644 --- a/rpc/question.go +++ b/rpc/question.go @@ -151,9 +151,10 @@ func (q *question) PipelineSend(ctx context.Context, transform []capnp.PipelineO q.mark(transform) q2 := q.c.newQuestion(s.Method) + var err error syncutil.Without(&q.c.mu, func() { // Send call message. - q.c.sendMessage(ctx, func(m rpccp.Message) error { + err = q.c.sendMessage(ctx, func(m rpccp.Message) error { return q.c.newPipelineCallMessage(m, q.id, transform, q2.id, s) }, func(err error) { if err != nil { @@ -171,9 +172,12 @@ func (q *question) PipelineSend(ctx context.Context, transform []capnp.PipelineO q2.handleCancel(ctx) }() }) - }) + if err != nil { + return capnp.ErrorAnswer(s.Method, err), func() {} + } + ans := q2.p.Answer() return ans, func() { <-ans.Done()