Skip to content

Commit

Permalink
Merge pull request #779 from mattrjacobs/rebased-hdr-histogram
Browse files Browse the repository at this point in the history
Use HdrHistogram for latency percentile calculation
  • Loading branch information
mattrjacobs committed Apr 29, 2015
2 parents e7c9e98 + e3ea425 commit 89c2569
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 161 deletions.
2 changes: 1 addition & 1 deletion hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'io.reactivex:rxjava:1.0.9'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'org.hdrhistogram:HdrHistogram:2.1.4'
testCompile 'junit:junit-dep:4.10'
}


javadoc {
// the exclude isn't working, nor is there a subPackages options as docs suggest there should be
// we do not want the com.netflix.hystrix.util package include
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public static Collection<HystrixCollapserMetrics> getInstances() {
this.key = key;
this.properties = properties;

this.percentileBatchSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize(), properties.metricsRollingPercentileEnabled());
this.percentileShardSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize(), properties.metricsRollingPercentileEnabled());
this.percentileBatchSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileEnabled());
this.percentileShardSize = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileEnabled());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ public static Collection<HystrixCommandMetrics> getInstances() {
this.group = commandGroup;
this.threadPoolKey = threadPoolKey;
this.properties = properties;
this.percentileExecution = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize(), properties.metricsRollingPercentileEnabled());
this.percentileTotal = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize(), properties.metricsRollingPercentileEnabled());
this.percentileExecution = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileEnabled());
this.percentileTotal = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileEnabled());
this.eventNotifier = eventNotifier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;

