Skip to content

Commit

Permalink
Fix incorrect downsampling unit
Browse files Browse the repository at this point in the history
The bug incorrectly assumed the downsampling units were seconds while computing
the start and end times of the HBase scan.
- Fixed the math.
- Suffixed the field name with _ms.
- Added unit tests to address the bug.
  * downsample and downsampleMilliseconds.
- Added a nested helper class for unit tests to access private methods.
- Factored out the unit tests related to downsampling from the huge
  TestTsdbQuery.java. It was too big for Eclipse to run.

Signed-off-by: Chris Larsen <clarsen@euphoriaaudio.com>
  • Loading branch information
jesse5e authored and manolama committed Apr 24, 2014
1 parent 1b5691e commit efb8186
Show file tree
Hide file tree
Showing 4 changed files with 557 additions and 259 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ test_SRC := \
test/core/TestSpan.java \
test/core/TestTags.java \
test/core/TestTSDB.java \
test/core/TestTsdbQueryDownsample.java \
test/core/TestTsdbQuery.java \
test/core/TestTSQuery.java \
test/core/TestTSSubQuery.java \
Expand Down
55 changes: 38 additions & 17 deletions src/core/TsdbQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;

import com.google.common.annotations.VisibleForTesting;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

Expand Down Expand Up @@ -111,12 +112,12 @@ final class TsdbQuery implements Query {

/**
* Downsampling function to use, if any (can be {@code null}).
* If this is non-null, {@code sample_interval} must be strictly positive.
* If this is non-null, {@code sample_interval_ms} must be strictly positive.
*/
private Aggregator downsampler;

/** Minimum time interval (in seconds) wanted between each data point. */
private long sample_interval;
/** Minimum time interval (in milliseconds) wanted between each data point. */
private long sample_interval_ms;

/** Optional list of TSUIDs to fetch and aggregate instead of a metric */
private List<String> tsuids;
Expand Down Expand Up @@ -253,7 +254,7 @@ public void downsample(final long interval, final Aggregator downsampler) {
throw new IllegalArgumentException("interval not > 0: " + interval);
}
this.downsampler = downsampler;
this.sample_interval = interval;
this.sample_interval_ms = interval;
}

/**
Expand Down Expand Up @@ -448,12 +449,12 @@ public DataPoints[] call(final TreeMap<byte[], Span> spans) throws Exception {
// We haven't been asked to find groups, so let's put all the spans
// together in the same group.
final SpanGroup group = new SpanGroup(tsdb,
getScanStartTime(),
getScanEndTime(),
getScanStartTimeSeconds(),
getScanEndTimeSeconds(),
spans.values(),
rate, rate_options,
aggregator,
sample_interval, downsampler);
sample_interval_ms, downsampler);
return new SpanGroup[] { group };
}

Expand Down Expand Up @@ -494,9 +495,10 @@ public DataPoints[] call(final TreeMap<byte[], Span> spans) throws Exception {
//LOG.info("Span belongs to group " + Arrays.toString(group) + ": " + Arrays.toString(row));
SpanGroup thegroup = groups.get(group);
if (thegroup == null) {
thegroup = new SpanGroup(tsdb, getScanStartTime(), getScanEndTime(),
thegroup = new SpanGroup(tsdb, getScanStartTimeSeconds(),
getScanEndTimeSeconds(),
null, rate, rate_options, aggregator,
sample_interval, downsampler);
sample_interval_ms, downsampler);
// Copy the array because we're going to keep `group' and overwrite
// its contents. So we want the collection to have an immutable copy.
final byte[] group_copy = new byte[group.length];
Expand Down Expand Up @@ -530,10 +532,10 @@ protected Scanner getScanner() throws HBaseException {
// rely on having a few extra data points before & after the exact start
// & end dates in order to do proper rate calculation or downsampling near
// the "edges" of the graph.
Bytes.setInt(start_row, (int) getScanStartTime(), metric_width);
Bytes.setInt(start_row, (int) getScanStartTimeSeconds(), metric_width);
Bytes.setInt(end_row, (end_time == UNSET
? -1 // Will scan until the end (0xFFF...).
: (int) getScanEndTime()),
: (int) getScanEndTimeSeconds()),
metric_width);

// set the metric UID based on the TSUIDs if given, or the metric UID
Expand Down Expand Up @@ -561,7 +563,7 @@ protected Scanner getScanner() throws HBaseException {
}

/** Returns the UNIX timestamp from which we must start scanning. */
private long getScanStartTime() {
private long getScanStartTimeSeconds() {
// The reason we look before by `MAX_TIMESPAN * 2' seconds is because of
// the following. Let's assume MAX_TIMESPAN = 600 (10 minutes) and the
// start_time = ... 12:31:00. If we initialize the scanner to look
Expand All @@ -572,32 +574,32 @@ private long getScanStartTime() {
// look back by twice MAX_TIMESPAN. Only when start_time is aligned on a
// MAX_TIMESPAN boundary then we'll mistakenly scan back by an extra row,
// but this doesn't really matter.
// Additionally, in case our sample_interval is large, we need to look
// Additionally, in case our sample_interval_ms is large, we need to look
// even further before/after, so use that too.
long start = getStartTime();
// down cast to seconds if we have a query in ms
if ((start & Const.SECOND_MASK) != 0) {
start /= 1000;
}
final long ts = start - Const.MAX_TIMESPAN * 2 - sample_interval;
final long ts = start - Const.MAX_TIMESPAN * 2 - sample_interval_ms / 1000;
return ts > 0 ? ts : 0;
}

/** Returns the UNIX timestamp at which we must stop scanning. */
private long getScanEndTime() {
private long getScanEndTimeSeconds() {
// For the end_time, we have a different problem. For instance if our
// end_time = ... 12:30:00, we'll stop scanning when we get to 12:40, but
// once again we wanna try to look ahead one more row, so to avoid this
// problem we always add 1 second to the end_time. Only when the end_time
// is of the form HH:59:59 then we will scan ahead an extra row, but once
// again that doesn't really matter.
// Additionally, in case our sample_interval is large, we need to look
// Additionally, in case our sample_interval_ms is large, we need to look
// even further before/after, so use that too.
long end = getEndTime();
if ((end & Const.SECOND_MASK) != 0) {
end /= 1000;
}
return end + Const.MAX_TIMESPAN + 1 + sample_interval;
return end + Const.MAX_TIMESPAN + 1 + sample_interval_ms / 1000;
}

/**
Expand Down Expand Up @@ -856,4 +858,23 @@ public int compare(final byte[] a, final byte[] b) {

}

/**
 * Helps unit tests inspect private methods.
 *
 * <p>Exposes selected private internals of {@code TsdbQuery} (the computed
 * HBase scan time bounds and the downsampling interval) to tests without
 * widening the visibility of the production members themselves. Marked
 * {@code @VisibleForTesting} to signal it must not be used by production code.
 */
@VisibleForTesting
static class ForTesting {

/** @return the start time (Unix seconds) of the HBase scan for unit tests. */
static long getScanStartTimeSeconds(TsdbQuery query) {
return query.getScanStartTimeSeconds();
}

/** @return the end time (Unix seconds) of the HBase scan for unit tests. */
static long getScanEndTimeSeconds(TsdbQuery query) {
return query.getScanEndTimeSeconds();
}

/** @return the downsampling interval (milliseconds) for unit tests. */
static long getDownsampleIntervalMs(TsdbQuery query) {
return query.sample_interval_ms;
}
}
}
Loading

0 comments on commit efb8186

Please sign in to comment.