Skip to content

Commit

Permalink
Merge pull request #2009 from hashicorp/f-use-embedded-consul
Browse files Browse the repository at this point in the history
Add a chaos test for consul syncer and fix some races it found
  • Loading branch information
schmichael committed Dec 5, 2016
2 parents ec4a0d2 + 19fd195 commit cd3dab9
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 26 deletions.
193 changes: 193 additions & 0 deletions command/agent/consul/chaos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// +build chaos

package consul

import (
"fmt"
"io/ioutil"
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)

// TestSyncerChaos exercises two Syncers that talk to the same embedded Consul
// test server by calling SetServices, SyncServices and ReapUnmatched from many
// goroutines at once. After the goroutines finish it checks that:
//
//   - none of the concurrent calls returned an error,
//   - execSyncer.flattenedServices() contains exactly the exec-<i>-<ii>
//     services that were set, and
//   - after a final ReapUnmatched only the exec-0-* services (besides Consul's
//     own "consul" service) are still registered with the Consul agent.
func TestSyncerChaos(t *testing.T) {
	// Create an embedded Consul server
	testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
		// If -v wasn't specified squelch consul logging
		if !testing.Verbose() {
			c.Stdout = ioutil.Discard
			c.Stderr = ioutil.Discard
		}
	})
	defer testconsul.Stop()

	// Configure Syncer to talk to the test server
	cconf := config.DefaultConsulConfig()
	cconf.Addr = testconsul.HTTPAddr

	clientSyncer, err := NewSyncer(cconf, nil, logger)
	if err != nil {
		t.Fatalf("Error creating Syncer: %v", err)
	}
	defer clientSyncer.Shutdown()

	execSyncer, err := NewSyncer(cconf, nil, logger)
	if err != nil {
		t.Fatalf("Error creating Syncer: %v", err)
	}
	defer execSyncer.Shutdown()

	clientService := &structs.Service{Name: "nomad-client"}
	services := map[ServiceKey]*structs.Service{
		GenerateServiceKey(clientService): clientService,
	}
	if err := clientSyncer.SetServices("client", services); err != nil {
		t.Fatalf("error setting client service: %v", err)
	}

	const (
		// execn is the number of simulated executor domains (exec-0 ..
		// exec-<execn-1>) and also the iteration count of the sync and
		// reap loops below.
		execn = 100
		// servicesPerExec is the number of services registered per domain.
		servicesPerExec = 10
	)

	errCh := make(chan error, 100)
	var wg sync.WaitGroup

	// report hands err to the collecting loop without ever blocking a worker:
	// if the buffer is already full the error is dropped, which is fine since
	// the test has failed by then anyway.
	report := func(err error) {
		select {
		case errCh <- err:
		default:
		}
	}

	// Start goroutines to concurrently SetServices
	for i := 0; i < execn; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			domain := ServiceDomain(fmt.Sprintf("exec-%d", i))
			execServices := map[ServiceKey]*structs.Service{}
			for ii := 0; ii < servicesPerExec; ii++ {
				s := &structs.Service{Name: fmt.Sprintf("exec-%d-%d", i, ii)}
				execServices[GenerateServiceKey(s)] = s
				// Re-set the whole (growing) map on every iteration.
				if err := execSyncer.SetServices(domain, execServices); err != nil {
					report(fmt.Errorf("SetServices(%s): %v", domain, err))
					return
				}
				// A one nanosecond sleep: just long enough to give other
				// goroutines a chance to interleave.
				time.Sleep(time.Nanosecond)
			}
		}(i)
	}

	// SyncServices runs a timer started by Syncer.Run which we don't use
	// in this test, so run SyncServices concurrently
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < execn; i++ {
			if err := execSyncer.SyncServices(); err != nil {
				report(fmt.Errorf("SyncServices: %v", err))
				return
			}
			time.Sleep(100 * time.Nanosecond)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := clientSyncer.ReapUnmatched([]ServiceDomain{"nomad-client"}); err != nil {
			report(fmt.Errorf("client ReapUnmatched: %v", err))
		}
	}()

	// Reap all but exec-0-*
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < execn; i++ {
			keep := []ServiceDomain{"exec-0", ServiceDomain(fmt.Sprintf("exec-%d", i))}
			// Unlike the other workers this loop reports the error and keeps
			// going with the remaining iterations.
			if err := execSyncer.ReapUnmatched(keep); err != nil {
				report(fmt.Errorf("exec ReapUnmatched(%v): %v", keep, err))
			}
			time.Sleep(100 * time.Nanosecond)
		}
	}()

	// Close the channel once every worker is done so the range below ends.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	for err := range errCh {
		t.Errorf("error from concurrent goroutine: %v", err)
	}

	// Do a final ReapUnmatched to get consul back into a deterministic state
	if err := execSyncer.ReapUnmatched([]ServiceDomain{"exec-0"}); err != nil {
		t.Fatalf("error doing final reap: %v", err)
	}

	// flattenedServices should be fully populated as ReapUnmatched doesn't
	// touch Syncer's internal state
	expected := make(map[string]struct{}, execn*servicesPerExec)
	for i := 0; i < execn; i++ {
		for ii := 0; ii < servicesPerExec; ii++ {
			expected[fmt.Sprintf("exec-%d-%d", i, ii)] = struct{}{}
		}
	}

	for _, s := range execSyncer.flattenedServices() {
		if _, ok := expected[s.Name]; !ok {
			t.Errorf("%s unexpected", s.Name)
		}
		delete(expected, s.Name)
	}
	if len(expected) > 0 {
		left := make([]string, 0, len(expected))
		for s := range expected {
			left = append(left, s)
		}
		sort.Strings(left)
		t.Errorf("Couldn't find %d names in flattened services:\n%s", len(expected), strings.Join(left, "\n"))
	}

	// After the final reap only the exec-0-* services should still be
	// registered with the agent (Consul's own "consul" service is ignored).
	{
		services, err := execSyncer.client.Agent().Services()
		if err != nil {
			t.Fatalf("Error getting services: %v", err)
		}
		found := make([]int, 0, servicesPerExec)
		for k, service := range services {
			if service.Service == "consul" {
				continue
			}
			var i, ii int
			_, scanErr := fmt.Sscanf(service.Service, "exec-%d-%d", &i, &ii)
			switch {
			case scanErr != nil:
				t.Errorf("invalid service: %s -> %s", k, service.Service)
			case i != 0 || ii < 0 || ii >= servicesPerExec:
				t.Errorf("unexpected service: %s -> %s", k, service.Service)
			default:
				found = append(found, ii)
			}
		}
		if len(found) != servicesPerExec {
			// Sort so the failure output is stable across runs; map
			// iteration order is random.
			sort.Ints(found)
			t.Errorf("expected 0-9 but found: %v", found)
		}
	}
}
31 changes: 20 additions & 11 deletions command/agent/consul/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"time"

consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -56,11 +55,11 @@ const (
nomadServicePrefix = "_nomad"

// The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second
defaultSyncInterval = 6 * time.Second

// syncJitter provides a little variance in the frequency at which
// defaultSyncJitter provides a little variance in the frequency at which
// Syncer polls Consul.
syncJitter = 8
defaultSyncJitter = time.Second

// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
Expand Down Expand Up @@ -144,6 +143,13 @@ type Syncer struct {
periodicCallbacks map[string]types.PeriodicCallback
notifySyncCh chan struct{}
periodicLock sync.RWMutex

// The periodic time interval for syncing services and checks with Consul
syncInterval time.Duration

// syncJitter provides a little variance in the frequency at which
// Syncer polls Consul.
syncJitter time.Duration
}

// NewSyncer returns a new consul.Syncer
Expand All @@ -168,8 +174,11 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
checkRunners: make(map[consulCheckID]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback),
notifySyncCh: make(chan struct{}, 1),
// default noop implementation of addrFinder
addrFinder: func(string) (string, int) { return "", 0 },
addrFinder: func(string) (string, int) { return "", 0 },
syncInterval: defaultSyncInterval,
syncJitter: defaultSyncJitter,
}

return &consulSyncer, nil
Expand Down Expand Up @@ -809,7 +818,7 @@ func (c *Syncer) Run() {
for {
select {
case <-sync.C:
d := syncInterval - lib.RandomStagger(syncInterval/syncJitter)
d := c.syncInterval - c.syncJitter
sync.Reset(d)

if err := c.SyncServices(); err != nil {
Expand All @@ -824,7 +833,7 @@ func (c *Syncer) Run() {
c.consulAvailable = true
}
case <-c.notifySyncCh:
sync.Reset(syncInterval)
sync.Reset(0)
case <-c.shutdownCh:
c.Shutdown()
case <-c.notifyShutdownCh:
Expand Down Expand Up @@ -872,8 +881,8 @@ func (c *Syncer) SyncServices() error {
// the syncer
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService {
localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
c.groupsLock.RLock()
defer c.groupsLock.RUnlock()
for serviceID, service := range consulServices {
for domain := range c.servicesGroups {
if strings.HasPrefix(service.ID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
Expand All @@ -889,8 +898,8 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer
// services with Syncer's idPrefix.
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck {
localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
c.groupsLock.RLock()
defer c.groupsLock.RUnlock()
for checkID, check := range consulChecks {
for domain := range c.checkGroups {
if strings.HasPrefix(check.ServiceID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
Expand Down
Loading

0 comments on commit cd3dab9

Please sign in to comment.