Skip to content

Commit

Permalink
etcdserver: use same ReadView for read-only txns
Browse files Browse the repository at this point in the history
A read-only txn isn't serialized by raft, but it uses a fresh
read txn for every mvcc access prior to executing its request ops.
If a write txn modifies the keys matching the read txn's comparisons,
the read txn may return inconsistent results.

To fix, use the same read-only mvcc txn for the duration of the etcd
txn. Probably gets a modest txn speedup as well since there are
fewer read txn allocations.
  • Loading branch information
heyitsanthony committed Jun 9, 2017
1 parent da48f1f commit d173b09
Showing 1 changed file with 48 additions and 51 deletions.
99 changes: 48 additions & 51 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,33 +319,36 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
ok := true
for _, c := range rt.Compare {
if _, ok = a.applyCompare(c); !ok {
break
}
}

var reqs []*pb.RequestOp
if ok {
reqs = rt.Success
} else {
reqs = rt.Failure
}
isWrite := !isTxnReadonly(rt)
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())

if err := a.checkRequestPut(reqs); err != nil {
return nil, err
reqs, ok := a.compareToOps(txn, rt)
if isWrite {
if err := a.checkRequestPut(txn, reqs); err != nil {
txn.End()
return nil, err
}
}
if err := a.checkRequestRange(reqs); err != nil {
if err := checkRequestRange(txn, reqs); err != nil {
txn.End()
return nil, err
}

resps := make([]*pb.ResponseOp, len(reqs))

// When executing the operations of txn, etcd must hold the txn lock so
// readers do not see any intermediate results.
// TODO: use Read txn if only Ranges
txn := a.s.KV().Write()
txnResp := &pb.TxnResponse{
Responses: resps,
Succeeded: ok,
Header: &pb.ResponseHeader{},
}

// When executing mutable txn ops, etcd must hold the txn lock so
// readers do not see any intermediate results. Since writes are
// serialized on the raft loop, the revision in the read view will

This comment has been minimized.

Copy link
@tisonkun

tisonkun Mar 11, 2021

Where is the entrance of "writes are serialized on the raft loop"?

We end the txn at first and then create a new one, which seems possibly lost the store backend lock intermediately and the compare version is stale.

// be the revision of the write txn.
if isWrite {
txn.End()
txn = a.s.KV().Write()
}
for i := range reqs {
resps[i] = a.applyUnion(txn, reqs[i])
}
Expand All @@ -355,23 +358,25 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
}
txn.End()

txnResp := &pb.TxnResponse{}
txnResp.Header = &pb.ResponseHeader{}
txnResp.Header.Revision = rev
txnResp.Responses = resps
txnResp.Succeeded = ok
return txnResp, nil
}

// applyCompare applies the compare request.
// It returns the revision at which the comparison happens. If the comparison
// succeeds, the it returns true. Otherwise it returns false.
func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
rr, err := a.s.KV().Range(c.Key, nil, mvcc.RangeOptions{})
rev := rr.Rev
func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]*pb.RequestOp, bool) {
for _, c := range rt.Compare {
if !applyCompare(rv, c) {
return rt.Failure, false
}
}
return rt.Success, true
}

// applyCompare applies the compare request.
// If the comparison succeeds, it returns true. Otherwise, returns false.
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{})
if err != nil {
return rev, false
return false
}
var ckv mvccpb.KeyValue
if len(rr.KVs) != 0 {
Expand All @@ -383,7 +388,7 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
// We can treat non-existence as the empty set explicitly, such that
// even a key with a value of length 0 bytes is still a real key
// that was written that way
return rev, false
return false
}
}

Expand Down Expand Up @@ -415,23 +420,15 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {

switch c.Result {
case pb.Compare_EQUAL:
if result != 0 {
return rev, false
}
return result == 0
case pb.Compare_NOT_EQUAL:
if result == 0 {
return rev, false
}
return result != 0
case pb.Compare_GREATER:
if result != 1 {
return rev, false
}
return result > 0
case pb.Compare_LESS:
if result != -1 {
return rev, false
}
return result < 0
}
return rev, true
return true
}

func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
Expand Down Expand Up @@ -771,7 +768,7 @@ func (s *kvSortByValue) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
}

func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
for _, requ := range reqs {
tv, ok := requ.Request.(*pb.RequestOp_RequestPut)
if !ok {
Expand All @@ -783,7 +780,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
}
if preq.IgnoreValue || preq.IgnoreLease {
// expects previous key-value, error if not exist
rr, err := a.s.KV().Range(preq.Key, nil, mvcc.RangeOptions{})
rr, err := rv.Range(preq.Key, nil, mvcc.RangeOptions{})
if err != nil {
return err
}
Expand All @@ -801,7 +798,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
return nil
}

func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
func checkRequestRange(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
for _, requ := range reqs {
tv, ok := requ.Request.(*pb.RequestOp_RequestRange)
if !ok {
Expand All @@ -812,10 +809,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
continue
}

if greq.Revision > a.s.KV().Rev() {
if greq.Revision > rv.Rev() {
return mvcc.ErrFutureRev
}
if greq.Revision < a.s.KV().FirstRev() {
if greq.Revision < rv.FirstRev() {
return mvcc.ErrCompacted
}
}
Expand Down

0 comments on commit d173b09

Please sign in to comment.