From 8e868b8ed0cb8a02ade3e85f01cf7b5ab737eddf Mon Sep 17 00:00:00 2001 From: jiangyaoguo Date: Wed, 30 Nov 2016 20:46:49 +0800 Subject: [PATCH] fix DeliverService stop Currently when peer stopes, DeliverService.stop will be blocked. So you can't use "ctrl+c" or "kill" to interrupt or stop peer. Because DeliverService use a unbuffered channel stopChan to send stop signal. When peer is a gossip.orgLeader, DeliverService don't receive from stopChan. So DeliverService.stop will block at "d.stopChan <- true". Fix the block bug and use a atomic flag to distinguash unexpected connection error and initiative stop. Change-Id: If2afd226c5b074e3b78157d84e2f267e741208aa Signed-off-by: jiangyaoguo --- core/committer/noopssinglechain/client.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/core/committer/noopssinglechain/client.go b/core/committer/noopssinglechain/client.go index 997e60a336c..008d21ec89d 100644 --- a/core/committer/noopssinglechain/client.go +++ b/core/committer/noopssinglechain/client.go @@ -17,6 +17,7 @@ limitations under the License. package noopssinglechain import ( + "sync/atomic" "time" "github.com/golang/protobuf/proto" @@ -60,6 +61,7 @@ type DeliverService struct { gossip gossip.Gossip conn *grpc.ClientConn + stopFlag int32 stopChan chan bool } @@ -172,6 +174,8 @@ func (d *DeliverService) Start() { // Stop all service and release resources func (d *DeliverService) Stop() { + atomic.StoreInt32(&d.stopFlag, 1) + d.stopDeliver() d.stopChan <- true d.stateProvider.Stop() d.gossip.Stop() @@ -211,13 +215,23 @@ func (d *DeliverService) seekLatestFromCommitter(height uint64) error { }) } +// Internal function to check whenever we need to finish listening +// for new messages to arrive +func (d *DeliverService) isDone() bool { + + return atomic.LoadInt32(&d.stopFlag) == 1 +} + func (d *DeliverService) readUntilClose() { for { msg, err := d.client.Recv() if err != nil { + logger.Warningf("Receive error: %s", err.Error()) + if d.isDone() { + <-d.stopChan + } return } - switch t := msg.Type.(type) { case *orderer.DeliverResponse_Error: if t.Error == common.Status_SUCCESS {