Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] backports #11990 #12933 #15072

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions etcdctl/ctlv3/command/get_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
getFromKey bool
getRev int64
getKeysOnly bool
getCountOnly bool
printValueOnly bool
)

Expand All @@ -50,6 +51,7 @@ func NewGetCommand() *cobra.Command {
cmd.Flags().BoolVar(&getFromKey, "from-key", false, "Get keys that are greater than or equal to the given key using byte compare")
cmd.Flags().Int64Var(&getRev, "rev", 0, "Specify the kv revision")
cmd.Flags().BoolVar(&getKeysOnly, "keys-only", false, "Get only the keys")
cmd.Flags().BoolVar(&getCountOnly, "count-only", false, "Get only the count")
cmd.Flags().BoolVar(&printValueOnly, "print-value-only", false, `Only write values when using the "simple" output format`)
return cmd
}
Expand All @@ -64,6 +66,12 @@ func getCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitError, err)
}

if getCountOnly {
if _, fields := display.(*fieldsPrinter); !fields {
ExitWithError(ExitBadArgs, fmt.Errorf("--count-only is only for `--write-out=fields`"))
}
}

if printValueOnly {
dp, simple := (display).(*simplePrinter)
if !simple {
Expand All @@ -83,6 +91,10 @@ func getGetOp(args []string) (string, []clientv3.OpOption) {
ExitWithError(ExitBadArgs, fmt.Errorf("`--prefix` and `--from-key` cannot be set at the same time, choose one"))
}

if getKeysOnly && getCountOnly {
ExitWithError(ExitBadArgs, fmt.Errorf("`--keys-only` and `--count-only` cannot be set at the same time, choose one"))
}

opts := []clientv3.OpOption{}
switch getConsistency {
case "s":
Expand Down Expand Up @@ -159,5 +171,9 @@ func getGetOp(args []string) (string, []clientv3.OpOption) {
opts = append(opts, clientv3.WithKeysOnly())
}

if getCountOnly {
opts = append(opts, clientv3.WithCountOnly())
}

return key, opts
}
31 changes: 24 additions & 7 deletions integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1285,8 +1285,9 @@ func TestV3RangeRequest(t *testing.T) {
putKeys []string
reqs []pb.RangeRequest

wresps [][]string
wmores []bool
wresps [][]string
wmores []bool
wcounts []int64
}{
// single key
{
Expand All @@ -1303,6 +1304,7 @@ func TestV3RangeRequest(t *testing.T) {
{},
},
[]bool{false, false},
[]int64{1, 0},
},
// multi-key
{
Expand Down Expand Up @@ -1331,6 +1333,7 @@ func TestV3RangeRequest(t *testing.T) {
{"a", "b", "c", "d", "e"},
},
[]bool{false, false, false, false, false, false},
[]int64{5, 2, 0, 0, 0, 5},
},
// revision
{
Expand All @@ -1349,22 +1352,30 @@ func TestV3RangeRequest(t *testing.T) {
{"a", "b"},
},
[]bool{false, false, false, false},
[]int64{5, 0, 1, 2},
},
// limit
{
[]string{"foo", "bar"},
[]string{"a", "b", "c"},
[]pb.RangeRequest{
// more
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
// no more
// half
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
// no more
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 3},
// limit over
{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 4},
},

[][]string{
{"bar"},
{"bar", "foo"},
{"a"},
{"a", "b"},
{"a", "b", "c"},
{"a", "b", "c"},
},
[]bool{true, false},
[]bool{true, true, false, false},
[]int64{3, 3, 3, 3},
},
// sort
{
Expand Down Expand Up @@ -1417,6 +1428,7 @@ func TestV3RangeRequest(t *testing.T) {
{"b", "a", "c", "d"},
},
[]bool{true, true, true, true, false, false},
[]int64{4, 4, 4, 4, 0, 4},
},
// min/max mod rev
{
Expand Down Expand Up @@ -1448,6 +1460,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev4", "rev5", "rev6"},
},
[]bool{false, false, false, false},
[]int64{5, 5, 5, 5},
},
// min/max create rev
{
Expand Down Expand Up @@ -1479,6 +1492,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev6"},
},
[]bool{false, false, false, false},
[]int64{3, 3, 3, 3},
},
}

