Skip to content

Commit

Permalink
EVM-660 Use batch write inside blockchain.go (LevelDB/memory) (#1569)
Browse files Browse the repository at this point in the history
Co-authored-by: Igor Crevar <crewce@gmail.com>
Co-authored-by: Victor Castell <victor@polygon.technology>
  • Loading branch information
3 people committed Jul 5, 2023
1 parent 18e5529 commit 2d35db6
Show file tree
Hide file tree
Showing 12 changed files with 813 additions and 549 deletions.
300 changes: 131 additions & 169 deletions blockchain/blockchain.go

Large diffs are not rendered by default.

160 changes: 143 additions & 17 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"reflect"
"testing"

"github.com/0xPolygon/polygon-edge/helper/common"
"github.com/0xPolygon/polygon-edge/helper/hex"
"github.com/0xPolygon/polygon-edge/state"
"github.com/hashicorp/go-hclog"
lru "github.com/hashicorp/golang-lru"

"github.com/0xPolygon/polygon-edge/chain"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/0xPolygon/polygon-edge/blockchain/storage"
"github.com/0xPolygon/polygon-edge/blockchain/storage/memory"
Expand All @@ -25,8 +29,7 @@ func TestGenesis(t *testing.T) {
genesis := &types.Header{Difficulty: 1, Number: 0}
genesis.ComputeHash()

_, err := b.advanceHead(genesis)
assert.NoError(t, err)
assert.NoError(t, b.writeGenesisImpl(genesis))

header := b.Header()
assert.Equal(t, header.Hash, genesis.Hash)
Expand Down Expand Up @@ -464,7 +467,8 @@ func TestInsertHeaders(t *testing.T) {

// run the history
for i := 1; i < len(cc.History); i++ {
if err := b.WriteHeaders([]*types.Header{chain.headers[cc.History[i].header.hash]}); err != nil {
headers := []*types.Header{chain.headers[cc.History[i].header.hash]}
if err := b.WriteHeadersWithBodies(headers); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -533,11 +537,15 @@ func TestForkUnknownParents(t *testing.T) {
h1 := AppendNewTestHeaders(h0[:5], 10)

// Write genesis
_, err := b.advanceHead(h0[0])
assert.NoError(t, err)
batchWriter := storage.NewBatchWriter(b.db)
td := new(big.Int).SetUint64(h0[0].Difficulty)

batchWriter.PutCanonicalHeader(h0[0], td)

assert.NoError(t, b.writeBatchAndUpdate(batchWriter, h0[0], td, true))

// Write 10 headers
assert.NoError(t, b.WriteHeaders(h0[1:]))
assert.NoError(t, b.WriteHeadersWithBodies(h0[1:]))

// Cannot write this header because the father h1[11] is not known
assert.Error(t, b.WriteHeadersWithBodies([]*types.Header{h1[12]}))
Expand All @@ -553,14 +561,15 @@ func TestBlockchainWriteBody(t *testing.T) {
newChain := func(
t *testing.T,
txFromByTxHash map[types.Hash]types.Address,
path string,
) *Blockchain {
t.Helper()

storage, err := memory.NewMemoryStorage(nil)
dbStorage, err := memory.NewMemoryStorage(nil)
assert.NoError(t, err)

chain := &Blockchain{
db: storage,
db: dbStorage,
txSigner: &mockSigner{
txFromByTxHash: txFromByTxHash,
},
Expand Down Expand Up @@ -590,12 +599,15 @@ func TestBlockchainWriteBody(t *testing.T) {

txFromByTxHash := map[types.Hash]types.Address{}

chain := newChain(t, txFromByTxHash)
chain := newChain(t, txFromByTxHash, "t1")
defer chain.db.Close()
batchWriter := storage.NewBatchWriter(chain.db)

assert.NoError(
t,
chain.writeBody(block),
chain.writeBody(batchWriter, block),
)
assert.NoError(t, batchWriter.WriteBatch())
})

t.Run("should return error if tx doesn't have from and recovering address fails", func(t *testing.T) {
Expand All @@ -618,13 +630,16 @@ func TestBlockchainWriteBody(t *testing.T) {

txFromByTxHash := map[types.Hash]types.Address{}

chain := newChain(t, txFromByTxHash)
chain := newChain(t, txFromByTxHash, "t2")
defer chain.db.Close()
batchWriter := storage.NewBatchWriter(chain.db)

assert.ErrorIs(
t,
errRecoveryAddressFailed,
chain.writeBody(block),
chain.writeBody(batchWriter, block),
)
assert.NoError(t, batchWriter.WriteBatch())
})

t.Run("should recover from address and store to storage", func(t *testing.T) {
Expand All @@ -649,9 +664,12 @@ func TestBlockchainWriteBody(t *testing.T) {
tx.Hash: addr,
}

chain := newChain(t, txFromByTxHash)
chain := newChain(t, txFromByTxHash, "t3")
defer chain.db.Close()
batchWriter := storage.NewBatchWriter(chain.db)

assert.NoError(t, chain.writeBody(block))
assert.NoError(t, chain.writeBody(batchWriter, block))
assert.NoError(t, batchWriter.WriteBatch())

readBody, ok := chain.readBody(block.Hash())
assert.True(t, ok)
Expand Down Expand Up @@ -854,20 +872,22 @@ func Test_recoverFromFieldsInTransactions(t *testing.T) {
}

func TestBlockchainReadBody(t *testing.T) {
storage, err := memory.NewMemoryStorage(nil)
dbStorage, err := memory.NewMemoryStorage(nil)
assert.NoError(t, err)

txFromByTxHash := make(map[types.Hash]types.Address)
addr := types.StringToAddress("1")

b := &Blockchain{
logger: hclog.NewNullLogger(),
db: storage,
db: dbStorage,
txSigner: &mockSigner{
txFromByTxHash: txFromByTxHash,
},
}

batchWriter := storage.NewBatchWriter(b.db)

tx := &types.Transaction{
Value: big.NewInt(10),
V: big.NewInt(1),
Expand All @@ -886,10 +906,12 @@ func TestBlockchainReadBody(t *testing.T) {

txFromByTxHash[tx.Hash] = types.ZeroAddress

if err := b.writeBody(block); err != nil {
if err := b.writeBody(batchWriter, block); err != nil {
t.Fatal(err)
}

assert.NoError(t, batchWriter.WriteBatch())

txFromByTxHash[tx.Hash] = addr

readBody, found := b.readBody(block.Hash())
Expand Down Expand Up @@ -1390,3 +1412,107 @@ func TestBlockchain_CalculateBaseFee(t *testing.T) {
})
}
}

// TestBlockchain_WriteFullBlock verifies that WriteFullBlock persists every
// block artifact (header, body, receipts, tx lookup, head markers, canonical
// index, difficulty) through a single storage batch, and that re-writing an
// already-known block leaves storage and the head untouched.
func TestBlockchain_WriteFullBlock(t *testing.T) {
	t.Parallel()

	// getKey mirrors the storage key layout: prefix bytes followed by the raw key.
	getKey := func(p []byte, k []byte) []byte {
		return append(append(make([]byte, 0, len(p)+len(k)), p...), k...)
	}
	// db captures everything the mocked batch writes, keyed by hex-encoded full key.
	db := map[string][]byte{}
	consensusMock := &MockVerifier{
		processHeadersFn: func(hs []*types.Header) error {
			assert.Len(t, hs, 1)

			return nil
		},
	}

	storageMock := storage.NewMockStorage()
	storageMock.HookNewBatch(func() storage.Batch {
		return memory.NewBatchMemory(db)
	})

	bc := &Blockchain{
		gpAverage: &gasPriceAverage{
			count: new(big.Int),
		},
		logger:    hclog.NewNullLogger(),
		db:        storageMock,
		consensus: consensusMock,
		config: &chain.Chain{
			Params: &chain.Params{
				Forks: &chain.Forks{
					chain.London: chain.NewFork(5),
				},
			},
			Genesis: &chain.Genesis{
				BaseFeeEM: 4,
			},
		},
		stream: &eventStream{},
	}

	bc.headersCache, _ = lru.New(10)
	bc.difficultyCache, _ = lru.New(10)

	existingTD := big.NewInt(1)
	existingHeader := &types.Header{Number: 1}
	header := &types.Header{
		Number: 2,
	}
	receipts := []*types.Receipt{
		{GasUsed: 100},
		{GasUsed: 200},
	}
	tx := &types.Transaction{
		Value: big.NewInt(1),
	}

	tx.ComputeHash()
	header.ComputeHash()
	existingHeader.ComputeHash()
	bc.currentHeader.Store(existingHeader)
	bc.currentDifficulty.Store(existingTD)

	header.ParentHash = existingHeader.Hash
	bc.txSigner = &mockSigner{
		txFromByTxHash: map[types.Hash]types.Address{
			tx.Hash: {1, 2},
		},
	}

	// writing an already existing block must be a no-op:
	// nothing reaches storage and the head does not move
	err := bc.WriteFullBlock(&types.FullBlock{
		Block: &types.Block{
			Header:       existingHeader,
			Transactions: []*types.Transaction{tx},
		},
		Receipts: receipts,
	}, "polybft")

	require.NoError(t, err)
	require.Equal(t, 0, len(db))
	require.Equal(t, uint64(1), bc.currentHeader.Load().Number)

	// writing a new block persists all eight keys and advances the head
	err = bc.WriteFullBlock(&types.FullBlock{
		Block: &types.Block{
			Header:       header,
			Transactions: []*types.Transaction{tx},
		},
		Receipts: receipts,
	}, "polybft")

	require.NoError(t, err)
	require.Equal(t, 8, len(db))
	require.Equal(t, uint64(2), bc.currentHeader.Load().Number)
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.BODY, header.Hash.Bytes()))])
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.TX_LOOKUP_PREFIX, tx.Hash.Bytes()))])
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.HEADER, header.Hash.Bytes()))])
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.HEAD, storage.HASH))])
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.HEAD, storage.NUMBER))])
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.CANONICAL, common.EncodeUint64ToBytes(header.Number)))])
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.DIFFICULTY, header.Hash.Bytes()))])
	require.NotNil(t, db[hex.EncodeToHex(getKey(storage.RECEIPTS, header.Hash.Bytes()))])
}
96 changes: 96 additions & 0 deletions blockchain/storage/batch_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package storage

