From 96f18cd72d0bbca3191444db593f19e96b5d0606 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Tue, 17 Jan 2023 18:50:23 +0100 Subject: [PATCH 1/4] Use Message#canAggregate to control aggregation Revive previously unused Message#canAggregate to control aggregation. This will allow subclasses to have finer control over aggregation. --- src/main/java/com/timgroup/statsd/Message.java | 9 ++++++--- .../java/com/timgroup/statsd/StatsDAggregator.java | 10 +--------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/Message.java b/src/main/java/com/timgroup/statsd/Message.java index be3d14a3..3aff955a 100644 --- a/src/main/java/com/timgroup/statsd/Message.java +++ b/src/main/java/com/timgroup/statsd/Message.java @@ -2,8 +2,11 @@ import java.util.Arrays; import java.util.Objects; +import java.util.EnumSet; +import java.util.Set; public abstract class Message implements Comparable { + final String aspect; final Message.Type type; final String[] tags; @@ -34,6 +37,8 @@ public String toString() { } } + protected static final Set AGGREGATE_SET = EnumSet.of(Type.COUNT, Type.GAUGE, Type.SET); + protected Message(Message.Type type) { this("", type, null); } @@ -92,13 +97,11 @@ public String[] getTags() { /** * Return whether a message can be aggregated. - * Not sure if this makes sense. * * @return boolean on whether or not this message type may be aggregated. */ public boolean canAggregate() { - // return (this.type == m.type); - return false; + return AGGREGATE_SET.contains(type); } public void setDone(boolean done) { diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index fc8ae3c6..be7182fb 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -1,11 +1,9 @@ package com.timgroup.statsd; import java.util.ArrayList; -import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -15,8 +13,6 @@ public class StatsDAggregator { public static int DEFAULT_SHARDS = 4; // 4 partitions to reduce contention. protected final String AGGREGATOR_THREAD_NAME = "statsd-aggregator-thread"; - protected static final Set AGGREGATE_SET = EnumSet.of(Message.Type.COUNT, Message.Type.GAUGE, - Message.Type.SET); protected final ArrayList> aggregateMetrics; protected final int shardGranularity; @@ -81,10 +77,6 @@ public void stop() { } } - public boolean isTypeAggregate(Message.Type type) { - return AGGREGATE_SET.contains(type); - } - /** * Aggregate a message if possible. * @@ -93,7 +85,7 @@ public boolean isTypeAggregate(Message.Type type) { * * */ public boolean aggregateMessage(Message message) { - if (flushInterval == 0 || !isTypeAggregate(message.getType()) || message.getDone()) { + if (flushInterval == 0 || !message.canAggregate() || message.getDone()) { return false; } From e0776900f79a61b915490175db189fc0453ca16a Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Tue, 17 Jan 2023 18:53:21 +0100 Subject: [PATCH 2/4] Implement sending metrics with a timestamp Timestamps outside of the protocol range are normalized to the minimum representable value, as the running convention is to to not have any exceptions. --- .../com/timgroup/statsd/NoOpStatsDClient.java | 14 +++ .../statsd/NonBlockingStatsDClient.java | 106 ++++++++++++++-- .../com/timgroup/statsd/StatsDClient.java | 119 ++++++++++++++++++ .../statsd/NonBlockingStatsDClientTest.java | 81 ++++++++++++ 4 files changed, 312 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java index adcf2a08..e396a1ed 100644 --- a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java @@ -1,5 +1,7 @@ package com.timgroup.statsd; +import java.time.Instant; + /** * A No-Op StatsDClient, which can be substituted in when metrics are not * required. @@ -16,10 +18,14 @@ public final class NoOpStatsDClient implements StatsDClient { @Override public void count(String aspect, long delta, double sampleRate, String... tags) { } + @Override public void count(String aspect, long delta, Instant timestamp, String... tags) { } + @Override public void count(String aspect, double delta, String... tags) { } @Override public void count(String aspect, double delta, double sampleRate, String... tags) { } + @Override public void count(String aspect, double delta, Instant timestamp, String... tags) { } + @Override public void incrementCounter(String aspect, String... tags) { } @Override public void incrementCounter(String aspect, double sampleRate, String... tags) { } @@ -40,18 +46,26 @@ public final class NoOpStatsDClient implements StatsDClient { @Override public void recordGaugeValue(String aspect, double value, double sampleRate, String... tags) { } + @Override public void recordGaugeValue(String aspect, double value, Instant timestamp, String... tags) { } + @Override public void recordGaugeValue(String aspect, long value, String... tags) { } @Override public void recordGaugeValue(String aspect, long value, double sampleRate, String... tags) { } + @Override public void recordGaugeValue(String aspect, long value, Instant timestamp, String... tags) { } + @Override public void gauge(String aspect, double value, String... tags) { } @Override public void gauge(String aspect, double value, double sampleRate, String... tags) { } + @Override public void gauge(String aspect, double value, Instant timestamp, String... tags) { } + @Override public void gauge(String aspect, long value, String... tags) { } @Override public void gauge(String aspect, long value, double sampleRate, String... tags) { } + @Override public void gauge(String aspect, long value, Instant timestamp, String... tags) { } + @Override public void recordExecutionTime(String aspect, long timeInMs, String... tags) { } @Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate, String... tags) { } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index b9f28223..a478336e 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -20,6 +20,8 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.time.Instant; + /** * A simple StatsD client implementation facilitating metrics recording. * @@ -42,6 +44,11 @@ * IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed * not to throw an exception which may disrupt application execution. * + *

