Skip to content

Commit

Permalink
Implement raft list-peers and remove-peer
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Jun 28, 2019
1 parent 27e9ea0 commit edc6acb
Show file tree
Hide file tree
Showing 8 changed files with 590 additions and 95 deletions.
75 changes: 75 additions & 0 deletions cmd/raft.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cmd

import (
"fmt"

"github.com/ryanuber/columnize"
"github.com/spf13/cobra"
"github.com/victorcoder/dkron/dkron"
)

// versionCmd represents the version command
var raftCmd = &cobra.Command{
Use: "raft [command]",
Short: "Command to perform some raft operations",
Long: ``,
}

var raftListCmd = &cobra.Command{
Use: "list-peers",
Short: "Command to list raft peers",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
var gc dkron.DkronGRPCClient
gc = dkron.NewGRPCClient(nil, nil)

reply, err := gc.RaftGetConfiguration(rpcAddr)
if err != nil {
return err
}

// Format it as a nice table.
result := []string{"Node|ID|Address|State|Voter"}
for _, s := range reply.Servers {
state := "follower"
if s.Leader {
state = "leader"
}
result = append(result, fmt.Sprintf("%s|%s|%s|%s|%v",
s.Node, s.Id, s.Address, state, s.Voter))
}

fmt.Println(columnize.SimpleFormat(result))

return nil
},
}

var peerID string

var raftRemovePeerCmd = &cobra.Command{
Use: "remove-peer",
Short: "Command to list raft peers",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
var gc dkron.DkronGRPCClient
gc = dkron.NewGRPCClient(nil, nil)

if err := gc.RaftRemovePeerByID(rpcAddr, peerID); err != nil {
return err
}
fmt.Println("Peer removed")

return nil
},
}

func init() {
raftCmd.PersistentFlags().StringVar(&rpcAddr, "rpc-addr", "127.0.0.1:6868", "gRPC address of the agent")
raftRemovePeerCmd.Flags().StringVar(&peerID, "peer-id", "", "Remove a Dkron server with the given ID from the Raft configuration.")

raftCmd.AddCommand(raftListCmd)
raftCmd.AddCommand(raftRemovePeerCmd)

dkronCmd.AddCommand(raftCmd)
}
90 changes: 90 additions & 0 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
metrics "github.com/armon/go-metrics"
pb "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/sirupsen/logrus"
"github.com/victorcoder/dkron/proto"
"golang.org/x/net/context"
Expand Down Expand Up @@ -254,3 +256,91 @@ func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (
func (grpcs *GRPCServer) ToggleJob(ctx context.Context, getJobReq *proto.ToggleJobRequest) (*proto.ToggleJobResponse, error) {
return nil, nil
}

// RaftGetConfiguration get raft config
func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Empty) (*proto.RaftGetConfigurationResponse, error) {
// We can't fetch the leader and the configuration atomically with
// the current Raft API.
future := grpcs.agent.raft.GetConfiguration()
if err := future.Error(); err != nil {
return nil, err
}

// Index the information about the servers.
serverMap := make(map[raft.ServerAddress]serf.Member)
for _, member := range grpcs.agent.serf.Members() {
valid, parts := isServer(member)
if !valid {
continue
}

addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String()
serverMap[raft.ServerAddress(addr)] = member
}

// Fill out the reply.
leader := grpcs.agent.raft.Leader()
reply := &proto.RaftGetConfigurationResponse{}
reply.Index = future.Index()
for _, server := range future.Configuration().Servers {
node := "(unknown)"
raftProtocolVersion := "unknown"
if member, ok := serverMap[server.Address]; ok {
node = member.Name
if raftVsn, ok := member.Tags["raft_vsn"]; ok {
raftProtocolVersion = raftVsn
}
}

entry := &proto.RaftServer{
Id: string(server.ID),
Node: node,
Address: string(server.Address),
Leader: server.Address == leader,
Voter: server.Suffrage == raft.Voter,
RaftProtocol: raftProtocolVersion,
}
reply.Servers = append(reply.Servers, entry)
}
return reply, nil
}

// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
// interface.
func (grpcs *GRPCServer) RaftRemovePeerByID(ctx context.Context, in *proto.RaftRemovePeerByIDRequest) (*empty.Empty, error) {
// Since this is an operation designed for humans to use, we will return
// an error if the supplied id isn't among the peers since it's
// likely they screwed up.
{
future := grpcs.agent.raft.GetConfiguration()
if err := future.Error(); err != nil {
return nil, err
}
for _, s := range future.Configuration().Servers {
if s.ID == raft.ServerID(in.Id) {
goto REMOVE
}
}
return nil, fmt.Errorf("id %q was not found in the Raft configuration", in.Id)
}

REMOVE:
// The Raft library itself will prevent various forms of foot-shooting,
// like making a configuration with no voters. Some consideration was
// given here to adding more checks, but it was decided to make this as
// low-level and direct as possible. We've got ACL coverage to lock this
// down, and if you are an operator, it's assumed you know what you are
// doing if you are calling this. If you remove a peer that's known to
// Serf, for example, it will come back when the leader does a reconcile
// pass.
future := grpcs.agent.raft.RemoveServer(raft.ServerID(in.Id), 0, 0)
if err := future.Error(); err != nil {
log.WithError(err).WithField("peer", in.Id).Warn("failed to remove Raft peer")
return nil, err
}

log.WithField("peer", in.Id).Warn("removed Raft peer")
return nil, nil
}
90 changes: 78 additions & 12 deletions dkron/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type DkronGRPCClient interface {
CallDeleteJob(string) (*Job, error)
Leave(string) error
CallRunJob(string) (*Job, error)
RaftGetConfiguration(string) (*proto.RaftGetConfigurationResponse, error)
RaftRemovePeerByID(string, string) error
}

