Skip to content

Commit

Permalink
Make thread-pool and collapser metrics also able to be varied by plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jul 28, 2015
1 parent 795e937 commit a46b6e9
Show file tree
Hide file tree
Showing 11 changed files with 375 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,17 @@
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.util.HystrixRollingNumber;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsCollection;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import com.netflix.hystrix.util.HystrixRollingPercentile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used by {@link HystrixCollapser} to record metrics.
* {@link HystrixEventNotifier} not hooked up yet. It may be in the future.
* {@link com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier} not hooked up yet. It may be in the future.
*/
public class HystrixCollapserMetrics extends HystrixMetrics {

private final HystrixRollingNumber counter;
public abstract class HystrixCollapserMetrics extends HystrixMetrics {

@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HystrixCollapserMetrics.class);
Expand All @@ -56,7 +53,8 @@ public static HystrixCollapserMetrics getInstance(HystrixCollapserKey key, Hystr
return collapserMetrics;
}
// it doesn't exist so we need to create it
collapserMetrics = new HystrixCollapserMetrics(key, properties);
HystrixMetricsCollection metricsCollectionStrategy = HystrixPlugins.getInstance().getMetricsCollection();
collapserMetrics = metricsCollectionStrategy.getCollapserMetricsInstance(key, properties);
// attempt to store it (race other threads)
HystrixCollapserMetrics existing = metrics.putIfAbsent(key.name(), collapserMetrics);
if (existing == null) {
Expand Down Expand Up @@ -86,16 +84,10 @@ public static Collection<HystrixCollapserMetrics> getInstances() {

private final HystrixCollapserKey key;
private final HystrixCollapserProperties properties;
private final HystrixRollingPercentile percentileBatchSize;
private final HystrixRollingPercentile percentileShardSize;

/* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds().get(), properties.metricsRollingStatisticalWindowBuckets().get());
protected HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
this.key = key;
this.properties = properties;

this.percentileBatchSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds().get(), properties.metricsRollingPercentileWindowBuckets().get(), properties.metricsRollingPercentileBucketSize().get(), properties.metricsRollingPercentileEnabled());
this.percentileShardSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds().get(), properties.metricsRollingPercentileWindowBuckets().get(), properties.metricsRollingPercentileBucketSize().get(), properties.metricsRollingPercentileEnabled());
}

/**
Expand All @@ -120,13 +112,11 @@ public HystrixCollapserProperties getProperties() {
* Percentile such as 50, 99, or 99.5.
* @return batch size
*/
public int getBatchSizePercentile(double percentile) {
return percentileBatchSize.getPercentile(percentile);
}
public abstract int getBatchSizePercentile(double percentile);

public int getBatchSizeMean() {
return percentileBatchSize.getMean();
}
public abstract int getBatchSizeMean();

protected abstract void addBatchSize(int batchSize);

/**
* Retrieve the shard size for the {@link HystrixCollapser} being invoked at a given percentile.
Expand All @@ -137,39 +127,26 @@ public int getBatchSizeMean() {
* Percentile such as 50, 99, or 99.5.
* @return batch size
*/
public int getShardSizePercentile(double percentile) {
return percentileShardSize.getPercentile(percentile);
}
public abstract int getShardSizePercentile(double percentile);

public int getShardSizeMean() {
return percentileShardSize.getMean();
}
public abstract int getShardSizeMean();

protected abstract void addShardSize(int shardSize);

public void markRequestBatched() {
counter.increment(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED);
addEvent(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED);
}

public void markResponseFromCache() {
counter.increment(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE);
addEvent(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE);
}

public void markBatch(int batchSize) {
percentileBatchSize.addValue(batchSize);
counter.increment(HystrixRollingNumberEvent.COLLAPSER_BATCH);
addBatchSize(batchSize);
addEvent(HystrixRollingNumberEvent.COLLAPSER_BATCH);
}

public void markShards(int numShards) {
percentileShardSize.addValue(numShards);
}


@Override
public long getCumulativeCount(HystrixRollingNumberEvent event) {
return counter.getCumulativeSum(event);
}

@Override
public long getRollingCount(HystrixRollingNumberEvent event) {
return counter.getRollingSum(event);
addShardSize(numShards);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.netflix.hystrix.strategy.metrics.HystrixMetricsCollection;
Expand All @@ -33,8 +34,7 @@
*/
public abstract class HystrixCommandMetrics extends HystrixMetrics {

/* strategy: HystrixMetricsCollection */
protected HystrixMetricsCollection metricsCollection;
private final AtomicInteger concurrentExecutionCount = new AtomicInteger();

// String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();
Expand Down Expand Up @@ -131,7 +131,7 @@ public static Collection<HystrixCommandMetrics> getInstances() {
private final HystrixThreadPoolKey threadPoolKey;
private final HystrixEventNotifier eventNotifier;

/* package */HystrixCommandMetrics(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
protected HystrixCommandMetrics(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
this.key = key;
this.group = commandGroup;
this.threadPoolKey = threadPoolKey;
Expand Down Expand Up @@ -230,16 +230,31 @@ public HystrixCommandProperties getProperties() {

/**
* Current number of concurrent executions of {@link HystrixCommand#run()};
*
*
* @return int
*/
public abstract int getCurrentConcurrentExecutionCount();
public int getCurrentConcurrentExecutionCount() {
return concurrentExecutionCount.get();
}

protected abstract void addEvent(HystrixRollingNumberEvent event);
/**
* Increment concurrent requests counter.
*/
/* package */ void incrementConcurrentExecutionCount() {
int numConcurrent = concurrentExecutionCount.incrementAndGet();
updateRollingMax(HystrixRollingNumberEvent.COMMAND_MAX_ACTIVE, (long) numConcurrent);
}

protected abstract void addEventWithValue(HystrixRollingNumberEvent event, int count);
/**
* Decrement concurrent requests counter.
*/
/* package */ void decrementConcurrentExecutionCount() {
concurrentExecutionCount.decrementAndGet();
}

protected abstract long getRollingSum(HystrixRollingNumberEvent event);
public long getRollingMaxConcurrentExecutions() {
return getRollingMax(HystrixRollingNumberEvent.COMMAND_MAX_ACTIVE);
}

/**
* When a {@link HystrixCommand} successfully completes it will call this method to report its success along with how long the execution took.
Expand Down Expand Up @@ -373,32 +388,26 @@ public HystrixCommandProperties getProperties() {
addEvent(HystrixRollingNumberEvent.FALLBACK_EMIT);
}


/**
* Increment concurrent requests counter.
*/
/* package */ abstract void incrementConcurrentExecutionCount();

/**
* Decrement concurrent requests counter.
*/
/* package */ abstract void decrementConcurrentExecutionCount();

public abstract long getRollingMaxConcurrentExecutions();


/**
* Execution time of {@link HystrixCommand#run()}.
*/
/* package */abstract void addCommandExecutionTime(long duration);
protected abstract void addCommandExecutionTime(long duration);

/**
* Complete execution time of {@link HystrixCommand#execute()} or {@link HystrixCommand#queue()} (queue is considered complete once the work is finished and {@link Future#get} is capable of
* retrieving the value.
* <p>
* This differs from {@link #addCommandExecutionTime} in that this covers all of the threading and scheduling overhead, not just the execution of the {@link HystrixCommand#run()} method.
*/
/* package */abstract void addUserThreadExecutionTime(long duration);
protected abstract void addUserThreadExecutionTime(long duration);

protected abstract void clear();

/* package */ void resetCounter() {
clear();
lastHealthCountsSnapshot.set(System.currentTimeMillis());
healthCountsSnapshot = new HealthCounts(0, 0, 0);
}

private volatile HealthCounts healthCountsSnapshot = new HealthCounts(0, 0, 0);
private volatile AtomicLong lastHealthCountsSnapshot = new AtomicLong(System.currentTimeMillis());
Expand All @@ -417,12 +426,12 @@ public final HealthCounts getHealthCounts() {
if (lastHealthCountsSnapshot.compareAndSet(lastTime, currentTime)) {
// our thread won setting the snapshot time so we will proceed with generating a new snapshot
// losing threads will continue using the old snapshot
long success = getRollingSum(HystrixRollingNumberEvent.SUCCESS);
long failure = getRollingSum(HystrixRollingNumberEvent.FAILURE); // fallbacks occur on this
long timeout = getRollingSum(HystrixRollingNumberEvent.TIMEOUT); // fallbacks occur on this
long threadPoolRejected = getRollingSum(HystrixRollingNumberEvent.THREAD_POOL_REJECTED); // fallbacks occur on this
long semaphoreRejected = getRollingSum(HystrixRollingNumberEvent.SEMAPHORE_REJECTED); // fallbacks occur on this
long shortCircuited = getRollingSum(HystrixRollingNumberEvent.SHORT_CIRCUITED); // fallbacks occur on this
long success = getRollingCount(HystrixRollingNumberEvent.SUCCESS);
long failure = getRollingCount(HystrixRollingNumberEvent.FAILURE); // fallbacks occur on this
long timeout = getRollingCount(HystrixRollingNumberEvent.TIMEOUT); // fallbacks occur on this
long threadPoolRejected = getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED); // fallbacks occur on this
long semaphoreRejected = getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED); // fallbacks occur on this
long shortCircuited = getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED); // fallbacks occur on this
long totalCount = failure + success + timeout + threadPoolRejected + shortCircuited + semaphoreRejected;
long errorCount = failure + timeout + threadPoolRejected + shortCircuited + semaphoreRejected;
int errorPercentage = 0;
Expand All @@ -437,14 +446,6 @@ public final HealthCounts getHealthCounts() {
return healthCountsSnapshot;
}

/* package */ abstract void clear();

/* package */ void resetCounter() {
clear();
lastHealthCountsSnapshot.set(System.currentTimeMillis());
healthCountsSnapshot = new HealthCounts(0, 0, 0);
}

/**
* Number of requests during rolling window.
* Number that failed (failure + success + timeout + threadPoolRejected + shortCircuited + semaphoreRejected).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,12 @@ public abstract class HystrixMetrics {
* @return long rolling count
*/
public abstract long getRollingCount(HystrixRollingNumberEvent event);

protected abstract void addEvent(HystrixRollingNumberEvent event);

protected abstract void addEventWithValue(HystrixRollingNumberEvent event, long value);

protected abstract void updateRollingMax(HystrixRollingNumberEvent event, long value);

protected abstract long getRollingMax(HystrixRollingNumberEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,12 +32,7 @@
/**
* Used by {@link HystrixThreadPool} to record metrics.
*/
public class HystrixThreadPoolMetrics extends HystrixMetrics {

private final HystrixRollingNumber counter;

@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolMetrics.class);
public abstract class HystrixThreadPoolMetrics extends HystrixMetrics {

// String is HystrixThreadPoolKey.name() (we can't use HystrixThreadPoolKey directly as we can't guarantee it implements hashcode/equals correctly)
private static final ConcurrentHashMap<String, HystrixThreadPoolMetrics> metrics = new ConcurrentHashMap<String, HystrixThreadPoolMetrics>();
Expand All @@ -60,7 +57,8 @@ public static HystrixThreadPoolMetrics getInstance(HystrixThreadPoolKey key, Thr
return threadPoolMetrics;
}
// it doesn't exist so we need to create it
threadPoolMetrics = new HystrixThreadPoolMetrics(key, threadPool, properties);
HystrixMetricsCollection metricsCollectionStrategy = HystrixPlugins.getInstance().getMetricsCollection();
threadPoolMetrics = metricsCollectionStrategy.getThreadPoolMetricsInstance(key, threadPool, properties);
// attempt to store it (race other threads)
HystrixThreadPoolMetrics existing = metrics.putIfAbsent(key.name(), threadPoolMetrics);
if (existing == null) {
Expand Down Expand Up @@ -104,8 +102,7 @@ public static Collection<HystrixThreadPoolMetrics> getInstances() {
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolProperties properties;

private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
this.counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds().get(), properties.metricsRollingStatisticalWindowBuckets().get());
protected HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
this.threadPoolKey = threadPoolKey;
this.threadPool = threadPool;
this.properties = properties;
Expand Down Expand Up @@ -215,7 +212,7 @@ public Number getCurrentQueueSize() {
*/
public void markThreadExecution() {
// increment the count
counter.increment(HystrixRollingNumberEvent.THREAD_EXECUTION);
addEvent(HystrixRollingNumberEvent.THREAD_EXECUTION);
setMaxActiveThreads();
}

Expand Down Expand Up @@ -254,27 +251,17 @@ public void markThreadCompletion() {
* @return rolling max active threads
*/
public long getRollingMaxActiveThreads() {
return counter.getRollingMaxValue(HystrixRollingNumberEvent.THREAD_MAX_ACTIVE);
return getRollingMax(HystrixRollingNumberEvent.THREAD_MAX_ACTIVE);
}

private void setMaxActiveThreads() {
counter.updateRollingMax(HystrixRollingNumberEvent.THREAD_MAX_ACTIVE, threadPool.getActiveCount());
updateRollingMax(HystrixRollingNumberEvent.THREAD_MAX_ACTIVE, threadPool.getActiveCount());
}

/**
* Invoked each time a command is rejected from the thread-pool
*/
public void markThreadRejection() {
counter.increment(HystrixRollingNumberEvent.THREAD_POOL_REJECTED);
}

@Override
public long getCumulativeCount(HystrixRollingNumberEvent event) {
return 0;
}

@Override
public long getRollingCount(HystrixRollingNumberEvent event) {
return 0;
addEvent(HystrixRollingNumberEvent.THREAD_POOL_REJECTED);
}
}
Loading

0 comments on commit a46b6e9

Please sign in to comment.