import org.HdrHistogram.IntCountsHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,13 +52,12 @@ public class HystrixRollingPercentile {
/* package for testing */ final BucketCircularArray buckets;
private final HystrixProperty<Integer> timeInMilliseconds;
private final HystrixProperty<Integer> numberOfBuckets;
private final HystrixProperty<Integer> bucketDataLength;
private final HystrixProperty<Boolean> enabled;

/*
* This will get flipped each time a new bucket is created.
*/
/* package for testing */ volatile PercentileSnapshot currentPercentileSnapshot = new PercentileSnapshot(0);
/* package for testing */ volatile PercentileSnapshot currentPercentileSnapshot = new PercentileSnapshot();

/**
*
Expand All @@ -69,25 +69,27 @@ public class HystrixRollingPercentile {
* {@code HystrixProperty<Integer>} for number of buckets that the time window should be divided into
* <p>
* Example: 12 for 5 second buckets in a 1 minute window
* @param bucketDataLength
* {@code HystrixProperty<Integer>} for number of values stored in each bucket
* <p>
* Example: 1000 to store a max of 1000 values in each 5 second bucket
* @param enabled
* {@code HystrixProperty<Boolean>} whether data should be tracked and percentiles calculated.
* <p>
* If 'false' methods will do nothing.
*/
public HystrixRollingPercentile(HystrixProperty<Integer> timeInMilliseconds, HystrixProperty<Integer> numberOfBuckets, HystrixProperty<Integer> bucketDataLength, HystrixProperty<Boolean> enabled) {
this(ACTUAL_TIME, timeInMilliseconds, numberOfBuckets, bucketDataLength, enabled);
public HystrixRollingPercentile(HystrixProperty<Integer> timeInMilliseconds, HystrixProperty<Integer> numberOfBuckets, HystrixProperty<Boolean> enabled) {
this(ACTUAL_TIME, timeInMilliseconds, numberOfBuckets, enabled);

}
/*
* @deprecated Use {@link HystrixRollingPercentile(HystrixProperty<Integer>, HystrixProperty<Integer>, HystrixProperty<Boolean>)} instead
*/
@Deprecated
public HystrixRollingPercentile(HystrixProperty<Integer> timeInMilliseconds, HystrixProperty<Integer> numberOfBuckets, HystrixProperty<Integer> bucketDataLength, HystrixProperty<Boolean> enabled) {
this(ACTUAL_TIME, timeInMilliseconds, numberOfBuckets, enabled);
}

/* package for testing */ HystrixRollingPercentile(Time time, HystrixProperty<Integer> timeInMilliseconds, HystrixProperty<Integer> numberOfBuckets, HystrixProperty<Integer> bucketDataLength, HystrixProperty<Boolean> enabled) {
/* package for testing */ HystrixRollingPercentile(Time time, HystrixProperty<Integer> timeInMilliseconds, HystrixProperty<Integer> numberOfBuckets, HystrixProperty<Boolean> enabled) {
this.time = time;
this.timeInMilliseconds = timeInMilliseconds;
this.numberOfBuckets = numberOfBuckets;
this.bucketDataLength = bucketDataLength;
this.enabled = enabled;

if (this.timeInMilliseconds.get() % this.numberOfBuckets.get() != 0) {
Expand Down Expand Up @@ -218,7 +220,7 @@ private Bucket getCurrentBucket() {
try {
if (buckets.peekLast() == null) {
// the list is empty so create the first bucket
Bucket newBucket = new Bucket(currentTime, bucketDataLength.get());
Bucket newBucket = new Bucket(currentTime);
buckets.addLast(newBucket);
return newBucket;
} else {
Expand All @@ -240,7 +242,7 @@ private Bucket getCurrentBucket() {
} else { // we're past the window so we need to create a new bucket
Bucket[] allBuckets = buckets.getArray();
// create a new bucket and add it as the new 'last' (once this is done other threads will start using it on subsequent retrievals)
buckets.addLast(new Bucket(lastBucket.windowStart + getBucketSizeInMilliseconds(), bucketDataLength.get()));
buckets.addLast(new Bucket(lastBucket.windowStart + getBucketSizeInMilliseconds()));
// we created a new bucket so let's re-generate the PercentileSnapshot (not including the new bucket)
currentPercentileSnapshot = new PercentileSnapshot(allBuckets);
}
Expand Down Expand Up @@ -282,135 +284,60 @@ public void reset() {
}

private static class PercentileBucketData {
private final int length;
private final AtomicIntegerArray list;
private final AtomicInteger index = new AtomicInteger();
private final IntCountsHistogram histogram;

public PercentileBucketData(int dataLength) {
this.length = dataLength;
this.list = new AtomicIntegerArray(dataLength);
public PercentileBucketData() {
this.histogram = new IntCountsHistogram(3);
}

public void addValue(int... latency) {
for (int l : latency) {
/* We just wrap around the beginning and over-write if we go past 'dataLength' as that will effectively cause us to "sample" the most recent data */
list.set(index.getAndIncrement() % length, l);
// TODO Alternative to AtomicInteger? The getAndIncrement may be a source of contention on high throughput circuits on large multi-core systems.
// LongAdder isn't suited to this as it is not consistent. Perhaps a different data structure that doesn't need indexed adds?
// A threadlocal data storage that only aggregates when fetched would be ideal. Similar to LongAdder except for accumulating lists of data.
for (int l: latency) {
histogram.recordValue(l);
}
}

public int length() {
if (index.get() > list.length()) {
return list.length();
} else {
return index.get();
}
return (int) histogram.getTotalCount();
}

}

/**
* @NotThreadSafe
*/
/* package for testing */ static class PercentileSnapshot {
private final int[] data;
private final int length;
private int mean;

/* package for testing */ PercentileSnapshot(Bucket[] buckets) {
int lengthFromBuckets = 0;
// we need to calculate it dynamically as it could have been changed by properties (rare, but possible)
// also this way we capture the actual index size rather than the max so size the int[] to only what we need
for (Bucket bd : buckets) {
lengthFromBuckets += bd.data.length;
}
data = new int[lengthFromBuckets];
int index = 0;
int sum = 0;
for (Bucket bd : buckets) {
PercentileBucketData pbd = bd.data;
int length = pbd.length();
for (int i = 0; i < length; i++) {
int v = pbd.list.get(i);
this.data[index++] = v;
sum += v;
}
}
this.length = index;
if (this.length == 0) {
this.mean = 0;
} else {
this.mean = sum / this.length;
}
private final IntCountsHistogram aggregateHistogram;

Arrays.sort(this.data, 0, length);
/* package for testing */ PercentileSnapshot() {
this(new Bucket[0]);
}

/* package for testing */ PercentileSnapshot(int... data) {
this.data = data;
this.length = data.length;

int sum = 0;
for (int v : data) {
sum += v;
aggregateHistogram = new IntCountsHistogram(4);
for (int latency: data) {
aggregateHistogram.recordValue(latency);
}
this.mean = sum / this.length;
}

Arrays.sort(this.data, 0, length);
/* package for testing */ PercentileSnapshot(Bucket[] buckets) {
aggregateHistogram = new IntCountsHistogram(4);
for (Bucket bucket: buckets) {
aggregateHistogram.add(bucket.data.histogram);
}
}

/* package for testing */ int getMean() {
return mean;
return (int) aggregateHistogram.getMean();
}

/**
* Provides percentile computation.
*/
public int getPercentile(double percentile) {
if (length == 0) {
if (aggregateHistogram.getTotalCount() == 0) {
return 0;
}
return computePercentile(percentile);
return (int) aggregateHistogram.getValueAtPercentile(percentile);
}

/**
* @see <a href="http://en.wikipedia.org/wiki/Percentile">Percentile (Wikipedia)</a>
* @see <a href="http://cnx.org/content/m10805/latest/">Percentile</a>
*
* @param percent percentile of data desired
* @return data at the asked-for percentile. Interpolation is used if exactness is not possible
*/
private int computePercentile(double percent) {
// Some just-in-case edge cases
if (length <= 0) {
return 0;
} else if (percent <= 0.0) {
return data[0];
} else if (percent >= 100.0) {
return data[length - 1];
}

// ranking (http://en.wikipedia.org/wiki/Percentile#Alternative_methods)
double rank = (percent / 100.0) * length;

// linear interpolation between closest ranks
int iLow = (int) Math.floor(rank);
int iHigh = (int) Math.ceil(rank);
assert 0 <= iLow && iLow <= rank && rank <= iHigh && iHigh <= length;
assert (iHigh - iLow) <= 1;
if (iHigh >= length) {
// Another edge case
return data[length - 1];
} else if (iLow == iHigh) {
return data[iLow];
} else {
// Interpolate between the two bounding values
return (int) (data[iLow] + (rank - iLow) * (data[iHigh] - data[iLow]));
}
}

}

/**
Expand Down Expand Up @@ -596,9 +523,9 @@ private Bucket[] getArray() {
final long windowStart;
final PercentileBucketData data;

Bucket(long startTime, int bucketDataLength) {
Bucket(long startTime) {
this.windowStart = startTime;
this.data = new PercentileBucketData(bucketDataLength);
this.data = new PercentileBucketData();
}

}
Expand Down
Loading

0 comments on commit 89c2569

Please sign in to comment.