Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprioritize unreconcilable missingPvtData #1721

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil

logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(reconciledPvtdata))

err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData)
err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, nil)
if err != nil {
return nil, err
}
Expand Down
90 changes: 62 additions & 28 deletions core/ledger/pvtdatastorage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ func prepareStoreEntries(blockNum uint64, pvtData []*ledger.TxPvtData, btlPolicy
missingPvtData ledger.TxMissingPvtData) (*storeEntries, error) {
dataEntries := prepareDataEntries(blockNum, pvtData)

missingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData)
elgMissingDataEntries, inelgMissingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData)

expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, missingDataEntries, btlPolicy)
expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, elgMissingDataEntries, inelgMissingDataEntries, btlPolicy)
if err != nil {
return nil, err
}

return &storeEntries{
dataEntries: dataEntries,
expiryEntries: expiryEntries,
missingDataEntries: missingDataEntries}, nil
dataEntries: dataEntries,
expiryEntries: expiryEntries,
elgMissingDataEntries: elgMissingDataEntries,
inelgMissingDataEntries: inelgMissingDataEntries,
}, nil
}

func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEntry {
Expand All @@ -48,42 +50,61 @@ func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEnt
return dataEntries
}

func prepareMissingDataEntries(committingBlk uint64, missingPvtData ledger.TxMissingPvtData) map[missingDataKey]*bitset.BitSet {
missingDataEntries := make(map[missingDataKey]*bitset.BitSet)
func prepareMissingDataEntries(
committingBlk uint64,
missingPvtData ledger.TxMissingPvtData,
) (map[missingDataKey]*bitset.BitSet, map[missingDataKey]*bitset.BitSet) {
elgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet)
inelgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet)

for txNum, missingData := range missingPvtData {
for _, nsColl := range missingData {
key := missingDataKey{nsCollBlk{nsColl.Namespace, nsColl.Collection, committingBlk},
nsColl.IsEligible}

if _, ok := missingDataEntries[key]; !ok {
missingDataEntries[key] = &bitset.BitSet{}
key := missingDataKey{
nsCollBlk{
ns: nsColl.Namespace,
coll: nsColl.Collection,
blkNum: committingBlk,
},
}
bitmap := missingDataEntries[key]

bitmap.Set(uint(txNum))
switch nsColl.IsEligible {
case true:
if _, ok := elgMissingDataEntries[key]; !ok {
elgMissingDataEntries[key] = &bitset.BitSet{}
}
elgMissingDataEntries[key].Set(uint(txNum))
default:
if _, ok := inelgMissingDataEntries[key]; !ok {
inelgMissingDataEntries[key] = &bitset.BitSet{}
}
inelgMissingDataEntries[key].Set(uint(txNum))
}
}
}

return missingDataEntries
return elgMissingDataEntries, inelgMissingDataEntries
}

// prepareExpiryEntries returns expiry entries for both private data which is present in the committingBlk
// and missing private.
func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, missingDataEntries map[missingDataKey]*bitset.BitSet,
func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, elgMissingDataEntries, inelgMissingDataEntries map[missingDataKey]*bitset.BitSet,
btlPolicy pvtdatapolicy.BTLPolicy) ([]*expiryEntry, error) {
var expiryEntries []*expiryEntry
mapByExpiringBlk := make(map[uint64]*ExpiryData)

// 1. prepare expiryData for non-missing data
for _, dataEntry := range dataEntries {
if err := prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy); err != nil {
return nil, err
}
}

