Make the PBFT broadcast timeout configurable.

The timeout is read from general.timeout.broadcast into
pbftCore.broadcastTimeout and handed to newBroadcaster, replacing the
hard-coded time.Second timer in broadcaster.send. Also adds
TestBroadcastTimeout and tests that the PBFT config keys are set and can be
overridden from the environment.

Note: the index lines are kept from the original patch; the post-image hashes
were not recomputed.

diff --git a/consensus/pbft/batch.go b/consensus/pbft/batch.go
index 40e7942a102..b9b09cbc0b9 100644
--- a/consensus/pbft/batch.go
+++ b/consensus/pbft/batch.go
@@ -84,7 +84,7 @@ func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatc
 	op.pbft = newPbftCore(id, config, op, etf)
 	op.manager.Start()
 	op.externalEventReceiver.manager = op.manager
-	op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, stack)
+	op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, op.pbft.broadcastTimeout, stack)
 
 	op.batchSize = config.GetInt("general.batchsize")
 	op.batchStore = nil
diff --git a/consensus/pbft/broadcast.go b/consensus/pbft/broadcast.go
index bb30feea7bd..fc694c7d6b0 100644
--- a/consensus/pbft/broadcast.go
+++ b/consensus/pbft/broadcast.go
@@ -33,10 +33,11 @@ type communicator interface {
 type broadcaster struct {
 	comm communicator
 
-	f        int
-	msgChans map[uint64]chan *sendRequest
-	closed   sync.WaitGroup
-	closedCh chan struct{}
+	f                int
+	broadcastTimeout time.Duration
+	msgChans         map[uint64]chan *sendRequest
+	closed           sync.WaitGroup
+	closedCh         chan struct{}
 }
 
 type sendRequest struct {
@@ -44,15 +45,16 @@ type sendRequest struct {
 	done chan bool
 }
 
-func newBroadcaster(self uint64, N int, f int, c communicator) *broadcaster {
+func newBroadcaster(self uint64, N int, f int, broadcastTimeout time.Duration, c communicator) *broadcaster {
 	queueSize := 10 // XXX increase after testing
 
 	chans := make(map[uint64]chan *sendRequest)
 	b := &broadcaster{
-		comm:     c,
-		f:        f,
-		msgChans: chans,
-		closedCh: make(chan struct{}),
+		comm:             c,
+		f:                f,
+		broadcastTimeout: broadcastTimeout,
+		msgChans:         chans,
+		closedCh:         make(chan struct{}),
 	}
 	for i := 0; i < N; i++ {
 		if uint64(i) == self {
@@ -172,7 +174,7 @@ func (b *broadcaster) send(msg *pb.Message, dest *uint64) error {
 	}
 
 	succeeded := 0
-	timer := time.NewTimer(time.Second) // TODO, make this configurable
+	timer := time.NewTimer(b.broadcastTimeout)
 
 	// This loop will try to send, until one of:
 	// a) the required number of sends succeed
diff --git a/consensus/pbft/broadcast_test.go b/consensus/pbft/broadcast_test.go
index f30e3a97ea2..d8563ff3408 100644
--- a/consensus/pbft/broadcast_test.go
+++ b/consensus/pbft/broadcast_test.go
@@ -69,7 +69,7 @@ func TestBroadcast(t *testing.T) {
 		}
 	}()
 
-	b := newBroadcaster(1, 4, 1, m)
+	b := newBroadcaster(1, 4, 1, time.Second, m)
 
 	msg := &pb.Message{Payload: []byte("hi")}
 	b.Broadcast(msg)
@@ -123,7 +123,7 @@ func TestBroadcastStuck(t *testing.T) {
 		}
 	}()
 
-	b := newBroadcaster(1, 4, 1, m)
+	b := newBroadcaster(1, 4, 1, time.Second, m)
 
 	maxc := 20
 	for c := 0; c < maxc; c++ {
@@ -168,7 +168,7 @@ func TestBroadcastUnicast(t *testing.T) {
 		}
 	}()
 
-	b := newBroadcaster(1, 4, 1, m)
+	b := newBroadcaster(1, 4, 1, time.Second, m)
 
 	msg := &pb.Message{Payload: []byte("hi")}
 	b.Unicast(msg, 0)
@@ -206,7 +206,7 @@ func TestBroadcastAllFail(t *testing.T) {
 		done: make(chan struct{}),
 	}
 
-	b := newBroadcaster(1, 4, 1, m)
+	b := newBroadcaster(1, 4, 1, time.Second, m)
 
 	maxc := 20
 	for c := 0; c < maxc; c++ {
@@ -228,6 +228,41 @@ func TestBroadcastAllFail(t *testing.T) {
 	}
 }
 
+// TestBroadcastTimeout checks that Broadcast, run against a
+// mockIndefinitelyStuckComm, returns within the configured timeout plus a margin.
+func TestBroadcastTimeout(t *testing.T) {
+	expectTime := 10 * time.Second
+	deltaTime := 50 * time.Millisecond
+	m := &mockIndefinitelyStuckComm{
+		mockComm: mockComm{
+			self:  1,
+			n:     4,
+			msgCh: make(chan mockMsg),
+		},
+		done: make(chan struct{}),
+	}
+	defer close(m.done)
+
+	b := newBroadcaster(1, 4, 1, expectTime, m)
+	// Buffered so the goroutine below can always deliver its result and exit,
+	// even after the test has stopped waiting; the channel is never closed.
+	broadcastDone := make(chan time.Time, 1)
+
+	beginTime := time.Now()
+	go func() {
+		b.Broadcast(&pb.Message{Payload: []byte("1")})
+		broadcastDone <- time.Now()
+	}()
+
+	checkTime := expectTime + deltaTime
+	select {
+	case endTime := <-broadcastDone:
+		t.Log("Broadcast consume time: ", endTime.Sub(beginTime))
+	case <-time.After(checkTime):
+		t.Fatalf("Broadcast timeout after %v, expected %v", checkTime, expectTime)
+	}
+}
+
 type mockIndefinitelyStuckComm struct {
 	mockComm
 	done chan struct{}
@@ -250,7 +285,7 @@ func TestBroadcastIndefinitelyStuck(t *testing.T) {
 		done: make(chan struct{}),
 	}
 
-	b := newBroadcaster(1, 4, 1, m)
+	b := newBroadcaster(1, 4, 1, time.Second, m)
 
 	broadcastDone := make(chan struct{})
 
diff --git a/consensus/pbft/config.yaml b/consensus/pbft/config.yaml
index ba74553e9f0..2ed06c89e49 100644
--- a/consensus/pbft/config.yaml
+++ b/consensus/pbft/config.yaml
@@ -59,6 +59,9 @@ general:
         # Interval to send "keep-alive" null requests. Set to 0 to disable. If enabled, must be greater than request timeout
         nullrequest: 0s
 
+        # Maximum duration allowed for a message broadcast.
+        broadcast: 1s
+
 ################################################################################
 #
 # SECTION: EXECUTOR
diff --git a/consensus/pbft/pbft-core.go b/consensus/pbft/pbft-core.go
index 0a9405896ae..fe51bd92efd 100644
--- a/consensus/pbft/pbft-core.go
+++ b/consensus/pbft/pbft-core.go
@@ -155,6 +155,7 @@ type pbftCore struct {
 	newViewTimeout        time.Duration            // progress timeout for new views
 	newViewTimerReason    string                   // what triggered the timer
 	lastNewViewTimeout    time.Duration            // last timeout we used during this view change
+	broadcastTimeout      time.Duration            // progress timeout for broadcast
 	outstandingReqBatches map[string]*RequestBatch // track whether we are waiting for request batches to execute
 
 	nullRequestTimer   events.Timer  // timeout triggering a null request
@@ -255,6 +256,10 @@ func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events
 	if err != nil {
 		instance.nullRequestTimeout = 0
 	}
+	instance.broadcastTimeout, err = time.ParseDuration(config.GetString("general.timeout.broadcast"))
+	if err != nil {
+		panic(fmt.Errorf("Cannot parse broadcast timeout: %s", err))
+	}
 
 	instance.activeView = true
 	instance.replicaCount = instance.N
@@ -266,6 +271,7 @@ func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events
 	logger.Infof("PBFT request timeout = %v", instance.requestTimeout)
 	logger.Infof("PBFT view change timeout = %v", instance.newViewTimeout)
 	logger.Infof("PBFT Checkpoint period (K) = %v", instance.K)
+	logger.Infof("PBFT broadcast timeout = %v", instance.broadcastTimeout)
 	logger.Infof("PBFT Log multiplier = %v", instance.logMultiplier)
 	logger.Infof("PBFT log size (L) = %v", instance.L)
 	if instance.nullRequestTimeout > 0 {
diff --git a/consensus/pbft/pbft-core_test.go b/consensus/pbft/pbft-core_test.go
index 0517adb92e1..6cb656ff696 100644
--- a/consensus/pbft/pbft-core_test.go
+++ b/consensus/pbft/pbft-core_test.go
@@ -36,6 +36,34 @@ func init() {
 	logging.SetLevel(logging.DEBUG, "")
 }
 
+func TestConfigSet(t *testing.T) {
+	config := loadConfig()
+
+	testKeys := []string{
+		"general.mode",
+		"general.N",
+		"general.f",
+		"general.K",
+		"general.logmultiplier",
+		"general.batchsize",
+		"general.byzantine",
+		"general.viewchangeperiod",
+		"general.timeout.batch",
+		"general.timeout.request",
+		"general.timeout.viewchange",
+		"general.timeout.resendviewchange",
+		"general.timeout.nullrequest",
+		"general.timeout.broadcast",
+		"executor.queuesize",
+	}
+
+	for _, key := range testKeys {
+		if ok := config.IsSet(key); !ok {
+			t.Errorf("Cannot test env override because \"%s\" does not seem to be set", key)
+		}
+	}
+}
+
 func TestEnvOverride(t *testing.T) {
 	config := loadConfig()
 
@@ -43,11 +71,6 @@ func TestEnvOverride(t *testing.T) {
 	envName := "CORE_PBFT_GENERAL_MODE" // env override name
 	overrideValue := "overide_test"     // value to override default value with
 
-	// test key
-	if ok := config.IsSet("general.mode"); !ok {
-		t.Fatalf("Cannot test env override because \"%s\" does not seem to be set", key)
-	}
-
 	os.Setenv(envName, overrideValue)
 	// The override config value will cause other calls to fail unless unset.
 	defer func() {
@@ -66,6 +89,104 @@ func TestEnvOverride(t *testing.T) {
 
 }
 
+func TestIntEnvOverride(t *testing.T) {
+	config := loadConfig()
+
+	tests := []struct {
+		key           string
+		envName       string
+		overrideValue string
+		expectValue   int
+	}{
+		{"general.N", "CORE_PBFT_GENERAL_N", "8", 8},
+		{"general.f", "CORE_PBFT_GENERAL_F", "2", 2},
+		{"general.K", "CORE_PBFT_GENERAL_K", "20", 20},
+		{"general.logmultiplier", "CORE_PBFT_GENERAL_LOGMULTIPLIER", "6", 6},
+		{"general.batchsize", "CORE_PBFT_GENERAL_BATCHSIZE", "200", 200},
+		{"general.viewchangeperiod", "CORE_PBFT_GENERAL_VIEWCHANGEPERIOD", "5", 5},
+		{"executor.queuesize", "CORE_PBFT_EXECUTOR_QUEUESIZE", "50", 50},
+	}
+
+	for _, test := range tests {
+		os.Setenv(test.envName, test.overrideValue)
+
+		if ok := config.IsSet(test.key); !ok {
+			t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
+		}
+
+		configVal := config.GetInt(test.key)
+		if configVal != test.expectValue {
+			t.Errorf("Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%d\"", test.key, test.expectValue, configVal)
+		}
+
+		os.Unsetenv(test.envName)
+	}
+}
+
+func TestDurationEnvOverride(t *testing.T) {
+	config := loadConfig()
+
+	tests := []struct {
+		key           string
+		envName       string
+		overrideValue string
+		expectValue   time.Duration
+	}{
+		{"general.timeout.batch", "CORE_PBFT_GENERAL_TIMEOUT_BATCH", "2s", 2 * time.Second},
+		{"general.timeout.request", "CORE_PBFT_GENERAL_TIMEOUT_REQUEST", "4s", 4 * time.Second},
+		{"general.timeout.viewchange", "CORE_PBFT_GENERAL_TIMEOUT_VIEWCHANGE", "5s", 5 * time.Second},
+		{"general.timeout.resendviewchange", "CORE_PBFT_GENERAL_TIMEOUT_RESENDVIEWCHANGE", "200ms", 200 * time.Millisecond},
+		{"general.timeout.nullrequest", "CORE_PBFT_GENERAL_TIMEOUT_NULLREQUEST", "1s", time.Second},
+		{"general.timeout.broadcast", "CORE_PBFT_GENERAL_TIMEOUT_BROADCAST", "1m", time.Minute},
+	}
+
+	for _, test := range tests {
+		os.Setenv(test.envName, test.overrideValue)
+
+		if ok := config.IsSet(test.key); !ok {
+			t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
+		}
+
+		configVal := config.GetDuration(test.key)
+		if configVal != test.expectValue {
+			t.Errorf("Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%v\"", test.key, test.expectValue, configVal)
+		}
+
+		os.Unsetenv(test.envName)
+	}
+}
+
+func TestBoolEnvOverride(t *testing.T) {
+	config := loadConfig()
+
+	tests := []struct {
+		key           string
+		envName       string
+		overrideValue string
+		expectValue   bool
+	}{
+		{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "false", false},
+		{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "0", false},
+		{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "true", true},
+		{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "1", true},
+	}
+
+	for i, test := range tests {
+		os.Setenv(test.envName, test.overrideValue)
+
+		if ok := config.IsSet(test.key); !ok {
+			t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
+		}
+
+		configVal := config.GetBool(test.key)
+		if configVal != test.expectValue {
+			t.Errorf("Test %d Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%v\"", i, test.key, test.expectValue, configVal)
+		}
+
+		os.Unsetenv(test.envName)
+	}
+}
+
 func TestMaliciousPrePrepare(t *testing.T) {
 	mock := &omniProto{
 		broadcastImpl: func(msgPayload []byte) {