Skip to content

Commit

Permalink
grpcproxy: support nested txns
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Romano committed Jun 21, 2017
1 parent f400010 commit f465e3e
Showing 1 changed file with 24 additions and 19 deletions.
43 changes: 24 additions & 19 deletions proxy/grpcproxy/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,28 +99,13 @@ func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) {
}

func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
txn := p.kv.Txn(ctx)
cmps := make([]clientv3.Cmp, len(r.Compare))
thenops := make([]clientv3.Op, len(r.Success))
elseops := make([]clientv3.Op, len(r.Failure))

for i := range r.Compare {
cmps[i] = (clientv3.Cmp)(*r.Compare[i])
}

for i := range r.Success {
thenops[i] = requestOpToOp(r.Success[i])
}

for i := range r.Failure {
elseops[i] = requestOpToOp(r.Failure[i])
}

resp, err := txn.If(cmps...).Then(thenops...).Else(elseops...).Commit()

op := TxnRequestToOp(r)
opResp, err := p.kv.Do(ctx, op)
if err != nil {
return nil, err
}
resp := opResp.Txn()

// txn may claim an outdated key is updated; be safe and invalidate
for _, cmp := range r.Compare {
p.cache.Invalidate(cmp.Key, cmp.RangeEnd)
Expand Down Expand Up @@ -167,6 +152,10 @@ func requestOpToOp(union *pb.RequestOp) clientv3.Op {
if tv.RequestDeleteRange != nil {
return DelRequestToOp(tv.RequestDeleteRange)
}
case *pb.RequestOp_RequestTxn:
if tv.RequestTxn != nil {
return TxnRequestToOp(tv.RequestTxn)
}
}
panic("unknown request")
}
Expand Down Expand Up @@ -219,3 +208,19 @@ func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op {
}
return clientv3.OpDelete(string(r.Key), opts...)
}

func TxnRequestToOp(r *pb.TxnRequest) clientv3.Op {
cmps := make([]clientv3.Cmp, len(r.Compare))
thenops := make([]clientv3.Op, len(r.Success))
elseops := make([]clientv3.Op, len(r.Failure))
for i := range r.Compare {
cmps[i] = (clientv3.Cmp)(*r.Compare[i])
}
for i := range r.Success {
thenops[i] = requestOpToOp(r.Success[i])
}
for i := range r.Failure {
elseops[i] = requestOpToOp(r.Failure[i])
}
return clientv3.OpTxn(cmps, thenops, elseops)
}

0 comments on commit f465e3e

Please sign in to comment.