diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 29b4fc0b3c1..88e3e05959e 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -754,7 +754,7 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(reconciledPvtdata)) - err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData) + err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, nil) if err != nil { return nil, err } diff --git a/core/ledger/pvtdatastorage/helper.go b/core/ledger/pvtdatastorage/helper.go index 73bbf064706..ec5ec5b430d 100644 --- a/core/ledger/pvtdatastorage/helper.go +++ b/core/ledger/pvtdatastorage/helper.go @@ -19,17 +19,19 @@ func prepareStoreEntries(blockNum uint64, pvtData []*ledger.TxPvtData, btlPolicy missingPvtData ledger.TxMissingPvtData) (*storeEntries, error) { dataEntries := prepareDataEntries(blockNum, pvtData) - missingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData) + elgMissingDataEntries, inelgMissingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData) - expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, missingDataEntries, btlPolicy) + expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, elgMissingDataEntries, inelgMissingDataEntries, btlPolicy) if err != nil { return nil, err } return &storeEntries{ - dataEntries: dataEntries, - expiryEntries: expiryEntries, - missingDataEntries: missingDataEntries}, nil + dataEntries: dataEntries, + expiryEntries: expiryEntries, + elgMissingDataEntries: elgMissingDataEntries, + inelgMissingDataEntries: inelgMissingDataEntries, + }, nil } func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEntry { @@ -48,42 +50,61 @@ func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEnt return dataEntries } -func prepareMissingDataEntries(committingBlk uint64, missingPvtData ledger.TxMissingPvtData) map[missingDataKey]*bitset.BitSet { - missingDataEntries := make(map[missingDataKey]*bitset.BitSet) +func prepareMissingDataEntries( + committingBlk uint64, + missingPvtData ledger.TxMissingPvtData, +) (map[missingDataKey]*bitset.BitSet, map[missingDataKey]*bitset.BitSet) { + elgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet) + inelgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet) for txNum, missingData := range missingPvtData { for _, nsColl := range missingData { - key := missingDataKey{nsCollBlk{nsColl.Namespace, nsColl.Collection, committingBlk}, - nsColl.IsEligible} - - if _, ok := missingDataEntries[key]; !ok { - missingDataEntries[key] = &bitset.BitSet{} + key := missingDataKey{ + nsCollBlk{ + ns: nsColl.Namespace, + coll: nsColl.Collection, + blkNum: committingBlk, + }, } - bitmap := missingDataEntries[key] - bitmap.Set(uint(txNum)) + switch nsColl.IsEligible { + case true: + if _, ok := elgMissingDataEntries[key]; !ok { + elgMissingDataEntries[key] = &bitset.BitSet{} + } + elgMissingDataEntries[key].Set(uint(txNum)) + default: + if _, ok := inelgMissingDataEntries[key]; !ok { + inelgMissingDataEntries[key] = &bitset.BitSet{} + } + inelgMissingDataEntries[key].Set(uint(txNum)) + } } } - return missingDataEntries + return elgMissingDataEntries, inelgMissingDataEntries } // prepareExpiryEntries returns expiry entries for both private data which is present in the committingBlk // and missing private. -func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, missingDataEntries map[missingDataKey]*bitset.BitSet, +func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, elgMissingDataEntries, inelgMissingDataEntries map[missingDataKey]*bitset.BitSet, btlPolicy pvtdatapolicy.BTLPolicy) ([]*expiryEntry, error) { var expiryEntries []*expiryEntry mapByExpiringBlk := make(map[uint64]*ExpiryData) - // 1. prepare expiryData for non-missing data for _, dataEntry := range dataEntries { if err := prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy); err != nil { return nil, err } } - // 2. prepare expiryData for missing data - for missingDataKey := range missingDataEntries { + for missingDataKey := range elgMissingDataEntries { + if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil { + return nil, err + } + } + + for missingDataKey := range inelgMissingDataEntries { if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil { return nil, err } @@ -139,25 +160,38 @@ func getOrCreateExpiryData(mapByExpiringBlk map[uint64]*ExpiryData, expiringBlk } // deriveKeys constructs dataKeys and missingDataKey from an expiryEntry -func deriveKeys(expiryEntry *expiryEntry) (dataKeys []*dataKey, missingDataKeys []*missingDataKey) { +func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey) { + var dataKeys []*dataKey + var missingDataKeys []*missingDataKey + for ns, colls := range expiryEntry.value.Map { - // 1. constructs dataKeys of expired existing pvt data for coll, txNums := range colls.Map { for _, txNum := range txNums.List { dataKeys = append(dataKeys, - &dataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, txNum}) + &dataKey{ + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: expiryEntry.key.committingBlk, + }, + txNum: txNum, + }) } } - // 2. constructs missingDataKeys of expired missing pvt data + for coll := range colls.MissingDataMap { - // one key for eligible entries and another for ieligible entries missingDataKeys = append(missingDataKeys, - &missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, true}) - missingDataKeys = append(missingDataKeys, - &missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, false}) + &missingDataKey{ + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: expiryEntry.key.committingBlk, + }, + }) } } - return + + return dataKeys, missingDataKeys } func passesFilter(dataKey *dataKey, filter ledger.PvtNsCollFilter) bool { diff --git a/core/ledger/pvtdatastorage/kv_encoding.go b/core/ledger/pvtdatastorage/kv_encoding.go index 1b1f03c3e57..a67985248c0 100644 --- a/core/ledger/pvtdatastorage/kv_encoding.go +++ b/core/ledger/pvtdatastorage/kv_encoding.go @@ -19,29 +19,30 @@ import ( ) var ( - pendingCommitKey = []byte{0} - lastCommittedBlkkey = []byte{1} - pvtDataKeyPrefix = []byte{2} - expiryKeyPrefix = []byte{3} - eligibleMissingDataKeyPrefix = []byte{4} - ineligibleMissingDataKeyPrefix = []byte{5} - collElgKeyPrefix = []byte{6} - lastUpdatedOldBlocksKey = []byte{7} + pendingCommitKey = []byte{0} + lastCommittedBlkkey = []byte{1} + pvtDataKeyPrefix = []byte{2} + expiryKeyPrefix = []byte{3} + elgPrioritizedMissingDataGroup = []byte{4} + inelgMissingDataGroup = []byte{5} + collElgKeyPrefix = []byte{6} + lastUpdatedOldBlocksKey = []byte{7} + elgDeprioritizedMissingDataGroup = []byte{8} nilByte = byte(0) emptyValue = []byte{} ) -func getDataKeysForRangeScanByBlockNum(blockNum uint64) (startKey, endKey []byte) { - startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) - endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...) - return +func getDataKeysForRangeScanByBlockNum(blockNum uint64) ([]byte, []byte) { + startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) + endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...) + return startKey, endKey } -func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) (startKey, endKey []byte) { - startKey = append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...) - endKey = append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...) - return +func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) ([]byte, []byte) { + startKey := append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...) + endKey := append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...) + return startKey, endKey } func encodeLastCommittedBlockVal(blockNum uint64) []byte { @@ -107,47 +108,52 @@ func decodeDataValue(datavalueBytes []byte) (*rwset.CollectionPvtReadWriteSet, e return collPvtdata, err } -func encodeMissingDataKey(key *missingDataKey) []byte { - if key.isEligible { - // When missing pvtData reconciler asks for missing data info, - // it is necessary to pass the missing pvtdata info associated with - // the most recent block so that missing pvtdata in the state db can - // be fixed sooner to reduce the "private data matching public hash version - // is not available" error during endorserments. In order to give priority - // to missing pvtData in the most recent block, we use reverse order - // preserving encoding for the missing data key. This simplifies the - // implementation of GetMissingPvtDataInfoForMostRecentBlocks(). - keyBytes := append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(key.blkNum)...) - keyBytes = append(keyBytes, []byte(key.ns)...) - keyBytes = append(keyBytes, nilByte) - return append(keyBytes, []byte(key.coll)...) - } +func encodeElgPrioMissingDataKey(key *missingDataKey) []byte { + // When missing pvtData reconciler asks for missing data info, + // it is necessary to pass the missing pvtdata info associated with + // the most recent block so that missing pvtdata in the state db can + // be fixed sooner to reduce the "private data matching public hash version + // is not available" error during endorserments. In order to give priority + // to missing pvtData in the most recent block, we use reverse order + // preserving encoding for the missing data key. This simplifies the + // implementation of GetMissingPvtDataInfoForMostRecentBlocks(). + encKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...) + encKey = append(encKey, []byte(key.ns)...) + encKey = append(encKey, nilByte) + return append(encKey, []byte(key.coll)...) +} - keyBytes := append(ineligibleMissingDataKeyPrefix, []byte(key.ns)...) - keyBytes = append(keyBytes, nilByte) - keyBytes = append(keyBytes, []byte(key.coll)...) - keyBytes = append(keyBytes, nilByte) - return append(keyBytes, []byte(encodeReverseOrderVarUint64(key.blkNum))...) +func encodeElgDeprioMissingDataKey(key *missingDataKey) []byte { + encKey := append(elgDeprioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...) + encKey = append(encKey, []byte(key.ns)...) + encKey = append(encKey, nilByte) + return append(encKey, []byte(key.coll)...) } -func decodeMissingDataKey(keyBytes []byte) *missingDataKey { +func decodeElgMissingDataKey(keyBytes []byte) *missingDataKey { key := &missingDataKey{nsCollBlk: nsCollBlk{}} - if keyBytes[0] == eligibleMissingDataKeyPrefix[0] { - blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:]) - - splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte}) - key.ns = string(splittedKey[0]) - key.coll = string(splittedKey[1]) - key.blkNum = blkNum - key.isEligible = true - return key - } + blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:]) + splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte}) + key.ns = string(splittedKey[0]) + key.coll = string(splittedKey[1]) + key.blkNum = blkNum + return key +} + +func encodeInelgMissingDataKey(key *missingDataKey) []byte { + encKey := append(inelgMissingDataGroup, []byte(key.ns)...) + encKey = append(encKey, nilByte) + encKey = append(encKey, []byte(key.coll)...) + encKey = append(encKey, nilByte) + return append(encKey, []byte(encodeReverseOrderVarUint64(key.blkNum))...) +} +func decodeInelgMissingDataKey(keyBytes []byte) *missingDataKey { + key := &missingDataKey{nsCollBlk: nsCollBlk{}} splittedKey := bytes.SplitN(keyBytes[1:], []byte{nilByte}, 3) //encoded bytes for blknum may contain empty bytes key.ns = string(splittedKey[0]) key.coll = string(splittedKey[1]) key.blkNum, _ = decodeReverseOrderVarUint64(splittedKey[2]) - key.isEligible = false return key } @@ -184,27 +190,34 @@ func decodeCollElgVal(b []byte) (*CollElgInfo, error) { return m, nil } -func createRangeScanKeysForEligibleMissingDataEntries(blkNum uint64) (startKey, endKey []byte) { - startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...) - endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(0)...) +func createRangeScanKeysForElgMissingData(blkNum uint64, group []byte) ([]byte, []byte) { + startKey := append(group, encodeReverseOrderVarUint64(blkNum)...) + endKey := append(group, encodeReverseOrderVarUint64(0)...) return startKey, endKey } -func createRangeScanKeysForIneligibleMissingData(maxBlkNum uint64, ns, coll string) (startKey, endKey []byte) { - startKey = encodeMissingDataKey( +func createRangeScanKeysForInelgMissingData(maxBlkNum uint64, ns, coll string) ([]byte, []byte) { + startKey := encodeInelgMissingDataKey( &missingDataKey{ - nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: maxBlkNum}, - isEligible: false, + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: maxBlkNum, + }, }, ) - endKey = encodeMissingDataKey( + endKey := encodeInelgMissingDataKey( &missingDataKey{ - nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: 0}, - isEligible: false, + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: 0, + }, }, ) - return + + return startKey, endKey } func createRangeScanKeysForCollElg() (startKey, endKey []byte) { @@ -212,16 +225,16 @@ func createRangeScanKeysForCollElg() (startKey, endKey []byte) { encodeCollElgKey(0) } -func datakeyRange(blockNum uint64) (startKey, endKey []byte) { - startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) - endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...) - return +func datakeyRange(blockNum uint64) ([]byte, []byte) { + startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...) + endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...) + return startKey, endKey } -func eligibleMissingdatakeyRange(blkNum uint64) (startKey, endKey []byte) { - startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...) - endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum-1)...) - return +func eligibleMissingdatakeyRange(blkNum uint64) ([]byte, []byte) { + startKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum)...) + endKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum-1)...) + return startKey, endKey } // encodeReverseOrderVarUint64 returns a byte-representation for a uint64 number such that diff --git a/core/ledger/pvtdatastorage/kv_encoding_test.go b/core/ledger/pvtdatastorage/kv_encoding_test.go index 46c49f97aba..71639f9243e 100644 --- a/core/ledger/pvtdatastorage/kv_encoding_test.go +++ b/core/ledger/pvtdatastorage/kv_encoding_test.go @@ -21,7 +21,7 @@ func TestDataKeyEncoding(t *testing.T) { require.Equal(t, dataKey1, datakey2) } -func TestDatakeyRange(t *testing.T) { +func TestDataKeyRange(t *testing.T) { blockNum := uint64(20) startKey, endKey := datakeyRange(blockNum) var txNum uint64 @@ -51,27 +51,36 @@ func TestDatakeyRange(t *testing.T) { } } -func TestEligibleMissingdataRange(t *testing.T) { +func TestEligibleMissingDataRange(t *testing.T) { blockNum := uint64(20) startKey, endKey := eligibleMissingdatakeyRange(blockNum) var txNum uint64 for txNum = 0; txNum < 100; txNum++ { - keyOfBlock := encodeMissingDataKey( + keyOfBlock := encodeElgPrioMissingDataKey( &missingDataKey{ - nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum}, - isEligible: true, + nsCollBlk: nsCollBlk{ + ns: "ns", + coll: "coll", + blkNum: blockNum, + }, }, ) - keyOfPreviousBlock := encodeMissingDataKey( + keyOfPreviousBlock := encodeElgPrioMissingDataKey( &missingDataKey{ - nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum - 1}, - isEligible: true, + nsCollBlk: nsCollBlk{ + ns: "ns", + coll: "coll", + blkNum: blockNum - 1, + }, }, ) - keyOfNextBlock := encodeMissingDataKey( + keyOfNextBlock := encodeElgPrioMissingDataKey( &missingDataKey{ - nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum + 1}, - isEligible: true, + nsCollBlk: nsCollBlk{ + ns: "ns", + coll: "coll", + blkNum: blockNum + 1, + }, }, ) require.Equal(t, bytes.Compare(keyOfNextBlock, startKey), -1) @@ -99,19 +108,26 @@ func testEncodeDecodeMissingdataKey(t *testing.T, blkNum uint64) { t.Run("ineligibileKey", func(t *testing.T) { - key.isEligible = false - decodedKey := decodeMissingDataKey( - encodeMissingDataKey(key), + decodedKey := decodeInelgMissingDataKey( + encodeInelgMissingDataKey(key), ) require.Equal(t, key, decodedKey) }, ) - t.Run("ineligibileKey", + t.Run("eligiblePrioritizedKey", + func(t *testing.T) { + decodedKey := decodeElgMissingDataKey( + encodeElgPrioMissingDataKey(key), + ) + require.Equal(t, key, decodedKey) + }, + ) + + t.Run("eligibleDeprioritizedKey", func(t *testing.T) { - key.isEligible = true - decodedKey := decodeMissingDataKey( - encodeMissingDataKey(key), + decodedKey := decodeElgMissingDataKey( + encodeElgDeprioMissingDataKey(key), ) require.Equal(t, key, decodedKey) }, diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go index fc5c6a19657..2030c6b2c1f 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata.go @@ -10,9 +10,17 @@ import ( "github.com/hyperledger/fabric-protos-go/ledger/rwset" "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" "github.com/hyperledger/fabric/core/ledger" + "github.com/pkg/errors" "github.com/willf/bitset" ) +type elgMissingDataGroup int + +const ( + prioritized elgMissingDataGroup = iota + deprioritized +) + // CommitPvtDataOfOldBlocks commits the pvtData (i.e., previously missing data) of old blockp. // The parameter `blocksPvtData` refers a list of old block's pvtdata which are missing in the pvtstore. // Given a list of old block's pvtData, `CommitPvtDataOfOldBlocks` performs the following three @@ -21,91 +29,169 @@ import ( // from the above created data entries // (2) create a db update batch from the update entries // (3) commit the update batch to the pvtStore -func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error { +func (s *Store) CommitPvtDataOfOldBlocks( + blocksPvtData map[uint64][]*ledger.TxPvtData, + unreconciledMissingData ledger.MissingPvtDataInfo, +) error { + s.purgerLock.Lock() + defer s.purgerLock.Unlock() + + deprioritizedMissingData := unreconciledMissingData + if s.isLastUpdatedOldBlocksSet { return &ErrIllegalCall{`The lastUpdatedOldBlocksList is set. It means that the stateDB may not be in sync with the pvtStore`} } - oldBlkDataProccessor := &oldBlockDataProcessor{ + p := &oldBlockDataProcessor{ Store: s, + entries: &entriesForPvtDataOfOldBlocks{ + dataEntries: make(map[dataKey]*rwset.CollectionPvtReadWriteSet), + expiryEntries: make(map[expiryKey]*ExpiryData), + prioritizedMissingDataEntries: make(map[nsCollBlk]*bitset.BitSet), + deprioritizedMissingDataEntries: make(map[nsCollBlk]*bitset.BitSet), + }, } - logger.Debugf("Constructing pvtdatastore entries for pvtData of [%d] old blocks", len(blocksPvtData)) - entries, err := oldBlkDataProccessor.constructEntries(blocksPvtData) - if err != nil { + if err := p.prepareDataAndExpiryEntries(blocksPvtData); err != nil { return err } - logger.Debug("Constructing update batch from pvtdatastore entries") - batch := s.db.NewUpdateBatch() - if err := entries.addToUpdateBatch(batch); err != nil { + if err := p.prepareMissingDataEntriesToReflectReconciledData(); err != nil { return err } - logger.Debug("Committing the update batch to pvtdatastore") + if err := p.prepareMissingDataEntriesToReflectPriority(deprioritizedMissingData); err != nil { + return err + } + + batch, err := p.constructDBUpdateBatch() + if err != nil { + return err + } return s.db.WriteBatch(batch, true) } type oldBlockDataProcessor struct { *Store + entries *entriesForPvtDataOfOldBlocks } -func (p *oldBlockDataProcessor) constructEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) (*entriesForPvtDataOfOldBlocks, error) { +func (p *oldBlockDataProcessor) prepareDataAndExpiryEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) error { var dataEntries []*dataEntry + var expData *ExpiryData + for blkNum, pvtData := range blocksPvtData { dataEntries = append(dataEntries, prepareDataEntries(blkNum, pvtData)...) } - entries := &entriesForPvtDataOfOldBlocks{ - dataEntries: make(map[dataKey]*rwset.CollectionPvtReadWriteSet), - expiryEntries: make(map[expiryKey]*ExpiryData), - missingDataEntries: make(map[nsCollBlk]*bitset.BitSet), - } - for _, dataEntry := range dataEntries { - var expData *ExpiryData nsCollBlk := dataEntry.key.nsCollBlk txNum := dataEntry.key.txNum - expKey, err := p.constructExpiryKeyFromDataEntry(dataEntry) + expKey, err := p.constructExpiryKey(dataEntry) if err != nil { - return nil, err + return err } - if !neverExpires(expKey.expiringBlk) { - if expData, err = p.getExpiryDataFromEntriesOrStore(entries, expKey); err != nil { - return nil, err - } - if expData == nil { - // data entry is already expired - // and purged (a rare scenario) - continue - } - expData.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum) + + if neverExpires(expKey.expiringBlk) { + p.entries.dataEntries[*dataEntry.key] = dataEntry.value + continue + } + + if expData, err = p.getExpiryDataFromEntriesOrStore(expKey); err != nil { + return err + } + if expData == nil { + // if expiryData is not available, it means that + // the pruge scheduler removed these entries and the + // associated data entry is no longer needed. Note + // that the associated missingData entry would also + // be not present. Hence, we can skip this data entry. + continue } + expData.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum) - var missingData *bitset.BitSet - if missingData, err = p.getMissingDataFromEntriesOrStore(entries, nsCollBlk); err != nil { - return nil, err + p.entries.dataEntries[*dataEntry.key] = dataEntry.value + p.entries.expiryEntries[expKey] = expData + } + return nil +} + +func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectReconciledData() error { + for dataKey := range p.entries.dataEntries { + key := dataKey.nsCollBlk + txNum := uint(dataKey.txNum) + + prioMissingData, err := p.getPrioMissingDataFromEntriesOrStore(key) + if err != nil { + return err } - if missingData == nil { - // data entry is already expired - // and purged (a rare scenario) + if prioMissingData != nil && prioMissingData.Test(txNum) { + p.entries.prioritizedMissingDataEntries[key] = prioMissingData.Clear(txNum) continue } - missingData.Clear(uint(txNum)) - entries.add(dataEntry, expKey, expData, missingData) + deprioMissingData, err := p.getDeprioMissingDataFromEntriesOrStore(key) + if err != nil { + return err + } + if deprioMissingData != nil && deprioMissingData.Test(txNum) { + p.entries.deprioritizedMissingDataEntries[key] = deprioMissingData.Clear(txNum) + } + } + + return nil +} + +func (p *oldBlockDataProcessor) prepareMissingDataEntriesToReflectPriority(deprioritizedList ledger.MissingPvtDataInfo) error { + for blkNum, blkMissingData := range deprioritizedList { + for txNum, txMissingData := range blkMissingData { + for _, nsColl := range txMissingData { + key := nsCollBlk{ + ns: nsColl.Namespace, + coll: nsColl.Collection, + blkNum: blkNum, + } + txNum := uint(txNum) + + prioMissingData, err := p.getPrioMissingDataFromEntriesOrStore(key) + if err != nil { + return err + } + if prioMissingData == nil { + // we would reach here when either of the following happens: + // (1) when the purge scheduler already removed the respective + // missing data entry. + // (2) when the missing data info is already persistent in the + // deprioritized list. Currently, we do not have different + // levels of deprioritized list. + // In both of the above case, we can continue to the next entry. + continue + } + p.entries.prioritizedMissingDataEntries[key] = prioMissingData.Clear(txNum) + + deprioMissingData, err := p.getDeprioMissingDataFromEntriesOrStore(key) + if err != nil { + return err + } + if deprioMissingData == nil { + deprioMissingData = &bitset.BitSet{} + } + p.entries.deprioritizedMissingDataEntries[key] = deprioMissingData.Set(txNum) + } + } } - return entries, nil + + return nil } -func (p *oldBlockDataProcessor) constructExpiryKeyFromDataEntry(dataEntry *dataEntry) (expiryKey, error) { +func (p *oldBlockDataProcessor) constructExpiryKey(dataEntry *dataEntry) (expiryKey, error) { // get the expiryBlk number to construct the expiryKey nsCollBlk := dataEntry.key.nsCollBlk expiringBlk, err := p.btlPolicy.GetExpiringBlock(nsCollBlk.ns, nsCollBlk.coll, nsCollBlk.blkNum) if err != nil { - return expiryKey{}, err + return expiryKey{}, errors.WithMessagef(err, "error while constructing expiry data key") } return expiryKey{ @@ -114,105 +200,132 @@ func (p *oldBlockDataProcessor) constructExpiryKeyFromDataEntry(dataEntry *dataE }, nil } -func (p *oldBlockDataProcessor) getExpiryDataFromEntriesOrStore(entries *entriesForPvtDataOfOldBlocks, expiryKey expiryKey) (*ExpiryData, error) { - if expiryData, ok := entries.expiryEntries[expiryKey]; ok { +func (p *oldBlockDataProcessor) getExpiryDataFromEntriesOrStore(expKey expiryKey) (*ExpiryData, error) { + if expiryData, ok := p.entries.expiryEntries[expKey]; ok { return expiryData, nil } - expiryData, err := p.getExpiryDataOfExpiryKey(&expiryKey) + expData, err := p.db.Get(encodeExpiryKey(&expKey)) if err != nil { return nil, err } - return expiryData, nil + if expData == nil { + return nil, nil + } + + return decodeExpiryValue(expData) } -func (p *oldBlockDataProcessor) getMissingDataFromEntriesOrStore(entries *entriesForPvtDataOfOldBlocks, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { - if missingData, ok := entries.missingDataEntries[nsCollBlk]; ok { +func (p *oldBlockDataProcessor) getPrioMissingDataFromEntriesOrStore(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + missingData, ok := p.entries.prioritizedMissingDataEntries[nsCollBlk] + if ok { return missingData, nil } - missingDataKey := &missingDataKey{ - nsCollBlk: nsCollBlk, - isEligible: true, + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, } - missingData, err := p.getBitmapOfMissingDataKey(missingDataKey) + key := encodeElgPrioMissingDataKey(missingKey) + + encMissingData, err := p.db.Get(key) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error while getting missing data bitmap from the store") + } + if encMissingData == nil { + return nil, nil } - return missingData, nil -} -type entriesForPvtDataOfOldBlocks struct { - dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet - expiryEntries map[expiryKey]*ExpiryData - missingDataEntries map[nsCollBlk]*bitset.BitSet + return decodeMissingDataValue(encMissingData) } -func (e *entriesForPvtDataOfOldBlocks) add(datEntry *dataEntry, expKey expiryKey, expData *ExpiryData, missingData *bitset.BitSet) { - dataKey := dataKey{ - nsCollBlk: datEntry.key.nsCollBlk, - txNum: datEntry.key.txNum, +func (p *oldBlockDataProcessor) getDeprioMissingDataFromEntriesOrStore(nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + missingData, ok := p.entries.deprioritizedMissingDataEntries[nsCollBlk] + if ok { + return missingData, nil + } + + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, } - e.dataEntries[dataKey] = datEntry.value + key := encodeElgDeprioMissingDataKey(missingKey) - if expData != nil { - e.expiryEntries[expKey] = expData + encMissingData, err := p.db.Get(key) + if err != nil { + return nil, errors.Wrap(err, "error while getting missing data bitmap from the store") + } + if encMissingData == nil { + return nil, nil } - e.missingDataEntries[dataKey.nsCollBlk] = missingData + return decodeMissingDataValue(encMissingData) } -func (e *entriesForPvtDataOfOldBlocks) addToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { - if err := e.addDataEntriesToUpdateBatch(batch); err != nil { - return err +func (p *oldBlockDataProcessor) constructDBUpdateBatch() (*leveldbhelper.UpdateBatch, error) { + batch := p.db.NewUpdateBatch() + + if err := p.entries.addDataEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding data entries to the update batch") } - if err := e.addExpiryEntriesToUpdateBatch(batch); err != nil { - return err + if err := p.entries.addExpiryEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding expiry entries to the update batch") } - return e.addMissingDataEntriesToUpdateBatch(batch) + if err := p.entries.addElgPrioMissingDataEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding eligible prioritized missing data entries to the update batch") + } + + if err := p.entries.addElgDeprioMissingDataEntriesTo(batch); err != nil { + return nil, errors.WithMessage(err, "error while adding eligible deprioritized missing data entries to the update batch") + } + + return batch, nil +} + +type entriesForPvtDataOfOldBlocks struct { + dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet + expiryEntries map[expiryKey]*ExpiryData + prioritizedMissingDataEntries map[nsCollBlk]*bitset.BitSet + deprioritizedMissingDataEntries map[nsCollBlk]*bitset.BitSet } -func (e *entriesForPvtDataOfOldBlocks) addDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { +func (e *entriesForPvtDataOfOldBlocks) addDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { var key, val []byte var err error for dataKey, pvtData := range e.dataEntries { key = encodeDataKey(&dataKey) if val, err = encodeDataValue(pvtData); err != nil { - return err + return errors.Wrap(err, "error while encoding data value") } batch.Put(key, val) } return nil } -func (e *entriesForPvtDataOfOldBlocks) addExpiryEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { +func (e *entriesForPvtDataOfOldBlocks) addExpiryEntriesTo(batch *leveldbhelper.UpdateBatch) error { var key, val []byte var err error for expiryKey, expiryData := range e.expiryEntries { key = encodeExpiryKey(&expiryKey) if val, err = encodeExpiryValue(expiryData); err != nil { - return err + return errors.Wrap(err, "error while encoding expiry value") } batch.Put(key, val) } return nil } -func (e *entriesForPvtDataOfOldBlocks) addMissingDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { +func (e *entriesForPvtDataOfOldBlocks) addElgPrioMissingDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { var key, val []byte var err error - for nsCollBlk, missingData := range e.missingDataEntries { - key = encodeMissingDataKey( - &missingDataKey{ - nsCollBlk: nsCollBlk, - isEligible: true, - }, - ) + for nsCollBlk, missingData := range e.prioritizedMissingDataEntries { + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, + } + key = encodeElgPrioMissingDataKey(missingKey) if missingData.None() { batch.Delete(key) @@ -220,7 +333,30 @@ func (e *entriesForPvtDataOfOldBlocks) addMissingDataEntriesToUpdateBatch(batch } if val, err = encodeMissingDataValue(missingData); err != nil { - return err + return errors.Wrap(err, "error while encoding missing data bitmap") + } + batch.Put(key, val) + } + return nil +} + +func (e *entriesForPvtDataOfOldBlocks) addElgDeprioMissingDataEntriesTo(batch *leveldbhelper.UpdateBatch) error { + var key, val []byte + var err error + + for nsCollBlk, missingData := range e.deprioritizedMissingDataEntries { + missingKey := &missingDataKey{ + nsCollBlk: nsCollBlk, + } + key = encodeElgDeprioMissingDataKey(missingKey) + + if missingData.None() { + batch.Delete(key) + continue + } + + if val, err = encodeMissingDataValue(missingData); err != nil { + return errors.Wrap(err, "error while encoding missing data bitmap") } batch.Put(key, val) } diff --git a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go index 93653ebabff..4ac4749e184 100644 --- a/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go +++ b/core/ledger/pvtdatastorage/reconcile_missing_pvtdata_test.go @@ -7,196 +7,619 @@ SPDX-License-Identifier: Apache-2.0 package pvtdatastorage import ( + "fmt" "testing" - "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/ledger" btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil" "github.com/stretchr/testify/require" + "github.com/willf/bitset" ) +type blockTxPvtDataInfoForTest struct { + blkNum uint64 + txNum uint64 + pvtDataPresent map[string][]string + pvtDataMissing map[string][]string +} + +type pvtDataForTest struct { + blockNum uint64 + pvtData []*ledger.TxPvtData + dataKeys []*dataKey + missingDataInfo ledger.TxMissingPvtData +} + func TestCommitPvtDataOfOldBlocks(t *testing.T) { btlPolicy := btltestutil.SampleBTLPolicy( map[[2]string]uint64{ - {"ns-1", "coll-1"}: 3, - {"ns-1", "coll-2"}: 1, + {"ns-1", "coll-1"}: 0, + {"ns-1", "coll-2"}: 0, {"ns-2", "coll-1"}: 0, - {"ns-2", "coll-2"}: 1, - {"ns-3", "coll-1"}: 0, - {"ns-3", "coll-2"}: 3, - {"ns-4", "coll-1"}: 0, - {"ns-4", "coll-2"}: 0, + {"ns-2", "coll-2"}: 0, }, ) env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocks", btlPolicy, pvtDataConf()) defer env.Cleanup() store := env.TestStore - testData := []*ledger.TxPvtData{ - produceSamplePvtdata(t, 2, []string{"ns-2:coll-1", "ns-2:coll-2"}), - produceSamplePvtdata(t, 4, []string{"ns-1:coll-1", "ns-1:coll-2", "ns-2:coll-1", "ns-2:coll-2"}), + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + "ns-2": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-2": {"coll-1", "coll-2"}, + }, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 1, + txNum: 4, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + "ns-2": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 3, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + }, + }, } - // CONSTRUCT MISSING DATA FOR BLOCK 1 - blk1MissingData := make(ledger.TxMissingPvtData) - - // eligible missing data in tx1 - blk1MissingData.Add(1, "ns-1", "coll-1", true) - blk1MissingData.Add(1, "ns-1", "coll-2", true) - blk1MissingData.Add(1, "ns-2", "coll-1", true) - blk1MissingData.Add(1, "ns-2", "coll-2", true) - // eligible missing data in tx2 - blk1MissingData.Add(2, "ns-1", "coll-1", true) - blk1MissingData.Add(2, "ns-1", "coll-2", true) - blk1MissingData.Add(2, "ns-3", "coll-1", true) - blk1MissingData.Add(2, "ns-3", "coll-2", true) - - // CONSTRUCT MISSING DATA FOR BLOCK 2 - blk2MissingData := make(ledger.TxMissingPvtData) - // eligible missing data in tx1 - blk2MissingData.Add(1, "ns-1", "coll-1", true) - blk2MissingData.Add(1, "ns-1", "coll-2", true) - // eligible missing data in tx3 - blk2MissingData.Add(3, "ns-1", "coll-1", true) - - // COMMIT BLOCK 0 WITH NO DATA - require.NoError(t, store.Commit(0, nil, nil)) + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) - // COMMIT BLOCK 1 WITH PVTDATA AND MISSINGDATA - require.NoError(t, store.Commit(1, testData, blk1MissingData)) - - // COMMIT BLOCK 2 WITH PVTDATA AND MISSINGDATA - require.NoError(t, store.Commit(2, nil, blk2MissingData)) - - // CHECK MISSINGDATA ENTRIES ARE CORRECTLY STORED - expectedMissingPvtDataInfo := make(ledger.MissingPvtDataInfo) - // missing data in block1, tx1 - expectedMissingPvtDataInfo.Add(1, 1, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(1, 1, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 1, "ns-2", "coll-1") - expectedMissingPvtDataInfo.Add(1, 1, "ns-2", "coll-2") - - // missing data in block1, tx2 - expectedMissingPvtDataInfo.Add(1, 2, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(1, 2, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 2, "ns-3", "coll-1") - expectedMissingPvtDataInfo.Add(1, 2, "ns-3", "coll-2") - - // missing data in block2, tx1 - expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-2") - // missing data in block2, tx3 - expectedMissingPvtDataInfo.Add(2, 3, "ns-1", "coll-1") - - missingPvtDataInfo, err := store.GetMissingPvtDataInfoForMostRecentBlocks(2) - require.NoError(t, err) - require.Equal(t, expectedMissingPvtDataInfo, missingPvtDataInfo) - - // COMMIT THE MISSINGDATA IN BLOCK 1 AND BLOCK 2 - oldBlocksPvtData := make(map[uint64][]*ledger.TxPvtData) - oldBlocksPvtData[1] = []*ledger.TxPvtData{ - produceSamplePvtdata(t, 1, []string{"ns-1:coll-1", "ns-2:coll-1"}), - produceSamplePvtdata(t, 2, []string{"ns-1:coll-1", "ns-3:coll-1"}), - } - oldBlocksPvtData[2] = []*ledger.TxPvtData{ - produceSamplePvtdata(t, 3, []string{"ns-1:coll-1"}), + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) + require.NoError(t, store.Commit(2, blocksPvtData[2].pvtData, blocksPvtData[2].missingDataInfo)) + + assertMissingDataInfo(t, store, missingDataSummary, 2) + + // COMMIT some of the missing data in the block 1 and block 2 + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-2"}, + "ns-2": {"coll-2"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + }, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1", "coll-2"}, + }, + }, + { + blkNum: 2, + txNum: 3, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + }, + }, } - err = store.CommitPvtDataOfOldBlocks(oldBlocksPvtData) - require.NoError(t, err) + blocksPvtData, missingDataSummary = constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + 2: blocksPvtData[2].pvtData, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, nil)) - // ENSURE THAT THE PREVIOUSLY MISSING PVTDATA OF BLOCK 1 & 2 EXIST IN THE STORE - ns1Coll1Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}, txNum: 1} - ns2Coll1Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-1", blkNum: 1}, txNum: 1} - ns1Coll1Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}, txNum: 2} - ns3Coll1Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-1", blkNum: 1}, txNum: 2} - ns1Coll1Blk2Tx3 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 2}, txNum: 3} - - require.True(t, testDataKeyExists(t, store, ns1Coll1Blk1Tx1)) - require.True(t, testDataKeyExists(t, store, ns2Coll1Blk1Tx1)) - require.True(t, testDataKeyExists(t, store, ns1Coll1Blk1Tx2)) - require.True(t, testDataKeyExists(t, store, ns3Coll1Blk1Tx2)) - require.True(t, testDataKeyExists(t, store, ns1Coll1Blk2Tx3)) - - // pvt data retrieval for block 2 should return the just committed pvtdata - var nilFilter ledger.PvtNsCollFilter - retrievedData, err := store.GetPvtDataByBlockNum(2, nilFilter) - require.NoError(t, err) - for i, data := range retrievedData { - require.Equal(t, data.SeqInBlock, oldBlocksPvtData[2][i].SeqInBlock) - require.True(t, proto.Equal(data.WriteSet, oldBlocksPvtData[2][i].WriteSet)) + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.True(t, testDataKeyExists(t, store, dkey)) + } } + assertMissingDataInfo(t, store, missingDataSummary, 2) +} - expectedMissingPvtDataInfo = make(ledger.MissingPvtDataInfo) - // missing data in block1, tx1 - expectedMissingPvtDataInfo.Add(1, 1, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 1, "ns-2", "coll-2") +func TestCommitPvtDataOfOldBlocksWithBTL(t *testing.T) { + setup := func(store *Store) { + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 3, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } + + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) + + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) + + assertMissingDataInfo(t, store, missingDataSummary, 1) + + // COMMIT BLOCK 2 & 3 WITH NO PVTDATA + require.NoError(t, store.Commit(2, nil, nil)) + require.NoError(t, store.Commit(3, nil, nil)) + } - // missing data in block1, tx2 - expectedMissingPvtDataInfo.Add(1, 2, "ns-1", "coll-2") - expectedMissingPvtDataInfo.Add(1, 2, "ns-3", "coll-2") + t.Run("expired but not purged", func(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 1, + {"ns-2", "coll-1"}: 1, + }, + ) + env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithBTL", btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + setup(store) + // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired but not purged. + // hence, the commit of pvtdata of block 1 transaction 1 should create entries + // in the store + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } + + blocksPvtData, _ := constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + } + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 2: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, deprioritizedList)) + + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.True(t, testDataKeyExists(t, store, dkey)) + } + } + // as all missing data are expired, get missing info would return nil though + // it is not purged yet + assertMissingDataInfo(t, store, make(ledger.MissingPvtDataInfo), 1) + + // deprioritized list should be present + tests := []struct { + key nsCollBlk + expectedBitmap *bitset.BitSet + }{ + { + key: nsCollBlk{ + ns: "ns-1", + coll: "coll-1", + blkNum: 1, + }, + expectedBitmap: constructBitSetForTest(2), + }, + { + key: nsCollBlk{ + ns: "ns-2", + coll: "coll-1", + blkNum: 1, + }, + expectedBitmap: constructBitSetForTest(2), + }, + } + + for _, tt := range tests { + encKey := encodeElgDeprioMissingDataKey(&missingDataKey{tt.key}) + missingData, err := store.db.Get(encKey) + require.NoError(t, err) + + expectedMissingData, err := encodeMissingDataValue(tt.expectedBitmap) + require.NoError(t, err) + require.Equal(t, expectedMissingData, missingData) + } + }) + + t.Run("expired and purged", func(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 1, + {"ns-2", "coll-1"}: 1, + }, + ) + env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithBTL", btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + setup(store) + require.NoError(t, store.Commit(4, nil, nil)) + + testWaitForPurgerRoutineToFinish(store) + + // in block 1, ns-1:coll-1 and ns-2:coll-2 should have expired and purged. + // hence, the commit of pvtdata of block 1 transaction 2 should not create + // entries in the store + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } + blocksPvtData, _ := constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: blocksPvtData[1].pvtData, + } + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 3: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, deprioritizedList)) + + for _, b := range blocksPvtData { + for _, dkey := range b.dataKeys { + require.False(t, testDataKeyExists(t, store, dkey)) + } + } + + // deprioritized list should not be present + keys := []nsCollBlk{ + { + ns: "ns-1", + coll: "coll-1", + blkNum: 1, + }, + { + ns: "ns-2", + coll: "coll-1", + blkNum: 1, + }, + } + + for _, k := range keys { + encKey := encodeElgDeprioMissingDataKey(&missingDataKey{k}) + missingData, err := store.db.Get(encKey) + require.NoError(t, err) + require.Nil(t, missingData) + } + }) +} - // missing data in block2, tx1 - expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-1") - expectedMissingPvtDataInfo.Add(2, 1, "ns-1", "coll-2") +func TestCommitPvtDataOfOldBlocksWithDeprioritization(t *testing.T) { + blockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 2, + pvtDataMissing: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } - missingPvtDataInfo, err = store.GetMissingPvtDataInfoForMostRecentBlocks(2) - require.NoError(t, err) - require.Equal(t, expectedMissingPvtDataInfo, missingPvtDataInfo) - - // COMMIT BLOCK 3 WITH NO PVTDATA - require.NoError(t, store.Commit(3, nil, nil)) - - // IN BLOCK 1, NS-1:COLL-2 AND NS-2:COLL-2 SHOULD HAVE EXPIRED BUT NOT PURGED - // HENCE, THE FOLLOWING COMMIT SHOULD CREATE ENTRIES IN THE STORE - oldBlocksPvtData = make(map[uint64][]*ledger.TxPvtData) - oldBlocksPvtData[1] = []*ledger.TxPvtData{ - produceSamplePvtdata(t, 1, []string{"ns-1:coll-2"}), // though expired, it - // would get committed to the store as it is not purged yet - produceSamplePvtdata(t, 2, []string{"ns-3:coll-2"}), // never expires + blocksPvtData, missingDataSummary := constructPvtDataForTest(t, blockTxPvtDataInfo) + + tests := []struct { + name string + deprioritizedList ledger.MissingPvtDataInfo + expectedPrioMissingDataKeys ledger.MissingPvtDataInfo + }{ + { + name: "all keys deprioritized", + deprioritizedList: missingDataSummary, + expectedPrioMissingDataKeys: make(ledger.MissingPvtDataInfo), + }, + { + name: "some keys deprioritized", + deprioritizedList: ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 2: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + }, + }, + 2: ledger.MissingBlockPvtdataInfo{ + 2: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + }, + }, + }, + expectedPrioMissingDataKeys: ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + 2: { + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, + 2: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + 2: { + { + Namespace: "ns-2", + Collection: "coll-1", + }, + }, + }, + }, + }, } - err = store.CommitPvtDataOfOldBlocks(oldBlocksPvtData) - require.NoError(t, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 0, + {"ns-2", "coll-1"}: 0, + }, + ) + env := NewTestStoreEnv(t, "TestCommitPvtDataOfOldBlocksWithDeprio", btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + // COMMIT BLOCK 0 WITH NO DATA + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, blocksPvtData[1].pvtData, blocksPvtData[1].missingDataInfo)) + require.NoError(t, store.Commit(2, blocksPvtData[2].pvtData, blocksPvtData[2].missingDataInfo)) + + assertMissingDataInfo(t, store, missingDataSummary, 2) + + require.NoError(t, store.CommitPvtDataOfOldBlocks(nil, tt.deprioritizedList)) + + prioMissingData, err := store.getMissingData(elgPrioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, len(tt.expectedPrioMissingDataKeys), len(prioMissingData)) + for blkNum, txsMissingData := range tt.expectedPrioMissingDataKeys { + for txNum, expectedMissingData := range txsMissingData { + require.ElementsMatch(t, expectedMissingData, prioMissingData[blkNum][txNum]) + } + } + + deprioMissingData, err := store.getMissingData(elgDeprioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, len(tt.deprioritizedList), len(deprioMissingData)) + for blkNum, txsMissingData := range tt.deprioritizedList { + for txNum, expectedMissingData := range txsMissingData { + require.ElementsMatch(t, expectedMissingData, deprioMissingData[blkNum][txNum]) + } + } + + oldBlockTxPvtDataInfo := []*blockTxPvtDataInfoForTest{ + { + blkNum: 1, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 1, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 1, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + { + blkNum: 2, + txNum: 2, + pvtDataPresent: map[string][]string{ + "ns-1": {"coll-1"}, + "ns-2": {"coll-1"}, + }, + }, + } + + pvtDataOfOldBlocks, _ := constructPvtDataForTest(t, oldBlockTxPvtDataInfo) + oldBlocksPvtData := map[uint64][]*ledger.TxPvtData{ + 1: pvtDataOfOldBlocks[1].pvtData, + 2: pvtDataOfOldBlocks[2].pvtData, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(oldBlocksPvtData, nil)) + + prioMissingData, err = store.getMissingData(elgPrioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, make(ledger.MissingPvtDataInfo), prioMissingData) + + deprioMissingData, err = store.getMissingData(elgDeprioritizedMissingDataGroup, 3) + require.NoError(t, err) + require.Equal(t, make(ledger.MissingPvtDataInfo), deprioMissingData) + }) + } +} - ns1Coll2Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 1} - ns2Coll2Blk1Tx1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-2", blkNum: 1}, txNum: 1} - ns1Coll2Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 2} - ns3Coll2Blk1Tx2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}, txNum: 2} - - // though the pvtdata are expired but not purged yet, we do - // commit the data and hence the entries would exist in the - // store - require.True(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx1)) // expired but committed - require.False(t, testDataKeyExists(t, store, ns2Coll2Blk1Tx1)) // expired but still missing - require.False(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx2)) // expired still missing - require.True(t, testDataKeyExists(t, store, ns3Coll2Blk1Tx2)) // never expires - - // COMMIT BLOCK 4 WITH NO PVTDATA - require.NoError(t, store.Commit(4, nil, nil)) - - testWaitForPurgerRoutineToFinish(store) - - // IN BLOCK 1, NS-1:COLL-2 AND NS-2:COLL-2 SHOULD HAVE EXPIRED BUT NOT PURGED - // HENCE, THE FOLLOWING COMMIT SHOULD NOT CREATE ENTRIES IN THE STORE - oldBlocksPvtData = make(map[uint64][]*ledger.TxPvtData) - oldBlocksPvtData[1] = []*ledger.TxPvtData{ - // both data are expired and purged. hence, it won't be - // committed to the store - produceSamplePvtdata(t, 1, []string{"ns-2:coll-2"}), - produceSamplePvtdata(t, 2, []string{"ns-1:coll-2"}), +func constructPvtDataForTest(t *testing.T, blockInfo []*blockTxPvtDataInfoForTest) (map[uint64]*pvtDataForTest, ledger.MissingPvtDataInfo) { + blocksPvtData := make(map[uint64]*pvtDataForTest) + missingPvtDataInfoSummary := make(ledger.MissingPvtDataInfo) + + for _, b := range blockInfo { + p, ok := blocksPvtData[b.blkNum] + if !ok { + p = &pvtDataForTest{ + missingDataInfo: make(ledger.TxMissingPvtData), + } + blocksPvtData[b.blkNum] = p + } + + for ns, colls := range b.pvtDataMissing { + for _, coll := range colls { + p.missingDataInfo.Add(b.txNum, ns, coll, true) + missingPvtDataInfoSummary.Add(b.blkNum, b.txNum, ns, coll) + } + } + + var nsColls []string + for ns, colls := range b.pvtDataPresent { + for _, coll := range colls { + nsColls = append(nsColls, fmt.Sprintf("%s:%s", ns, coll)) + p.dataKeys = append(p.dataKeys, &dataKey{ + nsCollBlk: nsCollBlk{ + ns: ns, + coll: coll, + blkNum: b.blkNum, + }, + txNum: b.txNum, + }) + } + } + + if len(nsColls) == 0 { + continue + } + p.pvtData = append( + p.pvtData, + produceSamplePvtdata(t, b.txNum, nsColls), + ) } - err = store.CommitPvtDataOfOldBlocks(oldBlocksPvtData) - require.NoError(t, err) + return blocksPvtData, missingPvtDataInfoSummary +} - ns1Coll2Blk1Tx1 = &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 1} - ns2Coll2Blk1Tx1 = &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-2", blkNum: 1}, txNum: 1} - ns1Coll2Blk1Tx2 = &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 2} - ns3Coll2Blk1Tx2 = &dataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}, txNum: 2} +func assertMissingDataInfo(t *testing.T, store *Store, expected ledger.MissingPvtDataInfo, numRecentBlocks int) { + missingPvtDataInfo, err := store.GetMissingPvtDataInfoForMostRecentBlocks(numRecentBlocks) + require.NoError(t, err) + require.Equal(t, len(expected), len(missingPvtDataInfo)) + for blkNum, txsMissingData := range expected { + for txNum, expectedMissingData := range txsMissingData { + require.ElementsMatch(t, expectedMissingData, missingPvtDataInfo[blkNum][txNum]) + } + } +} - require.False(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx1)) // purged - require.False(t, testDataKeyExists(t, store, ns2Coll2Blk1Tx1)) // purged - require.False(t, testDataKeyExists(t, store, ns1Coll2Blk1Tx2)) // purged - require.True(t, testDataKeyExists(t, store, ns3Coll2Blk1Tx2)) // never expires +func constructBitSetForTest(txNums ...uint) *bitset.BitSet { + bitmap := &bitset.BitSet{} + for _, txNum := range txNums { + bitmap.Set(txNum) + } + return bitmap } diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index 49c80a3b0b2..b229f5de051 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -21,7 +21,18 @@ import ( "github.com/willf/bitset" ) -var logger = flogging.MustGetLogger("pvtdatastorage") +var ( + logger = flogging.MustGetLogger("pvtdatastorage") + // The missing data entries are classified into three categories: + // (1) eligible prioritized + // (2) eligible deprioritized + // (3) ineligible + // The reconciler would fetch the eligible prioritized missing data + // from other peers. A chance for eligible deprioritized missing data + // would be given after giving deprioritizedMissingDataPeriodicity number + // of chances to the eligible prioritized missing data + deprioritizedMissingDataPeriodicity = 1000 +) // Provider provides handle to specific 'Store' that in turn manages // private write sets for a ledger @@ -63,6 +74,8 @@ type Store struct { // in the stateDB needs to be updated before finishing the // recovery operation. isLastUpdatedOldBlocksSet bool + + iterSinceDeprioMissingDataAccess int } type blkTranNumKey []byte @@ -94,13 +107,13 @@ type dataKey struct { type missingDataKey struct { nsCollBlk - isEligible bool } type storeEntries struct { - dataEntries []*dataEntry - expiryEntries []*expiryEntry - missingDataEntries map[missingDataKey]*bitset.BitSet + dataEntries []*dataEntry + expiryEntries []*expiryEntry + elgMissingDataEntries map[missingDataKey]*bitset.BitSet + inelgMissingDataEntries map[missingDataKey]*bitset.BitSet } // lastUpdatedOldBlocksList keeps the list of last updated blocks @@ -212,7 +225,7 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD batch := s.db.NewUpdateBatch() var err error - var keyBytes, valBytes []byte + var key, val []byte storeEntries, err := prepareStoreEntries(blockNum, pvtData, s.btlPolicy, missingPvtData) if err != nil { @@ -220,27 +233,37 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD } for _, dataEntry := range storeEntries.dataEntries { - keyBytes = encodeDataKey(dataEntry.key) - if valBytes, err = encodeDataValue(dataEntry.value); err != nil { + key = encodeDataKey(dataEntry.key) + if val, err = encodeDataValue(dataEntry.value); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } for _, expiryEntry := range storeEntries.expiryEntries { - keyBytes = encodeExpiryKey(expiryEntry.key) - if valBytes, err = encodeExpiryValue(expiryEntry.value); err != nil { + key = encodeExpiryKey(expiryEntry.key) + if val, err = encodeExpiryValue(expiryEntry.value); err != nil { + return err + } + batch.Put(key, val) + } + + for missingDataKey, missingDataValue := range storeEntries.elgMissingDataEntries { + key = encodeElgPrioMissingDataKey(&missingDataKey) + + if val, err = encodeMissingDataValue(missingDataValue); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } - for missingDataKey, missingDataValue := range storeEntries.missingDataEntries { - keyBytes = encodeMissingDataKey(&missingDataKey) - if valBytes, err = encodeMissingDataValue(missingDataValue); err != nil { + for missingDataKey, missingDataValue := range storeEntries.inelgMissingDataEntries { + key = encodeInelgMissingDataKey(&missingDataKey) + + if val, err = encodeMissingDataValue(missingDataValue); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } committingBlockNum := s.nextBlockNum() @@ -403,16 +426,27 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M return nil, nil } + if s.iterSinceDeprioMissingDataAccess == deprioritizedMissingDataPeriodicity { + s.iterSinceDeprioMissingDataAccess = 0 + return s.getMissingData(elgDeprioritizedMissingDataGroup, maxBlock) + } + + s.iterSinceDeprioMissingDataAccess++ + return s.getMissingData(elgPrioritizedMissingDataGroup, maxBlock) +} + +func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDataInfo, error) { missingPvtDataInfo := make(ledger.MissingPvtDataInfo) numberOfBlockProcessed := 0 lastProcessedBlock := uint64(0) isMaxBlockLimitReached := false + // as we are not acquiring a read lock, new blocks can get committed while we // construct the MissingPvtDataInfo. As a result, lastCommittedBlock can get // changed. To ensure consistency, we atomically load the lastCommittedBlock value lastCommittedBlock := atomic.LoadUint64(&s.lastCommittedBlock) - startKey, endKey := createRangeScanKeysForEligibleMissingDataEntries(lastCommittedBlock) + startKey, endKey := createRangeScanKeysForElgMissingData(lastCommittedBlock, group) dbItr, err := s.db.GetIterator(startKey, endKey) if err != nil { return nil, err @@ -421,7 +455,7 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M for dbItr.Next() { missingDataKeyBytes := dbItr.Key() - missingDataKey := decodeMissingDataKey(missingDataKeyBytes) + missingDataKey := decodeElgMissingDataKey(missingDataKeyBytes) if isMaxBlockLimitReached && (missingDataKey.blkNum != lastProcessedBlock) { // ensures that exactly maxBlock number @@ -512,26 +546,38 @@ func (s *Store) performPurgeIfScheduled(latestCommittedBlk uint64) { } func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error { - batch := s.db.NewUpdateBatch() expiryEntries, err := s.retrieveExpiryEntries(minBlkNum, maxBlkNum) if err != nil || len(expiryEntries) == 0 { return err } + + batch := s.db.NewUpdateBatch() for _, expiryEntry := range expiryEntries { - // this encoding could have been saved if the function retrieveExpiryEntries also returns the encoded expiry keys. - // However, keeping it for better readability batch.Delete(encodeExpiryKey(expiryEntry.key)) dataKeys, missingDataKeys := deriveKeys(expiryEntry) + for _, dataKey := range dataKeys { batch.Delete(encodeDataKey(dataKey)) } + for _, missingDataKey := range missingDataKeys { - batch.Delete(encodeMissingDataKey(missingDataKey)) + batch.Delete( + encodeElgPrioMissingDataKey(missingDataKey), + ) + batch.Delete( + encodeElgDeprioMissingDataKey(missingDataKey), + ) + batch.Delete( + encodeInelgMissingDataKey(missingDataKey), + ) } + if err := s.db.WriteBatch(batch, false); err != nil { return err } + batch.Reset() } + logger.Infof("[%s] - [%d] Entries purged from private data storage till block number [%d]", s.ledgerid, len(expiryEntries), maxBlkNum) return nil } @@ -606,7 +652,7 @@ func (s *Store) processCollElgEvents() error { var coll string for _, coll = range colls.Entries { logger.Infof("Converting missing data entries from ineligible to eligible for [ns=%s, coll=%s]", ns, coll) - startKey, endKey := createRangeScanKeysForIneligibleMissingData(blkNum, ns, coll) + startKey, endKey := createRangeScanKeysForInelgMissingData(blkNum, ns, coll) collItr, err := s.db.GetIterator(startKey, endKey) if err != nil { return err @@ -615,12 +661,14 @@ func (s *Store) processCollElgEvents() error { for collItr.Next() { // each entry originalKey, originalVal := collItr.Key(), collItr.Value() - modifiedKey := decodeMissingDataKey(originalKey) - modifiedKey.isEligible = true + modifiedKey := decodeInelgMissingDataKey(originalKey) batch.Delete(originalKey) copyVal := make([]byte, len(originalVal)) copy(copyVal, originalVal) - batch.Put(encodeMissingDataKey(modifiedKey), copyVal) + batch.Put( + encodeElgPrioMissingDataKey(modifiedKey), + copyVal, + ) collEntriesConverted++ if batch.Len() > s.maxBatchSize { s.db.WriteBatch(batch, true) @@ -711,30 +759,6 @@ func (c *collElgProcSync) waitForDone() { <-c.procComplete } -func (s *Store) getBitmapOfMissingDataKey(missingDataKey *missingDataKey) (*bitset.BitSet, error) { - var v []byte - var err error - if v, err = s.db.Get(encodeMissingDataKey(missingDataKey)); err != nil { - return nil, err - } - if v == nil { - return nil, nil - } - return decodeMissingDataValue(v) -} - -func (s *Store) getExpiryDataOfExpiryKey(expiryKey *expiryKey) (*ExpiryData, error) { - var v []byte - var err error - if v, err = s.db.Get(encodeExpiryKey(expiryKey)); err != nil { - return nil, err - } - if v == nil { - return nil, nil - } - return decodeExpiryValue(v) -} - // ErrIllegalCall is to be thrown by a store impl if the store does not expect a call to Prepare/Commit/Rollback/InitLastCommittedBlock type ErrIllegalCall struct { msg string diff --git a/core/ledger/pvtdatastorage/store_test.go b/core/ledger/pvtdatastorage/store_test.go index 6f4871fe3e2..f1e1efb522a 100644 --- a/core/ledger/pvtdatastorage/store_test.go +++ b/core/ledger/pvtdatastorage/store_test.go @@ -191,6 +191,77 @@ func TestStoreIteratorError(t *testing.T) { }) } +func TestGetMissingDataInfo(t *testing.T) { + ledgerid := "TestGetMissingDataInfoFromPrioDeprioList" + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns-1", "coll-1"}: 0, + {"ns-1", "coll-2"}: 0, + }, + ) + env := NewTestStoreEnv(t, ledgerid, btlPolicy, pvtDataConf()) + defer env.Cleanup() + store := env.TestStore + + // construct missing data for block 1 + blk1MissingData := make(ledger.TxMissingPvtData) + blk1MissingData.Add(1, "ns-1", "coll-1", true) + blk1MissingData.Add(1, "ns-1", "coll-2", true) + + require.NoError(t, store.Commit(0, nil, nil)) + require.NoError(t, store.Commit(1, nil, blk1MissingData)) + + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-2", + }, + }, + }, + } + require.NoError(t, store.CommitPvtDataOfOldBlocks(nil, deprioritizedList)) + + expectedPrioMissingDataInfo := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + }, + }, + } + + expectedDeprioMissingDataInfo := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 1: { + { + Namespace: "ns-1", + Collection: "coll-2", + }, + }, + }, + } + + defaultVal := deprioritizedMissingDataPeriodicity + deprioritizedMissingDataPeriodicity = 10 + defer func() { + deprioritizedMissingDataPeriodicity = defaultVal + }() + + for i := 1; i <= 55; i++ { + if i%11 == 0 { + // after ever 10 iterations of accessing the prioritized list, the + // deprioritized list would be accessed + assertMissingDataInfo(t, store, expectedDeprioMissingDataInfo, 2) + continue + } + assertMissingDataInfo(t, store, expectedPrioMissingDataInfo, 2) + } +} + func TestExpiryDataNotIncluded(t *testing.T) { ledgerid := "TestExpiryDataNotIncluded" btlPolicy := btltestutil.SampleBTLPolicy( @@ -339,6 +410,9 @@ func TestStorePurge(t *testing.T) { // eligible missing data in tx1 blk1MissingData.Add(1, "ns-1", "coll-1", true) blk1MissingData.Add(1, "ns-1", "coll-2", true) + // eligible missing data in tx3 + blk1MissingData.Add(3, "ns-1", "coll-1", true) + blk1MissingData.Add(3, "ns-1", "coll-2", true) // ineligible missing data in tx4 blk1MissingData.Add(4, "ns-3", "coll-1", false) blk1MissingData.Add(4, "ns-3", "coll-2", false) @@ -357,22 +431,38 @@ func TestStorePurge(t *testing.T) { ns2Coll2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-2", blkNum: 1}, txNum: 2} // eligible missingData entries for ns-1:coll-1, ns-1:coll-2 (neverExpires) should exist in store - ns1Coll1elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}, isEligible: true} - ns1Coll2elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, isEligible: true} + ns1Coll1elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}} + ns1Coll2elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}} // ineligible missingData entries for ns-3:col-1, ns-3:coll-2 (neverExpires) should exist in store - ns3Coll1inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-1", blkNum: 1}, isEligible: false} - ns3Coll2inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}, isEligible: false} + ns3Coll1inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-1", blkNum: 1}} + ns3Coll2inelgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-3", coll: "coll-2", blkNum: 1}} testWaitForPurgerRoutineToFinish(s) require.True(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll1inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll2inelgMD)) + + deprioritizedList := ledger.MissingPvtDataInfo{ + 1: ledger.MissingBlockPvtdataInfo{ + 3: { + { + Namespace: "ns-1", + Collection: "coll-1", + }, + { + Namespace: "ns-1", + Collection: "coll-2", + }, + }, + }, + } + require.NoError(t, s.CommitPvtDataOfOldBlocks(nil, deprioritizedList)) // write pvt data for block 3 require.NoError(t, s.Commit(3, nil, nil)) @@ -381,11 +471,14 @@ func TestStorePurge(t *testing.T) { require.True(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) // eligible missingData entries for ns-1:coll-1, ns-1:coll-2 (neverExpires) should exist in store - require.True(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD)) + // some transactions which miss ns-1:coll-1 and ns-1:coll-2 has be moved to deprioritizedList list + require.True(t, testElgDeprioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgDeprioMissingDataKeyExists(t, s, ns1Coll2elgMD)) // ineligible missingData entries for ns-3:col-1, ns-3:coll-2 (neverExpires) should exist in store - require.True(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll1inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll2inelgMD)) // write pvt data for block 4 require.NoError(t, s.Commit(4, nil, nil)) @@ -395,11 +488,13 @@ func TestStorePurge(t *testing.T) { require.False(t, testDataKeyExists(t, s, ns1Coll1)) require.True(t, testDataKeyExists(t, s, ns2Coll2)) // eligible missingData entries for ns-1:coll-1 should have expired and ns-1:coll-2 (neverExpires) should exist in store - require.False(t, testMissingDataKeyExists(t, s, ns1Coll1elgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.False(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD)) + require.False(t, testElgDeprioMissingDataKeyExists(t, s, ns1Coll1elgMD)) + require.True(t, testElgDeprioMissingDataKeyExists(t, s, ns1Coll2elgMD)) // ineligible missingData entries for ns-3:col-1 should have expired and ns-3:coll-2 (neverExpires) should exist in store - require.False(t, testMissingDataKeyExists(t, s, ns3Coll1inelgMD)) - require.True(t, testMissingDataKeyExists(t, s, ns3Coll2inelgMD)) + require.False(t, testInelgMissingDataKeyExists(t, s, ns3Coll1inelgMD)) + require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll2inelgMD)) // write pvt data for block 5 require.NoError(t, s.Commit(5, nil, nil)) @@ -689,9 +784,25 @@ func testDataKeyExists(t *testing.T, s *Store, dataKey *dataKey) bool { return len(val) != 0 } -func testMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { - dataKeyBytes := encodeMissingDataKey(missingDataKey) - val, err := s.db.Get(dataKeyBytes) +func testElgPrioMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { + key := encodeElgPrioMissingDataKey(missingDataKey) + + val, err := s.db.Get(key) + require.NoError(t, err) + return len(val) != 0 +} + +func testElgDeprioMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { + key := encodeElgDeprioMissingDataKey(missingDataKey) + + val, err := s.db.Get(key) + require.NoError(t, err) + return len(val) != 0 +} +func testInelgMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missingDataKey) bool { + key := encodeInelgMissingDataKey(missingDataKey) + + val, err := s.db.Get(key) require.NoError(t, err) return len(val) != 0 } diff --git a/core/ledger/pvtdatastorage/test_exports.go b/core/ledger/pvtdatastorage/test_exports.go index caf36a06c6a..d7bfec4c89b 100644 --- a/core/ledger/pvtdatastorage/test_exports.go +++ b/core/ledger/pvtdatastorage/test_exports.go @@ -69,6 +69,8 @@ func (env *StoreEnv) CloseAndReopen() { // Cleanup cleansup the store env after testing func (env *StoreEnv) Cleanup() { + env.TestStoreProvider.Close() + env.TestStore.db.Close() if err := os.RemoveAll(env.conf.StorePath); err != nil { env.t.Errorf("error while removing path %s, %v", env.conf.StorePath, err) }