Skip to content

Commit

Permalink
storage: add MakeEngineIterator and {Put,Clear}EngineKey
Browse files Browse the repository at this point in the history
- The EngineIterator interface is implemented by
  pebbleIterator. There are changes to method names
  in EngineIterator since Go does not allow method
  overloading.
- Callers that need to work with EngineKeys, since they will
  eventually need to see separated lock table keys in a
  non-interleaved manner, are switched to using the new
  interfaces.
- ReplicaDataIterator is split into ReplicaMVCCDataIterator
  and ReplicaEngineDataIterator. Only the gcIterator uses
  the former.
- Add PrintEngineKeyValue.
- There was a bug in EngineKey encoding that caused it to
  diverge from MVCCKey. This has been fixed and tests
  added.

Informs cockroachdb#41720

Release note: None
  • Loading branch information
sumeerbhola authored and rytaft committed Nov 5, 2020
1 parent 983f025 commit 32cd89c
Show file tree
Hide file tree
Showing 35 changed files with 664 additions and 216 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/engineccl/batch_repr.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ func VerifyBatchRepr(
}
defer iter.Close()

return storage.ComputeStatsGo(iter, start.Key, end.Key, nowNanos)
return storage.ComputeStatsForRange(iter, start.Key, end.Key, nowNanos)
}
2 changes: 2 additions & 0 deletions pkg/ccl/storageccl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
}
defer cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Finish()