Some methods allow recording a value for a specific point in time by taking an Instant + * parameter. Such values are exempt from aggregation and should indicate the final metric value + * at the given time. Please refer to Datadog documentation for the range of accepted timestamp + * values. + * *

As part of a clean system shutdown, the {@link #stop()} method should be invoked * on any StatsD clients.

* @@ -57,6 +64,8 @@ public class NonBlockingStatsDClient implements StatsDClient { private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ; public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED"; + private static final Instant MIN_TIMESTAMP = Instant.ofEpochSecond(1); + enum Literal { SERVICE, ENV, @@ -487,10 +496,12 @@ ClientChannel createByteChannel(Callable addressLookup, int timeo abstract class StatsDMessage extends NumericMessage { final double sampleRate; // NaN for none + final Instant timestamp; // null for none - protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, String[] tags) { + protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, Instant timestamp, String[] tags) { super(aspect, type, value, tags); this.sampleRate = sampleRate; + this.timestamp = timestamp; } @Override @@ -501,6 +512,9 @@ public final void writeTo(StringBuilder builder, String containerID) { if (!Double.isNaN(sampleRate)) { builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate)); } + if (timestamp != null) { + builder.append("|T").append(timestamp.getEpochSecond()); + } tagString(this.tags, builder); if (containerID != null && !containerID.isEmpty()) { builder.append("|c:").append(containerID); @@ -509,6 +523,12 @@ public final void writeTo(StringBuilder builder, String containerID) { builder.append('\n'); } + @Override + public boolean canAggregate() { + // Timestamped values can not be aggregated. + return super.canAggregate() && this.timestamp == null; + } + protected abstract void writeValue(StringBuilder builder); } @@ -528,8 +548,8 @@ private boolean send(final Message message) { return success; } - // send double with sample rate - private void send(String aspect, final double value, Message.Type type, double sampleRate, String[] tags) { + // send double with sample rate and timestamp + private void send(String aspect, final double value, Message.Type type, double sampleRate, Instant timestamp, String[] tags) { if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) { switch (type) { case COUNT: @@ -542,7 +562,7 @@ private void send(String aspect, final double value, Message.Type type, double s if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) { - sendMetric(new StatsDMessage(aspect, type, Double.valueOf(value), sampleRate, tags) { + sendMetric(new StatsDMessage(aspect, type, Double.valueOf(value), sampleRate, timestamp, tags) { @Override protected void writeValue(StringBuilder builder) { builder.append(format(NUMBER_FORMATTER, this.value)); } @@ -550,13 +570,24 @@ private void send(String aspect, final double value, Message.Type type, double s } } + private void send(String aspect, final double value, Message.Type type, Instant timestamp, String[] tags) { + if (timestamp.isBefore(MIN_TIMESTAMP)) { + timestamp = MIN_TIMESTAMP; + } + send(aspect, value, type, Double.NaN, timestamp, tags); + } + + private void send(String aspect, final double value, Message.Type type, double sampleRate, String[] tags) { + send(aspect, value, type, sampleRate, null, tags); + } + // send double without sample rate private void send(String aspect, final double value, Message.Type type, String[] tags) { - send(aspect, value, type, Double.NaN, tags); + send(aspect, value, type, Double.NaN, null, tags); } // send long with sample rate - private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) { + private void send(String aspect, final long value, Message.Type type, double sampleRate, Instant timestamp, String[] tags) { if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) { switch (type) { case COUNT: @@ -569,7 +600,7 @@ private void send(String aspect, final long value, Message.Type type, double sam if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) { - sendMetric(new StatsDMessage(aspect, type, value, sampleRate, tags) { + sendMetric(new StatsDMessage(aspect, type, value, sampleRate, timestamp, tags) { @Override protected void writeValue(StringBuilder builder) { builder.append(this.value); } @@ -577,9 +608,21 @@ private void send(String aspect, final long value, Message.Type type, double sam } } + private void send(String aspect, final long value, Message.Type type, Instant timestamp, String[] tags) { + if (timestamp.isBefore(MIN_TIMESTAMP)) { + timestamp = MIN_TIMESTAMP; + } + + send(aspect, value, type, Double.NaN, timestamp, tags); + } + + private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) { + send(aspect, value, type, sampleRate, null, tags); + } + // send long without sample rate private void send(String aspect, final long value, Message.Type type, String[] tags) { - send(aspect, value, type, Double.NaN, tags); + send(aspect, value, type, Double.NaN, null, tags); } /** @@ -607,6 +650,14 @@ public void count(final String aspect, final long delta, final double sampleRate send(aspect, delta, Message.Type.COUNT, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void count(final String aspect, final long value, final Instant timestamp, final String...tags) { + send(aspect, value, Message.Type.COUNT, timestamp, tags); + } + /** * Adjusts the specified counter by a given delta. * @@ -631,6 +682,13 @@ public void count(final String aspect, final double delta, final String... tags) public void count(final String aspect, final double delta, final double sampleRate, final String...tags) { send(aspect, delta, Message.Type.COUNT, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void count(final String aspect, final double value, final Instant timestamp, final String...tags) { + send(aspect, value, Message.Type.COUNT, timestamp, tags); + } /** * Increments the specified counter by one. @@ -735,6 +793,14 @@ public void recordGaugeValue(final String aspect, final double value, final doub send(aspect, value, Message.Type.GAUGE, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void recordGaugeValue(final String aspect, final double value, final Instant timestamp, final String... tags) { + send(aspect, value, Message.Type.GAUGE, timestamp, tags); + } + /** * Records the latest fixed value for the specified named gauge. * @@ -760,6 +826,14 @@ public void recordGaugeValue(final String aspect, final long value, final double send(aspect, value, Message.Type.GAUGE, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void recordGaugeValue(final String aspect, final long value, final Instant timestamp, final String... tags) { + send(aspect, value, Message.Type.GAUGE, timestamp, tags); + } + /** * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. */ @@ -776,6 +850,14 @@ public void gauge(final String aspect, final double value, final double sampleRa recordGaugeValue(aspect, value, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void gauge(final String aspect, final double value, final Instant timestamp, final String... tags) { + recordGaugeValue(aspect, value, timestamp, tags); + } + /** * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. @@ -793,6 +875,14 @@ public void gauge(final String aspect, final long value, final double sampleRate recordGaugeValue(aspect, value, sampleRate, tags); } + /** + * {@inheritDoc} + */ + @Override + public void gauge(final String aspect, final long value, final Instant timestamp, final String... tags) { + recordGaugeValue(aspect, value, timestamp, tags); + } + /** * Records an execution time in milliseconds for the specified named operation. * diff --git a/src/main/java/com/timgroup/statsd/StatsDClient.java b/src/main/java/com/timgroup/statsd/StatsDClient.java index b96567e9..e078d16a 100644 --- a/src/main/java/com/timgroup/statsd/StatsDClient.java +++ b/src/main/java/com/timgroup/statsd/StatsDClient.java @@ -1,6 +1,7 @@ package com.timgroup.statsd; import java.io.Closeable; +import java.time.Instant; /** * Describes a client connection to a StatsD server, which may be used to post metrics @@ -67,6 +68,27 @@ public interface StatsDClient extends Closeable { */ void count(String aspect, long delta, double sampleRate, String... tags); + /** + * Set the counter metric at the given time to the specified value. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

+ * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to adjust + * @param value + * the amount to adjust the counter by + * @param timestamp + * timestamp of the value + * @param tags + * array of tags to be added to the data + */ + void count(String aspect, long value, Instant timestamp, String... tags); + /** * Adjusts the specified counter by a given delta. * @@ -101,6 +123,27 @@ public interface StatsDClient extends Closeable { */ void count(String aspect, double delta, double sampleRate, String... tags); + /** + * Set the counter metric at the given time to the specified value. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

+ * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the counter to adjust + * @param value + * the amount to adjust the counter by + * @param timestamp + * timestamp of the value + * @param tags + * array of tags to be added to the data + */ + void count(String aspect, double value, Instant timestamp, String... tags); + /** * Increments the specified counter by one. * @@ -239,6 +282,27 @@ public interface StatsDClient extends Closeable { */ void recordGaugeValue(String aspect, double value, double sampleRate, String... tags); + /** + * Set the gauge metric at the given time to the specified value. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

+ * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param timestamp + * timestamp of the value + * @param tags + * array of tags to be added to the data + */ + void recordGaugeValue(String aspect, double value, Instant timestamp, String... tags); + /** * Records the latest fixed value for the specified named gauge. * @@ -273,6 +337,27 @@ public interface StatsDClient extends Closeable { */ void recordGaugeValue(String aspect, long value, double sampleRate, String... tags); + /** + * Set the gauge metric at the given time to the specified value. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

+ * + *

This method is a DataDog extension, and may not work with other servers.

+ * + *

This method is non-blocking and is guaranteed not to throw an exception.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param timestamp + * timestamp of the value + * @param tags + * array of tags to be added to the data + */ + void recordGaugeValue(String aspect, long value, Instant timestamp, String... tags); + /** * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. * @@ -299,6 +384,23 @@ public interface StatsDClient extends Closeable { */ void gauge(String aspect, double value, double sampleRate, String... tags); + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, double, Instant, String[])}. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param timestamp + * timestamp of the value + * @param tags + * array of tags to be added to the data + */ + void gauge(String aspect, double value, Instant timestamp, String... tags); + /** * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. * @@ -325,6 +427,23 @@ public interface StatsDClient extends Closeable { */ void gauge(String aspect, long value, double sampleRate, String... tags); + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, long, Instant, String[])}. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + * @param timestamp + * timestamp of the value + * @param tags + * array of tags to be added to the data + */ + void gauge(String aspect, long value, Instant timestamp, String... tags); + /** * Records an execution time in milliseconds for the specified named operation. * diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index a20e371e..a1d6f4fd 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -8,6 +8,7 @@ import org.junit.contrib.java.lang.system.EnvironmentVariables; import org.junit.FixMethodOrder; import org.junit.runners.MethodSorters; +import org.junit.function.ThrowingRunnable; import java.io.IOException; import java.net.SocketAddress; @@ -23,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; import java.text.NumberFormat; +import java.time.Instant; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.comparesEqualTo; @@ -148,6 +150,45 @@ public void sends_counter_value_with_sample_rate_to_statsd_with_tags() throws Ex assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24|c|@1.000000|#baz,foo:bar"))); } + @Test(timeout = 5000L) + public void sends_long_counter_value_with_timestamp() throws Exception { + clientUnaggregated.count("mycount", 24l, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); + clientUnaggregated.count("mycount", 42l, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24|c|T1032127200|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:42|c|T1032127200|#baz,foo:bar"))); + } + + @Test(timeout = 5000L) + public void sends_double_counter_value_with_timestamp() throws Exception { + clientUnaggregated.count("mycount", 24.5d, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); + clientUnaggregated.count("mycount", 42.5d, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24.5|c|T1032127200|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:42.5|c|T1032127200|#baz,foo:bar"))); + } + + @Test(timeout = 5000L) + public void sends_long_counter_value_with_incorrect_timestamp() throws Exception { + clientUnaggregated.count("mycount", 24l, Instant.ofEpochSecond(-1), "foo:bar", "baz"); + clientUnaggregated.count("mycount", 42l, Instant.ofEpochSecond(0), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24|c|T1|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:42|c|T1|#baz,foo:bar"))); + } + + @Test(timeout = 5000L) + public void sends_double_counter_value_with_incorrect_timestamp() throws Exception { + clientUnaggregated.count("mycount", 24.5d, Instant.ofEpochSecond(-1), "foo:bar", "baz"); + clientUnaggregated.count("mycount", 42.5d, Instant.ofEpochSecond(0), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24.5|c|T1|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:42.5|c|T1|#baz,foo:bar"))); + } @Test(timeout = 5000L) public void sends_counter_increment_to_statsd() throws Exception { @@ -271,6 +312,46 @@ public void sends_gauge_with_sample_rate_to_statsd_with_tags() throws Exception assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:423|g|@1.000000|#baz,foo:bar"))); } + @Test(timeout = 5000L) + public void sends_long_gauge_with_timestamp() throws Exception { + clientUnaggregated.recordGaugeValue("mygauge", 234l, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); + clientUnaggregated.recordGaugeValue("mygauge", 423l, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:234|g|T1205794800|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:423|g|T1205794800|#baz,foo:bar"))); + } + + @Test(timeout = 5000L) + public void sends_double_gauge_with_timestamp() throws Exception { + clientUnaggregated.recordGaugeValue("mygauge", 243.5d, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); + clientUnaggregated.recordGaugeValue("mygauge", 423.5d, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:243.5|g|T1205794800|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:423.5|g|T1205794800|#baz,foo:bar"))); + } + + @Test(timeout = 5000L) + public void sends_long_gauge_with_incorrect_timestamp() throws Exception { + clientUnaggregated.recordGaugeValue("mygauge", 234l, Instant.ofEpochSecond(0), "foo:bar", "baz"); + clientUnaggregated.recordGaugeValue("mygauge", 423l, Instant.ofEpochSecond(-1), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:234|g|T1|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:423|g|T1|#baz,foo:bar"))); + } + + @Test(timeout = 5000L) + public void sends_double_gauge_with_incorrect_timestamp() throws Exception { + clientUnaggregated.recordGaugeValue("mygauge", 243.5d, Instant.ofEpochSecond(0), "foo:bar", "baz"); + clientUnaggregated.recordGaugeValue("mygauge", 423.5d, Instant.ofEpochSecond(-1), "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:243.5|g|T1|#baz,foo:bar"))); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:423.5|g|T1|#baz,foo:bar"))); + } + @Test(timeout = 5000L) public void sends_double_gauge_to_statsd_with_tags() throws Exception { From 1d7f4b531aeac7090b86dbe7a90e69a6aff4b9b0 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Wed, 18 Jan 2023 14:20:19 +0100 Subject: [PATCH 3/4] fix style --- src/main/java/com/timgroup/statsd/Message.java | 2 +- src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/Message.java b/src/main/java/com/timgroup/statsd/Message.java index 3aff955a..1dbb1b13 100644 --- a/src/main/java/com/timgroup/statsd/Message.java +++ b/src/main/java/com/timgroup/statsd/Message.java @@ -1,8 +1,8 @@ package com.timgroup.statsd; import java.util.Arrays; -import java.util.Objects; import java.util.EnumSet; +import java.util.Objects; import java.util.Set; public abstract class Message implements Comparable { diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index a478336e..e0faaaa1 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -9,6 +9,7 @@ import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; import java.text.NumberFormat; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -20,7 +21,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.time.Instant; /** * A simple StatsD client implementation facilitating metrics recording. @@ -682,6 +682,7 @@ public void count(final String aspect, final double delta, final String... tags) public void count(final String aspect, final double delta, final double sampleRate, final String...tags) { send(aspect, delta, Message.Type.COUNT, sampleRate, tags); } + /** * {@inheritDoc} */ From fc63a60892f7f55acbd3d5b8dd9a29d5e5f104ce Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 19 Jan 2023 17:13:17 +0100 Subject: [PATCH 4/4] Use long instead of time.Instant java.time.Instant is only available on Java 8 and later. To avoid confusion with sample rate, which occupies the same slot in the parameter list, methods that accept a timestamp were given a distinct new name. --- .../com/timgroup/statsd/NoOpStatsDClient.java | 18 +-- .../statsd/NonBlockingStatsDClient.java | 118 ++++++++---------- .../com/timgroup/statsd/StatsDClient.java | 108 +++++----------- .../statsd/NonBlockingStatsDClientTest.java | 33 +++-- 4 files changed, 105 insertions(+), 172 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java index e396a1ed..65b80a47 100644 --- a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java @@ -1,7 +1,5 @@ package com.timgroup.statsd; -import java.time.Instant; - /** * A No-Op StatsDClient, which can be substituted in when metrics are not * required. @@ -18,13 +16,13 @@ public final class NoOpStatsDClient implements StatsDClient { @Override public void count(String aspect, long delta, double sampleRate, String... tags) { } - @Override public void count(String aspect, long delta, Instant timestamp, String... tags) { } - @Override public void count(String aspect, double delta, String... tags) { } @Override public void count(String aspect, double delta, double sampleRate, String... tags) { } - @Override public void count(String aspect, double delta, Instant timestamp, String... tags) { } + @Override public void countWithTimestamp(String aspect, long delta, long timestamp, String... tags) { } + + @Override public void countWithTimestamp(String aspect, double delta, long timestamp, String... tags) { } @Override public void incrementCounter(String aspect, String... tags) { } @@ -46,25 +44,21 @@ public final class NoOpStatsDClient implements StatsDClient { @Override public void recordGaugeValue(String aspect, double value, double sampleRate, String... tags) { } - @Override public void recordGaugeValue(String aspect, double value, Instant timestamp, String... tags) { } - @Override public void recordGaugeValue(String aspect, long value, String... tags) { } @Override public void recordGaugeValue(String aspect, long value, double sampleRate, String... tags) { } - @Override public void recordGaugeValue(String aspect, long value, Instant timestamp, String... tags) { } - @Override public void gauge(String aspect, double value, String... tags) { } @Override public void gauge(String aspect, double value, double sampleRate, String... tags) { } - @Override public void gauge(String aspect, double value, Instant timestamp, String... tags) { } - @Override public void gauge(String aspect, long value, String... tags) { } @Override public void gauge(String aspect, long value, double sampleRate, String... tags) { } - @Override public void gauge(String aspect, long value, Instant timestamp, String... tags) { } + @Override public void gaugeWithTimestamp(String aspect, double value, long timestamp, String... tags) { } + + @Override public void gaugeWithTimestamp(String aspect, long value, long timestamp, String... tags) { } @Override public void recordExecutionTime(String aspect, long timeInMs, String... tags) { } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index e0faaaa1..f11f8df8 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -9,7 +9,6 @@ import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; import java.text.NumberFormat; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -44,10 +43,10 @@ * IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed * not to throw an exception which may disrupt application execution. * - *

Some methods allow recording a value for a specific point in time by taking an Instant - * parameter. Such values are exempt from aggregation and should indicate the final metric value - * at the given time. Please refer to Datadog documentation for the range of accepted timestamp - * values. + *

Some methods allow recording a value for a specific point in time by taking an extra + * timestamp parameter. Such values are exempt from aggregation and the value should indicate the + * final metric value at the given time. Please refer to Datadog documentation for the range of + * accepted timestamp values. * *

As part of a clean system shutdown, the {@link #stop()} method should be invoked * on any StatsD clients.

@@ -64,7 +63,7 @@ public class NonBlockingStatsDClient implements StatsDClient { private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ; public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED"; - private static final Instant MIN_TIMESTAMP = Instant.ofEpochSecond(1); + private static final long MIN_TIMESTAMP = 1; enum Literal { SERVICE, @@ -496,9 +495,9 @@ ClientChannel createByteChannel(Callable addressLookup, int timeo abstract class StatsDMessage extends NumericMessage { final double sampleRate; // NaN for none - final Instant timestamp; // null for none + final long timestamp; // zero for none - protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, Instant timestamp, String[] tags) { + protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, long timestamp, String[] tags) { super(aspect, type, value, tags); this.sampleRate = sampleRate; this.timestamp = timestamp; @@ -512,8 +511,8 @@ public final void writeTo(StringBuilder builder, String containerID) { if (!Double.isNaN(sampleRate)) { builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate)); } - if (timestamp != null) { - builder.append("|T").append(timestamp.getEpochSecond()); + if (timestamp != 0) { + builder.append("|T").append(timestamp); } tagString(this.tags, builder); if (containerID != null && !containerID.isEmpty()) { @@ -526,7 +525,7 @@ public final void writeTo(StringBuilder builder, String containerID) { @Override public boolean canAggregate() { // Timestamped values can not be aggregated. - return super.canAggregate() && this.timestamp == null; + return super.canAggregate() && this.timestamp == 0; } protected abstract void writeValue(StringBuilder builder); @@ -549,7 +548,7 @@ private boolean send(final Message message) { } // send double with sample rate and timestamp - private void send(String aspect, final double value, Message.Type type, double sampleRate, Instant timestamp, String[] tags) { + private void send(String aspect, final double value, Message.Type type, double sampleRate, long timestamp, String[] tags) { if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) { switch (type) { case COUNT: @@ -570,24 +569,17 @@ private void send(String aspect, final double value, Message.Type type, double s } } - private void send(String aspect, final double value, Message.Type type, Instant timestamp, String[] tags) { - if (timestamp.isBefore(MIN_TIMESTAMP)) { - timestamp = MIN_TIMESTAMP; - } - send(aspect, value, type, Double.NaN, timestamp, tags); - } - private void send(String aspect, final double value, Message.Type type, double sampleRate, String[] tags) { - send(aspect, value, type, sampleRate, null, tags); + send(aspect, value, type, sampleRate, 0, tags); } // send double without sample rate private void send(String aspect, final double value, Message.Type type, String[] tags) { - send(aspect, value, type, Double.NaN, null, tags); + send(aspect, value, type, Double.NaN, 0, tags); } // send long with sample rate - private void send(String aspect, final long value, Message.Type type, double sampleRate, Instant timestamp, String[] tags) { + private void send(String aspect, final long value, Message.Type type, double sampleRate, long timestamp, String[] tags) { if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) { switch (type) { case COUNT: @@ -608,21 +600,28 @@ private void send(String aspect, final long value, Message.Type type, double sam } } - private void send(String aspect, final long value, Message.Type type, Instant timestamp, String[] tags) { - if (timestamp.isBefore(MIN_TIMESTAMP)) { + private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) { + send(aspect, value, type, sampleRate, 0, tags); + } + + // send long without sample rate + private void send(String aspect, final long value, Message.Type type, String[] tags) { + send(aspect, value, type, Double.NaN, 0, tags); + } + + private void sendWithTimestamp(String aspect, final double value, Message.Type type, long timestamp, String[] tags) { + if (timestamp < MIN_TIMESTAMP) { timestamp = MIN_TIMESTAMP; } - send(aspect, value, type, Double.NaN, timestamp, tags); } - private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) { - send(aspect, value, type, sampleRate, null, tags); - } + private void sendWithTimestamp(String aspect, final long value, Message.Type type, long timestamp, String[] tags) { + if (timestamp < MIN_TIMESTAMP) { + timestamp = MIN_TIMESTAMP; + } - // send long without sample rate - private void send(String aspect, final long value, Message.Type type, String[] tags) { - send(aspect, value, type, Double.NaN, null, tags); + send(aspect, value, type, Double.NaN, timestamp, tags); } /** @@ -650,14 +649,6 @@ public void count(final String aspect, final long delta, final double sampleRate send(aspect, delta, Message.Type.COUNT, sampleRate, tags); } - /** - * {@inheritDoc} - */ - @Override - public void count(final String aspect, final long value, final Instant timestamp, final String...tags) { - send(aspect, value, Message.Type.COUNT, timestamp, tags); - } - /** * Adjusts the specified counter by a given delta. * @@ -687,8 +678,16 @@ public void count(final String aspect, final double delta, final double sampleRa * {@inheritDoc} */ @Override - public void count(final String aspect, final double value, final Instant timestamp, final String...tags) { - send(aspect, value, Message.Type.COUNT, timestamp, tags); + public void countWithTimestamp(final String aspect, final long value, final long timestamp, final String...tags) { + sendWithTimestamp(aspect, value, Message.Type.COUNT, timestamp, tags); + } + + /** + * {@inheritDoc} + */ + @Override + public void countWithTimestamp(final String aspect, final double value, final long timestamp, final String...tags) { + sendWithTimestamp(aspect, value, Message.Type.COUNT, timestamp, tags); } /** @@ -794,14 +793,6 @@ public void recordGaugeValue(final String aspect, final double value, final doub send(aspect, value, Message.Type.GAUGE, sampleRate, tags); } - /** - * {@inheritDoc} - */ - @Override - public void recordGaugeValue(final String aspect, final double value, final Instant timestamp, final String... tags) { - send(aspect, value, Message.Type.GAUGE, timestamp, tags); - } - /** * Records the latest fixed value for the specified named gauge. * @@ -827,14 +818,6 @@ public void recordGaugeValue(final String aspect, final long value, final double send(aspect, value, Message.Type.GAUGE, sampleRate, tags); } - /** - * {@inheritDoc} - */ - @Override - public void recordGaugeValue(final String aspect, final long value, final Instant timestamp, final String... tags) { - send(aspect, value, Message.Type.GAUGE, timestamp, tags); - } - /** * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. */ @@ -851,15 +834,6 @@ public void gauge(final String aspect, final double value, final double sampleRa recordGaugeValue(aspect, value, sampleRate, tags); } - /** - * {@inheritDoc} - */ - @Override - public void gauge(final String aspect, final double value, final Instant timestamp, final String... tags) { - recordGaugeValue(aspect, value, timestamp, tags); - } - - /** * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. */ @@ -880,8 +854,16 @@ public void gauge(final String aspect, final long value, final double sampleRate * {@inheritDoc} */ @Override - public void gauge(final String aspect, final long value, final Instant timestamp, final String... tags) { - recordGaugeValue(aspect, value, timestamp, tags); + public void gaugeWithTimestamp(final String aspect, final double value, final long timestamp, final String... tags) { + sendWithTimestamp(aspect, value, Message.Type.GAUGE, timestamp, tags); + } + + /** + * {@inheritDoc} + */ + @Override + public void gaugeWithTimestamp(final String aspect, final long value, final long timestamp, final String... tags) { + sendWithTimestamp(aspect, value, Message.Type.GAUGE, timestamp, tags); } /** diff --git a/src/main/java/com/timgroup/statsd/StatsDClient.java b/src/main/java/com/timgroup/statsd/StatsDClient.java index e078d16a..2c0311d8 100644 --- a/src/main/java/com/timgroup/statsd/StatsDClient.java +++ b/src/main/java/com/timgroup/statsd/StatsDClient.java @@ -1,7 +1,6 @@ package com.timgroup.statsd; import java.io.Closeable; -import java.time.Instant; /** * Describes a client connection to a StatsD server, which may be used to post metrics @@ -69,10 +68,7 @@ public interface StatsDClient extends Closeable { void count(String aspect, long delta, double sampleRate, String... tags); /** - * Set the counter metric at the given time to the specified value. - * - *

Values with an explicit timestamp are never aggregated and - * will be recorded as the metric value at the given time.

+ * Adjusts the specified counter by a given delta. * *

This method is a DataDog extension, and may not work with other servers.

* @@ -80,14 +76,12 @@ public interface StatsDClient extends Closeable { * * @param aspect * the name of the counter to adjust - * @param value + * @param delta * the amount to adjust the counter by - * @param timestamp - * timestamp of the value * @param tags * array of tags to be added to the data */ - void count(String aspect, long value, Instant timestamp, String... tags); + void count(String aspect, double delta, String... tags); /** * Adjusts the specified counter by a given delta. @@ -100,13 +94,18 @@ public interface StatsDClient extends Closeable { * the name of the counter to adjust * @param delta * the amount to adjust the counter by + * @param sampleRate + * percentage of time metric to be sent * @param tags * array of tags to be added to the data */ - void count(String aspect, double delta, String... tags); + void count(String aspect, double delta, double sampleRate, String... tags); /** - * Adjusts the specified counter by a given delta. + * Set the counter metric at the given time to the specified value. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

* *

This method is a DataDog extension, and may not work with other servers.

* @@ -114,14 +113,14 @@ public interface StatsDClient extends Closeable { * * @param aspect * the name of the counter to adjust - * @param delta + * @param value * the amount to adjust the counter by - * @param sampleRate - * percentage of time metric to be sent + * @param timestamp + * timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z * @param tags * array of tags to be added to the data */ - void count(String aspect, double delta, double sampleRate, String... tags); + void countWithTimestamp(String aspect, long value, long timestamp, String... tags); /** * Set the counter metric at the given time to the specified value. @@ -138,11 +137,11 @@ public interface StatsDClient extends Closeable { * @param value * the amount to adjust the counter by * @param timestamp - * timestamp of the value + * timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z * @param tags * array of tags to be added to the data */ - void count(String aspect, double value, Instant timestamp, String... tags); + void countWithTimestamp(String aspect, double value, long timestamp, String... tags); /** * Increments the specified counter by one. @@ -282,27 +281,6 @@ public interface StatsDClient extends Closeable { */ void recordGaugeValue(String aspect, double value, double sampleRate, String... tags); - /** - * Set the gauge metric at the given time to the specified value. - * - *

Values with an explicit timestamp are never aggregated and - * will be recorded as the metric value at the given time.

- * - *

This method is a DataDog extension, and may not work with other servers.

- * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the gauge - * @param value - * the new reading of the gauge - * @param timestamp - * timestamp of the value - * @param tags - * array of tags to be added to the data - */ - void recordGaugeValue(String aspect, double value, Instant timestamp, String... tags); - /** * Records the latest fixed value for the specified named gauge. * @@ -337,27 +315,6 @@ public interface StatsDClient extends Closeable { */ void recordGaugeValue(String aspect, long value, double sampleRate, String... tags); - /** - * Set the gauge metric at the given time to the specified value. - * - *

Values with an explicit timestamp are never aggregated and - * will be recorded as the metric value at the given time.

- * - *

This method is a DataDog extension, and may not work with other servers.

- * - *

This method is non-blocking and is guaranteed not to throw an exception.

- * - * @param aspect - * the name of the gauge - * @param value - * the new reading of the gauge - * @param timestamp - * timestamp of the value - * @param tags - * array of tags to be added to the data - */ - void recordGaugeValue(String aspect, long value, Instant timestamp, String... tags); - /** * Convenience method equivalent to {@link #recordGaugeValue(String, double, String[])}. * @@ -385,50 +342,51 @@ public interface StatsDClient extends Closeable { void gauge(String aspect, double value, double sampleRate, String... tags); /** - * Convenience method equivalent to {@link #recordGaugeValue(String, double, Instant, String[])}. - * - *

Values with an explicit timestamp are never aggregated and - * will be recorded as the metric value at the given time.

+ * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. * * @param aspect * the name of the gauge * @param value * the new reading of the gauge - * @param timestamp - * timestamp of the value * @param tags * array of tags to be added to the data */ - void gauge(String aspect, double value, Instant timestamp, String... tags); + void gauge(String aspect, long value, String... tags); /** - * Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}. + * Convenience method equivalent to {@link #recordGaugeValue(String, long, double, String[])}. * * @param aspect * the name of the gauge * @param value * the new reading of the gauge + * @param sampleRate + * percentage of time metric to be sent * @param tags * array of tags to be added to the data */ - void gauge(String aspect, long value, String... tags); + + void gauge(String aspect, long value, double sampleRate, String... tags); /** - * Convenience method equivalent to {@link #recordGaugeValue(String, long, double, String[])}. + * Set the gauge metric at the given time to the specified value. + * + *

Values with an explicit timestamp are never aggregated and + * will be recorded as the metric value at the given time.

* * @param aspect * the name of the gauge * @param value * the new reading of the gauge - * @param sampleRate - * percentage of time metric to be sent + * @param timestamp + * timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z * @param tags * array of tags to be added to the data */ - void gauge(String aspect, long value, double sampleRate, String... tags); + void gaugeWithTimestamp(String aspect, double value, long timestamp, String... tags); /** - * Convenience method equivalent to {@link #recordGaugeValue(String, long, Instant, String[])}. + * Set the gauge metric at the given time to the specified value. * *

Values with an explicit timestamp are never aggregated and * will be recorded as the metric value at the given time.

@@ -438,11 +396,11 @@ public interface StatsDClient extends Closeable { * @param value * the new reading of the gauge * @param timestamp - * timestamp of the value + * timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z * @param tags * array of tags to be added to the data */ - void gauge(String aspect, long value, Instant timestamp, String... tags); + void gaugeWithTimestamp(String aspect, long value, long timestamp, String... tags); /** * Records an execution time in milliseconds for the specified named operation. diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index a1d6f4fd..461b7a59 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; import java.text.NumberFormat; -import java.time.Instant; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.comparesEqualTo; @@ -152,8 +151,8 @@ public void sends_counter_value_with_sample_rate_to_statsd_with_tags() throws Ex @Test(timeout = 5000L) public void sends_long_counter_value_with_timestamp() throws Exception { - clientUnaggregated.count("mycount", 24l, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); - clientUnaggregated.count("mycount", 42l, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 24l, 1032127200, "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 42l, 1032127200, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24|c|T1032127200|#baz,foo:bar"))); @@ -162,8 +161,8 @@ public void sends_long_counter_value_with_timestamp() throws Exception { @Test(timeout = 5000L) public void sends_double_counter_value_with_timestamp() throws Exception { - clientUnaggregated.count("mycount", 24.5d, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); - clientUnaggregated.count("mycount", 42.5d, Instant.ofEpochSecond(1032127200), "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 24.5d, 1032127200, "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 42.5d, 1032127200, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24.5|c|T1032127200|#baz,foo:bar"))); @@ -172,8 +171,8 @@ public void sends_double_counter_value_with_timestamp() throws Exception { @Test(timeout = 5000L) public void sends_long_counter_value_with_incorrect_timestamp() throws Exception { - clientUnaggregated.count("mycount", 24l, Instant.ofEpochSecond(-1), "foo:bar", "baz"); - clientUnaggregated.count("mycount", 42l, Instant.ofEpochSecond(0), "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 24l, -1, "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 42l, 0, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24|c|T1|#baz,foo:bar"))); @@ -182,8 +181,8 @@ public void sends_long_counter_value_with_incorrect_timestamp() throws Exception @Test(timeout = 5000L) public void sends_double_counter_value_with_incorrect_timestamp() throws Exception { - clientUnaggregated.count("mycount", 24.5d, Instant.ofEpochSecond(-1), "foo:bar", "baz"); - clientUnaggregated.count("mycount", 42.5d, Instant.ofEpochSecond(0), "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 24.5d, -1, "foo:bar", "baz"); + clientUnaggregated.countWithTimestamp("mycount", 42.5d, 0, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mycount:24.5|c|T1|#baz,foo:bar"))); @@ -314,8 +313,8 @@ public void sends_gauge_with_sample_rate_to_statsd_with_tags() throws Exception @Test(timeout = 5000L) public void sends_long_gauge_with_timestamp() throws Exception { - clientUnaggregated.recordGaugeValue("mygauge", 234l, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); - clientUnaggregated.recordGaugeValue("mygauge", 423l, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 234l, 1205794800, "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 423l, 1205794800, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:234|g|T1205794800|#baz,foo:bar"))); @@ -324,8 +323,8 @@ public void sends_long_gauge_with_timestamp() throws Exception { @Test(timeout = 5000L) public void sends_double_gauge_with_timestamp() throws Exception { - clientUnaggregated.recordGaugeValue("mygauge", 243.5d, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); - clientUnaggregated.recordGaugeValue("mygauge", 423.5d, Instant.ofEpochSecond(1205794800), "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 243.5d, 1205794800, "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 423.5d, 1205794800, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:243.5|g|T1205794800|#baz,foo:bar"))); @@ -334,8 +333,8 @@ public void sends_double_gauge_with_timestamp() throws Exception { @Test(timeout = 5000L) public void sends_long_gauge_with_incorrect_timestamp() throws Exception { - clientUnaggregated.recordGaugeValue("mygauge", 234l, Instant.ofEpochSecond(0), "foo:bar", "baz"); - clientUnaggregated.recordGaugeValue("mygauge", 423l, Instant.ofEpochSecond(-1), "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 234l, 0, "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 423l, -1, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:234|g|T1|#baz,foo:bar"))); @@ -344,8 +343,8 @@ public void sends_long_gauge_with_incorrect_timestamp() throws Exception { @Test(timeout = 5000L) public void sends_double_gauge_with_incorrect_timestamp() throws Exception { - clientUnaggregated.recordGaugeValue("mygauge", 243.5d, Instant.ofEpochSecond(0), "foo:bar", "baz"); - clientUnaggregated.recordGaugeValue("mygauge", 423.5d, Instant.ofEpochSecond(-1), "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 243.5d, 0, "foo:bar", "baz"); + clientUnaggregated.gaugeWithTimestamp("mygauge", 423.5d, -1, "foo:bar", "baz"); server.waitForMessage("my.prefix"); assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mygauge:243.5|g|T1|#baz,foo:bar")));