Skip to content

Commit

Permalink
edge trigger node update
Browse files Browse the repository at this point in the history
test update config copy trigger
  • Loading branch information
chelseakomlo committed Feb 16, 2018
1 parent 6820c96 commit 7b0f18b
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 89 deletions.
138 changes: 76 additions & 62 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
vaultapi "github.com/hashicorp/vault/api"
"github.com/mitchellh/hashstructure"
"github.com/shirou/gopsutil/host"
)

Expand Down Expand Up @@ -121,6 +120,10 @@ type Client struct {
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
triggerDiscoveryCh chan struct{}

// triggerNodeUpdate triggers the client to mark the Node as changed and
// update it.
triggerNodeUpdate chan struct{}

// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
Expand Down Expand Up @@ -194,6 +197,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
shutdownCh: make(chan struct{}),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
}

Expand Down Expand Up @@ -927,30 +931,72 @@ func (c *Client) reservePorts() {
func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintResponse) *structs.Node {
c.configLock.Lock()
defer c.configLock.Unlock()
for name, val := range response.Attributes {
if val == "" {

var nodeHasChanged bool

for name, new_val := range response.Attributes {
old_val := c.config.Node.Attributes[name]
if old_val == new_val {
continue
}

nodeHasChanged = true
if new_val == "" {
delete(c.config.Node.Attributes, name)
} else {
c.config.Node.Attributes[name] = val
c.config.Node.Attributes[name] = new_val
}
}

// update node links and resources from the diff created from
// fingerprinting
for name, val := range response.Links {
if val == "" {
for name, new_val := range response.Links {
old_val := c.config.Node.Links[name]
if old_val == new_val {
continue
}

nodeHasChanged = true
if new_val == "" {
delete(c.config.Node.Links, name)
} else {
c.config.Node.Links[name] = val
c.config.Node.Links[name] = new_val
}
}

if response.Resources != nil {
if response.Resources != nil && !resourcesAreEqual(c.config.Node.Resources, response.Resources) {
nodeHasChanged = true
c.config.Node.Resources.Merge(response.Resources)
}

if nodeHasChanged {
c.updateNode()
}
return c.config.Node
}

// resourcesAreEqual is a temporary function to compare whether resources are
// equal. We can use this until we change fingerprinters to set pointers on a
// return type.
func resourcesAreEqual(first, second *structs.Resources) bool {
if first.CPU != second.CPU {
return false
}
if first.MemoryMB != second.MemoryMB {
return false
}
if first.DiskMB != second.DiskMB {
return false
}
if first.IOPS != second.IOPS {
return false
}
if len(first.Networks) != len(second.Networks) {
return false
}
return true
}

// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
Expand All @@ -962,21 +1008,11 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
// registerAndHeartbeat is a long lived goroutine used to register the client
// and then start heartbeatng to the server.
func (c *Client) registerAndHeartbeat() {
// Before registering capture the hashes of the Node's attribute and
// metadata maps. The hashes may be out of date with what registers but this
// is okay since the loop checking for node updates will detect this and
// reregister. This is necessary to avoid races between the periodic
// fingerprinters and the node registering.
attrHash, metaHash, err := nodeMapHashes(c.Node())
if err != nil {
c.logger.Printf("[ERR] client: failed to determine initial node hashes. May result in stale node being registered: %v", err)
}

// Register the node
c.retryRegisterNode()

// Start watching changes for node changes
go c.watchNodeUpdates(attrHash, metaHash)
go c.watchNodeUpdates()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
Expand Down Expand Up @@ -1057,40 +1093,6 @@ func (c *Client) run() {
}
}

// nodeMapHashes returns the hashes of the passed Node's attribute and metadata
// maps.
func nodeMapHashes(node *structs.Node) (attrHash, metaHash uint64, err error) {
attrHash, err = hashstructure.Hash(node.Attributes, nil)
if err != nil {
return 0, 0, fmt.Errorf("unable to calculate node attributes hash: %v", err)
}
// Calculate node meta map hash
metaHash, err = hashstructure.Hash(node.Meta, nil)
if err != nil {
return 0, 0, fmt.Errorf("unable to calculate node meta hash: %v", err)
}
return attrHash, metaHash, nil
}

// hasNodeChanged calculates a hash for the node attributes- and meta map.
// The new hash values are compared against the old (passed-in) hash values to
// determine if the node properties have changed. It returns the new hash values
// in case they are different from the old hash values.
func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
c.configLock.RLock()
defer c.configLock.RUnlock()

// Check if the Node that is being updated by fingerprinters has changed.
newAttrHash, newMetaHash, err := nodeMapHashes(c.config.Node)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node hashes: %v", err)
}
if newAttrHash != oldAttrHash || newMetaHash != oldMetaHash {
return true, newAttrHash, newMetaHash
}
return false, oldAttrHash, oldMetaHash
}

// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
Expand Down Expand Up @@ -1484,19 +1486,28 @@ OUTER:
}
}

// watchNodeUpdates periodically checks for changes to the node attributes or
// meta map. The passed hashes are the initial hash values for the attribute and
// metadata of the node respectively.
func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) {
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv)
// updateNode triggers a client to update its node copy if it isn't doing
// so already
func (c *Client) updateNode() {
select {
case c.triggerNodeUpdate <- struct{}{}:
// Node update goroutine was released to execute
default:
// Node update goroutine was already running
}
}

