Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Election RPC service #7634

Merged
merged 11 commits into from
Apr 8, 2017
Merged
45 changes: 37 additions & 8 deletions clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

v3 "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
Expand All @@ -36,13 +37,24 @@ type Election struct {
leaderKey string
leaderRev int64
leaderSession *Session
hdr *pb.ResponseHeader
}

// NewElection creates an election that runs on session s. The election's key
// prefix is pfx with a trailing "/" appended; no leader is recorded yet.
func NewElection(s *Session, pfx string) *Election {
	election := new(Election)
	election.session = s
	election.keyPrefix = pfx + "/"
	return election
}

// ResumeElection initializes an election with a known leader.
//
// s is stored as both the election's session and its leader session. pfx is
// the election's key prefix; a trailing "/" is appended, exactly as
// NewElection does. leaderKey and leaderRev identify the leader key and the
// revision recorded for it.
func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
	return &Election{
		session: s,
		// The prefix must be recorded: operations that query by keyPrefix
		// would otherwise run against the empty prefix on a resumed election.
		keyPrefix:     pfx + "/",
		leaderKey:     leaderKey,
		leaderRev:     leaderRev,
		leaderSession: s,
	}
}

// Campaign puts a value as eligible for the election. It blocks until
// it is elected, an error occurs, or the context is cancelled.
func (e *Election) Campaign(ctx context.Context, val string) error {
Expand Down Expand Up @@ -80,6 +92,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
}
return err
}
e.hdr = resp.Header

return nil
}
Expand All @@ -101,6 +114,8 @@ func (e *Election) Proclaim(ctx context.Context, val string) error {
e.leaderKey = ""
return ErrElectionNotLeader
}

e.hdr = tresp.Header
return nil
}

Expand All @@ -110,23 +125,27 @@ func (e *Election) Resign(ctx context.Context) (err error) {
return nil
}
client := e.session.Client()
_, err = client.Delete(ctx, e.leaderKey)
cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
if err == nil {
e.hdr = resp.Header
}
e.leaderKey = ""
e.leaderSession = nil
return err
}

// Leader returns the Get response holding the current leader's key-value, or ErrElectionNoLeader if no leader is elected.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the comment to reflect the change? I don't think Leader just returns the leader value anymore.

func (e *Election) Leader(ctx context.Context) (string, error) {
func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
client := e.session.Client()
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return "", err
return nil, err
} else if len(resp.Kvs) == 0 {
// no leader currently elected
return "", ErrElectionNoLeader
return nil, ErrElectionNoLeader
}
return string(resp.Kvs[0].Value), nil
return resp, nil
}

// Observe returns a channel that observes all leader proposal values as
Expand All @@ -142,20 +161,21 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
client := e.session.Client()

defer close(ch)
lastRev := int64(0)
for {
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev))
resp, err := client.Get(ctx, e.keyPrefix, opts...)
if err != nil {
return
}

var kv *mvccpb.KeyValue

cctx, cancel := context.WithCancel(ctx)
if len(resp.Kvs) == 0 {
cctx, cancel := context.WithCancel(ctx)
// wait for first key put on prefix
opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
wch := client.Watch(cctx, e.keyPrefix, opts...)

for kv == nil {
wr, ok := <-wch
if !ok || wr.Err() != nil {
Expand All @@ -170,10 +190,12 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
}
}
}
cancel()
} else {
kv = resp.Kvs[0]
}

cctx, cancel := context.WithCancel(ctx)
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
keyDeleted := false
for !keyDeleted {
Expand All @@ -183,6 +205,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
}
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
lastRev = ev.Kv.ModRevision
keyDeleted = true
break
}
Expand All @@ -201,3 +224,9 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {

// Key reports the leader key held by this election; it is the empty string
// when this election is not the leader.
func (e *Election) Key() string {
	return e.leaderKey
}

// Rev reports the creation revision of the leader key when elected.
func (e *Election) Rev() int64 {
	return e.leaderRev
}

// Header returns the response header stored by the most recent successful
// Campaign, Proclaim, or Resign call on this election. It is nil until one of
// those calls succeeds.
func (e *Election) Header() *pb.ResponseHeader { return e.hdr }
11 changes: 9 additions & 2 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
"github.com/coreos/etcd/etcdserver/api/v3election"
"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
"github.com/coreos/etcd/etcdserver/api/v3lock"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
Expand Down Expand Up @@ -66,10 +68,14 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
plog.Info("ready to serve client requests")

m := cmux.New(sctx.l)
v3c := v3client.New(s)
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)

if sctx.insecure {
gs := v3rpc.Server(s, nil)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
Expand Down Expand Up @@ -97,7 +103,8 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle

if sctx.secure {
gs := v3rpc.Server(s, tlscfg)
v3lockpb.RegisterLockServer(gs, v3lock.NewLockServer(v3client.New(s)))
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}
Expand Down
16 changes: 16 additions & 0 deletions etcdserver/api/v3election/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package v3election provides a v3 election service from an etcdserver.
package v3election
123 changes: 123 additions & 0 deletions etcdserver/api/v3election/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3election

import (
	"errors"

	"golang.org/x/net/context"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
	epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
)

// electionServer serves the election RPCs by driving clientv3/concurrency
// elections through a clientv3 client.
type electionServer struct {
// c is the client every request's session is created on.
c *clientv3.Client
}

// NewElectionServer returns an epb.ElectionServer that serves its requests
// through the client c.
func NewElectionServer(c *clientv3.Client) epb.ElectionServer {
	srv := new(electionServer)
	srv.c = c
	return srv
}

// Campaign runs a campaign in the election named req.Name on behalf of the
// lease req.Lease, proposing req.Value. It blocks in Election.Campaign and, on
// success, returns the resulting header together with a LeaderKey (name, key,
// revision, lease) describing the leadership that was obtained.
func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest) (*epb.CampaignResponse, error) {
	sess, err := es.session(ctx, req.Lease)
	if err != nil {
		return nil, err
	}

	election := concurrency.NewElection(sess, string(req.Name))
	err = election.Campaign(ctx, string(req.Value))
	if err != nil {
		return nil, err
	}

	resp := &epb.CampaignResponse{Header: election.Header()}
	resp.Leader = &epb.LeaderKey{
		Name:  req.Name,
		Key:   []byte(election.Key()),
		Rev:   election.Rev(),
		Lease: int64(sess.Lease()),
	}
	return resp, nil
}

