From 9d2d7e3a867c9c4a9265e34d3a72e8601bee9496 Mon Sep 17 00:00:00 2001 From: senthil Date: Thu, 13 Aug 2020 22:47:36 +0530 Subject: [PATCH 1/4] deprioritize unreconcilable missingPvtData Signed-off-by: senthil --- core/ledger/kvledger/kv_ledger.go | 2 +- core/ledger/pvtdatastorage/helper.go | 31 +- core/ledger/pvtdatastorage/kv_encoding.go | 128 ++-- .../ledger/pvtdatastorage/kv_encoding_test.go | 55 +- .../reconcile_missing_pvtdata.go | 290 +++++--- .../reconcile_missing_pvtdata_test.go | 672 ++++++++++++++---- core/ledger/pvtdatastorage/store.go | 97 +-- core/ledger/pvtdatastorage/store_test.go | 128 +++- core/ledger/pvtdatastorage/test_exports.go | 2 + 9 files changed, 1012 insertions(+), 393 deletions(-) diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 29b4fc0b3c1..88e3e05959e 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -754,7 +754,7 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(reconciledPvtdata)) - err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData) + err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, nil) if err != nil { return nil, err } diff --git a/core/ledger/pvtdatastorage/helper.go b/core/ledger/pvtdatastorage/helper.go index 73bbf064706..24e4519b9b1 100644 --- a/core/ledger/pvtdatastorage/helper.go +++ b/core/ledger/pvtdatastorage/helper.go @@ -139,25 +139,38 @@ func getOrCreateExpiryData(mapByExpiringBlk map[uint64]*ExpiryData, expiringBlk } // deriveKeys constructs dataKeys and missingDataKey from an expiryEntry -func deriveKeys(expiryEntry *expiryEntry) (dataKeys []*dataKey, missingDataKeys []*missingDataKey) { +func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey) { + var dataKeys []*dataKey + var missingDataKeys []*missingDataKey + for ns, colls := range expiryEntry.value.Map { - 
// 1. constructs dataKeys of expired existing pvt data for coll, txNums := range colls.Map { for _, txNum := range txNums.List { dataKeys = append(dataKeys, - &dataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, txNum}) + &dataKey{ + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: expiryEntry.key.committingBlk, + }, + txNum: txNum, + }) } } - // 2. constructs missingDataKeys of expired missing pvt data + for coll := range colls.MissingDataMap { - // one key for eligible entries and another for ieligible entries missingDataKeys = append(missingDataKeys, - &missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, true}) - missingDataKeys = append(missingDataKeys, - &missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, false}) + &missingDataKey{ + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: expiryEntry.key.committingBlk, + }, + }) } } - return + + return dataKeys, missingDataKeys } func passesFilter(dataKey *dataKey, filter ledger.PvtNsCollFilter) bool { diff --git a/core/ledger/pvtdatastorage/kv_encoding.go b/core/ledger/pvtdatastorage/kv_encoding.go index 1b1f03c3e57..ec3972cf555 100644 --- a/core/ledger/pvtdatastorage/kv_encoding.go +++ b/core/ledger/pvtdatastorage/kv_encoding.go @@ -19,29 +19,30 @@ import ( ) var ( - pendingCommitKey = []byte{0} - lastCommittedBlkkey = []byte{1} - pvtDataKeyPrefix = []byte{2} - expiryKeyPrefix = []byte{3} - eligibleMissingDataKeyPrefix = []byte{4} - ineligibleMissingDataKeyPrefix = []byte{5} - collElgKeyPrefix = []byte{6} - lastUpdatedOldBlocksKey = []byte{7} + pendingCommitKey = []byte{0} + lastCommittedBlkkey = []byte{1} + pvtDataKeyPrefix = []byte{2} + expiryKeyPrefix = []byte{3} + elgPrioritizedMissingDataGroup = []byte{4} + inelgMissingDataGroup = []byte{5} + collElgKeyPrefix = []byte{6} + lastUpdatedOldBlocksKey = []byte{7} + elgDeprioritizedMissingDataGroup = []byte{8} nilByte = byte(0) emptyValue = []byte{} ) -func getDataKeysForRangeScanByBlockNum(blockNum uint64) 
(startKey, endKey []byte) { - startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) - endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...) - return +func getDataKeysForRangeScanByBlockNum(blockNum uint64) ([]byte, []byte) { + startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) + endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...) + return startKey, endKey } -func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) (startKey, endKey []byte) { - startKey = append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...) - endKey = append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...) - return +func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) ([]byte, []byte) { + startKey := append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...) + endKey := append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...) + return startKey, endKey } func encodeLastCommittedBlockVal(blockNum uint64) []byte { @@ -107,47 +108,45 @@ func decodeDataValue(datavalueBytes []byte) (*rwset.CollectionPvtReadWriteSet, e return collPvtdata, err } -func encodeMissingDataKey(key *missingDataKey) []byte { - if key.isEligible { - // When missing pvtData reconciler asks for missing data info, - // it is necessary to pass the missing pvtdata info associated with - // the most recent block so that missing pvtdata in the state db can - // be fixed sooner to reduce the "private data matching public hash version - // is not available" error during endorserments. In order to give priority - // to missing pvtData in the most recent block, we use reverse order - // preserving encoding for the missing data key. This simplifies the - // implementation of GetMissingPvtDataInfoForMostRecentBlocks(). - keyBytes := append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(key.blkNum)...) - keyBytes = append(keyBytes, []byte(key.ns)...) 
- keyBytes = append(keyBytes, nilByte) - return append(keyBytes, []byte(key.coll)...) - } - - keyBytes := append(ineligibleMissingDataKeyPrefix, []byte(key.ns)...) - keyBytes = append(keyBytes, nilByte) - keyBytes = append(keyBytes, []byte(key.coll)...) - keyBytes = append(keyBytes, nilByte) - return append(keyBytes, []byte(encodeReverseOrderVarUint64(key.blkNum))...) +func encodeElgMissingDataKey(group []byte, key *missingDataKey) []byte { + // When missing pvtData reconciler asks for missing data info, + // it is necessary to pass the missing pvtdata info associated with + // the most recent block so that missing pvtdata in the state db can + // be fixed sooner to reduce the "private data matching public hash version + // is not available" error during endorsements. In order to give priority + // to missing pvtData in the most recent block, we use reverse order + // preserving encoding for the missing data key. This simplifies the + // implementation of GetMissingPvtDataInfoForMostRecentBlocks(). + encKey := append(group, encodeReverseOrderVarUint64(key.blkNum)...) + encKey = append(encKey, []byte(key.ns)...) + encKey = append(encKey, nilByte) + return append(encKey, []byte(key.coll)...) 
} -func decodeMissingDataKey(keyBytes []byte) *missingDataKey { +func decodeElgMissingDataKey(keyBytes []byte) *missingDataKey { key := &missingDataKey{nsCollBlk: nsCollBlk{}} - if keyBytes[0] == eligibleMissingDataKeyPrefix[0] { - blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:]) - - splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte}) - key.ns = string(splittedKey[0]) - key.coll = string(splittedKey[1]) - key.blkNum = blkNum - key.isEligible = true - return key - } + blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:]) + splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte}) + key.ns = string(splittedKey[0]) + key.coll = string(splittedKey[1]) + key.blkNum = blkNum + return key +} + +func encodeInelgMissingDataKey(key *missingDataKey) []byte { + encKey := append(inelgMissingDataGroup, []byte(key.ns)...) + encKey = append(encKey, nilByte) + encKey = append(encKey, []byte(key.coll)...) + encKey = append(encKey, nilByte) + return append(encKey, []byte(encodeReverseOrderVarUint64(key.blkNum))...) +} +func decodeInelgMissingDataKey(keyBytes []byte) *missingDataKey { + key := &missingDataKey{nsCollBlk: nsCollBlk{}} splittedKey := bytes.SplitN(keyBytes[1:], []byte{nilByte}, 3) //encoded bytes for blknum may contain empty bytes key.ns = string(splittedKey[0]) key.coll = string(splittedKey[1]) key.blkNum, _ = decodeReverseOrderVarUint64(splittedKey[2]) - key.isEligible = false return key } @@ -184,27 +183,28 @@ func decodeCollElgVal(b []byte) (*CollElgInfo, error) { return m, nil } -func createRangeScanKeysForEligibleMissingDataEntries(blkNum uint64) (startKey, endKey []byte) { - startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...) - endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(0)...) 
+func createRangeScanKeysForElgMissingData(blkNum uint64, group []byte) ([]byte, []byte) { + startKey := append(group, encodeReverseOrderVarUint64(blkNum)...) + endKey := append(group, encodeReverseOrderVarUint64(0)...) return startKey, endKey } -func createRangeScanKeysForIneligibleMissingData(maxBlkNum uint64, ns, coll string) (startKey, endKey []byte) { - startKey = encodeMissingDataKey( +func createRangeScanKeysForInelgMissingData(maxBlkNum uint64, ns, coll string) ([]byte, []byte) { + startKey := encodeInelgMissingDataKey( &missingDataKey{ nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: maxBlkNum}, isEligible: false, }, ) - endKey = encodeMissingDataKey( + endKey := encodeInelgMissingDataKey( &missingDataKey{ nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: 0}, isEligible: false, }, ) - return + + return startKey, endKey } func createRangeScanKeysForCollElg() (startKey, endKey []byte) { @@ -212,16 +212,16 @@ func createRangeScanKeysForCollElg() (startKey, endKey []byte) { encodeCollElgKey(0) } -func datakeyRange(blockNum uint64) (startKey, endKey []byte) { - startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) - endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...) - return +func datakeyRange(blockNum uint64) ([]byte, []byte) { + startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) + endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...) + return startKey, endKey } -func eligibleMissingdatakeyRange(blkNum uint64) (startKey, endKey []byte) { - startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...) - endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum-1)...) - return +func eligibleMissingdatakeyRange(blkNum uint64) ([]byte, []byte) { + startKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum)...) 
+ endKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum-1)...) + return startKey, endKey } // encodeReverseOrderVarUint64 returns a byte-representation for a uint64 number such that diff --git a/core/ledger/pvtdatastorage/kv_encoding_test.go b/core/ledger/pvtdatastorage/kv_encoding_test.go index 46c49f97aba..65ddd3f6367 100644 --- a/core/ledger/pvtdatastorage/kv_encoding_test.go +++ b/core/ledger/pvtdatastorage/kv_encoding_test.go @@ -21,7 +21,7 @@ func TestDataKeyEncoding(t *testing.T) { require.Equal(t, dataKey1, datakey2) } -func TestDatakeyRange(t *testing.T) { +func TestDataKeyRange(t *testing.T) { blockNum := uint64(20) startKey, endKey := datakeyRange(blockNum) var txNum uint64 @@ -51,27 +51,39 @@ func TestDatakeyRange(t *testing.T) { } } -func TestEligibleMissingdataRange(t *testing.T) { +func TestEligibleMissingDataRange(t *testing.T) { blockNum := uint64(20) startKey, endKey := eligibleMissingdatakeyRange(blockNum) var txNum uint64 for txNum = 0; txNum < 100; txNum++ { - keyOfBlock := encodeMissingDataKey( + keyOfBlock := encodeElgMissingDataKey( + elgPrioritizedMissingDataGroup, &missingDataKey{ - nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum}, - isEligible: true, + nsCollBlk: nsCollBlk{ + ns: "ns", + coll: "coll", + blkNum: blockNum, + }, }, ) - keyOfPreviousBlock := encodeMissingDataKey( + keyOfPreviousBlock := encodeElgMissingDataKey( + elgPrioritizedMissingDataGroup, &missingDataKey{ - nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum - 1}, - isEligible: true, + nsCollBlk: nsCollBlk{ + ns: "ns", + coll: "coll", + blkNum: blockNum - 1, + }, }, ) - keyOfNextBlock := encodeMissingDataKey( + keyOfNextBlock := encodeElgMissingDataKey( + elgPrioritizedMissingDataGroup, &missingDataKey{ - nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum + 1}, - isEligible: true, + nsCollBlk: nsCollBlk{ + ns: "ns", + coll: "coll", + blkNum: blockNum + 1, + }, }, ) require.Equal(t, 
bytes.Compare(keyOfNextBlock, startKey), -1) @@ -99,19 +111,26 @@ func testEncodeDecodeMissingdataKey(t *testing.T, blkNum uint64) { t.Run("ineligibileKey", func(t *testing.T) { - key.isEligible = false - decodedKey := decodeMissingDataKey( - encodeMissingDataKey(key), + decodedKey := decodeInelgMissingDataKey( + encodeInelgMissingDataKey(key), ) require.Equal(t, key, decodedKey) }, ) - t.Run("ineligibileKey", + t.Run("eligiblePrioritizedKey", + func(t *testing.T) { + decodedKey := decodeElgMissingDataKey( + encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, key), + ) + require.Equal(t, key, decodedKey) + }, + ) + + t.Run("eligibleDeprioritizedKey", func(t *testing.T) { - key.isEligible = true - decodedKey := decodeMissingDataKey( - encodeMissingDataKey(key), + decodedKey := decodeElgMissingDataKey( + encodeElgMissingDataKey(elgDeprioritizedMissingDataGroup, key), ) require.Equal(t, key, decodedKey) }, diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go index fc5c6a19657..9db1d7796b6 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go @@ -10,6 +10,7 @@ import ( "github.com/hyperledger/fabric-protos-go/ledger/rwset" "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" "github.com/hyperledger/fabric/core/ledger" + "github.com/pkg/errors" "github.com/willf/bitset" ) @@ -21,91 +22,169 @@ import ( // from the above created data entries // (2) create a db update batch from the update entries // (3) commit the update batch to the pvtStore -func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error { +func (s *Store) CommitPvtDataOfOldBlocks( + blocksPvtData map[uint64][]*ledger.TxPvtData, + deprioritizedMissingData ledger.MissingPvtDataInfo, +) error { + s.purgerLock.Lock() + defer s.purgerLock.Unlock() + if s.isLastUpdatedOldBlocksSet { return &ErrIllegalCall{`The 
lastUpdatedOldBlocksList is set. It means that the stateDB may not be in sync with the pvtStore`} } - oldBlkDataProccessor := &oldBlockDataProcessor{ + p := &oldBlockDataProcessor{ Store: s, + entries: &entriesForPvtDataOfOldBlocks{ + dataEntries: make(map[dataKey]*rwset.CollectionPvtReadWriteSet), + expiryEntries: make(map[expiryKey]*ExpiryData), + prioritizedMissingDataEntries: make(map[nsCollBlk]*bitset.BitSet), + deprioritizedMissingDataEntries: make(map[nsCollBlk]*bitset.BitSet), + }, } - logger.Debugf("Constructing pvtdatastore entries for pvtData of [%d] old blocks", len(blocksPvtData)) - entries, err := oldBlkDataProccessor.constructEntries(blocksPvtData) - if err != nil { + if err := p.prepareDataAndExpiryEntries(blocksPvtData); err != nil { + return err + } + + if err := p.prepareMissingDataEntriesToReflectReconciledData(); err != nil { return err } - logger.Debug("Constructing update batch from pvtdatastore entries") - batch := s.db.NewUpdateBatch() - if err := entries.addToUpdateBatch(batch); err != nil { + if err := p.prepareMissingDataEntriesToReflectPriority(deprioritizedMissingData); err != nil { return err } - logger.Debug("Committing the update batch to pvtdatastore") + batch, err := p.constructDBUpdateBatch() + if err != nil { + return err + } return s.db.WriteBatch(batch, true) } type oldBlockDataProcessor struct { *Store + entries *entriesForPvtDataOfOldBlocks } -func (p *oldBlockDataProcessor) constructEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) (*entriesForPvtDataOfOldBlocks, error) { +func (p *oldBlockDataProcessor) prepareDataAndExpiryEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) error { var dataEntries []*dataEntry + var expData *ExpiryData + for blkNum, pvtData := range blocksPvtData { dataEntries = append(dataEntries, prepareDataEntries(blkNum, pvtData)...) 
} - entries := &entriesForPvtDataOfOldBlocks{ - dataEntries: make(map[dataKey]*rwset.CollectionPvtReadWriteSet), - expiryEntries: make(map[expiryKey]*ExpiryData), - missingDataEntries: make(map[nsCollBlk]*bitset.BitSet), - } - for _, dataEntry := range dataEntries { - var expData *ExpiryData nsCollBlk := dataEntry.key.nsCollBlk txNum := dataEntry.key.txNum - expKey, err := p.constructExpiryKeyFromDataEntry(dataEntry) + expKey, err := p.constructExpiryKey(dataEntry) if err != nil { - return nil, err + return err } - if !neverExpires(expKey.expiringBlk) { - if expData, err = p.getExpiryDataFromEntriesOrStore(entries, expKey); err != nil { - return nil, err - } - if expData == nil { - // data entry is already expired - // and purged (a rare scenario) - continue - } - expData.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum) + + if neverExpires(expKey.expiringBlk) { + p.entries.dataEntries[*dataEntry.key] = dataEntry.value + continue + } + + if expData, err = p.getExpiryData(expKey); err != nil { + return err } + if expData == nil { + // if expiryData is not available, it means that + // the purge scheduler removed these entries and the + // associated data entry is no longer needed. Note + // that the associated missingData entry would also + // be not present. Hence, we can skip this data entry. 
+ continue + } + expData.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum) + + p.entries.dataEntries[*dataEntry.key] = dataEntry.value + p.entries.expiryEntries[expKey] = expData + } + return nil +} + +func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectReconciledData() error { + for dataKey := range p.entries.dataEntries { + key := dataKey.nsCollBlk + txNum := uint(dataKey.txNum) - var missingData *bitset.BitSet - if missingData, err = p.getMissingDataFromEntriesOrStore(entries, nsCollBlk); err != nil { - return nil, err + prioMissingData, err := p.getMissingDataFromPrioritizedList(key) + if err != nil { + return err } - if missingData == nil { - // data entry is already expired - // and purged (a rare scenario) + if prioMissingData != nil && prioMissingData.Test(txNum) { + p.entries.prioritizedMissingDataEntries[key] = prioMissingData.Clear(txNum) continue } - missingData.Clear(uint(txNum)) - entries.add(dataEntry, expKey, expData, missingData) + deprioMissingData, err := p.getMissingDataFromDeprioritizedList(key) + if err != nil { + return err + } + if deprioMissingData != nil && deprioMissingData.Test(txNum) { + p.entries.deprioritizedMissingDataEntries[key] = deprioMissingData.Clear(txNum) + } + // if the missing data entry is already purged by the purge scheduler, we would + // get nil missingData from both prioritized and deprioritized list + } + + return nil +} + +func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectPriority(deprioritizedList ledger.MissingPvtDataInfo) error { + for blkNum, blkMissingData := range deprioritizedList { + for txNum, txMissingData := range blkMissingData { + for _, nsColl := range txMissingData { + key := nsCollBlk{ + ns: nsColl.Namespace, + coll: nsColl.Collection, + blkNum: blkNum, + } + txNum := uint(txNum) + + prioMissingData, err := p.getMissingDataFromPrioritizedList(key) + if err != nil { + return err + } + if prioMissingData == nil { + // we would reach here when either of the following 
happens: + // (1) when the purge scheduler already removed the respective + // missing data entry. + // (2) when the missing data info is already persistent in the + // deprioritized list. Currently, we do not have different + // levels of deprioritized list. + // In both of the above case, we can continue to the next entry. + continue + } + p.entries.prioritizedMissingDataEntries[key] = prioMissingData.Clear(txNum) + + deprioMissingData, err := p.getMissingDataFromDeprioritizedList(key) + if err != nil { + return err + } + if deprioMissingData == nil { + deprioMissingData = &bitset.BitSet{} + } + p.entries.deprioritizedMissingDataEntries[key] = deprioMissingData.Set(txNum) + } + } } - return entries, nil + + return nil } -func (p *oldBlockDataProcessor) constructExpiryKeyFromDataEntry(dataEntry *dataEntry) (expiryKey, error) { +func (p *oldBlockDataProcessor) constructExpiryKey(dataEntry *dataEntry) (expiryKey, error) { // get the expiryBlk number to construct the expiryKey nsCollBlk := dataEntry.key.nsCollBlk expiringBlk, err := p.btlPolicy.GetExpiringBlock(nsCollBlk.ns, nsCollBlk.coll, nsCollBlk.blkNum) if err != nil { - return expiryKey{}, err + return expiryKey{}, errors.WithMessagef(err, "error while constructing expiry data key") } return expiryKey{ @@ -114,115 +193,140 @@ func (p *oldBlockDataProcessor) constructExpiryKeyFromDataEntry(dataEntry *dataE }, nil } -func (p *oldBlockDataProcessor) getExpiryDataFromEntriesOrStore(entries *entriesForPvtDataOfOldBlocks, expiryKey expiryKey) (*ExpiryData, error) { - if expiryData, ok := entries.expiryEntries[expiryKey]; ok { +func (p *oldBlockDataProcessor) getExpiryData(expKey expiryKey) (*ExpiryData, error) { + if expiryData, ok := p.entries.expiryEntries[expKey]; ok { return expiryData, nil } - expiryData, err := p.getExpiryDataOfExpiryKey(&expiryKey) + expData, err := p.db.Get(encodeExpiryKey(&expKey)) if err != nil { return nil, err } - return expiryData, nil + if expData == nil { + return nil, errors.Wrap(err, 
"error while getting expiry data from the store") + } + return decodeExpiryValue(expData) } -func (p *oldBlockDataProcessor) getMissingDataFromEntriesOrStore(entries *entriesForPvtDataOfOldBlocks, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { - if missingData, ok := entries.missingDataEntries[nsCollBlk]; ok { +func (p *oldBlockDataProcessor) getMissingDataFromPrioritizedList(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + missingData, ok := p.entries.prioritizedMissingDataEntries[nsCollBlk] + if ok { return missingData, nil } - missingDataKey := &missingDataKey{ - nsCollBlk: nsCollBlk, - isEligible: true, - } - missingData, err := p.getBitmapOfMissingDataKey(missingDataKey) - if err != nil { - return nil, err - } - return missingData, nil + return p.getMissingDataBitmapFromStore(elgPrioritizedMissingDataGroup, nsCollBlk) } -type entriesForPvtDataOfOldBlocks struct { - dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet - expiryEntries map[expiryKey]*ExpiryData - missingDataEntries map[nsCollBlk]*bitset.BitSet +func (p *oldBlockDataProcessor) getMissingDataFromDeprioritizedList(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + missingData, ok := p.entries.deprioritizedMissingDataEntries[nsCollBlk] + if ok { + return missingData, nil + } + + return p.getMissingDataBitmapFromStore(elgDeprioritizedMissingDataGroup, nsCollBlk) } -func (e *entriesForPvtDataOfOldBlocks) add(datEntry *dataEntry, expKey expiryKey, expData *ExpiryData, missingData *bitset.BitSet) { - dataKey := dataKey{ - nsCollBlk: datEntry.key.nsCollBlk, - txNum: datEntry.key.txNum, - } - e.dataEntries[dataKey] = datEntry.value +func (p *oldBlockDataProcessor) getMissingDataBitmapFromStore(group []byte, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + key := encodeElgMissingDataKey( + group, + &missingDataKey{ + nsCollBlk: nsCollBlk, + }, + ) - if expData != nil { - e.expiryEntries[expKey] = expData + missingData, err := p.db.Get(key) + if err != nil { + return nil, errors.Wrap(err, "error while 
getting missing data bitmap from the store") + } + if missingData == nil { + return nil, nil } - e.missingDataEntries[dataKey.nsCollBlk] = missingData + return decodeMissingDataValue(missingData) } -func (e *entriesForPvtDataOfOldBlocks) addToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { - if err := e.addDataEntriesToUpdateBatch(batch); err != nil { - return err +func (p *oldBlockDataProcessor) constructDBUpdateBatch() (*leveldbhelper.UpdateBatch, error) { + batch := p.db.NewUpdateBatch() + + if err := p.entries.addDataEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding data entries to the update batch") } - if err := e.addExpiryEntriesToUpdateBatch(batch); err != nil { - return err + if err := p.entries.addExpiryEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding expiry entries to the update batch") + } + + if err := p.entries.addMissingDataEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding missing data entries to the update batch") } - return e.addMissingDataEntriesToUpdateBatch(batch) + return batch, nil } -func (e *entriesForPvtDataOfOldBlocks) addDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { +type entriesForPvtDataOfOldBlocks struct { + dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet + expiryEntries map[expiryKey]*ExpiryData + prioritizedMissingDataEntries map[nsCollBlk]*bitset.BitSet + deprioritizedMissingDataEntries map[nsCollBlk]*bitset.BitSet +} + +func (e *entriesForPvtDataOfOldBlocks) addDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { var key, val []byte var err error for dataKey, pvtData := range e.dataEntries { key = encodeDataKey(&dataKey) if val, err = encodeDataValue(pvtData); err != nil { - return err + return errors.Wrap(err, "error while encoding data value") } batch.Put(key, val) } return nil } -func (e *entriesForPvtDataOfOldBlocks) addExpiryEntriesToUpdateBatch(batch 
*leveldbhelper.UpdateBatch) error { +func (e *entriesForPvtDataOfOldBlocks) addExpiryEntriesTo(batch *leveldbhelper.UpdateBatch) error { var key, val []byte var err error for expiryKey, expiryData := range e.expiryEntries { key = encodeExpiryKey(&expiryKey) if val, err = encodeExpiryValue(expiryData); err != nil { - return err + return errors.Wrap(err, "error while encoding expiry value") } batch.Put(key, val) } return nil } -func (e *entriesForPvtDataOfOldBlocks) addMissingDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { +func (e *entriesForPvtDataOfOldBlocks) addMissingDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { var key, val []byte var err error - for nsCollBlk, missingData := range e.missingDataEntries { - key = encodeMissingDataKey( - &missingDataKey{ - nsCollBlk: nsCollBlk, - isEligible: true, - }, - ) + entries := map[string]map[nsCollBlk]*bitset.BitSet{ + string(elgPrioritizedMissingDataGroup): e.prioritizedMissingDataEntries, + string(elgDeprioritizedMissingDataGroup): e.deprioritizedMissingDataEntries, + } - if missingData.None() { - batch.Delete(key) - continue - } + for group, missingDataList := range entries { + for nsCollBlk, missingData := range missingDataList { + key = encodeElgMissingDataKey( + []byte(group), + &missingDataKey{ + nsCollBlk: nsCollBlk, + isEligible: true, + }, + ) + + if missingData.None() { + batch.Delete(key) + continue + } - if val, err = encodeMissingDataValue(missingData); err != nil { - return err + if val, err = encodeMissingDataValue(missingData); err != nil { + return errors.Wrap(err, "error while encoding missing data bitmap") + } + batch.Put(key, val) } - batch.Put(key, val) } return nil } diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go index 93653ebabff..4ff255e29cd 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go 
@@ -7,196 +7,568 @@ SPDX-License-Identifier: Apache-2.0 package pvtdatastorage import ( + "fmt" "testing" - "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/ledger" btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil" "github.com/stretchr/testify/require" + "github.com/willf/bitset" ) +type blockTxPvtDataInfoForTest struct { + blkNum uint64 + txNum uint64 + pvtDataPresent map[string][]string + pvtDataMissing map[string][]string +} + +type pvtDataForTest struct { + blockNum uint64 + pvtData []*ledger.TxPvtData + dataKeys []*dataKey + missingDataInfo ledger.TxMissingPvtData +} + func TestCommitPvtDataOfOldBlocks(t *testing.T) { btlPolicy := btltestutil.SampleBTLPolicy( map[[2]string]uint64{ - {"ns-1", "coll-1"}: 3, - {"ns-1", "coll-2"}: 1, + {"ns-1", "coll-1"}: 0, + {"ns-1", "coll-2"}: 0, {"ns-2", "coll-1"}: 0, - {"ns-2", "coll-2"}: 1, - {"ns-3", "coll-1"}: 0, - {"ns-3", "coll-2"}: 3, - {"ns-4", "coll-1"}: 0, - {"ns-4", "coll-2"}: 0, + {"ns-2", "coll-2"}: 0, }, ) env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocks", btlPolicy, pvtDataConf()) defer env.Cleanup() store := env.TestStore - testData := []*ledger.TxPvtData{ - produceSamplePvtdata(t, 2, []string{"ns-2:coll-1", "ns-2:coll-2"}), - produceSamplePvtdata(t, 4, []string{"ns-1:coll-1", "ns-1:coll-2", "ns-2:coll-1", "ns-2:coll-2"}), - } - - // CONSTRUCT MISSING DATA FOR BLOCK 1 - blk1MissingData := make(ledger.TxMissingPvtData) - - // eligible missing data in tx1 - blk1MissingData.Add(1, "ns-1", "coll-1", true) - blk1MissingData.Add(1, "ns-1", "coll-2", true) - blk1MissingData.Add(1, "ns-2", "coll-1", true) - blk1MissingData.Add(1, "ns-2", "coll-2", true) - // eligible missing data in tx2 - blk1MissingData.Add(2, "ns-1", "coll-1", true) - blk1MissingData.Add(2, "ns-1", "coll-2", true) - blk1MissingData.Add(2, "ns-3", "coll-1", true) - blk1MissingData.Add(2, "ns-3", "coll-2", true) - - // CONSTRUCT MISSING DATA FOR BLOCK 2 - blk2MissingData := 
make(ledger.TxMissingPvtData) - // eligible missing data in tx1 - blk2MissingData.Add(1, "ns-1", "coll-1", true) - blk2MissingData.Add(1, "ns-1", "coll-2", true) - // eligible missing data in tx3 - blk2MissingData.Add(3, "ns-1", "coll-1", true) - - // COMMIT BLOCK 0 WITH NO DATA - require.NoError(t, store.Commit(0, nil, nil)) + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + "ns-2": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-2": {"coll-1", "coll-2"}, + }, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 1, + txNum: 4, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + "ns-2": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 3, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + }, + }, + } - // COMMIT BLOCK 1 WITH PVTDATA AND MISSINGDATA - require.NoError(t, store.Commit(1, testData, blk1MissingData)) - - // COMMIT BLOCK 2 WITH PVTDATA AND MISSINGDATA - require.NoError(t, store.Commit(2, nil, blk2MissingData)) - - // CHECK MISSINGDATA ENTRIES ARE CORRECTLY STORED - expectedMissingPvtDataInfo := make(ledger.MissingPvtDataInfo) - // missing data in block1, tx1 - expectedMissingPvtDataInfo.Add(1, 1, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(1, 1, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 1, "ns-2", "coll-1") - expectedMissingPvtDataInfo.Add(1, 1, "ns-2", "coll-2") - - // missing data in block1, tx2 - expectedMissingPvtDataInfo.Add(1, 2, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(1, 2, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 2, "ns-3", "coll-1") - expectedMissingPvtDataInfo.Add(1, 2, "ns-3", "coll-2") - - // missing data in block2, tx1 - 
expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-2") - // missing data in block2, tx3 - expectedMissingPvtDataInfo.Add(2, 3, "ns-1", "coll-1") - - missingPvtDataInfo, err := store.GetMissingPvtDataInfoForMostRecentBlocks(2) - require.NoError(t, err) - require.Equal(t, expectedMissingPvtDataInfo, missingPvtDataInfo) + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) - // COMMIT THE MISSINGDATA IN BLOCK 1 AND BLOCK 2 - oldBlocksPvtData := make(map[uint64][]*ledger.TxPvtData) - oldBlocksPvtData[1] = []*ledger.TxPvtData{ - produceSamplePvtdata(t, 1, []string{"ns-1:coll-1", "ns-2:coll-1"}), - produceSamplePvtdata(t, 2, []string{"ns-1:coll-1", "ns-3:coll-1"}), - } - oldBlocksPvtData[2] = []*ledger.TxPvtData{ - produceSamplePvtdata(t, 3, []string{"ns-1:coll-1"}), + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) + require.NoError(t, store.Commit(2, blocksPvtData[2].pvtData, blocksPvtData[2].missingDataInfo)) + + assertMissingDataInfo(t, store, missingDataSummary, 2) + + // COMMIT some of the missing data in the block 1 and block 2 + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-2"}, + "ns-2": {"coll-2"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + }, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 3, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + }, + }, } - err = store.CommitPvtDataOfOldBlocks(oldBlocksPvtData) - require.NoError(t, err) + blocksPvtData, missingDataSummary = 
constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + 2: blocksPvtData[2].pvtData, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, nil)) - // ENSURE THAT THE PREVIOUSLY MISSING PVTDATA OF BLOCK 1 & 2 EXIST IN THE STORE - ns1Coll1Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}, txNum: 1} - ns2Coll1Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-1", blkNum: 1}, txNum: 1} - ns1Coll1Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}, txNum: 2} - ns3Coll1Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-1", blkNum: 1}, txNum: 2} - ns1Coll1Blk2Tx3 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 2}, txNum: 3} - - require.True(t, testDataKeyExists(t, store, ns1Coll1Blk1Tx1)) - require.True(t, testDataKeyExists(t, store, ns2Coll1Blk1Tx1)) - require.True(t, testDataKeyExists(t, store, ns1Coll1Blk1Tx2)) - require.True(t, testDataKeyExists(t, store, ns3Coll1Blk1Tx2)) - require.True(t, testDataKeyExists(t, store, ns1Coll1Blk2Tx3)) - - // pvt data retrieval for block 2 should return the just committed pvtdata - var nilFilter ledger.PvtNsCollFilter - retrievedData, err := store.GetPvtDataByBlockNum(2, nilFilter) - require.NoError(t, err) - for i, data := range retrievedData { - require.Equal(t, data.SeqInBlock, oldBlocksPvtData[2][i].SeqInBlock) - require.True(t, proto.Equal(data.WriteSet, oldBlocksPvtData[2][i].WriteSet)) + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.True(t, testDataKeyExists(t, store, dkey)) + } } + assertMissingDataInfo(t, store, missingDataSummary, 2) +} + +func TestCommitPvtDataOfOldBlocksWithBTL(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 1, + {"ns-2", "coll-1"}: 1, + }, + ) + env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithBTL", 
btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore - expectedMissingPvtDataInfo = make(ledger.MissingPvtDataInfo) - // missing data in block1, tx1 - expectedMissingPvtDataInfo.Add(1, 1, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 1, "ns-2", "coll-2") + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 3, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } - // missing data in block1, tx2 - expectedMissingPvtDataInfo.Add(1, 2, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 2, "ns-3", "coll-2") + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) - // missing data in block2, tx1 - expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-2") + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) - missingPvtDataInfo, err = store.GetMissingPvtDataInfoForMostRecentBlocks(2) - require.NoError(t, err) - require.Equal(t, expectedMissingPvtDataInfo, missingPvtDataInfo) + assertMissingDataInfo(t, store, missingDataSummary, 1) - // COMMIT BLOCK 3 WITH NO PVTDATA + // COMMIT BLOCK 2 & 3 WITH NO PVTDATA + require.NoError(t, store.Commit(2, nil, nil)) require.NoError(t, store.Commit(3, nil, nil)) - // IN BLOCK 1, NS-1:COLL-2 AND NS-2:COLL-2 SHOULD HAVE EXPIRED BUT NOT PURGED - // HENCE, THE FOLLOWING COMMIT SHOULD CREATE ENTRIES IN THE STORE - oldBlocksPvtData = make(map[uint64][]*ledger.TxPvtData) - oldBlocksPvtData[1] = []*ledger.TxPvtData{ - produceSamplePvtdata(t, 1, []string{"ns-1:coll-2"}), // though expired, it - // would get committed to the store as 
it is not purged yet - produceSamplePvtdata(t, 2, []string{"ns-3:coll-2"}), // never expires + // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired but not purged. + // hence, the commit of pvtdata of block 1 transaction 1 should create entries + // in the store + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, } - err = store.CommitPvtDataOfOldBlocks(oldBlocksPvtData) - require.NoError(t, err) - - ns1Coll2Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 1} - ns2Coll2Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-2", blkNum: 1}, txNum: 1} - ns1Coll2Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 2} - ns3Coll2Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}, txNum: 2} + blocksPvtData, _ = constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, nil)) - // though the pvtdata are expired but not purged yet, we do - // commit the data and hence the entries would exist in the - // store - require.True(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx1)) // expired but committed - require.False(t, testDataKeyExists(t, store, ns2Coll2Blk1Tx1)) // expired but still missing - require.False(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx2)) // expired still missing - require.True(t, testDataKeyExists(t, store, ns3Coll2Blk1Tx2)) // never expires + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.True(t, testDataKeyExists(t, store, dkey)) + } + } + // as all missing data are expired, get missing info would return nil though + // it is not purged yet + assertMissingDataInfo(t, store, make(ledger.MissingPvtDataInfo), 1) - // COMMIT BLOCK 4 
WITH NO PVTDATA + // while committing the next block, all entries related to expiry data are purged require.NoError(t, store.Commit(4, nil, nil)) testWaitForPurgerRoutineToFinish(store) + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.False(t, testDataKeyExists(t, store, dkey)) + } + } - // IN BLOCK 1, NS-1:COLL-2 AND NS-2:COLL-2 SHOULD HAVE EXPIRED BUT NOT PURGED - // HENCE, THE FOLLOWING COMMIT SHOULD NOT CREATE ENTRIES IN THE STORE - oldBlocksPvtData = make(map[uint64][]*ledger.TxPvtData) - oldBlocksPvtData[1] = []*ledger.TxPvtData{ - // both data are expired and purged. hence, it won't be - // committed to the store - produceSamplePvtdata(t, 1, []string{"ns-2:coll-2"}), - produceSamplePvtdata(t, 2, []string{"ns-1:coll-2"}), + // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired and purged. + // hence, the commit of pvtdata of block 1 transaction 2 should not create + // entries in the store + oldBlockTxPvtDataInfo = []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } + oldBlocksPvtData = map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + } + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 3: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, deprioritizedList)) - err = store.CommitPvtDataOfOldBlocks(oldBlocksPvtData) - require.NoError(t, err) + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.False(t, testDataKeyExists(t, store, dkey)) + } + } + // deprioritized list should not be present + p := &oldBlockDataProcessor{ + Store: store, + } + keys := []nsCollBlk{ + { + ns: "ns-1", + coll: "coll-1", + blkNum: 1, + }, + { + ns: "ns-2", + coll: "coll-1", + blkNum: 1, + }, + } - ns1Coll2Blk1Tx1 = 
&dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 1} - ns2Coll2Blk1Tx1 = &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-2", blkNum: 1}, txNum: 1} - ns1Coll2Blk1Tx2 = &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 2} - ns3Coll2Blk1Tx2 = &dataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}, txNum: 2} + for _, k := range keys { + bitmap, err := p.getMissingDataBitmapFromStore(elgDeprioritizedMissingDataGroup, k) + require.NoError(t, err) + require.Nil(t, bitmap) + } +} - require.False(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx1)) // purged - require.False(t, testDataKeyExists(t, store, ns2Coll2Blk1Tx1)) // purged - require.False(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx2)) // purged - require.True(t, testDataKeyExists(t, store, ns3Coll2Blk1Tx2)) // never expires +func TestCommitPvtDataOfOldBlocksWithDeprioritization(t *testing.T) { + sampleDataForTest := func() (map[uint64]*pvtDataForTest, ledger.MissingPvtDataInfo) { + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } + + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) + + return blocksPvtData, missingDataSummary + } + + _, missingDataSummary := sampleDataForTest() + + tests := []struct { + name string + deprioritizedList ledger.MissingPvtDataInfo + expectedPrioMissingDataKeys ledger.MissingPvtDataInfo + }{ + { + name: "all keys deprioritized", + deprioritizedList: missingDataSummary, + 
expectedPrioMissingDataKeys: make(ledger.MissingPvtDataInfo), + }, + { + name: "some keys deprioritized", + deprioritizedList: ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 2: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + }, + }, + 2: ledger.MissingBlockPvtdataInfo{ + 2: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + }, + }, + }, + expectedPrioMissingDataKeys: ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + 2: { + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, + 2: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + 2: { + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 0, + {"ns-2", "coll-1"}: 0, + }, + ) + env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithDeprio", btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + blocksPvtData, missingDataSummary := sampleDataForTest() + + // COMMIT BLOCK 0 WITH NO DATA + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) + require.NoError(t, store.Commit(2, blocksPvtData[2].pvtData, blocksPvtData[2].missingDataInfo)) + + assertMissingDataInfo(t, store, missingDataSummary, 2) + + require.NoError(t, store.CommitPvtDataOfOldBlocks(nil, tt.deprioritizedList)) + + prioMissingData, err := store.getMissingData(elgPrioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, len(tt.expectedPrioMissingDataKeys), len(prioMissingData)) + for blkNum, txsMissingData := range tt.expectedPrioMissingDataKeys { + for txNum, 
expectedMissingData := range txsMissingData { + require.ElementsMatch(t, expectedMissingData, prioMissingData[blkNum][txNum]) + } + } + + deprioMissingData, err := store.getMissingData(elgDeprioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, len(tt.deprioritizedList), len(deprioMissingData)) + for blkNum, txsMissingData := range tt.deprioritizedList { + for txNum, expectedMissingData := range txsMissingData { + require.ElementsMatch(t, expectedMissingData, deprioMissingData[blkNum][txNum]) + } + } + + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } + + blocksPvtData, _ = constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + 2: blocksPvtData[2].pvtData, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, nil)) + + prioMissingData, err = store.getMissingData(elgPrioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, make(ledger.MissingPvtDataInfo), prioMissingData) + + deprioMissingData, err = store.getMissingData(elgDeprioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, make(ledger.MissingPvtDataInfo), deprioMissingData) + }) + } +} + +func constructPvtDataForTest(t *testing.T, blockInfo []*blockTxPvtDataInfoForTest) (map[uint64]*pvtDataForTest, ledger.MissingPvtDataInfo) { + blocksPvtData := make(map[uint64]*pvtDataForTest) + missingPvtDataInfoSummary := make(ledger.MissingPvtDataInfo) + + for _, b := 
range blockInfo { + p, ok := blocksPvtData[b.blkNum] + if !ok { + p = &pvtDataForTest{ + missingDataInfo: make(ledger.TxMissingPvtData), + } + blocksPvtData[b.blkNum] = p + } + + for ns, colls := range b.pvtDataMissing { + for _, coll := range colls { + p.missingDataInfo.Add(b.txNum, ns, coll, true) + missingPvtDataInfoSummary.Add(b.blkNum, b.txNum, ns, coll) + } + } + + var nsColls []string + for ns, colls := range b.pvtDataPresent { + for _, coll := range colls { + nsColls = append(nsColls, fmt.Sprintf("%s:%s", ns, coll)) + p.dataKeys = append(p.dataKeys, &dataKey{ + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: b.blkNum, + }, + txNum: b.txNum, + }) + } + } + + if len(nsColls) == 0 { + continue + } + p.pvtData = append( + p.pvtData, + produceSamplePvtdata(t, b.txNum, nsColls), + ) + } + + return blocksPvtData, missingPvtDataInfoSummary +} + +func assertMissingDataInfo(t *testing.T, store *Store, expected ledger.MissingPvtDataInfo, numRecentBlocks int) { + missingPvtDataInfo, err := store.GetMissingPvtDataInfoForMostRecentBlocks(numRecentBlocks) + require.NoError(t, err) + require.Equal(t, len(expected), len(missingPvtDataInfo)) + for blkNum, txsMissingData := range expected { + for txNum, expectedMissingData := range txsMissingData { + require.ElementsMatch(t, expectedMissingData, missingPvtDataInfo[blkNum][txNum]) + } + } +} + +func constructBitSetForTest(txNums ...uint) *bitset.BitSet { + bitmap := &bitset.BitSet{} + for _, txNum := range txNums { + bitmap.Set(txNum) + } + return bitmap } diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index 49c80a3b0b2..45e87e31767 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -63,6 +63,9 @@ type Store struct { // in the stateDB needs to be updated before finishing the // recovery operation. 
isLastUpdatedOldBlocksSet bool + + iterSinceDeprioMissingDataAccess int + deprioritizedMissingDataPeriodicity int } type blkTranNumKey []byte @@ -140,6 +143,7 @@ func (p *Provider) OpenStore(ledgerid string) (*Store, error) { return nil, err } s.launchCollElgProc() + s.deprioritizedMissingDataPeriodicity = 1000 logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d]", s.isEmpty, s.lastCommittedBlock) return s, nil @@ -212,7 +216,7 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD batch := s.db.NewUpdateBatch() var err error - var keyBytes, valBytes []byte + var key, val []byte storeEntries, err := prepareStoreEntries(blockNum, pvtData, s.btlPolicy, missingPvtData) if err != nil { @@ -220,27 +224,32 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD } for _, dataEntry := range storeEntries.dataEntries { - keyBytes = encodeDataKey(dataEntry.key) - if valBytes, err = encodeDataValue(dataEntry.value); err != nil { + key = encodeDataKey(dataEntry.key) + if val, err = encodeDataValue(dataEntry.value); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } for _, expiryEntry := range storeEntries.expiryEntries { - keyBytes = encodeExpiryKey(expiryEntry.key) - if valBytes, err = encodeExpiryValue(expiryEntry.value); err != nil { + key = encodeExpiryKey(expiryEntry.key) + if val, err = encodeExpiryValue(expiryEntry.value); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } for missingDataKey, missingDataValue := range storeEntries.missingDataEntries { - keyBytes = encodeMissingDataKey(&missingDataKey) - if valBytes, err = encodeMissingDataValue(missingDataValue); err != nil { + switch { + case missingDataKey.isEligible: + key = encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, &missingDataKey) + default: + key = encodeInelgMissingDataKey(&missingDataKey) + } + if val, err = 
encodeMissingDataValue(missingDataValue); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } committingBlockNum := s.nextBlockNum() @@ -403,16 +412,27 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M return nil, nil } + if s.iterSinceDeprioMissingDataAccess == s.deprioritizedMissingDataPeriodicity { + s.iterSinceDeprioMissingDataAccess = 0 + return s.getMissingData(elgDeprioritizedMissingDataGroup, maxBlock) + } + + s.iterSinceDeprioMissingDataAccess++ + return s.getMissingData(elgPrioritizedMissingDataGroup, maxBlock) +} + +func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDataInfo, error) { missingPvtDataInfo := make(ledger.MissingPvtDataInfo) numberOfBlockProcessed := 0 lastProcessedBlock := uint64(0) isMaxBlockLimitReached := false + // as we are not acquiring a read lock, new blocks can get committed while we // construct the MissingPvtDataInfo. As a result, lastCommittedBlock can get // changed. 
To ensure consistency, we atomically load the lastCommittedBlock value lastCommittedBlock := atomic.LoadUint64(&s.lastCommittedBlock) - startKey, endKey := createRangeScanKeysForEligibleMissingDataEntries(lastCommittedBlock) + startKey, endKey := createRangeScanKeysForElgMissingData(lastCommittedBlock, group) dbItr, err := s.db.GetIterator(startKey, endKey) if err != nil { return nil, err @@ -421,7 +441,7 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M for dbItr.Next() { missingDataKeyBytes := dbItr.Key() - missingDataKey := decodeMissingDataKey(missingDataKeyBytes) + missingDataKey := decodeElgMissingDataKey(missingDataKeyBytes) if isMaxBlockLimitReached && (missingDataKey.blkNum != lastProcessedBlock) { // ensures that exactly maxBlock number @@ -512,26 +532,38 @@ func (s *Store) performPurgeIfScheduled(latestCommittedBlk uint64) { } func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error { - batch := s.db.NewUpdateBatch() expiryEntries, err := s.retrieveExpiryEntries(minBlkNum, maxBlkNum) if err != nil || len(expiryEntries) == 0 { return err } + + batch := s.db.NewUpdateBatch() for _, expiryEntry := range expiryEntries { - // this encoding could have been saved if the function retrieveExpiryEntries also returns the encoded expiry keys. 
- // However, keeping it for better readability batch.Delete(encodeExpiryKey(expiryEntry.key)) dataKeys, missingDataKeys := deriveKeys(expiryEntry) + for _, dataKey := range dataKeys { batch.Delete(encodeDataKey(dataKey)) } + for _, missingDataKey := range missingDataKeys { - batch.Delete(encodeMissingDataKey(missingDataKey)) + batch.Delete( + encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, missingDataKey), + ) + batch.Delete( + encodeElgMissingDataKey(elgDeprioritizedMissingDataGroup, missingDataKey), + ) + batch.Delete( + encodeInelgMissingDataKey(missingDataKey), + ) } + if err := s.db.WriteBatch(batch, false); err != nil { return err } + batch.Reset() } + logger.Infof("[%s] - [%d] Entries purged from private data storage till block number [%d]", s.ledgerid, len(expiryEntries), maxBlkNum) return nil } @@ -606,7 +638,7 @@ func (s *Store) processCollElgEvents() error { var coll string for _, coll = range colls.Entries { logger.Infof("Converting missing data entries from ineligible to eligible for [ns=%s, coll=%s]", ns, coll) - startKey, endKey := createRangeScanKeysForIneligibleMissingData(blkNum, ns, coll) + startKey, endKey := createRangeScanKeysForInelgMissingData(blkNum, ns, coll) collItr, err := s.db.GetIterator(startKey, endKey) if err != nil { return err @@ -615,12 +647,15 @@ func (s *Store) processCollElgEvents() error { for collItr.Next() { // each entry originalKey, originalVal := collItr.Key(), collItr.Value() - modifiedKey := decodeMissingDataKey(originalKey) + modifiedKey := decodeInelgMissingDataKey(originalKey) modifiedKey.isEligible = true batch.Delete(originalKey) copyVal := make([]byte, len(originalVal)) copy(copyVal, originalVal) - batch.Put(encodeMissingDataKey(modifiedKey), copyVal) + batch.Put( + encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, modifiedKey), + copyVal, + ) collEntriesConverted++ if batch.Len() > s.maxBatchSize { s.db.WriteBatch(batch, true) @@ -711,30 +746,6 @@ func (c *collElgProcSync) waitForDone() { 
<-c.procComplete } -func (s *Store) getBitmapOfMissingDataKey(missingDataKey *missingDataKey) (*bitset.BitSet, error) { - var v []byte - var err error - if v, err = s.db.Get(encodeMissingDataKey(missingDataKey)); err != nil { - return nil, err - } - if v == nil { - return nil, nil - } - return decodeMissingDataValue(v) -} - -func (s *Store) getExpiryDataOfExpiryKey(expiryKey *expiryKey) (*ExpiryData, error) { - var v []byte - var err error - if v, err = s.db.Get(encodeExpiryKey(expiryKey)); err != nil { - return nil, err - } - if v == nil { - return nil, nil - } - return decodeExpiryValue(v) -} - // ErrIllegalCall is to be thrown by a store impl if the store does not expect a call to Prepare/Commit/Rollback/InitLastCommittedBlock type ErrIllegalCall struct { msg string diff --git a/core/ledger/pvtdatastorage/store_test.go b/core/ledger/pvtdatastorage/store_test.go index 6f4871fe3e2..52e5d195cd4 100644 --- a/core/ledger/pvtdatastorage/store_test.go +++ b/core/ledger/pvtdatastorage/store_test.go @@ -191,6 +191,72 @@ func TestStoreIteratorError(t *testing.T) { }) } +func TestGetMissingDataInfo(t *testing.T) { + ledgerid := "TestGetMissingDataInfoFromPrioDeprioList" + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 0, + {"ns-1", "coll-2"}: 0, + }, + ) + env := NewTestStoreEnv(t, ledgerid, btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + // construct missing data for block 1 + blk1MissingData := make(ledger.TxMissingPvtData) + blk1MissingData.Add(1, "ns-1", "coll-1", true) + blk1MissingData.Add(1, "ns-1", "coll-2", true) + + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, nil, blk1MissingData)) + + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-2", + }, + }, + }, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(nil, deprioritizedList)) + + expectedPrioMissingDataInfo := 
ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + }, + }, + } + + expectedDeprioMissingDataInfo := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-2", + }, + }, + }, + } + + store.deprioritizedMissingDataPeriodicity = 10 + for i := 1; i <= 55; i++ { + if i%11 == 0 { + // after ever 10 iterations of accessing the prioritized list, the + // deprioritized list would be accessed + assertMissingDataInfo(t, store, expectedDeprioMissingDataInfo, 2) + continue + } + assertMissingDataInfo(t, store, expectedPrioMissingDataInfo, 2) + } +} + func TestExpiryDataNotIncluded(t *testing.T) { ledgerid := "TestExpiryDataNotIncluded" btlPolicy := btltestutil.SampleBTLPolicy( @@ -339,6 +405,9 @@ func TestStorePurge(t *testing.T) { // eligible missing data in tx1 blk1MissingData.Add(1, "ns-1", "coll-1", true) blk1MissingData.Add(1, "ns-1", "coll-2", true) + // eligible missing data in tx3 + blk1MissingData.Add(3, "ns-1", "coll-1", true) + blk1MissingData.Add(3, "ns-1", "coll-2", true) // ineligible missing data in tx4 blk1MissingData.Add(4, "ns-3", "coll-1", false) blk1MissingData.Add(4, "ns-3", "coll-2", false) @@ -368,11 +437,27 @@ func TestStorePurge(t *testing.T) { require.True(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, false)) + require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, false)) + + require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD, false)) + require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD, false)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD)) + deprioritizedList := 
ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 3: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-1", + Collection: "coll-2", + }, + }, + }, + } + require.NoError(t, s.CommitPvtDataOfOldBlocks(nil, deprioritizedList)) // write pvt data for block 3 require.NoError(t, s.Commit(3, nil, nil)) @@ -381,11 +466,14 @@ func TestStorePurge(t *testing.T) { require.True(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) // eligible missingData entries for ns-1:coll-1, ns-1:coll-2 (neverExpires) should exist in store - require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, false)) + require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, false)) + // some transactions which miss ns-1:coll-1 and ns-1:coll-2 has be moved to deprioritizedList list + require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, true)) + require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, true)) // ineligible missingData entries for ns-3:col-1, ns-3:coll-2 (neverExpires) should exist in store - require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD)) + require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD, false)) + require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD, false)) // write pvt data for block 4 require.NoError(t, s.Commit(4, nil, nil)) @@ -395,11 +483,13 @@ func TestStorePurge(t *testing.T) { require.False(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) // eligible missingData entries for ns-1:coll-1 should have expired and ns-1:coll-2 (neverExpires) should exist in store - require.False(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.False(t, 
testMissingDataKeyExists(t, s, ns1Coll1elgMD, false)) + require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, false)) + require.False(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, true)) + require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, true)) // ineligible missingData entries for ns-3:col-1 should have expired and ns-3:coll-2 (neverExpires) should exist in store - require.False(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD)) + require.False(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD, false)) + require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD, false)) // write pvt data for block 5 require.NoError(t, s.Commit(5, nil, nil)) @@ -689,9 +779,17 @@ func testDataKeyExists(t *testing.T, s *Store, dataKey *dataKey) bool { return len(val) != 0 } -func testMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { - dataKeyBytes := encodeMissingDataKey(missingDataKey) - val, err := s.db.Get(dataKeyBytes) +func testMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey, deprioritized bool) bool { + var key []byte + switch { + case missingDataKey.isEligible && !deprioritized: + key = encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, missingDataKey) + case missingDataKey.isEligible && deprioritized: + key = encodeElgMissingDataKey(elgDeprioritizedMissingDataGroup, missingDataKey) + default: + key = encodeInelgMissingDataKey(missingDataKey) + } + val, err := s.db.Get(key) require.NoError(t, err) return len(val) != 0 } diff --git a/core/ledger/pvtdatastorage/test_exports.go b/core/ledger/pvtdatastorage/test_exports.go index caf36a06c6a..d7bfec4c89b 100644 --- a/core/ledger/pvtdatastorage/test_exports.go +++ b/core/ledger/pvtdatastorage/test_exports.go @@ -69,6 +69,8 @@ func (env *StoreEnv) CloseAndReopen() { // Cleanup cleansup the store env after testing func (env *StoreEnv) Cleanup() { + 
env.TestStoreProvider.Close() + env.TestStore.db.Close() if err := os.RemoveAll(env.conf.StorePath); err != nil { env.t.Errorf("error while removing path %s, %v", env.conf.StorePath, err) } From 422e2540a3bc2358728475090fc9ced7236e1308 Mon Sep 17 00:00:00 2001 From: senthil Date: Mon, 31 Aug 2020 18:45:51 +0530 Subject: [PATCH 2/4] address review comment Signed-off-by: senthil --- core/ledger/pvtdatastorage/helper.go | 50 ++++++++--- core/ledger/pvtdatastorage/kv_encoding.go | 25 ++++-- .../ledger/pvtdatastorage/kv_encoding_test.go | 13 ++- .../reconcile_missing_pvtdata.go | 89 +++++++++++-------- .../reconcile_missing_pvtdata_test.go | 5 +- core/ledger/pvtdatastorage/store.go | 51 +++++++---- core/ledger/pvtdatastorage/store_test.go | 70 ++++++++------- 7 files changed, 184 insertions(+), 119 deletions(-) diff --git a/core/ledger/pvtdatastorage/helper.go b/core/ledger/pvtdatastorage/helper.go index 24e4519b9b1..446aed9436e 100644 --- a/core/ledger/pvtdatastorage/helper.go +++ b/core/ledger/pvtdatastorage/helper.go @@ -19,17 +19,19 @@ func prepareStoreEntries(blockNum uint64, pvtData []*ledger.TxPvtData, btlPolicy missingPvtData ledger.TxMissingPvtData) (*storeEntries, error) { dataEntries := prepareDataEntries(blockNum, pvtData) - missingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData) + elgMissingDataEntries, inelgMissingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData) - expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, missingDataEntries, btlPolicy) + expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, elgMissingDataEntries, inelgMissingDataEntries, btlPolicy) if err != nil { return nil, err } return &storeEntries{ - dataEntries: dataEntries, - expiryEntries: expiryEntries, - missingDataEntries: missingDataEntries}, nil + dataEntries: dataEntries, + expiryEntries: expiryEntries, + elgMissingDataEntries: elgMissingDataEntries, + inelgMissingDataEntries: inelgMissingDataEntries, + }, nil } func 
prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEntry { @@ -48,13 +50,29 @@ func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEnt return dataEntries } -func prepareMissingDataEntries(committingBlk uint64, missingPvtData ledger.TxMissingPvtData) map[missingDataKey]*bitset.BitSet { - missingDataEntries := make(map[missingDataKey]*bitset.BitSet) +func prepareMissingDataEntries( + committingBlk uint64, + missingPvtData ledger.TxMissingPvtData, +) (map[missingDataKey]*bitset.BitSet, map[missingDataKey]*bitset.BitSet) { + elgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet) + inelgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet) + + var missingDataEntries map[missingDataKey]*bitset.BitSet for txNum, missingData := range missingPvtData { for _, nsColl := range missingData { - key := missingDataKey{nsCollBlk{nsColl.Namespace, nsColl.Collection, committingBlk}, - nsColl.IsEligible} + key := missingDataKey{ + nsCollBlk{ + ns: nsColl.Namespace, + coll: nsColl.Collection, + blkNum: committingBlk, + }, + } + + missingDataEntries = elgMissingDataEntries + if !nsColl.IsEligible { + missingDataEntries = inelgMissingDataEntries + } if _, ok := missingDataEntries[key]; !ok { missingDataEntries[key] = &bitset.BitSet{} @@ -65,25 +83,29 @@ func prepareMissingDataEntries(committingBlk uint64, missingPvtData ledger.TxMis } } - return missingDataEntries + return elgMissingDataEntries, inelgMissingDataEntries } // prepareExpiryEntries returns expiry entries for both private data which is present in the committingBlk // and missing private. 
-func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, missingDataEntries map[missingDataKey]*bitset.BitSet, +func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, elgMissingDataEntries, inelgMissingDataEntries map[missingDataKey]*bitset.BitSet, btlPolicy pvtdatapolicy.BTLPolicy) ([]*expiryEntry, error) { var expiryEntries []*expiryEntry mapByExpiringBlk := make(map[uint64]*ExpiryData) - // 1. prepare expiryData for non-missing data for _, dataEntry := range dataEntries { if err := prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy); err != nil { return nil, err } } - // 2. prepare expiryData for missing data - for missingDataKey := range missingDataEntries { + for missingDataKey := range elgMissingDataEntries { + if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil { + return nil, err + } + } + + for missingDataKey := range inelgMissingDataEntries { if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil { return nil, err } diff --git a/core/ledger/pvtdatastorage/kv_encoding.go b/core/ledger/pvtdatastorage/kv_encoding.go index ec3972cf555..a67985248c0 100644 --- a/core/ledger/pvtdatastorage/kv_encoding.go +++ b/core/ledger/pvtdatastorage/kv_encoding.go @@ -108,7 +108,7 @@ func decodeDataValue(datavalueBytes []byte) (*rwset.CollectionPvtReadWriteSet, e return collPvtdata, err } -func encodeElgMissingDataKey(group []byte, key *missingDataKey) []byte { +func encodeElgPrioMissingDataKey(key *missingDataKey) []byte { // When missing pvtData reconciler asks for missing data info, // it is necessary to pass the missing pvtdata info associated with // the most recent block so that missing pvtdata in the state db can @@ -117,7 +117,14 @@ func encodeElgMissingDataKey(group []byte, key *missingDataKey) []byte { // to missing pvtData in the most recent block, we use reverse order // preserving encoding for the 
missing data key. This simplifies the // implementation of GetMissingPvtDataInfoForMostRecentBlocks(). - encKey := append(group, encodeReverseOrderVarUint64(key.blkNum)...) + encKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...) + encKey = append(encKey, []byte(key.ns)...) + encKey = append(encKey, nilByte) + return append(encKey, []byte(key.coll)...) +} + +func encodeElgDeprioMissingDataKey(key *missingDataKey) []byte { + encKey := append(elgDeprioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...) encKey = append(encKey, []byte(key.ns)...) encKey = append(encKey, nilByte) return append(encKey, []byte(key.coll)...) @@ -193,14 +200,20 @@ func createRangeScanKeysForElgMissingData(blkNum uint64, group []byte) ([]byte, func createRangeScanKeysForInelgMissingData(maxBlkNum uint64, ns, coll string) ([]byte, []byte) { startKey := encodeInelgMissingDataKey( &missingDataKey{ - nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: maxBlkNum}, - isEligible: false, + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: maxBlkNum, + }, }, ) endKey := encodeInelgMissingDataKey( &missingDataKey{ - nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: 0}, - isEligible: false, + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: 0, + }, }, ) diff --git a/core/ledger/pvtdatastorage/kv_encoding_test.go b/core/ledger/pvtdatastorage/kv_encoding_test.go index 65ddd3f6367..71639f9243e 100644 --- a/core/ledger/pvtdatastorage/kv_encoding_test.go +++ b/core/ledger/pvtdatastorage/kv_encoding_test.go @@ -56,8 +56,7 @@ func TestEligibleMissingDataRange(t *testing.T) { startKey, endKey := eligibleMissingdatakeyRange(blockNum) var txNum uint64 for txNum = 0; txNum < 100; txNum++ { - keyOfBlock := encodeElgMissingDataKey( - elgPrioritizedMissingDataGroup, + keyOfBlock := encodeElgPrioMissingDataKey( &missingDataKey{ nsCollBlk: nsCollBlk{ ns: "ns", @@ -66,8 +65,7 @@ func TestEligibleMissingDataRange(t *testing.T) { }, }, ) - keyOfPreviousBlock 
:= encodeElgMissingDataKey( - elgPrioritizedMissingDataGroup, + keyOfPreviousBlock := encodeElgPrioMissingDataKey( &missingDataKey{ nsCollBlk: nsCollBlk{ ns: "ns", @@ -76,8 +74,7 @@ func TestEligibleMissingDataRange(t *testing.T) { }, }, ) - keyOfNextBlock := encodeElgMissingDataKey( - elgPrioritizedMissingDataGroup, + keyOfNextBlock := encodeElgPrioMissingDataKey( &missingDataKey{ nsCollBlk: nsCollBlk{ ns: "ns", @@ -121,7 +118,7 @@ func testEncodeDecodeMissingdataKey(t *testing.T, blkNum uint64) { t.Run("eligiblePrioritizedKey", func(t *testing.T) { decodedKey := decodeElgMissingDataKey( - encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, key), + encodeElgPrioMissingDataKey(key), ) require.Equal(t, key, decodedKey) }, @@ -130,7 +127,7 @@ func testEncodeDecodeMissingdataKey(t *testing.T, blkNum uint64) { t.Run("eligibleDeprioritizedKey", func(t *testing.T) { decodedKey := decodeElgMissingDataKey( - encodeElgMissingDataKey(elgDeprioritizedMissingDataGroup, key), + encodeElgDeprioMissingDataKey(key), ) require.Equal(t, key, decodedKey) }, diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go index 9db1d7796b6..1e53e3d4fe5 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go @@ -14,6 +14,13 @@ import ( "github.com/willf/bitset" ) +type elgMissingDataGroup int + +const ( + prioritized elgMissingDataGroup = iota + deprioritized +) + // CommitPvtDataOfOldBlocks commits the pvtData (i.e., previously missing data) of old blockp. // The parameter `blocksPvtData` refers a list of old block's pvtdata which are missing in the pvtstore. 
// Given a list of old block's pvtData, `CommitPvtDataOfOldBlocks` performs the following three @@ -24,11 +31,13 @@ import ( // (3) commit the update batch to the pvtStore func (s *Store) CommitPvtDataOfOldBlocks( blocksPvtData map[uint64][]*ledger.TxPvtData, - deprioritizedMissingData ledger.MissingPvtDataInfo, + unreconciledMissingData ledger.MissingPvtDataInfo, ) error { s.purgerLock.Lock() defer s.purgerLock.Unlock() + deprioritizedMissingData := unreconciledMissingData + if s.isLastUpdatedOldBlocksSet { return &ErrIllegalCall{`The lastUpdatedOldBlocksList is set. It means that the stateDB may not be in sync with the pvtStore`} @@ -90,7 +99,7 @@ func (p *oldBlockDataProcessor) prepareDataAndExpiryEntries(blocksPvtData map[ui continue } - if expData, err = p.getExpiryData(expKey); err != nil { + if expData, err = p.getExpiryDataFromEntriesOrStore(expKey); err != nil { return err } if expData == nil { @@ -114,7 +123,7 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectReconciledData key := dataKey.nsCollBlk txNum := uint(dataKey.txNum) - prioMissingData, err := p.getMissingDataFromPrioritizedList(key) + prioMissingData, err := p.getMissingDataFromEntriesOrStore(prioritized, key) if err != nil { return err } @@ -123,15 +132,13 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectReconciledData continue } - deprioMissingData, err := p.getMissingDataFromDeprioritizedList(key) + deprioMissingData, err := p.getMissingDataFromEntriesOrStore(deprioritized, key) if err != nil { return err } if deprioMissingData != nil && deprioMissingData.Test(txNum) { p.entries.deprioritizedMissingDataEntries[key] = deprioMissingData.Clear(txNum) } - // if the missing data entry is already purged by the purge scheduler, we would - // get nil missingData from both prioritized and deprioritized list } return nil @@ -148,7 +155,7 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectPriority(depri } txNum := uint(txNum) - prioMissingData, 
err := p.getMissingDataFromPrioritizedList(key) + prioMissingData, err := p.getMissingDataFromEntriesOrStore(prioritized, key) if err != nil { return err } @@ -164,7 +171,7 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectPriority(depri } p.entries.prioritizedMissingDataEntries[key] = prioMissingData.Clear(txNum) - deprioMissingData, err := p.getMissingDataFromDeprioritizedList(key) + deprioMissingData, err := p.getMissingDataFromEntriesOrStore(deprioritized, key) if err != nil { return err } @@ -193,7 +200,7 @@ func (p *oldBlockDataProcessor) constructExpiryKey(dataEntry *dataEntry) (expiry }, nil } -func (p *oldBlockDataProcessor) getExpiryData(expKey expiryKey) (*ExpiryData, error) { +func (p *oldBlockDataProcessor) getExpiryDataFromEntriesOrStore(expKey expiryKey) (*ExpiryData, error) { if expiryData, ok := p.entries.expiryEntries[expKey]; ok { return expiryData, nil } @@ -203,36 +210,38 @@ func (p *oldBlockDataProcessor) getExpiryData(expKey expiryKey) (*ExpiryData, er return nil, err } if expData == nil { - return nil, errors.Wrap(err, "error while getting expiry data from the store") + return nil, nil } + return decodeExpiryValue(expData) } -func (p *oldBlockDataProcessor) getMissingDataFromPrioritizedList(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { - missingData, ok := p.entries.prioritizedMissingDataEntries[nsCollBlk] - if ok { - return missingData, nil +func (p *oldBlockDataProcessor) getMissingDataFromEntriesOrStore(group elgMissingDataGroup, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + switch group { + case prioritized: + missingData, ok := p.entries.prioritizedMissingDataEntries[nsCollBlk] + if ok { + return missingData, nil + } + case deprioritized: + missingData, ok := p.entries.deprioritizedMissingDataEntries[nsCollBlk] + if ok { + return missingData, nil + } } - return p.getMissingDataBitmapFromStore(elgPrioritizedMissingDataGroup, nsCollBlk) -} - -func (p *oldBlockDataProcessor) 
getMissingDataFromDeprioritizedList(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { - missingData, ok := p.entries.deprioritizedMissingDataEntries[nsCollBlk] - if ok { - return missingData, nil + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, } - return p.getMissingDataBitmapFromStore(elgDeprioritizedMissingDataGroup, nsCollBlk) -} + var key []byte -func (p *oldBlockDataProcessor) getMissingDataBitmapFromStore(group []byte, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { - key := encodeElgMissingDataKey( - group, - &missingDataKey{ - nsCollBlk: nsCollBlk, - }, - ) + switch group { + case prioritized: + key = encodeElgPrioMissingDataKey(missingKey) + case deprioritized: + key = encodeElgDeprioMissingDataKey(missingKey) + } missingData, err := p.db.Get(key) if err != nil { @@ -303,19 +312,21 @@ func (e *entriesForPvtDataOfOldBlocks) addMissingDataEntriesTo(batch *leveldbhel var err error entries := map[string]map[nsCollBlk]*bitset.BitSet{ - string(elgPrioritizedMissingDataGroup): e.prioritizedMissingDataEntries, - string(elgDeprioritizedMissingDataGroup): e.deprioritizedMissingDataEntries, + "prioritized": e.prioritizedMissingDataEntries, + "deprioritized": e.deprioritizedMissingDataEntries, } for group, missingDataList := range entries { for nsCollBlk, missingData := range missingDataList { - key = encodeElgMissingDataKey( - []byte(group), - &missingDataKey{ - nsCollBlk: nsCollBlk, - isEligible: true, - }, - ) + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, + } + switch group { + case "prioritized": + key = encodeElgPrioMissingDataKey(missingKey) + case "deprioritized": + key = encodeElgDeprioMissingDataKey(missingKey) + } if missingData.None() { batch.Delete(key) diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go index 4ff255e29cd..5a0cff8db43 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go +++ 
b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go @@ -292,9 +292,10 @@ func TestCommitPvtDataOfOldBlocksWithBTL(t *testing.T) { } for _, k := range keys { - bitmap, err := p.getMissingDataBitmapFromStore(elgDeprioritizedMissingDataGroup, k) + encKey := encodeElgPrioMissingDataKey(&missingDataKey{k}) + missingData, err := p.db.Get(encKey) require.NoError(t, err) - require.Nil(t, bitmap) + require.Nil(t, missingData) } } diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index 45e87e31767..b229f5de051 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -21,7 +21,18 @@ import ( "github.com/willf/bitset" ) -var logger = flogging.MustGetLogger("pvtdatastorage") +var ( + logger = flogging.MustGetLogger("pvtdatastorage") + // The missing data entries are classified into three categories: + // (1) eligible prioritized + // (2) eligible deprioritized + // (3) ineligible + // The reconciler would fetch the eligible prioritized missing data + // from other peers. A chance for eligible deprioritized missing data + // would be given after giving deprioritizedMissingDataPeriodicity number + // of chances to the eligible prioritized missing data + deprioritizedMissingDataPeriodicity = 1000 +) // Provider provides handle to specific 'Store' that in turn manages // private write sets for a ledger @@ -64,8 +75,7 @@ type Store struct { // recovery operation. 
isLastUpdatedOldBlocksSet bool - iterSinceDeprioMissingDataAccess int - deprioritizedMissingDataPeriodicity int + iterSinceDeprioMissingDataAccess int } type blkTranNumKey []byte @@ -97,13 +107,13 @@ type dataKey struct { type missingDataKey struct { nsCollBlk - isEligible bool } type storeEntries struct { - dataEntries []*dataEntry - expiryEntries []*expiryEntry - missingDataEntries map[missingDataKey]*bitset.BitSet + dataEntries []*dataEntry + expiryEntries []*expiryEntry + elgMissingDataEntries map[missingDataKey]*bitset.BitSet + inelgMissingDataEntries map[missingDataKey]*bitset.BitSet } // lastUpdatedOldBlocksList keeps the list of last updated blocks @@ -143,7 +153,6 @@ func (p *Provider) OpenStore(ledgerid string) (*Store, error) { return nil, err } s.launchCollElgProc() - s.deprioritizedMissingDataPeriodicity = 1000 logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d]", s.isEmpty, s.lastCommittedBlock) return s, nil @@ -239,13 +248,18 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD batch.Put(key, val) } - for missingDataKey, missingDataValue := range storeEntries.missingDataEntries { - switch { - case missingDataKey.isEligible: - key = encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, &missingDataKey) - default: - key = encodeInelgMissingDataKey(&missingDataKey) + for missingDataKey, missingDataValue := range storeEntries.elgMissingDataEntries { + key = encodeElgPrioMissingDataKey(&missingDataKey) + + if val, err = encodeMissingDataValue(missingDataValue); err != nil { + return err } + batch.Put(key, val) + } + + for missingDataKey, missingDataValue := range storeEntries.inelgMissingDataEntries { + key = encodeInelgMissingDataKey(&missingDataKey) + if val, err = encodeMissingDataValue(missingDataValue); err != nil { return err } @@ -412,7 +426,7 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M return nil, nil } - if 
s.iterSinceDeprioMissingDataAccess == s.deprioritizedMissingDataPeriodicity { + if s.iterSinceDeprioMissingDataAccess == deprioritizedMissingDataPeriodicity { s.iterSinceDeprioMissingDataAccess = 0 return s.getMissingData(elgDeprioritizedMissingDataGroup, maxBlock) } @@ -548,10 +562,10 @@ func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error { for _, missingDataKey := range missingDataKeys { batch.Delete( - encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, missingDataKey), + encodeElgPrioMissingDataKey(missingDataKey), ) batch.Delete( - encodeElgMissingDataKey(elgDeprioritizedMissingDataGroup, missingDataKey), + encodeElgDeprioMissingDataKey(missingDataKey), ) batch.Delete( encodeInelgMissingDataKey(missingDataKey), @@ -648,12 +662,11 @@ func (s *Store) processCollElgEvents() error { for collItr.Next() { // each entry originalKey, originalVal := collItr.Key(), collItr.Value() modifiedKey := decodeInelgMissingDataKey(originalKey) - modifiedKey.isEligible = true batch.Delete(originalKey) copyVal := make([]byte, len(originalVal)) copy(copyVal, originalVal) batch.Put( - encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, modifiedKey), + encodeElgPrioMissingDataKey(modifiedKey), copyVal, ) collEntriesConverted++ diff --git a/core/ledger/pvtdatastorage/store_test.go b/core/ledger/pvtdatastorage/store_test.go index 52e5d195cd4..b62898b973e 100644 --- a/core/ledger/pvtdatastorage/store_test.go +++ b/core/ledger/pvtdatastorage/store_test.go @@ -245,7 +245,7 @@ func TestGetMissingDataInfo(t *testing.T) { }, } - store.deprioritizedMissingDataPeriodicity = 10 + deprioritizedMissingDataPeriodicity = 10 for i := 1; i <= 55; i++ { if i%11 == 0 { // after ever 10 iterations of accessing the prioritized list, the @@ -426,22 +426,22 @@ func TestStorePurge(t *testing.T) { ns2Coll2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-2", blkNum: 1}, txNum: 2} // eligible missingData entries for ns-1:coll-1, ns-1:coll-2 (neverExpires) should exist in store - 
ns1Coll1elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}, isEligible: true} - ns1Coll2elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, isEligible: true} + ns1Coll1elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}} + ns1Coll2elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}} // ineligible missingData entries for ns-3:col-1, ns-3:coll-2 (neverExpires) should exist in store - ns3Coll1inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-1", blkNum: 1}, isEligible: false} - ns3Coll2inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}, isEligible: false} + ns3Coll1inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-1", blkNum: 1}} + ns3Coll2inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}} testWaitForPurgerRoutineToFinish(s) require.True(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, false)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, false)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD, false)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD, false)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll1inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll2inelgMD)) deprioritizedList := ledger.MissingPvtDataInfo{ 1: ledger.MissingBlockPvtdataInfo{ @@ -466,14 +466,14 @@ func TestStorePurge(t *testing.T) { require.True(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) // eligible missingData entries for ns-1:coll-1, ns-1:coll-2 (neverExpires) should exist in 
store - require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, false)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, false)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD)) // some transactions which miss ns-1:coll-1 and ns-1:coll-2 has be moved to deprioritizedList list - require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, true)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, true)) + require.True(t, testElgDeprioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgDeprioMissingDataKeyExists(t, s, ns1Coll2elgMD)) // ineligible missingData entries for ns-3:col-1, ns-3:coll-2 (neverExpires) should exist in store - require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD, false)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD, false)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll1inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll2inelgMD)) // write pvt data for block 4 require.NoError(t, s.Commit(4, nil, nil)) @@ -483,13 +483,13 @@ func TestStorePurge(t *testing.T) { require.False(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) // eligible missingData entries for ns-1:coll-1 should have expired and ns-1:coll-2 (neverExpires) should exist in store - require.False(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, false)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, false)) - require.False(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD, true)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD, true)) + require.False(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.False(t, testElgDeprioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgDeprioMissingDataKeyExists(t, s, 
ns1Coll2elgMD)) // ineligible missingData entries for ns-3:col-1 should have expired and ns-3:coll-2 (neverExpires) should exist in store - require.False(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD, false)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD, false)) + require.False(t, testInelgMissingDataKeyExists(t, s, ns3Coll1inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll2inelgMD)) // write pvt data for block 5 require.NoError(t, s.Commit(5, nil, nil)) @@ -779,16 +779,24 @@ func testDataKeyExists(t *testing.T, s *Store, dataKey *dataKey) bool { return len(val) != 0 } -func testMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey, deprioritized bool) bool { - var key []byte - switch { - case missingDataKey.isEligible && !deprioritized: - key = encodeElgMissingDataKey(elgPrioritizedMissingDataGroup, missingDataKey) - case missingDataKey.isEligible && deprioritized: - key = encodeElgMissingDataKey(elgDeprioritizedMissingDataGroup, missingDataKey) - default: - key = encodeInelgMissingDataKey(missingDataKey) - } +func testElgPrioMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { + key := encodeElgPrioMissingDataKey(missingDataKey) + + val, err := s.db.Get(key) + require.NoError(t, err) + return len(val) != 0 +} + +func testElgDeprioMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { + key := encodeElgDeprioMissingDataKey(missingDataKey) + + val, err := s.db.Get(key) + require.NoError(t, err) + return len(val) != 0 +} +func testInelgMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { + key := encodeInelgMissingDataKey(missingDataKey) + val, err := s.db.Get(key) require.NoError(t, err) return len(val) != 0 From 290961a49f5c761d71104eba5566fb31423f6a6b Mon Sep 17 00:00:00 2001 From: senthil Date: Tue, 1 Sep 2020 12:36:53 +0530 Subject: [PATCH 3/4] add review comments Signed-off-by: senthil --- 
core/ledger/pvtdatastorage/helper.go | 6 +- .../reconcile_missing_pvtdata.go | 65 +++++++++------ .../reconcile_missing_pvtdata_test.go | 81 ++++++++----------- core/ledger/pvtdatastorage/store_test.go | 5 ++ 4 files changed, 84 insertions(+), 73 deletions(-) diff --git a/core/ledger/pvtdatastorage/helper.go b/core/ledger/pvtdatastorage/helper.go index 446aed9436e..df35533cca7 100644 --- a/core/ledger/pvtdatastorage/helper.go +++ b/core/ledger/pvtdatastorage/helper.go @@ -69,8 +69,10 @@ func prepareMissingDataEntries( }, } - missingDataEntries = elgMissingDataEntries - if !nsColl.IsEligible { + switch nsColl.IsEligible { + case true: + missingDataEntries = elgMissingDataEntries + default: missingDataEntries = inelgMissingDataEntries } diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go index 1e53e3d4fe5..85743e23fbd 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go @@ -265,8 +265,12 @@ func (p *oldBlockDataProcessor) constructDBUpdateBatch() (*leveldbhelper.UpdateB return nil, errors.WithMessage(err, "error while adding expiry entries to the update batch") } - if err := p.entries.addMissingDataEntriesTo(batch); err != nil { - return nil, errors.WithMessage(err, "error while adding missing data entries to the update batch") + if err := p.entries.addElgPrioMissingDataEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding eligible prioritized missing data entries to the update batch") + } + + if err := p.entries.addElgDeprioMissingDataEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding eligible deprioritized missing data entries to the update batch") } return batch, nil @@ -307,37 +311,48 @@ func (e *entriesForPvtDataOfOldBlocks) addExpiryEntriesTo(batch *leveldbhelper.U return nil } -func (e *entriesForPvtDataOfOldBlocks) 
addMissingDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { +func (e *entriesForPvtDataOfOldBlocks) addElgPrioMissingDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { var key, val []byte var err error - entries := map[string]map[nsCollBlk]*bitset.BitSet{ - "prioritized": e.prioritizedMissingDataEntries, - "deprioritized": e.deprioritizedMissingDataEntries, + for nsCollBlk, missingData := range e.prioritizedMissingDataEntries { + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, + } + key = encodeElgPrioMissingDataKey(missingKey) + + if missingData.None() { + batch.Delete(key) + continue + } + + if val, err = encodeMissingDataValue(missingData); err != nil { + return errors.Wrap(err, "error while encoding missing data bitmap") + } + batch.Put(key, val) } + return nil +} - for group, missingDataList := range entries { - for nsCollBlk, missingData := range missingDataList { - missingKey := &missingDataKey{ - nsCollBlk: nsCollBlk, - } - switch group { - case "prioritized": - key = encodeElgPrioMissingDataKey(missingKey) - case "deprioritized": - key = encodeElgDeprioMissingDataKey(missingKey) - } +func (e *entriesForPvtDataOfOldBlocks) addElgDeprioMissingDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { + var key, val []byte + var err error - if missingData.None() { - batch.Delete(key) - continue - } + for nsCollBlk, missingData := range e.deprioritizedMissingDataEntries { + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, + } + key = encodeElgDeprioMissingDataKey(missingKey) - if val, err = encodeMissingDataValue(missingData); err != nil { - return errors.Wrap(err, "error while encoding missing data bitmap") - } - batch.Put(key, val) + if missingData.None() { + batch.Delete(key) + continue } + + if val, err = encodeMissingDataValue(missingData); err != nil { + return errors.Wrap(err, "error while encoding missing data bitmap") + } + batch.Put(key, val) } return nil } diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go 
b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go index 5a0cff8db43..2fc43ad5836 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go @@ -275,9 +275,6 @@ func TestCommitPvtDataOfOldBlocksWithBTL(t *testing.T) { } } // deprioritized list should not be present - p := &oldBlockDataProcessor{ - Store: store, - } keys := []nsCollBlk{ { ns: "ns-1", @@ -292,56 +289,50 @@ func TestCommitPvtDataOfOldBlocksWithBTL(t *testing.T) { } for _, k := range keys { - encKey := encodeElgPrioMissingDataKey(&missingDataKey{k}) - missingData, err := p.db.Get(encKey) + encKey := encodeElgDeprioMissingDataKey(&missingDataKey{k}) + missingData, err := store.db.Get(encKey) require.NoError(t, err) require.Nil(t, missingData) } } func TestCommitPvtDataOfOldBlocksWithDeprioritization(t *testing.T) { - sampleDataForTest := func() (map[uint64]*pvtDataForTest, ledger.MissingPvtDataInfo) { - blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ - { - blkNum: 1, - txNum: 1, - pvtDataMissing: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, - }, + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, }, - { - blkNum: 1, - txNum: 2, - pvtDataMissing: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, - }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, }, - { - blkNum: 2, - txNum: 1, - pvtDataMissing: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, - }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, }, - { - blkNum: 2, - txNum: 2, - pvtDataMissing: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, - }, + }, + { + blkNum: 2, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + 
"ns-2": {"coll-1"}, }, - } - - blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) - - return blocksPvtData, missingDataSummary + }, } - _, missingDataSummary := sampleDataForTest() + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) tests := []struct { name string @@ -426,8 +417,6 @@ func TestCommitPvtDataOfOldBlocksWithDeprioritization(t *testing.T) { defer env.Cleanup() store := env.TestStore - blocksPvtData, missingDataSummary := sampleDataForTest() - // COMMIT BLOCK 0 WITH NO DATA require.NoError(t, store.Commit(0, nil, nil)) require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) @@ -490,10 +479,10 @@ func TestCommitPvtDataOfOldBlocksWithDeprioritization(t *testing.T) { }, } - blocksPvtData, _ = constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + pvtDataOfOldBlocks, _ := constructPvtDataForTest(t, oldBlockTxPvtDataInfo) oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ - 1: blocksPvtData[1].pvtData, - 2: blocksPvtData[2].pvtData, + 1: pvtDataOfOldBlocks[1].pvtData, + 2: pvtDataOfOldBlocks[2].pvtData, } require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, nil)) diff --git a/core/ledger/pvtdatastorage/store_test.go b/core/ledger/pvtdatastorage/store_test.go index b62898b973e..f1e1efb522a 100644 --- a/core/ledger/pvtdatastorage/store_test.go +++ b/core/ledger/pvtdatastorage/store_test.go @@ -245,7 +245,12 @@ func TestGetMissingDataInfo(t *testing.T) { }, } + defaultVal := deprioritizedMissingDataPeriodicity deprioritizedMissingDataPeriodicity = 10 + defer func() { + deprioritizedMissingDataPeriodicity = defaultVal + }() + for i := 1; i <= 55; i++ { if i%11 == 0 { // after ever 10 iterations of accessing the prioritized list, the From 26f0813ee0a9c63bca4da924a0e5dd89c064a4bb Mon Sep 17 00:00:00 2001 From: senthil Date: Tue, 1 Sep 2020 22:31:56 +0530 Subject: [PATCH 4/4] address review comments Signed-off-by: senthil --- 
core/ledger/pvtdatastorage/helper.go | 19 +- .../reconcile_missing_pvtdata.go | 56 ++-- .../reconcile_missing_pvtdata_test.go | 303 +++++++++++------- 3 files changed, 221 insertions(+), 157 deletions(-) diff --git a/core/ledger/pvtdatastorage/helper.go b/core/ledger/pvtdatastorage/helper.go index df35533cca7..ec5ec5b430d 100644 --- a/core/ledger/pvtdatastorage/helper.go +++ b/core/ledger/pvtdatastorage/helper.go @@ -57,8 +57,6 @@ func prepareMissingDataEntries( elgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet) inelgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet) - var missingDataEntries map[missingDataKey]*bitset.BitSet - for txNum, missingData := range missingPvtData { for _, nsColl := range missingData { key := missingDataKey{ @@ -71,17 +69,16 @@ func prepareMissingDataEntries( switch nsColl.IsEligible { case true: - missingDataEntries = elgMissingDataEntries + if _, ok := elgMissingDataEntries[key]; !ok { + elgMissingDataEntries[key] = &bitset.BitSet{} + } + elgMissingDataEntries[key].Set(uint(txNum)) default: - missingDataEntries = inelgMissingDataEntries - } - - if _, ok := missingDataEntries[key]; !ok { - missingDataEntries[key] = &bitset.BitSet{} + if _, ok := inelgMissingDataEntries[key]; !ok { + inelgMissingDataEntries[key] = &bitset.BitSet{} + } + inelgMissingDataEntries[key].Set(uint(txNum)) } - bitmap := missingDataEntries[key] - - bitmap.Set(uint(txNum)) } } diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go index 85743e23fbd..2030c6b2c1f 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go @@ -123,7 +123,7 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectReconciledData key := dataKey.nsCollBlk txNum := uint(dataKey.txNum) - prioMissingData, err := p.getMissingDataFromEntriesOrStore(prioritized, key) + prioMissingData, err := 
p.getPrioMissingDataFromEntriesOrStore(key) if err != nil { return err } @@ -132,7 +132,7 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectReconciledData continue } - deprioMissingData, err := p.getMissingDataFromEntriesOrStore(deprioritized, key) + deprioMissingData, err := p.getDeprioMissingDataFromEntriesOrStore(key) if err != nil { return err } @@ -155,7 +155,7 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectPriority(depri } txNum := uint(txNum) - prioMissingData, err := p.getMissingDataFromEntriesOrStore(prioritized, key) + prioMissingData, err := p.getPrioMissingDataFromEntriesOrStore(key) if err != nil { return err } @@ -171,7 +171,7 @@ func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectPriority(depri } p.entries.prioritizedMissingDataEntries[key] = prioMissingData.Clear(txNum) - deprioMissingData, err := p.getMissingDataFromEntriesOrStore(deprioritized, key) + deprioMissingData, err := p.getDeprioMissingDataFromEntriesOrStore(key) if err != nil { return err } @@ -216,42 +216,48 @@ func (p *oldBlockDataProcessor) getExpiryDataFromEntriesOrStore(expKey expiryKey return decodeExpiryValue(expData) } -func (p *oldBlockDataProcessor) getMissingDataFromEntriesOrStore(group elgMissingDataGroup, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { - switch group { - case prioritized: - missingData, ok := p.entries.prioritizedMissingDataEntries[nsCollBlk] - if ok { - return missingData, nil - } - case deprioritized: - missingData, ok := p.entries.deprioritizedMissingDataEntries[nsCollBlk] - if ok { - return missingData, nil - } +func (p *oldBlockDataProcessor) getPrioMissingDataFromEntriesOrStore(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + missingData, ok := p.entries.prioritizedMissingDataEntries[nsCollBlk] + if ok { + return missingData, nil } missingKey := &missingDataKey{ nsCollBlk: nsCollBlk, } + key := encodeElgPrioMissingDataKey(missingKey) - var key []byte + encMissingData, err := p.db.Get(key) + if err 
!= nil { + return nil, errors.Wrap(err, "error while getting missing data bitmap from the store") + } + if encMissingData == nil { + return nil, nil + } - switch group { - case prioritized: - key = encodeElgPrioMissingDataKey(missingKey) - case deprioritized: - key = encodeElgDeprioMissingDataKey(missingKey) + return decodeMissingDataValue(encMissingData) +} + +func (p *oldBlockDataProcessor) getDeprioMissingDataFromEntriesOrStore(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + missingData, ok := p.entries.deprioritizedMissingDataEntries[nsCollBlk] + if ok { + return missingData, nil + } + + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, } + key := encodeElgDeprioMissingDataKey(missingKey) - missingData, err := p.db.Get(key) + encMissingData, err := p.db.Get(key) if err != nil { return nil, errors.Wrap(err, "error while getting missing data bitmap from the store") } - if missingData == nil { + if encMissingData == nil { return nil, nil } - return decodeMissingDataValue(missingData) + return decodeMissingDataValue(encMissingData) } func (p *oldBlockDataProcessor) constructDBUpdateBatch() (*leveldbhelper.UpdateBatch, error) { diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go index 2fc43ad5836..4ac4749e184 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go @@ -150,150 +150,211 @@ func TestCommitPvtDataOfOldBlocks(t *testing.T) { } func TestCommitPvtDataOfOldBlocksWithBTL(t *testing.T) { - btlPolicy := btltestutil.SampleBTLPolicy( - map[[2]string]uint64{ - {"ns-1", "coll-1"}: 1, - {"ns-2", "coll-1"}: 1, - }, - ) - env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithBTL", btlPolicy, pvtDataConf()) - defer env.Cleanup() - store := env.TestStore - - blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ - { - blkNum: 1, - txNum: 1, - pvtDataMissing: map[string][]string{ - "ns-1": 
{"coll-1"}, - "ns-2": {"coll-1"}, + setup := func(store *Store) { + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, }, - }, - { - blkNum: 1, - txNum: 2, - pvtDataMissing: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, + { + blkNum: 1, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, }, - }, - { - blkNum: 1, - txNum: 3, - pvtDataMissing: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, + { + blkNum: 1, + txNum: 3, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, }, - }, - } + } - blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) - require.NoError(t, store.Commit(0, nil, nil)) - require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) - assertMissingDataInfo(t, store, missingDataSummary, 1) + assertMissingDataInfo(t, store, missingDataSummary, 1) - // COMMIT BLOCK 2 & 3 WITH NO PVTDATA - require.NoError(t, store.Commit(2, nil, nil)) - require.NoError(t, store.Commit(3, nil, nil)) + // COMMIT BLOCK 2 & 3 WITH NO PVTDATA + require.NoError(t, store.Commit(2, nil, nil)) + require.NoError(t, store.Commit(3, nil, nil)) + } - // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired but not purged. 
- // hence, the commit of pvtdata of block 1 transaction 1 should create entries - // in the store - oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ - { - blkNum: 1, - txNum: 1, - pvtDataPresent: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, + t.Run("expired but not purged", func(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 1, + {"ns-2", "coll-1"}: 1, }, - }, - } + ) + env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithBTL", btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + setup(store) + // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired but not purged. + // hence, the commit of pvtdata of block 1 transaction 1 should create entries + // in the store + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } - blocksPvtData, _ = constructPvtDataForTest(t, oldBlockTxPvtDataInfo) - oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ - 1: blocksPvtData[1].pvtData, - } - require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, nil)) + blocksPvtData, _ := constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + } + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 2: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, deprioritizedList)) - for _, b := range blocksPvtData { - for _, dkey := range b.dataKeys { - require.True(t, testDataKeyExists(t, store, dkey)) + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.True(t, testDataKeyExists(t, store, dkey)) + } + } + // as all missing data are expired, get missing info would 
return nil though + // it is not purged yet + assertMissingDataInfo(t, store, make(ledger.MissingPvtDataInfo), 1) + + // deprioritized list should be present + tests := []struct { + key nsCollBlk + expectedBitmap *bitset.BitSet + }{ + { + key: nsCollBlk{ + ns: "ns-1", + coll: "coll-1", + blkNum: 1, + }, + expectedBitmap: constructBitSetForTest(2), + }, + { + key: nsCollBlk{ + ns: "ns-2", + coll: "coll-1", + blkNum: 1, + }, + expectedBitmap: constructBitSetForTest(2), + }, } - } - // as all missing data are expired, get missing info would return nil though - // it is not purged yet - assertMissingDataInfo(t, store, make(ledger.MissingPvtDataInfo), 1) - // while committing the next block, all entries related to expiry data are purged - require.NoError(t, store.Commit(4, nil, nil)) + for _, tt := range tests { + encKey := encodeElgDeprioMissingDataKey(&missingDataKey{tt.key}) + missingData, err := store.db.Get(encKey) + require.NoError(t, err) - testWaitForPurgerRoutineToFinish(store) - for _, b := range blocksPvtData { - for _, dkey := range b.dataKeys { - require.False(t, testDataKeyExists(t, store, dkey)) + expectedMissingData, err := encodeMissingDataValue(tt.expectedBitmap) + require.NoError(t, err) + require.Equal(t, expectedMissingData, missingData) } - } + }) - // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired and purged. 
- // hence, the commit of pvtdata of block 1 transaction 2 should not create - // entries in the store - oldBlockTxPvtDataInfo = []*blockTxPvtDataInfoForTest{ - { - blkNum: 1, - txNum: 2, - pvtDataPresent: map[string][]string{ - "ns-1": {"coll-1"}, - "ns-2": {"coll-1"}, + t.Run("expired and purged", func(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 1, + {"ns-2", "coll-1"}: 1, }, - }, - } - oldBlocksPvtData = map[uint64][]*ledger.TxPvtData{ - 1: blocksPvtData[1].pvtData, - } - deprioritizedList := ledger.MissingPvtDataInfo{ - 1: ledger.MissingBlockPvtdataInfo{ - 3: { - { - Namespace: "ns-1", - Collection: "coll-1", + ) + env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithBTL", btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + setup(store) + require.NoError(t, store.Commit(4, nil, nil)) + + testWaitForPurgerRoutineToFinish(store) + + // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired and purged. 
+ // hence, the commit of pvtdata of block 1 transaction 2 should not create + // entries in the store + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, }, - { - Namespace: "ns-2", - Collection: "coll-1", + }, + } + blocksPvtData, _ := constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + } + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 3: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, }, }, - }, - } - require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, deprioritizedList)) + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, deprioritizedList)) - for _, b := range blocksPvtData { - for _, dkey := range b.dataKeys { - require.False(t, testDataKeyExists(t, store, dkey)) + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.False(t, testDataKeyExists(t, store, dkey)) + } } - } - // deprioritized list should not be present - keys := []nsCollBlk{ - { - ns: "ns-1", - coll: "coll-1", - blkNum: 1, - }, - { - ns: "ns-2", - coll: "coll-1", - blkNum: 1, - }, - } - for _, k := range keys { - encKey := encodeElgDeprioMissingDataKey(&missingDataKey{k}) - missingData, err := store.db.Get(encKey) - require.NoError(t, err) - require.Nil(t, missingData) - } + // deprioritized list should not be present + keys := []nsCollBlk{ + { + ns: "ns-1", + coll: "coll-1", + blkNum: 1, + }, + { + ns: "ns-2", + coll: "coll-1", + blkNum: 1, + }, + } + + for _, k := range keys { + encKey := encodeElgDeprioMissingDataKey(&missingDataKey{k}) + missingData, err := store.db.Get(encKey) + require.NoError(t, err) + require.Nil(t, missingData) + } + }) } func TestCommitPvtDataOfOldBlocksWithDeprioritization(t *testing.T) {