-
Notifications
You must be signed in to change notification settings - Fork 48
/
circuit.go
468 lines (411 loc) · 17.3 KB
/
circuit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
package circuit
import (
"context"
"expvar"
"sync"
"time"
"github.com/cep21/circuit/v4/faststats"
)
// Circuit is a circuit breaker pattern implementation that can accept commands and open/close on failures.
//
// Construct one with NewCircuitFromConfig (or group circuits with the Manager struct). A nil *Circuit, or one
// whose OpenToClose strategy is nil, is treated as "no circuit": Execute simply calls the run function directly.
type Circuit struct {
// Metric collectors notified about run results, fallback results, and open/close transitions.
// SetConfigNotThreadSafe rebuilds these: the run and circuit collectors start with the OpenToClose and
// ClosedToOpen strategies, followed by the collectors listed in Config.Metrics.
CmdMetricCollector RunMetricsCollection
FallbackMetricCollector FallbackMetricsCollection
CircuitMetricsCollector MetricsCollection
// This is used to help run `Go` calls in the background
goroutineWrapper goroutineWrapper
// name identifies this circuit; it is returned by Name and included in the expvar output.
name string
// The passed in config is plain data and is neither atomic nor thread safe. During circuit operations we read
// the thread safe copies held in threadSafeConfig (an atomicCircuitConfig). Those are, also, the only values
// that can actually be changed while a circuit is running.
notThreadSafeConfig Config
// The mutex guards setting and reading notThreadSafeConfig. It is not locked on the hot path: live operations
// read threadSafeConfig below instead.
// NOTE(review): checkErrInterrupt reads notThreadSafeConfig.Execution.IsErrInterrupt without taking this mutex.
notThreadSafeConfigMu sync.Mutex
threadSafeConfig atomicCircuitConfig
// Tracks whether the breaker has been tripped open. IsOpen consults the ForceOpen/ForcedClosed overrides
// before this value.
isOpen faststats.AtomicBoolean
// Tracks how many commands are currently running
concurrentCommands faststats.AtomicInt64
// Tracks how many fallbacks are currently running
concurrentFallbacks faststats.AtomicInt64
// ClosedToOpen controls when to open a closed circuit
ClosedToOpen ClosedToOpen
// OpenToClose controls when to close an open circuit
OpenToClose OpenToClosed
// timeNow is the clock behind now(); SetConfigNotThreadSafe sets it from Config.General.TimeKeeper.Now.
timeNow func() time.Time
}
// NewCircuitFromConfig creates a standalone circuit named name. The default command properties are merged
// into config before it is applied. If you want to group all your circuits together, you should probably
// use the Manager struct instead.
func NewCircuitFromConfig(name string, config Config) *Circuit {
	config.Merge(defaultCommandProperties)
	// SetConfigNotThreadSafe stores config and builds every internal structure from it, so the literal only
	// needs the name.
	circ := &Circuit{name: name}
	circ.SetConfigNotThreadSafe(config)
	return circ
}
// ConcurrentCommands reports how many commands this circuit currently counts as running.
func (c *Circuit) ConcurrentCommands() int64 {
	inFlight := c.concurrentCommands.Get()
	return inFlight
}
// ConcurrentFallbacks reports how many fallbacks this circuit currently counts as running.
func (c *Circuit) ConcurrentFallbacks() int64 {
	inFlight := c.concurrentFallbacks.Get()
	return inFlight
}
// SetConfigThreadSafe changes the current configuration of this circuit while it may be in use. The config is
// stored, copied into the atomic runtime config, and forwarded to the OpenToClose and ClosedToOpen strategies
// when they implement Configurable. Note that many config parameters, specifically those around creating stat
// tracking buckets, are not modifiable during runtime for efficiency reasons. Those buckets will stay the same.
func (c *Circuit) SetConfigThreadSafe(config Config) {
	c.notThreadSafeConfigMu.Lock()
	defer c.notThreadSafeConfigMu.Unlock()

	c.notThreadSafeConfig = config
	c.threadSafeConfig.reset(config)

	// OpenToClose first, then ClosedToOpen, matching construction order.
	for _, strategy := range []interface{}{c.OpenToClose, c.ClosedToOpen} {
		if configurable, ok := strategy.(Configurable); ok {
			configurable.SetConfigThreadSafe(config)
		}
	}
}
// Config returns a copy of the circuit's configuration, taken under the config mutex. Modifying the returned
// value has no effect on the circuit; use SetConfigThreadSafe for that.
func (c *Circuit) Config() Config {
	c.notThreadSafeConfigMu.Lock()
	snapshot := c.notThreadSafeConfig
	c.notThreadSafeConfigMu.Unlock()
	return snapshot
}
// SetConfigNotThreadSafe is only useful during construction, before the circuit is being used. It is not
// thread safe, but it rebuilds all of the circuit's internal structures to match config:
//   - the lost-error handler used by Go and the clock used by now();
//   - fresh OpenToClose/ClosedToOpen strategies from the configured factories (configured if Configurable);
//   - the run, fallback and circuit metric collector lists.
//
// It finishes by applying config through SetConfigThreadSafe. It does *NOT* merge in the default configuration
// parameters (NewCircuitFromConfig does that before calling it).
func (c *Circuit) SetConfigNotThreadSafe(config Config) {
	// Store config under the mutex, but read only the local argument below: c.notThreadSafeConfig must not be
	// touched outside the mutex.
	c.notThreadSafeConfigMu.Lock()
	c.notThreadSafeConfig = config
	c.notThreadSafeConfigMu.Unlock()

	c.goroutineWrapper.lostErrors = config.General.GoLostErrors
	c.timeNow = config.General.TimeKeeper.Now

	c.OpenToClose = config.General.OpenToClosedFactory()
	c.ClosedToOpen = config.General.ClosedToOpenFactory()
	for _, strategy := range []interface{}{c.OpenToClose, c.ClosedToOpen} {
		if configurable, ok := strategy.(Configurable); ok {
			configurable.SetConfigNotThreadSafe(config)
		}
	}

	// The strategies are registered ahead of any user-supplied collectors so they observe run results and
	// open/close transitions.
	runCollectors := make([]RunMetrics, 0, len(config.Metrics.Run)+2)
	runCollectors = append(runCollectors, c.OpenToClose, c.ClosedToOpen)
	c.CmdMetricCollector = append(runCollectors, config.Metrics.Run...)

	fallbackCollectors := make([]FallbackMetrics, 0, len(config.Metrics.Fallback)+2)
	c.FallbackMetricCollector = append(fallbackCollectors, config.Metrics.Fallback...)

	circuitCollectors := make([]Metrics, 0, len(config.Metrics.Circuit)+2)
	circuitCollectors = append(circuitCollectors, c.OpenToClose, c.ClosedToOpen)
	c.CircuitMetricsCollector = append(circuitCollectors, config.Metrics.Circuit...)

	c.SetConfigThreadSafe(config)
}
// now returns the current time according to the circuit's configured clock (timeNow).
func (c *Circuit) now() time.Time {
	clock := c.timeNow
	return clock()
}
// Var returns an expvar.Var that helps diagnose the circuit. Each read produces a fresh snapshot containing:
//   - the config, open state, name and concurrency counters;
//   - the open/close strategies;
//   - expvar views of the run and fallback metrics.
//
// Reading the Var of a nil *Circuit yields nil.
func (c *Circuit) Var() expvar.Var {
	snapshot := func() interface{} {
		if c == nil {
			return nil
		}
		return map[string]interface{}{
			"config":               c.Config(),
			"is_open":              c.IsOpen(),
			"name":                 c.Name(),
			"run_metrics":          expvarToVal(c.CmdMetricCollector.Var()),
			"concurrent_commands":  c.ConcurrentCommands(),
			"concurrent_fallbacks": c.ConcurrentFallbacks(),
			"closer":               c.OpenToClose,
			"opener":               c.ClosedToOpen,
			"fallback_metrics":     expvarToVal(c.FallbackMetricCollector.Var()),
		}
	}
	return expvar.Func(snapshot)
}
// Name returns the name this circuit was created with, or "" for a nil circuit.
func (c *Circuit) Name() string {
	if c != nil {
		return c.name
	}
	return ""
}
// IsOpen returns true if the circuit should be considered 'open' (ie not allowing runFunc calls). A nil circuit
// is never open. The CircuitBreaker.ForceOpen override wins over ForcedClosed, and both win over the breaker's
// own tracked state.
func (c *Circuit) IsOpen() bool {
	switch {
	case c == nil:
		return false
	case c.threadSafeConfig.CircuitBreaker.ForceOpen.Get():
		return true
	case c.threadSafeConfig.CircuitBreaker.ForcedClosed.Get():
		return false
	default:
		return c.isOpen.Get()
	}
}
// CloseCircuit closes an open circuit, usually because we think it's healthy again. The OpenToClose strategy is
// bypassed, but a ForceOpen override still keeps the circuit open. Be aware: if the circuit isn't actually
// healthy, it will just open back up again.
func (c *Circuit) CloseCircuit(ctx context.Context) {
	closedAt := c.now()
	c.close(ctx, closedAt, true)
}
// OpenCircuit will open a closed circuit. The circuit will then try to repair itself.
//
// The open event is timestamped with the circuit's configured clock (c.now, i.e. Config.General.TimeKeeper)
// rather than the wall clock. This keeps it consistent with CloseCircuit and with every other transition the
// circuit records, which matters when a custom TimeKeeper is injected (for example in tests).
func (c *Circuit) OpenCircuit(ctx context.Context) {
	c.openCircuit(ctx, c.now())
}
// openCircuit opens a circuit without consulting ClosedToOpen (no error-threshold or request-volume checks).
// It does nothing when the circuit is forced closed or is already open. Otherwise it reports the Opened event
// to the circuit metric collectors, then marks the circuit open. The circuit will, after some delay, try to
// close again.
func (c *Circuit) openCircuit(ctx context.Context, now time.Time) {
	// Forced-closed circuits never open, and re-opening an open circuit would emit a duplicate event.
	if c.threadSafeConfig.CircuitBreaker.ForcedClosed.Get() || c.IsOpen() {
		return
	}
	c.CircuitMetricsCollector.Opened(ctx, now)
	c.isOpen.Set(true)
}
// Go executes `Execute`, but uses spawned goroutines to end early if the context is canceled. Use this if you don't trust
// the runFunc to end correctly if context fails. This is a design mirroed in the go-hystrix library, but be warned it
// is very dangerous and could leave orphaned goroutines hanging around forever doing who knows what.
func (c *Circuit) Go(ctx context.Context, runFunc func(context.Context) error, fallbackFunc func(context.Context, error) error) error {
if c == nil {
var wrapper goroutineWrapper
return c.Execute(ctx, wrapper.run(runFunc), wrapper.fallback(fallbackFunc))
}
return c.Execute(ctx, c.goroutineWrapper.run(runFunc), c.goroutineWrapper.fallback(fallbackFunc))
}
// Run will execute the circuit without a fallback. It is the equivalent of calling Execute with a nil fallback function
func (c *Circuit) Run(ctx context.Context, runFunc func(context.Context) error) error {
return c.Execute(ctx, runFunc, nil)
}
// Execute the circuit. Prefer this over Go. Similar to http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#execute--
// The returned error will either be the result of runFunc, the result of fallbackFunc, or an internal library error.
// Internal library errors will match the interface Error and you can use type casting to check this.
//
// A nil/empty circuit, or one with CircuitBreaker.Disabled set, calls runFunc directly with no circuit logic.
// Bad-request errors are returned as-is and never trigger the fallback.
func (c *Circuit) Execute(ctx context.Context, runFunc func(context.Context) error, fallbackFunc func(context.Context, error) error) error {
	bypassCircuit := c.isEmptyOrNil() || c.threadSafeConfig.CircuitBreaker.Disabled.Get()
	if bypassCircuit {
		return runFunc(ctx)
	}

	runErr := c.run(ctx, runFunc)
	switch {
	case runErr == nil:
		return nil
	case IsBadRequest(runErr):
		// The user gave bad input; the conditions that trigger fallbacks are documented at
		// https://github.com/Netflix/Hystrix/wiki/Metrics-and-Monitoring#command-execution-event-types-comnetflixhystrixhystrixeventtype
		return runErr
	default:
		return c.fallback(ctx, runErr, fallbackFunc)
	}
}
// --------- only private functions below here
// throttleConcurrentCommands returns errThrottledConcurrentCommands when currentCommandCount exceeds
// Execution.MaxConcurrentRequests. A negative limit disables throttling.
func (c *Circuit) throttleConcurrentCommands(currentCommandCount int64) error {
	limit := c.threadSafeConfig.Execution.MaxConcurrentRequests.Get()
	if limit < 0 || currentCommandCount <= limit {
		return nil
	}
	return errThrottledConcurrentCommands
}
// isEmptyOrNil returns true if the circuit is nil or looks like it was never configured. "Never configured"
// is guessed from a nil OpenToClose strategy. This lets nil/empty circuits behave reasonably by just running
// the command.
func (c *Circuit) isEmptyOrNil() bool {
	if c == nil {
		return true
	}
	return c.OpenToClose == nil
}
// run executes runFunc under the circuit's protections and classifies the outcome. It is the equivalent of
// Java Hystrix's http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#run()
//
// In order:
//  1. A nil runFunc is a no-op that returns nil.
//  2. If allowNewRun refuses (circuit open and OpenToClose.Allow says no), ErrShortCircuit is recorded and
//     errCircuitOpen is returned.
//  3. If ClosedToOpen.Prevent vetoes the call, errCircuitOpen is returned without recording a metric.
//  4. The in-flight command counter is incremented for the duration of the call. If it exceeds
//     Execution.MaxConcurrentRequests, ErrConcurrencyLimitReject is recorded and the throttling error returned.
//  5. If Execution.ExecutionTimeout > 0, runFunc receives a context whose deadline is startTime+ExecutionTimeout.
//  6. The result of runFunc is classified as bad request, timeout, interrupt, failure, or success. The first
//     matching check wins and records the corresponding CmdMetricCollector event.
//
// The returned error is runFunc's own result or one of the internal errors from steps 2-4. A call that
// exceeded its deadline but returned nil still yields nil, even though the circuit counts it as a timeout.
// NOTE(review): the named result retErr is never assigned or read; it appears to be documentation only.
func (c *Circuit) run(ctx context.Context, runFunc func(context.Context) error) (retErr error) {
if runFunc == nil {
return nil
}
// expectedDoneBy stays zero when no ExecutionTimeout is configured; checkErrTimeout relies on that.
var expectedDoneBy time.Time
startTime := c.now()
// Keep the caller's context so interrupts can be told apart from the timeout deadline added below.
originalContext := ctx
if !c.allowNewRun(ctx, startTime) {
// Rather than make this inline, return a global reference (for memory optimization sake).
c.CmdMetricCollector.ErrShortCircuit(ctx, startTime)
return errCircuitOpen
}
if c.ClosedToOpen.Prevent(ctx, startTime) {
return errCircuitOpen
}
currentCommandCount := c.concurrentCommands.Add(1)
defer c.concurrentCommands.Add(-1)
if err := c.throttleConcurrentCommands(currentCommandCount); err != nil {
c.CmdMetricCollector.ErrConcurrencyLimitReject(ctx, startTime)
return err
}
// Set timeout on the command if we have one
if c.threadSafeConfig.Execution.ExecutionTimeout.Get() > 0 {
var timeoutCancel func()
expectedDoneBy = startTime.Add(c.threadSafeConfig.Execution.ExecutionTimeout.Duration())
ctx, timeoutCancel = context.WithDeadline(ctx, expectedDoneBy)
defer timeoutCancel()
}
ret := runFunc(ctx)
// NOTE(review): endTime and runFuncDoneTime come from two separate c.now() calls, so totalCmdTime is measured
// against a slightly earlier instant than the one handed to the check* helpers. Left as-is because a custom
// TimeKeeper may be sensitive to the number of calls.
endTime := c.now()
totalCmdTime := endTime.Sub(startTime)
runFuncDoneTime := c.now()
// See bad request documentation at https://github.com/Netflix/Hystrix/wiki/How-To-Use#error-propagation
// This request had invalid input, but shouldn't be marked as an 'error' for the circuit
// From documentation
// -------
// The HystrixBadRequestException is intended for use cases such as reporting illegal arguments or non-system
// failures that should not count against the failure metrics and should not trigger fallback logic.
if c.checkErrBadRequest(ctx, ret, runFuncDoneTime, totalCmdTime) {
return ret
}
// Even if there is no error (or if there is an error), if the request took too long it is always an error for the
// circuit. Note that ret *MAY* actually be nil. In that case, we still want to return nil.
if c.checkErrTimeout(ctx, expectedDoneBy, runFuncDoneTime, totalCmdTime) {
// Note: ret could possibly be nil. We will still return nil, but the circuit will consider it a failure.
return ret
}
// The runFunc failed, but someone asked the original context to end. This probably isn't a failure of the
// circuit: someone just wanted `Execute` to end early, so don't track it as a failure.
if c.checkErrInterrupt(ctx, originalContext, ret, runFuncDoneTime, totalCmdTime) {
return ret
}
if c.checkErrFailure(ctx, ret, runFuncDoneTime, totalCmdTime) {
return ret
}
// The circuit works. Close it!
// Note: Execute this *after* you check for timeouts so we can still track circuit time outs that happen to also return a
// valid value later.
c.checkSuccess(ctx, runFuncDoneTime, totalCmdTime)
return nil
}
// checkSuccess records a successful run. If the circuit is currently open, it then lets the OpenToClose
// strategy decide whether to close it.
func (c *Circuit) checkSuccess(ctx context.Context, runFuncDoneTime time.Time, totalCmdTime time.Duration) {
	c.CmdMetricCollector.Success(ctx, runFuncDoneTime, totalCmdTime)
	if !c.IsOpen() {
		return
	}
	c.close(ctx, runFuncDoneTime, false)
}
// checkErrInterrupt reports whether a failed run is an "interrupt". An interrupt means the caller's own context
// (originalContext) was already done, so the failure is attributed to the caller rather than to the protected
// service. Interrupts are recorded via CmdMetricCollector.ErrInterrupt, and run returns before the failure
// check, so they never try to open the circuit.
//
// A result counts as an interrupt only when all of the following hold:
//   - runFunc returned an error and originalContext.Err() is non-nil;
//   - GoSpecific.IgnoreInterrupts is false;
//   - Execution.IsErrInterrupt is nil (treated as "always an interrupt") or returns true for originalContext.Err().
//
// NOTE(review): IsErrInterrupt is only consulted when IgnoreInterrupts is false. If the intent is for it to
// refine classification when IgnoreInterrupts is true, this condition needs revisiting.
// NOTE(review): IsErrInterrupt is read from notThreadSafeConfig without holding notThreadSafeConfigMu.
func (c *Circuit) checkErrInterrupt(ctx context.Context, originalContext context.Context, ret error, runFuncDoneTime time.Time, totalCmdTime time.Duration) bool {
	if ret == nil {
		return false
	}
	// Once a context's Err is non-nil it stays the same, so reading it once is enough.
	parentErr := originalContext.Err()
	if parentErr == nil {
		return false
	}
	if c.threadSafeConfig.GoSpecific.IgnoreInterrupts.Get() {
		return false
	}
	if classify := c.notThreadSafeConfig.Execution.IsErrInterrupt; classify != nil && !classify(parentErr) {
		return false
	}
	c.CmdMetricCollector.ErrInterrupt(ctx, runFuncDoneTime, totalCmdTime)
	return true
}
// checkErrBadRequest records ErrBadRequest and returns true when ret is a bad-request error. Bad requests do
// not count against circuit health.
func (c *Circuit) checkErrBadRequest(ctx context.Context, ret error, runFuncDoneTime time.Time, totalCmdTime time.Duration) bool {
	if !IsBadRequest(ret) {
		return false
	}
	c.CmdMetricCollector.ErrBadRequest(ctx, runFuncDoneTime, totalCmdTime)
	return true
}
// checkErrFailure records ErrFailure and returns true when ret is non-nil. If the circuit is still closed,
// it also asks ClosedToOpen (via attemptToOpen) whether it should open.
func (c *Circuit) checkErrFailure(ctx context.Context, ret error, runFuncDoneTime time.Time, totalCmdTime time.Duration) bool {
	if ret == nil {
		return false
	}
	c.CmdMetricCollector.ErrFailure(ctx, runFuncDoneTime, totalCmdTime)
	if !c.IsOpen() {
		c.attemptToOpen(ctx, runFuncDoneTime)
	}
	return true
}
// checkErrTimeout returns true, recording ErrTimeout, when the run finished after expectedDoneBy. If the
// circuit is still closed, it also asks ClosedToOpen (via attemptToOpen) whether it should open. A zero
// expectedDoneBy means no ExecutionTimeout was configured, so nothing can time out.
//
// The circuit's own expectedDoneBy is used rather than the context deadline, because the context deadline
// could be a smaller timeout inherited from the parent context.
func (c *Circuit) checkErrTimeout(ctx context.Context, expectedDoneBy time.Time, runFuncDoneTime time.Time, totalCmdTime time.Duration) bool {
	if expectedDoneBy.IsZero() || !runFuncDoneTime.After(expectedDoneBy) {
		return false
	}
	c.CmdMetricCollector.ErrTimeout(ctx, runFuncDoneTime, totalCmdTime)
	if !c.IsOpen() {
		c.attemptToOpen(ctx, runFuncDoneTime)
	}
	return true
}
// fallback runs fallbackFunc for a failed command. Equivalent of
// http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#getFallback
//
// It returns err unchanged when there is no fallback function or fallbacks are disabled. It returns a
// concurrency-limit circuitError when more than Fallback.MaxConcurrentRequests fallbacks are in flight
// (a negative limit disables this check). Otherwise it returns the fallback's own result, recording success
// or failure together with its start time and duration.
func (c *Circuit) fallback(ctx context.Context, err error, fallbackFunc func(context.Context, error) error) error {
	if fallbackFunc == nil || c.threadSafeConfig.Fallback.Disabled.Get() {
		return err
	}

	inFlight := c.concurrentFallbacks.Add(1)
	defer c.concurrentFallbacks.Add(-1)
	if limit := c.threadSafeConfig.Fallback.MaxConcurrentRequests.Get(); limit >= 0 && inFlight > limit {
		c.FallbackMetricCollector.ErrConcurrencyLimitReject(ctx, c.now())
		return &circuitError{concurrencyLimitReached: true, msg: "throttling concurrency to fallbacks"}
	}

	began := c.now()
	fallbackErr := fallbackFunc(ctx, err)
	elapsed := c.now().Sub(began)
	if fallbackErr == nil {
		c.FallbackMetricCollector.Success(ctx, began, elapsed)
		return nil
	}
	c.FallbackMetricCollector.ErrFailure(ctx, began, elapsed)
	return fallbackErr
}
// allowNewRun reports whether a new run may proceed. This is the case when the circuit is closed, or when it
// is open but the OpenToClose strategy wants to let this call through to probe whether it can close again.
func (c *Circuit) allowNewRun(ctx context.Context, now time.Time) bool {
	return !c.IsOpen() || c.OpenToClose.Allow(ctx, now)
}
// close closes an open circuit, usually because we think it's healthy again. It does nothing when the circuit
// is already closed or is forced open. Otherwise the circuit closes if forceClosed is set or if the OpenToClose
// strategy agrees; in that case the Closed event is reported before the state flips.
func (c *Circuit) close(ctx context.Context, now time.Time, forceClosed bool) {
	if !c.IsOpen() || c.threadSafeConfig.CircuitBreaker.ForceOpen.Get() {
		return
	}
	if !forceClosed && !c.OpenToClose.ShouldClose(ctx, now) {
		return
	}
	c.CircuitMetricsCollector.Closed(ctx, now)
	c.isOpen.Set(false)
}
// attemptToOpen tries to open an unhealthy circuit, usually because run is having problems and we want to give
// it a rest for a bit.
//
// It is called "attemptToOpen" because the circuit may not actually open: it stays closed when forced closed,
// and otherwise opens only if the ClosedToOpen strategy says it should (for example, not when there aren't
// enough requests yet).
func (c *Circuit) attemptToOpen(ctx context.Context, now time.Time) {
	// The IsOpen check duplicates one inside openCircuit, but it avoids consulting ClosedToOpen repeatedly
	// while the circuit is already open.
	if c.threadSafeConfig.CircuitBreaker.ForcedClosed.Get() || c.IsOpen() {
		return
	}
	if !c.ClosedToOpen.ShouldOpen(ctx, now) {
		return
	}
	c.openCircuit(ctx, now)
}