Skip to content

Commit

Permalink
Merge pull request #79 from Random-Liu/change-resync-mechanism
Browse files Browse the repository at this point in the history
Update NPD to only do forcibly sync every 1 minutes.
  • Loading branch information
Random-Liu authored Jan 24, 2017
2 parents 4f964a1 + 60975f5 commit ba5f5a1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 28 deletions.
58 changes: 35 additions & 23 deletions pkg/condition/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (
)

const (
// updatePeriod is the period which condition manager checks update.
// updatePeriod is the period at which condition manager checks update.
updatePeriod = 1 * time.Second
// updateTimeout is the timeout of condition update operation.
updateTimeout = 10 * time.Second
// resyncPeriod is the period which condition manager does resync no matter whether these is any update.
resyncPeriod = 30 * time.Second
// resyncPeriod is the period at which condition manager does resync, only updates when needed.
resyncPeriod = 10 * time.Second
// heartbeatPeriod is the period at which condition manager does forcibly sync with apiserver.
heartbeatPeriod = 1 * time.Minute
)

// ConditionManager synchronizes node conditions with the apiserver with problem client.
Expand All @@ -52,17 +52,21 @@ const (
type ConditionManager interface {
// Start starts the condition manager.
Start()
// UpdateCondition update specific condition.
// UpdateCondition updates a specific condition.
UpdateCondition(types.Condition)
}

type conditionManager struct {
sync.Mutex
clock clock.Clock
latest time.Time
client problemclient.Client
updates map[string]types.Condition
conditions map[string]types.Condition
clock clock.Clock
latestTry time.Time
resyncNeeded bool
client problemclient.Client
// updatesLock is the lock protecting updates. Only the field `updates`
// will be accessed by random caller and the sync routine, so only it
// needs to be protected.
updatesLock sync.Mutex
updates map[string]types.Condition
conditions map[string]types.Condition
}

// NewConditionManager creates a condition manager.
Expand All @@ -80,8 +84,8 @@ func (c *conditionManager) Start() {
}

func (c *conditionManager) UpdateCondition(condition types.Condition) {
c.Lock()
defer c.Unlock()
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
// New node condition will override the old condition, because we only need the newest
// condition for each condition type.
c.updates[condition.Type] = condition
Expand All @@ -92,17 +96,17 @@ func (c *conditionManager) syncLoop() {
for {
select {
case <-updateCh:
if c.checkUpdates() || c.checkResync() {
if c.needUpdates() || c.needResync() || c.needHeartbeat() {
c.sync()
}
}
}
}

// checkUpdates checks whether there are recent updates.
func (c *conditionManager) checkUpdates() bool {
c.Lock()
defer c.Unlock()
// needUpdates checks whether there are recent updates.
func (c *conditionManager) needUpdates() bool {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
needUpdate := false
for t, update := range c.updates {
if !reflect.DeepEqual(c.conditions[t], update) {
Expand All @@ -114,21 +118,29 @@ func (c *conditionManager) checkUpdates() bool {
return needUpdate
}

// checkResync checks whether a resync is needed.
func (c *conditionManager) checkResync() bool {
return c.clock.Now().Sub(c.latest) >= resyncPeriod
// needResync checks whether a resync is needed.
func (c *conditionManager) needResync() bool {
// Only update when resync is needed.
return c.clock.Now().Sub(c.latestTry) >= resyncPeriod && c.resyncNeeded
}

// needHeartbeat checks whether a forcible heartbeat is needed.
func (c *conditionManager) needHeartbeat() bool {
return c.clock.Now().Sub(c.latestTry) >= heartbeatPeriod
}

// sync synchronizes node conditions with the apiserver.
func (c *conditionManager) sync() {
c.latestTry = c.clock.Now()
c.resyncNeeded = false
conditions := []api.NodeCondition{}
for i := range c.conditions {
conditions = append(conditions, problemutil.ConvertToAPICondition(c.conditions[i]))
}
if err := c.client.SetConditions(conditions); err != nil {
// The conditions will be updated again in future sync
glog.Errorf("failed to update node conditions: %v", err)
c.resyncNeeded = true
return
}
c.latest = c.clock.Now()
}
33 changes: 28 additions & 5 deletions pkg/condition/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package condition

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -47,7 +48,7 @@ func newTestCondition(condition string) types.Condition {
}
}

func TestCheckUpdates(t *testing.T) {
func TestNeedUpdates(t *testing.T) {
m, _, _ := newTestManager()
var c types.Condition
for desc, test := range map[string]struct {
Expand Down Expand Up @@ -75,19 +76,41 @@ func TestCheckUpdates(t *testing.T) {
c = newTestCondition(test.condition)
}
m.UpdateCondition(c)
assert.Equal(t, test.update, m.checkUpdates(), desc)
assert.Equal(t, test.update, m.needUpdates(), desc)
assert.Equal(t, c, m.conditions[c.Type], desc)
}
}

func TestSync(t *testing.T) {
func TestResync(t *testing.T) {
m, fakeClient, fakeClock := newTestManager()
condition := newTestCondition("TestCondition")
m.conditions = map[string]types.Condition{condition.Type: condition}
m.sync()
expected := []api.NodeCondition{problemutil.ConvertToAPICondition(condition)}
assert.Nil(t, fakeClient.AssertConditions(expected), "Condition should be updated via client")
assert.False(t, m.checkResync(), "Should not resync before timeout exceeds")

assert.False(t, m.needResync(), "Should not resync before resync period")
fakeClock.Step(resyncPeriod)
assert.False(t, m.needResync(), "Should not resync after resync period without resync needed")

fakeClient.InjectError("SetConditions", fmt.Errorf("injected error"))
m.sync()

assert.False(t, m.needResync(), "Should not resync before resync period")
fakeClock.Step(resyncPeriod)
assert.True(t, m.checkResync(), "Should resync after timeout exceeds")
assert.True(t, m.needResync(), "Should resync after resync period and resync is needed")
}

func TestHeartbeat(t *testing.T) {
m, fakeClient, fakeClock := newTestManager()
condition := newTestCondition("TestCondition")
m.conditions = map[string]types.Condition{condition.Type: condition}
m.sync()
expected := []api.NodeCondition{problemutil.ConvertToAPICondition(condition)}
assert.Nil(t, fakeClient.AssertConditions(expected), "Condition should be updated via client")

assert.False(t, m.needHeartbeat(), "Should not heartbeat before heartbeat period")

fakeClock.Step(heartbeatPeriod)
assert.True(t, m.needHeartbeat(), "Should heartbeat after heartbeat period")
}

0 comments on commit ba5f5a1

Please sign in to comment.