diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index 23d81025299..631be82530c 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -23,7 +23,7 @@ import ( "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/sigfilter" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" @@ -50,7 +50,7 @@ type Support interface { PolicyManager() policies.Manager // Reader returns the chain Reader for the chain - Reader() ordererledger.Reader + Reader() ledger.Reader // SharedConfig returns the shared config manager for this chain SharedConfig() configvaluesapi.Orderer diff --git a/orderer/common/deliver/deliver_test.go b/orderer/common/deliver/deliver_test.go index 31bd443c77a..5301dccef78 100644 --- a/orderer/common/deliver/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -26,7 +26,7 @@ import ( mockconfigvaluesorderer "github.com/hyperledger/fabric/common/mocks/configvalues/channel/orderer" mockpolicies "github.com/hyperledger/fabric/common/mocks/policies" "github.com/hyperledger/fabric/common/policies" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" ramledger "github.com/hyperledger/fabric/orderer/ledger/ram" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -81,7 +81,7 @@ func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) { } type mockSupport struct { - ledger ordererledger.ReadWriter + ledger ledger.ReadWriter sharedConfig *mockconfigvaluesorderer.SharedConfig policyManager *mockpolicies.Manager } @@ -90,11 +90,11 @@ func (mcs *mockSupport) PolicyManager() policies.Manager { return mcs.policyManager } -func (mcs *mockSupport) Reader() ordererledger.Reader { +func (mcs *mockSupport) Reader() ledger.Reader { return mcs.ledger } -func NewRAMLedger() ordererledger.ReadWriter { +func NewRAMLedger() ledger.ReadWriter { rlf := ramledger.New(ledgerSize + 1) rl, _ := rlf.GetOrCreate(provisional.TestChainID) rl.Append(genesisBlock) @@ -118,11 +118,11 @@ func newMockMultichainManager() *mockSupportManager { return mm } -var seekOldest = &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{&ab.SeekOldest{}}} -var seekNewest = &ab.SeekPosition{Type: &ab.SeekPosition_Newest{&ab.SeekNewest{}}} +var seekOldest = &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}} +var seekNewest = &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}} func seekSpecified(number uint64) *ab.SeekPosition { - return &ab.SeekPosition{Type: &ab.SeekPosition_Specified{&ab.SeekSpecified{Number: number}}} + return &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: number}}} } func makeSeek(chainID string, seekInfo *ab.SeekInfo) *cb.Envelope { @@ -142,8 +142,8 @@ func makeSeek(chainID string, seekInfo *ab.SeekInfo) *cb.Envelope { func TestOldestSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) } m := newMockD() @@ -181,8 +181,8 @@ func TestOldestSeek(t *testing.T) { func TestNewestSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) } m := newMockD() @@ -213,8 +213,8 @@ func TestNewestSeek(t *testing.T) { func TestSpecificSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) } m := newMockD() @@ -251,8 +251,8 @@ func TestSpecificSeek(t *testing.T) { func TestUnauthorizedSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) } mm.chains[systemChainID].policyManager.Policy.Err = fmt.Errorf("Fail to evaluate policy") @@ -277,8 +277,8 @@ func TestUnauthorizedSeek(t *testing.T) { func TestBadSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) } m := newMockD() @@ -302,8 +302,8 @@ func TestBadSeek(t *testing.T) { func TestFailFastSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) } m := newMockD() @@ -336,8 +336,8 @@ func TestFailFastSeek(t *testing.T) { func TestBlockingSeek(t *testing.T) { mm := newMockMultichainManager() for i := 1; i < ledgerSize; i++ { - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) } m := newMockD() @@ -363,8 +363,8 @@ func TestBlockingSeek(t *testing.T) { case <-time.After(50 * time.Millisecond): } - ledger := mm.chains[systemChainID].ledger - ledger.Append(ordererledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}})) + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}})) select { case deliverReply := <-m.sendChan: diff --git a/orderer/ledger/blackbox_test.go b/orderer/ledger/blackbox_test.go index 6f7cae6cf7c..9ea355074ed 100644 --- a/orderer/ledger/blackbox_test.go +++ b/orderer/ledger/blackbox_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ordererledger_test +package ledger_test import ( "bytes" diff --git a/orderer/ledger/file/factory.go b/orderer/ledger/file/factory.go index ce60ded6bf7..14310aff04d 100644 --- a/orderer/ledger/file/factory.go +++ b/orderer/ledger/file/factory.go @@ -21,58 +21,58 @@ import ( "github.com/hyperledger/fabric/common/ledger/blkstorage" "github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" ) type fileLedgerFactory struct { blkstorageProvider blkstorage.BlockStoreProvider - ledgers map[string]ordererledger.ReadWriter + ledgers map[string]ledger.ReadWriter mutex sync.Mutex } // GetOrCreate gets an existing ledger (if it exists) or creates it if it does not -func (lf *fileLedgerFactory) GetOrCreate(chainID string) (ordererledger.ReadWriter, error) { - lf.mutex.Lock() - defer lf.mutex.Unlock() +func (flf *fileLedgerFactory) GetOrCreate(chainID string) (ledger.ReadWriter, error) { + flf.mutex.Lock() + defer flf.mutex.Unlock() key := chainID // check cache - ledger, ok := lf.ledgers[key] + ledger, ok := flf.ledgers[key] if ok { return ledger, nil } // open fresh - blockStore, err := lf.blkstorageProvider.OpenBlockStore(key) + blockStore, err := flf.blkstorageProvider.OpenBlockStore(key) if err != nil { return nil, err } ledger = &fileLedger{blockStore: blockStore, signal: make(chan struct{})} - lf.ledgers[key] = ledger + flf.ledgers[key] = ledger return ledger, nil } -// ChainIDs returns the chain IDs the Factory is aware of -func (lf *fileLedgerFactory) ChainIDs() []string { - chainIDs, err := lf.blkstorageProvider.List() +// ChainIDs returns the chain IDs the factory is aware of +func (flf *fileLedgerFactory) ChainIDs() []string { + chainIDs, err := flf.blkstorageProvider.List() if err != nil { - panic(err) + logger.Panic(err) } return chainIDs } -// Close closes the file ledgers served by this factory -func (lf *fileLedgerFactory) Close() { - lf.blkstorageProvider.Close() +// Close releases all resources acquired by the factory +func (flf *fileLedgerFactory) Close() { + flf.blkstorageProvider.Close() } // New creates a new ledger factory -func New(directory string) ordererledger.Factory { +func New(directory string) ledger.Factory { return &fileLedgerFactory{ blkstorageProvider: fsblkstorage.NewProvider( fsblkstorage.NewConf(directory, -1), &blkstorage.IndexConfig{ AttrsToIndex: []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum}}, ), - ledgers: make(map[string]ordererledger.ReadWriter), + ledgers: make(map[string]ledger.ReadWriter), } } diff --git a/orderer/ledger/file/fileledger.go b/orderer/ledger/file/impl.go similarity index 91% rename from orderer/ledger/file/fileledger.go rename to orderer/ledger/file/impl.go index 7a38952f6e3..3798d031091 100644 --- a/orderer/ledger/file/fileledger.go +++ b/orderer/ledger/file/impl.go @@ -18,13 +18,13 @@ package fileledger import ( "github.com/hyperledger/fabric/common/ledger/blkstorage" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + ledger "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" ) -var logger = logging.MustGetLogger("ordererledger/fileledger") +var logger = logging.MustGetLogger("orderer/fileledger") var closedChan chan struct{} func init() { @@ -69,33 +69,33 @@ func (i *fileLedgerIterator) ReadyChan() <-chan struct{} { // Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its // starting block number -func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (ordererledger.Iterator, uint64) { +func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator, uint64) { switch start := startPosition.Type.(type) { case *ab.SeekPosition_Oldest: return &fileLedgerIterator{ledger: fl, blockNumber: 0}, 0 case *ab.SeekPosition_Newest: info, err := fl.blockStore.GetBlockchainInfo() if err != nil { - panic(err) + logger.Panic(err) } newestBlockNumber := info.Height - 1 return &fileLedgerIterator{ledger: fl, blockNumber: newestBlockNumber}, newestBlockNumber case *ab.SeekPosition_Specified: height := fl.Height() if start.Specified.Number > height { - return &ordererledger.NotFoundErrorIterator{}, 0 + return &ledger.NotFoundErrorIterator{}, 0 } return &fileLedgerIterator{ledger: fl, blockNumber: start.Specified.Number}, start.Specified.Number } // This line should be unreachable, but the compiler requires it - return &ordererledger.NotFoundErrorIterator{}, 0 + return &ledger.NotFoundErrorIterator{}, 0 } // Height returns the number of blocks on the ledger func (fl *fileLedger) Height() uint64 { info, err := fl.blockStore.GetBlockchainInfo() if err != nil { - panic(err) + logger.Panic(err) } return info.Height } diff --git a/orderer/ledger/file/fileledger_test.go b/orderer/ledger/file/impl_test.go similarity index 89% rename from orderer/ledger/file/fileledger_test.go rename to orderer/ledger/file/impl_test.go index e10739249bf..7fbd3f640eb 100644 --- a/orderer/ledger/file/fileledger_test.go +++ b/orderer/ledger/file/impl_test.go @@ -23,7 +23,7 @@ import ( "testing" "github.com/hyperledger/fabric/common/configtx/tool/provisional" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -39,7 +39,7 @@ func init() { type testEnv struct { t *testing.T location string - flf ordererledger.Factory + flf ledger.Factory } func initialize(t *testing.T) (*testEnv, *fileLedger) { @@ -75,7 +75,7 @@ func TestInitialization(t *testing.T) { if fl.Height() != 1 { t.Fatalf("Block height should be 1") } - block := ordererledger.GetBlock(fl, 0) + block := ledger.GetBlock(fl, 0) if block == nil { t.Fatalf("Error retrieving genesis block") } @@ -92,7 +92,7 @@ func TestReinitialization(t *testing.T) { defer tev.tearDown() // create a block to add to the ledger - b1 := ordererledger.CreateNextBlock(leger1, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}) + b1 := ledger.CreateNextBlock(leger1, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}) // add the block to the ledger leger1.Append(b1) @@ -122,7 +122,7 @@ func TestReinitialization(t *testing.T) { if fl.Height() != 2 { t.Fatalf("Block height should be 2. Got %v", fl.Height()) } - block := ordererledger.GetBlock(fl, 1) + block := ledger.GetBlock(fl, 1) if block == nil { t.Fatalf("Error retrieving block 1") } @@ -159,11 +159,11 @@ func TestAddition(t *testing.T) { defer tev.tearDown() info, _ := fl.blockStore.GetBlockchainInfo() prevHash := info.CurrentBlockHash - fl.Append(ordererledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) + fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) if fl.Height() != 2 { t.Fatalf("Block height should be 2") } - block := ordererledger.GetBlock(fl, 1) + block := ledger.GetBlock(fl, 1) if block == nil { t.Fatalf("Error retrieving genesis block") } @@ -175,7 +175,7 @@ func TestAddition(t *testing.T) { func TestRetrieval(t *testing.T) { tev, fl := initialize(t) defer tev.tearDown() - fl.Append(ordererledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) + fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}}) if num != 0 { t.Fatalf("Expected genesis block iterator, but got %d", num) @@ -221,7 +221,7 @@ func TestBlockedRetrieval(t *testing.T) { t.Fatalf("Should not be ready for block read") default: } - fl.Append(ordererledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) + fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) select { case <-signal: default: diff --git a/orderer/ledger/fileledger_test.go b/orderer/ledger/file_test.go similarity index 98% rename from orderer/ledger/fileledger_test.go rename to orderer/ledger/file_test.go index 19efbf8fcb2..605fd86beb9 100644 --- a/orderer/ledger/fileledger_test.go +++ b/orderer/ledger/file_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ordererledger_test +package ledger_test import ( "io/ioutil" diff --git a/orderer/ledger/json/factory.go b/orderer/ledger/json/factory.go new file mode 100644 index 00000000000..fab2521acbe --- /dev/null +++ b/orderer/ledger/json/factory.go @@ -0,0 +1,164 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jsonledger + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + + "github.com/golang/protobuf/jsonpb" + "github.com/hyperledger/fabric/orderer/ledger" +) + +type jsonLedgerFactory struct { + directory string + ledgers map[string]ledger.ReadWriter + mutex sync.Mutex +} + +// GetOrCreate gets an existing ledger (if it exists) or creates it if it does not +func (jlf *jsonLedgerFactory) GetOrCreate(chainID string) (ledger.ReadWriter, error) { + jlf.mutex.Lock() + defer jlf.mutex.Unlock() + + key := chainID + + l, ok := jlf.ledgers[key] + if ok { + return l, nil + } + + directory := filepath.Join(jlf.directory, fmt.Sprintf(chainDirectoryFormatString, chainID)) + + logger.Debugf("Initializing chain at: %s", directory) + + if err := os.MkdirAll(directory, 0700); err != nil { + return nil, err + } + + ch := newChain(directory) + jlf.ledgers[key] = ch + return ch, nil +} + +// newChain creates a new chain backed by a JSON ledger +func newChain(directory string) ledger.ReadWriter { + jl := &jsonLedger{ + directory: directory, + signal: make(chan struct{}), + marshaler: &jsonpb.Marshaler{Indent: " "}, + } + jl.initializeBlockHeight() + logger.Debugf("Initialized to block height %d with hash %x", jl.height-1, jl.lastHash) + return jl +} + +// initializeBlockHeight verifies that all blocks exist between 0 and the block +// height, and populates the lastHash +func (jl *jsonLedger) initializeBlockHeight() { + infos, err := ioutil.ReadDir(jl.directory) + if err != nil { + logger.Panic(err) + } + nextNumber := uint64(0) + for _, info := range infos { + if info.IsDir() { + continue + } + var number uint64 + _, err := fmt.Sscanf(info.Name(), blockFileFormatString, &number) + if err != nil { + continue + } + if number != nextNumber { + logger.Panicf("Missing block %d in the chain", nextNumber) + } + nextNumber++ + } + jl.height = nextNumber + if jl.height == 0 { + return + } + block, found := jl.readBlock(jl.height - 1) + if !found { + logger.Panicf("Block %d was in directory listing but error reading", jl.height-1) + } + if block == nil { + logger.Panicf("Error reading block %d", jl.height-1) + } + jl.lastHash = block.Header.Hash() +} + +// ChainIDs returns the chain IDs the factory is aware of +func (jlf *jsonLedgerFactory) ChainIDs() []string { + jlf.mutex.Lock() + defer jlf.mutex.Unlock() + ids := make([]string, len(jlf.ledgers)) + + i := 0 + for key := range jlf.ledgers { + ids[i] = key + i++ + } + + return ids +} + +// Close is a no-op for the JSON ledger +func (jlf *jsonLedgerFactory) Close() { + return // nothing to do +} + +// New creates a new ledger factory +func New(directory string) ledger.Factory { + logger.Debugf("Initializing ledger at: %s", directory) + if err := os.MkdirAll(directory, 0700); err != nil { + logger.Fatalf("Could not create directory %s: %s", directory, err) + } + + jlf := &jsonLedgerFactory{ + directory: directory, + ledgers: make(map[string]ledger.ReadWriter), + } + + infos, err := ioutil.ReadDir(jlf.directory) + if err != nil { + logger.Panicf("Error reading from directory %s while initializing ledger: %s", jlf.directory, err) + } + + for _, info := range infos { + if !info.IsDir() { + continue + } + var chainID string + _, err := fmt.Sscanf(info.Name(), chainDirectoryFormatString, &chainID) + if err != nil { + continue + } + jl, err := jlf.GetOrCreate(chainID) + if err != nil { + logger.Warningf("Failed to initialize chain from %s: %s", chainID, err) + continue + } + jlf.ledgers[chainID] = jl + } + + return jlf +} diff --git a/orderer/ledger/json/impl.go b/orderer/ledger/json/impl.go new file mode 100644 index 00000000000..daa730fa1ce --- /dev/null +++ b/orderer/ledger/json/impl.go @@ -0,0 +1,164 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jsonledger + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + + ledger "github.com/hyperledger/fabric/orderer/ledger" + cb "github.com/hyperledger/fabric/protos/common" + ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/op/go-logging" + + "github.com/golang/protobuf/jsonpb" +) + +var logger = logging.MustGetLogger("orderer/jsonledger") +var closedChan chan struct{} + +func init() { + closedChan = make(chan struct{}) + close(closedChan) +} + +const ( + blockFileFormatString = "block_%020d.json" + chainDirectoryFormatString = "chain_%s" +) + +type cursor struct { + jl *jsonLedger + blockNumber uint64 +} + +type jsonLedger struct { + directory string + fqFormatString string + height uint64 + signal chan struct{} + lastHash []byte + marshaler *jsonpb.Marshaler +} + +// readBlock returns the block or nil, and whether the block was found or not, (nil,true) generally indicates an irrecoverable problem +func (jl *jsonLedger) readBlock(number uint64) (*cb.Block, bool) { + file, err := os.Open(jl.blockFilename(number)) + if err == nil { + defer file.Close() + block := &cb.Block{} + err = jsonpb.Unmarshal(file, block) + if err != nil { + return nil, true + } + logger.Debugf("Read block %d", block.Header.Number) + return block, true + } + return nil, false +} + +// Next blocks until there is a new block available, or returns an error if the +// next block is no longer retrievable +func (cu *cursor) Next() (*cb.Block, cb.Status) { + // This only loops once, as signal reading + // indicates the new block has been written + for { + block, found := cu.jl.readBlock(cu.blockNumber) + if found { + if block == nil { + return nil, cb.Status_SERVICE_UNAVAILABLE + } + cu.blockNumber++ + return block, cb.Status_SUCCESS + } + <-cu.jl.signal + } +} + +// ReadyChan supplies a channel which will block until Next will not block +func (cu *cursor) ReadyChan() <-chan struct{} { + signal := cu.jl.signal + if _, err := os.Stat(cu.jl.blockFilename(cu.blockNumber)); os.IsNotExist(err) { + return signal + } + return closedChan +} + +// Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its +// starting block number +func (jl *jsonLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator, uint64) { + switch start := startPosition.Type.(type) { + case *ab.SeekPosition_Oldest: + return &cursor{jl: jl, blockNumber: 0}, 0 + case *ab.SeekPosition_Newest: + high := jl.height - 1 + return &cursor{jl: jl, blockNumber: high}, high + case *ab.SeekPosition_Specified: + if start.Specified.Number > jl.height { + return &ledger.NotFoundErrorIterator{}, 0 + } + return &cursor{jl: jl, blockNumber: start.Specified.Number}, start.Specified.Number + } + + // This line should be unreachable, but the compiler requires it + return &ledger.NotFoundErrorIterator{}, 0 +} + +// Height returns the number of blocks on the ledger +func (jl *jsonLedger) Height() uint64 { + return jl.height +} + +// Append appends a new block to the ledger +func (jl *jsonLedger) Append(block *cb.Block) error { + if block.Header.Number != jl.height { + return fmt.Errorf("Block number should have been %d but was %d", jl.height, block.Header.Number) + } + + if !bytes.Equal(block.Header.PreviousHash, jl.lastHash) { + return fmt.Errorf("Block should have had previous hash of %x but was %x", jl.lastHash, block.Header.PreviousHash) + } + + jl.writeBlock(block) + jl.lastHash = block.Header.Hash() + jl.height++ + close(jl.signal) + jl.signal = make(chan struct{}) + return nil +} + +// writeBlock commits a block to disk +func (jl *jsonLedger) writeBlock(block *cb.Block) { + file, err := os.Create(jl.blockFilename(block.Header.Number)) + if err != nil { + panic(err) + } + defer file.Close() + err = jl.marshaler.Marshal(file, block) + logger.Debugf("Wrote block %d", block.Header.Number) + if err != nil { + logger.Panic(err) + } +} + +// blockFilename returns the fully qualified path to where a block +// of a given number should be stored on disk +func (jl *jsonLedger) blockFilename(number uint64) string { + return filepath.Join(jl.directory, fmt.Sprintf(blockFileFormatString, number)) +} diff --git a/orderer/ledger/json/jsonledger_test.go b/orderer/ledger/json/impl_test.go similarity index 91% rename from orderer/ledger/json/jsonledger_test.go rename to orderer/ledger/json/impl_test.go index ea42fdfb91f..14874b62ad6 100644 --- a/orderer/ledger/json/jsonledger_test.go +++ b/orderer/ledger/json/impl_test.go @@ -23,7 +23,7 @@ import ( "testing" "github.com/hyperledger/fabric/common/configtx/tool/provisional" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -81,7 +81,7 @@ func TestInitialization(t *testing.T) { func TestReinitialization(t *testing.T) { tev, ofl := initialize(t) defer tev.tearDown() - ofl.Append(ordererledger.CreateNextBlock(ofl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) + ofl.Append(ledger.CreateNextBlock(ofl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) flf := New(tev.location) chains := flf.ChainIDs() if len(chains) != 1 { @@ -132,7 +132,7 @@ func TestAddition(t *testing.T) { tev, fl := initialize(t) defer tev.tearDown() prevHash := fl.lastHash - fl.Append(ordererledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) + fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) if fl.height != 2 { t.Fatalf("Block height should be 2") } @@ -148,7 +148,7 @@ func TestAddition(t *testing.T) { func TestRetrieval(t *testing.T) { tev, fl := initialize(t) defer tev.tearDown() - fl.Append(ordererledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) + fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}}) if num != 0 { t.Fatalf("Expected genesis block iterator, but got %d", num) @@ -194,7 +194,7 @@ func TestBlockedRetrieval(t *testing.T) { t.Fatalf("Should not be ready for block read") default: } - fl.Append(ordererledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) + fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}})) select { case <-signal: default: diff --git a/orderer/ledger/json/jsonledger.go b/orderer/ledger/json/jsonledger.go deleted file mode 100644 index 255b416e624..00000000000 --- a/orderer/ledger/json/jsonledger.go +++ /dev/null @@ -1,297 +0,0 @@ -/* -Copyright IBM Corp. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package jsonledger - -import ( - "bytes" - "fmt" - "io/ioutil" - "os" - "sync" - - ordererledger "github.com/hyperledger/fabric/orderer/ledger" - cb "github.com/hyperledger/fabric/protos/common" - ab "github.com/hyperledger/fabric/protos/orderer" - "github.com/op/go-logging" - - "github.com/golang/protobuf/jsonpb" -) - -var logger = logging.MustGetLogger("ordererledger/jsonledger") -var closedChan chan struct{} - -func init() { - closedChan = make(chan struct{}) - close(closedChan) -} - -const ( - blockFileFormatString = "block_%020d.json" - chainDirectoryFormatString = "chain_%s" -) - -type cursor struct { - jl *jsonLedger - blockNumber uint64 -} - -type jsonLedger struct { - directory string - fqFormatString string - height uint64 - signal chan struct{} - lastHash []byte - marshaler *jsonpb.Marshaler -} - -type jsonLedgerFactory struct { - directory string - ledgers map[string]ordererledger.ReadWriter - mutex sync.Mutex -} - -// New creates a new jsonledger Factory and the ordering system chain specified by the systemGenesis block (if it does not already exist) -func New(directory string) ordererledger.Factory { - - logger.Debugf("Initializing jsonLedger at '%s'", directory) - if err := os.MkdirAll(directory, 0700); err != nil { - logger.Fatalf("Could not create directory %s: %s", directory, err) - } - - jlf := &jsonLedgerFactory{ - directory: directory, - ledgers: make(map[string]ordererledger.ReadWriter), - } - - infos, err := ioutil.ReadDir(jlf.directory) - if err != nil { - logger.Panicf("Error reading from directory %s while initializing jsonledger: %s", jlf.directory, err) - } - - for _, info := range infos { - if !info.IsDir() { - continue - } - var chainID string - _, err := fmt.Sscanf(info.Name(), chainDirectoryFormatString, &chainID) - if err != nil { - continue - } - jl, err := jlf.GetOrCreate(chainID) - if err != nil { - logger.Warningf("Failed to initialize chain from %s:", err) - continue - } - jlf.ledgers[chainID] = jl - } - - return jlf -} - -func (jlf *jsonLedgerFactory) ChainIDs() []string { - jlf.mutex.Lock() - defer jlf.mutex.Unlock() - ids := make([]string, len(jlf.ledgers)) - - i := 0 - for key := range jlf.ledgers { - ids[i] = key - i++ - } - - return ids -} - -func (jlf *jsonLedgerFactory) GetOrCreate(chainID string) (ordererledger.ReadWriter, error) { - jlf.mutex.Lock() - defer jlf.mutex.Unlock() - - key := chainID - - l, ok := jlf.ledgers[key] - if ok { - return l, nil - } - - directory := fmt.Sprintf("%s/"+chainDirectoryFormatString, jlf.directory, chainID) - - logger.Debugf("Initializing chain at '%s'", directory) - - if err := os.MkdirAll(directory, 0700); err != nil { - return nil, err - } - - ch := newChain(directory) - jlf.ledgers[key] = ch - return ch, nil -} - -// Close does nothing for json ledger -func (jlf *jsonLedgerFactory) Close() { - return // nothing to do -} - -// newChain creates a new chain backed by a json ledger -func newChain(directory string) ordererledger.ReadWriter { - jl := &jsonLedger{ - directory: directory, - fqFormatString: directory + "/" + blockFileFormatString, - signal: make(chan struct{}), - marshaler: &jsonpb.Marshaler{Indent: " "}, - } - jl.initializeBlockHeight() - logger.Debugf("Initialized to block height %d with hash %x", jl.height-1, jl.lastHash) - return jl -} - -// initializeBlockHeight verifies all blocks exist between 0 and the block height, and populates the lastHash -func (jl *jsonLedger) initializeBlockHeight() { - infos, err := ioutil.ReadDir(jl.directory) - if err != nil { - panic(err) - } - nextNumber := uint64(0) - for _, info := range infos { - if info.IsDir() { - continue - } - var number uint64 - _, err := fmt.Sscanf(info.Name(), blockFileFormatString, &number) - if err != nil { - continue - } - if number != nextNumber { - panic(fmt.Errorf("Missing block %d in the chain", nextNumber)) - } - nextNumber++ - } - jl.height = nextNumber - if jl.height == 0 { - return - } - block, found := jl.readBlock(jl.height - 1) - if !found { - panic(fmt.Errorf("Block %d was in directory listing but error reading", jl.height-1)) - } - if block == nil { - panic(fmt.Errorf("Error reading block %d", jl.height-1)) - } - jl.lastHash = block.Header.Hash() -} - -// blockFilename returns the fully qualified path to where a block of a given number should be stored on disk -func (jl *jsonLedger) blockFilename(number uint64) string { - return fmt.Sprintf(jl.fqFormatString, number) -} - -// writeBlock commits a block to disk -func (jl *jsonLedger) writeBlock(block *cb.Block) { - file, err := os.Create(jl.blockFilename(block.Header.Number)) - if err != nil { - panic(err) - } - defer file.Close() - err = jl.marshaler.Marshal(file, block) - logger.Debugf("Wrote block %d", block.Header.Number) - if err != nil { - panic(err) - } - -} - -// readBlock returns the block or nil, and whether the block was found or not, (nil,true) generally indicates an irrecoverable problem -func (jl *jsonLedger) readBlock(number uint64) (*cb.Block, bool) { - file, err := os.Open(jl.blockFilename(number)) - if err == nil { - defer file.Close() - block := &cb.Block{} - err = jsonpb.Unmarshal(file, block) - if err != nil { - return nil, true - } - logger.Debugf("Read block %d", block.Header.Number) - return block, true - } - return nil, false -} - -// Height returns the highest block number in the chain, plus one -func (jl *jsonLedger) Height() uint64 { - return jl.height -} - -// Append appends a new block to the ledger -func (jl *jsonLedger) Append(block *cb.Block) error { - if block.Header.Number != jl.height { - return fmt.Errorf("Block number should have been %d but was %d", jl.height, block.Header.Number) - } - - if !bytes.Equal(block.Header.PreviousHash, jl.lastHash) { - return fmt.Errorf("Block should have had previous hash of %x but was %x", jl.lastHash, block.Header.PreviousHash) - } - - jl.writeBlock(block) - jl.lastHash = block.Header.Hash() - jl.height++ - close(jl.signal) - jl.signal = make(chan struct{}) - return nil -} - -// Iterator implements the ordererledger.Reader definition -func (jl *jsonLedger) Iterator(startPosition *ab.SeekPosition) (ordererledger.Iterator, uint64) { - switch start := startPosition.Type.(type) { - case *ab.SeekPosition_Oldest: - return &cursor{jl: jl, blockNumber: 0}, 0 - case *ab.SeekPosition_Newest: - high := jl.height - 1 - return &cursor{jl: jl, blockNumber: high}, high - case *ab.SeekPosition_Specified: - if start.Specified.Number > jl.height { - return &ordererledger.NotFoundErrorIterator{}, 0 - } - return &cursor{jl: jl, blockNumber: start.Specified.Number}, start.Specified.Number - } - - // This line should be unreachable, but the compiler requires it - return &ordererledger.NotFoundErrorIterator{}, 0 -} - -// Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable -func (cu *cursor) Next() (*cb.Block, cb.Status) { - // This only loops once, as signal reading indicates the new block has been written - for { - block, found := cu.jl.readBlock(cu.blockNumber) - if found { - if block == nil { - return nil, cb.Status_SERVICE_UNAVAILABLE - } - cu.blockNumber++ - return block, cb.Status_SUCCESS - } - <-cu.jl.signal - } -} - -// ReadyChan returns a channel that will close when Next is ready to be called without blocking -func (cu *cursor) ReadyChan() <-chan struct{} { - signal := cu.jl.signal - if _, err := os.Stat(cu.jl.blockFilename(cu.blockNumber)); os.IsNotExist(err) { - return signal - } - return closedChan -} diff --git a/orderer/ledger/jsonledger_test.go b/orderer/ledger/json_test.go similarity index 98% rename from orderer/ledger/jsonledger_test.go rename to orderer/ledger/json_test.go index a2b593f1dc2..92f55c6ab50 100644 --- a/orderer/ledger/jsonledger_test.go +++ b/orderer/ledger/json_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ordererledger_test +package ledger_test import ( "io/ioutil" diff --git a/orderer/ledger/ordererledger.go b/orderer/ledger/ledger.go similarity index 73% rename from orderer/ledger/ordererledger.go rename to orderer/ledger/ledger.go index 36944b18693..9d2abe55206 100644 --- a/orderer/ledger/ordererledger.go +++ b/orderer/ledger/ledger.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ordererledger +package ledger import ( cb "github.com/hyperledger/fabric/protos/common" @@ -23,7 +23,8 @@ import ( // Factory retrieves or creates new ledgers by chainID type Factory interface { - // GetOrCreate gets an existing ledger (if it exists) or creates it if it does not + // GetOrCreate gets an existing ledger (if it exists) + // or creates it if it does not GetOrCreate(chainID string) (ReadWriter, error) // ChainIDs returns the chain IDs the Factory is aware of @@ -35,27 +36,29 @@ type Factory interface { // Iterator is useful for a chain Reader to stream blocks as they are created type Iterator interface { - // Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable + // Next blocks until there is a new block available, or returns an error if + // the next block is no longer retrievable Next() (*cb.Block, cb.Status) // ReadyChan supplies a channel which will block until Next will not block ReadyChan() <-chan struct{} } -// Reader allows the caller to inspect the orderer ledger +// Reader allows the caller to inspect the ledger type Reader interface { - // Iterator retrieves an Iterator, as specified by an cb.SeekInfo message, returning an iterator, and its starting block number + // Iterator returns an Iterator, as specified by a cb.SeekInfo message, and + // its starting block number Iterator(startType *ab.SeekPosition) (Iterator, uint64) - // Height returns the highest block number in the chain, plus one + // Height returns the number of blocks on the ledger Height() uint64 } -// Writer allows the caller to modify the orderer ledger +// Writer allows the caller to modify the ledger type Writer interface { // Append a new block to the ledger Append(block *cb.Block) error } -// ReadWriter encapsulated both the reading and writing functions of the ordererledger +// ReadWriter encapsulates the read/write functions of the ledger type ReadWriter interface { Reader Writer diff --git a/orderer/ledger/ram/factory.go b/orderer/ledger/ram/factory.go new file mode 100644 index 00000000000..954119c61df --- /dev/null +++ b/orderer/ledger/ram/factory.go @@ -0,0 +1,97 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ramledger + +import ( + "sync" + + "github.com/hyperledger/fabric/orderer/ledger" + cb "github.com/hyperledger/fabric/protos/common" +) + +type ramLedgerFactory struct { + maxSize int + ledgers map[string]ledger.ReadWriter + mutex sync.Mutex +} + +// GetOrCreate gets an existing ledger (if it exists) or creates it if it does not +func (rlf *ramLedgerFactory) GetOrCreate(chainID string) (ledger.ReadWriter, error) { + rlf.mutex.Lock() + defer rlf.mutex.Unlock() + + key := chainID + + l, ok := rlf.ledgers[key] + if ok { + return l, nil + } + + ch := newChain(rlf.maxSize) + rlf.ledgers[key] = ch + return ch, nil +} + +// newChain creates a new chain backed by a RAM ledger +func newChain(maxSize int) ledger.ReadWriter { + preGenesis := &cb.Block{ + Header: &cb.BlockHeader{ + Number: ^uint64(0), + }, + } + + rl := &ramLedger{ + maxSize: maxSize, + size: 1, + oldest: &simpleList{ + signal: make(chan struct{}), + block: preGenesis, + }, + } + rl.newest = rl.oldest + return rl +} + +// ChainIDs returns the chain IDs the factory is aware of +func (rlf *ramLedgerFactory) ChainIDs() []string { + rlf.mutex.Lock() + defer rlf.mutex.Unlock() + ids := make([]string, len(rlf.ledgers)) + + i := 0 + for key := range rlf.ledgers { + ids[i] = key + i++ + } + + return ids +} + +// Close is a no-op for the RAM ledger +func (rlf *ramLedgerFactory) Close() { + return // nothing to do +} + +// New creates a new ledger factory +func New(maxSize int) ledger.Factory { + rlf := &ramLedgerFactory{ + maxSize: maxSize, + ledgers: make(map[string]ledger.ReadWriter), + } + + return rlf +} diff --git a/orderer/ledger/ram/ramledger.go b/orderer/ledger/ram/impl.go similarity index 63% rename from orderer/ledger/ram/ramledger.go rename to orderer/ledger/ram/impl.go index 4a9af830084..3141de17c5b 100644 --- a/orderer/ledger/ram/ramledger.go +++ b/orderer/ledger/ram/impl.go @@ -19,15 +19,14 @@ package ramledger import ( "bytes" "fmt" - "sync" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" ) -var logger = logging.MustGetLogger("ordererledger/ramledger") +var logger = logging.MustGetLogger("orderer/ramledger") type cursor struct { list *simpleList @@ -46,85 +45,27 @@ type ramLedger struct { newest *simpleList } -type ramLedgerFactory struct { - maxSize int - ledgers map[string]ordererledger.ReadWriter - mutex sync.Mutex -} - -// New creates a new ramledger factory and system ordering chain based on the given systemGenesis block, -// because there is no persistence, the new ReadWriter will have only the genesis block contained -func New(maxSize int) ordererledger.Factory { - rlf := &ramLedgerFactory{ - maxSize: maxSize, - ledgers: make(map[string]ordererledger.ReadWriter), - } - - return rlf -} - -func (rlf *ramLedgerFactory) GetOrCreate(chainID string) (ordererledger.ReadWriter, error) { - rlf.mutex.Lock() - defer rlf.mutex.Unlock() - - key := chainID - - l, ok := rlf.ledgers[key] - if ok { - return l, nil - } - - ch := newChain(rlf.maxSize) - rlf.ledgers[key] = ch - return ch, nil -} - -func (rlf *ramLedgerFactory) ChainIDs() []string { - rlf.mutex.Lock() - defer rlf.mutex.Unlock() - ids := make([]string, len(rlf.ledgers)) - - i := 0 - for key := range rlf.ledgers { - ids[i] = key - i++ - } - - return ids -} - -// Close does nothing for ram ledger -func (rlf *ramLedgerFactory) Close() { - return // nothing to do -} - -// newChain creates a new instance of the ram ledger for a chain -func newChain(maxSize int) ordererledger.ReadWriter { - preGenesis := &cb.Block{ - Header: &cb.BlockHeader{ - Number: ^uint64(0), - }, - } - - rl := &ramLedger{ - maxSize: maxSize, - size: 1, - oldest: &simpleList{ - signal: make(chan struct{}), - block: preGenesis, - }, +// Next blocks until there is a new block available, or returns an error if the +// next block is no longer retrievable +func (cu *cursor) Next() (*cb.Block, cb.Status) { + // This only loops once, as signal reading indicates non-nil next + for { + if cu.list.next != nil { + cu.list = cu.list.next + return cu.list.block, cb.Status_SUCCESS + } + <-cu.list.signal } - rl.newest = rl.oldest - return rl } -// Height returns the highest block number in the chain, plus one -func (rl *ramLedger) Height() uint64 { - return rl.newest.block.Header.Number + 1 +// ReadyChan supplies a channel which will block until Next will not block +func (cu *cursor) ReadyChan() <-chan struct{} { + return cu.list.signal } -// Iterator implements the ordererledger.Reader definition -func (rl *ramLedger) Iterator(startPosition *ab.SeekPosition) (ordererledger.Iterator, uint64) { +// Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its +// starting block number +func (rl *ramLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator, uint64) { var list *simpleList switch start := startPosition.Type.(type) { case *ab.SeekPosition_Oldest: @@ -150,8 +91,9 @@ func (rl *ramLedger) Iterator(startPosition *ab.SeekPosition) (ordererledger.Ite // Note the two +1's here is to accomodate the 'preGenesis' block of ^uint64(0) if specified+1 < oldest.block.Header.Number+1 || specified > rl.newest.block.Header.Number+1 { - logger.Debugf("Returning error iterator because specified seek was %d with oldest %d and newest %d", specified, rl.oldest.block.Header.Number, rl.newest.block.Header.Number) - return &ordererledger.NotFoundErrorIterator{}, 0 + logger.Debugf("Returning error iterator because specified seek was %d with oldest %d and newest %d", + specified, rl.oldest.block.Header.Number, rl.newest.block.Header.Number) + return &ledger.NotFoundErrorIterator{}, 0 } if specified == oldest.block.Header.Number { @@ -184,33 +126,22 @@ func (rl *ramLedger) Iterator(startPosition *ab.SeekPosition) (ordererledger.Ite return cursor, blockNum } -// Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable -func (cu *cursor) Next() (*cb.Block, cb.Status) { - // This only loops once, as signal reading indicates non-nil next - for { - if cu.list.next != nil { - cu.list = cu.list.next - return cu.list.block, cb.Status_SUCCESS - } - - <-cu.list.signal - } -} - -// ReadyChan returns a channel that will close when Next is ready to be called without blocking -func (cu *cursor) ReadyChan() <-chan struct{} { - return cu.list.signal +// Height returns the number of blocks on the ledger +func (rl *ramLedger) Height() uint64 { + return rl.newest.block.Header.Number + 1 } // Append appends a new block to the ledger func (rl *ramLedger) Append(block *cb.Block) error { if block.Header.Number != rl.newest.block.Header.Number+1 { - return fmt.Errorf("Block number should have been %d but was %d", rl.newest.block.Header.Number+1, block.Header.Number) + return fmt.Errorf("Block number should have been %d but was %d", + rl.newest.block.Header.Number+1, block.Header.Number) } if rl.newest.block.Header.Number+1 != 0 { // Skip this check for genesis block insertion if !bytes.Equal(block.Header.PreviousHash, rl.newest.block.Header.Hash()) { - return fmt.Errorf("Block should have had previous hash of %x but was %x", rl.newest.block.Header.Hash(), block.Header.PreviousHash) + return fmt.Errorf("Block should have had previous hash of %x but was %x", + rl.newest.block.Header.Hash(), block.Header.PreviousHash) } } @@ -232,7 +163,8 @@ func (rl *ramLedger) appendBlock(block *cb.Block) { rl.size++ if rl.size > rl.maxSize { - logger.Debugf("RAM ledger max size about to be exceeded, removing oldest item: %d", rl.oldest.block.Header.Number) + logger.Debugf("RAM ledger max size about to be exceeded, removing oldest item: %d", + rl.oldest.block.Header.Number) rl.oldest = rl.oldest.next rl.size-- } diff --git a/orderer/ledger/ram/ramledger_test.go b/orderer/ledger/ram/impl_test.go similarity index 100% rename from orderer/ledger/ram/ramledger_test.go rename to orderer/ledger/ram/impl_test.go diff --git a/orderer/ledger/ramledger_test.go b/orderer/ledger/ram_test.go similarity index 98% rename from orderer/ledger/ramledger_test.go rename to orderer/ledger/ram_test.go index 8fc8f95d5e9..9cde780704e 100644 --- a/orderer/ledger/ramledger_test.go +++ b/orderer/ledger/ram_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ordererledger_test +package ledger_test import ( "github.com/hyperledger/fabric/common/configtx/tool/provisional" diff --git a/orderer/ledger/util.go b/orderer/ledger/util.go index 423752b0c61..01e7f7cd0ce 100644 --- a/orderer/ledger/util.go +++ b/orderer/ledger/util.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ordererledger +package ledger import ( "github.com/golang/protobuf/proto" diff --git a/orderer/main.go b/orderer/main.go index fd0bca1191b..2954e223551 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -31,7 +31,7 @@ import ( "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/orderer/common/bootstrap/file" "github.com/hyperledger/fabric/orderer/kafka" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" jsonledger "github.com/hyperledger/fabric/orderer/ledger/json" ramledger "github.com/hyperledger/fabric/orderer/ledger/ram" "github.com/hyperledger/fabric/orderer/localconfig" @@ -90,7 +90,7 @@ func main() { panic(fmt.Errorf("Failed initializing crypto [%s]", err)) } - var lf ordererledger.Factory + var lf ledger.Factory switch conf.General.LedgerType { case "file": // just use the json ledger type for now diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go index 673678710c0..5204fb4dd64 100644 --- a/orderer/multichain/chainsupport.go +++ b/orderer/multichain/chainsupport.go @@ -27,7 +27,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/filter" "github.com/hyperledger/fabric/orderer/common/sigfilter" "github.com/hyperledger/fabric/orderer/common/sizefilter" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/utils" ) @@ -81,7 +81,7 @@ type ChainSupport interface { PolicyManager() policies.Manager // Reader returns the chain Reader for the chain - Reader() ordererledger.Reader + Reader() ledger.Reader broadcast.Support ConsenterSupport @@ -123,7 +123,7 @@ func newChainSupport( var err error - lastBlock := ordererledger.GetBlock(cs.Reader(), cs.Reader().Height()-1) + lastBlock := ledger.GetBlock(cs.Reader(), cs.Reader().Height()-1) metadata, err := utils.GetMetadataFromBlock(lastBlock, cb.BlockMetadataIndex_ORDERER) // Assuming a block created with cb.NewBlock(), this should not // error even if the orderer metadata is an empty byte slice @@ -184,7 +184,7 @@ func (cs *chainSupport) BlockCutter() blockcutter.Receiver { return cs.cutter } -func (cs *chainSupport) Reader() ordererledger.Reader { +func (cs *chainSupport) Reader() ledger.Reader { return cs.ledger } @@ -193,7 +193,7 @@ func (cs *chainSupport) Enqueue(env *cb.Envelope) bool { } func (cs *chainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block { - return ordererledger.CreateNextBlock(cs.ledger, messages) + return ledger.CreateNextBlock(cs.ledger, messages) } func (cs *chainSupport) addBlockSignature(block *cb.Block) { diff --git a/orderer/multichain/chainsupport_test.go b/orderer/multichain/chainsupport_test.go index 711a3085a14..39080648435 100644 --- a/orderer/multichain/chainsupport_test.go +++ b/orderer/multichain/chainsupport_test.go @@ -23,7 +23,7 @@ import ( "github.com/golang/protobuf/proto" mockconfigtx "github.com/hyperledger/fabric/common/mocks/configtx" "github.com/hyperledger/fabric/orderer/common/filter" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" @@ -42,7 +42,7 @@ func (mlw *mockLedgerReadWriter) Append(block *cb.Block) error { return nil } -func (mlw *mockLedgerReadWriter) Iterator(startType *ab.SeekPosition) (ordererledger.Iterator, uint64) { +func (mlw *mockLedgerReadWriter) Iterator(startType *ab.SeekPosition) (ledger.Iterator, uint64) { panic("Unimplemented") } diff --git a/orderer/multichain/manager.go b/orderer/multichain/manager.go index e79f4148c21..b01202b4dc7 100644 --- a/orderer/multichain/manager.go +++ b/orderer/multichain/manager.go @@ -22,7 +22,7 @@ import ( "github.com/hyperledger/fabric/common/configtx" configtxapi "github.com/hyperledger/fabric/common/configtx/api" configvaluesapi "github.com/hyperledger/fabric/common/configvalues" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" cb "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/utils" "github.com/op/go-logging" @@ -52,24 +52,24 @@ func (cr *configResources) SharedConfig() configvaluesapi.Orderer { type ledgerResources struct { *configResources - ledger ordererledger.ReadWriter + ledger ledger.ReadWriter } type multiLedger struct { chains map[string]*chainSupport consenters map[string]Consenter - ledgerFactory ordererledger.Factory + ledgerFactory ledger.Factory signer crypto.LocalSigner systemChannelID string } -func getConfigTx(reader ordererledger.Reader) *cb.Envelope { - lastBlock := ordererledger.GetBlock(reader, reader.Height()-1) +func getConfigTx(reader ledger.Reader) *cb.Envelope { + lastBlock := ledger.GetBlock(reader, reader.Height()-1) index, err := utils.GetLastConfigIndexFromBlock(lastBlock) if err != nil { logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err) } - configBlock := ordererledger.GetBlock(reader, index) + configBlock := ledger.GetBlock(reader, index) if configBlock == nil { logger.Panicf("Config block does not exist") } @@ -78,7 +78,7 @@ func getConfigTx(reader ordererledger.Reader) *cb.Envelope { } // NewManagerImpl produces an instance of a Manager -func NewManagerImpl(ledgerFactory ordererledger.Factory, consenters map[string]Consenter, signer crypto.LocalSigner) Manager { +func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consenter, signer crypto.LocalSigner) Manager { ml := &multiLedger{ chains: make(map[string]*chainSupport), ledgerFactory: ledgerFactory, @@ -187,7 +187,7 @@ func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResource func (ml *multiLedger) newChain(configtx *cb.Envelope) { ledgerResources := ml.newLedgerResources(configtx) - ledgerResources.ledger.Append(ordererledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx})) + ledgerResources.ledger.Append(ledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx})) // Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is newChains := make(map[string]*chainSupport) diff --git a/orderer/multichain/manager_test.go b/orderer/multichain/manager_test.go index a5e9ddab5cd..7cb870eb941 100644 --- a/orderer/multichain/manager_test.go +++ b/orderer/multichain/manager_test.go @@ -26,7 +26,7 @@ import ( genesisconfig "github.com/hyperledger/fabric/common/configtx/tool/localconfig" "github.com/hyperledger/fabric/common/configtx/tool/provisional" mockcrypto "github.com/hyperledger/fabric/common/mocks/crypto" - ordererledger "github.com/hyperledger/fabric/orderer/ledger" + "github.com/hyperledger/fabric/orderer/ledger" ramledger "github.com/hyperledger/fabric/orderer/ledger/ram" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" @@ -71,7 +71,7 @@ func (mch mockCryptoRejectorHelper) VerifySignature(sd *cb.SignedData) error { return errors.New("Nope") } -func NewRAMLedgerAndFactory(maxSize int) (ordererledger.Factory, ordererledger.ReadWriter) { +func NewRAMLedgerAndFactory(maxSize int) (ledger.Factory, ledger.ReadWriter) { rlf := ramledger.New(10) rl, err := rlf.GetOrCreate(provisional.TestChainID) if err != nil { @@ -84,7 +84,7 @@ func NewRAMLedgerAndFactory(maxSize int) (ordererledger.Factory, ordererledger.R return rlf, rl } -func NewRAMLedger(maxSize int) ordererledger.ReadWriter { +func NewRAMLedger(maxSize int) ledger.ReadWriter { _, rl := NewRAMLedgerAndFactory(maxSize) return rl } @@ -93,13 +93,13 @@ func NewRAMLedger(maxSize int) ordererledger.ReadWriter { func TestGetConfigTx(t *testing.T) { rl := NewRAMLedger(10) for i := 0; i < 5; i++ { - rl.Append(ordererledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, i)})) + rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, i)})) } - rl.Append(ordererledger.CreateNextBlock(rl, []*cb.Envelope{makeConfigTx(provisional.TestChainID, 5)})) + rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{makeConfigTx(provisional.TestChainID, 5)})) ctx := makeConfigTx(provisional.TestChainID, 6) - rl.Append(ordererledger.CreateNextBlock(rl, []*cb.Envelope{ctx})) + rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{ctx})) - block := ordererledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)}) + block := ledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)}) block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = utils.MarshalOrPanic(&cb.Metadata{Value: utils.MarshalOrPanic(&cb.LastConfig{Index: 7})}) rl.Append(block) @@ -114,12 +114,12 @@ func TestGetConfigTx(t *testing.T) { func TestGetConfigTxFailure(t *testing.T) { rl := NewRAMLedger(10) for i := 0; i < 10; i++ { - rl.Append(ordererledger.CreateNextBlock(rl, []*cb.Envelope{ + rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{ makeNormalTx(provisional.TestChainID, i), makeConfigTx(provisional.TestChainID, i), })) } - rl.Append(ordererledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 11)})) + rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 11)})) defer func() { if recover() == nil { t.Fatalf("Should have panic-ed because there was no config tx") diff --git a/orderer/sbft_test.go b/orderer/sbft_test.go index de3fa008047..c37a2b63d8a 100644 --- a/orderer/sbft_test.go +++ b/orderer/sbft_test.go @@ -278,7 +278,7 @@ func broadcastSender(t *testing.T, resultch chan item, errorch chan error, clien resultch <- item{itemtype: sent, payload: mpl} } -func newRAMLedgerFactory() ordererledger.Factory { +func newRAMLedgerFactory() ledger.Factory { rlf := ramledger.New(10) rl, err := rlf.GetOrCreate(provisional.TestChainID) if err != nil {