Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[blockdao] Optimize derialization when retrieving receipts #4221

Merged
merged 4 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type (
// ActionsByAddress returns all actions associated with an address
ActionsByAddress(addr address.Address, start uint64, count uint64) ([]*iotexapi.ActionInfo, error)
// ActionByActionHash returns action by action hash
ActionByActionHash(h hash.Hash256) (*action.SealedEnvelope, hash.Hash256, uint64, uint32, error)
ActionByActionHash(h hash.Hash256) (*action.SealedEnvelope, *block.Block, uint32, error)
// PendingActionByActionHash returns action by action hash
PendingActionByActionHash(h hash.Hash256) (*action.SealedEnvelope, error)
// ActPoolActions returns the all Transaction Identifiers in the actpool
Expand Down Expand Up @@ -1089,24 +1089,24 @@ func (core *coreService) BlockHashByBlockHeight(blkHeight uint64) (hash.Hash256,
}

// ActionByActionHash returns action by action hash
func (core *coreService) ActionByActionHash(h hash.Hash256) (*action.SealedEnvelope, hash.Hash256, uint64, uint32, error) {
func (core *coreService) ActionByActionHash(h hash.Hash256) (*action.SealedEnvelope, *block.Block, uint32, error) {
if err := core.checkActionIndex(); err != nil {
return nil, hash.ZeroHash256, 0, 0, status.Error(codes.NotFound, blockindex.ErrActionIndexNA.Error())
return nil, nil, 0, status.Error(codes.NotFound, blockindex.ErrActionIndexNA.Error())
}

actIndex, err := core.indexer.GetActionIndex(h[:])
if err != nil {
return nil, hash.ZeroHash256, 0, 0, errors.Wrap(ErrNotFound, err.Error())
return nil, nil, 0, errors.Wrap(ErrNotFound, err.Error())
}
blk, err := core.dao.GetBlockByHeight(actIndex.BlockHeight())
if err != nil {
return nil, hash.ZeroHash256, 0, 0, errors.Wrap(ErrNotFound, err.Error())
return nil, nil, 0, errors.Wrap(ErrNotFound, err.Error())
}
selp, index, err := blk.ActionByHash(h)
if err != nil {
return nil, hash.ZeroHash256, 0, 0, errors.Wrap(ErrNotFound, err.Error())
return nil, nil, 0, errors.Wrap(ErrNotFound, err.Error())
}
return selp, blk.HashBlock(), actIndex.BlockHeight(), index, nil
return selp, blk, index, nil
}

// ActionByActionHash returns action by action hash
Expand Down Expand Up @@ -1290,9 +1290,9 @@ func (core *coreService) pendingAction(selp *action.SealedEnvelope) (*iotexapi.A
}

func (core *coreService) getAction(actHash hash.Hash256, checkPending bool) (*iotexapi.ActionInfo, error) {
selp, blkHash, blkHeight, actIndex, err := core.ActionByActionHash(actHash)
selp, blk, actIndex, err := core.ActionByActionHash(actHash)
if err == nil {
act, err := core.committedAction(selp, blkHash, blkHeight)
act, err := core.committedAction(selp, blk.HashBlock(), blk.Height())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,7 @@ func TestGrpcServer_GetActionByActionHashIntegrity(t *testing.T) {
}()

for _, test := range _getActionByActionHashTest {
ret, _, _, _, err := svr.core.ActionByActionHash(test.h)
ret, _, _, err := svr.core.ActionByActionHash(test.h)
require.NoError(err)
require.Equal(test.expectedNounce, ret.Envelope.Nonce())
}
Expand Down
16 changes: 5 additions & 11 deletions api/web3server.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,11 +585,11 @@ func (svr *web3Handler) getTransactionByHash(in *gjson.Result) (interface{}, err
return nil, err
}

selp, blkHash, _, _, err := svr.coreService.ActionByActionHash(actHash)
selp, blk, _, err := svr.coreService.ActionByActionHash(actHash)
if err == nil {
receipt, err := svr.coreService.ReceiptByActionHash(actHash)
if err == nil {
return svr.assembleConfirmedTransaction(blkHash, selp, receipt)
return svr.assembleConfirmedTransaction(blk.HashBlock(), selp, receipt)
}
if errors.Cause(err) == ErrNotFound {
return nil, nil
Expand Down Expand Up @@ -629,7 +629,7 @@ func (svr *web3Handler) getTransactionReceipt(in *gjson.Result) (interface{}, er
}

// acquire action receipt by action hash
selp, blockHash, _, _, err := svr.coreService.ActionByActionHash(actHash)
selp, blk, _, err := svr.coreService.ActionByActionHash(actHash)
if err != nil {
if errors.Cause(err) == ErrNotFound {
return nil, nil
Expand All @@ -649,19 +649,13 @@ func (svr *web3Handler) getTransactionReceipt(in *gjson.Result) (interface{}, er
}

// acquire logsBloom from blockMeta
blkHash := hex.EncodeToString(blockHash[:])
blk, err := svr.coreService.BlockByHash(blkHash)
if err != nil {
return nil, err
}

var logsBloomStr string
if logsBloom := blk.Block.LogsBloomfilter(); logsBloom != nil {
if logsBloom := blk.LogsBloomfilter(); logsBloom != nil {
logsBloomStr = hex.EncodeToString(logsBloom.Bytes())
}

return &getReceiptResult{
blockHash: blockHash,
blockHash: blk.HashBlock(),
from: selp.SenderAddress(),
to: to,
contractAddress: contractAddr,
Expand Down
21 changes: 13 additions & 8 deletions api/web3server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,15 @@ func TestGetTransactionByHash(t *testing.T) {
ContractAddress: "test",
TxIndex: 1,
}
core.EXPECT().ActionByActionHash(gomock.Any()).Return(selp, hash.Hash256b([]byte("test")), uint64(0), uint32(0), nil)
blk, err := block.NewTestingBuilder().
SetHeight(1).
SetVersion(111).
SetPrevBlockHash(hash.ZeroHash256).
SetTimeStamp(time.Now()).
AddActions(selp).
SignAndBuild(identityset.PrivateKey(0))
require.NoError(err)
core.EXPECT().ActionByActionHash(gomock.Any()).Return(selp, &blk, uint32(0), nil)
core.EXPECT().ReceiptByActionHash(gomock.Any()).Return(receipt, nil)
core.EXPECT().EVMNetworkID().Return(uint32(0))

Expand All @@ -642,7 +650,7 @@ func TestGetTransactionByHash(t *testing.T) {
require.Equal(receipt, rlt.receipt)

// get pending transaction
core.EXPECT().ActionByActionHash(gomock.Any()).Return(nil, hash.ZeroHash256, uint64(0), uint32(0), ErrNotFound)
core.EXPECT().ActionByActionHash(gomock.Any()).Return(nil, nil, uint32(0), ErrNotFound)
core.EXPECT().PendingActionByActionHash(gomock.Any()).Return(selp, nil)
core.EXPECT().EVMNetworkID().Return(uint32(0))
ret, err = web3svr.getTransactionByHash(&in)
Expand All @@ -657,7 +665,7 @@ func TestGetTransactionByHash(t *testing.T) {
require.NoError(err)
txHash, err = selp.Hash()
require.NoError(err)
core.EXPECT().ActionByActionHash(gomock.Any()).Return(nil, hash.ZeroHash256, uint64(0), uint32(0), ErrNotFound)
core.EXPECT().ActionByActionHash(gomock.Any()).Return(nil, nil, uint32(0), ErrNotFound)
core.EXPECT().PendingActionByActionHash(gomock.Any()).Return(selp, nil)
core.EXPECT().EVMNetworkID().Return(uint32(0))
ret, err = web3svr.getTransactionByHash(&in)
Expand Down Expand Up @@ -737,8 +745,6 @@ func TestGetTransactionReceipt(t *testing.T) {
ContractAddress: "test",
TxIndex: 1,
}
core.EXPECT().ActionByActionHash(gomock.Any()).Return(selp, hash.Hash256b([]byte("test")), uint64(0), uint32(0), nil)
core.EXPECT().ReceiptByActionHash(gomock.Any()).Return(receipt, nil)
blk, err := block.NewTestingBuilder().
SetHeight(1).
SetVersion(111).
Expand All @@ -747,9 +753,8 @@ func TestGetTransactionReceipt(t *testing.T) {
AddActions(selp).
SignAndBuild(identityset.PrivateKey(0))
require.NoError(err)
core.EXPECT().BlockByHash(gomock.Any()).Return(&apitypes.BlockWithReceipts{
Block: &blk,
}, nil)
core.EXPECT().ActionByActionHash(gomock.Any()).Return(selp, &blk, uint32(0), nil)
core.EXPECT().ReceiptByActionHash(gomock.Any()).Return(receipt, nil)

t.Run("nil params", func(t *testing.T) {
inNil := gjson.Parse(`{"params":[]}`)
Expand Down
44 changes: 33 additions & 11 deletions blockchain/block/block_deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,36 @@ func (bd *Deserializer) DeserializeBody(buf []byte) (*Body, error) {
return &b, nil
}

// FromBlockStoreProto converts protobuf to block store
func (bd *Deserializer) FromBlockStoreProto(pb *iotextypes.BlockStore) (*Store, error) {
in := &Store{}
func (bd *Deserializer) BlockFromBlockStoreProto(pb *iotextypes.BlockStore) (*Block, error) {
return bd.blockFromBlockStoreProto(pb)
}

func (bd *Deserializer) blockFromBlockStoreProto(pb *iotextypes.BlockStore) (*Block, error) {
blk, err := bd.FromBlockProto(pb.Block)
if err != nil {
return nil, err
}
// verify merkle root can match after deserialize
if err := blk.VerifyTxRoot(); err != nil {
return nil, err
}
// TODO: Reenable this if necessary
// // verify merkle root can match after deserialize
// if err := blk.VerifyTxRoot(); err != nil {
// return nil, err
// }
Copy link
Member

@dustinxie dustinxie Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uncomment this?
this is not the bottleneck for now, also it how has a cache for blocks now

return blk, nil
}

func (bd *Deserializer) ReceiptsFromBlockStoreProto(pb *iotextypes.BlockStore) ([]*action.Receipt, error) {
return bd.receiptsFromBlockStoreProto(pb)
}

in.Block = blk
func (bd *Deserializer) receiptsFromBlockStoreProto(pb *iotextypes.BlockStore) ([]*action.Receipt, error) {
receipts := make([]*action.Receipt, 0)
for _, receiptPb := range pb.Receipts {
receipt := &action.Receipt{}
receipt.ConvertFromReceiptPb(receiptPb)
in.Receipts = append(in.Receipts, receipt)
receipts = append(receipts, receipt)
}
return in, nil

return receipts, nil
}

// DeserializeBlockStore de-serializes a block store
Expand All @@ -128,5 +139,16 @@ func (bd *Deserializer) DeserializeBlockStore(buf []byte) (*Store, error) {
if err := proto.Unmarshal(buf, &pb); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal block store")
}
return bd.FromBlockStoreProto(&pb)
blk, err := bd.blockFromBlockStoreProto(&pb)
if err != nil {
return nil, err
}
receipts, err := bd.receiptsFromBlockStoreProto(&pb)
if err != nil {
return nil, err
}
return &Store{
Block: blk,
Receipts: receipts,
}, nil
}
8 changes: 4 additions & 4 deletions blockchain/block/block_deserializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func TestBlockStoreDeserializer(t *testing.T) {
require.NotNil(storeProto)

bd := Deserializer{}
store1, err := bd.FromBlockStoreProto(storeProto)
store1, err := bd.BlockFromBlockStoreProto(storeProto)
require.NoError(err)

require.Equal(store1.Block.height, store.Block.height)
require.Equal(store1.Block.Header.prevBlockHash, store.Block.Header.prevBlockHash)
require.Equal(store1.Block.Header.blockSig, store.Block.Header.blockSig)
require.Equal(store1.height, store.Block.height)
require.Equal(store1.Header.prevBlockHash, store.Block.Header.prevBlockHash)
require.Equal(store1.Header.blockSig, store.Block.Header.blockSig)
}
54 changes: 30 additions & 24 deletions blockchain/filedao/filedao_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,19 @@ var (
type (
// fileDAOv2 handles chain db file after file split activation at v1.1.2
fileDAOv2 struct {
filename string
header *FileHeader
tip *FileTip
blkBuffer *stagingBuffer
blkCache cache.LRUCache
kvStore db.KVStore
batch batch.KVStoreBatch
hashStore db.CountingIndex // store block hash
blkStore db.CountingIndex // store raw blocks
sysStore db.CountingIndex // store transaction log
deser *block.Deserializer
filename string
header *FileHeader
tip *FileTip
blkBuffer *stagingBuffer
blkStorePbCache cache.LRUCache
blkCache cache.LRUCache
receiptCache cache.LRUCache
kvStore db.KVStore
batch batch.KVStoreBatch
hashStore db.CountingIndex // store block hash
blkStore db.CountingIndex // store raw blocks
sysStore db.CountingIndex // store transaction log
deser *block.Deserializer
}
)

Expand All @@ -68,22 +70,26 @@ func newFileDAOv2(bottom uint64, cfg db.Config, deser *block.Deserializer) (*fil
tip: &FileTip{
Height: bottom - 1,
},
blkCache: cache.NewThreadSafeLruCache(16),
kvStore: db.NewBoltDB(cfg),
batch: batch.NewBatch(),
deser: deser,
blkStorePbCache: cache.NewThreadSafeLruCache(16),
blkCache: cache.NewThreadSafeLruCache(256),
receiptCache: cache.NewThreadSafeLruCache(256),
kvStore: db.NewBoltDB(cfg),
batch: batch.NewBatch(),
deser: deser,
}
return &fd, nil
}

// openFileDAOv2 opens an existing v2 file
func openFileDAOv2(cfg db.Config, deser *block.Deserializer) *fileDAOv2 {
return &fileDAOv2{
filename: cfg.DbPath,
blkCache: cache.NewThreadSafeLruCache(16),
kvStore: db.NewBoltDB(cfg),
batch: batch.NewBatch(),
deser: deser,
filename: cfg.DbPath,
blkStorePbCache: cache.NewThreadSafeLruCache(16),
blkCache: cache.NewThreadSafeLruCache(256),
receiptCache: cache.NewThreadSafeLruCache(256),
kvStore: db.NewBoltDB(cfg),
batch: batch.NewBatch(),
deser: deser,
}
}

Expand Down Expand Up @@ -185,19 +191,19 @@ func (fd *fileDAOv2) GetBlockByHeight(height uint64) (*block.Block, error) {
if height == 0 {
return block.GenesisBlock(), nil
}
blkInfo, err := fd.getBlockStore(height)
blk, err := fd.getBlock(height)
if err != nil {
return nil, errors.Wrapf(err, "failed to get block at height %d", height)
}
return blkInfo.Block, nil
return blk, nil
}

func (fd *fileDAOv2) GetReceipts(height uint64) ([]*action.Receipt, error) {
blkInfo, err := fd.getBlockStore(height)
receipts, err := fd.getReceipt(height)
if err != nil {
return nil, errors.Wrapf(err, "failed to get receipts at height %d", height)
}
return blkInfo.Receipts, nil
return receipts, nil
}

func (fd *fileDAOv2) ContainsTransactionLog() bool {
Expand Down
Loading
Loading