Expand Down Expand Up @@ -1512,6 +1526,9 @@ func TestV3RangeRequest(t *testing.T) {
if resp.More != tt.wmores[j] {
t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
}
if resp.GetCount() != tt.wcounts[j] {
t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j])
}
wrev := int64(len(tt.putKeys) + 1)
if resp.Header.Revision != wrev {
t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
Expand Down
80 changes: 78 additions & 2 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ type Snapshot interface {
Close() error
}

// txReadBufferCache holds a cached copy of a txReadBuffer together with the
// buffer version that the copy was taken at.
type txReadBufferCache struct {
// mu guards buf and bufVersion.
mu sync.Mutex
// buf is the cached buffer copy; it is nil until a copy has been stored.
buf *txReadBuffer
// bufVersion is the version of the source read buffer that buf was copied from.
bufVersion uint64
}

type backend struct {
// size and commits are used with atomic operations so they must be
// 64-bit aligned, otherwise 32-bit tests will crash
Expand All @@ -104,6 +110,11 @@ type backend struct {
batchTx *batchTxBuffered

readTx *readTx
// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
// When creating "concurrentReadTx":
// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
txReadBufferCache txReadBufferCache

stopc chan struct{}
donec chan struct{}
Expand Down Expand Up @@ -176,17 +187,24 @@ func newBackend(bcfg BackendConfig) *backend {

readTx: &readTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
},

stopc: make(chan struct{}),
donec: make(chan struct{}),

lg: bcfg.Logger,
}

b.batchTx = newBatchTxBuffered(b)
go b.run()
return b
Expand All @@ -209,9 +227,67 @@ func (b *backend) ConcurrentReadTx() ReadTx {
defer b.readTx.RUnlock()
// prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)

// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.

// inspect/update cache recency iff there's no ongoing update to the cache
// this falls through if there's no cache update

// by this line, the "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations,
// which require the write lock to update "readTx.baseReadTx.buf".
// This means that setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
// whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
// We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
// The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
// by avoiding copying "readTx.baseReadTx.buf".
b.txReadBufferCache.mu.Lock()

curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion

isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer

var buf *txReadBuffer
switch {
case isEmptyCache:
// perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
// this is only supposed to run once so there won't be much overhead
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
// to maximize the concurrency, try unsafe copy of buffer
// release the lock while copying buffer -- cache may become stale again and
// get overwritten by someone else.
// therefore, we need to check the readTx buffer version again
b.txReadBufferCache.mu.Unlock()
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock()
buf = &curBuf
default:
// neither empty nor stale cache, just use the current buffer
buf = curCache
}
// txReadBufferCache.bufVersion can be modified while we are doing an unsafeCopy();
// as a result, curCacheVer may no longer be the same as
// txReadBufferCache.bufVersion.
// If !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion,
// then the cache became stale while copying "readTx.baseReadTx.buf".
// It is safe to not update "txReadBufferCache.buf", because the next
// "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy,
// and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// continue if the cache is never set or no one has modified the cache
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}

b.txReadBufferCache.mu.Unlock()

// concurrentReadTx is not supposed to write to its txReadBuffer
return &concurrentReadTx{
buf: b.readTx.buf.unsafeCopy(),
buf: *buf,
tx: b.readTx.tx,
txMu: &b.readTx.txMu,
buckets: b.readTx.buckets,
Expand Down
13 changes: 11 additions & 2 deletions mvcc/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sort"
)

// bucketBufferInitialSize is the initial length of the kv slice that
// newBucketBuffer allocates for a fresh bucketBuffer.
const bucketBufferInitialSize = 512

// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[string]*bucketBuffer
Expand Down Expand Up @@ -69,10 +71,16 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb.merge(wb)
}
txw.reset()
// increase the buffer version
txr.bufVersion++
}

// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }
type txReadBuffer struct {
txBuffer
// bufVersion is incremented by txWriteBuffer.writeback each time it writes
// back into this buffer, so comparing versions shows whether the buffer has
// changed since an earlier observation.
bufVersion uint64
}

func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
Expand All @@ -94,6 +102,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
},
bufVersion: 0,
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
Expand All @@ -114,7 +123,7 @@ type bucketBuffer struct {
}

// newBucketBuffer returns a bucketBuffer with a preallocated kv slice and
// used set to 0.
// NOTE(review): the two return statements below look like the removed and
// added sides of a diff hunk (literal 512 vs. bucketBufferInitialSize) —
// confirm which one the actual file contains.
func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
}

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
Expand Down
Loading