From 6aebd156fb83354dd8cbeed567c60b23439e5bd1 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Fri, 23 Jan 2015 21:13:05 -0800 Subject: [PATCH 1/5] Added metrics to HystrixCollapser --- .../com/netflix/hystrix/HystrixCollapser.java | 46 +- .../hystrix/HystrixCollapserMetrics.java | 173 ++++++++ .../hystrix/HystrixCollapserProperties.java | 135 ++++++ .../collapser/RequestCollapserFactory.java | 12 +- .../metrics/HystrixMetricsPublisher.java | 25 ++ .../HystrixMetricsPublisherCollapser.java | 36 ++ ...strixMetricsPublisherCollapserDefault.java | 40 ++ .../util/HystrixRollingNumberEvent.java | 3 +- .../netflix/hystrix/HystrixCollapserTest.java | 403 ++++++++++-------- 9 files changed, 685 insertions(+), 188 deletions(-) create mode 100644 hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserMetrics.java create mode 100644 hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapser.java create mode 100644 hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapserDefault.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 67904feb3..5a379708a 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,7 @@ public abstract class HystrixCollapser collapserFactory; private final HystrixRequestCache requestCache; private final HystrixCollapserBridge collapserInstanceWrapper; + private final HystrixCollapserMetrics metrics; /** * The scope of request collapsing. @@ -108,18 +110,29 @@ protected HystrixCollapser(HystrixCollapserKey collapserKey) { * Fluent interface for constructor arguments */ protected HystrixCollapser(Setter setter) { - this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter); + this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter, null); } /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) { + this(collapserKey, scope, timer, propertiesBuilder, null); + } + + /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) { if (collapserKey == null || collapserKey.name().trim().equals("")) { - String defaultKeyName = getDefaultNameFromClass(getClass()); + String defaultKeyName = getDefaultNameFromClass (getClass()); collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName); } - this.collapserFactory = new RequestCollapserFactory(collapserKey, scope, timer, propertiesBuilder); + HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder); + this.collapserFactory = new RequestCollapserFactory(collapserKey, scope, timer, properties); this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy()); + if (metrics == null) { + this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties); + } else { + this.metrics = metrics; + } + final HystrixCollapser self = this; /** @@ -129,15 +142,17 @@ protected HystrixCollapser(Setter setter) { @Override public Collection>> shardRequests(Collection> requests) { - return self.shardRequests(requests); + Collection>> shards = self.shardRequests(requests); + self.metrics.markShards(shards.size()); + return shards; } @Override public Observable createObservableCommand(Collection> requests) { HystrixCommand command = self.createCommand(requests); - // mark the number of requests being collapsed together command.markAsCollapsedCommand(requests.size()); + self.metrics.markBatch(requests.size()); return command.toObservable(); } @@ -162,6 +177,8 @@ public HystrixCollapserKey getCollapserKey() { } }; + + } private HystrixCollapserProperties getProperties() { @@ -196,6 +213,14 @@ public Scope getScope() { return Scope.valueOf(collapserFactory.getScope().name()); } + /** + * Return the {@link HystrixCollapserMetrics} for this collapser + * @return {@link HystrixCollapserMetrics} for this collapser + */ + public HystrixCollapserMetrics getMetrics() { + return metrics; + } + /** * The request arguments to be passed to the {@link HystrixCommand}. *

@@ -298,7 +323,7 @@ protected CollectionCallback Scheduling *

*

    - *
  • When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.
  • + *
  • When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#computation()} for callbacks.
  • *
  • When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.
  • *
* Use {@link #toObservable(rx.Scheduler)} to schedule the callback differently. @@ -323,7 +348,7 @@ public Observable observe() { * Callback Scheduling *

*

    - *
  • When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.
  • + *
  • When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#computation()} for callbacks.
  • *
  • When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.
  • *
*

@@ -354,17 +379,14 @@ public Observable toObservable(Scheduler observeOn) { if (getProperties().requestCachingEnabled().get()) { Observable fromCache = requestCache.get(getCacheKey()); if (fromCache != null) { - /* mark that we received this response from cache */ - // TODO Add collapser metrics so we can capture this information - // we can't add it to the command metrics because the command can change each time (dynamic key for example) - // and we don't have access to it when responding from cache - // collapserMetrics.markResponseFromCache(); + metrics.markResponseFromCache(); return fromCache; } } RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); Observable response = requestCollapser.submitRequest(getRequestArgument()); + metrics.markRequestBatched(); if (getProperties().requestCachingEnabled().get()) { /* * A race can occur here with multiple threads queuing but only one will be cached. diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserMetrics.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserMetrics.java new file mode 100644 index 000000000..31bbf4980 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserMetrics.java @@ -0,0 +1,173 @@ +/** + * Copyright 2012 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; + +import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; +import com.netflix.hystrix.util.HystrixRollingNumber; +import com.netflix.hystrix.util.HystrixRollingNumberEvent; +import com.netflix.hystrix.util.HystrixRollingPercentile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used by {@link HystrixCollapser} to record metrics. + * {@link HystrixEventNotifier} not hooked up yet. It may be in the future. + */ +public class HystrixCollapserMetrics { + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(HystrixCollapserMetrics.class); + + // String is HystrixCollapserKey.name() (we can't use HystrixCollapserKey directly as we can't guarantee it implements hashcode/equals correctly) + private static final ConcurrentHashMap metrics = new ConcurrentHashMap(); + + /** + * Get or create the {@link HystrixCollapserMetrics} instance for a given {@link HystrixCollapserKey}. + *