import (
"math/big"

"github.com/0xPolygon/polygon-edge/helper/common"
"github.com/0xPolygon/polygon-edge/types"
"github.com/umbracle/fastrlp"
)

// Batch is the minimal write-batch abstraction implemented by the
// underlying key-value stores (LevelDB, in-memory). Mutations are
// staged with Put/Delete and persisted atomically by Write.
type Batch interface {
	Delete(key []byte)
	Write() error
	Put(k []byte, v []byte)
}

// BatchWriter accumulates blockchain-level writes (headers, bodies,
// receipts, lookups, head markers) into a single storage Batch so they
// can be committed atomically via WriteBatch.
type BatchWriter struct {
	// batch is the underlying store batch all Put* helpers stage into
	batch Batch
}

// NewBatchWriter creates a BatchWriter backed by a fresh batch obtained
// from the given storage.
func NewBatchWriter(storage Storage) *BatchWriter {
	return &BatchWriter{batch: storage.NewBatch()}
}

// PutHeader stages the RLP-encoded header under the HEADER prefix,
// keyed by the header hash.
func (b *BatchWriter) PutHeader(h *types.Header) {
	b.putRlp(HEADER, h.Hash.Bytes(), h)
}

// PutBody stages the RLP-encoded block body under the BODY prefix,
// keyed by the block hash.
func (b *BatchWriter) PutBody(hash types.Hash, body *types.Body) {
	b.putRlp(BODY, hash.Bytes(), body)
}

