Skip to content

Commit

Permalink
Merge pull request #7444 from heyitsanthony/lock-service
Browse files Browse the repository at this point in the history
grpc lock service
  • Loading branch information
Anthony Romano committed Mar 14, 2017
2 parents 148c923 + 300323f commit 1a6be70
Show file tree
Hide file tree
Showing 20 changed files with 1,289 additions and 19 deletions.
13 changes: 12 additions & 1 deletion clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func New(cfg Config) (*Client, error) {
return newClient(&cfg)
}

// NewCtxClient creates a client with a context but no underlying grpc
// connection. This is useful for embedded cases that override the
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context) *Client {
cctx, cancel := context.WithCancel(ctx)
return &Client{ctx: cctx, cancel: cancel}
}

// NewFromURL creates a new etcdv3 client from a URL.
func NewFromURL(url string) (*Client, error) {
return New(Config{Endpoints: []string{url}})
Expand All @@ -87,7 +95,10 @@ func (c *Client) Close() error {
c.cancel()
c.Watcher.Close()
c.Lease.Close()
return toErr(c.ctx, c.conn.Close())
if c.conn != nil {
return toErr(c.ctx, c.conn.Close())
}
return c.ctx.Err()
}

// Ctx is a context for "out of band" messages (e.g., for sending
Expand Down
2 changes: 1 addition & 1 deletion clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
}
}

err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {
Expand Down
9 changes: 5 additions & 4 deletions clientv3/concurrency/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -46,19 +47,19 @@ func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) e

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error {
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return err
return nil, err
}
if len(resp.Kvs) == 0 {
return nil
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return err
return nil, err
}
}
}
14 changes: 10 additions & 4 deletions clientv3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"

v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
)

Expand All @@ -29,13 +30,14 @@ type Mutex struct {
pfx string
myKey string
myRev int64
hdr *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1}
return &Mutex{s, pfx + "/", "", -1, nil}
}

// Lock locks the mutex with a cancellable context. If the context is cancelled
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
Expand All @@ -57,14 +59,15 @@ func (m *Mutex) Lock(ctx context.Context) error {
}

// wait for deletion revisions prior to myKey
err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if cancelled
select {
case <-ctx.Done():
m.Unlock(client.Ctx())
default:
m.hdr = hdr
}
return err
return werr
}

func (m *Mutex) Unlock(ctx context.Context) error {
Expand All @@ -83,6 +86,9 @@ func (m *Mutex) IsOwner() v3.Cmp {

func (m *Mutex) Key() string { return m.myKey }

// Header is the response header received from etcd on acquiring the lock.
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }

type lockerMutex struct{ *Mutex }

func (lm *lockerMutex) Lock() {
Expand Down
5 changes: 5 additions & 0 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
"github.com/coreos/etcd/etcdserver/api/v3lock"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/transport"
Expand Down Expand Up @@ -68,6 +71,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle

if sctx.insecure {
gs := v3rpc.Server(s, nil)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
Expand Down Expand Up @@ -95,6 +99,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle

if sctx.secure {
gs := v3rpc.Server(s, tlscfg)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
Expand Down
16 changes: 16 additions & 0 deletions etcdserver/api/v3client/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package v3client provides clientv3 interfaces from an etcdserver.
package v3client
40 changes: 40 additions & 0 deletions etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3client

import (
"context"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/proxy/grpcproxy/adapter"
)

func New(s *etcdserver.EtcdServer) *clientv3.Client {
c := clientv3.NewCtxClient(context.Background())

kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
c.KV = clientv3.NewKVFromKVClient(kvc)

lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)

wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = clientv3.NewWatchFromWatchClient(wc)

return c
}
16 changes: 16 additions & 0 deletions etcdserver/api/v3lock/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package v3lock provides a v3 locking service from an etcdserver.
package v3lock
56 changes: 56 additions & 0 deletions etcdserver/api/v3lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3lock

import (
"golang.org/x/net/context"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
)

type lockServer struct {
c *clientv3.Client
}

func NewLockServer(c *clientv3.Client) v3lockpb.LockServer {
return &lockServer{c}
}

func (ls *lockServer) Lock(ctx context.Context, req *v3lockpb.LockRequest) (*v3lockpb.LockResponse, error) {
s, err := concurrency.NewSession(
ls.c,
concurrency.WithLease(clientv3.LeaseID(req.Lease)),
concurrency.WithContext(ctx),
)
if err != nil {
return nil, err
}
s.Orphan()
m := concurrency.NewMutex(s, string(req.Name))
if err = m.Lock(ctx); err != nil {
return nil, err
}
return &v3lockpb.LockResponse{Header: m.Header(), Key: []byte(m.Key())}, nil
}

func (ls *lockServer) Unlock(ctx context.Context, req *v3lockpb.UnlockRequest) (*v3lockpb.UnlockResponse, error) {
resp, err := ls.c.Delete(ctx, string(req.Key))
if err != nil {
return nil, err
}
return &v3lockpb.UnlockResponse{Header: resp.Header}, nil
}
Loading

0 comments on commit 1a6be70

Please sign in to comment.