diff --git a/Makefile.am b/Makefile.am index 83e76f81d5..861a373321 100644 --- a/Makefile.am +++ b/Makefile.am @@ -136,6 +136,7 @@ test_SRC := \ test/core/TestSpan.java \ test/core/TestTags.java \ test/core/TestTSDB.java \ + test/core/TestTsdbQueryDownsample.java \ test/core/TestTsdbQuery.java \ test/core/TestTSQuery.java \ test/core/TestTSSubQuery.java \ diff --git a/src/core/TsdbQuery.java b/src/core/TsdbQuery.java index 1e06ab1a1e..9f95a3a942 100644 --- a/src/core/TsdbQuery.java +++ b/src/core/TsdbQuery.java @@ -29,6 +29,7 @@ import org.hbase.async.KeyValue; import org.hbase.async.Scanner; +import com.google.common.annotations.VisibleForTesting; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; @@ -111,12 +112,12 @@ final class TsdbQuery implements Query { /** * Downsampling function to use, if any (can be {@code null}). - * If this is non-null, {@code sample_interval} must be strictly positive. + * If this is non-null, {@code sample_interval_ms} must be strictly positive. */ private Aggregator downsampler; - /** Minimum time interval (in seconds) wanted between each data point. */ - private long sample_interval; + /** Minimum time interval (in milliseconds) wanted between each data point. */ + private long sample_interval_ms; /** Optional list of TSUIDs to fetch and aggregate instead of a metric */ private List<String> tsuids; @@ -253,7 +254,7 @@ public void downsample(final long interval, final Aggregator downsampler) { throw new IllegalArgumentException("interval not > 0: " + interval); } this.downsampler = downsampler; - this.sample_interval = interval; + this.sample_interval_ms = interval; } /** @@ -448,12 +449,12 @@ public DataPoints[] call(final TreeMap<byte[], Span> spans) throws Exception { // We haven't been asked to find groups, so let's put all the spans // together in the same group. final SpanGroup group = new SpanGroup(tsdb, - getScanStartTime(), - getScanEndTime(), + getScanStartTimeSeconds(), + getScanEndTimeSeconds(), spans.values(), rate, rate_options, aggregator, - sample_interval, downsampler); + sample_interval_ms, downsampler); return new SpanGroup[] { group }; } @@ -494,9 +495,10 @@ public DataPoints[] call(final TreeMap<byte[], Span> spans) throws Exception { //LOG.info("Span belongs to group " + Arrays.toString(group) + ": " + Arrays.toString(row)); SpanGroup thegroup = groups.get(group); if (thegroup == null) { - thegroup = new SpanGroup(tsdb, getScanStartTime(), getScanEndTime(), + thegroup = new SpanGroup(tsdb, getScanStartTimeSeconds(), + getScanEndTimeSeconds(), null, rate, rate_options, aggregator, - sample_interval, downsampler); + sample_interval_ms, downsampler); // Copy the array because we're going to keep `group' and overwrite // its contents. So we want the collection to have an immutable copy. final byte[] group_copy = new byte[group.length]; @@ -530,10 +532,10 @@ protected Scanner getScanner() throws HBaseException { // rely on having a few extra data points before & after the exact start // & end dates in order to do proper rate calculation or downsampling near // the "edges" of the graph. - Bytes.setInt(start_row, (int) getScanStartTime(), metric_width); + Bytes.setInt(start_row, (int) getScanStartTimeSeconds(), metric_width); Bytes.setInt(end_row, (end_time == UNSET ? -1 // Will scan until the end (0xFFF...). 
- : (int) getScanEndTime()), + : (int) getScanEndTimeSeconds()), metric_width); // set the metric UID based on the TSUIDs if given, or the metric UID @@ -561,7 +563,7 @@ protected Scanner getScanner() throws HBaseException { } /** Returns the UNIX timestamp from which we must start scanning. */ - private long getScanStartTime() { + private long getScanStartTimeSeconds() { // The reason we look before by `MAX_TIMESPAN * 2' seconds is because of // the following. Let's assume MAX_TIMESPAN = 600 (10 minutes) and the // start_time = ... 12:31:00. If we initialize the scanner to look @@ -572,32 +574,32 @@ private long getScanStartTime() { // look back by twice MAX_TIMESPAN. Only when start_time is aligned on a // MAX_TIMESPAN boundary then we'll mistakenly scan back by an extra row, // but this doesn't really matter. - // Additionally, in case our sample_interval is large, we need to look + // Additionally, in case our sample_interval_ms is large, we need to look // even further before/after, so use that too. long start = getStartTime(); // down cast to seconds if we have a query in ms if ((start & Const.SECOND_MASK) != 0) { start /= 1000; } - final long ts = start - Const.MAX_TIMESPAN * 2 - sample_interval; + final long ts = start - Const.MAX_TIMESPAN * 2 - sample_interval_ms / 1000; return ts > 0 ? ts : 0; } /** Returns the UNIX timestamp at which we must stop scanning. */ - private long getScanEndTime() { + private long getScanEndTimeSeconds() { // For the end_time, we have a different problem. For instance if our // end_time = ... 12:30:00, we'll stop scanning when we get to 12:40, but // once again we wanna try to look ahead one more row, so to avoid this // problem we always add 1 second to the end_time. Only when the end_time // is of the form HH:59:59 then we will scan ahead an extra row, but once // again that doesn't really matter. - // Additionally, in case our sample_interval is large, we need to look + // Additionally, in case our sample_interval_ms is large, we need to look // even further before/after, so use that too. long end = getEndTime(); if ((end & Const.SECOND_MASK) != 0) { end /= 1000; } - return end + Const.MAX_TIMESPAN + 1 + sample_interval; + return end + Const.MAX_TIMESPAN + 1 + sample_interval_ms / 1000; } /** @@ -856,4 +858,23 @@ public int compare(final byte[] a, final byte[] b) { } + /** Helps unit tests inspect private methods. */ + @VisibleForTesting + static class ForTesting { + + /** @return the start time of the HBase scan for unit tests. */ + static long getScanStartTimeSeconds(TsdbQuery query) { + return query.getScanStartTimeSeconds(); + } + + /** @return the end time of the HBase scan for unit tests. */ + static long getScanEndTimeSeconds(TsdbQuery query) { + return query.getScanEndTimeSeconds(); + } + + /** @return the downsampling interval for unit tests. 
*/ + static long getDownsampleIntervalMs(TsdbQuery query) { + return query.sample_interval_ms; + } + } } diff --git a/test/core/TestTsdbQuery.java b/test/core/TestTsdbQuery.java index 1575a211e0..0704445bf9 100644 --- a/test/core/TestTsdbQuery.java +++ b/test/core/TestTsdbQuery.java @@ -285,22 +285,6 @@ public void setTimeSeriesTSDifferentMetrics() throws Exception { query.setTimeSeries(tsuids, Aggregators.SUM, false); } - @Test - public void downsample() throws Exception { - query.downsample(60, Aggregators.SUM); - assertNotNull(query); - } - - @Test (expected = NullPointerException.class) - public void downsampleNullAgg() throws Exception { - query.downsample(60, null); - } - - @Test (expected = IllegalArgumentException.class) - public void downsampleInvalidInterval() throws Exception { - query.downsample(0, Aggregators.SUM); - } - @Test public void runLongSingleTS() throws Exception { storeLongTimeSeriesSeconds(true, false);; @@ -481,139 +465,6 @@ public void runLongSingleTSRateMs() throws Exception { } assertEquals(299, dps[0].size()); } - - @Test - public void runLongSingleTSDownsample() throws Exception { - storeLongTimeSeriesSeconds(true, false);; - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(60000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - int i = 1; - for (DataPoint dp : dps[0]) { - assertEquals(i, dp.longValue()); - i += 2; - } - assertEquals(150, dps[0].size()); - } - - @Test - public void runLongSingleTSDownsampleMs() throws Exception { - storeLongTimeSeriesMs(); - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(1000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - int i = 1; - for (DataPoint dp : dps[0]) { - assertEquals(i, dp.longValue()); - i += 2; - } - assertEquals(150, dps[0].size()); - } - - /** - * This test is storing > Short.MAX_VALUE data points in a single row and - * making sure the state and iterators function properly. 1.x used a short as - * we would only have a max of 3600 data points but now we can have over 4M - * so we have to index with an int and store the state in a long. 
- */ - @Test - public void runLongSingleTSDownsampleMsLarge() throws Exception { - setQueryStorage(); - long ts = 1356998400500L; - // mimicks having 64K data points in a row - final int limit = 64000; - final byte[] qualifier = new byte[4 * limit]; - for (int i = 0; i < limit; i++) { - System.arraycopy(Internal.buildQualifier(ts, (short) 0), 0, - qualifier, i * 4, 4); - ts += 50; - } - final byte[] values = new byte[limit + 2]; - storage.addColumn(MockBase.stringToBytes("00000150E22700000001000001"), - qualifier, values); - - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(1000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - for (DataPoint dp : dps[0]) { - assertEquals(0, dp.longValue()); - } - assertEquals(3200, dps[0].size()); - } - - @Test - public void runLongSingleTSDownsampleAndRate() throws Exception { - storeLongTimeSeriesSeconds(true, false);; - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(60000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - for (DataPoint dp : dps[0]) { - assertEquals(0.033F, dp.doubleValue(), 0.001); - } - assertEquals(149, dps[0].size()); - } - - @Test - public void runLongSingleTSDownsampleAndRateMs() throws Exception { - storeLongTimeSeriesMs(); - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(1000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - for (DataPoint dp : dps[0]) { - assertEquals(2.0F, dp.doubleValue(), 0.001); - } - assertEquals(149, dps[0].size()); - } @Test public void runLongSingleTSCompacted() throws Exception { @@ -825,99 +676,7 @@ public void runFloatSingleTSRateMs() throws Exception { } assertEquals(299, dps[0].size()); } - - @Test - public void runFloatSingleTSDownsample() throws Exception { - storeFloatTimeSeriesSeconds(true, false); - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(60000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - double i = 1.375D; - for (DataPoint dp : dps[0]) { - assertEquals(i, dp.doubleValue(), 0.00001); - i += 0.5D; - } - assertEquals(150, dps[0].size()); - } - - @Test - public void runFloatSingleTSDownsampleMs() throws Exception { - storeFloatTimeSeriesMs(); - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(1000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - double i = 1.375D; - for (DataPoint dp : dps[0]) { - assertEquals(i, dp.doubleValue(), 0.00001); - i += 0.5D; - } - assertEquals(150, dps[0].size()); - } - - @Test - public void runFloatSingleTSDownsampleAndRate() throws Exception { - storeFloatTimeSeriesSeconds(true, false); - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(60000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - for (DataPoint dp : dps[0]) { - assertEquals(0.00833F, dp.doubleValue(), 0.00001); - } - assertEquals(149, dps[0].size()); - } - - @Test - public void runFloatSingleTSDownsampleAndRateMs() throws Exception { - storeFloatTimeSeriesMs(); - HashMap<String, String> tags = new HashMap<String, String>(1); - tags.put("host", "web01"); - query.setStartTime(1356998400); - query.setEndTime(1357041600); - query.downsample(1000, Aggregators.AVG); - query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); - final DataPoints[] dps = query.run(); - assertNotNull(dps); - assertEquals("sys.cpu.user", dps[0].metricName()); - assertTrue(dps[0].getAggregatedTags().isEmpty()); - assertNull(dps[0].getAnnotations()); - assertEquals("web01", dps[0].getTags().get("host")); - - for (DataPoint dp : dps[0]) { - assertEquals(0.5F, dp.doubleValue(), 0.00001); - } - assertEquals(149, dps[0].size()); - } - + @Test public void runFloatSingleTSCompacted() throws Exception { storeFloatCompactions(); diff --git a/test/core/TestTsdbQueryDownsample.java b/test/core/TestTsdbQueryDownsample.java new file mode 100644 index 0000000000..f93cdaaf96 --- /dev/null +++ b/test/core/TestTsdbQueryDownsample.java @@ -0,0 +1,517 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2013 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see <http://www.gnu.org/licenses/>. 
+package net.opentsdb.core; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mock; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import com.stumbleupon.async.Deferred; + +import net.opentsdb.meta.Annotation; +import net.opentsdb.storage.MockBase; +import net.opentsdb.uid.NoSuchUniqueName; +import net.opentsdb.uid.UniqueId; +import net.opentsdb.utils.Config; +import net.opentsdb.utils.DateTime; + +import org.apache.zookeeper.proto.DeleteRequest; +import org.hbase.async.GetRequest; +import org.hbase.async.HBaseClient; +import org.hbase.async.KeyValue; +import org.hbase.async.PutRequest; +import org.hbase.async.Scanner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Tests downsampling with query. + */ +@RunWith(PowerMockRunner.class) +@PowerMockIgnore({"javax.management.*", "javax.xml.*", + "ch.qos.*", "org.slf4j.*", + "com.sun.*", "org.xml.*"}) +@PrepareForTest({TSDB.class, Config.class, UniqueId.class, HBaseClient.class, + CompactionQueue.class, GetRequest.class, PutRequest.class, KeyValue.class, + Scanner.class, TsdbQuery.class, DeleteRequest.class, Annotation.class, + RowKey.class, Span.class, SpanGroup.class, IncomingDataPoints.class }) +public class TestTsdbQueryDownsample { + + private Config config; + private TSDB tsdb = null; + private HBaseClient client = mock(HBaseClient.class); + private UniqueId metrics = mock(UniqueId.class); + private UniqueId tag_names = mock(UniqueId.class); + private UniqueId tag_values = mock(UniqueId.class); + private TsdbQuery query = null; + private MockBase storage = null; + + @Before + public void before() throws Exception { + config = new Config(false); + tsdb = new TSDB(config); + query = new TsdbQuery(tsdb); + + // replace the "real" field objects with mocks + Field cl = tsdb.getClass().getDeclaredField("client"); + cl.setAccessible(true); + cl.set(tsdb, client); + + Field met = tsdb.getClass().getDeclaredField("metrics"); + met.setAccessible(true); + met.set(tsdb, metrics); + + Field tagk = tsdb.getClass().getDeclaredField("tag_names"); + tagk.setAccessible(true); + tagk.set(tsdb, tag_names); + + Field tagv = tsdb.getClass().getDeclaredField("tag_values"); + tagv.setAccessible(true); + tagv.set(tsdb, tag_values); + + // mock UniqueId + when(metrics.getId("sys.cpu.user")).thenReturn(new byte[] { 0, 0, 1 }); + when(metrics.getNameAsync(new byte[] { 0, 0, 1 })) + .thenReturn(Deferred.fromResult("sys.cpu.user")); + when(metrics.getId("sys.cpu.system")) + .thenThrow(new NoSuchUniqueName("sys.cpu.system", "metric")); + when(metrics.getId("sys.cpu.nice")).thenReturn(new byte[] { 0, 0, 2 }); + when(metrics.getNameAsync(new byte[] { 0, 0, 2 })) + .thenReturn(Deferred.fromResult("sys.cpu.nice")); + when(tag_names.getId("host")).thenReturn(new byte[] { 0, 0, 1 }); + 
when(tag_names.getIdAsync("host")).thenReturn( + Deferred.fromResult(new byte[] { 0, 0, 1 })); + when(tag_names.getNameAsync(new byte[] { 0, 0, 1 })) + .thenReturn(Deferred.fromResult("host")); + when(tag_names.getOrCreateIdAsync("host")).thenReturn( + Deferred.fromResult(new byte[] { 0, 0, 1 })); + when(tag_names.getIdAsync("dc")) + .thenThrow(new NoSuchUniqueName("dc", "metric")); + when(tag_values.getId("web01")).thenReturn(new byte[] { 0, 0, 1 }); + when(tag_values.getIdAsync("web01")).thenReturn( + Deferred.fromResult(new byte[] { 0, 0, 1 })); + when(tag_values.getNameAsync(new byte[] { 0, 0, 1 })) + .thenReturn(Deferred.fromResult("web01")); + when(tag_values.getOrCreateIdAsync("web01")).thenReturn( + Deferred.fromResult(new byte[] { 0, 0, 1 })); + when(tag_values.getId("web02")).thenReturn(new byte[] { 0, 0, 2 }); + when(tag_values.getIdAsync("web02")).thenReturn( + Deferred.fromResult(new byte[] { 0, 0, 2 })); + when(tag_values.getNameAsync(new byte[] { 0, 0, 2 })) + .thenReturn(Deferred.fromResult("web02")); + when(tag_values.getOrCreateIdAsync("web02")).thenReturn( + Deferred.fromResult(new byte[] { 0, 0, 2 })); + when(tag_values.getId("web03")) + .thenThrow(new NoSuchUniqueName("web03", "metric")); + + when(metrics.width()).thenReturn((short)3); + when(tag_names.width()).thenReturn((short)3); + when(tag_values.width()).thenReturn((short)3); + } + + @Test + public void downsample() throws Exception { + int downsampleInterval = (int)DateTime.parseDuration("60s"); + query.downsample(downsampleInterval, Aggregators.SUM); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + assertEquals(60000, TsdbQuery.ForTesting.getDownsampleIntervalMs(query)); + long scanStartTime = 1356998400 - Const.MAX_TIMESPAN * 2 - 60; + assertEquals(scanStartTime, TsdbQuery.ForTesting.getScanStartTimeSeconds(query)); + long scanEndTime = 1357041600 + Const.MAX_TIMESPAN + 1 + 60; + assertEquals(scanEndTime, TsdbQuery.ForTesting.getScanEndTimeSeconds(query)); + } + + @Test + public void downsampleMilliseconds() throws Exception { + int downsampleInterval = (int)DateTime.parseDuration("60s"); + query.downsample(downsampleInterval, Aggregators.SUM); + query.setStartTime(1356998400000L); + query.setEndTime(1357041600000L); + assertEquals(60000, TsdbQuery.ForTesting.getDownsampleIntervalMs(query)); + long scanStartTime = 1356998400 - Const.MAX_TIMESPAN * 2 - 60; + assertEquals(scanStartTime, TsdbQuery.ForTesting.getScanStartTimeSeconds(query)); + long scanEndTime = 1357041600 + Const.MAX_TIMESPAN + 1 + 60; + assertEquals(scanEndTime, TsdbQuery.ForTesting.getScanEndTimeSeconds(query)); + } + + @Test (expected = NullPointerException.class) + public void downsampleNullAgg() throws Exception { + query.downsample(60, null); + } + + @Test (expected = IllegalArgumentException.class) + public void downsampleInvalidInterval() throws Exception { + query.downsample(0, Aggregators.SUM); + } + + @Test + public void runLongSingleTSDownsample() throws Exception { + storeLongTimeSeriesSeconds(true, false); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(60000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); + final DataPoints[] dps = query.run(); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + //
Timeseries: (1, 2, 3, 4, ..., 299, 300) at 30-second interval timestamps. + // Timeseries in 60s intervals: (1, 2), (3, 4), ..., (299, 300) + // Integer average downsampling: 1, 3, 5, ... 297, 299 + int i = 1; + for (DataPoint dp : dps[0]) { + assertEquals(i, dp.longValue()); + i += 2; + } + assertEquals(150, dps[0].size()); + } + + @Test + public void runLongSingleTSDownsampleMs() throws Exception { + storeLongTimeSeriesMs(); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(1000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); + final DataPoints[] dps = query.run(); + verify(client).newScanner(tsdb.table); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + // Timeseries: (1, 2, 3, 4, ..., 299, 300) at 500-ms interval timestamps. + // Timeseries in 1sec intervals: (1, 2), (3, 4), ..., (299, 300) - 150 DPs + int i = 1; + for (DataPoint dp : dps[0]) { + assertEquals(i, dp.longValue()); + i += 2; + } + assertEquals(150, dps[0].size()); + } + + @Test + public void runLongSingleTSDownsampleAndRate() throws Exception { + storeLongTimeSeriesSeconds(true, false); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(60000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); + final DataPoints[] dps = query.run(); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + // Timeseries: (1, 2, 3, 4, ..., 299, 300) at 30-second interval timestamps. + // Integer average 60s downsampling: 1, 3, 5, ... 297, 299 + // Timeseries in rate: 2 every 60 seconds or 1/30 per second + for (DataPoint dp : dps[0]) { + assertEquals(0.033F, dp.doubleValue(), 0.001); + } + assertEquals(149, dps[0].size()); + } + + @Test + public void runLongSingleTSDownsampleAndRateMs() throws Exception { + storeLongTimeSeriesMs(); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(1000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); + final DataPoints[] dps = query.run(); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + // Timeseries: (1, 2, 3, 4, ..., 299, 300) at 500-ms interval timestamps. + // Integer average 1 sec downsampling: 1, 3, 5, ... 
297, 299 + // Timeseries in rate: 2 every second. + for (DataPoint dp : dps[0]) { + assertEquals(2.0F, dp.doubleValue(), 0.001); + } + assertEquals(149, dps[0].size()); + } + + @Test + public void runFloatSingleTSDownsample() throws Exception { + storeFloatTimeSeriesSeconds(true, false); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(60000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); + final DataPoints[] dps = query.run(); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + // Timeseries in 30s intervals: (1.25, 1.5, 1.75, 2, 2.25, ..., 75.75, 76). + // Float average 60s downsampling: 2.75/2, 3.75/2, ... 151.75/2 + double i = 1.375D; + for (DataPoint dp : dps[0]) { + assertEquals(i, dp.doubleValue(), 0.00001); + i += 0.5D; + } + assertEquals(150, dps[0].size()); + } + + @Test + public void runFloatSingleTSDownsampleMs() throws Exception { + storeFloatTimeSeriesMs(); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(1000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false); + final DataPoints[] dps = query.run(); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + // Timeseries in 500ms intervals: (1.25, 1.5, 1.75, 2, ..., 75.75, 76). + // Float average 1s downsampling: 2.75/2, 3.75/2, ... 151.75/2 + double i = 1.375D; + for (DataPoint dp : dps[0]) { + assertEquals(i, dp.doubleValue(), 0.00001); + i += 0.5D; + } + assertEquals(150, dps[0].size()); + } + + @Test + public void runFloatSingleTSDownsampleAndRate() throws Exception { + storeFloatTimeSeriesSeconds(true, false); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(60000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); + final DataPoints[] dps = query.run(); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + // Timeseries in 30s intervals: (1.25, 1.5, 1.75, 2, 2.25, ..., 75.75, 76). + // Float average 60s downsampling: 2.75/2, 3.75/2, ... 151.75/2 + // Rate = (3.75/2 - 2.75/2) / 60 = 1 / 120. 
+ for (DataPoint dp : dps[0]) { + assertEquals(0.00833F, dp.doubleValue(), 0.00001); + } + assertEquals(149, dps[0].size()); + } + + @Test + public void runFloatSingleTSDownsampleAndRateMs() throws Exception { + storeFloatTimeSeriesMs(); + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + query.setStartTime(1356998400); + query.setEndTime(1357041600); + query.downsample(1000, Aggregators.AVG); + query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, true); + final DataPoints[] dps = query.run(); + assertNotNull(dps); + assertEquals("sys.cpu.user", dps[0].metricName()); + assertTrue(dps[0].getAggregatedTags().isEmpty()); + assertNull(dps[0].getAnnotations()); + assertEquals("web01", dps[0].getTags().get("host")); + + // Timeseries in 500ms intervals: (1.25, 1.5, 1.75, 2, ..., 75.75, 76). + // Float average 1s downsampling: 2.75/2, 3.75/2, ... 151.75/2 + // Rate = (3.75/2 - 2.75/2) / 1 = 0.5. + for (DataPoint dp : dps[0]) { + assertEquals(0.5F, dp.doubleValue(), 0.00001); + } + assertEquals(149, dps[0].size()); + } + + // ----------------- // + // Helper functions. // + // ----------------- // + + private void storeLongTimeSeriesSeconds(final boolean two_metrics, + final boolean offset) throws Exception { + storeLongTimeSeriesSecondsWithBasetime(1356998400L, two_metrics, offset); + } + + private void storeLongTimeSeriesSecondsWithBasetime(final long baseTimestamp, + final boolean two_metrics, final boolean offset) throws Exception { + setQueryStorage(); + // dump a bunch of rows of two metrics so that we can test filtering out + // on the metric + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + long timestamp = baseTimestamp; + for (int i = 1; i <= 300; i++) { + tsdb.addPoint("sys.cpu.user", timestamp += 30, i, tags).joinUninterruptibly(); + if (two_metrics) { + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + } + + // dump a parallel set but invert the values + tags.clear(); + tags.put("host", "web02"); + timestamp = baseTimestamp + (offset ? 
15 : 0); + for (int i = 300; i > 0; i--) { + tsdb.addPoint("sys.cpu.user", timestamp += 30, i, tags).joinUninterruptibly(); + if (two_metrics) { + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + } + } + + private void storeLongTimeSeriesMs() throws Exception { + setQueryStorage(); + // dump a bunch of rows of two metrics so that we can test filtering out + // on the metric + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + long timestamp = 1356998400000L; + for (int i = 1; i <= 300; i++) { + tsdb.addPoint("sys.cpu.user", timestamp += 500, i, tags).joinUninterruptibly(); + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + + // dump a parallel set but invert the values + tags.clear(); + tags.put("host", "web02"); + timestamp = 1356998400000L; + for (int i = 300; i > 0; i--) { + tsdb.addPoint("sys.cpu.user", timestamp += 500, i, tags).joinUninterruptibly(); + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + } + + private void storeFloatTimeSeriesSeconds(final boolean two_metrics, + final boolean offset) throws Exception { + setQueryStorage(); + // dump a bunch of rows of two metrics so that we can test filtering out + // on the metric + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + long timestamp = 1356998400; + for (float i = 1.25F; i <= 76; i += 0.25F) { + tsdb.addPoint("sys.cpu.user", timestamp += 30, i, tags).joinUninterruptibly(); + if (two_metrics) { + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + } + + // dump a parallel set but invert the values + tags.clear(); + tags.put("host", "web02"); + timestamp = offset ? 1356998415 : 1356998400; + for (float i = 75F; i > 0; i -= 0.25F) { + tsdb.addPoint("sys.cpu.user", timestamp += 30, i, tags).joinUninterruptibly(); + if (two_metrics) { + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + } + } + + private void storeFloatTimeSeriesMs() throws Exception { + setQueryStorage(); + // dump a bunch of rows of two metrics so that we can test filtering out + // on the metric + HashMap<String, String> tags = new HashMap<String, String>(1); + tags.put("host", "web01"); + long timestamp = 1356998400000L; + for (float i = 1.25F; i <= 76; i += 0.25F) { + tsdb.addPoint("sys.cpu.user", timestamp += 500, i, tags).joinUninterruptibly(); + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + + // dump a parallel set but invert the values + tags.clear(); + tags.put("host", "web02"); + timestamp = 1356998400000L; + for (float i = 75F; i > 0; i -= 0.25F) { + tsdb.addPoint("sys.cpu.user", timestamp += 500, i, tags).joinUninterruptibly(); + tsdb.addPoint("sys.cpu.nice", timestamp, i, tags).joinUninterruptibly(); + } + } + + @SuppressWarnings("unchecked") + private void setQueryStorage() throws Exception { + storage = new MockBase(tsdb, client, true, true, true, true); + storage.setFamily("t".getBytes(MockBase.ASCII())); + + PowerMockito.mockStatic(IncomingDataPoints.class); + PowerMockito.doAnswer( + new Answer<byte[]>() { + public byte[] answer(final InvocationOnMock args) + throws Exception { + final String metric = (String)args.getArguments()[1]; + final Map<String, String> tags = + (Map<String, String>)args.getArguments()[2]; + + if (metric.equals("sys.cpu.user")) { + if (tags.get("host").equals("web01")) { + return new byte[] { 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1}; + } else { + return new byte[] { 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}; + } + } else { + if (tags.get("host").equals("web01")) { + return new byte[] { 0, 0, 2, 0, 
0, 0, 0, 0, 0, 1, 0, 0, 1}; + } else { + return new byte[] { 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}; + } + } + } + } + ).when(IncomingDataPoints.class, "rowKeyTemplate", (TSDB)any(), anyString(), + (Map<String, String>)any()); + } +}
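
Reviewer note: the widened scan window computed by getScanStartTimeSeconds()/getScanEndTimeSeconds(), and asserted by the downsample/downsampleMilliseconds tests above, can be checked standalone. Below is a minimal sketch, not part of the patch: MAX_TIMESPAN (3600 seconds, one row per hour) and SECOND_MASK (the high 32 bits) are inlined from OpenTSDB's Const, and the class and method names are illustrative only.

// ScanWindowSketch.java -- illustrative only; mirrors the patched arithmetic.
public final class ScanWindowSketch {
  private static final long MAX_TIMESPAN = 3600;  // row width in seconds (Const.MAX_TIMESPAN)
  private static final long SECOND_MASK = 0xFFFFFFFF00000000L;  // set high bits => ms timestamp

  /** Scan start: look back two rows plus the downsample interval. */
  static long scanStartSeconds(long startTime, final long sampleIntervalMs) {
    if ((startTime & SECOND_MASK) != 0) {
      startTime /= 1000;  // millisecond query timestamps are down-cast to seconds
    }
    final long ts = startTime - MAX_TIMESPAN * 2 - sampleIntervalMs / 1000;
    return ts > 0 ? ts : 0;
  }

  /** Scan end: look ahead one row plus one second plus the downsample interval. */
  static long scanEndSeconds(long endTime, final long sampleIntervalMs) {
    if ((endTime & SECOND_MASK) != 0) {
      endTime /= 1000;
    }
    return endTime + MAX_TIMESPAN + 1 + sampleIntervalMs / 1000;
  }

  public static void main(final String[] args) {
    // Same numbers as the downsample() test: 60s interval on a seconds query.
    System.out.println(scanStartSeconds(1356998400L, 60000));  // 1356991140
    System.out.println(scanEndSeconds(1357041600L, 60000));    // 1357045261
  }
}

The division by 1000 is the crux of the rename: sample_interval_ms is now carried in milliseconds for SpanGroup, but the scanner still operates on second-granularity row keys, so the padding added to the scan window has to be converted back to seconds.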