// The sstables only contain MVCC data and no intents, so using an MVCC
// iterator is sufficient.
var iters []storage.SimpleMVCCIterator
for _, file := range args.Files {
log.VEventf(ctx, 2, "import file %s %s", file.Path, args.Key)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/workloadccl/format/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func ToSSTable(t workload.Table, tableID descpb.ID, ts time.Time) ([]byte, error
for kvBatch := range kvCh {
for _, kv := range kvBatch.KVs {
mvccKey := storage.MVCCKey{Timestamp: sstTS, Key: kv.Key}
if err := sw.Put(mvccKey, kv.Value.RawBytes); err != nil {
if err := sw.PutMVCC(mvccKey, kv.Value.RawBytes); err != nil {
return err
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error {
return err
}

iter := rditer.NewReplicaDataIterator(&desc, db, debugCtx.replicated, false /* seekEnd */)
iter := rditer.NewReplicaEngineDataIterator(&desc, db, debugCtx.replicated)
defer iter.Close()
results := 0
for ; ; iter.Next() {
Expand All @@ -325,10 +325,7 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error {
} else if !ok {
break
}
kvserver.PrintKeyValue(storage.MVCCKeyValue{
Key: iter.Key(),
Value: iter.Value(),
})
kvserver.PrintEngineKeyValue(iter.UnsafeKey(), iter.UnsafeValue())
results++
if results == debugCtx.maxResults {
break
Expand Down Expand Up @@ -461,9 +458,14 @@ Decode and print a hexadecimal-encoded key-value pair.
isTS := bytes.HasPrefix(bs[0], keys.TimeseriesPrefix)
k, err := storage.DecodeMVCCKey(bs[0])
if err != nil {
// Older versions of the consistency checker give you diffs with a raw_key that
// is already a roachpb.Key, so make a half-assed attempt to support both.
// - Could be an EngineKey.
// - Older versions of the consistency checker give you diffs with a raw_key that
// is already a roachpb.Key, so make a half-assed attempt to support both.
if !isTS {
if k, ok := storage.DecodeEngineKey(bs[0]); ok {
kvserver.PrintEngineKeyValue(k, bs[1])
return nil
}
fmt.Printf("unable to decode key: %v, assuming it's a roachpb.Key with fake timestamp;\n"+
"if the result below looks like garbage, then it likely is:\n\n", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func AddSSTable(

var stats enginepb.MVCCStats
if (ms == enginepb.MVCCStats{}) {
stats, err = storage.ComputeStatsGo(iter, start, end, now.UnixNano())
stats, err = storage.ComputeStatsForRange(iter, start, end, now.UnixNano())
if err != nil {
return 0, errors.Wrapf(err, "computing stats for SST [%s, %s)", start, end)
}
Expand Down Expand Up @@ -437,7 +437,7 @@ func AddSSTable(
return err
}

right.stats, err = storage.ComputeStatsGo(
right.stats, err = storage.ComputeStatsForRange(
iter, right.start, right.end, now.UnixNano(),
)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func init() {
}

// EvalAddSSTable evaluates an AddSSTable command.
// NB: These sstables do not contain intents/locks, so the code below only
// needs to deal with MVCCKeys.
func EvalAddSSTable(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, _ roachpb.Response,
) (result.Result, error) {
Expand Down Expand Up @@ -89,7 +91,7 @@ func EvalAddSSTable(
if args.MVCCStats == nil || verifyFastPath {
log.VEventf(ctx, 2, "computing MVCCStats for SSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

computed, err := storage.ComputeStatsGo(
computed, err := storage.ComputeStatsForRange(
dataIter, mvccStartKey.Key, mvccEndKey.Key, h.Timestamp.WallTime)
if err != nil {
return result.Result{}, errors.Wrap(err, "computing SSTable MVCC stats")
Expand Down Expand Up @@ -179,7 +181,6 @@ func EvalAddSSTable(

ms.Add(stats)

// TODO(sumeer): use EngineIterator and replace the Put hack below.
if args.IngestAsWrites {
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(args.Data))
dataIter.SeekGE(storage.MVCCKey{Key: keys.MinKey})
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
beforeStats := func() enginepb.MVCCStats {
iter := e.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
beforeStats, err := storage.ComputeStatsGo(iter, roachpb.KeyMin, roachpb.KeyMax, 10)
beforeStats, err := storage.ComputeStatsForRange(iter, roachpb.KeyMin, roachpb.KeyMax, 10)
if err != nil {
t.Fatalf("%+v", err)
}
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
afterStats := func() enginepb.MVCCStats {
iter := e.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
afterStats, err := storage.ComputeStatsGo(iter, roachpb.KeyMin, roachpb.KeyMax, 10)
afterStats, err := storage.ComputeStatsForRange(iter, roachpb.KeyMin, roachpb.KeyMax, 10)
if err != nil {
t.Fatalf("%+v", err)
}
Expand Down Expand Up @@ -526,7 +526,7 @@ func TestAddSSTableDisallowShadowing(t *testing.T) {
}
defer dataIter.Close()

stats, err := storage.ComputeStatsGo(dataIter, startKey, endKey, 0)
stats, err := storage.ComputeStatsForRange(dataIter, startKey, endKey, 0)
if err != nil {
t.Fatalf("%+v", err)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/kv/kvserver/batcheval/cmd_clear_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,19 @@ func ClearRange(
// instead of using a range tombstone (inefficient for small ranges).
if total := statsDelta.Total(); total < ClearRangeBytesThreshold {
log.VEventf(ctx, 2, "delta=%d < threshold=%d; using non-range clear", total, ClearRangeBytesThreshold)
if err := readWriter.MVCCIterate(from, to, storage.MVCCKeyAndIntentsIterKind, func(kv storage.MVCCKeyValue) error {
if kv.Key.Timestamp.IsEmpty() {
// It can be an intent or an inline MVCCMetadata -- we have no idea.
// TODO(sumeer): cannot clear separated intents in this manner. Write the iteration code
// here instead of using Reader.MVCCIterate.
return readWriter.ClearUnversioned(kv.Key.Key)
iter := readWriter.NewEngineIterator(storage.IterOptions{UpperBound: to})
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: from})
for ; valid; valid, err = iter.NextEngineKey() {
var k storage.EngineKey
if k, err = iter.UnsafeEngineKey(); err != nil {
break
}
return readWriter.ClearMVCC(kv.Key)
}); err != nil {
if err = readWriter.ClearEngineKey(k); err != nil {
return result.Result{}, err
}
}
iter.Close()
if err != nil {
return result.Result{}, err
}
return pd, nil
Expand Down
27 changes: 2 additions & 25 deletions pkg/kv/kvserver/batcheval/cmd_clear_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,16 @@ type wrappedBatch struct {
clearRangeCount int
}

// TODO(sbhola): narrow the calls where we increment counters to
// make this test stricter.

func (wb *wrappedBatch) ClearMVCC(key storage.MVCCKey) error {
wb.clearCount++
return wb.Batch.ClearMVCC(key)
}

func (wb *wrappedBatch) ClearUnversioned(key roachpb.Key) error {
wb.clearCount++
return wb.Batch.ClearUnversioned(key)
}

func (wb *wrappedBatch) ClearIntent(key roachpb.Key) error {
func (wb *wrappedBatch) ClearEngineKey(key storage.EngineKey) error {
wb.clearCount++
return wb.Batch.ClearIntent(key)
}

func (wb *wrappedBatch) ClearRawRange(start, end roachpb.Key) error {
wb.clearRangeCount++
return wb.Batch.ClearRawRange(start, end)
return wb.Batch.ClearEngineKey(key)
}

func (wb *wrappedBatch) ClearMVCCRangeAndIntents(start, end roachpb.Key) error {
wb.clearRangeCount++
return wb.Batch.ClearMVCCRangeAndIntents(start, end)
}

func (wb *wrappedBatch) ClearMVCCRange(start, end storage.MVCCKey) error {
wb.clearRangeCount++
return wb.Batch.ClearMVCCRange(start, end)
}

// TestCmdClearRangeBytesThreshold verifies that clear range resorts to
// clearing keys individually if under the bytes threshold and issues a
// clear range command to the batch otherwise.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_revert_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func getStats(t *testing.T, reader storage.Reader) enginepb.MVCCStats {
t.Helper()
iter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
s, err := storage.ComputeStatsGo(iter, roachpb.KeyMin, roachpb.KeyMax, 1100)
s, err := storage.ComputeStatsForRange(iter, roachpb.KeyMin, roachpb.KeyMax, 1100)
if err != nil {
t.Fatalf("%+v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3116,7 +3116,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// Construct SST #1 through #3 as numbered above, but only ultimately
// keep the 3rd one.
keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.State.Desc)
it := rditer.NewReplicaDataIterator(inSnap.State.Desc, sendingEng, true /* replicatedOnly */, false /* seekEnd */)
it := rditer.NewReplicaEngineDataIterator(inSnap.State.Desc, sendingEng, true /* replicatedOnly */)
defer it.Close()
// Write a range deletion tombstone to each of the SSTs then put in the
// kv entries from the sender of the snapshot.
Expand All @@ -3142,7 +3142,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
expectedSSTs = append(expectedSSTs, sstFile.Data())
break
}
if err := sst.Put(it.Key(), it.Value()); err != nil {
if err := sst.PutEngineKey(it.Key(), it.Value()); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
iter := cpEng.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: []byte("\xff")})
defer iter.Close()

ms, err := storage.ComputeStatsGo(iter, roachpb.KeyMin, roachpb.KeyMax, 0 /* nowNanos */)
ms, err := storage.ComputeStatsForRange(iter, roachpb.KeyMin, roachpb.KeyMax, 0 /* nowNanos */)
assert.NoError(t, err)

assert.NotZero(t, ms.KeyBytes)
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/debug_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func tryRangeDescriptor(kv storage.MVCCKeyValue) (string, error) {
return descStr(desc), nil
}

// tryIntent does not look at the key.
func tryIntent(kv storage.MVCCKeyValue) (string, error) {
if len(kv.Value) == 0 {
return "", errors.New("empty")
Expand Down Expand Up @@ -374,3 +375,20 @@ func (s *stringifyWriteBatch) String() string {
}
return fmt.Sprintf("failed to stringify write batch (%x): %s", s.Data, err)
}

// PrintEngineKeyValue attempts to print the given key-value pair.
func PrintEngineKeyValue(k storage.EngineKey, v []byte) {
if k.IsMVCCKey() {
if key, err := k.ToMVCCKey(); err == nil {
PrintKeyValue(storage.MVCCKeyValue{Key: key, Value: v})
return
}
}
var sb strings.Builder
fmt.Fprintf(&sb, "%s %x (%#x): ", k.Key, k.Version, k.Encode())
if out, err := tryIntent(storage.MVCCKeyValue{Value: v}); err == nil {
sb.WriteString(out)
} else {
fmt.Fprintf(&sb, "%x", v)
}
}
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/gc/gc_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
)

// gcIterator wraps an rditer.ReplicaDataIterator which it reverse iterates for
// gcIterator wraps an rditer.ReplicaMVCCDataIterator which it reverse iterates for
// the purpose of discovering gc-able replicated data.
type gcIterator struct {
it *rditer.ReplicaDataIterator
it *rditer.ReplicaMVCCDataIterator
done bool
err error
buf gcIteratorRingBuf
}

func makeGCIterator(desc *roachpb.RangeDescriptor, snap storage.Reader) gcIterator {
return gcIterator{
it: rditer.NewReplicaDataIterator(desc, snap,
it: rditer.NewReplicaMVCCDataIterator(desc, snap,
true /* replicatedOnly */, true /* seekEnd */),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/gc_old_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func runGCOld(
cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc,
) (Info, error) {

iter := rditer.NewReplicaDataIterator(desc, snap,
iter := rditer.NewReplicaMVCCDataIterator(desc, snap,
true /* replicatedOnly */, false /* seekEnd */)
defer iter.Close()

Expand Down
Loading

0 comments on commit 32cd89c

Please sign in to comment.