// PutHeadHash stages the chain head block hash (HEAD/HASH key).
func (b *BatchWriter) PutHeadHash(h types.Hash) {
	b.putWithPrefix(HEAD, HASH, h.Bytes())
}

// PutTxLookup stages a transaction-hash -> block-hash mapping so a
// transaction can later be located via the block that contains it.
// The block hash is stored as a single RLP byte string.
func (b *BatchWriter) PutTxLookup(hash types.Hash, blockHash types.Hash) {
	arena := &fastrlp.Arena{}
	encoded := arena.NewBytes(blockHash.Bytes()).MarshalTo(nil)
	b.putWithPrefix(TX_LOOKUP_PREFIX, hash.Bytes(), encoded)
}

// PutHeadNumber stages the chain head block number (HEAD/NUMBER key),
// encoded as bytes via common.EncodeUint64ToBytes.
func (b *BatchWriter) PutHeadNumber(n uint64) {
	b.putWithPrefix(HEAD, NUMBER, common.EncodeUint64ToBytes(n))
}

// PutReceipts stages the RLP-encoded receipt list under the RECEIPTS
// prefix, keyed by the block hash.
func (b *BatchWriter) PutReceipts(hash types.Hash, receipts []*types.Receipt) {
	receiptList := types.Receipts(receipts)
	b.putRlp(RECEIPTS, hash.Bytes(), &receiptList)
}

