From 950e33efe2afe1c36da9665323f60696a537d514 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 14 Jun 2017 18:37:45 -0700 Subject: [PATCH] namespace: support nested txns --- clientv3/namespace/kv.go | 64 ++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/clientv3/namespace/kv.go b/clientv3/namespace/kv.go index dc013742b018..b5ec5e964e9b 100644 --- a/clientv3/namespace/kv.go +++ b/clientv3/namespace/kv.go @@ -74,7 +74,7 @@ func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpO } func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) { - if len(op.KeyBytes()) == 0 { + if len(op.KeyBytes()) == 0 && !op.IsTxn() { return clientv3.OpResponse{}, rpctypes.ErrEmptyKey } r, err := kv.KV.Do(ctx, kv.prefixOp(op)) @@ -88,6 +88,8 @@ func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse kv.unprefixPutResponse(r.Put()) case r.Del() != nil: kv.unprefixDeleteResponse(r.Del()) + case r.Txn() != nil: + kv.unprefixTxnResponse(r.Txn()) } return r, nil } @@ -102,34 +104,17 @@ func (kv *kvPrefix) Txn(ctx context.Context) clientv3.Txn { } func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn { - newCmps := make([]clientv3.Cmp, len(cs)) - for i := range cs { - newCmps[i] = cs[i] - pfxKey, endKey := txn.kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd) - newCmps[i].WithKeyBytes(pfxKey) - if len(cs[i].RangeEnd) != 0 { - newCmps[i].RangeEnd = endKey - } - } - txn.Txn = txn.Txn.If(newCmps...) + txn.Txn = txn.Txn.If(txn.kv.prefixCmps(cs)...) return txn } func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn { - newOps := make([]clientv3.Op, len(ops)) - for i := range ops { - newOps[i] = txn.kv.prefixOp(ops[i]) - } - txn.Txn = txn.Txn.Then(newOps...) + txn.Txn = txn.Txn.Then(txn.kv.prefixOps(ops)...) return txn } func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn { - newOps := make([]clientv3.Op, len(ops)) - for i := range ops { - newOps[i] = txn.kv.prefixOp(ops[i]) - } - txn.Txn = txn.Txn.Else(newOps...) + txn.Txn = txn.Txn.Else(txn.kv.prefixOps(ops)...) return txn } @@ -143,10 +128,14 @@ func (txn *txnPrefix) Commit() (*clientv3.TxnResponse, error) { } func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op { - begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes()) - op.WithKeyBytes(begin) - op.WithRangeBytes(end) - return op + if !op.IsTxn() { + begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes()) + op.WithKeyBytes(begin) + op.WithRangeBytes(end) + return op + } + cmps, thenOps, elseOps := op.Txn() + return clientv3.OpTxn(kv.prefixCmps(cmps), kv.prefixOps(thenOps), kv.prefixOps(elseOps)) } func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) { @@ -182,6 +171,10 @@ func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) { if tv.ResponseDeleteRange != nil { kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange)) } + case *pb.ResponseOp_ResponseTxn: + if tv.ResponseTxn != nil { + kv.unprefixTxnResponse((*clientv3.TxnResponse)(tv.ResponseTxn)) + } default: } } @@ -190,3 +183,24 @@ func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) { func (p *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) { return prefixInterval(p.pfx, key, end) } + +func (kv *kvPrefix) prefixCmps(cs []clientv3.Cmp) []clientv3.Cmp { + newCmps := make([]clientv3.Cmp, len(cs)) + for i := range cs { + newCmps[i] = cs[i] + pfxKey, endKey := kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd) + newCmps[i].WithKeyBytes(pfxKey) + if len(cs[i].RangeEnd) != 0 { + newCmps[i].RangeEnd = endKey + } + } + return newCmps +} + +func (kv *kvPrefix) prefixOps(ops []clientv3.Op) []clientv3.Op { + newOps := make([]clientv3.Op, len(ops)) + for i := range ops { + newOps[i] = kv.prefixOp(ops[i]) + } + return newOps +}