Skip to content

Commit

Permalink
Merge pull request #8025 from heyitsanthony/txn-cmp-range
Browse files Browse the repository at this point in the history
api: txn comparisons on ranges
  • Loading branch information
Anthony Romano authored Jun 18, 2017
2 parents e475a4e + 8f34d0c commit 45fbac5
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 259 deletions.
1 change: 1 addition & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ CompactionRequest compacts the key-value store up to a given revision. All super
| create_revision | create_revision is the creation revision of the given key | int64 |
| mod_revision | mod_revision is the last modified revision of the given key. | int64 |
| value | value is the value of the given key, in bytes. | bytes |
| range_end | range_end compares the given target to all keys in the range [key, range_end). See RangeRequest for more details on key ranges. | bytes |



Expand Down
7 changes: 6 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,11 @@
"type": "string",
"format": "int64"
},
"range_end": {
"description": "range_end compares the given target to all keys in the range [key, range_end).\nSee RangeRequest for more details on key ranges.",
"type": "string",
"format": "byte"
},
"result": {
"description": "result is logical comparison operation for this comparison.",
"$ref": "#/definitions/CompareCompareResult"
Expand Down Expand Up @@ -2256,4 +2261,4 @@
"ApiKey": []
}
]
}
}
12 changes: 12 additions & 0 deletions clientv3/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ func (cmp *Cmp) ValueBytes() []byte {
// WithValueBytes sets the byte slice for the comparison's value.
func (cmp *Cmp) WithValueBytes(v []byte) { cmp.TargetUnion.(*pb.Compare_Value).Value = v }

// WithRange sets the comparison to scan the range [key, end).
func (cmp Cmp) WithRange(end string) Cmp {
cmp.RangeEnd = []byte(end)
return cmp
}

// WithPrefix sets the comparison to scan all keys prefixed by the key.
func (cmp Cmp) WithPrefix() Cmp {
cmp.RangeEnd = getPrefix(cmp.Key)
return cmp
}

func mustInt64(val interface{}) int64 {
if v, ok := val.(int64); ok {
return v
Expand Down
27 changes: 27 additions & 0 deletions clientv3/integration/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,30 @@ func TestTxnSuccess(t *testing.T) {
t.Fatalf("unexpected Get response %v", resp)
}
}

func TestTxnCompareRange(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

kv := clus.Client(0)
fooResp, err := kv.Put(context.TODO(), "foo/", "bar")
if err != nil {
t.Fatal(err)
}
if _, err = kv.Put(context.TODO(), "foo/a", "baz"); err != nil {
t.Fatal(err)
}
tresp, terr := kv.Txn(context.TODO()).If(
clientv3.Compare(
clientv3.CreateRevision("foo/"), "=", fooResp.Header.Revision).
WithPrefix(),
).Commit()
if terr != nil {
t.Fatal(terr)
}
if tresp.Succeeded {
t.Fatal("expected prefix compare to false, got compares as true")
}
}
5 changes: 4 additions & 1 deletion clientv3/namespace/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
newCmps := make([]clientv3.Cmp, len(cs))
for i := range cs {
newCmps[i] = cs[i]
pfxKey, _ := txn.kv.prefixInterval(cs[i].KeyBytes(), nil)
pfxKey, endKey := txn.kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
newCmps[i].WithKeyBytes(pfxKey)
if len(cs[i].RangeEnd) != 0 {
newCmps[i].RangeEnd = endKey
}
}
txn.Txn = txn.Txn.If(newCmps...)
return txn
Expand Down
83 changes: 44 additions & 39 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,15 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
end := mkGteRange(dr.RangeEnd)

if txn == nil {
txn = a.s.kv.Write()
defer txn.End()
}

if isGteRange(dr.RangeEnd) {
dr.RangeEnd = []byte{}
}

if dr.PrevKv {
rr, err := txn.Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
rr, err := txn.Range(dr.Key, end, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
Expand All @@ -216,7 +213,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
}
}

resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, dr.RangeEnd)
resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
return resp, nil
}

Expand All @@ -229,10 +226,6 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
defer txn.End()
}

if isGteRange(r.RangeEnd) {
r.RangeEnd = []byte{}
}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
Expand All @@ -251,7 +244,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
Count: r.CountOnly,
}

rr, err := txn.Range(r.Key, r.RangeEnd, ro)
rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -374,50 +367,57 @@ func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]
// applyCompare applies the compare request.
// If the comparison succeeds, it returns true. Otherwise, returns false.
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{})
// TOOD: possible optimizations
// * chunk reads for large ranges to conserve memory
// * rewrite rules for common patterns:
// ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
// * caching
rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
if err != nil {
return false
}
var ckv mvccpb.KeyValue
if len(rr.KVs) != 0 {
ckv = rr.KVs[0]
} else {
// Use the zero value of ckv normally. However...
if len(rr.KVs) == 0 {
if c.Target == pb.Compare_VALUE {
// Always fail if we're comparing a value on a key that doesn't exist.
// We can treat non-existence as the empty set explicitly, such that
// even a key with a value of length 0 bytes is still a real key
// that was written that way
// Always fail if comparing a value on a key/keys that doesn't exist;
// nil == empty string in grpc; no way to represent missing value
return false
}
return compareKV(c, mvccpb.KeyValue{})
}
for _, kv := range rr.KVs {
if !compareKV(c, kv) {
return false
}
}
return true
}

// -1 is less, 0 is equal, 1 is greater
func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
var result int
rev := int64(0)
switch c.Target {
case pb.Compare_VALUE:
tv, _ := c.TargetUnion.(*pb.Compare_Value)
if tv != nil {
result = bytes.Compare(ckv.Value, tv.Value)
v := []byte{}
if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
v = tv.Value
}
result = bytes.Compare(ckv.Value, v)
case pb.Compare_CREATE:
tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision)
if tv != nil {
result = compareInt64(ckv.CreateRevision, tv.CreateRevision)
if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
rev = tv.CreateRevision
}

result = compareInt64(ckv.CreateRevision, rev)
case pb.Compare_MOD:
tv, _ := c.TargetUnion.(*pb.Compare_ModRevision)
if tv != nil {
result = compareInt64(ckv.ModRevision, tv.ModRevision)
if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
rev = tv.ModRevision
}
result = compareInt64(ckv.ModRevision, rev)
case pb.Compare_VERSION:
tv, _ := c.TargetUnion.(*pb.Compare_Version)
if tv != nil {
result = compareInt64(ckv.Version, tv.Version)
if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
rev = tv.Version
}
result = compareInt64(ckv.Version, rev)
}

switch c.Result {
case pb.Compare_EQUAL:
return result == 0
Expand Down Expand Up @@ -830,10 +830,15 @@ func compareInt64(a, b int64) int {
}
}

// isGteRange determines if the range end is a >= range. This works around grpc
// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
func isGteRange(rangeEnd []byte) bool {
return len(rangeEnd) == 1 && rangeEnd[0] == 0
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string).
func mkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}

func noSideEffect(r *pb.InternalRaftRequest) bool {
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.Req

func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, rt *pb.TxnRequest) error {
for _, c := range rt.Compare {
if err := as.IsRangePermitted(ai, c.Key, nil); err != nil {
if err := as.IsRangePermitted(ai, c.Key, c.RangeEnd); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 45fbac5

Please sign in to comment.