// PutCanonicalHeader stages everything required to make h the canonical
// head in one batch: the header itself, the head hash/number markers,
// the number->hash canonical index entry, and the total difficulty.
func (b *BatchWriter) PutCanonicalHeader(h *types.Header, diff *big.Int) {
	b.PutHeader(h)
	b.PutHeadHash(h.Hash)
	b.PutHeadNumber(h.Number)
	b.PutCanonicalHash(h.Number, h.Hash)
	b.PutTotalDifficulty(h.Hash, diff)
}

// PutCanonicalHash stages the number->hash entry of the canonical chain
// index (CANONICAL prefix).
func (b *BatchWriter) PutCanonicalHash(n uint64, hash types.Hash) {
	b.putWithPrefix(CANONICAL, common.EncodeUint64ToBytes(n), hash.Bytes())
}

// PutTotalDifficulty stages the total difficulty for the given block
// hash (DIFFICULTY prefix), stored as big.Int.Bytes().
func (b *BatchWriter) PutTotalDifficulty(hash types.Hash, diff *big.Int) {
	b.putWithPrefix(DIFFICULTY, hash.Bytes(), diff.Bytes())
}

// PutForks stages the RLP-encoded list of fork hashes under the FORK prefix.
func (b *BatchWriter) PutForks(forks []types.Hash) {
	forkList := Forks(forks)
	b.putRlp(FORK, EMPTY, &forkList)
}

// putRlp RLP-encodes raw and stages it under prefix p with key k.
// Types implementing RLPStoreMarshaler use their storage-specific
// encoding; everything else falls back to the generic RLP encoding.
func (b *BatchWriter) putRlp(p, k []byte, raw types.RLPMarshaler) {
	var data []byte

	switch marshaler := raw.(type) {
	case types.RLPStoreMarshaler:
		data = marshaler.MarshalStoreRLPTo(nil)
	default:
		data = raw.MarshalRLPTo(nil)
	}

	b.putWithPrefix(p, k, data)
}

// putWithPrefix stages data under the concatenation of prefix p and key k.
// A fresh backing slice is allocated so neither p nor k is ever mutated.
func (b *BatchWriter) putWithPrefix(p, k, data []byte) {
	fullKey := make([]byte, 0, len(p)+len(k))
	fullKey = append(fullKey, p...)
	fullKey = append(fullKey, k...)

	b.batch.Put(fullKey, data)
}

// WriteBatch atomically commits all staged writes to the underlying store.
func (b *BatchWriter) WriteBatch() error {
	return b.batch.Write()
}
Loading

0 comments on commit 2d35db6

Please sign in to comment.