Skip to content

Commit

Permalink
Merge pull request #8153 from gyuho/leadership-transfer
Browse files Browse the repository at this point in the history
*: expose Leadership Transfer API to clients
  • Loading branch information
gyuho committed Jul 6, 2017
2 parents d48e59e + d428958 commit a57405a
Show file tree
Hide file tree
Showing 24 changed files with 1,148 additions and 350 deletions.
17 changes: 17 additions & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ This is a generated documentation. Please read the proto files for more.
| Defragment | DefragmentRequest | DefragmentResponse | Defragment defragments a member's backend database to recover storage space. |
| Hash | HashRequest | HashResponse | Hash returns the hash of the local KV state for consistency checking purpose. This is designed for testing; do not use this in production when there are ongoing transactions. |
| Snapshot | SnapshotRequest | SnapshotResponse | Snapshot sends a snapshot of the entire backend from a member over a stream to a client. |
| MoveLeader | MoveLeaderRequest | MoveLeaderResponse | MoveLeader requests current leader node to transfer its leadership to transferee. |



Expand Down Expand Up @@ -608,6 +609,22 @@ Empty field.



##### message `MoveLeaderRequest` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
| ----- | ----------- | ---- |
| targetID | targetID is the node ID for the new leader. | uint64 |



##### message `MoveLeaderResponse` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
| ----- | ----------- | ---- |
| header | | ResponseHeader |



##### message `PutRequest` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
Expand Down
47 changes: 46 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,33 @@
}
}
},
"/v3alpha/maintenance/transfer-leadership": {
"post": {
"tags": [
"Maintenance"
],
"summary": "MoveLeader requests current leader node to transfer its leadership to transferee.",
"operationId": "MoveLeader",
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/etcdserverpbMoveLeaderRequest"
}
}
],
"responses": {
"200": {
"description": "(empty)",
"schema": {
"$ref": "#/definitions/etcdserverpbMoveLeaderResponse"
}
}
}
}
},
"/v3alpha/watch": {
"post": {
"tags": [
Expand Down Expand Up @@ -1803,6 +1830,24 @@
}
}
},
"etcdserverpbMoveLeaderRequest": {
"type": "object",
"properties": {
"targetID": {
"description": "targetID is the node ID for the new leader.",
"type": "string",
"format": "uint64"
}
}
},
"etcdserverpbMoveLeaderResponse": {
"type": "object",
"properties": {
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
}
}
},
"etcdserverpbPutRequest": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2177,7 +2222,7 @@
"format": "boolean"
},
"compact_revision": {
"description": "compact_revision is set to the minimum index if a watcher tries to watch\nat a compacted index.\n\nThis happens when creating a watcher at a compacted revision or the watcher cannot\ncatch up with the progress of the key-value store. \n\nThe client should treat the watcher as canceled and should not try to create any\nwatcher with the same start_revision again.",
"description": "compact_revision is set to the minimum index if a watcher tries to watch\nat a compacted index.\n\nThis happens when creating a watcher at a compacted revision or the watcher cannot\ncatch up with the progress of the key-value store.\n\nThe client should treat the watcher as canceled and should not try to create any\nwatcher with the same start_revision again.",
"type": "string",
"format": "int64"
},
Expand Down
53 changes: 53 additions & 0 deletions clientv3/integration/maintenance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"testing"

"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

func TestMaintenanceMoveLeader(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

oldLeadIdx := clus.WaitLeader(t)
targetIdx := (oldLeadIdx + 1) % 3
target := uint64(clus.Members[targetIdx].ID())

cli := clus.Client(targetIdx)
_, err := cli.MoveLeader(context.Background(), target)
if err != rpctypes.ErrNotLeader {
t.Fatalf("error expected %v, got %v", rpctypes.ErrNotLeader, err)
}

cli = clus.Client(oldLeadIdx)
_, err = cli.MoveLeader(context.Background(), target)
if err != nil {
t.Fatal(err)
}

leadIdx := clus.WaitLeader(t)
lead := uint64(clus.Members[leadIdx].ID())
if target != lead {
t.Fatalf("new leader expected %d, got %d", target, lead)
}
}
10 changes: 10 additions & 0 deletions clientv3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type (
AlarmResponse pb.AlarmResponse
AlarmMember pb.AlarmMember
StatusResponse pb.StatusResponse
MoveLeaderResponse pb.MoveLeaderResponse
)

