Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrates Consul with "stage one" of HashiCorp Raft library v2. #2222

Merged
merged 12 commits into from
Aug 9, 2016
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions consul/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -275,8 +275,8 @@ func TestACL_NonAuthority_Found(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -351,8 +351,8 @@ func TestACL_NonAuthority_Management(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -408,8 +408,8 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -482,8 +482,8 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -558,8 +558,8 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down
4 changes: 4 additions & 0 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort

// Enable interoperability with unversioned Raft library, and don't
// start using new ID-based features yet.
conf.RaftConfig.ProtocolVersion = 1

// Disable shutdown on removal
conf.RaftConfig.ShutdownOnRemove = false

Expand Down
52 changes: 42 additions & 10 deletions consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,26 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
}

// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()

// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
return nil
}
}

// Attempt to add as a peer
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
future := s.raft.AddPeer(addr.String())
if err := future.Error(); err != nil && err != raft.ErrKnownPeer {
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
Expand All @@ -555,15 +571,31 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {

// removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member, port int) error {
// Attempt to remove as peer
peer := &net.TCPAddr{IP: m.Addr, Port: port}
future := s.raft.RemovePeer(peer.String())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()

// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
goto REMOVE
}
}
return nil

REMOVE:
// Attempt to remove as a peer.
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
peer, err)
addr, err)
return err
} else if err == nil {
s.logger.Printf("[INFO] consul: removed server '%s' as peer", m.Name)
}
return nil
}
Expand Down
24 changes: 12 additions & 12 deletions consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func TestLeader_LeftServer(t *testing.T) {

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
Expand All @@ -358,8 +358,8 @@ func TestLeader_LeftServer(t *testing.T) {
}

for _, s := range servers[1:] {
peers, _ := s.raftPeers.Peers()
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
peers, _ := s.numPeers()
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
}

return true, nil
Expand Down Expand Up @@ -394,8 +394,8 @@ func TestLeader_LeftLeader(t *testing.T) {

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
Expand Down Expand Up @@ -423,8 +423,8 @@ func TestLeader_LeftLeader(t *testing.T) {
}
remain = s
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
peers, _ := s.numPeers()
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -472,8 +472,8 @@ func TestLeader_MultiBootstrap(t *testing.T) {

// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.raftPeers.Peers()
if len(peers) != 1 {
peers, _ := s.numPeers()
if peers != 1 {
t.Fatalf("should only have 1 raft peer!")
}
}
Expand Down Expand Up @@ -505,8 +505,8 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) {

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
Expand Down
5 changes: 3 additions & 2 deletions consul/raft_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
)

// RaftLayer implements the raft.StreamLayer interface,
Expand Down Expand Up @@ -80,8 +81,8 @@ func (l *RaftLayer) Addr() net.Addr {
}

// Dial is used to create a new outgoing connection
func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", address, timeout)
func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", string(address), timeout)
if err != nil {
return nil, err
}
Expand Down
36 changes: 24 additions & 12 deletions consul/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -150,7 +151,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
// See if it's configured as part of our DC.
if parts.Datacenter == s.config.Datacenter {
s.localLock.Lock()
s.localConsuls[parts.Addr.String()] = parts
s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts
s.localLock.Unlock()
}

Expand Down Expand Up @@ -193,20 +194,20 @@ func (s *Server) wanNodeJoin(me serf.MemberEvent) {

// maybeBootsrap is used to handle bootstrapping when a new consul server joins
func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs, remove our
// expectations of bootstrapping. This is slightly cheaper than the full
// check that BootstrapCluster will do, so this is a good pre-filter.
index, err := s.raftStore.LastIndex()
if err != nil {
s.logger.Printf("[ERR] consul: failed to read last raft index: %v", err)
return
}

// Bootstrap can only be done if there are no committed logs,
// remove our expectations of bootstrapping
if index != 0 {
s.config.BootstrapExpect = 0
return
}

// Scan for all the known servers
// Scan for all the known servers.
members := s.serfLAN.Members()
addrs := make([]string, 0)
for _, member := range members {
Expand All @@ -230,18 +231,29 @@ func (s *Server) maybeBootstrap() {
addrs = append(addrs, addr.String())
}

// Skip if we haven't met the minimum expect count
// Skip if we haven't met the minimum expect count.
if len(addrs) < s.config.BootstrapExpect {
return
}

// Update the peer set
s.logger.Printf("[INFO] consul: Attempting bootstrap with nodes: %v", addrs)
if err := s.raft.SetPeers(addrs).Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to bootstrap peers: %v", err)
// Attempt a live bootstrap!
s.logger.Printf("[INFO] consul: found expected number of peers, attempting to bootstrap cluster...")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its useful to have it log the IPs it is attempting to bootstrap with as a sanity check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call - I'll add them to this log message.

var configuration raft.Configuration
for _, addr := range addrs {
// TODO (slackpad) - This will need to be updated once we support
// node IDs.
server := raft.Server{
ID: raft.ServerID(addr),
Address: raft.ServerAddress(addr),
}
configuration.Servers = append(configuration.Servers, server)
}
future := s.raft.BootstrapCluster(configuration)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to bootstrap cluster: %v", err)
}

// Bootstrapping complete, don't enter this again
// Bootstrapping complete, don't enter this again.
s.config.BootstrapExpect = 0
}

Expand All @@ -255,7 +267,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
s.logger.Printf("[INFO] consul: removing LAN server %s", parts)

s.localLock.Lock()
delete(s.localConsuls, parts.Addr.String())
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
s.localLock.Unlock()
}
}
Expand Down
Loading