diff --git a/README.md b/README.md index bdccfcc7..6783b0bb 100644 --- a/README.md +++ b/README.md @@ -231,3 +231,11 @@ you create the breaker. E.g. // Force opossum to use native JS promises const breaker = circuitBreaker(readFile, { Promise: Promise }); ``` + +### Hystrix Metrics + +A Hystrix Stream is available for use with a Hystrix Dashboard using the `circuitBreaker.hystrixStats.getHystrixStream` method. + +This method returns a [Node.js Stream](https://nodejs.org/api/stream.html), which makes it easy to create an SSE stream that will be compliant with a Hystrix Dashboard. + +Additional Reading: [Hystrix Metrics Event Stream](https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream), [Turbine](https://github.com/Netflix/Turbine/wiki), [Hystrix Dashboard](https://github.com/Netflix/Hystrix/wiki/Dashboard) diff --git a/lib/circuit.js b/lib/circuit.js index 114c099f..4f5455b2 100644 --- a/lib/circuit.js +++ b/lib/circuit.js @@ -2,6 +2,7 @@ const EventEmitter = require('events'); const Status = require('./status'); +const HystrixStats = require('./hystrix-stats'); const STATE = Symbol('state'); const OPEN = Symbol('open'); @@ -12,6 +13,7 @@ const FALLBACK_FUNCTION = Symbol('fallback'); const STATUS = Symbol('status'); const NAME = Symbol('name'); const GROUP = Symbol('group'); +const HYSTRIX_STATS = Symbol('hystrix-stats'); const CACHE = new WeakMap(); /** @@ -95,6 +97,9 @@ class CircuitBreaker extends EventEmitter { if (this.options.cache) { CACHE.set(this, undefined); } + + // Register this instance of the circuit breaker with the hystrix stats listener + this[HYSTRIX_STATS] = new HystrixStats(this); } /** @@ -179,6 +184,12 @@ class CircuitBreaker extends EventEmitter { return this[STATUS].stats; } + /** + A convenience function that returns the hystrixStats + */ + get hystrixStats () { + return this[HYSTRIX_STATS]; + } /** * Provide a fallback function for this {@link CircuitBreaker}. This * function will be executed when the circuit is `fire`d and fails. diff --git a/lib/hystrix-formatter.js b/lib/hystrix-formatter.js new file mode 100644 index 00000000..a1a3127d --- /dev/null +++ b/lib/hystrix-formatter.js @@ -0,0 +1,69 @@ +'use strict'; + +// A function to map our stats data to the hystrix format +// returns JSON +function hystrixFormatter (stats) { + const json = {}; + json.type = 'HystrixCommand'; + json.name = stats.name; + json.group = stats.group; + json.currentTime = new Date(); + json.isCircuitBreakerOpen = !stats.closed; + json.errorPercentage = stats.fires === 0 ? 0 : (stats.failures / stats.fires) * 100; + json.errorCount = stats.failures; + json.requestCount = stats.fires; + json.rollingCountBadRequests = stats.failures; + json.rollingCountCollapsedRequests = 0; + json.rollingCountEmit = stats.fires; + json.rollingCountExceptionsThrown = 0; + json.rollingCountFailure = stats.failures; + json.rollingCountFallbackEmit = stats.fallbacks; + json.rollingCountFallbackFailure = 0; + json.rollingCountFallbackMissing = 0; + json.rollingCountFallbackRejection = 0; + json.rollingCountFallbackSuccess = 0; + json.rollingCountResponsesFromCache = stats.cacheHits; + json.rollingCountSemaphoreRejected = stats.rejects; + json.rollingCountShortCircuited = stats.rejects; + json.rollingCountSuccess = stats.successes; + json.rollingCountThreadPoolRejected = 0; + json.rollingCountTimeout = stats.timeouts; + json.currentConcurrentExecutionCount = 0; + json.rollingMaxConcurrentExecutionCount = 0; + // TODO: caluclate these latency values + json.latencyExecute_mean = 0; + json.latencyExecute = { + '0': 0, + '25': 0, + '50': 0, + '75': 0, + '90': 0, + '95': 0, + '99': 0, + '99.5': 0, + '100': 0 + }; + json.latencyTotal_mean = 0; + json.latencyTotal = { '0': 0, '25': 0, '50': 0, '75': 0, '90': 0, '95': 0, '99': 0, '99.5': 0, '100': 0 }; + json.propertyValue_circuitBreakerRequestVolumeThreshold = 5; + json.propertyValue_circuitBreakerSleepWindowInMilliseconds = stats.options.resetTimeout; + json.propertyValue_circuitBreakerErrorThresholdPercentage = stats.options.errorThresholdPercentage; + json.propertyValue_circuitBreakerForceOpen = false; + json.propertyValue_circuitBreakerForceClosed = false; + json.propertyValue_circuitBreakerEnabled = true; // Whether circuit breaker should be enabled. + json.propertyValue_executionIsolationStrategy = 'THREAD'; + json.propertyValue_executionIsolationThreadTimeoutInMilliseconds = 300; + json.propertyValue_executionTimeoutInMilliseconds = stats.options.timeout; + json.propertyValue_executionIsolationThreadInterruptOnTimeout = true; + json.propertyValue_executionIsolationThreadPoolKeyOverride = null; + json.propertyValue_executionIsolationSemaphoreMaxConcurrentRequests = 10; + json.propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests = 10; + json.propertyValue_metricsRollingStatisticalWindowInMilliseconds = 10000; + json.propertyValue_requestCacheEnabled = stats.options.cache || false; + json.propertyValue_requestLogEnabled = true; + json.reportingHosts = 1; + + return json; +} + +module.exports = exports = hystrixFormatter; diff --git a/lib/hystrix-stats.js b/lib/hystrix-stats.js new file mode 100644 index 00000000..ceebb9ae --- /dev/null +++ b/lib/hystrix-stats.js @@ -0,0 +1,72 @@ +'use strict'; + +const stream = require('stream'); +const hystrixFormatter = require('./hystrix-formatter'); + +/** + * @class + *