// 2. prepare expiryData for missing data
for missingDataKey := range missingDataEntries {
for missingDataKey := range elgMissingDataEntries {
if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil {
return nil, err
}
}

for missingDataKey := range inelgMissingDataEntries {
if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,25 +160,38 @@ func getOrCreateExpiryData(mapByExpiringBlk map[uint64]*ExpiryData, expiringBlk
}

// deriveKeys constructs dataKeys and missingDataKey from an expiryEntry
func deriveKeys(expiryEntry *expiryEntry) (dataKeys []*dataKey, missingDataKeys []*missingDataKey) {
func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey) {
var dataKeys []*dataKey
var missingDataKeys []*missingDataKey

for ns, colls := range expiryEntry.value.Map {
// 1. constructs dataKeys of expired existing pvt data
for coll, txNums := range colls.Map {
for _, txNum := range txNums.List {
dataKeys = append(dataKeys,
&dataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, txNum})
&dataKey{
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: expiryEntry.key.committingBlk,
},
txNum: txNum,
})
}
}
// 2. constructs missingDataKeys of expired missing pvt data

for coll := range colls.MissingDataMap {
// one key for eligible entries and another for ieligible entries
missingDataKeys = append(missingDataKeys,
&missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, true})
missingDataKeys = append(missingDataKeys,
&missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, false})
&missingDataKey{
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: expiryEntry.key.committingBlk,
Comment on lines -155 to +188
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. One way is to replace isEligible with the group and then pass the information from here. There will be three groups -- elgPrio, elgDepri, and inElg. The only problem is that removing isEligible might impact moderate amount of existing code. I thought of doing it but many tests and normal commit path dependent on it. Hence, I left it as-is now and decided to do it in a separate PR. Let me know what do you think.

},
})
}
}
return

return dataKeys, missingDataKeys
}

func passesFilter(dataKey *dataKey, filter ledger.PvtNsCollFilter) bool {
Expand Down
147 changes: 80 additions & 67 deletions core/ledger/pvtdatastorage/kv_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,30 @@ import (
)

var (
pendingCommitKey = []byte{0}
lastCommittedBlkkey = []byte{1}
pvtDataKeyPrefix = []byte{2}
expiryKeyPrefix = []byte{3}
eligibleMissingDataKeyPrefix = []byte{4}
ineligibleMissingDataKeyPrefix = []byte{5}
collElgKeyPrefix = []byte{6}
lastUpdatedOldBlocksKey = []byte{7}
pendingCommitKey = []byte{0}
lastCommittedBlkkey = []byte{1}
pvtDataKeyPrefix = []byte{2}
expiryKeyPrefix = []byte{3}
elgPrioritizedMissingDataGroup = []byte{4}
inelgMissingDataGroup = []byte{5}
collElgKeyPrefix = []byte{6}
lastUpdatedOldBlocksKey = []byte{7}
elgDeprioritizedMissingDataGroup = []byte{8}

nilByte = byte(0)
emptyValue = []byte{}
)

func getDataKeysForRangeScanByBlockNum(blockNum uint64) (startKey, endKey []byte) {
startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...)
return
func getDataKeysForRangeScanByBlockNum(blockNum uint64) ([]byte, []byte) {
startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...)
return startKey, endKey
}

func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) (startKey, endKey []byte) {
startKey = append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...)
endKey = append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...)
return
func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) ([]byte, []byte) {
startKey := append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...)
endKey := append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...)
return startKey, endKey
}

func encodeLastCommittedBlockVal(blockNum uint64) []byte {
Expand Down Expand Up @@ -107,47 +108,52 @@ func decodeDataValue(datavalueBytes []byte) (*rwset.CollectionPvtReadWriteSet, e
return collPvtdata, err
}

func encodeMissingDataKey(key *missingDataKey) []byte {
if key.isEligible {
// When missing pvtData reconciler asks for missing data info,
// it is necessary to pass the missing pvtdata info associated with
// the most recent block so that missing pvtdata in the state db can
// be fixed sooner to reduce the "private data matching public hash version
// is not available" error during endorserments. In order to give priority
// to missing pvtData in the most recent block, we use reverse order
// preserving encoding for the missing data key. This simplifies the
// implementation of GetMissingPvtDataInfoForMostRecentBlocks().
keyBytes := append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(key.blkNum)...)
keyBytes = append(keyBytes, []byte(key.ns)...)
keyBytes = append(keyBytes, nilByte)
return append(keyBytes, []byte(key.coll)...)
}
func encodeElgPrioMissingDataKey(key *missingDataKey) []byte {
// When missing pvtData reconciler asks for missing data info,
// it is necessary to pass the missing pvtdata info associated with
// the most recent block so that missing pvtdata in the state db can
// be fixed sooner to reduce the "private data matching public hash version
// is not available" error during endorserments. In order to give priority
// to missing pvtData in the most recent block, we use reverse order
// preserving encoding for the missing data key. This simplifies the
// implementation of GetMissingPvtDataInfoForMostRecentBlocks().
encKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...)
encKey = append(encKey, []byte(key.ns)...)
encKey = append(encKey, nilByte)
return append(encKey, []byte(key.coll)...)
}