type GRPCClient struct {
Expand Down Expand Up @@ -133,9 +135,10 @@ func (grpcc *GRPCClient) Leave(addr string) error {
d := proto.NewDkronClient(conn)
_, err = d.Leave(context.Background(), &empty.Empty{})
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warning("grpc: Error calling Leave")
log.WithError(err).WithFields(logrus.Fields{
"method": "Leave",
"server_addr": addr,
}).Error("grpc: Error calling gRPC method")
return err
}

Expand Down Expand Up @@ -165,9 +168,10 @@ func (grpcc *GRPCClient) CallSetJob(job *Job) error {
Job: job.ToProto(),
})
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warning("grpc: Error calling SetJob")
log.WithError(err).WithFields(logrus.Fields{
"method": "CallSetJob",
"server_addr": addr,
}).Error("grpc: Error calling gRPC method")
return err
}
return nil
Expand Down Expand Up @@ -196,9 +200,10 @@ func (grpcc *GRPCClient) CallDeleteJob(jobName string) (*Job, error) {
JobName: jobName,
})
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warning("grpc: Error calling SetJob")
log.WithError(err).WithFields(logrus.Fields{
"method": "CallDeleteJob",
"server_addr": addr,
}).Error("grpc: Error calling gRPC method")
return nil, err
}

Expand Down Expand Up @@ -230,13 +235,74 @@ func (grpcc *GRPCClient) CallRunJob(jobName string) (*Job, error) {
JobName: jobName,
})
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warning("grpc: Error calling SetJob")
log.WithError(err).WithFields(logrus.Fields{
"method": "CallRunJob",
"server_addr": addr,
}).Error("grpc: Error calling gRPC method")
return nil, err
}

job := NewJobFromProto(res.Job)

return job, nil
}

// RaftGetConfiguration get the current raft configuration of peers
func (grpcc *GRPCClient) RaftGetConfiguration(addr string) (*proto.RaftGetConfigurationResponse, error) {
var conn *grpc.ClientConn

// Initiate a connection with the server
conn, err := grpcc.Connect(addr)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"method": "RaftGetConfiguration",
"server_addr": addr,
}).Error("grpc: error dialing.")
return nil, err
}
defer conn.Close()

// Synchronous call
d := proto.NewDkronClient(conn)
res, err := d.RaftGetConfiguration(context.Background(), &empty.Empty{})
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"method": "RaftGetConfiguration",
"server_addr": addr,
}).Error("grpc: Error calling gRPC method")
return nil, err
}

return res, nil
}

// RaftRemovePeerByAddress remove a raft peer
func (grpcc *GRPCClient) RaftRemovePeerByID(addr, peerID string) error {
var conn *grpc.ClientConn

// Initiate a connection with the server
conn, err := grpcc.Connect(addr)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"method": "RaftRemovePeerByID",
"server_addr": addr,
}).Error("grpc: error dialing.")
return err
}
defer conn.Close()

// Synchronous call
d := proto.NewDkronClient(conn)
_, err = d.RaftRemovePeerByID(context.Background(),
&proto.RaftRemovePeerByIDRequest{Id: peerID},
)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"method": "RaftRemovePeerByID",
"server_addr": addr,
}).Error("grpc: Error calling gRPC method")
return err
}

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/pelletier/go-toml v1.0.1 // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/russross/blackfriday v0.0.0-20180804101149-46c73eb196ba // indirect
github.com/ryanuber/columnize v2.1.0+incompatible
github.com/sirupsen/logrus v1.2.0
github.com/soheilhy/cmux v0.1.4
github.com/spf13/afero v0.0.0-20171008182726-e67d870304c4 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/russross/blackfriday v0.0.0-20180804101149-46c73eb196ba h1:8Vzt8HxRjy7hp1eqPKVoAEPK9npQFW2510qlobGzvi0=
github.com/russross/blackfriday v0.0.0-20180804101149-46c73eb196ba/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible h1:j1Wcmh8OrK4Q7GXY+V7SVSY8nUWQxHW5TkBe7YUl+2s=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
Expand Down
Loading

0 comments on commit edc6acb

Please sign in to comment.