Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: txn comparisons on ranges #8025

Merged
merged 6 commits into from
Jun 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ CompactionRequest compacts the key-value store up to a given revision. All super
| create_revision | create_revision is the creation revision of the given key | int64 |
| mod_revision | mod_revision is the last modified revision of the given key. | int64 |
| value | value is the value of the given key, in bytes. | bytes |
| range_end | range_end compares the given target to all keys in the range [key, range_end). See RangeRequest for more details on key ranges. | bytes |



Expand Down
7 changes: 6 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,11 @@
"type": "string",
"format": "int64"
},
"range_end": {
"description": "range_end compares the given target to all keys in the range [key, range_end).\nSee RangeRequest for more details on key ranges.",
"type": "string",
"format": "byte"
},
"result": {
"description": "result is logical comparison operation for this comparison.",
"$ref": "#/definitions/CompareCompareResult"
Expand Down Expand Up @@ -2256,4 +2261,4 @@
"ApiKey": []
}
]
}
}
12 changes: 12 additions & 0 deletions clientv3/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ func (cmp *Cmp) ValueBytes() []byte {
// WithValueBytes sets the byte slice for the comparison's value.
func (cmp *Cmp) WithValueBytes(v []byte) { cmp.TargetUnion.(*pb.Compare_Value).Value = v }

// WithRange sets the comparison to scan the range [key, end).
func (cmp Cmp) WithRange(end string) Cmp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(cmp *Cmp)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it that way but clientv3.Compare(...).WithRange(...) will complain that the result from Compare() is not a pointer and it doesn't seem worth changing Compare()'s return value to a pointer to support this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh it returns Cmp anyway. Nvm.

cmp.RangeEnd = []byte(end)
return cmp
}

// WithPrefix sets the comparison to scan all keys prefixed by the key.
func (cmp Cmp) WithPrefix() Cmp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(cmp *Cmp)?

cmp.RangeEnd = getPrefix(cmp.Key)
return cmp
}

func mustInt64(val interface{}) int64 {
if v, ok := val.(int64); ok {
return v
Expand Down
27 changes: 27 additions & 0 deletions clientv3/integration/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,30 @@ func TestTxnSuccess(t *testing.T) {
t.Fatalf("unexpected Get response %v", resp)
}
}

func TestTxnCompareRange(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

kv := clus.Client(0)
fooResp, err := kv.Put(context.TODO(), "foo/", "bar")
if err != nil {
t.Fatal(err)
}
if _, err = kv.Put(context.TODO(), "foo/a", "baz"); err != nil {
t.Fatal(err)
}
tresp, terr := kv.Txn(context.TODO()).If(
clientv3.Compare(
clientv3.CreateRevision("foo/"), "=", fooResp.Header.Revision).
WithPrefix(),
).Commit()
if terr != nil {
t.Fatal(terr)
}
if tresp.Succeeded {
t.Fatal("expected prefix compare to false, got compares as true")
}
}
5 changes: 4 additions & 1 deletion clientv3/namespace/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
newCmps := make([]clientv3.Cmp, len(cs))
for i := range cs {
newCmps[i] = cs[i]
pfxKey, _ := txn.kv.prefixInterval(cs[i].KeyBytes(), nil)
pfxKey, endKey := txn.kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
newCmps[i].WithKeyBytes(pfxKey)
if len(cs[i].RangeEnd) != 0 {
newCmps[i].RangeEnd = endKey
}
}
txn.Txn = txn.Txn.If(newCmps...)
return txn
Expand Down
83 changes: 44 additions & 39 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,15 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
end := mkGteRange(dr.RangeEnd)

if txn == nil {
txn = a.s.kv.Write()
defer txn.End()
}

if isGteRange(dr.RangeEnd) {
dr.RangeEnd = []byte{}
}

if dr.PrevKv {
rr, err := txn.Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
rr, err := txn.Range(dr.Key, end, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
Expand All @@ -216,7 +213,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
}
}

resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, dr.RangeEnd)
resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
return resp, nil
}

Expand All @@ -229,10 +226,6 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
defer txn.End()
}

if isGteRange(r.RangeEnd) {
r.RangeEnd = []byte{}
}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
Expand All @@ -251,7 +244,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
Count: r.CountOnly,
}

rr, err := txn.Range(r.Key, r.RangeEnd, ro)
rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -374,50 +367,57 @@ func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]
// applyCompare applies the compare request.
// If the comparison succeeds, it returns true. Otherwise, returns false.
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{})
// TOOD: possible optimizations
// * chunk reads for large ranges to conserve memory
// * rewrite rules for common patterns:
// ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
// * caching
rr, err := rv.Range(c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
if err != nil {
return false
}
var ckv mvccpb.KeyValue
if len(rr.KVs) != 0 {
ckv = rr.KVs[0]
} else {
// Use the zero value of ckv normally. However...
if len(rr.KVs) == 0 {
if c.Target == pb.Compare_VALUE {
// Always fail if we're comparing a value on a key that doesn't exist.
// We can treat non-existence as the empty set explicitly, such that
// even a key with a value of length 0 bytes is still a real key
// that was written that way
// Always fail if comparing a value on a key/keys that doesn't exist;
// nil == empty string in grpc; no way to represent missing value
return false
}
return compareKV(c, mvccpb.KeyValue{})
}
for _, kv := range rr.KVs {
if !compareKV(c, kv) {
return false
}
}
return true
}

// -1 is less, 0 is equal, 1 is greater
func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
var result int
rev := int64(0)
switch c.Target {
case pb.Compare_VALUE:
tv, _ := c.TargetUnion.(*pb.Compare_Value)
if tv != nil {
result = bytes.Compare(ckv.Value, tv.Value)
v := []byte{}
if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
v = tv.Value
}
result = bytes.Compare(ckv.Value, v)
case pb.Compare_CREATE:
tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision)
if tv != nil {
result = compareInt64(ckv.CreateRevision, tv.CreateRevision)
if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
rev = tv.CreateRevision
}

result = compareInt64(ckv.CreateRevision, rev)
case pb.Compare_MOD:
tv, _ := c.TargetUnion.(*pb.Compare_ModRevision)
if tv != nil {
result = compareInt64(ckv.ModRevision, tv.ModRevision)
if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
rev = tv.ModRevision
}
result = compareInt64(ckv.ModRevision, rev)
case pb.Compare_VERSION:
tv, _ := c.TargetUnion.(*pb.Compare_Version)
if tv != nil {
result = compareInt64(ckv.Version, tv.Version)
if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
rev = tv.Version
}
result = compareInt64(ckv.Version, rev)
}

switch c.Result {
case pb.Compare_EQUAL:
return result == 0
Expand Down Expand Up @@ -830,10 +830,15 @@ func compareInt64(a, b int64) int {
}
}

// isGteRange determines if the range end is a >= range. This works around grpc
// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
func isGteRange(rangeEnd []byte) bool {
return len(rangeEnd) == 1 && rangeEnd[0] == 0
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string).
func mkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}

func noSideEffect(r *pb.InternalRaftRequest) bool {
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.Req

func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, rt *pb.TxnRequest) error {
for _, c := range rt.Compare {
if err := as.IsRangePermitted(ai, c.Key, nil); err != nil {
if err := as.IsRangePermitted(ai, c.Key, c.RangeEnd); err != nil {
return err
}
}
Expand Down
Loading