keyBytes := append(ineligibleMissingDataKeyPrefix, []byte(key.ns)...)
keyBytes = append(keyBytes, nilByte)
keyBytes = append(keyBytes, []byte(key.coll)...)
keyBytes = append(keyBytes, nilByte)
return append(keyBytes, []byte(encodeReverseOrderVarUint64(key.blkNum))...)
func encodeElgDeprioMissingDataKey(key *missingDataKey) []byte {
encKey := append(elgDeprioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...)
encKey = append(encKey, []byte(key.ns)...)
encKey = append(encKey, nilByte)
return append(encKey, []byte(key.coll)...)
}

func decodeMissingDataKey(keyBytes []byte) *missingDataKey {
func decodeElgMissingDataKey(keyBytes []byte) *missingDataKey {
key := &missingDataKey{nsCollBlk: nsCollBlk{}}
if keyBytes[0] == eligibleMissingDataKeyPrefix[0] {
blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:])

splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte})
key.ns = string(splittedKey[0])
key.coll = string(splittedKey[1])
key.blkNum = blkNum
key.isEligible = true
return key
}
blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:])
splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte})
key.ns = string(splittedKey[0])
key.coll = string(splittedKey[1])
key.blkNum = blkNum
return key
}

func encodeInelgMissingDataKey(key *missingDataKey) []byte {
encKey := append(inelgMissingDataGroup, []byte(key.ns)...)
encKey = append(encKey, nilByte)
encKey = append(encKey, []byte(key.coll)...)
encKey = append(encKey, nilByte)
return append(encKey, []byte(encodeReverseOrderVarUint64(key.blkNum))...)
}

func decodeInelgMissingDataKey(keyBytes []byte) *missingDataKey {
key := &missingDataKey{nsCollBlk: nsCollBlk{}}
splittedKey := bytes.SplitN(keyBytes[1:], []byte{nilByte}, 3) //encoded bytes for blknum may contain empty bytes
key.ns = string(splittedKey[0])
key.coll = string(splittedKey[1])
key.blkNum, _ = decodeReverseOrderVarUint64(splittedKey[2])
key.isEligible = false
return key
}

Expand Down Expand Up @@ -184,44 +190,51 @@ func decodeCollElgVal(b []byte) (*CollElgInfo, error) {
return m, nil
}

func createRangeScanKeysForEligibleMissingDataEntries(blkNum uint64) (startKey, endKey []byte) {
startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...)
endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(0)...)
func createRangeScanKeysForElgMissingData(blkNum uint64, group []byte) ([]byte, []byte) {
startKey := append(group, encodeReverseOrderVarUint64(blkNum)...)
endKey := append(group, encodeReverseOrderVarUint64(0)...)

return startKey, endKey
}

func createRangeScanKeysForIneligibleMissingData(maxBlkNum uint64, ns, coll string) (startKey, endKey []byte) {
startKey = encodeMissingDataKey(
func createRangeScanKeysForInelgMissingData(maxBlkNum uint64, ns, coll string) ([]byte, []byte) {
startKey := encodeInelgMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: maxBlkNum},
isEligible: false,
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: maxBlkNum,
},
},
)
endKey = encodeMissingDataKey(
endKey := encodeInelgMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: 0},
isEligible: false,
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: 0,
},
},
)
return

return startKey, endKey
}

func createRangeScanKeysForCollElg() (startKey, endKey []byte) {
return encodeCollElgKey(math.MaxUint64),
encodeCollElgKey(0)
}

func datakeyRange(blockNum uint64) (startKey, endKey []byte) {
startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...)
return
func datakeyRange(blockNum uint64) ([]byte, []byte) {
startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...)
return startKey, endKey
}

func eligibleMissingdatakeyRange(blkNum uint64) (startKey, endKey []byte) {
startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...)
endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum-1)...)
return
func eligibleMissingdatakeyRange(blkNum uint64) ([]byte, []byte) {
startKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum)...)
endKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum-1)...)
return startKey, endKey
}

// encodeReverseOrderVarUint64 returns a byte-representation for a uint64 number such that
Expand Down
Loading