Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: coalescing calls + feature: max cache size #877

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ const stats = breaker.stats;
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
coalesceCacheHits: 0,
coalesceCacheMisses: 0,
semaphoreRejections: 0,
percentiles: {
'0': 0,
Expand Down Expand Up @@ -417,6 +419,9 @@ The code that is summing the stats samples is here:
}, bucket());
```

### Coalesce calls

Circuitbreaker offers coalescing your calls. If options.coalesce is set, multiple calls to the circuitbreaker will be handled as one, within the given timeframe (options.coalesceTTL). Performance will improve when rapidly firing the circuitbreaker with the same request, especially on a slower action. This is especially useful if multiple events can trigger the same functions at the same time. Of course, caching has the same function, but will only be effective when the call has been executed once to store the return value. Coalescing and cache can be used at the same time, coalescing calls will always use the internal cache.

### Typings

Expand Down
8 changes: 7 additions & 1 deletion lib/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
* @property {Map} cache Cache map
*/
class MemoryCache {
constructor () {
constructor (maxEntries) {
this.cache = new Map();
this.maxEntries = maxEntries ?? 2 ^ 24 - 1; // Max size for Map is 2 ^ 24.
}

/**
Expand All @@ -32,6 +33,11 @@ class MemoryCache {
* @return {void}
*/
set (key, value, ttl) {
// Evict oldest entry when at capacity.
if (this.cache.size === this.maxEntries) {
this.cache.delete(this.cache.keys().next().value);
}

this.cache.set(key, {
expiresAt: ttl,
value
Expand Down
77 changes: 70 additions & 7 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,29 @@ Please use options.errorThresholdPercentage`;
* `cacheMiss` reflect cache activity.) Default: false
* @param {Number} options.cacheTTL the time to live for the cache
* in milliseconds. Set 0 for infinity cache. Default: 0 (no TTL)
* @param {Number} options.cacheSize the max amount of entries in the internal
* cache. Only used when cacheTransport is not defined.
* Default: max size of JS map (2^24).
* @param {Function} options.cacheGetKey function that returns the key to use
* when caching the result of the circuit's fire.
* Better to use custom one, because `JSON.stringify` is not good
* from performance perspective.
* Default: `(...args) => JSON.stringify(args)`
* @param {CacheTransport} options.cacheTransport custom cache transport
* should implement `get`, `set` and `flush` methods.
* @param {boolean} options.coalesce If true, this provides coalescing of
* requests to this breaker, in other words: the promise will be cached.
* Only one action (with same cache key) is executed at a time, and the other
* pending actions wait for the result. Performance will improve when rapidly
* firing the circuitbreaker with the same request, especially on a slower
* action (e.g. multiple end-users fetching same data from remote).
* Will use internal cache only. Can be used in combination with options.cache.
* The metrics `coalesceCacheHit` and `coalesceCacheMiss` are available.
* Default: false
* @param {Number} options.coalesceTTL the time to live for the coalescing
* in milliseconds. Set 0 for infinity cache. Default: same as options.timeout
* @param {Number} options.coalesceSize the max amount of entries in the
* coalescing cache. Default: max size of JS map (2^24).
* @param {AbortController} options.abortController this allows Opossum to
* signal upon timeout and properly abort your on going requests instead of
* leaving it in the background
Expand All @@ -115,6 +131,8 @@ Please use options.errorThresholdPercentage`;
* @fires CircuitBreaker#fire
* @fires CircuitBreaker#cacheHit
* @fires CircuitBreaker#cacheMiss
* @fires CircuitBreaker#coalesceCacheHit
* @fires CircuitBreaker#coalesceCacheMiss
* @fires CircuitBreaker#reject
* @fires CircuitBreaker#timeout
* @fires CircuitBreaker#success
Expand Down Expand Up @@ -168,11 +186,13 @@ class CircuitBreaker extends EventEmitter {
((...args) => JSON.stringify(args));
this.options.enableSnapshots = options.enableSnapshots !== false;
this.options.rotateBucketController = options.rotateBucketController;
this.options.coalesce = !!options.coalesce;
this.options.coalesceTTL = options.coalesceTTL ?? this.options.timeout;

// Set default cache transport if not provided
if (this.options.cache) {
if (this.options.cacheTransport === undefined) {
this.options.cacheTransport = new MemoryCache();
this.options.cacheTransport = new MemoryCache(options.cacheSize);
} else if (typeof this.options.cacheTransport !== 'object' ||
!this.options.cacheTransport.get ||
!this.options.cacheTransport.set ||
Expand All @@ -184,6 +204,10 @@ class CircuitBreaker extends EventEmitter {
}
}

if (this.options.coalesce) {
this.options.coalesceCache = new MemoryCache(options.coalesceSize);
}

this.semaphore = new Semaphore(this.options.capacity);

// check if action is defined
Expand Down Expand Up @@ -265,6 +289,8 @@ class CircuitBreaker extends EventEmitter {
this.on('reject', increment('rejects'));
this.on('cacheHit', increment('cacheHits'));
this.on('cacheMiss', increment('cacheMisses'));
this.on('coalesceCacheHit', increment('coalesceCacheHits'));
this.on('coalesceCacheMiss', increment('coalesceCacheMisses'));
this.on('open', _ => this[STATUS].open());
this.on('close', _ => this[STATUS].close());
this.on('semaphoreLocked', increment('semaphoreRejections'));
Expand Down Expand Up @@ -593,6 +619,14 @@ class CircuitBreaker extends EventEmitter {

const args = rest.slice();

// Protection, caches and coalesce disabled.
if (!this[ENABLED]) {
const result = this.action.apply(context, args);
return (typeof result.then === 'function')
? result
: Promise.resolve(result);
}

// Need to create variable here to prevent extra calls if cache is disabled
let cacheKey = '';

Expand Down Expand Up @@ -624,11 +658,26 @@ class CircuitBreaker extends EventEmitter {
this.emit('cacheMiss');
}

if (!this[ENABLED]) {
const result = this.action.apply(context, args);
return (typeof result.then === 'function')
? result
: Promise.resolve(result);
/* When coalesce is enabled, check coalesce cache and return
promise, if any. */
if (this.options.coalesce) {
const cachedCall = this.options.coalesceCache.get(cacheKey);

if (cachedCall) {
/**
* Emitted when the circuit breaker is using coalesce cache
* and finds a cached promise.
* @event CircuitBreaker#coalesceCacheHit
*/
this.emit('coalesceCacheHit');
return cachedCall;
}
/**
* Emitted when the circuit breaker does not find a value in
* coalesce cache, but the coalesce option is enabled.
* @event CircuitBreaker#coalesceCacheMiss
*/
this.emit('coalesceCacheMiss');
}

if (!this.closed && !this.pendingClose) {
Expand All @@ -648,7 +697,8 @@ class CircuitBreaker extends EventEmitter {

let timeout;
let timeoutError = false;
return new Promise((resolve, reject) => {

const call = new Promise((resolve, reject) => {
const latencyStartTime = Date.now();
if (this.semaphore.test()) {
if (this.options.timeout) {
Expand Down Expand Up @@ -728,6 +778,19 @@ class CircuitBreaker extends EventEmitter {
handleError(err, this, timeout, args, latency, resolve, reject);
}
});

/* When coalesce is enabled, store promise in coalesceCache */
if (this.options.coalesce) {
this.options.coalesceCache.set(
cacheKey,
call,
this.options.coalesceTTL > 0
? Date.now() + this.options.coalesceTTL
: 0
);
}

return call;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ const bucket = _ => ({
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
coalesceCacheHits: 0,
coalesceCacheMisses: 0,
semaphoreRejections: 0,
percentiles: {},
latencyTimes: []
Expand Down
Loading
Loading