Skip to content

Commit

Permalink
fix: writer cannot be nil, close writer on client side
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Stewart <christian@aperture.us>
  • Loading branch information
paralin committed Aug 2, 2024
1 parent 4e13e0e commit 0a75ea3
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 17 deletions.
4 changes: 1 addition & 3 deletions srpc/client-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ func (r *ClientRPC) HandleCallStart(pkt *CallStart) error {

// Close releases any resources held by the ClientRPC.
func (r *ClientRPC) Close() {
if r.writer != nil {
_ = r.WriteCallCancel()
}
_ = r.WriteCallCancel()

r.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
r.closeLocked(broadcast)
Expand Down
5 changes: 4 additions & 1 deletion srpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func (c *client) NewStream(ctx context.Context, service, method string, firstMsg
return nil, err
}

return NewMsgStream(ctx, clientRPC, clientRPC.ctxCancel), nil
return NewMsgStream(ctx, clientRPC, func() {
clientRPC.ctxCancel()
_ = writer.Close()
}), nil
}

// _ is a type assertion
Expand Down
16 changes: 3 additions & 13 deletions srpc/common-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ func (c *commonRPC) ReadOne() ([]byte, error) {

// WriteCallData writes a call data packet.
func (c *commonRPC) WriteCallData(data []byte, complete bool, err error) error {
if c.writer == nil {
return ErrCompleted
}
outPkt := NewCallDataPacket(data, len(data) == 0, false, nil)
return c.writer.WritePacket(outPkt)
}
Expand All @@ -131,9 +128,7 @@ func (c *commonRPC) HandleStreamClose(closeErr error) {
}
c.dataClosed = true
c.ctxCancel()
if c.writer != nil {
_ = c.writer.Close()
}
_ = c.writer.Close()
broadcast()
})
}
Expand Down Expand Up @@ -175,10 +170,7 @@ func (c *commonRPC) HandleCallData(pkt *CallData) error {

// WriteCallCancel writes a call cancel packet.
func (c *commonRPC) WriteCallCancel() error {
if c.writer != nil {
return c.writer.WritePacket(NewCallCancelPacket())
}
return nil
return c.writer.WritePacket(NewCallCancelPacket())
}

// closeLocked releases resources held by the RPC.
Expand All @@ -187,9 +179,7 @@ func (c *commonRPC) closeLocked(broadcast func()) {
if c.remoteErr == nil {
c.remoteErr = context.Canceled
}
if c.writer != nil {
_ = c.writer.Close()
}
_ = c.writer.Close()
broadcast()
c.ctxCancel()
}
1 change: 1 addition & 0 deletions srpc/server-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (r *ServerRPC) HandleCallStart(pkt *CallStart) error {

// invokeRPC invokes the RPC after CallStart is received.
func (r *ServerRPC) invokeRPC(serviceID, methodID string) {
// on the server side, the writer is closed by invokeRPC.
strm := NewMsgStream(r.ctx, r, r.ctxCancel)
ok, err := r.invoker.InvokeMethod(serviceID, methodID, strm)
if err == nil && !ok {
Expand Down

0 comments on commit 0a75ea3

Please sign in to comment.