// Proclaim resumes the election described by req.Leader (name, key, revision,
// lease) and proclaims req.Value through it. On success it returns the header
// recorded by the election; a request without a leader is rejected with an
// error.
func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) {
	// The leader key comes from the caller; reject a request that omits it
	// instead of dereferencing a nil pointer below.
	if req.Leader == nil {
		return nil, errors.New(`"leader" field must be provided`)
	}
	s, err := es.session(ctx, req.Leader.Lease)
	if err != nil {
		return nil, err
	}
	e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
	if err := e.Proclaim(ctx, string(req.Value)); err != nil {
		return nil, err
	}
	return &epb.ProclaimResponse{Header: e.Header()}, nil
}

// Observe streams leader updates for the election named req.Name. Every
// response received from Election.Observe is forwarded to the stream as a
// LeaderResponse carrying its header and first key-value. It returns nil when
// the observe channel closes, the send error if a send fails, and the stream
// context's error once that context is done.
func (es *electionServer) Observe(req *epb.LeaderRequest, stream epb.Election_ObserveServer) error {
	sess, err := es.session(stream.Context(), -1)
	if err != nil {
		return err
	}

	election := concurrency.NewElection(sess, string(req.Name))
	updates := election.Observe(stream.Context())
	for stream.Context().Err() == nil {
		select {
		case <-stream.Context().Done():
			// Nothing to do here: the loop condition sees the finished
			// context and falls through to the final return.
		case update, open := <-updates:
			if !open {
				return nil
			}
			msg := &epb.LeaderResponse{Header: update.Header, Kv: update.Kvs[0]}
			if sendErr := stream.Send(msg); sendErr != nil {
				return sendErr
			}
		}
	}
	return stream.Context().Err()
}

// Leader looks up the current leader of the election named req.Name and
// returns the lookup's header together with the leader's key-value. Errors
// from creating the session or from Election.Leader are returned unchanged.
func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*epb.LeaderResponse, error) {
// -1 is passed as the lease because no caller-supplied lease is involved here.
s, err := es.session(ctx, -1)
if err != nil {
return nil, err
}
l, lerr := concurrency.NewElection(s, string(req.Name)).Leader(ctx)
if lerr != nil {
return nil, lerr
}
// NOTE(review): indexing Kvs[0] relies on Election.Leader returning
// ErrElectionNoLeader when the response holds no keys — confirm.
return &epb.LeaderResponse{Header: l.Header, Kv: l.Kvs[0]}, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we expecting the client that calls this API to parse the leader name out of the kv?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It's returning the full kv so it's possible to do txns on the leader key and revision. The client shouldn't care about what's in the key name in particular, just that it has some way to refer to the key by name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the user's responsibility to understand what the kv contains and how to use it? If yes, would it be nicer to have a response that's more descriptive, something like the following?

message LeaderResponse {
   etcdserverpb.ResponseHeader header = 1;
   // comment
   bytes key = 2;
   // comment
   int64 rev = 3;
   // comment
   bytes value = 4;
} 

Just a thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assumption is the caller for Leader expects to get a key that represents the latest leader post. There's already a data type for keys and it's mvccpb.KeyValue and the client should already know what a key is.

The metadata is still useful since creation revision, modification revision, and version can all be used to reason about the status of the leader. I don't understand what the advantage is to having a totally separate data type for this-- it's a new data type that can't be used like an ordinary key and it lacks key metadata.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a separate data structure is not useful. However, my main concern is whether the caller/user would know exactly what to do with the kv data type and its metadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, can add some examples for the Lock and Election RPCs in another PR.

}

// Resign resumes the election described by req.Leader (name, key, revision,
// lease) and resigns from it. On success it returns the header recorded by the
// election; a request without a leader is rejected with an error.
func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) {
	// The leader key comes from the caller; reject a request that omits it
	// instead of dereferencing a nil pointer below.
	if req.Leader == nil {
		return nil, errors.New(`"leader" field must be provided`)
	}
	s, err := es.session(ctx, req.Leader.Lease)
	if err != nil {
		return nil, err
	}
	e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
	if err := e.Resign(ctx); err != nil {
		return nil, err
	}
	return &epb.ResignResponse{Header: e.Header()}, nil
}

// session creates a concurrency.Session on the server's client, configured
// with the given lease ID and context, and orphans it before handing it back.
// A failure to create the session is returned as-is.
func (es *electionServer) session(ctx context.Context, lease int64) (*concurrency.Session, error) {
	leaseOpt := concurrency.WithLease(clientv3.LeaseID(lease))
	ctxOpt := concurrency.WithContext(ctx)

	sess, err := concurrency.NewSession(es.c, leaseOpt, ctxOpt)
	if err == nil {
		sess.Orphan()
		return sess, nil
	}
	return nil, err
}
Loading