Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/tscache: propagate synthetic timestamps through timestamp cache #57811

Merged
merged 4 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions pkg/kv/kvserver/tscache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,26 @@ func (v cacheValue) String() string {
//
// This ratcheting policy is shared across all Cache implementations, even if
// they do not use this function directly.
func ratchetValue(old, new cacheValue) (cacheValue, bool) {
func ratchetValue(old, new cacheValue) (res cacheValue, updated bool) {
if old.ts.Less(new.ts) {
// Ratchet to new value.
// New value newer. Ratchet to new value.
return new, true
} else if new.ts.Less(old.ts) {
// Nothing to update.
// Old value newer. Nothing to update.
return old, false
} else if new.txnID != old.txnID {
}

// Equal times.
if new.ts.Synthetic != old.ts.Synthetic {
// old.ts == new.ts but the values have different synthetic flags.
// Remove the synthetic flag from the resulting value.
new.ts.Synthetic = false
}
if new.txnID != old.txnID {
// old.ts == new.ts but the values have different txnIDs. Remove the
// transaction ID from the value so that it is no longer owned by any
// transaction.
// transaction ID from the resulting value so that it is no longer owned
// by any transaction.
new.txnID = noTxnID
return new, old.txnID != noTxnID
}
// old == new.
return old, false
return new, new != old
}
162 changes: 106 additions & 56 deletions pkg/kv/kvserver/tscache/interval_skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"bytes"
"container/list"
"context"
"encoding/binary"
"fmt"
"sync/atomic"
"time"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -90,9 +88,7 @@ const (
)

const (
encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0)))
encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{}))
encodedValSize = encodedTsSize + encodedTxnIDSize
encodedValSize = int(unsafe.Sizeof(cacheValue{}))