type Maintenance interface {
Expand All @@ -51,6 +52,10 @@ type Maintenance interface {

// Snapshot provides a reader for a snapshot of a backend.
Snapshot(ctx context.Context) (io.ReadCloser, error)

// MoveLeader requests current leader to transfer its leadership to the transferee.
// Request must be made to the leader.
MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
}

type maintenance struct {
Expand Down Expand Up @@ -180,3 +185,8 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
}()
return pr, nil
}

func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, grpc.FailFast(false))
return (*MoveLeaderResponse)(resp), toErr(ctx, err)
}
92 changes: 92 additions & 0 deletions e2e/ctl_v3_move_leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
)

func TestCtlV3MoveLeader(t *testing.T) {
defer testutil.AfterTest(t)

epc := setupEtcdctlTest(t, &configNoTLS, true)
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

var leadIdx int
var leaderID uint64
var transferee uint64
for i, ep := range epc.grpcEndpoints() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 3 * time.Second,
})
if err != nil {
t.Fatal(err)
}
resp, err := cli.Status(context.Background(), ep)
if err != nil {
t.Fatal(err)
}
cli.Close()

if resp.Header.GetMemberId() == resp.Leader {
leadIdx = i
leaderID = resp.Leader
} else {
transferee = resp.Header.GetMemberId()
}
}

os.Setenv("ETCDCTL_API", "3")
defer os.Unsetenv("ETCDCTL_API")
cx := ctlCtx{
t: t,
cfg: configNoTLS,
dialTimeout: 7 * time.Second,
epc: epc,
}

tests := []struct {
prefixes []string
expect string
}{
{ // request to non-leader
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[(leadIdx+1)%3]}),
"no leader endpoint given at ",
},
{ // request to leader
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[leadIdx]}),
fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)),
},
}
for i, tc := range tests {
cmdArgs := append(tc.prefixes, "move-leader", types.ID(transferee).String())
if err := spawnWithExpect(cmdArgs, tc.expect); err != nil {
t.Fatalf("#%d: %v", i, err)
}
}
}
23 changes: 23 additions & 0 deletions etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,29 @@ Prints a line of JSON encoding the database hash, revision, total keys, and size
+----------+----------+------------+------------+
```

### MOVE-LEADER \<hexadecimal-transferee-id\>

MOVE-LEADER transfers leadership from the leader to another member in the cluster.

#### Example

```bash
# to choose transferee
transferee_id=$(./etcdctl \
--endpoints localhost:12379,localhost:22379,localhost:32379 \
endpoint status | grep -m 1 "false" | awk -F', ' '{print $2}')
echo ${transferee_id}
# c89feb932daef420

# endpoints should include leader node
./etcdctl --endpoints ${transferee_ep} move-leader ${transferee_id}
# Error: no leader endpoint given at [localhost:22379 localhost:32379]

# request to leader with target node ID
./etcdctl --endpoints ${leader_ep} move-leader ${transferee_id}
# Leadership transferred from 45ddc0e800e20b93 to c89feb932daef420
```

## Concurrency commands

### LOCK \<lockname\> [command arg1 arg2 ...]
Expand Down
87 changes: 87 additions & 0 deletions etcdctl/ctlv3/command/move_leader_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"fmt"
"strconv"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/spf13/cobra"
)

// NewMoveLeaderCommand returns the cobra command for "move-leader".
func NewMoveLeaderCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "move-leader <transferee-member-id>",
Short: "Transfers leadership to another etcd cluster member.",
Run: transferLeadershipCommandFunc,
}
return cmd
}

// transferLeadershipCommandFunc executes the "compaction" command.
func transferLeadershipCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("move-leader command needs 1 argument"))
}
target, err := strconv.ParseUint(args[0], 16, 64)
if err != nil {
ExitWithError(ExitBadArgs, err)
}

c := mustClientFromCmd(cmd)
eps := c.Endpoints()
c.Close()

ctx, cancel := commandCtx(cmd)

// find current leader
var leaderCli *clientv3.Client
var leaderID uint64
for _, ep := range eps {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 3 * time.Second,
})
if err != nil {
ExitWithError(ExitError, err)
}
resp, err := cli.Status(ctx, ep)
if err != nil {
ExitWithError(ExitError, err)
}

if resp.Header.GetMemberId() == resp.Leader {
leaderCli = cli
leaderID = resp.Leader
break
}
cli.Close()
}
if leaderCli == nil {
ExitWithError(ExitBadArgs, fmt.Errorf("no leader endpoint given at %v", eps))
}

var resp *clientv3.MoveLeaderResponse
resp, err = leaderCli.MoveLeader(ctx, target)
cancel()
if err != nil {
ExitWithError(ExitError, err)
}

display.MoveLeader(leaderID, target, *resp)
}
Loading

0 comments on commit a57405a

Please sign in to comment.