Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc lock service #7444

Merged
merged 8 commits into from
Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func New(cfg Config) (*Client, error) {
return newClient(&cfg)
}

// NewCtxClient creates a client with a context but no underlying grpc
// connection. This is useful for embedded cases that override the
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context) *Client {
cctx, cancel := context.WithCancel(ctx)
return &Client{ctx: cctx, cancel: cancel}
}

// NewFromURL creates a new etcdv3 client from a URL.
func NewFromURL(url string) (*Client, error) {
return New(Config{Endpoints: []string{url}})
Expand All @@ -87,7 +95,10 @@ func (c *Client) Close() error {
c.cancel()
c.Watcher.Close()
c.Lease.Close()
return toErr(c.ctx, c.conn.Close())
if c.conn != nil {
return toErr(c.ctx, c.conn.Close())
}
return c.ctx.Err()
}

// Ctx is a context for "out of band" messages (e.g., for sending
Expand Down
2 changes: 1 addition & 1 deletion clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
}
}

err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {
Expand Down
9 changes: 5 additions & 4 deletions clientv3/concurrency/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -46,19 +47,19 @@ func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) e

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error {
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what's the rationale of for loop here? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It loops because waitDelete will return after sensing one delete. There is no guarantee that all keys within the revision range will be deleted by that time. This code is a little old and the API is a bit stronger now so this can possibly be optimized to be single-shot on the watcher, but I don't want to change it for this patch.

resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return err
return nil, err
}
if len(resp.Kvs) == 0 {
return nil
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return err
return nil, err
}
}
}
14 changes: 10 additions & 4 deletions clientv3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"

v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
)

Expand All @@ -29,13 +30,14 @@ type Mutex struct {
pfx string
myKey string
myRev int64
hdr *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1}
return &Mutex{s, pfx + "/", "", -1, nil}
}

// Lock locks the mutex with a cancellable context. If the context is cancelled
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
Expand All @@ -57,14 +59,15 @@ func (m *Mutex) Lock(ctx context.Context) error {
}

// wait for deletion revisions prior to myKey
err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if cancelled
select {
case <-ctx.Done():
m.Unlock(client.Ctx())
default:
m.hdr = hdr
}
return err
return werr
}

func (m *Mutex) Unlock(ctx context.Context) error {
Expand All @@ -83,6 +86,9 @@ func (m *Mutex) IsOwner() v3.Cmp {

func (m *Mutex) Key() string { return m.myKey }

// Header is the response header received from etcd on acquiring the lock.
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }

type lockerMutex struct{ *Mutex }

func (lm *lockerMutex) Lock() {
Expand Down
5 changes: 5 additions & 0 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
"github.com/coreos/etcd/etcdserver/api/v3lock"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/transport"
Expand Down Expand Up @@ -68,6 +71,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle

if sctx.insecure {
gs := v3rpc.Server(s, nil)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
Expand Down Expand Up @@ -95,6 +99,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle

if sctx.secure {
gs := v3rpc.Server(s, tlscfg)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
Expand Down
16 changes: 16 additions & 0 deletions etcdserver/api/v3client/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package v3client provides clientv3 interfaces from an etcdserver.
package v3client
40 changes: 40 additions & 0 deletions etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3client

import (
"context"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/proxy/grpcproxy/adapter"
)

func New(s *etcdserver.EtcdServer) *clientv3.Client {
c := clientv3.NewCtxClient(context.Background())

kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
c.KV = clientv3.NewKVFromKVClient(kvc)

lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)

wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = clientv3.NewWatchFromWatchClient(wc)

return c
}
16 changes: 16 additions & 0 deletions etcdserver/api/v3lock/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package v3lock provides a v3 locking service from an etcdserver.
package v3lock
56 changes: 56 additions & 0 deletions etcdserver/api/v3lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3lock

import (
"golang.org/x/net/context"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
)

type lockServer struct {
c *clientv3.Client
}

func NewLockServer(c *clientv3.Client) v3lockpb.LockServer {
return &lockServer{c}
}

func (ls *lockServer) Lock(ctx context.Context, req *v3lockpb.LockRequest) (*v3lockpb.LockResponse, error) {
s, err := concurrency.NewSession(
ls.c,
concurrency.WithLease(clientv3.LeaseID(req.Lease)),
concurrency.WithContext(ctx),
)
if err != nil {
return nil, err
}
s.Orphan()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to call Orphan here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise the server will keep the lease alive indefinitely.

An alternative to putting it here is defer s.Orphan() so it'll keep the lease alive until the the lock is acquired so the client doesn't have to worry about keep alives. The problem with that is there's no guarantee how much time the lease will have left after the acquire. This could be solved by issuing a keepalive before returning from the RPC.

Another option is to make leases optional and have an additional TTL field so the server can grant a lease and return a lock held for at least the given TTL.

m := concurrency.NewMutex(s, string(req.Name))
if err = m.Lock(ctx); err != nil {
return nil, err
}
return &v3lockpb.LockResponse{Header: m.Header(), Key: []byte(m.Key())}, nil
}

func (ls *lockServer) Unlock(ctx context.Context, req *v3lockpb.UnlockRequest) (*v3lockpb.UnlockResponse, error) {
resp, err := ls.c.Delete(ctx, string(req.Key))
if err != nil {
return nil, err
}
return &v3lockpb.UnlockResponse{Header: resp.Header}, nil
}
Loading