// initialSklPageSize is the initial size of each page in the sklImpl's
// intervalSkl. The pages start small to limit the memory footprint of
Expand Down Expand Up @@ -202,7 +198,7 @@ func newIntervalSkl(clock *hlc.Clock, minRet time.Duration, metrics sklMetrics)
minPages: defaultMinSklPages,
metrics: metrics,
}
s.pushNewPage(0 /* maxWallTime */, nil /* arena */)
s.pushNewPage(0 /* maxTime */, nil /* arena */)
s.metrics.Pages.Update(1)
return &s
}
Expand Down Expand Up @@ -301,9 +297,10 @@ func (s *intervalSkl) addRange(from, to []byte, opt rangeOptions, val cacheValue
s.rotMutex.RLock()
defer s.rotMutex.RUnlock()

// If floor ts is >= requested timestamp, then no need to perform a search
// or add any records.
if val.ts.LessEq(s.floorTS) {
// If floor ts is greater than the requested timestamp, then no need to
// perform a search or add any records. We don't return early when the
// timestamps are equal, because their flags may differ.
if val.ts.Less(s.floorTS) {
return nil
}

Expand Down Expand Up @@ -385,7 +382,7 @@ func (s *intervalSkl) frontPage() *sklPage {

// pushNewPage prepends a new empty page to the front of the pages list. It
// accepts an optional arena argument to facilitate re-use.
func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) {
func (s *intervalSkl) pushNewPage(maxTime ratchetingTime, arena *arenaskl.Arena) {
size := s.nextPageSize()
if arena != nil && arena.Cap() == size {
// Re-use the provided arena, if possible.
Expand All @@ -395,7 +392,7 @@ func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) {
arena = arenaskl.NewArena(size)
}
p := newSklPage(arena)
p.maxWallTime = maxWallTime
p.maxTime = maxTime
s.pages.PushFront(p)
}

Expand Down Expand Up @@ -455,7 +452,7 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) {
var oldArena *arenaskl.Arena
for s.pages.Len() >= s.minPages {
bp := back.Value.(*sklPage)
bpMaxTS := hlc.Timestamp{WallTime: bp.maxWallTime}
bpMaxTS := bp.getMaxTimestamp()
if minTSToRetain.LessEq(bpMaxTS) {
// The back page's maximum timestamp is within the time
// window we've promised to retain, so we can't evict it.
Expand All @@ -473,12 +470,12 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) {
}

// Push a new empty page on the front of the pages list. We give this page
// the maxWallTime of the old front page. This assures that the maxWallTime
// for a page is always equal to or greater than that for all earlier pages.
// In other words, it assures that the maxWallTime for a page is not only
// the maximum timestamp for all values it contains, but also for all values
// any earlier pages contain.
s.pushNewPage(fp.maxWallTime, oldArena)
// the maxTime of the old front page. This assures that the maxTime for a
// page is always equal to or greater than that for all earlier pages. In
// other words, it assures that the maxTime for a page is not only the
// maximum timestamp for all values it contains, but also for all values any
// earlier pages contain.
s.pushNewPage(fp.maxTime, oldArena)

// Update metrics.
s.metrics.Pages.Update(int64(s.pages.Len()))
Expand Down Expand Up @@ -526,7 +523,7 @@ func (s *intervalSkl) LookupTimestampRange(from, to []byte, opt rangeOptions) ca
// different txnID than our current cacheValue result (val), then we
// need to remove the txnID from our result, per the ratcheting policy
// for cacheValues. This is tested in TestIntervalSklMaxPageTS.
maxTS := hlc.Timestamp{WallTime: atomic.LoadInt64(&p.maxWallTime)}
maxTS := p.getMaxTimestamp()
if maxTS.Less(val.ts) {
break
}
Expand Down Expand Up @@ -554,9 +551,9 @@ func (s *intervalSkl) FloorTS() hlc.Timestamp {
// filled up, it returns arenaskl.ErrArenaFull. At that point, a new fixed page
// must be allocated and used instead.
type sklPage struct {
list *arenaskl.Skiplist
maxWallTime int64 // accessed atomically
isFull int32 // accessed atomically
list *arenaskl.Skiplist
maxTime ratchetingTime // accessed atomically
isFull int32 // accessed atomically
}

func newSklPage(arena *arenaskl.Arena) *sklPage {
Expand Down Expand Up @@ -802,6 +799,54 @@ func (p *sklPage) ensureFloorValue(it *arenaskl.Iterator, to []byte, val cacheVa
}

func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) {
new := makeRatchetingTime(ts)
for {
old := ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime)))
if new <= old {
break
}

if atomic.CompareAndSwapInt64((*int64)(&p.maxTime), int64(old), int64(new)) {
break
}
}
}

func (p *sklPage) getMaxTimestamp() hlc.Timestamp {
return ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime))).get()
}

// ratchetingTime is a compressed representation of an hlc.Timestamp, reduced
// down to 64 bits to support atomic access.
//
// ratchetingTime implements compression such that any loss of information when
// passing through the type results in the resulting Timestamp being ratcheted
// to a larger value. This provides the guarantee that the following relation
// holds, regardless of the value of x:
//
// x.LessEq(makeRatchetingTime(x).get())
//
// It also provides the guarantee that if the synthetic flag is set on the
// initial timestamp, then this flag is set on the resulting Timestamp. So the
// following relation is guaranteed to hold, regardless of the value of x:
//
// x.IsFlagSet(SYNTHETIC) == makeRatchetingTime(x).get().IsFlagSet(SYNTHETIC)
//
// Compressed ratchetingTime values compare such that taking the maximum of any
// two ratchetingTime values and converting that back to a Timestamp is always
// equal to or larger than the equivalent call through the Timestamp.Forward
// method. So the following relation is guaranteed to hold, regardless of the
// value of x or y:
//
// z := max(makeRatchetingTime(x), makeRatchetingTime(y)).get()
// x.Forward(y).LessEq(z)
//
// Bit layout (LSB to MSB):
// bits 0: inverted synthetic flag
// bits 1 - 63: upper 63 bits of wall time
type ratchetingTime int64

