diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index 317e690f814..28b257cc7c8 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -1296,7 +1296,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) { defer close(sdonec) for i := 0; i < 10 && cctx.Err() == nil; i++ { clus.Members[0].Stop(t) - time.Sleep(100 * time.Millisecond) + time.Sleep(10 * time.Millisecond) clus.Members[0].Restart(t) } }() @@ -1317,7 +1317,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) { case <-pdonec: cancel() <-sdonec - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): cancel() <-sdonec <-pdonec diff --git a/clientv3/leasing/kv.go b/clientv3/leasing/kv.go index 7da812b98be..d899f2ce793 100644 --- a/clientv3/leasing/kv.go +++ b/clientv3/leasing/kv.go @@ -20,9 +20,12 @@ import ( v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) type leasingKV struct { @@ -239,16 +242,33 @@ func (lkv *leasingKV) put(ctx context.Context, op v3.Op) (pr *v3.PutResponse, er } func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) { - if err := lkv.waitSession(ctx); err != nil { - return nil, err + for ctx.Err() == nil { + if err := lkv.waitSession(ctx); err != nil { + return nil, err + } + resp, err := lkv.kv.Txn(ctx).If( + v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)). + Then( + op, + v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))). + Else( + op, + v3.OpGet(lkv.pfx+key), + ).Commit() + if err == nil { + if !resp.Succeeded { + kvs := resp.Responses[1].GetResponseRange().Kvs + // if txn failed since already owner, lease is acquired + resp.Succeeded = v3.LeaseID(kvs[0].Lease) == lkv.leaseID() + } + return resp, nil + } + // retry if transient error + if _, ok := err.(rpctypes.EtcdError); ok || grpc.Code(err) != codes.Unavailable { + return nil, err + } } - return lkv.kv.Txn(ctx).If( - v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0)). - Then( - op, - v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))). - Else(op). - Commit() + return nil, ctx.Err() } func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {