diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 9d520767720..949f36ca775 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -319,33 +319,36 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang } func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { - ok := true - for _, c := range rt.Compare { - if _, ok = a.applyCompare(c); !ok { - break - } - } - - var reqs []*pb.RequestOp - if ok { - reqs = rt.Success - } else { - reqs = rt.Failure - } + isWrite := !isTxnReadonly(rt) + txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read()) - if err := a.checkRequestPut(reqs); err != nil { - return nil, err + reqs, ok := a.compareToOps(txn, rt) + if isWrite { + if err := a.checkRequestPut(txn, reqs); err != nil { + txn.End() + return nil, err + } } - if err := a.checkRequestRange(reqs); err != nil { + if err := checkRequestRange(txn, reqs); err != nil { + txn.End() return nil, err } resps := make([]*pb.ResponseOp, len(reqs)) - - // When executing the operations of txn, etcd must hold the txn lock so - // readers do not see any intermediate results. - // TODO: use Read txn if only Ranges - txn := a.s.KV().Write() + txnResp := &pb.TxnResponse{ + Responses: resps, + Succeeded: ok, + Header: &pb.ResponseHeader{}, + } + + // When executing mutable txn ops, etcd must hold the txn lock so + // readers do not see any intermediate results. Since writes are + // serialized on the raft loop, the revision in the read view will + // be the revision of the write txn. + if isWrite { + txn.End() + txn = a.s.KV().Write() + } for i := range reqs { resps[i] = a.applyUnion(txn, reqs[i]) } @@ -355,23 +358,25 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { } txn.End() - txnResp := &pb.TxnResponse{} - txnResp.Header = &pb.ResponseHeader{} txnResp.Header.Revision = rev - txnResp.Responses = resps - txnResp.Succeeded = ok return txnResp, nil } -// applyCompare applies the compare request. -// It returns the revision at which the comparison happens. If the comparison -// succeeds, the it returns true. Otherwise it returns false. -func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { - rr, err := a.s.KV().Range(c.Key, nil, mvcc.RangeOptions{}) - rev := rr.Rev +func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]*pb.RequestOp, bool) { + for _, c := range rt.Compare { + if !applyCompare(rv, c) { + return rt.Failure, false + } + } + return rt.Success, true +} +// applyCompare applies the compare request. +// If the comparison succeeds, it returns true. Otherwise, returns false. +func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool { + rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{}) if err != nil { - return rev, false + return false } var ckv mvccpb.KeyValue if len(rr.KVs) != 0 { @@ -383,7 +388,7 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { // We can treat non-existence as the empty set explicitly, such that // even a key with a value of length 0 bytes is still a real key // that was written that way - return rev, false + return false } } @@ -415,23 +420,15 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { switch c.Result { case pb.Compare_EQUAL: - if result != 0 { - return rev, false - } + return result == 0 case pb.Compare_NOT_EQUAL: - if result == 0 { - return rev, false - } + return result != 0 case pb.Compare_GREATER: - if result != 1 { - return rev, false - } + return result > 0 case pb.Compare_LESS: - if result != -1 { - return rev, false - } + return result < 0 } - return rev, true + return true } func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp { @@ -771,7 +768,7 @@ func (s *kvSortByValue) Less(i, j int) bool { return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0 } -func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error { +func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestOp) error { for _, requ := range reqs { tv, ok := requ.Request.(*pb.RequestOp_RequestPut) if !ok { @@ -783,7 +780,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error { } if preq.IgnoreValue || preq.IgnoreLease { // expects previous key-value, error if not exist - rr, err := a.s.KV().Range(preq.Key, nil, mvcc.RangeOptions{}) + rr, err := rv.Range(preq.Key, nil, mvcc.RangeOptions{}) if err != nil { return err } @@ -801,7 +798,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error { return nil } -func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error { +func checkRequestRange(rv mvcc.ReadView, reqs []*pb.RequestOp) error { for _, requ := range reqs { tv, ok := requ.Request.(*pb.RequestOp_RequestRange) if !ok { @@ -812,10 +809,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error { continue } - if greq.Revision > a.s.KV().Rev() { + if greq.Revision > rv.Rev() { return mvcc.ErrFutureRev } - if greq.Revision < a.s.KV().FirstRev() { + if greq.Revision < rv.FirstRev() { return mvcc.ErrCompacted } }