func makeRatchetingTime(ts hlc.Timestamp) ratchetingTime {
// Cheat and just use the max wall time portion of the timestamp, since it's
// fine for the max timestamp to be a bit too large. This is the case
// because it's always safe to increase the timestamp in a range. It's also
Expand All @@ -815,23 +860,38 @@ func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) {
// We could use an atomic.Value to store a "MaxValue" cacheValue for a given
// page, but this would be more expensive and it's not clear that it would
// be worth it.
new := ts.WallTime
rt := ratchetingTime(ts.WallTime)
if ts.Logical > 0 {
new++
rt++
}

// TODO(nvanbenschoten): propagate the timestamp synthetic bit through the
// page's max time.
for {
old := atomic.LoadInt64(&p.maxWallTime)
if new <= old {
break
}
// Similarly, cheat and use the last bit in the wall time to indicate
// whether the timestamp is synthetic or not. Do so by first rounding up the
// last bit of the wall time so that it is empty. This is safe for the same
// reason that rounding up the logical portion of the timestamp in the wall
// time is safe (see above).
//
// We use the last bit to indicate that the flag is NOT set. This ensures
// that if two timestamps have the same ordering but different values for
// the synthetic flag, the timestamp without the synthetic flag has a larger
// ratchetingTime value. This follows how Timestamp.Forward treats the flag.
if rt&1 == 1 {
rt++
}
if !ts.Synthetic {
rt |= 1
}

if atomic.CompareAndSwapInt64(&p.maxWallTime, old, new) {
break
}
return rt
}

func (rt ratchetingTime) get() hlc.Timestamp {
var ts hlc.Timestamp
ts.WallTime = int64(rt &^ 1)
if rt&1 == 0 {
ts.Synthetic = true
}
return ts
}

// ratchetPolicy defines the behavior a ratcheting attempt should take when
Expand Down Expand Up @@ -899,12 +959,9 @@ func (p *sklPage) ratchetValueSet(
// must handle with care.

// Ratchet the max timestamp.
keyTs, gapTs := keyVal.ts, gapVal.ts
if gapTs.Less(keyTs) {
p.ratchetMaxTimestamp(keyTs)
} else {
p.ratchetMaxTimestamp(gapTs)
}
maxTs := keyVal.ts
maxTs.Forward(gapVal.ts)
p.ratchetMaxTimestamp(maxTs)

// Remove the hasKey and hasGap flags from the meta. These will be
// replaced below.
Expand Down Expand Up @@ -1151,26 +1208,19 @@ func encodeValueSet(b []byte, keyVal, gapVal cacheValue) (ret []byte, meta uint1
}

func decodeValue(b []byte) (ret []byte, val cacheValue) {
// TODO(nvanbenschoten): decode the timestamp synthetic bit.
val.ts.WallTime = int64(binary.BigEndian.Uint64(b))
val.ts.Logical = int32(binary.BigEndian.Uint32(b[8:]))
var err error
if val.txnID, err = uuid.FromBytes(b[encodedTsSize:encodedValSize]); err != nil {
panic(err)
}
// Copy and interpret the byte slice as a cacheValue.
valPtr := (*[encodedValSize]byte)(unsafe.Pointer(&val))
copy(valPtr[:], b)
ret = b[encodedValSize:]
return
return ret, val
}

func encodeValue(b []byte, val cacheValue) []byte {
// TODO(nvanbenschoten): encode the timestamp synthetic bit.
l := len(b)
b = b[:l+encodedValSize]
binary.BigEndian.PutUint64(b[l:], uint64(val.ts.WallTime))
binary.BigEndian.PutUint32(b[l+8:], uint32(val.ts.Logical))
if _, err := val.txnID.MarshalTo(b[l+encodedTsSize:]); err != nil {
panic(err)
}
// Interpret the cacheValue as a byte slice and copy.
prev := len(b)
b = b[:prev+encodedValSize]
valPtr := (*[encodedValSize]byte)(unsafe.Pointer(&val))
copy(b[prev:], valPtr[:])
return b
}

Expand Down
Loading