// watchNodeUpdates blocks until it is edge triggered. Once triggered,
// it will update the client node copy and re-register the node.
func (c *Client) watchNodeUpdates() {
c.logger.Printf("[DEBUG] client: starting process to watch for node updates.")

var changed bool
var hasChanged bool
for {
select {
case <-time.After(c.retryIntv(nodeUpdateRetryIntv)):
changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash)
if changed {
c.logger.Printf("[DEBUG] client: state changed, updating node.")
if hasChanged {
c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.")

// Update the config copy.
c.configLock.Lock()
Expand All @@ -1506,6 +1517,9 @@ func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) {

c.retryRegisterNode()
}
hasChanged = false
case <-c.triggerNodeUpdate:
hasChanged = true
case <-c.shutdownCh:
return
}
Expand Down
66 changes: 39 additions & 27 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -223,36 +222,49 @@ func TestClient_Fingerprint(t *testing.T) {
require.NotEqual("", node.Attributes["driver.mock_driver"])
}

func TestClient_HasNodeChanged(t *testing.T) {
func TestClient_TriggerNodeUpdate(t *testing.T) {
driver.CheckForMockDriver(t)
t.Parallel()
c := testClient(t, nil)
defer c.Shutdown()

node := c.config.Node
attrHash, err := hashstructure.Hash(node.Attributes, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err)
}
// Calculate node meta map hash
metaHash, err := hashstructure.Hash(node.Meta, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node meta hash: %v", err)
}
if changed, _, _ := c.hasNodeChanged(attrHash, metaHash); changed {
t.Fatalf("Unexpected hash change.")
}
// these constants are only defined when nomad_test is enabled, so these fail
// our linter without explicit disabling.
c1 := testClient(t, func(c *config.Config) {
c.Options = map[string]string{
driver.ShutdownPeriodicAfter: "true", // nolint: varcheck
driver.ShutdownPeriodicDuration: "3", // nolint: varcheck
}
})
defer c1.Shutdown()

// Change node attribute
node.Attributes["arch"] = "xyz_86"
if changed, newAttrHash, _ := c.hasNodeChanged(attrHash, metaHash); !changed {
t.Fatalf("Expected hash change in attributes: %d vs %d", attrHash, newAttrHash)
}
mockDriverName := "driver.mock_driver"

// Change node meta map
node.Meta["foo"] = "bar"
if changed, _, newMetaHash := c.hasNodeChanged(attrHash, metaHash); !changed {
t.Fatalf("Expected hash change in meta map: %d vs %d", metaHash, newMetaHash)
}
go c1.watchNodeUpdates()
c1.updateNode()
// This needs to be directly called as otherwise the client hangs on
// attempt to register with a server. S[ecifically, retryRegisterNode is
// blocking

// test that the client's copy of the node is also updated
testutil.WaitForResult(func() (bool, error) {
mockDriverStatusCopy := c1.configCopy.Node.Attributes[mockDriverName]
if mockDriverStatusCopy == "" {
return false, fmt.Errorf("mock driver attribute should be set on the client")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// test that the client's copy of the node is also updated
testutil.WaitForResult(func() (bool, error) {
mockDriverStatusCopy := c1.configCopy.Node.Attributes[mockDriverName]
if mockDriverStatusCopy != "" {
return false, fmt.Errorf("mock driver attribute should not be set on the client")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestClient_Fingerprint_Periodic(t *testing.T) {
Expand Down
33 changes: 33 additions & 0 deletions client/fingerprint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,39 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) {
require.NotEqual("", node.Attributes["driver.mock_driver"])
}

func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) {
driver.CheckForMockDriver(t)
t.Parallel()
require := require.New(t)

node := &structs.Node{Resources: &structs.Resources{}}

updateNode := func(r *cstructs.FingerprintResponse) *structs.Node {
if r.Resources != nil {
node.Resources.Merge(r.Resources)
}
return node
}
conf := config.DefaultConfig()
getConfig := func() *config.Config {
return conf
}

fm := NewFingerprintManager(
getConfig,
node,
make(chan struct{}),
updateNode,
testLogger(),
)

err := fm.Run()
require.Nil(err)
require.NotEqual(0, node.Resources.CPU)
require.NotEqual(0, node.Resources.MemoryMB)
require.NotEqual(0, node.Resources.DiskMB)
}

func TestFingerprintManager_Fingerprint_Run(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down

0 comments on commit 7b0f18b

Please sign in to comment.