handler.go

package policy

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/google/go-cmp/cmp"
	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-multierror"
	"github.com/hashicorp/nomad-autoscaler/plugins"
	"github.com/hashicorp/nomad-autoscaler/plugins/manager"
	targetpkg "github.com/hashicorp/nomad-autoscaler/plugins/target"
)

const (
	cooldownIgnoreTime = 1 * time.Second
)

// Handler monitors a policy for changes and controls when it is sent for
// evaluation.
type Handler struct {
	log hclog.Logger

	// policyID is the ID of the policy the handler is responsible for.
	policyID PolicyID

	// pluginManager is used to retrieve an instance of the target plugin used
	// by the policy.
	pluginManager *manager.PluginManager

	// policySource is used to monitor for changes to the policy the handler
	// is responsible for.
	policySource Source

	// ticker controls the frequency the policy is sent for evaluation.
	ticker *time.Ticker

	// cooldownCh is used to notify the handler that it should enter a cooldown
	// period.
	cooldownCh chan time.Duration

	// running is used to help keep track if the handler is active or not.
	running     bool
	runningLock sync.RWMutex

	// ch is used to listen for policy updates.
	ch chan Policy

	// errCh is used to listen for errors from the policy source.
	errCh chan error

	// doneCh is used to signal the handler to stop.
	doneCh chan struct{}

	// reloadCh is used to communicate to the MonitorPolicy routine that it
	// should perform a reload.
	reloadCh chan struct{}
}

// NewHandler returns a new handler for a policy.
func NewHandler(ID PolicyID, log hclog.Logger, pm *manager.PluginManager, ps Source) *Handler {
	return &Handler{
		policyID:      ID,
		log:           log.Named("policy_handler").With("policy_id", ID),
		pluginManager: pm,
		policySource:  ps,
		ch:            make(chan Policy),
		errCh:         make(chan error),
		doneCh:        make(chan struct{}),
		cooldownCh:    make(chan time.Duration),
		reloadCh:      make(chan struct{}),
	}
}
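
// The function below is an illustrative sketch, not part of the upstream
// file. Assuming a plugin manager and a policy Source have already been
// configured elsewhere, it shows roughly how a Handler is expected to be
// used: construct it with NewHandler, run it in its own goroutine with a
// channel for the evaluations it produces, and rely on context cancellation
// (or Stop) to shut it down.
func exampleRunHandler(ctx context.Context, pm *manager.PluginManager, src Source, id PolicyID) {
	evalCh := make(chan *Evaluation)

	h := NewHandler(id, hclog.Default(), pm, src)
	go h.Run(ctx, evalCh)
	defer h.Stop()

	// Drain evaluations until the context is canceled. A real consumer is
	// assumed to hand these off to whatever performs the scaling decision.
	for {
		select {
		case <-evalCh:
			hclog.Default().Info("received evaluation", "policy_id", id)
		case <-ctx.Done():
			return
		}
	}
}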

// Run starts the handler and periodically sends the policy for evaluation.
//
// This function blocks until the context provided is canceled or the handler
// is stopped with the Stop() method.
func (h *Handler) Run(ctx context.Context, evalCh chan<- *Evaluation) {
	h.log.Trace("starting policy handler")

	defer h.Stop()

	// Mark the handler as running.
	h.runningLock.Lock()
	h.running = true
	h.runningLock.Unlock()

	// Store a local copy of the policy so we can compare it for changes.
	var currentPolicy *Policy

	// Start with a long ticker until we receive the right interval.
	// TODO(luiz): make this a config param
	policyReadTimeout := 3 * time.Minute
	h.ticker = time.NewTicker(policyReadTimeout)

	// Create separate context so we can stop the monitoring Go routine if
	// doneCh is closed, but ctx is still valid.
	monitorCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Start monitoring the policy for changes.
	req := MonitorPolicyReq{ID: h.policyID, ErrCh: h.errCh, ReloadCh: h.reloadCh, ResultCh: h.ch}
	go h.policySource.MonitorPolicy(monitorCtx, req)

	for {
		select {
		case <-ctx.Done():
			h.log.Trace("stopping policy handler due to context done")
			return

		case <-h.doneCh:
			h.log.Trace("stopping policy handler due to done channel")
			return

		case err := <-h.errCh:
			// In case of error, log the error message and loop around.
			// Handlers never stop running unless ctx.Done() or doneCh is
			// closed.

			// multierror.Error objects are logged differently to allow for a
			// more structured output.
			merr, ok := err.(*multierror.Error)
			if ok && len(merr.Errors) > 1 {
				// Transform Errors into a slice of strings to avoid logging
				// empty objects when using JSON format.
				errors := make([]string, len(merr.Errors))
				for i, e := range merr.Errors {
					errors[i] = e.Error()
				}
				h.log.Error(errors[0], "errors", errors[1:])
			} else {
				h.log.Error(err.Error())
			}
			continue

		case p := <-h.ch:
			h.updateHandler(currentPolicy, &p)
			currentPolicy = &p

		case <-h.ticker.C:
			eval, err := h.handleTick(ctx, currentPolicy)
			if err != nil {
				if err == context.Canceled {
					// Context was canceled, return to stop the handler.
					return
				}

				h.log.Error(err.Error())
				continue
			}

			if eval != nil {
				evalCh <- eval
			}

		case ts := <-h.cooldownCh:
			// Enforce the cooldown which will block until complete.
			if !h.enforceCooldown(ctx, ts) {
				// Context was canceled, return to stop the handler.
				return
			}
		}
	}
}

// Stop stops the handler and the monitoring Go routine.
func (h *Handler) Stop() {
	h.runningLock.Lock()
	defer h.runningLock.Unlock()

	if h.running {
		h.log.Trace("stopping handler")
		h.ticker.Stop()
		close(h.doneCh)
	}

	h.running = false
}

func (h *Handler) handleTick(ctx context.Context, policy *Policy) (*Evaluation, error) {
	// Timestamp the invocation of this evaluation run. This can be
	// used when checking cooldown or emitting metrics to ensure some
	// consistency.
	curTime := time.Now().UTC().UnixNano()

	eval, err := h.generateEvaluation(policy)
	if err != nil {
		return nil, err
	}

	// If the evaluation is nil there is nothing to be done this time
	// around.
	if eval == nil {
		return nil, nil
	}

	// If the target status includes a last event meta key, check for cooldown
	// due to out-of-band events. This is also useful if the Autoscaler has
	// been re-deployed.
	ts, ok := eval.TargetStatus.Meta[targetpkg.MetaKeyLastEvent]
	if !ok {
		return eval, nil
	}

	// Convert the last event string. If an error occurs, just log and
	// continue with the evaluation. A malformed timestamp shouldn't mean
	// we skip scaling.
	lastTS, err := strconv.ParseUint(ts, 10, 64)
	if err != nil {
		h.log.Error("failed to parse last event timestamp as uint64", "error", err)
		return eval, nil
	}

	// Calculate the remaining time period left on the cooldown. If this is
	// cooldownIgnoreTime or below, we do not need to enter cooldown. Reasoning
	// on ignoring small variations can be seen within GH-138.
	cdPeriod := h.calculateRemainingCooldown(policy.Cooldown, curTime, int64(lastTS))
	if cdPeriod <= cooldownIgnoreTime {
		return eval, nil
	}

	// Enforce the cooldown which will block until complete. A false response
	// means we did not reach the end of cooldown due to a request to shutdown.
	if !h.enforceCooldown(ctx, cdPeriod) {
		return nil, context.Canceled
	}

	// If we reach this point, we have entered and exited cooldown. Our data is
	// stale, therefore return so that we do not send the eval this time and
	// wait for the next tick.
	return nil, nil
}

// generateEvaluation returns an evaluation if the policy needs to be evaluated.
// Returning an error will stop the handler.
func (h *Handler) generateEvaluation(policy *Policy) (*Evaluation, error) {
	h.log.Trace("tick")

	if policy == nil {
		// Initial ticker ticked without a policy being set, assume we are not able
		// to retrieve the policy and exit.
		return nil, fmt.Errorf("timeout: failed to read policy in time")
	}

	// Exit early if the policy is not enabled.
	if !policy.Enabled {
		h.log.Debug("policy is not enabled")
		return nil, nil
	}

	// Dispense an instance of target plugin used by the policy.
	targetPlugin, err := h.pluginManager.Dispense(policy.Target.Name, plugins.PluginTypeTarget)
	if err != nil {
		return nil, err
	}

	targetInst, ok := targetPlugin.Plugin().(targetpkg.Target)
	if !ok {
		err := fmt.Errorf("plugin %s (%T) is not a target plugin", policy.Target.Name, targetPlugin.Plugin())
		return nil, err
	}

	// Get target status.
	h.log.Trace("getting target status")
	status, err := targetInst.Status(policy.Target.Config)
	if err != nil {
		h.log.Warn("failed to get target status", "error", err)
		return nil, nil
	}

	// A nil status indicates the target doesn't exist, so we don't need to
	// monitor the policy anymore.
	if status == nil {
		h.log.Trace("target doesn't exist anymore", "target", policy.Target.Config)
		h.Stop()
		return nil, nil
	}

	// Exit early if the target is not ready yet.
	if !status.Ready {
		h.log.Trace("target is not ready")
		return nil, nil
	}

	// Send policy for evaluation.
	h.log.Trace("sending policy for evaluation")
	return &Evaluation{
		Policy:       policy,
		TargetStatus: status,
	}, nil
}

// updateHandler updates the handler's internal state based on the changes in
// the policy being monitored.
func (h *Handler) updateHandler(current, next *Policy) {
	if current == nil {
		h.log.Trace("received policy")
	} else {
		h.log.Trace("received policy change")
		h.log.Trace(cmp.Diff(current, next))
	}

	// Update ticker if it's the first time we receive the policy or if the
	// policy's evaluation interval has changed.
	if current == nil || current.EvaluationInterval != next.EvaluationInterval {
		h.ticker.Stop()
		h.ticker = time.NewTicker(next.EvaluationInterval)
	}
}

// enforceCooldown blocks until the cooldown period has been reached, or the
// handler has been instructed to exit. The boolean return details whether or
// not the cooldown period passed without being interrupted.
func (h *Handler) enforceCooldown(ctx context.Context, t time.Duration) (complete bool) {
	// Log that cooldown is being enforced. This is very useful as cooldown
	// blocks the ticker making this the only indication of cooldown to
	// operators.
	h.log.Debug("scaling policy has been placed into cooldown", "cooldown", t)

	// Using a timer directly is mentioned to be more efficient than
	// time.After() as long as we ensure to call Stop(). So setup a timer for
	// use and defer the stop.
	timer := time.NewTimer(t)
	defer timer.Stop()

	// Cooldown should not mean we miss other handler control signals. So wait
	// on all the channels desired here.
	select {
	case <-timer.C:
		complete = true
		return
	case <-ctx.Done():
		return
	case <-h.doneCh:
		return
	}
}
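
// As context for the time.After comment inside enforceCooldown (this note and
// sketch are illustrative, not part of the upstream file): a timer created by
// time.After cannot be stopped early, so its resources are only reclaimed once
// it fires, whereas an explicit time.NewTimer with a deferred Stop is released
// as soon as the surrounding function returns. A minimal standalone version of
// the same wait-or-cancel pattern looks like this:
func exampleWaitOrCancel(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		// The full duration elapsed without interruption.
		return true
	case <-ctx.Done():
		// Canceled before the timer fired.
		return false
	}
}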

// calculateRemainingCooldown calculates the remaining cooldown based on the
// time since the last event. The remaining period can be negative, indicating
// no cooldown period is required.
func (h *Handler) calculateRemainingCooldown(cd time.Duration, ts, lastEvent int64) time.Duration {
	return cd - time.Duration(ts-lastEvent)
}
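
// The function below is an illustrative sketch, not part of the upstream
// file. It walks through the arithmetic in calculateRemainingCooldown using
// nanosecond UnixNano values, the same form handleTick supplies: with a 10
// minute cooldown and a last scaling event 7 minutes ago, roughly 3 minutes
// of cooldown remain. Had the last event happened more than 10 minutes ago,
// the result would be negative and the caller would skip cooldown entirely.
func exampleRemainingCooldown(h *Handler) time.Duration {
	cooldown := 10 * time.Minute
	now := time.Now().UTC().UnixNano()
	lastEvent := now - (7 * time.Minute).Nanoseconds()

	// remaining = cooldown - (now - lastEvent), i.e. ~3 minutes here.
	return h.calculateRemainingCooldown(cooldown, now, lastEvent)
}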