diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index abcac6e1a4..67673dceeb 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -400,19 +400,19 @@ func (b *Blockchain) writeGenesis(genesis *chain.Genesis) error { // writeGenesisImpl writes the genesis file to the DB + blockchain reference func (b *Blockchain) writeGenesisImpl(header *types.Header) error { - // Update the reference - b.genesis = header.Hash + batchWriter := storage.NewBatchWriter(b.db) - // Update the DB - if err := b.db.WriteHeader(header); err != nil { - return err - } + newTD := new(big.Int).SetUint64(header.Difficulty) + + batchWriter.PutCanonicalHeader(header, newTD) - // Advance the head - if _, err := b.advanceHead(header); err != nil { + if err := b.writeBatchAndUpdate(batchWriter, header, newTD, true); err != nil { return err } + // Update the reference + b.genesis = header.Hash + // Create an event and send it to the stream event := &Event{} event.AddNewHeader(header) @@ -440,68 +440,6 @@ func (b *Blockchain) GetTD(hash types.Hash) (*big.Int, bool) { return b.readTotalDifficulty(hash) } -// writeCanonicalHeader writes the new header -func (b *Blockchain) writeCanonicalHeader(event *Event, h *types.Header) error { - parentTD, ok := b.readTotalDifficulty(h.ParentHash) - if !ok { - return fmt.Errorf("parent difficulty not found") - } - - newTD := big.NewInt(0).Add(parentTD, new(big.Int).SetUint64(h.Difficulty)) - if err := b.db.WriteCanonicalHeader(h, newTD); err != nil { - return err - } - - event.Type = EventHead - event.AddNewHeader(h) - event.SetDifficulty(newTD) - - b.setCurrentHeader(h, newTD) - - return nil -} - -// advanceHead Sets the passed in header as the new head of the chain -func (b *Blockchain) advanceHead(newHeader *types.Header) (*big.Int, error) { - // Write the current head hash into storage - if err := b.db.WriteHeadHash(newHeader.Hash); err != nil { - return nil, err - } - - // Write the current head number into storage - if err := b.db.WriteHeadNumber(newHeader.Number); err != nil { - return nil, err - } - - // Matches the current head number with the current hash - if err := b.db.WriteCanonicalHash(newHeader.Number, newHeader.Hash); err != nil { - return nil, err - } - - // Check if there was a parent difficulty - parentTD := big.NewInt(0) - - if newHeader.ParentHash != types.StringToHash("") { - td, ok := b.readTotalDifficulty(newHeader.ParentHash) - if !ok { - return nil, fmt.Errorf("parent difficulty not found") - } - - parentTD = td - } - - // Calculate the new total difficulty - newTD := big.NewInt(0).Add(parentTD, big.NewInt(0).SetUint64(newHeader.Difficulty)) - if err := b.db.WriteTotalDifficulty(newHeader.Hash, newTD); err != nil { - return nil, err - } - - // Update the blockchain reference - b.setCurrentHeader(newHeader, newTD) - - return newTD, nil -} - // GetReceiptsByHash returns the receipts by their hash func (b *Blockchain) GetReceiptsByHash(hash types.Hash) ([]*types.Receipt, error) { return b.db.ReadReceipts(hash) @@ -555,7 +493,11 @@ func (b *Blockchain) readBody(hash types.Hash) (*types.Body, bool) { // To return from field in the transactions of the past blocks if updated := b.recoverFromFieldsInTransactions(bb.Transactions); updated { - if err := b.db.WriteBody(hash, bb); err != nil { + batchWriter := storage.NewBatchWriter(b.db) + + batchWriter.PutBody(hash, bb) + + if err := batchWriter.WriteBatch(); err != nil { b.logger.Warn("failed to write body into storage", "hash", hash, "err", err) } } @@ -604,11 +546,6 @@ func (b *Blockchain) GetHeaderByNumber(n uint64) (*types.Header, bool) { return h, true } -// WriteHeaders writes an array of headers -func (b *Blockchain) WriteHeaders(headers []*types.Header) error { - return b.WriteHeadersWithBodies(headers) -} - // WriteHeadersWithBodies writes a batch of headers func (b *Blockchain) WriteHeadersWithBodies(headers []*types.Header) error { // Check the size @@ -634,10 +571,18 @@ func (b *Blockchain) WriteHeadersWithBodies(headers []*types.Header) error { } } - // Write the actual headers - for _, h := range headers { + // Write the actual headers in separate batches for now + for _, header := range headers { event := &Event{} - if err := b.writeHeaderImpl(event, h); err != nil { + + batchWriter := storage.NewBatchWriter(b.db) + + isCanonical, newTD, err := b.writeHeaderImpl(batchWriter, event, header) + if err != nil { + return err + } + + if err := b.writeBatchAndUpdate(batchWriter, header, newTD, isCanonical); err != nil { return err } @@ -855,11 +800,11 @@ func (b *Blockchain) executeBlockTransactions(block *types.Block) (*BlockResult, // This function is a copy of WriteBlock but with a full block which does not // require to compute again the Receipts. func (b *Blockchain) WriteFullBlock(fblock *types.FullBlock, source string) error { - block := fblock.Block - b.writeLock.Lock() defer b.writeLock.Unlock() + block := fblock.Block + if block.Number() <= b.Header().Number { b.logger.Info("block already inserted", "block", block.Number(), "source", source) @@ -868,33 +813,39 @@ func (b *Blockchain) WriteFullBlock(fblock *types.FullBlock, source string) erro header := block.Header - if err := b.writeBody(block); err != nil { + batchWriter := storage.NewBatchWriter(b.db) + + if err := b.writeBody(batchWriter, block); err != nil { return err } // Write the header to the chain evnt := &Event{Source: source} - if err := b.writeHeaderImpl(evnt, header); err != nil { + + isCanonical, newTD, err := b.writeHeaderImpl(batchWriter, evnt, header) + if err != nil { return err } // write the receipts, do it only after the header has been written. // Otherwise, a client might ask for a header once the receipt is valid, // but before it is written into the storage - if err := b.db.WriteReceipts(block.Hash(), fblock.Receipts); err != nil { - return err - } + batchWriter.PutReceipts(block.Hash(), fblock.Receipts) // update snapshot if err := b.consensus.ProcessHeaders([]*types.Header{header}); err != nil { return err } - b.dispatchEvent(evnt) - // Update the average gas price b.updateGasPriceAvgWithBlock(block) + if err := b.writeBatchAndUpdate(batchWriter, header, newTD, isCanonical); err != nil { + return err + } + + b.dispatchEvent(evnt) + logArgs := []interface{}{ "number", header.Number, "txs", len(block.Transactions), @@ -927,13 +878,17 @@ func (b *Blockchain) WriteBlock(block *types.Block, source string) error { header := block.Header - if err := b.writeBody(block); err != nil { + batchWriter := storage.NewBatchWriter(b.db) + + if err := b.writeBody(batchWriter, block); err != nil { return err } // Write the header to the chain evnt := &Event{Source: source} - if err := b.writeHeaderImpl(evnt, header); err != nil { + + isCanonical, newTD, err := b.writeHeaderImpl(batchWriter, evnt, header) + if err != nil { return err } @@ -946,20 +901,22 @@ func (b *Blockchain) WriteBlock(block *types.Block, source string) error { // write the receipts, do it only after the header has been written. // Otherwise, a client might ask for a header once the receipt is valid, // but before it is written into the storage - if err := b.db.WriteReceipts(block.Hash(), blockReceipts); err != nil { - return err - } + batchWriter.PutReceipts(block.Hash(), blockReceipts) // update snapshot if err := b.consensus.ProcessHeaders([]*types.Header{header}); err != nil { return err } - b.dispatchEvent(evnt) - // Update the average gas price b.updateGasPriceAvgWithBlock(block) + if err := b.writeBatchAndUpdate(batchWriter, header, newTD, isCanonical); err != nil { + return err + } + + b.dispatchEvent(evnt) + logArgs := []interface{}{ "number", header.Number, "txs", len(block.Transactions), @@ -1035,7 +992,7 @@ func (b *Blockchain) updateGasPriceAvgWithBlock(block *types.Block) { // writeBody writes the block body to the DB. // Additionally, it also updates the txn lookup, for txnHash -> block lookups -func (b *Blockchain) writeBody(block *types.Block) error { +func (b *Blockchain) writeBody(batchWriter *storage.BatchWriter, block *types.Block) error { // Recover 'from' field in tx before saving // Because the block passed from the consensus layer doesn't have from field in tx, // due to missing encoding in RLP @@ -1044,15 +1001,11 @@ func (b *Blockchain) writeBody(block *types.Block) error { } // Write the full body (txns + receipts) - if err := b.db.WriteBody(block.Header.Hash, block.Body()); err != nil { - return err - } + batchWriter.PutBody(block.Header.Hash, block.Body()) // Write txn lookups (txHash -> block) for _, txn := range block.Transactions { - if err := b.db.WriteTxLookup(txn.Hash, block.Hash()); err != nil { - return err - } + batchWriter.PutTxLookup(txn.Hash, block.Hash()) } return nil @@ -1186,75 +1139,74 @@ func (b *Blockchain) dispatchEvent(evnt *Event) { } // writeHeaderImpl writes a block and the data, assumes the genesis is already set -func (b *Blockchain) writeHeaderImpl(evnt *Event, header *types.Header) error { - currentHeader := b.Header() - - // Write the data - if header.ParentHash == currentHeader.Hash { - // Fast path to save the new canonical header - return b.writeCanonicalHeader(evnt, header) - } - - if err := b.db.WriteHeader(header); err != nil { - return err - } - - currentTD, ok := b.readTotalDifficulty(currentHeader.Hash) - if !ok { - return errors.New("failed to get header difficulty") - } - +// Returnning parameters (is canonical header, new total difficulty, error) +func (b *Blockchain) writeHeaderImpl( + batchWriter *storage.BatchWriter, evnt *Event, header *types.Header) (bool, *big.Int, error) { // parent total difficulty of incoming header parentTD, ok := b.readTotalDifficulty(header.ParentHash) if !ok { - return fmt.Errorf( + return false, nil, fmt.Errorf( "parent of %s (%d) not found", header.Hash.String(), header.Number, ) } - // Write the difficulty - if err := b.db.WriteTotalDifficulty( - header.Hash, - big.NewInt(0).Add( - parentTD, - big.NewInt(0).SetUint64(header.Difficulty), - ), - ); err != nil { - return err + currentHeader := b.Header() + incomingTD := new(big.Int).Add(parentTD, new(big.Int).SetUint64(header.Difficulty)) + + // if parent of new header is current header just put everything in batch and update event + // new header will be canonical one + if header.ParentHash == currentHeader.Hash { + batchWriter.PutCanonicalHeader(header, incomingTD) + + evnt.Type = EventHead + evnt.AddNewHeader(header) + evnt.SetDifficulty(incomingTD) + + return true, incomingTD, nil } - // Update the headers cache - b.headersCache.Add(header.Hash, header) + currentTD, ok := b.readTotalDifficulty(currentHeader.Hash) + if !ok { + return false, nil, errors.New("failed to get header difficulty") + } - incomingTD := big.NewInt(0).Add(parentTD, big.NewInt(0).SetUint64(header.Difficulty)) if incomingTD.Cmp(currentTD) > 0 { // new block has higher difficulty, reorg the chain - if err := b.handleReorg(evnt, currentHeader, header); err != nil { - return err + if err := b.handleReorg(batchWriter, evnt, currentHeader, header, incomingTD); err != nil { + return false, nil, err } - } else { - // new block has lower difficulty, create a new fork - evnt.AddOldHeader(header) - evnt.Type = EventFork - if err := b.writeFork(header); err != nil { - return err - } + batchWriter.PutCanonicalHeader(header, incomingTD) + + return true, incomingTD, nil } - return nil + forks, err := b.getForksToWrite(header) + if err != nil { + return false, nil, err + } + + batchWriter.PutHeader(header) + batchWriter.PutTotalDifficulty(header.Hash, incomingTD) + batchWriter.PutForks(forks) + + // new block has lower difficulty, create a new fork + evnt.AddOldHeader(header) + evnt.Type = EventFork + + return false, nil, nil } -// writeFork writes the new header forks to the DB -func (b *Blockchain) writeFork(header *types.Header) error { +// getForksToWrite retrieves new header forks that should be written to the DB +func (b *Blockchain) getForksToWrite(header *types.Header) ([]types.Hash, error) { forks, err := b.db.ReadForks() if err != nil { if errors.Is(err, storage.ErrNotFound) { forks = []types.Hash{} } else { - return err + return nil, err } } @@ -1266,19 +1218,16 @@ func (b *Blockchain) writeFork(header *types.Header) error { } } - newForks = append(newForks, header.Hash) - if err := b.db.WriteForks(newForks); err != nil { - return err - } - - return nil + return append(newForks, header.Hash), nil } // handleReorg handles a reorganization event func (b *Blockchain) handleReorg( + batchWriter *storage.BatchWriter, evnt *Event, oldHeader *types.Header, newHeader *types.Header, + newTD *big.Int, ) error { newChainHead := newHeader oldChainHead := oldHeader @@ -1322,6 +1271,18 @@ func (b *Blockchain) handleReorg( oldChain = append(oldChain, oldHeader) } + forks, err := b.getForksToWrite(oldChainHead) + if err != nil { + return fmt.Errorf("failed to write the old header as fork: %w", err) + } + + batchWriter.PutForks(forks) + + // Update canonical chain numbers + for _, h := range newChain { + batchWriter.PutCanonicalHash(h.Number, h.Hash) + } + for _, b := range oldChain[:len(oldChain)-1] { evnt.AddOldHeader(b) } @@ -1333,25 +1294,9 @@ func (b *Blockchain) handleReorg( evnt.AddNewHeader(b) } - if err := b.writeFork(oldChainHead); err != nil { - return fmt.Errorf("failed to write the old header as fork: %w", err) - } - - // Update canonical chain numbers - for _, h := range newChain { - if err := b.db.WriteCanonicalHash(h.Number, h.Hash); err != nil { - return err - } - } - - diff, err := b.advanceHead(newChainHead) - if err != nil { - return err - } - // Set the event type and difficulty evnt.Type = EventReorg - evnt.SetDifficulty(diff) + evnt.SetDifficulty(newTD) return nil } @@ -1442,3 +1387,20 @@ func calcBaseFeeDelta(gasUsedDelta, parentGasTarget, baseFee uint64) uint64 { return y / defaultBaseFeeChangeDenom } + +func (b *Blockchain) writeBatchAndUpdate( + batchWriter *storage.BatchWriter, + header *types.Header, + newTD *big.Int, + isCanonnical bool) error { + if err := batchWriter.WriteBatch(); err != nil { + return err + } + + if isCanonnical { + b.headersCache.Add(header.Hash, header) + b.setCurrentHeader(header, newTD) // Update the blockchain reference + } + + return nil +} diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index d1f882d533..9d9c16a282 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -7,11 +7,15 @@ import ( "reflect" "testing" + "github.com/0xPolygon/polygon-edge/helper/common" + "github.com/0xPolygon/polygon-edge/helper/hex" "github.com/0xPolygon/polygon-edge/state" "github.com/hashicorp/go-hclog" + lru "github.com/hashicorp/golang-lru" "github.com/0xPolygon/polygon-edge/chain" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/0xPolygon/polygon-edge/blockchain/storage" "github.com/0xPolygon/polygon-edge/blockchain/storage/memory" @@ -25,8 +29,7 @@ func TestGenesis(t *testing.T) { genesis := &types.Header{Difficulty: 1, Number: 0} genesis.ComputeHash() - _, err := b.advanceHead(genesis) - assert.NoError(t, err) + assert.NoError(t, b.writeGenesisImpl(genesis)) header := b.Header() assert.Equal(t, header.Hash, genesis.Hash) @@ -464,7 +467,8 @@ func TestInsertHeaders(t *testing.T) { // run the history for i := 1; i < len(cc.History); i++ { - if err := b.WriteHeaders([]*types.Header{chain.headers[cc.History[i].header.hash]}); err != nil { + headers := []*types.Header{chain.headers[cc.History[i].header.hash]} + if err := b.WriteHeadersWithBodies(headers); err != nil { t.Fatal(err) } @@ -533,11 +537,15 @@ func TestForkUnknownParents(t *testing.T) { h1 := AppendNewTestHeaders(h0[:5], 10) // Write genesis - _, err := b.advanceHead(h0[0]) - assert.NoError(t, err) + batchWriter := storage.NewBatchWriter(b.db) + td := new(big.Int).SetUint64(h0[0].Difficulty) + + batchWriter.PutCanonicalHeader(h0[0], td) + + assert.NoError(t, b.writeBatchAndUpdate(batchWriter, h0[0], td, true)) // Write 10 headers - assert.NoError(t, b.WriteHeaders(h0[1:])) + assert.NoError(t, b.WriteHeadersWithBodies(h0[1:])) // Cannot write this header because the father h1[11] is not known assert.Error(t, b.WriteHeadersWithBodies([]*types.Header{h1[12]})) @@ -553,14 +561,15 @@ func TestBlockchainWriteBody(t *testing.T) { newChain := func( t *testing.T, txFromByTxHash map[types.Hash]types.Address, + path string, ) *Blockchain { t.Helper() - storage, err := memory.NewMemoryStorage(nil) + dbStorage, err := memory.NewMemoryStorage(nil) assert.NoError(t, err) chain := &Blockchain{ - db: storage, + db: dbStorage, txSigner: &mockSigner{ txFromByTxHash: txFromByTxHash, }, @@ -590,12 +599,15 @@ func TestBlockchainWriteBody(t *testing.T) { txFromByTxHash := map[types.Hash]types.Address{} - chain := newChain(t, txFromByTxHash) + chain := newChain(t, txFromByTxHash, "t1") + defer chain.db.Close() + batchWriter := storage.NewBatchWriter(chain.db) assert.NoError( t, - chain.writeBody(block), + chain.writeBody(batchWriter, block), ) + assert.NoError(t, batchWriter.WriteBatch()) }) t.Run("should return error if tx doesn't have from and recovering address fails", func(t *testing.T) { @@ -618,13 +630,16 @@ func TestBlockchainWriteBody(t *testing.T) { txFromByTxHash := map[types.Hash]types.Address{} - chain := newChain(t, txFromByTxHash) + chain := newChain(t, txFromByTxHash, "t2") + defer chain.db.Close() + batchWriter := storage.NewBatchWriter(chain.db) assert.ErrorIs( t, errRecoveryAddressFailed, - chain.writeBody(block), + chain.writeBody(batchWriter, block), ) + assert.NoError(t, batchWriter.WriteBatch()) }) t.Run("should recover from address and store to storage", func(t *testing.T) { @@ -649,9 +664,12 @@ func TestBlockchainWriteBody(t *testing.T) { tx.Hash: addr, } - chain := newChain(t, txFromByTxHash) + chain := newChain(t, txFromByTxHash, "t3") + defer chain.db.Close() + batchWriter := storage.NewBatchWriter(chain.db) - assert.NoError(t, chain.writeBody(block)) + assert.NoError(t, chain.writeBody(batchWriter, block)) + assert.NoError(t, batchWriter.WriteBatch()) readBody, ok := chain.readBody(block.Hash()) assert.True(t, ok) @@ -854,7 +872,7 @@ func Test_recoverFromFieldsInTransactions(t *testing.T) { } func TestBlockchainReadBody(t *testing.T) { - storage, err := memory.NewMemoryStorage(nil) + dbStorage, err := memory.NewMemoryStorage(nil) assert.NoError(t, err) txFromByTxHash := make(map[types.Hash]types.Address) @@ -862,12 +880,14 @@ func TestBlockchainReadBody(t *testing.T) { b := &Blockchain{ logger: hclog.NewNullLogger(), - db: storage, + db: dbStorage, txSigner: &mockSigner{ txFromByTxHash: txFromByTxHash, }, } + batchWriter := storage.NewBatchWriter(b.db) + tx := &types.Transaction{ Value: big.NewInt(10), V: big.NewInt(1), @@ -886,10 +906,12 @@ func TestBlockchainReadBody(t *testing.T) { txFromByTxHash[tx.Hash] = types.ZeroAddress - if err := b.writeBody(block); err != nil { + if err := b.writeBody(batchWriter, block); err != nil { t.Fatal(err) } + assert.NoError(t, batchWriter.WriteBatch()) + txFromByTxHash[tx.Hash] = addr readBody, found := b.readBody(block.Hash()) @@ -1390,3 +1412,107 @@ func TestBlockchain_CalculateBaseFee(t *testing.T) { }) } } + +func TestBlockchain_WriteFullBlock(t *testing.T) { + t.Parallel() + + getKey := func(p []byte, k []byte) []byte { + return append(append(make([]byte, 0, len(p)+len(k)), p...), k...) + } + db := map[string][]byte{} + consensusMock := &MockVerifier{ + processHeadersFn: func(hs []*types.Header) error { + assert.Len(t, hs, 1) + + return nil + }, + } + + storageMock := storage.NewMockStorage() + storageMock.HookNewBatch(func() storage.Batch { + return memory.NewBatchMemory(db) + }) + + bc := &Blockchain{ + gpAverage: &gasPriceAverage{ + count: new(big.Int), + }, + logger: hclog.NewNullLogger(), + db: storageMock, + consensus: consensusMock, + config: &chain.Chain{ + Params: &chain.Params{ + Forks: &chain.Forks{ + chain.London: chain.NewFork(5), + }, + }, + Genesis: &chain.Genesis{ + BaseFeeEM: 4, + }, + }, + stream: &eventStream{}, + } + + bc.headersCache, _ = lru.New(10) + bc.difficultyCache, _ = lru.New(10) + + existingTD := big.NewInt(1) + existingHeader := &types.Header{Number: 1} + header := &types.Header{ + Number: 2, + } + receipts := []*types.Receipt{ + {GasUsed: 100}, + {GasUsed: 200}, + } + tx := &types.Transaction{ + Value: big.NewInt(1), + } + + tx.ComputeHash() + header.ComputeHash() + existingHeader.ComputeHash() + bc.currentHeader.Store(existingHeader) + bc.currentDifficulty.Store(existingTD) + + header.ParentHash = existingHeader.Hash + bc.txSigner = &mockSigner{ + txFromByTxHash: map[types.Hash]types.Address{ + tx.Hash: {1, 2}, + }, + } + + // already existing block write + err := bc.WriteFullBlock(&types.FullBlock{ + Block: &types.Block{ + Header: existingHeader, + Transactions: []*types.Transaction{tx}, + }, + Receipts: receipts, + }, "polybft") + + require.NoError(t, err) + require.Equal(t, 0, len(db)) + require.Equal(t, uint64(1), bc.currentHeader.Load().Number) + + // already existing block write + err = bc.WriteFullBlock(&types.FullBlock{ + Block: &types.Block{ + Header: header, + Transactions: []*types.Transaction{tx}, + }, + Receipts: receipts, + }, "polybft") + + require.NoError(t, err) + require.Equal(t, 8, len(db)) + require.Equal(t, uint64(2), bc.currentHeader.Load().Number) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.BODY, header.Hash.Bytes()))]) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.TX_LOOKUP_PREFIX, tx.Hash.Bytes()))]) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.HEADER, header.Hash.Bytes()))]) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.HEAD, storage.HASH))]) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.CANONICAL, common.EncodeUint64ToBytes(header.Number)))]) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.DIFFICULTY, header.Hash.Bytes()))]) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.CANONICAL, common.EncodeUint64ToBytes(header.Number)))]) + require.NotNil(t, db[hex.EncodeToHex(getKey(storage.RECEIPTS, header.Hash.Bytes()))]) +} diff --git a/blockchain/storage/batch_writer.go b/blockchain/storage/batch_writer.go new file mode 100644 index 0000000000..e2032601a7 --- /dev/null +++ b/blockchain/storage/batch_writer.go @@ -0,0 +1,96 @@ +package storage + +import ( + "math/big" + + "github.com/0xPolygon/polygon-edge/helper/common" + "github.com/0xPolygon/polygon-edge/types" + "github.com/umbracle/fastrlp" +) + +type Batch interface { + Delete(key []byte) + Write() error + Put(k []byte, v []byte) +} + +type BatchWriter struct { + batch Batch +} + +func NewBatchWriter(storage Storage) *BatchWriter { + return &BatchWriter{batch: storage.NewBatch()} +} + +func (b *BatchWriter) PutHeader(h *types.Header) { + b.putRlp(HEADER, h.Hash.Bytes(), h) +} + +func (b *BatchWriter) PutBody(hash types.Hash, body *types.Body) { + b.putRlp(BODY, hash.Bytes(), body) +} + +func (b *BatchWriter) PutHeadHash(h types.Hash) { + b.putWithPrefix(HEAD, HASH, h.Bytes()) +} + +func (b *BatchWriter) PutTxLookup(hash types.Hash, blockHash types.Hash) { + ar := &fastrlp.Arena{} + vr := ar.NewBytes(blockHash.Bytes()).MarshalTo(nil) + + b.putWithPrefix(TX_LOOKUP_PREFIX, hash.Bytes(), vr) +} + +func (b *BatchWriter) PutHeadNumber(n uint64) { + b.putWithPrefix(HEAD, NUMBER, common.EncodeUint64ToBytes(n)) +} + +func (b *BatchWriter) PutReceipts(hash types.Hash, receipts []*types.Receipt) { + rr := types.Receipts(receipts) + + b.putRlp(RECEIPTS, hash.Bytes(), &rr) +} + +func (b *BatchWriter) PutCanonicalHeader(h *types.Header, diff *big.Int) { + b.PutHeader(h) + b.PutHeadHash(h.Hash) + b.PutHeadNumber(h.Number) + b.PutCanonicalHash(h.Number, h.Hash) + b.PutTotalDifficulty(h.Hash, diff) +} + +func (b *BatchWriter) PutCanonicalHash(n uint64, hash types.Hash) { + b.putWithPrefix(CANONICAL, common.EncodeUint64ToBytes(n), hash.Bytes()) +} + +func (b *BatchWriter) PutTotalDifficulty(hash types.Hash, diff *big.Int) { + b.putWithPrefix(DIFFICULTY, hash.Bytes(), diff.Bytes()) +} + +func (b *BatchWriter) PutForks(forks []types.Hash) { + ff := Forks(forks) + + b.putRlp(FORK, EMPTY, &ff) +} + +func (b *BatchWriter) putRlp(p, k []byte, raw types.RLPMarshaler) { + var data []byte + + if obj, ok := raw.(types.RLPStoreMarshaler); ok { + data = obj.MarshalStoreRLPTo(nil) + } else { + data = raw.MarshalRLPTo(nil) + } + + b.putWithPrefix(p, k, data) +} + +func (b *BatchWriter) putWithPrefix(p, k, data []byte) { + fullKey := append(append(make([]byte, 0, len(p)+len(k)), p...), k...) + + b.batch.Put(fullKey, data) +} + +func (b *BatchWriter) WriteBatch() error { + return b.batch.Write() +} diff --git a/blockchain/storage/keyvalue.go b/blockchain/storage/keyvalue.go index fd1494e86f..a11e969487 100644 --- a/blockchain/storage/keyvalue.go +++ b/blockchain/storage/keyvalue.go @@ -2,10 +2,10 @@ package storage import ( - "encoding/binary" "fmt" "math/big" + "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" "github.com/umbracle/fastrlp" @@ -53,8 +53,8 @@ var ( // KV = Key-Value type KV interface { Close() error - Set(p []byte, v []byte) error Get(p []byte) ([]byte, bool, error) + NewBatch() Batch } // KeyValueStorage is a generic storage for kv databases @@ -68,22 +68,11 @@ func NewKeyValueStorage(logger hclog.Logger, db KV) Storage { return &KeyValueStorage{logger: logger, db: db} } -func (s *KeyValueStorage) encodeUint(n uint64) []byte { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b[:], n) - - return b[:] -} - -func (s *KeyValueStorage) decodeUint(b []byte) uint64 { - return binary.BigEndian.Uint64(b[:]) -} - // -- canonical hash -- // ReadCanonicalHash gets the hash from the number of the canonical chain func (s *KeyValueStorage) ReadCanonicalHash(n uint64) (types.Hash, bool) { - data, ok := s.get(CANONICAL, s.encodeUint(n)) + data, ok := s.get(CANONICAL, common.EncodeUint64ToBytes(n)) if !ok { return types.Hash{}, false } @@ -91,11 +80,6 @@ func (s *KeyValueStorage) ReadCanonicalHash(n uint64) (types.Hash, bool) { return types.BytesToHash(data), true } -// WriteCanonicalHash writes a hash for a number block in the canonical chain -func (s *KeyValueStorage) WriteCanonicalHash(n uint64, hash types.Hash) error { - return s.set(CANONICAL, s.encodeUint(n), hash.Bytes()) -} - // HEAD // // ReadHeadHash returns the hash of the head @@ -119,28 +103,11 @@ func (s *KeyValueStorage) ReadHeadNumber() (uint64, bool) { return 0, false } - return s.decodeUint(data), true -} - -// WriteHeadHash writes the hash of the head -func (s *KeyValueStorage) WriteHeadHash(h types.Hash) error { - return s.set(HEAD, HASH, h.Bytes()) -} - -// WriteHeadNumber writes the number of the head -func (s *KeyValueStorage) WriteHeadNumber(n uint64) error { - return s.set(HEAD, NUMBER, s.encodeUint(n)) + return common.EncodeBytesToUint64(data), true } // FORK // -// WriteForks writes the current forks -func (s *KeyValueStorage) WriteForks(forks []types.Hash) error { - ff := Forks(forks) - - return s.writeRLP(FORK, EMPTY, &ff) -} - // ReadForks read the current forks func (s *KeyValueStorage) ReadForks() ([]types.Hash, error) { forks := &Forks{} @@ -151,11 +118,6 @@ func (s *KeyValueStorage) ReadForks() ([]types.Hash, error) { // DIFFICULTY // -// WriteTotalDifficulty writes the difficulty -func (s *KeyValueStorage) WriteTotalDifficulty(hash types.Hash, diff *big.Int) error { - return s.set(DIFFICULTY, hash.Bytes(), diff.Bytes()) -} - // ReadTotalDifficulty reads the difficulty func (s *KeyValueStorage) ReadTotalDifficulty(hash types.Hash) (*big.Int, bool) { v, ok := s.get(DIFFICULTY, hash.Bytes()) @@ -168,11 +130,6 @@ func (s *KeyValueStorage) ReadTotalDifficulty(hash types.Hash) (*big.Int, bool) // HEADER // -// WriteHeader writes the header -func (s *KeyValueStorage) WriteHeader(h *types.Header) error { - return s.writeRLP(HEADER, h.Hash.Bytes(), h) -} - // ReadHeader reads the header func (s *KeyValueStorage) ReadHeader(hash types.Hash) (*types.Header, error) { header := &types.Header{} @@ -181,38 +138,8 @@ func (s *KeyValueStorage) ReadHeader(hash types.Hash) (*types.Header, error) { return header, err } -// WriteCanonicalHeader implements the storage interface -func (s *KeyValueStorage) WriteCanonicalHeader(h *types.Header, diff *big.Int) error { - if err := s.WriteHeader(h); err != nil { - return err - } - - if err := s.WriteHeadHash(h.Hash); err != nil { - return err - } - - if err := s.WriteHeadNumber(h.Number); err != nil { - return err - } - - if err := s.WriteCanonicalHash(h.Number, h.Hash); err != nil { - return err - } - - if err := s.WriteTotalDifficulty(h.Hash, diff); err != nil { - return err - } - - return nil -} - // BODY // -// WriteBody writes the body -func (s *KeyValueStorage) WriteBody(hash types.Hash, body *types.Body) error { - return s.writeRLP(BODY, hash.Bytes(), body) -} - // ReadBody reads the body func (s *KeyValueStorage) ReadBody(hash types.Hash) (*types.Body, error) { body := &types.Body{} @@ -223,13 +150,6 @@ func (s *KeyValueStorage) ReadBody(hash types.Hash) (*types.Body, error) { // RECEIPTS // -// WriteReceipts writes the receipts -func (s *KeyValueStorage) WriteReceipts(hash types.Hash, receipts []*types.Receipt) error { - rr := types.Receipts(receipts) - - return s.writeRLP(RECEIPTS, hash.Bytes(), &rr) -} - // ReadReceipts reads the receipts func (s *KeyValueStorage) ReadReceipts(hash types.Hash) ([]*types.Receipt, error) { receipts := &types.Receipts{} @@ -240,14 +160,6 @@ func (s *KeyValueStorage) ReadReceipts(hash types.Hash) ([]*types.Receipt, error // TX LOOKUP // -// WriteTxLookup maps the transaction hash to the block hash -func (s *KeyValueStorage) WriteTxLookup(hash types.Hash, blockHash types.Hash) error { - ar := &fastrlp.Arena{} - vr := ar.NewBytes(blockHash.Bytes()) - - return s.write2(TX_LOOKUP_PREFIX, hash.Bytes(), vr) -} - // ReadTxLookup reads the block hash using the transaction hash func (s *KeyValueStorage) ReadTxLookup(hash types.Hash) (types.Hash, bool) { parser := &fastrlp.Parser{} @@ -267,19 +179,6 @@ func (s *KeyValueStorage) ReadTxLookup(hash types.Hash) (types.Hash, bool) { return types.BytesToHash(blockHash), true } -// WRITE OPERATIONS // - -func (s *KeyValueStorage) writeRLP(p, k []byte, raw types.RLPMarshaler) error { - var data []byte - if obj, ok := raw.(types.RLPStoreMarshaler); ok { - data = obj.MarshalStoreRLPTo(nil) - } else { - data = raw.MarshalRLPTo(nil) - } - - return s.set(p, k, data) -} - var ErrNotFound = fmt.Errorf("not found") func (s *KeyValueStorage) readRLP(p, k []byte, raw types.RLPUnmarshaler) error { @@ -323,18 +222,6 @@ func (s *KeyValueStorage) read2(p, k []byte, parser *fastrlp.Parser) *fastrlp.Va return v } -func (s *KeyValueStorage) write2(p, k []byte, v *fastrlp.Value) error { - dst := v.MarshalTo(nil) - - return s.set(p, k, dst) -} - -func (s *KeyValueStorage) set(p []byte, k []byte, v []byte) error { - p = append(p, k...) - - return s.db.Set(p, v) -} - func (s *KeyValueStorage) get(p []byte, k []byte) ([]byte, bool) { p = append(p, k...) data, ok, err := s.db.Get(p) @@ -350,3 +237,8 @@ func (s *KeyValueStorage) get(p []byte, k []byte) ([]byte, bool) { func (s *KeyValueStorage) Close() error { return s.db.Close() } + +// NewBatch creates batch used for write/update/delete operations +func (s *KeyValueStorage) NewBatch() Batch { + return s.db.NewBatch() +} diff --git a/blockchain/storage/leveldb/batch.go b/blockchain/storage/leveldb/batch.go new file mode 100644 index 0000000000..4a87712016 --- /dev/null +++ b/blockchain/storage/leveldb/batch.go @@ -0,0 +1,32 @@ +package leveldb + +import ( + "github.com/0xPolygon/polygon-edge/blockchain/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +var _ storage.Batch = (*batchLevelDB)(nil) + +type batchLevelDB struct { + db *leveldb.DB + b *leveldb.Batch +} + +func NewBatchLevelDB(db *leveldb.DB) *batchLevelDB { + return &batchLevelDB{ + db: db, + b: new(leveldb.Batch), + } +} + +func (b *batchLevelDB) Delete(key []byte) { + b.b.Delete(key) +} + +func (b *batchLevelDB) Put(k []byte, v []byte) { + b.b.Put(k, v) +} + +func (b *batchLevelDB) Write() error { + return b.db.Write(b.b, nil) +} diff --git a/blockchain/storage/leveldb/leveldb.go b/blockchain/storage/leveldb/leveldb.go index 02a4c5593b..e49ac04267 100644 --- a/blockchain/storage/leveldb/leveldb.go +++ b/blockchain/storage/leveldb/leveldb.go @@ -6,6 +6,12 @@ import ( "github.com/0xPolygon/polygon-edge/blockchain/storage" "github.com/hashicorp/go-hclog" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +const ( + DefaultCache = int(256) + DefaultHandles = int(256) ) // Factory creates a leveldb storage @@ -23,9 +29,21 @@ func Factory(config map[string]interface{}, logger hclog.Logger) (storage.Storag return NewLevelDBStorage(pathStr, logger) } -// NewLevelDBStorage creates the new storage reference with leveldb +// NewLevelDBStorage creates the new storage reference with leveldb default options func NewLevelDBStorage(path string, logger hclog.Logger) (storage.Storage, error) { - db, err := leveldb.OpenFile(path, nil) + // Set default options + options := &opt.Options{ + OpenFilesCacheCapacity: DefaultHandles, + BlockCacheCapacity: DefaultCache / 2 * opt.MiB, + WriteBuffer: DefaultCache / 4 * opt.MiB, // Two of these are used internally + } + + return NewLevelDBStorageWithOpt(path, logger, options) +} + +// NewLevelDBStorageWithOpt creates the new storage reference with leveldb with custom options +func NewLevelDBStorageWithOpt(path string, logger hclog.Logger, opts *opt.Options) (storage.Storage, error) { + db, err := leveldb.OpenFile(path, opts) if err != nil { return nil, err } @@ -63,3 +81,7 @@ func (l *levelDBKV) Get(p []byte) ([]byte, bool, error) { func (l *levelDBKV) Close() error { return l.db.Close() } + +func (l *levelDBKV) NewBatch() storage.Batch { + return NewBatchLevelDB(l.db) +} diff --git a/blockchain/storage/leveldb/leveldb_test.go b/blockchain/storage/leveldb/leveldb_test.go index e3ea4aef86..95f24c48ae 100644 --- a/blockchain/storage/leveldb/leveldb_test.go +++ b/blockchain/storage/leveldb/leveldb_test.go @@ -1,11 +1,21 @@ package leveldb import ( + "context" + "crypto/rand" + "math/big" "os" + "os/signal" + "path/filepath" + "syscall" "testing" + "time" + "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/blockchain/storage" + "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" ) func newStorage(t *testing.T) (storage.Storage, func()) { @@ -37,3 +47,210 @@ func newStorage(t *testing.T) (storage.Storage, func()) { func TestStorage(t *testing.T) { storage.TestStorage(t, newStorage) } + +func generateTxs(t *testing.T, startNonce, count int, from types.Address, to *types.Address) []*types.Transaction { + t.Helper() + + txs := make([]*types.Transaction, count) + + for i := range txs { + tx := &types.Transaction{ + Gas: types.StateTransactionGasLimit, + Nonce: uint64(startNonce + i), + From: from, + To: to, + Value: big.NewInt(2000), + Type: types.DynamicFeeTx, + GasFeeCap: big.NewInt(100), + GasTipCap: big.NewInt(10), + } + + input := make([]byte, 1000) + _, err := rand.Read(input) + + require.NoError(t, err) + + tx.ComputeHash() + + txs[i] = tx + } + + return txs +} + +func generateBlock(t *testing.T, num uint64) *types.FullBlock { + t.Helper() + + transactionsCount := 2500 + status := types.ReceiptSuccess + addr1 := types.StringToAddress("17878aa") + addr2 := types.StringToAddress("2bf5653") + b := &types.FullBlock{ + Block: &types.Block{ + Header: &types.Header{ + Number: num, + ExtraData: make([]byte, 32), + Hash: types.ZeroHash, + }, + Transactions: generateTxs(t, 0, transactionsCount, addr1, &addr2), + Uncles: blockchain.NewTestHeaders(10), + }, + Receipts: make([]*types.Receipt, transactionsCount), + } + + logs := make([]*types.Log, 10) + + for i := 0; i < 10; i++ { + logs[i] = &types.Log{ + Address: addr1, + Topics: []types.Hash{types.StringToHash("t1"), types.StringToHash("t2"), types.StringToHash("t3")}, + Data: []byte{0xaa, 0xbb, 0xcc, 0xdd, 0xbb, 0xaa, 0x01, 0x012}, + } + } + + for i := 0; i < len(b.Block.Transactions); i++ { + b.Receipts[i] = &types.Receipt{ + TxHash: b.Block.Transactions[i].Hash, + Root: types.StringToHash("mockhashstring"), + TransactionType: types.LegacyTx, + GasUsed: uint64(100000), + Status: &status, + Logs: logs, + CumulativeGasUsed: uint64(100000), + ContractAddress: &types.Address{0xaa, 0xbb, 0xcc, 0xdd, 0xab, 0xac}, + } + } + + for i := 0; i < 5; i++ { + b.Receipts[i].LogsBloom = types.CreateBloom(b.Receipts) + } + + return b +} + +func newStorageP(t *testing.T) (storage.Storage, func(), string) { + t.Helper() + + p, err := os.MkdirTemp("", "leveldbtest") + require.NoError(t, err) + + require.NoError(t, os.MkdirAll(p, 0755)) + + s, err := NewLevelDBStorage(p, hclog.NewNullLogger()) + require.NoError(t, err) + + closeFn := func() { + require.NoError(t, s.Close()) + + if err := s.Close(); err != nil { + t.Fatal(err) + } + + require.NoError(t, os.RemoveAll(p)) + } + + return s, closeFn, p +} + +func countLdbFilesInPath(path string) int { + pattern := filepath.Join(path, "*.ldb") + + files, err := filepath.Glob(pattern) + if err != nil { + return -1 + } + + return len(files) +} + +func generateBlocks(t *testing.T, count int, ch chan *types.FullBlock, ctx context.Context) { + t.Helper() + + ticker := time.NewTicker(time.Second) + + for i := 1; i <= count; i++ { + b := generateBlock(t, uint64(i)) + select { + case <-ctx.Done(): + close(ch) + ticker.Stop() + + return + case <-ticker.C: + ch <- b + } + } +} + +func dirSize(t *testing.T, path string) int64 { + t.Helper() + + var size int64 + + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + t.Fail() + } + if !info.IsDir() { + size += info.Size() + } + + return err + }) + if err != nil { + t.Log(err) + } + + return size +} + +func TestWriteFullBlock(t *testing.T) { + s, _, path := newStorageP(t) + defer s.Close() + + count := 100 + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*45) + + signchan := make(chan os.Signal, 1) + signal.Notify(signchan, syscall.SIGINT) + + go func() { + <-signchan + cancel() + }() + + blockchain := make(chan *types.FullBlock, 1) + go generateBlocks(t, count, blockchain, ctx) + +insertloop: + for i := 1; i <= count; i++ { + select { + case <-ctx.Done(): + break insertloop + case b := <-blockchain: + batchWriter := storage.NewBatchWriter(s) + + batchWriter.PutBody(b.Block.Hash(), b.Block.Body()) + + for _, tx := range b.Block.Transactions { + batchWriter.PutTxLookup(tx.Hash, b.Block.Hash()) + } + + batchWriter.PutHeader(b.Block.Header) + batchWriter.PutHeadNumber(uint64(i)) + batchWriter.PutHeadHash(b.Block.Header.Hash) + batchWriter.PutReceipts(b.Block.Hash(), b.Receipts) + batchWriter.PutCanonicalHash(uint64(i), b.Block.Hash()) + + if err := batchWriter.WriteBatch(); err != nil { + require.NoError(t, err) + } + + t.Logf("writing block %d", i) + + size := dirSize(t, path) + t.Logf("\tldb file count: %d", countLdbFilesInPath(path)) + t.Logf("\tdir size %d MBs", size/1_000_000) + } + } +} diff --git a/blockchain/storage/memory/batch.go b/blockchain/storage/memory/batch.go new file mode 100644 index 0000000000..e370dcf753 --- /dev/null +++ b/blockchain/storage/memory/batch.go @@ -0,0 +1,38 @@ +package memory + +import ( + "github.com/0xPolygon/polygon-edge/blockchain/storage" + "github.com/0xPolygon/polygon-edge/helper/hex" +) + +var _ storage.Batch = (*batchMemory)(nil) + +type batchMemory struct { + db map[string][]byte + keysToDelete [][]byte + valuesToPut [][2][]byte +} + +func NewBatchMemory(db map[string][]byte) *batchMemory { + return &batchMemory{db: db} +} + +func (b *batchMemory) Delete(key []byte) { + b.keysToDelete = append(b.keysToDelete, key) +} + +func (b *batchMemory) Put(k []byte, v []byte) { + b.valuesToPut = append(b.valuesToPut, [2][]byte{k, v}) +} + +func (b *batchMemory) Write() error { + for _, x := range b.keysToDelete { + delete(b.db, hex.EncodeToHex(x)) + } + + for _, x := range b.valuesToPut { + b.db[hex.EncodeToHex(x[0])] = x[1] + } + + return nil +} diff --git a/blockchain/storage/memory/memory.go b/blockchain/storage/memory/memory.go index 7af8bc7412..15cdf1e67d 100644 --- a/blockchain/storage/memory/memory.go +++ b/blockchain/storage/memory/memory.go @@ -36,3 +36,7 @@ func (m *memoryKV) Get(p []byte) ([]byte, bool, error) { func (m *memoryKV) Close() error { return nil } + +func (m *memoryKV) NewBatch() storage.Batch { + return NewBatchMemory(m.db) +} diff --git a/blockchain/storage/storage.go b/blockchain/storage/storage.go index 1010e8a2f1..15caaf111d 100644 --- a/blockchain/storage/storage.go +++ b/blockchain/storage/storage.go @@ -10,33 +10,24 @@ import ( // Storage is a generic blockchain storage type Storage interface { ReadCanonicalHash(n uint64) (types.Hash, bool) - WriteCanonicalHash(n uint64, hash types.Hash) error ReadHeadHash() (types.Hash, bool) ReadHeadNumber() (uint64, bool) - WriteHeadHash(h types.Hash) error - WriteHeadNumber(uint64) error - WriteForks(forks []types.Hash) error ReadForks() ([]types.Hash, error) - WriteTotalDifficulty(hash types.Hash, diff *big.Int) error ReadTotalDifficulty(hash types.Hash) (*big.Int, bool) - WriteHeader(h *types.Header) error ReadHeader(hash types.Hash) (*types.Header, error) - WriteCanonicalHeader(h *types.Header, diff *big.Int) error - - WriteBody(hash types.Hash, body *types.Body) error ReadBody(hash types.Hash) (*types.Body, error) - WriteReceipts(hash types.Hash, receipts []*types.Receipt) error ReadReceipts(hash types.Hash) ([]*types.Receipt, error) - WriteTxLookup(hash types.Hash, blockHash types.Hash) error ReadTxLookup(hash types.Hash) (types.Hash, bool) + NewBatch() Batch + Close() error } diff --git a/blockchain/storage/testing.go b/blockchain/storage/testing.go index 63330b2d4e..45615363ad 100644 --- a/blockchain/storage/testing.go +++ b/blockchain/storage/testing.go @@ -8,6 +8,7 @@ import ( "github.com/0xPolygon/polygon-edge/helper/hex" "github.com/0xPolygon/polygon-edge/types" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type PlaceholderStorage func(t *testing.T) (Storage, func()) @@ -76,6 +77,8 @@ func testCanonicalChain(t *testing.T, m PlaceholderStorage) { } for _, cc := range cases { + batch := NewBatchWriter(s) + h := &types.Header{ Number: cc.Number, ParentHash: cc.ParentHash, @@ -84,13 +87,10 @@ func testCanonicalChain(t *testing.T, m PlaceholderStorage) { hash := h.Hash - if err := s.WriteHeader(h); err != nil { - t.Fatal(err) - } + batch.PutHeader(h) + batch.PutCanonicalHash(cc.Number, hash) - if err := s.WriteCanonicalHash(cc.Number, hash); err != nil { - t.Fatal(err) - } + require.NoError(t, batch.WriteBatch()) data, ok := s.ReadCanonicalHash(cc.Number) if !ok { @@ -124,6 +124,8 @@ func testDifficulty(t *testing.T, m PlaceholderStorage) { } for indx, cc := range cases { + batch := NewBatchWriter(s) + h := &types.Header{ Number: uint64(indx), ExtraData: []byte{}, @@ -131,13 +133,10 @@ func testDifficulty(t *testing.T, m PlaceholderStorage) { hash := h.Hash - if err := s.WriteHeader(h); err != nil { - t.Fatal(err) - } + batch.PutHeader(h) + batch.PutTotalDifficulty(hash, cc.Diff) - if err := s.WriteTotalDifficulty(hash, cc.Diff); err != nil { - t.Fatal(err) - } + require.NoError(t, batch.WriteBatch()) diff, ok := s.ReadTotalDifficulty(hash) if !ok { @@ -157,23 +156,19 @@ func testHead(t *testing.T, m PlaceholderStorage) { defer closeFn() for i := uint64(0); i < 5; i++ { + batch := NewBatchWriter(s) + h := &types.Header{ Number: i, ExtraData: []byte{}, } hash := h.Hash - if err := s.WriteHeader(h); err != nil { - t.Fatal(err) - } - - if err := s.WriteHeadNumber(i); err != nil { - t.Fatal(err) - } + batch.PutHeader(h) + batch.PutHeadNumber(i) + batch.PutHeadHash(hash) - if err := s.WriteHeadHash(hash); err != nil { - t.Fatal(err) - } + require.NoError(t, batch.WriteBatch()) n2, ok := s.ReadHeadNumber() if !ok { @@ -209,9 +204,11 @@ func testForks(t *testing.T, m PlaceholderStorage) { } for _, cc := range cases { - if err := s.WriteForks(cc.Forks); err != nil { - t.Fatal(err) - } + batch := NewBatchWriter(s) + + batch.PutForks(cc.Forks) + + require.NoError(t, batch.WriteBatch()) forks, err := s.ReadForks() assert.NoError(t, err) @@ -238,9 +235,11 @@ func testHeader(t *testing.T, m PlaceholderStorage) { } header.ComputeHash() - if err := s.WriteHeader(header); err != nil { - t.Fatal(err) - } + batch := NewBatchWriter(s) + + batch.PutHeader(header) + + require.NoError(t, batch.WriteBatch()) header1, err := s.ReadHeader(header.Hash) assert.NoError(t, err) @@ -263,9 +262,12 @@ func testBody(t *testing.T, m PlaceholderStorage) { Timestamp: 10, ExtraData: []byte{}, // if not set it will fail } - if err := s.WriteHeader(header); err != nil { - t.Fatal(err) - } + + batch := NewBatchWriter(s) + + batch.PutHeader(header) + + require.NoError(t, batch.WriteBatch()) addr1 := types.StringToAddress("11") t0 := &types.Transaction{ @@ -296,8 +298,12 @@ func testBody(t *testing.T, m PlaceholderStorage) { Transactions: []*types.Transaction{t0, t1}, } + batch2 := NewBatchWriter(s) body0 := block.Body() - assert.NoError(t, s.WriteBody(header.Hash, body0)) + + batch2.PutBody(header.Hash, body0) + + require.NoError(t, batch2.WriteBatch()) body1, err := s.ReadBody(header.Hash) assert.NoError(t, err) @@ -321,65 +327,64 @@ func testReceipts(t *testing.T, m PlaceholderStorage) { s, closeFn := m(t) defer closeFn() + batch := NewBatchWriter(s) + h := &types.Header{ Difficulty: 133, Number: 11, ExtraData: []byte{}, } - if err := s.WriteHeader(h); err != nil { - t.Fatal(err) - } + h.ComputeHash() - txn := &types.Transaction{ - Nonce: 1000, - Gas: 50, - GasPrice: new(big.Int).SetUint64(100), - V: big.NewInt(11), - } body := &types.Body{ - Transactions: []*types.Transaction{txn}, - } - - if err := s.WriteBody(h.Hash, body); err != nil { - t.Fatal(err) - } - - r0 := &types.Receipt{ - Root: types.StringToHash("1"), - CumulativeGasUsed: 10, - TxHash: txn.Hash, - LogsBloom: types.Bloom{0x1}, - Logs: []*types.Log{ - { - Address: addr1, - Topics: []types.Hash{hash1, hash2}, - Data: []byte{0x1, 0x2}, - }, + Transactions: []*types.Transaction{ { - Address: addr2, - Topics: []types.Hash{hash1}, + Nonce: 1000, + Gas: 50, + GasPrice: new(big.Int).SetUint64(100), + V: big.NewInt(11), }, }, } - r1 := &types.Receipt{ - Root: types.StringToHash("1"), - CumulativeGasUsed: 10, - TxHash: txn.Hash, - LogsBloom: types.Bloom{0x1}, - GasUsed: 10, - ContractAddress: &types.Address{0x1}, - Logs: []*types.Log{ - { - Address: addr2, - Topics: []types.Hash{hash1}, + receipts := []*types.Receipt{ + { + Root: types.StringToHash("1"), + CumulativeGasUsed: 10, + TxHash: body.Transactions[0].Hash, + LogsBloom: types.Bloom{0x1}, + Logs: []*types.Log{ + { + Address: addr1, + Topics: []types.Hash{hash1, hash2}, + Data: []byte{0x1, 0x2}, + }, + { + Address: addr2, + Topics: []types.Hash{hash1}, + }, + }, + }, + { + Root: types.StringToHash("1"), + CumulativeGasUsed: 10, + TxHash: body.Transactions[0].Hash, + LogsBloom: types.Bloom{0x1}, + GasUsed: 10, + ContractAddress: &types.Address{0x1}, + Logs: []*types.Log{ + { + Address: addr2, + Topics: []types.Hash{hash1}, + }, }, }, } - receipts := []*types.Receipt{r0, r1} - if err := s.WriteReceipts(h.Hash, receipts); err != nil { - t.Fatal(err) - } + batch.PutHeader(h) + batch.PutBody(h.Hash, body) + batch.PutReceipts(h.Hash, receipts) + + require.NoError(t, batch.WriteBatch()) found, err := s.ReadReceipts(h.Hash) if err != nil { @@ -402,10 +407,11 @@ func testWriteCanonicalHeader(t *testing.T, m PlaceholderStorage) { h.ComputeHash() diff := new(big.Int).SetUint64(100) + batch := NewBatchWriter(s) - if err := s.WriteCanonicalHeader(h, diff); err != nil { - t.Fatal(err) - } + batch.PutCanonicalHeader(h, diff) + + require.NoError(t, batch.WriteBatch()) hh, err := s.ReadHeader(h.Hash) assert.NoError(t, err) @@ -445,49 +451,30 @@ func testWriteCanonicalHeader(t *testing.T, m PlaceholderStorage) { // Storage delegators type readCanonicalHashDelegate func(uint64) (types.Hash, bool) -type writeCanonicalHashDelegate func(uint64, types.Hash) error type readHeadHashDelegate func() (types.Hash, bool) type readHeadNumberDelegate func() (uint64, bool) -type writeHeadHashDelegate func(types.Hash) error -type writeHeadNumberDelegate func(uint64) error -type writeForksDelegate func([]types.Hash) error type readForksDelegate func() ([]types.Hash, error) -type writeTotalDifficultyDelegate func(types.Hash, *big.Int) error type readTotalDifficultyDelegate func(types.Hash) (*big.Int, bool) -type writeHeaderDelegate func(*types.Header) error type readHeaderDelegate func(types.Hash) (*types.Header, error) -type writeCanonicalHeaderDelegate func(*types.Header, *big.Int) error -type writeBodyDelegate func(types.Hash, *types.Body) error type readBodyDelegate func(types.Hash) (*types.Body, error) -type writeSnapshotDelegate func(types.Hash, []byte) error type readSnapshotDelegate func(types.Hash) ([]byte, bool) -type writeReceiptsDelegate func(types.Hash, []*types.Receipt) error type readReceiptsDelegate func(types.Hash) ([]*types.Receipt, error) -type writeTxLookupDelegate func(types.Hash, types.Hash) error type readTxLookupDelegate func(types.Hash) (types.Hash, bool) type closeDelegate func() error +type newBatchDelegate func() Batch type MockStorage struct { - readCanonicalHashFn readCanonicalHashDelegate - writeCanonicalHashFn writeCanonicalHashDelegate - readHeadHashFn readHeadHashDelegate - readHeadNumberFn readHeadNumberDelegate - writeHeadHashFn writeHeadHashDelegate - writeHeadNumberFn writeHeadNumberDelegate - writeForksFn writeForksDelegate - readForksFn readForksDelegate - writeTotalDifficultyFn writeTotalDifficultyDelegate - readTotalDifficultyFn readTotalDifficultyDelegate - writeHeaderFn writeHeaderDelegate - readHeaderFn readHeaderDelegate - writeCanonicalHeaderFn writeCanonicalHeaderDelegate - writeBodyFn writeBodyDelegate - readBodyFn readBodyDelegate - writeReceiptsFn writeReceiptsDelegate - readReceiptsFn readReceiptsDelegate - writeTxLookupFn writeTxLookupDelegate - readTxLookupFn readTxLookupDelegate - closeFn closeDelegate + readCanonicalHashFn readCanonicalHashDelegate + readHeadHashFn readHeadHashDelegate + readHeadNumberFn readHeadNumberDelegate + readForksFn readForksDelegate + readTotalDifficultyFn readTotalDifficultyDelegate + readHeaderFn readHeaderDelegate + readBodyFn readBodyDelegate + readReceiptsFn readReceiptsDelegate + readTxLookupFn readTxLookupDelegate + closeFn closeDelegate + newBatchFn newBatchDelegate } func NewMockStorage() *MockStorage { @@ -506,18 +493,6 @@ func (m *MockStorage) HookReadCanonicalHash(fn readCanonicalHashDelegate) { m.readCanonicalHashFn = fn } -func (m *MockStorage) WriteCanonicalHash(n uint64, hash types.Hash) error { - if m.writeCanonicalHashFn != nil { - return m.writeCanonicalHashFn(n, hash) - } - - return nil -} - -func (m *MockStorage) HookWriteCanonicalHash(fn writeCanonicalHashDelegate) { - m.writeCanonicalHashFn = fn -} - func (m *MockStorage) ReadHeadHash() (types.Hash, bool) { if m.readHeadHashFn != nil { return m.readHeadHashFn() @@ -542,42 +517,6 @@ func (m *MockStorage) HookReadHeadNumber(fn readHeadNumberDelegate) { m.readHeadNumberFn = fn } -func (m *MockStorage) WriteHeadHash(h types.Hash) error { - if m.writeHeadHashFn != nil { - return m.writeHeadHashFn(h) - } - - return nil -} - -func (m *MockStorage) HookWriteHeadHash(fn writeHeadHashDelegate) { - m.writeHeadHashFn = fn -} - -func (m *MockStorage) WriteHeadNumber(n uint64) error { - if m.writeHeadNumberFn != nil { - return m.writeHeadNumberFn(n) - } - - return nil -} - -func (m *MockStorage) HookWriteHeadNumber(fn writeHeadNumberDelegate) { - m.writeHeadNumberFn = fn -} - -func (m *MockStorage) WriteForks(forks []types.Hash) error { - if m.writeForksFn != nil { - return m.writeForksFn(forks) - } - - return nil -} - -func (m *MockStorage) HookWriteForks(fn writeForksDelegate) { - m.writeForksFn = fn -} - func (m *MockStorage) ReadForks() ([]types.Hash, error) { if m.readForksFn != nil { return m.readForksFn() @@ -590,18 +529,6 @@ func (m *MockStorage) HookReadForks(fn readForksDelegate) { m.readForksFn = fn } -func (m *MockStorage) WriteTotalDifficulty(hash types.Hash, diff *big.Int) error { - if m.writeTotalDifficultyFn != nil { - return m.writeTotalDifficultyFn(hash, diff) - } - - return nil -} - -func (m *MockStorage) HookWriteTotalDifficulty(fn writeTotalDifficultyDelegate) { - m.writeTotalDifficultyFn = fn -} - func (m *MockStorage) ReadTotalDifficulty(hash types.Hash) (*big.Int, bool) { if m.readTotalDifficultyFn != nil { return m.readTotalDifficultyFn(hash) @@ -614,18 +541,6 @@ func (m *MockStorage) HookReadTotalDifficulty(fn readTotalDifficultyDelegate) { m.readTotalDifficultyFn = fn } -func (m *MockStorage) WriteHeader(h *types.Header) error { - if m.writeHeaderFn != nil { - return m.writeHeaderFn(h) - } - - return nil -} - -func (m *MockStorage) HookWriteHeader(fn writeHeaderDelegate) { - m.writeHeaderFn = fn -} - func (m *MockStorage) ReadHeader(hash types.Hash) (*types.Header, error) { if m.readHeaderFn != nil { return m.readHeaderFn(hash) @@ -638,30 +553,6 @@ func (m *MockStorage) HookReadHeader(fn readHeaderDelegate) { m.readHeaderFn = fn } -func (m *MockStorage) WriteCanonicalHeader(h *types.Header, diff *big.Int) error { - if m.writeCanonicalHeaderFn != nil { - return m.writeCanonicalHeaderFn(h, diff) - } - - return nil -} - -func (m *MockStorage) HookWriteCanonicalHeader(fn writeCanonicalHeaderDelegate) { - m.writeCanonicalHeaderFn = fn -} - -func (m *MockStorage) WriteBody(hash types.Hash, body *types.Body) error { - if m.writeBodyFn != nil { - return m.writeBodyFn(hash, body) - } - - return nil -} - -func (m *MockStorage) HookWriteBody(fn writeBodyDelegate) { - m.writeBodyFn = fn -} - func (m *MockStorage) ReadBody(hash types.Hash) (*types.Body, error) { if m.readBodyFn != nil { return m.readBodyFn(hash) @@ -674,18 +565,6 @@ func (m *MockStorage) HookReadBody(fn readBodyDelegate) { m.readBodyFn = fn } -func (m *MockStorage) WriteReceipts(hash types.Hash, receipts []*types.Receipt) error { - if m.writeReceiptsFn != nil { - return m.writeReceiptsFn(hash, receipts) - } - - return nil -} - -func (m *MockStorage) HookWriteReceipts(fn writeReceiptsDelegate) { - m.writeReceiptsFn = fn -} - func (m *MockStorage) ReadReceipts(hash types.Hash) ([]*types.Receipt, error) { if m.readReceiptsFn != nil { return m.readReceiptsFn(hash) @@ -698,18 +577,6 @@ func (m *MockStorage) HookReadReceipts(fn readReceiptsDelegate) { m.readReceiptsFn = fn } -func (m *MockStorage) WriteTxLookup(hash types.Hash, blockHash types.Hash) error { - if m.writeTxLookupFn != nil { - return m.writeTxLookupFn(hash, blockHash) - } - - return nil -} - -func (m *MockStorage) HookWriteTxLookup(fn writeTxLookupDelegate) { - m.writeTxLookupFn = fn -} - func (m *MockStorage) ReadTxLookup(hash types.Hash) (types.Hash, bool) { if m.readTxLookupFn != nil { return m.readTxLookupFn(hash) @@ -733,3 +600,15 @@ func (m *MockStorage) Close() error { func (m *MockStorage) HookClose(fn closeDelegate) { m.closeFn = fn } + +func (m *MockStorage) HookNewBatch(fn newBatchDelegate) { + m.newBatchFn = fn +} + +func (m *MockStorage) NewBatch() Batch { + if m.newBatchFn != nil { + return m.newBatchFn() + } + + return nil +} diff --git a/blockchain/testing.go b/blockchain/testing.go index e0f30864eb..cac29f43bf 100644 --- a/blockchain/testing.go +++ b/blockchain/testing.go @@ -116,12 +116,17 @@ func NewTestBlockchain(t *testing.T, headers []*types.Header) *Blockchain { t.Fatal(err) } - if headers != nil { - if _, err := b.advanceHead(headers[0]); err != nil { + if len(headers) > 0 { + batchWriter := storage.NewBatchWriter(b.db) + td := new(big.Int).SetUint64(headers[0].Difficulty) + + batchWriter.PutCanonicalHeader(headers[0], td) + + if err := b.writeBatchAndUpdate(batchWriter, headers[0], td, true); err != nil { t.Fatal(err) } - if err := b.WriteHeaders(headers[1:]); err != nil { + if err := b.WriteHeadersWithBodies(headers[1:]); err != nil { t.Fatal(err) } }