+ * Stream Hystrix Metrics for a given {@link CircuitBreaker}. + * A HystrixStats instance is created for every {@link CircuitBreaker} + * and does not typically need to be created by a user. + *

+ *

+ * A HystrixStats instance will listen for all events on the {@link CircuitBreaker.status.snapshot} + * and format the data to the proper Hystrix format. Making it easy to construct an Event Stream for a client + *

+ * + * @example + * const circuit = circuitBreaker(fs.readFile, {}); + * + * circuit.hystrixStats.getHystrixStream().pipe(response); + * @see CircuitBreaker#hystrixStats + */ +class HystrixStats { + constructor (circuit) { + this._circuit = circuit; + + // Listen for the stats's snapshot event + this._circuit.status.on('snapshot', this._hystrixSnapshotListener.bind(this)); + + this._readableStream = new stream.Readable({ + objectMode: true + }); + + // Need a _read() function to satisfy the protocol + this._readableStream._read = () => {}; + this._readableStream.resume(); + + this._hystrixStream = new stream.Transform({ + objectMode: true + }); + + // Need a _transform() function to satisfy the protocol + this._hystrixStream._transform = this._hystrixTransformer; + this._hystrixStream.resume(); + + this._readableStream.pipe(this._hystrixStream); + } + + // The stats coming in should be already "Reduced" + _hystrixTransformer (stats, encoding, cb) { + const formattedStats = hystrixFormatter(stats); + + // Need to take the stats and map them to the hystrix format + return cb(null, `data: ${JSON.stringify(formattedStats)}\n\n`); + } + + /** + A convenience function that returns the hystrxStream + */ + getHystrixStream () { + return this._hystrixStream; + } + + // This will take the stats data from the listener and push it on the stream to be transformed + _hystrixSnapshotListener (stats) { + const circuit = this._circuit; + this._readableStream.push(Object.assign({}, {name: circuit.name, closed: circuit.closed, group: circuit.group, options: circuit.options}, stats)); + } +} + +module.exports = exports = HystrixStats; diff --git a/test/test.js b/test/test.js index 36116553..f124d69d 100644 --- a/test/test.js +++ b/test/test.js @@ -25,6 +25,7 @@ test('api', (t) => { t.ok(breaker.closed, 'CircuitBreaker.closed'); t.ok(breaker.status, 'CircuitBreaker.status'); t.ok(breaker.options, 'CircuitBreaker.options'); + t.ok(breaker.hystrixStats, 'CircuitBreaker.hystrixStats'); t.equals(breaker.action, passFail, 'CircuitBreaker.action'); t.end(); });