+ * This is thread-safe and ensures only 1 {@link HystrixCollapserMetrics} per {@link HystrixCollapserKey}. + * + * @param key + * {@link HystrixCollapserKey} of {@link HystrixCollapser} instance requesting the {@link HystrixCollapserMetrics} + * @return {@link HystrixCollapserMetrics} + */ + public static HystrixCollapserMetrics getInstance(HystrixCollapserKey key, HystrixCollapserProperties properties) { + // attempt to retrieve from cache first + HystrixCollapserMetrics collapserMetrics = metrics.get(key.name()); + if (collapserMetrics != null) { + return collapserMetrics; + } + // it doesn't exist so we need to create it + collapserMetrics = new HystrixCollapserMetrics(key, properties); + // attempt to store it (race other threads) + HystrixCollapserMetrics existing = metrics.putIfAbsent(key.name(), collapserMetrics); + if (existing == null) { + // we won the thread-race to store the instance we created + return collapserMetrics; + } else { + // we lost so return 'existing' and let the one we created be garbage collected + return existing; + } + } + + /** + * All registered instances of {@link HystrixCollapserMetrics} + * + * @return {@code Collection} + */ + public static Collection getInstances() { + return Collections.unmodifiableCollection(metrics.values()); + } + + /** + * Clears all state from metrics. If new requests come in instances will be recreated and metrics started from scratch. + */ + /* package */ static void reset() { + metrics.clear(); + } + + private final HystrixCollapserKey key; + private final HystrixRollingNumber counter; + private final HystrixRollingPercentile percentileBatchSize; + private final HystrixRollingPercentile percentileShardSize; + + /* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) { + this.key = key; + + this.counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds(), properties.metricsRollingStatisticalWindowBuckets()); + this.percentileBatchSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize(), properties.metricsRollingPercentileEnabled()); + this.percentileShardSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize(), properties.metricsRollingPercentileEnabled()); + } + + /** + * {@link HystrixCollapserKey} these metrics represent. + * + * @return HystrixCollapserKey + */ + public HystrixCollapserKey getCollapserKey() { + return key; + } + + public Number getRollingCountRequestsBatched() { + return counter.getRollingSum(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED); + } + + public Number getCumulativeCountRequestsBatched() { + return counter.getCumulativeSum(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED); + } + + public Number getRollingCountResponsesFromCache() { + return counter.getRollingSum(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE); + } + + public Number getCumulativeCountResponsesFromCache() { + return counter.getCumulativeSum(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE); + } + + public Number getRollingCountBatches() { + return counter.getRollingSum(HystrixRollingNumberEvent.COLLAPSER_BATCH); + } + + public Number getCumulativeCountBatches() { + return counter.getCumulativeSum(HystrixRollingNumberEvent.COLLAPSER_BATCH); + } + + /** + * Retrieve the batch size for the {@link HystrixCollapser} being invoked at a given percentile. + *

+ * Percentile capture and calculation is configured via {@link HystrixCollapserProperties#metricsRollingStatisticalWindowInMilliseconds()} and other related properties. + * + * @param percentile + * Percentile such as 50, 99, or 99.5. + * @return batch size + */ + public int getBatchSizePercentile(double percentile) { + return percentileBatchSize.getPercentile(percentile); + } + + /** + * Retrieve the shard size for the {@link HystrixCollapser} being invoked at a given percentile. + *

+ * Percentile capture and calculation is configured via {@link HystrixCollapserProperties#metricsRollingStatisticalWindowInMilliseconds()} and other related properties. + * + * @param percentile + * Percentile such as 50, 99, or 99.5. + * @return batch size + */ + public int getShardSizePercentile(double percentile) { + return percentileShardSize.getPercentile(percentile); + } + + public void markRequestBatched() { + counter.increment(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED); + } + + public void markResponseFromCache() { + counter.increment(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE); + } + + public void markBatch(int batchSize) { + percentileBatchSize.addValue(batchSize); + counter.increment(HystrixRollingNumberEvent.COLLAPSER_BATCH); + } + + public void markShards(int numShards) { + percentileShardSize.addValue(numShards); + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserProperties.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserProperties.java index 567ab5b7c..ab79c6d8c 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserProperties.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapserProperties.java @@ -20,6 +20,8 @@ import com.netflix.hystrix.strategy.properties.HystrixPropertiesChainedArchaiusProperty; import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; import com.netflix.hystrix.strategy.properties.HystrixProperty; +import com.netflix.hystrix.util.HystrixRollingNumber; +import com.netflix.hystrix.util.HystrixRollingPercentile; /** * Properties for instances of {@link HystrixCollapser}. @@ -32,10 +34,22 @@ public abstract class HystrixCollapserProperties { private static final Integer default_maxRequestsInBatch = Integer.MAX_VALUE; private static final Integer default_timerDelayInMilliseconds = 10; private static final Boolean default_requestCacheEnabled = true; + /* package */ static final Integer default_metricsRollingStatisticalWindow = 10000;// default => statisticalWindow: 10000 = 10 seconds (and default of 10 buckets so each bucket is 1 second) + private static final Integer default_metricsRollingStatisticalWindowBuckets = 10;// default => statisticalWindowBuckets: 10 = 10 buckets in a 10 second window so each bucket is 1 second + private static final Boolean default_metricsRollingPercentileEnabled = true; + private static final Integer default_metricsRollingPercentileWindow = 60000; // default to 1 minute for RollingPercentile + private static final Integer default_metricsRollingPercentileWindowBuckets = 6; // default to 6 buckets (10 seconds each in 60 second window) + private static final Integer default_metricsRollingPercentileBucketSize = 100; // default to 100 values max per bucket private final HystrixProperty maxRequestsInBatch; private final HystrixProperty timerDelayInMilliseconds; private final HystrixProperty requestCacheEnabled; + private final HystrixProperty metricsRollingStatisticalWindowInMilliseconds; // milliseconds back that will be tracked + private final HystrixProperty metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow + private final HystrixProperty metricsRollingPercentileEnabled; // Whether monitoring should be enabled + private final HystrixProperty metricsRollingPercentileWindowInMilliseconds; // number of milliseconds that will be tracked in RollingPercentile + private final HystrixProperty metricsRollingPercentileWindowBuckets; // number of buckets percentileWindow will be divided into + private final HystrixProperty metricsRollingPercentileBucketSize; // how many values will be stored in each percentileWindowBucket protected HystrixCollapserProperties(HystrixCollapserKey collapserKey) { this(collapserKey, new Setter(), "hystrix"); @@ -49,6 +63,12 @@ protected HystrixCollapserProperties(HystrixCollapserKey key, Setter builder, St this.maxRequestsInBatch = getProperty(propertyPrefix, key, "maxRequestsInBatch", builder.getMaxRequestsInBatch(), default_maxRequestsInBatch); this.timerDelayInMilliseconds = getProperty(propertyPrefix, key, "timerDelayInMilliseconds", builder.getTimerDelayInMilliseconds(), default_timerDelayInMilliseconds); this.requestCacheEnabled = getProperty(propertyPrefix, key, "requestCache.enabled", builder.getRequestCacheEnabled(), default_requestCacheEnabled); + this.metricsRollingStatisticalWindowInMilliseconds = getProperty(propertyPrefix, key, "metrics.rollingStats.timeInMilliseconds", builder.getMetricsRollingStatisticalWindowInMilliseconds(), default_metricsRollingStatisticalWindow); + this.metricsRollingStatisticalWindowBuckets = getProperty(propertyPrefix, key, "metrics.rollingStats.numBuckets", builder.getMetricsRollingStatisticalWindowBuckets(), default_metricsRollingStatisticalWindowBuckets); + this.metricsRollingPercentileEnabled = getProperty(propertyPrefix, key, "metrics.rollingPercentile.enabled", builder.getMetricsRollingPercentileEnabled(), default_metricsRollingPercentileEnabled); + this.metricsRollingPercentileWindowInMilliseconds = getProperty(propertyPrefix, key, "metrics.rollingPercentile.timeInMilliseconds", builder.getMetricsRollingPercentileWindowInMilliseconds(), default_metricsRollingPercentileWindow); + this.metricsRollingPercentileWindowBuckets = getProperty(propertyPrefix, key, "metrics.rollingPercentile.numBuckets", builder.getMetricsRollingPercentileWindowBuckets(), default_metricsRollingPercentileWindowBuckets); + this.metricsRollingPercentileBucketSize = getProperty(propertyPrefix, key, "metrics.rollingPercentile.bucketSize", builder.getMetricsRollingPercentileBucketSize(), default_metricsRollingPercentileBucketSize); } private static HystrixProperty getProperty(String propertyPrefix, HystrixCollapserKey key, String instanceProperty, Integer builderOverrideValue, Integer defaultValue) { @@ -97,6 +117,60 @@ public HystrixProperty timerDelayInMilliseconds() { return timerDelayInMilliseconds; } + /** + * Duration of statistical rolling window in milliseconds. This is passed into {@link HystrixRollingNumber} inside {@link HystrixCommandMetrics}. + * + * @return {@code HystrixProperty} + */ + public HystrixProperty metricsRollingStatisticalWindowInMilliseconds() { + return metricsRollingStatisticalWindowInMilliseconds; + } + + /** + * Number of buckets the rolling statistical window is broken into. This is passed into {@link HystrixRollingNumber} inside {@link HystrixCollapserMetrics}. + * + * @return {@code HystrixProperty} + */ + public HystrixProperty metricsRollingStatisticalWindowBuckets() { + return metricsRollingStatisticalWindowBuckets; + } + + /** + * Whether percentile metrics should be captured using {@link HystrixRollingPercentile} inside {@link HystrixCollapserMetrics}. + * + * @return {@code HystrixProperty} + */ + public HystrixProperty metricsRollingPercentileEnabled() { + return metricsRollingPercentileEnabled; + } + + /** + * Duration of percentile rolling window in milliseconds. This is passed into {@link HystrixRollingPercentile} inside {@link HystrixCollapserMetrics}. + * + * @return {@code HystrixProperty} + */ + public HystrixProperty metricsRollingPercentileWindowInMilliseconds() { + return metricsRollingPercentileWindowInMilliseconds; + } + + /** + * Number of buckets the rolling percentile window is broken into. This is passed into {@link HystrixRollingPercentile} inside {@link HystrixCollapserMetrics}. + * + * @return {@code HystrixProperty} + */ + public HystrixProperty metricsRollingPercentileWindowBuckets() { + return metricsRollingPercentileWindowBuckets; + } + + /** + * Maximum number of values stored in each bucket of the rolling percentile. This is passed into {@link HystrixRollingPercentile} inside {@link HystrixCollapserMetrics}. + * + * @return {@code HystrixProperty} + */ + public HystrixProperty metricsRollingPercentileBucketSize() { + return metricsRollingPercentileBucketSize; + } + /** * Factory method to retrieve the default Setter. */ @@ -124,6 +198,12 @@ public static class Setter { private Integer maxRequestsInBatch = null; private Integer timerDelayInMilliseconds = null; private Boolean requestCacheEnabled = null; + private Integer metricsRollingStatisticalWindowInMilliseconds = null; + private Integer metricsRollingStatisticalWindowBuckets = null; + private Integer metricsRollingPercentileBucketSize = null; + private Boolean metricsRollingPercentileEnabled = null; + private Integer metricsRollingPercentileWindowInMilliseconds = null; + private Integer metricsRollingPercentileWindowBuckets = null; private Setter() { } @@ -144,6 +224,31 @@ public Boolean getRequestCacheEnabled() { return requestCacheEnabled; } + public Integer getMetricsRollingStatisticalWindowInMilliseconds() { + return metricsRollingStatisticalWindowInMilliseconds; + } + + public Integer getMetricsRollingStatisticalWindowBuckets() { + return metricsRollingStatisticalWindowBuckets; + } + + public Integer getMetricsRollingPercentileBucketSize() { + return metricsRollingPercentileBucketSize; + } + + public Boolean getMetricsRollingPercentileEnabled() { + return metricsRollingPercentileEnabled; + } + + public Integer getMetricsRollingPercentileWindowInMilliseconds() { + return metricsRollingPercentileWindowInMilliseconds; + } + + public Integer getMetricsRollingPercentileWindowBuckets() { + return metricsRollingPercentileWindowBuckets; + } + + public Setter withCollapsingEnabled(boolean value) { this.collapsingEnabled = value; return this; @@ -164,6 +269,36 @@ public Setter withRequestCacheEnabled(boolean value) { return this; } + public Setter withMetricsRollingStatisticalWindowInMilliseconds(int value) { + this.metricsRollingStatisticalWindowInMilliseconds = value; + return this; + } + + public Setter withMetricsRollingStatisticalWindowBuckets(int value) { + this.metricsRollingStatisticalWindowBuckets = value; + return this; + } + + public Setter withMetricsRollingPercentileBucketSize(int value) { + this.metricsRollingPercentileBucketSize = value; + return this; + } + + public Setter withMetricsRollingPercentileEnabled(boolean value) { + this.metricsRollingPercentileEnabled = value; + return this; + } + + public Setter withMetricsRollingPercentileWindowInMilliseconds(int value) { + this.metricsRollingPercentileWindowInMilliseconds = value; + return this; + } + + public Setter withMetricsRollingPercentileWindowBuckets(int value) { + this.metricsRollingPercentileWindowBuckets = value; + return this; + } + /** * Base properties for unit testing. */ diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java index 822e2e9df..0772ac8d3 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java @@ -2,6 +2,8 @@ import java.util.concurrent.ConcurrentHashMap; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCommandMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,13 +45,17 @@ private static enum Scopes implements Scope { } public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) { - /* strategy: ConcurrencyStrategy */ - this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); + this(collapserKey, scope, timer, HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder)); + } + public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) { + /* strategy: ConcurrencyStrategy */ + this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.timer = timer; this.scope = scope; this.collapserKey = collapserKey; - this.properties = HystrixPropertiesFactory.getCollapserProperties(this.collapserKey, propertiesBuilder); + this.properties = properties; + } public HystrixCollapserKey getCollapserKey() { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisher.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisher.java index 23cd06f92..dc6506532 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisher.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisher.java @@ -16,6 +16,10 @@ package com.netflix.hystrix.strategy.metrics; import com.netflix.hystrix.HystrixCircuitBreaker; +import com.netflix.hystrix.HystrixCollapser; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCollapserProperties; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; @@ -87,4 +91,25 @@ public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(Hystri return new HystrixMetricsPublisherThreadPoolDefault(threadPoolKey, metrics, properties); } + /** + * Construct an implementation of {@link HystrixMetricsPublisherCollapser} for {@link HystrixCollapser} instances having key {@link HystrixCollapserKey}. + *

+ * This will be invoked once per {@link HystrixCollapserKey} instance. + *

+ * Default Implementation + *

+ * Return instance of {@link HystrixMetricsPublisherCollapserDefault} + * + * @param collapserKey + * {@link HystrixCollapserKey} representing the name or type of {@link HystrixCollapser} + * @param metrics + * {@link HystrixCollapserMetrics} instance tracking metrics for the {@link HystrixCollapser} instance having the key as defined by {@link HystrixCollapserKey} + * @param properties + * {@link HystrixCollapserProperties} instance for the {@link HystrixCollapser} instance having the key as defined by {@link HystrixCollapserKey} + * @return instance of {@link HystrixMetricsPublisherCollapser} that will have its initialize method invoked once. + */ + public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) { + return new HystrixMetricsPublisherCollapserDefault(collapserKey, metrics, properties); + } + } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapser.java new file mode 100644 index 000000000..239d11ebd --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapser.java @@ -0,0 +1,36 @@ +/** + * Copyright 2012 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.strategy.metrics; + +import com.netflix.hystrix.HystrixCollapser; + +/** + * Metrics publisher for a {@link HystrixCollapser} that will be constructed by an implementation of {@link HystrixMetricsPublisher}. + *

+ * Instantiation of implementations of this interface should NOT allocate resources that require shutdown, register listeners or other such global state changes. + *

+ * The initialize() method will be called once-and-only-once to indicate when this instance can register with external services, start publishing metrics etc. + *

+ * Doing this logic in the constructor could result in memory leaks, double-publishing and other such behavior because this can be optimistically constructed more than once and "extras" discarded with + * only one actually having initialize() called on it. + */ +public interface HystrixMetricsPublisherCollapser { + + // TODO should the arguments be given via initialize rather than constructor so people can't accidentally do it wrong? + + public void initialize(); + +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapserDefault.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapserDefault.java new file mode 100644 index 000000000..da78682e1 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherCollapserDefault.java @@ -0,0 +1,40 @@ +/** + * Copyright 2012 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.strategy.metrics; + +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCollapserProperties; + +/** + * Default implementation of {@link HystrixMetricsPublisherCollapser} that does nothing. + *

+ * See Wiki docs about plugins for more information. + * + * @ExcludeFromJavadoc + */ +public class HystrixMetricsPublisherCollapserDefault implements HystrixMetricsPublisherCollapser { + + public HystrixMetricsPublisherCollapserDefault(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) { + // do nothing by default + } + + @Override + public void initialize() { + // do nothing by default + } + +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/util/HystrixRollingNumberEvent.java b/hystrix-core/src/main/java/com/netflix/hystrix/util/HystrixRollingNumberEvent.java index e1c918807..1e1939f55 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/util/HystrixRollingNumberEvent.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/util/HystrixRollingNumberEvent.java @@ -31,7 +31,8 @@ public enum HystrixRollingNumberEvent { SUCCESS(1), FAILURE(1), TIMEOUT(1), SHORT_CIRCUITED(1), THREAD_POOL_REJECTED(1), SEMAPHORE_REJECTED(1), FALLBACK_SUCCESS(1), FALLBACK_FAILURE(1), FALLBACK_REJECTION(1), EXCEPTION_THROWN(1), - THREAD_EXECUTION(1), THREAD_MAX_ACTIVE(2), COLLAPSED(1), RESPONSE_FROM_CACHE(1); + THREAD_EXECUTION(1), THREAD_MAX_ACTIVE(2), COLLAPSED(1), RESPONSE_FROM_CACHE(1), + COLLAPSER_REQUEST_BATCHED(1), COLLAPSER_BATCH(1); private final int type; diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java index b0d9e5f51..967a00e43 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java @@ -1,12 +1,5 @@ package com.netflix.hystrix; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.lang.ref.Reference; import java.lang.ref.SoftReference; import java.util.ArrayList; @@ -18,6 +11,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -34,14 +28,14 @@ import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder; import com.netflix.hystrix.util.HystrixTimer.TimerListener; -public class HystrixCollapserTest { - static AtomicInteger counter = new AtomicInteger(); +import static org.junit.Assert.*; +public class HystrixCollapserTest { @Before public void init() { - counter.set(0); // since we're going to modify properties of the same class between tests, wipe the cache each time HystrixCollapser.reset(); + HystrixCollapserMetrics.reset(); /* we must call this to simulate a new request lifecycle running and clearing caches */ HystrixRequestContext.initializeContext(); } @@ -58,47 +52,57 @@ public void cleanup() { @Test public void testTwoRequests() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, 1).queue(); - Future response2 = new TestRequestCollapser(timer, counter, 2).queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, 1); + Future response1 = collapser1.queue(); + HystrixCollapser, String, String> collapser2 = new TestRequestCollapser(timer, 2); + Future response2 = collapser2.queue(); timer.incrementTime(10); // let time pass that equals the default delay/period assertEquals("1", response1.get()); assertEquals("2", response2.get()); - assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertSame(metrics, collapser2.getMetrics()); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testMultipleBatches() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, 1).queue(); - Future response2 = new TestRequestCollapser(timer, counter, 2).queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, 1); + Future response1 = collapser1.queue(); + Future response2 = new TestRequestCollapser(timer, 2).queue(); timer.incrementTime(10); // let time pass that equals the default delay/period assertEquals("1", response1.get()); assertEquals("2", response2.get()); - assertEquals(1, counter.get()); - // now request more - Future response3 = new TestRequestCollapser(timer, counter, 3).queue(); + Future response3 = new TestRequestCollapser(timer, 3).queue(); timer.incrementTime(10); // let time pass that equals the default delay/period assertEquals("3", response3.get()); // we should have had it execute twice now - assertEquals(2, counter.get()); assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(3L, metrics.getRollingCountRequestsBatched()); + assertEquals(2L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testMaxRequestsInBatch() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, 1, 2, 10).queue(); - Future response2 = new TestRequestCollapser(timer, counter, 2, 2, 10).queue(); - Future response3 = new TestRequestCollapser(timer, counter, 3, 2, 10).queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, 1, 2, 10); + Future response1 = collapser1.queue(); + Future response2 = new TestRequestCollapser(timer, 2, 2, 10).queue(); + Future response3 = new TestRequestCollapser(timer, 3, 2, 10).queue(); timer.incrementTime(10); // let time pass that equals the default delay/period assertEquals("1", response1.get()); @@ -106,24 +110,29 @@ public void testMaxRequestsInBatch() throws Exception { assertEquals("3", response3.get()); // we should have had it execute twice because the batch size was 2 - assertEquals(2, counter.get()); assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(3L, metrics.getRollingCountRequestsBatched()); + assertEquals(2L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testRequestsOverTime() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, 1).queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, 1); + Future response1 = collapser1.queue(); timer.incrementTime(5); - Future response2 = new TestRequestCollapser(timer, counter, 2).queue(); + Future response2 = new TestRequestCollapser(timer, 2).queue(); timer.incrementTime(8); // should execute here - Future response3 = new TestRequestCollapser(timer, counter, 3).queue(); + Future response3 = new TestRequestCollapser(timer, 3).queue(); timer.incrementTime(6); - Future response4 = new TestRequestCollapser(timer, counter, 4).queue(); + Future response4 = new TestRequestCollapser(timer, 4).queue(); timer.incrementTime(8); // should execute here - Future response5 = new TestRequestCollapser(timer, counter, 5).queue(); + Future response5 = new TestRequestCollapser(timer, 5).queue(); timer.incrementTime(10); // should execute here @@ -134,16 +143,20 @@ public void testRequestsOverTime() throws Exception { assertEquals("4", response4.get()); assertEquals("5", response5.get()); - System.out.println("number of executions: " + counter.get()); - assertEquals(3, counter.get()); assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(5L, metrics.getRollingCountRequestsBatched()); + assertEquals(3L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testUnsubscribeOnOneDoesntKillBatch() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, 1).queue(); - Future response2 = new TestRequestCollapser(timer, counter, 2).queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, 1); + Future response1 = collapser1.queue(); + Future response2 = new TestRequestCollapser(timer, 2).queue(); // kill the first response1.cancel(true); @@ -160,18 +173,21 @@ public void testUnsubscribeOnOneDoesntKillBatch() throws Exception { // we should still get a response on the second assertEquals("2", response2.get()); - assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testShardedRequests() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestShardedRequestCollapser(timer, counter, "1a").queue(); - Future response2 = new TestShardedRequestCollapser(timer, counter, "2b").queue(); - Future response3 = new TestShardedRequestCollapser(timer, counter, "3b").queue(); - Future response4 = new TestShardedRequestCollapser(timer, counter, "4a").queue(); + HystrixCollapser, String, String> collapser1 = new TestShardedRequestCollapser(timer, "1a"); + Future response1 = collapser1.queue(); + Future response2 = new TestShardedRequestCollapser(timer, "2b").queue(); + Future response3 = new TestShardedRequestCollapser(timer, "3b").queue(); + Future response4 = new TestShardedRequestCollapser(timer, "4a").queue(); timer.incrementTime(10); // let time pass that equals the default delay/period assertEquals("1a", response1.get()); @@ -180,21 +196,25 @@ public void testShardedRequests() throws Exception { assertEquals("4a", response4.get()); /* we should get 2 batches since it gets sharded */ - assertEquals(2, counter.get()); assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(4L, metrics.getRollingCountRequestsBatched()); + assertEquals(2L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testRequestScope() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, "1").queue(); - Future response2 = new TestRequestCollapser(timer, counter, "2").queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, "1"); + Future response1 = collapser1.queue(); + Future response2 = new TestRequestCollapser(timer, "2").queue(); // simulate a new request RequestCollapserFactory.resetRequest(); - Future response3 = new TestRequestCollapser(timer, counter, "3").queue(); - Future response4 = new TestRequestCollapser(timer, counter, "4").queue(); + Future response3 = new TestRequestCollapser(timer, "3").queue(); + Future response4 = new TestRequestCollapser(timer, "4").queue(); timer.incrementTime(10); // let time pass that equals the default delay/period @@ -204,21 +224,25 @@ public void testRequestScope() throws Exception { assertEquals("4", response4.get()); // 2 different batches should execute, 1 per request - assertEquals(2, counter.get()); assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(4L, metrics.getRollingCountRequestsBatched()); + assertEquals(2L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testGlobalScope() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestGloballyScopedRequestCollapser(timer, counter, "1").queue(); - Future response2 = new TestGloballyScopedRequestCollapser(timer, counter, "2").queue(); + HystrixCollapser, String, String> collapser1 = new TestGloballyScopedRequestCollapser(timer, "1"); + Future response1 = collapser1.queue(); + Future response2 = new TestGloballyScopedRequestCollapser(timer, "2").queue(); // simulate a new request RequestCollapserFactory.resetRequest(); - Future response3 = new TestGloballyScopedRequestCollapser(timer, counter, "3").queue(); - Future response4 = new TestGloballyScopedRequestCollapser(timer, counter, "4").queue(); + Future response3 = new TestGloballyScopedRequestCollapser(timer, "3").queue(); + Future response4 = new TestGloballyScopedRequestCollapser(timer, "4").queue(); timer.incrementTime(10); // let time pass that equals the default delay/period @@ -228,15 +252,19 @@ public void testGlobalScope() throws Exception { assertEquals("4", response4.get()); // despite having cleared the cache in between we should have a single execution because this is on the global not request cache - assertEquals(1, counter.get()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(4L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testErrorHandlingViaFutureException() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapserWithFaultyCreateCommand(timer, counter, "1").queue(); - Future response2 = new TestRequestCollapserWithFaultyCreateCommand(timer, counter, "2").queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapserWithFaultyCreateCommand(timer, "1"); + Future response1 = collapser1.queue(); + Future response2 = new TestRequestCollapserWithFaultyCreateCommand(timer, "2").queue(); timer.incrementTime(10); // let time pass that equals the default delay/period try { @@ -252,15 +280,19 @@ public void testErrorHandlingViaFutureException() throws Exception { // what we expect } - assertEquals(0, counter.get()); assertEquals(0, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(0L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test public void testErrorHandlingWhenMapToResponseFails() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapserWithFaultyMapToResponse(timer, counter, "1").queue(); - Future response2 = new TestRequestCollapserWithFaultyMapToResponse(timer, counter, "2").queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapserWithFaultyMapToResponse(timer, "1"); + Future response1 = collapser1.queue(); + Future response2 = new TestRequestCollapserWithFaultyMapToResponse(timer, "2").queue(); timer.incrementTime(10); // let time pass that equals the default delay/period try { @@ -277,9 +309,12 @@ public void testErrorHandlingWhenMapToResponseFails() throws Exception { } // the batch failed so no executions - assertEquals(0, counter.get()); // but it still executed the command once assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test @@ -289,17 +324,18 @@ public void testRequestVariableLifecycle1() throws Exception { // do actual work TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, 1).queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, 1); + Future response1 = collapser1.queue(); timer.incrementTime(5); - Future response2 = new TestRequestCollapser(timer, counter, 2).queue(); + Future response2 = new TestRequestCollapser(timer, 2).queue(); timer.incrementTime(8); // should execute here - Future response3 = new TestRequestCollapser(timer, counter, 3).queue(); + Future response3 = new TestRequestCollapser(timer, 3).queue(); timer.incrementTime(6); - Future response4 = new TestRequestCollapser(timer, counter, 4).queue(); + Future response4 = new TestRequestCollapser(timer, 4).queue(); timer.incrementTime(8); // should execute here - Future response5 = new TestRequestCollapser(timer, counter, 5).queue(); + Future response5 = new TestRequestCollapser(timer, 5).queue(); timer.incrementTime(10); // should execute here @@ -323,11 +359,16 @@ public void testRequestVariableLifecycle1() throws Exception { System.out.println("timer.tasks.size() B: " + timer.tasks.size()); - HystrixRequestVariableHolder> rv = RequestCollapserFactory.getRequestVariable(new TestRequestCollapser(timer, counter, 1).getCollapserKey().name()); + HystrixRequestVariableHolder> rv = RequestCollapserFactory.getRequestVariable(new TestRequestCollapser(timer, 1).getCollapserKey().name()); assertNotNull(rv); // they should have all been removed as part of ThreadContext.remove() assertEquals(0, timer.tasks.size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(5L, metrics.getRollingCountRequestsBatched()); + assertEquals(3L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } @Test @@ -346,7 +387,7 @@ public void testRequestVariableLifecycle2() throws Exception { @Override public void run() { for (int i = 0; i < 100; i++) { - responses.add(new TestRequestCollapser(timer, counter, 1).queue()); + responses.add(new TestRequestCollapser(timer, 1).queue()); } } })); @@ -369,15 +410,16 @@ public void run() { } timer.incrementTime(5); - Future response2 = new TestRequestCollapser(timer, counter, 2).queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapser(timer, 2); + Future response2 = collapser1.queue(); timer.incrementTime(8); // should execute here - Future response3 = new TestRequestCollapser(timer, counter, 3).queue(); + Future response3 = new TestRequestCollapser(timer, 3).queue(); timer.incrementTime(6); - Future response4 = new TestRequestCollapser(timer, counter, 4).queue(); + Future response4 = new TestRequestCollapser(timer, 4).queue(); timer.incrementTime(8); // should execute here - Future response5 = new TestRequestCollapser(timer, counter, 5).queue(); + Future response5 = new TestRequestCollapser(timer, 5).queue(); timer.incrementTime(10); // should execute here @@ -398,11 +440,16 @@ public void run() { // simulate request lifecycle requestContext.shutdown(); - HystrixRequestVariableHolder> rv = RequestCollapserFactory.getRequestVariable(new TestRequestCollapser(timer, counter, 1).getCollapserKey().name()); + HystrixRequestVariableHolder> rv = RequestCollapserFactory.getRequestVariable(new TestRequestCollapser(timer, 1).getCollapserKey().name()); assertNotNull(rv); // they should have all been removed as part of ThreadContext.remove() assertEquals(0, timer.tasks.size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(504L, metrics.getRollingCountRequestsBatched()); + assertEquals(3L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } /** @@ -414,8 +461,8 @@ public void testRequestCache1() { HystrixRequestContext.initializeContext(); final TestCollapserTimer timer = new TestCollapserTimer(); - SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, counter, "A", true); - SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, counter, "A", true); + SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, "A", true); + SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, "A", true); Future f1 = command1.queue(); Future f2 = command2.queue(); @@ -430,9 +477,6 @@ public void testRequestCache1() { throw new RuntimeException(e); } - // we should have executed a command once - assertEquals(1, counter.get()); - Future f3 = command1.queue(); // increment past batch time so it executes @@ -445,7 +489,6 @@ public void testRequestCache1() { } // we should still have executed only one command - assertEquals(1, counter.get()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); HystrixInvokableInfo command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixInvokableInfo[1])[0]; @@ -453,6 +496,11 @@ public void testRequestCache1() { assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); + + HystrixCollapserMetrics metrics = command1.getMetrics(); + assertEquals(1L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(2L, metrics.getRollingCountResponsesFromCache()); } /** @@ -464,8 +512,8 @@ public void testRequestCache2() { HystrixRequestContext.initializeContext(); final TestCollapserTimer timer = new TestCollapserTimer(); - SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, counter, "A", true); - SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, counter, "B", true); + SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, "A", true); + SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, "B", true); Future f1 = command1.queue(); Future f2 = command2.queue(); @@ -480,9 +528,6 @@ public void testRequestCache2() { throw new RuntimeException(e); } - // we should have executed a command once - assertEquals(1, counter.get()); - Future f3 = command1.queue(); Future f4 = command2.queue(); @@ -497,13 +542,17 @@ public void testRequestCache2() { } // we should still have executed only one command - assertEquals(1, counter.get()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); HystrixInvokableInfo command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixInvokableInfo[1])[0]; assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); + + HystrixCollapserMetrics metrics = command1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(2L, metrics.getRollingCountResponsesFromCache()); } /** @@ -515,9 +564,9 @@ public void testRequestCache3() { HystrixRequestContext.initializeContext(); final TestCollapserTimer timer = new TestCollapserTimer(); - SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, counter, "A", true); - SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, counter, "B", true); - SuccessfulCacheableCollapsedCommand command3 = new SuccessfulCacheableCollapsedCommand(timer, counter, "B", true); + SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, "A", true); + SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, "B", true); + SuccessfulCacheableCollapsedCommand command3 = new SuccessfulCacheableCollapsedCommand(timer, "B", true); Future f1 = command1.queue(); Future f2 = command2.queue(); @@ -534,9 +583,6 @@ public void testRequestCache3() { throw new RuntimeException(e); } - // we should have executed a command once - assertEquals(1, counter.get()); - Future f4 = command1.queue(); Future f5 = command2.queue(); Future f6 = command3.queue(); @@ -553,13 +599,17 @@ public void testRequestCache3() { } // we should still have executed only one command - assertEquals(1, counter.get()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); HystrixInvokableInfo command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixInvokableInfo[1])[0]; assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); + + HystrixCollapserMetrics metrics = command1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(4L, metrics.getRollingCountResponsesFromCache()); } /** @@ -571,9 +621,9 @@ public void testNoRequestCache3() { HystrixRequestContext.initializeContext(); final TestCollapserTimer timer = new TestCollapserTimer(); - SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, counter, "A", false); - SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, counter, "B", false); - SuccessfulCacheableCollapsedCommand command3 = new SuccessfulCacheableCollapsedCommand(timer, counter, "B", false); + SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, "A", false); + SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, "B", false); + SuccessfulCacheableCollapsedCommand command3 = new SuccessfulCacheableCollapsedCommand(timer, "B", false); Future f1 = command1.queue(); Future f2 = command2.queue(); @@ -590,9 +640,6 @@ public void testNoRequestCache3() { throw new RuntimeException(e); } - // we should have executed a command once - assertEquals(1, counter.get()); - Future f4 = command1.queue(); Future f5 = command2.queue(); Future f6 = command3.queue(); @@ -609,7 +656,6 @@ public void testNoRequestCache3() { } // request caching is turned off on this so we expect 2 command executions - assertEquals(2, counter.get()); assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); // we expect to see it with SUCCESS and COLLAPSED and both @@ -623,6 +669,11 @@ public void testNoRequestCache3() { assertEquals(2, commandB.getExecutionEvents().size()); assertTrue(commandB.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(commandB.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); + + HystrixCollapserMetrics metrics = command1.getMetrics(); + assertEquals(6L, metrics.getRollingCountRequestsBatched()); + assertEquals(2L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } /** @@ -637,8 +688,8 @@ public void testRequestCacheWithException() { final TestCollapserTimer timer = new TestCollapserTimer(); // pass in 'null' which will cause an NPE to be thrown - SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, counter, null, true, commands); - SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, counter, null, true, commands); + SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, null, true, commands); + SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, null, true, commands); Future f1 = command1.queue(); Future f2 = command2.queue(); @@ -654,15 +705,12 @@ public void testRequestCacheWithException() { // expected } - // this should be 0 because we never complete execution - assertEquals(0, counter.get()); - // it should have executed 1 command assertEquals(1, commands.size()); assertTrue(commands.peek().getExecutionEvents().contains(HystrixEventType.FAILURE)); assertTrue(commands.peek().getExecutionEvents().contains(HystrixEventType.COLLAPSED)); - SuccessfulCacheableCollapsedCommand command3 = new SuccessfulCacheableCollapsedCommand(timer, counter, null, true, commands); + SuccessfulCacheableCollapsedCommand command3 = new SuccessfulCacheableCollapsedCommand(timer, null, true, commands); Future f3 = command3.queue(); // increment past batch time so it executes @@ -675,9 +723,6 @@ public void testRequestCacheWithException() { // expected } - // this should be 0 because we never complete execution - assertEquals(0, counter.get()); - // it should still be 1 ... no new executions assertEquals(1, commands.size()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); @@ -686,6 +731,11 @@ public void testRequestCacheWithException() { assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.FAILURE)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); + + HystrixCollapserMetrics metrics = command1.getMetrics(); + assertEquals(1L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(2L, metrics.getRollingCountResponsesFromCache()); } /** @@ -700,8 +750,8 @@ public void testRequestCacheWithTimeout() { final TestCollapserTimer timer = new TestCollapserTimer(); // pass in 'null' which will cause an NPE to be thrown - SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, counter, "TIMEOUT", true, commands); - SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, counter, "TIMEOUT", true, commands); + SuccessfulCacheableCollapsedCommand command1 = new SuccessfulCacheableCollapsedCommand(timer, "TIMEOUT", true, commands); + SuccessfulCacheableCollapsedCommand command2 = new SuccessfulCacheableCollapsedCommand(timer, "TIMEOUT", true, commands); Future f1 = command1.queue(); Future f2 = command2.queue(); @@ -717,9 +767,6 @@ public void testRequestCacheWithTimeout() { // expected } - // this should be 0 because we never complete execution - assertEquals(0, counter.get()); - // it should have executed 1 command assertEquals(1, commands.size()); assertTrue(commands.peek().getExecutionEvents().contains(HystrixEventType.TIMEOUT)); @@ -737,12 +784,14 @@ public void testRequestCacheWithTimeout() { // expected } - // this should be 0 because we never complete execution - assertEquals(0, counter.get()); - // it should still be 1 ... no new executions assertEquals(1, commands.size()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = command1.getMetrics(); + assertEquals(1L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(2L, metrics.getRollingCountResponsesFromCache()); } /** @@ -751,8 +800,9 @@ public void testRequestCacheWithTimeout() { @Test public void testRequestWithCommandShortCircuited() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapserWithShortCircuitedCommand(timer, counter, "1").queue(); - Future response2 = new TestRequestCollapserWithShortCircuitedCommand(timer, counter, "2").queue(); + HystrixCollapser, String, String> collapser1 = new TestRequestCollapserWithShortCircuitedCommand(timer, "1"); + Future response1 = collapser1.queue(); + Future response2 = new TestRequestCollapserWithShortCircuitedCommand(timer, "2").queue(); timer.incrementTime(10); // let time pass that equals the default delay/period try { @@ -770,9 +820,13 @@ public void testRequestWithCommandShortCircuited() throws Exception { // what we expect } - assertEquals(0, counter.get()); // it will execute once (short-circuited) assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } /** @@ -783,8 +837,9 @@ public void testRequestWithCommandShortCircuited() throws Exception { @Test public void testVoidResponseTypeFireAndForgetCollapsing1() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestCollapserWithVoidResponseType(timer, counter, 1).queue(); - Future response2 = new TestCollapserWithVoidResponseType(timer, counter, 2).queue(); + TestCollapserWithVoidResponseType collapser1 = new TestCollapserWithVoidResponseType(timer, 1); + Future response1 = collapser1.queue(); + Future response2 = new TestCollapserWithVoidResponseType(timer, 2).queue(); timer.incrementTime(100); // let time pass that equals the default delay/period // normally someone wouldn't wait on these, but we need to make sure they do in fact return @@ -792,9 +847,12 @@ public void testVoidResponseTypeFireAndForgetCollapsing1() throws Exception { assertEquals(null, response1.get()); assertEquals(null, response2.get()); - assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } /** @@ -805,8 +863,9 @@ public void testVoidResponseTypeFireAndForgetCollapsing1() throws Exception { @Test public void testVoidResponseTypeFireAndForgetCollapsing2() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestCollapserWithVoidResponseTypeAndMissingMapResponseToRequests(timer, counter, 1).queue(); - Future response2 = new TestCollapserWithVoidResponseTypeAndMissingMapResponseToRequests(timer, counter, 2).queue(); + TestCollapserWithVoidResponseTypeAndMissingMapResponseToRequests collapser1 = new TestCollapserWithVoidResponseTypeAndMissingMapResponseToRequests(timer, 1); + Future response1 = collapser1.queue(); + Future response2 = new TestCollapserWithVoidResponseTypeAndMissingMapResponseToRequests(timer, 2).queue(); timer.incrementTime(100); // let time pass that equals the default delay/period // we will fetch one of these just so we wait for completion ... but expect an error @@ -818,9 +877,12 @@ public void testVoidResponseTypeFireAndForgetCollapsing2() throws Exception { assertTrue(e.getCause().getMessage().startsWith("No response set by")); } - assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(2L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } /** @@ -831,56 +893,64 @@ public void testVoidResponseTypeFireAndForgetCollapsing2() throws Exception { @Test public void testVoidResponseTypeFireAndForgetCollapsing3() throws Exception { CollapserTimer timer = new RealCollapserTimer(); - assertNull(new TestCollapserWithVoidResponseType(timer, counter, 1).execute()); - - assertEquals(1, counter.get()); + TestCollapserWithVoidResponseType collapser1 = new TestCollapserWithVoidResponseType(timer, 1); + assertNull(collapser1.execute()); assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertEquals(1L, metrics.getRollingCountRequestsBatched()); + assertEquals(1L, metrics.getRollingCountBatches()); + assertEquals(0L, metrics.getRollingCountResponsesFromCache()); } private static class TestRequestCollapser extends HystrixCollapser, String, String> { - private final AtomicInteger count; private final String value; private ConcurrentLinkedQueue>> commandsExecuted; - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value) { - this(timer, counter, String.valueOf(value)); + public TestRequestCollapser(TestCollapserTimer timer, int value) { + this(timer, String.valueOf(value)); + } + + public TestRequestCollapser(TestCollapserTimer timer, String value) { + this(timer, value, 10000, 10); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value) { - this(timer, counter, value, 10000, 10); + public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value) { + this(scope, timer, value, 10000, 10); } - public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value) { - this(scope, timer, counter, value, 10000, 10); + public TestRequestCollapser(TestCollapserTimer timer, String value, ConcurrentLinkedQueue>> executionLog) { + this(timer, value, 10000, 10, executionLog); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue>> executionLog) { - this(timer, counter, value, 10000, 10, executionLog); + public TestRequestCollapser(TestCollapserTimer timer, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(timer, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { - this(timer, counter, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds); + public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { - this(timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); + public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(scope, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); } - public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { - this(scope, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); + public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue>> executionLog) { + this(Scope.REQUEST, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue>> executionLog) { - this(Scope.REQUEST, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog); + private static HystrixCollapserMetrics createMetrics() { + HystrixCollapserKey key = HystrixCollapserKey.Factory.asKey("COLLAPSER_ONE"); + HystrixCollapserMetrics metrics = HystrixCollapserMetrics.getInstance(key, new HystrixPropertiesCollapserDefault(key, HystrixCollapserProperties.Setter())); + return metrics; } - public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue>> executionLog) { + public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue>> executionLog) { // use a CollapserKey based on the CollapserTimer object reference so it's unique for each timer as we don't want caching // of properties to occur and we're using the default HystrixProperty which typically does caching - super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds)); - this.count = counter; + super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds), createMetrics()); this.value = value; this.commandsExecuted = executionLog; } @@ -902,9 +972,6 @@ public HystrixCommand> createCommand(final Collection batchResponse, Collection> requests) { - // count how many times a batch is executed (this method is executed once per batch) - System.out.println("increment count: " + count.incrementAndGet()); - // for simplicity I'll assume it's a 1:1 mapping between lists ... in real implementations they often need to index to maps // to allow random access as the response size does not match the request size if (batchResponse.size() != requests.size()) { @@ -924,8 +991,8 @@ public void mapResponseToRequests(List batchResponse, Collection>> shardRequests */ private static class TestGloballyScopedRequestCollapser extends TestRequestCollapser { - public TestGloballyScopedRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value) { - super(Scope.GLOBAL, timer, counter, value); + public TestGloballyScopedRequestCollapser(TestCollapserTimer timer, String value) { + super(Scope.GLOBAL, timer, value); } } @@ -965,8 +1032,8 @@ public TestGloballyScopedRequestCollapser(TestCollapserTimer timer, AtomicIntege */ private static class TestRequestCollapserWithFaultyCreateCommand extends TestRequestCollapser { - public TestRequestCollapserWithFaultyCreateCommand(TestCollapserTimer timer, AtomicInteger counter, String value) { - super(timer, counter, value); + public TestRequestCollapserWithFaultyCreateCommand(TestCollapserTimer timer, String value) { + super(timer, value); } @Override @@ -981,8 +1048,8 @@ public HystrixCommand> createCommand(Collection> createCommand(Collection>> executionLog) { - super(timer, counter, value, executionLog); + public SuccessfulCacheableCollapsedCommand(TestCollapserTimer timer, String value, boolean cacheEnabled, ConcurrentLinkedQueue>> executionLog) { + super(timer, value, executionLog); this.cacheEnabled = cacheEnabled; } @@ -1228,12 +1295,10 @@ public String name() { private static class TestCollapserWithVoidResponseType extends HystrixCollapser { - private final AtomicInteger count; private final Integer value; - public TestCollapserWithVoidResponseType(CollapserTimer timer, AtomicInteger counter, int value) { + public TestCollapserWithVoidResponseType(CollapserTimer timer, int value) { super(collapserKeyFromString(timer), Scope.REQUEST, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(1000).withTimerDelayInMilliseconds(50)); - this.count = counter; this.value = value; } @@ -1254,7 +1319,6 @@ protected HystrixCommand createCommand(Collection> requests) { - count.incrementAndGet(); for (CollapsedRequest r : requests) { r.setResponse(null); } @@ -1264,12 +1328,10 @@ protected void mapResponseToRequests(Void batchResponse, Collection { - private final AtomicInteger count; private final Integer value; - public TestCollapserWithVoidResponseTypeAndMissingMapResponseToRequests(CollapserTimer timer, AtomicInteger counter, int value) { + public TestCollapserWithVoidResponseTypeAndMissingMapResponseToRequests(CollapserTimer timer, int value) { super(collapserKeyFromString(timer), Scope.REQUEST, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(1000).withTimerDelayInMilliseconds(50)); - this.count = counter; this.value = value; } @@ -1290,9 +1352,6 @@ protected HystrixCommand createCommand(Collection> requests) { - count.incrementAndGet(); } - } - } From 13170977ff90955eb0ce7c40aeb8dace08a45445 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 26 Jan 2015 14:51:16 -0800 Subject: [PATCH 2/5] Add Servo integration for HystrixCollapser metrics --- ...HystrixServoMetricsPublisherCollapser.java | 262 ++++++++++++++++++ .../com/netflix/hystrix/HystrixCollapser.java | 2 - 2 files changed, 262 insertions(+), 2 deletions(-) create mode 100644 hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java diff --git a/hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java b/hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java new file mode 100644 index 000000000..c4363ab3c --- /dev/null +++ b/hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java @@ -0,0 +1,262 @@ +/** + * Copyright 2012 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.servopublisher; + +import java.util.ArrayList; +import java.util.List; + +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCollapserProperties; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import com.netflix.hystrix.HystrixThreadPoolProperties; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCollapser; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool; +import com.netflix.servo.DefaultMonitorRegistry; +import com.netflix.servo.annotations.DataSourceLevel; +import com.netflix.servo.monitor.BasicCompositeMonitor; +import com.netflix.servo.monitor.Monitor; +import com.netflix.servo.monitor.MonitorConfig; +import com.netflix.servo.tag.Tag; + +/** + * Implementation of {@link HystrixMetricsPublisherCollapser} using Servo (https://github.com/Netflix/servo) + */ +public class HystrixServoMetricsPublisherCollapser extends HystrixServoMetricsPublisherAbstract implements HystrixMetricsPublisherCollapser { + + private final HystrixCollapserKey key; + private final HystrixCollapserMetrics metrics; + private final HystrixCollapserProperties properties; + private final Tag servoInstanceTag; + private final Tag servoTypeTag; + + public HystrixServoMetricsPublisherCollapser(HystrixCollapserKey threadPoolKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) { + this.key = threadPoolKey; + this.metrics = metrics; + this.properties = properties; + + this.servoInstanceTag = new Tag() { + + @Override + public String getKey() { + return "instance"; + } + + @Override + public String getValue() { + return key.name(); + } + + @Override + public String tagString() { + return key.name(); + } + + }; + this.servoTypeTag = new Tag() { + + @Override + public String getKey() { + return "type"; + } + + @Override + public String getValue() { + return "HystrixCollapser"; + } + + @Override + public String tagString() { + return "HystrixCollapser"; + } + + }; + } + + @Override + public void initialize() { + /* list of monitors */ + List> monitors = getServoMonitors(); + + // publish metrics together under a single composite (it seems this name is ignored) + MonitorConfig commandMetricsConfig = MonitorConfig.builder("HystrixCollapser_" + key.name()).build(); + BasicCompositeMonitor commandMetricsMonitor = new BasicCompositeMonitor(commandMetricsConfig, monitors); + + DefaultMonitorRegistry.getInstance().register(commandMetricsMonitor); + } + + @Override + protected Tag getServoTypeTag() { + return servoTypeTag; + } + + @Override + protected Tag getServoInstanceTag() { + return servoInstanceTag; + } + + /** + * Servo will flatten metric names as: getServoTypeTag()_getServoInstanceTag()_monitorName + */ + private List> getServoMonitors() { + + List> monitors = new ArrayList>(); + + monitors.add(new InformationalMetric(MonitorConfig.builder("name").build()) { + @Override + public String getValue() { + return key.name(); + } + }); + + // allow Servo and monitor to know exactly at what point in time these stats are for so they can be plotted accurately + monitors.add(new GaugeMetric(MonitorConfig.builder("currentTime").withTag(DataSourceLevel.DEBUG).build()) { + @Override + public Number getValue() { + return System.currentTimeMillis(); + } + }); + + monitors.add(new CounterMetric(MonitorConfig.builder("countRequestsBatched").build()) { + @Override + public Number getValue() { + return metrics.getCumulativeCountRequestsBatched(); + } + }); + + monitors.add(new CounterMetric(MonitorConfig.builder("countBatches").build()) { + @Override + public Number getValue() { + return metrics.getCumulativeCountBatches(); + } + }); + + monitors.add(new CounterMetric(MonitorConfig.builder("countResponsesFromCache").build()) { + @Override + public Number getValue() { + return metrics.getCumulativeCountResponsesFromCache(); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_25").build()) { + @Override + public Number getValue() { + return metrics.getBatchSizePercentile(25); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_50").build()) { + @Override + public Number getValue() { + return metrics.getBatchSizePercentile(50); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_75").build()) { + @Override + public Number getValue() { + return metrics.getBatchSizePercentile(75); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_95").build()) { + @Override + public Number getValue() { + return metrics.getBatchSizePercentile(95); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_99").build()) { + @Override + public Number getValue() { + return metrics.getBatchSizePercentile(99); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_99_5").build()) { + @Override + public Number getValue() { + return metrics.getBatchSizePercentile(99.5); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_100").build()) { + @Override + public Number getValue() { + return metrics.getBatchSizePercentile(100); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_25").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(25); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_50").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(50); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_75").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(75); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_90").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(90); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_95").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(95); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_99").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(99); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_99_5").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(99.5); + } + }); + + monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_100").build()) { + @Override + public Number getValue() { + return metrics.getShardSizePercentile(100); + } + }); + + return monitors; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 5a379708a..30c9801d0 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -177,8 +177,6 @@ public HystrixCollapserKey getCollapserKey() { } }; - - } private HystrixCollapserProperties getProperties() { From e716bee167e4d26f3df6fbe5a4bbac3a8742a685 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 26 Jan 2015 16:11:11 -0800 Subject: [PATCH 3/5] Add publisher creation/init to HystrixCollapser --- .../com/netflix/hystrix/HystrixCollapser.java | 5 ++ .../HystrixMetricsPublisherFactory.java | 46 ++++++++++++++++++- 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 30c9801d0..022fccb13 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -20,6 +20,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCollapser; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory; import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,6 +137,9 @@ protected HystrixCollapser(Setter setter) { final HystrixCollapser self = this; + /* strategy: HystrixMetricsPublisherCollapser */ + HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties); + /** * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class. */ diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java index 7a654c0e6..e46c26926 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/metrics/HystrixMetricsPublisherFactory.java @@ -16,9 +16,12 @@ package com.netflix.hystrix.strategy.metrics; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import com.netflix.hystrix.HystrixCircuitBreaker; +import com.netflix.hystrix.HystrixCollapser; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCollapserProperties; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; @@ -89,6 +92,7 @@ public static void reset() { SINGLETON = new HystrixMetricsPublisherFactory(); SINGLETON.commandPublishers.clear(); SINGLETON.threadPoolPublishers.clear(); + SINGLETON.collapserPublishers.clear(); } /* package */ HystrixMetricsPublisherFactory() {} @@ -143,4 +147,44 @@ public static void reset() { } } + /** + * Get an instance of {@link HystrixMetricsPublisherCollapser} with the given factory {@link HystrixMetricsPublisher} implementation for each {@link HystrixCollapser} instance. + * + * @param collapserKey + * Pass-thru to {@link HystrixMetricsPublisher#getMetricsPublisherForCollapser} implementation + * @param metrics + * Pass-thru to {@link HystrixMetricsPublisher#getMetricsPublisherForCollapser} implementation + * @param properties + * Pass-thru to {@link HystrixMetricsPublisher#getMetricsPublisherForCollapser} implementation + * @return {@link HystrixMetricsPublisherCollapser} instance + */ + public static HystrixMetricsPublisherCollapser createOrRetrievePublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) { + return SINGLETON.getPublisherForCollapser(collapserKey, metrics, properties); + } + + // String is CollapserKey.name() (we can't use CollapserKey directly as we can't guarantee it implements hashcode/equals correctly) + private final ConcurrentHashMap collapserPublishers = new ConcurrentHashMap(); + + /* package */ HystrixMetricsPublisherCollapser getPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) { + // attempt to retrieve from cache first + HystrixMetricsPublisherCollapser publisher = collapserPublishers.get(collapserKey.name()); + if (publisher != null) { + return publisher; + } + // it doesn't exist so we need to create it + publisher = HystrixPlugins.getInstance().getMetricsPublisher().getMetricsPublisherForCollapser(collapserKey, metrics, properties); + // attempt to store it (race other threads) + HystrixMetricsPublisherCollapser existing = collapserPublishers.putIfAbsent(collapserKey.name(), publisher); + if (existing == null) { + // we won the thread-race to store the instance we created so initialize it + publisher.initialize(); + // done registering, return instance that got cached + return publisher; + } else { + // we lost so return 'existing' and let the one we created be garbage collected + // without calling initialize() on it + return existing; + } + } + } From 33dd4617eb3505464ab404a3fdbf03187de7b248 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 26 Jan 2015 16:38:56 -0800 Subject: [PATCH 4/5] Fix formatting in HystrixCollapser constructor --- .../src/main/java/com/netflix/hystrix/HystrixCollapser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 022fccb13..c281a62ae 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -121,7 +121,7 @@ protected HystrixCollapser(Setter setter) { /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) { if (collapserKey == null || collapserKey.name().trim().equals("")) { - String defaultKeyName = getDefaultNameFromClass (getClass()); + String defaultKeyName = getDefaultNameFromClass(getClass()); collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName); } From c64e0566f7c893ff1c12092e8d84865d1aa6e072 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 26 Jan 2015 17:01:31 -0800 Subject: [PATCH 5/5] Removing unused imports from HystrixServoMetricsPublisherCollapser --- .../servopublisher/HystrixServoMetricsPublisherCollapser.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java b/hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java index c4363ab3c..57e445c55 100644 --- a/hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java +++ b/hystrix-contrib/hystrix-servo-metrics-publisher/src/main/java/com/netflix/hystrix/contrib/servopublisher/HystrixServoMetricsPublisherCollapser.java @@ -21,11 +21,7 @@ import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCollapserMetrics; import com.netflix.hystrix.HystrixCollapserProperties; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.HystrixThreadPoolMetrics; -import com.netflix.hystrix.HystrixThreadPoolProperties; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCollapser; -import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool; import com.netflix.servo.DefaultMonitorRegistry; import com.netflix.servo.annotations.DataSourceLevel; import com.netflix.servo.monitor.BasicCompositeMonitor;