diff --git a/orderer/common/blockcutter/blockcutter.go b/orderer/common/blockcutter/blockcutter.go new file mode 100644 index 00000000000..310a9c0b064 --- /dev/null +++ b/orderer/common/blockcutter/blockcutter.go @@ -0,0 +1,131 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package blockcutter + +import ( + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + cb "github.com/hyperledger/fabric/protos/common" + + "github.com/golang/protobuf/proto" + "github.com/op/go-logging" +) + +var logger = logging.MustGetLogger("orderer/common/blockcutter") + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +// Target defines a sink for the ordered broadcast messages +type Receiver interface { + // Ordered should be invoked sequentially as messages are ordered + // If the message is a valid normal message and does not fill the batch, nil, true is returned + // If the message is a valid normal message and fills a batch, the batch, true is returned + // If the message is a valid special message (like a config message) it terminates the current batch + // and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true + // If the ordered message is determined to be invalid, then nil, false is returned + Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) + + // Cut returns the current batch and starts a new one + Cut() []*cb.Envelope +} + +type receiver struct { + batchSize int + filters *broadcastfilter.RuleSet + configManager configtx.Manager + curBatch []*cb.Envelope +} + +func NewReceiverImpl(batchSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Receiver { + return &receiver{ + batchSize: batchSize, + filters: filters, + configManager: configManager, + } +} + +// Ordered should be invoked sequentially as messages are ordered +// If the message is a valid normal message and does not fill the batch, nil, true is returned +// If the message is a valid normal message and fills a batch, the batch, true is returned +// If the message is a valid special message (like a config message) it terminates the current batch +// and returns the current batch (if it is not empty), plus a second batch containing the special transaction and true +// If the ordered message is determined to be invalid, then nil, false is returned +func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) { + // The messages must be filtered a second time in case configuration has changed since the message was received + action, _ := r.filters.Apply(msg) + switch action { + case broadcastfilter.Accept: + logger.Debugf("Enqueuing message into batch") + r.curBatch = append(r.curBatch, msg) + + if len(r.curBatch) < r.batchSize { + return nil, true + } + + logger.Debugf("Batch size met, creating block") + newBatch := r.curBatch + r.curBatch = nil + return [][]*cb.Envelope{newBatch}, true + case broadcastfilter.Reconfigure: + // TODO, this is unmarshaling for a second time, we need a cleaner interface, maybe Apply returns a second arg with thing to put in the batch + payload := &cb.Payload{} + if err := proto.Unmarshal(msg.Payload, payload); err != nil { + logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err) + return nil, false + } + newConfig := &cb.ConfigurationEnvelope{} + if err := proto.Unmarshal(payload.Data, newConfig); err != nil { + logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err) + return nil, false + } + err := r.configManager.Validate(newConfig) + if err != nil { + logger.Warningf("A configuration change made it through the ingress filter but could not be included in a batch: %v", err) + return nil, false + } + + logger.Debugf("Configuration change applied successfully, committing previous block and configuration block") + firstBatch := r.curBatch + r.curBatch = nil + secondBatch := []*cb.Envelope{msg} + if firstBatch == nil { + return [][]*cb.Envelope{secondBatch}, true + } else { + return [][]*cb.Envelope{firstBatch, secondBatch}, true + } + case broadcastfilter.Reject: + logger.Debugf("Rejecting message") + return nil, false + case broadcastfilter.Forward: + logger.Debugf("Ignoring message because it was not accepted by a filter") + return nil, false + default: + logger.Fatalf("Received an unknown rule response: %v", action) + } + + return nil, false // Unreachable + +} + +// Cut returns the current batch and starts a new one +func (r *receiver) Cut() []*cb.Envelope { + batch := r.curBatch + r.curBatch = nil + return batch +} diff --git a/orderer/common/blockcutter/blockcutter_test.go b/orderer/common/blockcutter/blockcutter_test.go new file mode 100644 index 00000000000..1aea9704530 --- /dev/null +++ b/orderer/common/blockcutter/blockcutter_test.go @@ -0,0 +1,321 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package blockcutter + +import ( + "bytes" + "fmt" + "testing" + + "github.com/hyperledger/fabric/orderer/common/broadcastfilter" + "github.com/hyperledger/fabric/orderer/common/configtx" + cb "github.com/hyperledger/fabric/protos/common" + + "github.com/golang/protobuf/proto" +) + +type mockConfigManager struct { + validated bool + validateErr error +} + +func (mcm *mockConfigManager) Validate(configtx *cb.ConfigurationEnvelope) error { + mcm.validated = true + return mcm.validateErr +} + +func (mcm *mockConfigManager) Apply(message *cb.ConfigurationEnvelope) error { + panic("Unimplemented") +} + +type mockConfigFilter struct { + manager configtx.Manager +} + +func (mcf *mockConfigFilter) Apply(msg *cb.Envelope) broadcastfilter.Action { + if bytes.Equal(msg.Payload, configTx.Payload) { + if mcf.manager == nil || mcf.manager.Validate(nil) != nil { + return broadcastfilter.Reject + } + return broadcastfilter.Reconfigure + } + return broadcastfilter.Forward +} + +type mockRejectFilter struct{} + +func (mrf mockRejectFilter) Apply(message *cb.Envelope) broadcastfilter.Action { + if bytes.Equal(message.Payload, badTx.Payload) { + return broadcastfilter.Reject + } + return broadcastfilter.Forward +} + +type mockAcceptFilter struct{} + +func (mrf mockAcceptFilter) Apply(message *cb.Envelope) broadcastfilter.Action { + if bytes.Equal(message.Payload, goodTx.Payload) { + return broadcastfilter.Accept + } + return broadcastfilter.Forward +} + +func getFiltersAndConfig() (*broadcastfilter.RuleSet, *mockConfigManager) { + cm := &mockConfigManager{} + filters := broadcastfilter.NewRuleSet([]broadcastfilter.Rule{ + &mockConfigFilter{cm}, + &mockRejectFilter{}, + &mockAcceptFilter{}, + }) + return filters, cm +} + +var badTx = &cb.Envelope{Payload: []byte("BAD")} +var goodTx = &cb.Envelope{Payload: []byte("GOOD")} +var configTx = &cb.Envelope{Payload: []byte("CONFIG")} +var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")} + +func init() { + configBytes, err := proto.Marshal(&cb.ConfigurationEnvelope{}) + if err != nil { + panic("Error marshaling empty config tx") + } + configTx = &cb.Envelope{Payload: configBytes} + +} + +func TestNormalBatch(t *testing.T) { + filters, cm := getFiltersAndConfig() + batchSize := 2 + r := NewReceiverImpl(batchSize, filters, cm) + + batches, ok := r.Ordered(goodTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if !ok { + t.Fatalf("Should have enqueued message into batch") + } + + batches, ok = r.Ordered(goodTx) + + if batches == nil { + t.Fatalf("Should have created batch") + } + + if !ok { + t.Fatalf("Should have enqueued second message into batch") + } + +} + +func TestBadMessageInBatch(t *testing.T) { + filters, cm := getFiltersAndConfig() + batchSize := 2 + r := NewReceiverImpl(batchSize, filters, cm) + + batches, ok := r.Ordered(badTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if ok { + t.Fatalf("Should not have enqueued bad message into batch") + } + + batches, ok = r.Ordered(goodTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if !ok { + t.Fatalf("Should have enqueued good message into batch") + } + + batches, ok = r.Ordered(badTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if ok { + t.Fatalf("Should not have enqueued second bad message into batch") + } +} + +func TestUnmatchedMessageInBatch(t *testing.T) { + filters, cm := getFiltersAndConfig() + batchSize := 2 + r := NewReceiverImpl(batchSize, filters, cm) + + batches, ok := r.Ordered(unmatchedTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if ok { + t.Fatalf("Should not have enqueued unmatched message into batch") + } + + batches, ok = r.Ordered(goodTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if !ok { + t.Fatalf("Should have enqueued good message into batch") + } + + batches, ok = r.Ordered(unmatchedTx) + + if batches != nil { + t.Fatalf("Should not have created batch from unmatched message") + } + + if ok { + t.Fatalf("Should not have enqueued second bad message into batch") + } +} + +func TestReconfigureEmptyBatch(t *testing.T) { + filters, cm := getFiltersAndConfig() + batchSize := 2 + r := NewReceiverImpl(batchSize, filters, cm) + + batches, ok := r.Ordered(configTx) + + if !ok { + t.Fatalf("Should have enqueued config message") + } + + if !cm.validated { + t.Errorf("ConfigTx should have been validated before processing") + } + + if len(batches) != 1 { + t.Fatalf("Should created new batch, got %d", len(batches)) + } + + if len(batches[0]) != 1 { + t.Fatalf("Should have had one config tx in the second batch got %d", len(batches[1])) + } + + if !bytes.Equal(batches[0][0].Payload, configTx.Payload) { + t.Fatalf("Should have had the normal tx in the first batch") + } +} + +func TestReconfigurePartialBatch(t *testing.T) { + filters, cm := getFiltersAndConfig() + batchSize := 2 + r := NewReceiverImpl(batchSize, filters, cm) + + batches, ok := r.Ordered(goodTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if !ok { + t.Fatalf("Should have enqueued good message into batch") + } + + batches, ok = r.Ordered(configTx) + + if !ok { + t.Fatalf("Should have enqueued config message") + } + + if !cm.validated { + t.Errorf("ConfigTx should have been validated before processing") + } + + if len(batches) != 2 { + t.Fatalf("Should have created two batches, got %d", len(batches)) + } + + if len(batches[0]) != 1 { + t.Fatalf("Should have had one normal tx in the first batch got %d", len(batches[0])) + } + + if !bytes.Equal(batches[0][0].Payload, goodTx.Payload) { + t.Fatalf("Should have had the normal tx in the first batch") + } + + if len(batches[1]) != 1 { + t.Fatalf("Should have had one config tx in the second batch got %d", len(batches[1])) + } + + if !bytes.Equal(batches[1][0].Payload, configTx.Payload) { + t.Fatalf("Should have had the normal tx in the first batch") + } +} + +func TestReconfigureFailToVerify(t *testing.T) { + filters, cm := getFiltersAndConfig() + cm.validateErr = fmt.Errorf("Fail to apply") + batchSize := 2 + r := NewReceiverImpl(batchSize, filters, cm) + + batches, ok := r.Ordered(goodTx) + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if !ok { + t.Fatalf("Should have enqueued good message into batch") + } + + batches, ok = r.Ordered(configTx) + + if !cm.validated { + t.Errorf("ConfigTx should have been validated before processing") + } + + if batches != nil { + t.Fatalf("Should not have created batch") + } + + if ok { + t.Fatalf("Should not have enqueued bad config message into batch") + } + + batches, ok = r.Ordered(goodTx) + + if batches == nil { + t.Fatalf("Should have created batch") + } + + if len(batches) != 1 { + t.Fatalf("Batches should only have had one batch") + } + + if len(batches[0]) != 2 { + t.Fatalf("Should have had full batch") + } + + if !ok { + t.Fatalf("Should have enqueued good message into batch") + } +} diff --git a/orderer/solo/consensus.go b/orderer/solo/consensus.go index 90d465c3cbe..dbb378b5fe6 100644 --- a/orderer/solo/consensus.go +++ b/orderer/solo/consensus.go @@ -19,12 +19,12 @@ package solo import ( "time" + "github.com/hyperledger/fabric/orderer/common/blockcutter" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" - "github.com/golang/protobuf/proto" "github.com/op/go-logging" ) @@ -35,13 +35,11 @@ func init() { } type consenter struct { - batchSize int - batchTimeout time.Duration - rl rawledger.Writer - filter *broadcastfilter.RuleSet - configManager configtx.Manager - sendChan chan *cb.Envelope - exitChan chan struct{} + batchTimeout time.Duration + cutter blockcutter.Receiver + rl rawledger.Writer + sendChan chan *cb.Envelope + exitChan chan struct{} } func NewConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter { @@ -52,13 +50,11 @@ func NewConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer func newPlainConsenter(batchSize int, batchTimeout time.Duration, rl rawledger.Writer, filters *broadcastfilter.RuleSet, configManager configtx.Manager) *consenter { bs := &consenter{ - batchSize: batchSize, - batchTimeout: batchTimeout, - rl: rl, - filter: filters, - configManager: configManager, - sendChan: make(chan *cb.Envelope), - exitChan: make(chan struct{}), + cutter: blockcutter.NewReceiverImpl(batchSize, filters, configManager), + batchTimeout: batchTimeout, + rl: rl, + sendChan: make(chan *cb.Envelope), + exitChan: make(chan struct{}), } return bs } @@ -78,66 +74,27 @@ func (bs *consenter) Enqueue(env *cb.Envelope) bool { } func (bs *consenter) main() { - var curBatch []*cb.Envelope var timer <-chan time.Time - cutBatch := func() { - bs.rl.Append(curBatch, nil) - curBatch = nil - timer = nil - } - for { select { case msg := <-bs.sendChan: - // The messages must be filtered a second time in case configuration has changed since the message was received - action, _ := bs.filter.Apply(msg) - switch action { - case broadcastfilter.Accept: - curBatch = append(curBatch, msg) - - if len(curBatch) >= bs.batchSize { - logger.Debugf("Batch size met, creating block") - cutBatch() - } else if len(curBatch) == 1 { - // If this is the first request in a batch, start the batch timer - timer = time.After(bs.batchTimeout) - } - case broadcastfilter.Reconfigure: - // TODO, this is unmarshaling for a second time, we need a cleaner interface, maybe Apply returns a second arg with thing to put in the batch - payload := &cb.Payload{} - if err := proto.Unmarshal(msg.Payload, payload); err != nil { - logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err) - continue - } - newConfig := &cb.ConfigurationEnvelope{} - if err := proto.Unmarshal(payload.Data, newConfig); err != nil { - logger.Errorf("A change was flagged as configuration, but could not be unmarshaled: %v", err) - continue - } - err := bs.configManager.Apply(newConfig) - if err != nil { - logger.Warningf("A configuration change made it through the ingress filter but could not be included in a batch: %v", err) - continue - } - - logger.Debugf("Configuration change applied successfully, committing previous block and configuration block") - cutBatch() - bs.rl.Append([]*cb.Envelope{msg}, nil) - case broadcastfilter.Reject: - fallthrough - case broadcastfilter.Forward: - logger.Debugf("Ignoring message because it was not accepted by a filter") - default: - logger.Fatalf("Received an unknown rule response: %v", action) + batches, ok := bs.cutter.Ordered(msg) + if ok && len(batches) == 0 && timer == nil { + timer = time.After(bs.batchTimeout) + continue + } + for _, batch := range batches { + bs.rl.Append(batch, nil) } case <-timer: - if len(curBatch) == 0 { + batch := bs.cutter.Cut() + if len(batch) == 0 { logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug") continue } logger.Debugf("Batch timer expired, creating block") - cutBatch() + bs.rl.Append(batch, nil) case <-bs.exitChan: logger.Debugf("Exiting") return diff --git a/orderer/solo/consensus_test.go b/orderer/solo/consensus_test.go index 3f8f30d7484..fddb32419b1 100644 --- a/orderer/solo/consensus_test.go +++ b/orderer/solo/consensus_test.go @@ -192,39 +192,4 @@ func TestReconfigureGoodPath(t *testing.T) { if !cm.validated { t.Errorf("ConfigTx should have been validated before processing") } - - if !cm.applied { - t.Errorf("ConfigTx should have been applied after processing") - } -} - -func TestReconfigureFailToApply(t *testing.T) { - filters, cm := getFiltersAndConfig() - cm.applyErr = fmt.Errorf("Fail to apply") - batchSize := 2 - bs := newPlainConsenter(batchSize, time.Hour, ramledger.New(10, genesisBlock), filters, cm) - done := make(chan struct{}) - go func() { - bs.main() - close(done) - }() - - bs.sendChan <- &cb.Envelope{Payload: []byte("Msg1")} - bs.sendChan <- &cb.Envelope{Payload: configTx} - bs.sendChan <- &cb.Envelope{Payload: []byte("Msg2")} - - bs.halt() - <-done - expected := uint64(2) - if bs.rl.(rawledger.Reader).Height() != expected { - t.Fatalf("Expected %d blocks but got %d", expected, bs.rl.(rawledger.Reader).Height()) - } - - if !cm.validated { - t.Errorf("ConfigTx should have been validated before processing") - } - - if !cm.applied { - t.Errorf("ConfigTx should tried to apply") - } }