Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sending metrics with a timestamp #211

Merged
merged 4 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/main/java/com/timgroup/statsd/Message.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.timgroup.statsd;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

public abstract class Message implements Comparable<Message> {

final String aspect;
final Message.Type type;
final String[] tags;
Expand Down Expand Up @@ -34,6 +37,8 @@ public String toString() {
}
}

protected static final Set<Type> AGGREGATE_SET = EnumSet.of(Type.COUNT, Type.GAUGE, Type.SET);

protected Message(Message.Type type) {
this("", type, null);
}
Expand Down Expand Up @@ -92,13 +97,11 @@ public String[] getTags() {

/**
* Return whether a message can be aggregated.
* Not sure if this makes sense.
*
* @return boolean on whether or not this message type may be aggregated.
*/
public boolean canAggregate() {
// return (this.type == m.type);
return false;
return AGGREGATE_SET.contains(type);
}

public void setDone(boolean done) {
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/timgroup/statsd/NoOpStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public final class NoOpStatsDClient implements StatsDClient {

@Override public void count(String aspect, double delta, double sampleRate, String... tags) { }

@Override public void countWithTimestamp(String aspect, long delta, long timestamp, String... tags) { }

@Override public void countWithTimestamp(String aspect, double delta, long timestamp, String... tags) { }

@Override public void incrementCounter(String aspect, String... tags) { }

@Override public void incrementCounter(String aspect, double sampleRate, String... tags) { }
Expand Down Expand Up @@ -52,6 +56,10 @@ public final class NoOpStatsDClient implements StatsDClient {

@Override public void gauge(String aspect, long value, double sampleRate, String... tags) { }

@Override public void gaugeWithTimestamp(String aspect, double value, long timestamp, String... tags) { }

@Override public void gaugeWithTimestamp(String aspect, long value, long timestamp, String... tags) { }

@Override public void recordExecutionTime(String aspect, long timeInMs, String... tags) { }

@Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate, String... tags) { }
Expand Down
91 changes: 82 additions & 9 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;


/**
* A simple StatsD client implementation facilitating metrics recording.
*
Expand All @@ -42,6 +43,11 @@
* IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed
* not to throw an exception which may disrupt application execution.
*
* <p>Some methods allow recording a value for a specific point in time by taking an extra
* timestamp parameter. Such values are exempt from aggregation and the value should indicate the
* final metric value at the given time. Please refer to Datadog documentation for the range of
* accepted timestamp values.
*
* <p>As part of a clean system shutdown, the {@link #stop()} method should be invoked
* on any StatsD clients.</p>
*
Expand All @@ -57,6 +63,8 @@ public class NonBlockingStatsDClient implements StatsDClient {
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ;
public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED";

private static final long MIN_TIMESTAMP = 1;

enum Literal {
SERVICE,
ENV,
Expand Down Expand Up @@ -487,10 +495,12 @@ ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeo

abstract class StatsDMessage<T extends Number> extends NumericMessage<T> {
final double sampleRate; // NaN for none
final long timestamp; // zero for none

protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, String[] tags) {
protected StatsDMessage(String aspect, Message.Type type, T value, double sampleRate, long timestamp, String[] tags) {
super(aspect, type, value, tags);
this.sampleRate = sampleRate;
this.timestamp = timestamp;
}

@Override
Expand All @@ -501,6 +511,9 @@ public final void writeTo(StringBuilder builder, String containerID) {
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
}
if (timestamp != 0) {
builder.append("|T").append(timestamp);
}
tagString(this.tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
Expand All @@ -509,6 +522,12 @@ public final void writeTo(StringBuilder builder, String containerID) {
builder.append('\n');
}

@Override
public boolean canAggregate() {
// Timestamped values can not be aggregated.
return super.canAggregate() && this.timestamp == 0;
}

protected abstract void writeValue(StringBuilder builder);
}

Expand All @@ -528,8 +547,8 @@ private boolean send(final Message message) {
return success;
}

// send double with sample rate
private void send(String aspect, final double value, Message.Type type, double sampleRate, String[] tags) {
// send double with sample rate and timestamp
private void send(String aspect, final double value, Message.Type type, double sampleRate, long timestamp, String[] tags) {
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
switch (type) {
case COUNT:
Expand All @@ -542,21 +561,25 @@ private void send(String aspect, final double value, Message.Type type, double s

if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) {

sendMetric(new StatsDMessage<Double>(aspect, type, Double.valueOf(value), sampleRate, tags) {
sendMetric(new StatsDMessage<Double>(aspect, type, Double.valueOf(value), sampleRate, timestamp, tags) {
@Override protected void writeValue(StringBuilder builder) {
builder.append(format(NUMBER_FORMATTER, this.value));
}
});
}
}

private void send(String aspect, final double value, Message.Type type, double sampleRate, String[] tags) {
send(aspect, value, type, sampleRate, 0, tags);
}

// send double without sample rate
private void send(String aspect, final double value, Message.Type type, String[] tags) {
send(aspect, value, type, Double.NaN, tags);
send(aspect, value, type, Double.NaN, 0, tags);
}

// send long with sample rate
private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) {
private void send(String aspect, final long value, Message.Type type, double sampleRate, long timestamp, String[] tags) {
if (statsDProcessor.getAggregator().getFlushInterval() != 0 && !Double.isNaN(sampleRate)) {
switch (type) {
case COUNT:
Expand All @@ -569,17 +592,36 @@ private void send(String aspect, final long value, Message.Type type, double sam

if (Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) {

sendMetric(new StatsDMessage<Long>(aspect, type, value, sampleRate, tags) {
sendMetric(new StatsDMessage<Long>(aspect, type, value, sampleRate, timestamp, tags) {
@Override protected void writeValue(StringBuilder builder) {
builder.append(this.value);
}
});
}
}

private void send(String aspect, final long value, Message.Type type, double sampleRate, String[] tags) {
send(aspect, value, type, sampleRate, 0, tags);
}

// send long without sample rate
private void send(String aspect, final long value, Message.Type type, String[] tags) {
send(aspect, value, type, Double.NaN, tags);
send(aspect, value, type, Double.NaN, 0, tags);
}

private void sendWithTimestamp(String aspect, final double value, Message.Type type, long timestamp, String[] tags) {
if (timestamp < MIN_TIMESTAMP) {
timestamp = MIN_TIMESTAMP;
}
send(aspect, value, type, Double.NaN, timestamp, tags);
}

private void sendWithTimestamp(String aspect, final long value, Message.Type type, long timestamp, String[] tags) {
if (timestamp < MIN_TIMESTAMP) {
timestamp = MIN_TIMESTAMP;
}

send(aspect, value, type, Double.NaN, timestamp, tags);
}

/**
Expand Down Expand Up @@ -632,6 +674,22 @@ public void count(final String aspect, final double delta, final double sampleRa
send(aspect, delta, Message.Type.COUNT, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void countWithTimestamp(final String aspect, final long value, final long timestamp, final String...tags) {
sendWithTimestamp(aspect, value, Message.Type.COUNT, timestamp, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void countWithTimestamp(final String aspect, final double value, final long timestamp, final String...tags) {
sendWithTimestamp(aspect, value, Message.Type.COUNT, timestamp, tags);
}

/**
* Increments the specified counter by one.
*
Expand Down Expand Up @@ -776,7 +834,6 @@ public void gauge(final String aspect, final double value, final double sampleRa
recordGaugeValue(aspect, value, sampleRate, tags);
}


/**
* Convenience method equivalent to {@link #recordGaugeValue(String, long, String[])}.
*/
Expand All @@ -793,6 +850,22 @@ public void gauge(final String aspect, final long value, final double sampleRate
recordGaugeValue(aspect, value, sampleRate, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void gaugeWithTimestamp(final String aspect, final double value, final long timestamp, final String... tags) {
sendWithTimestamp(aspect, value, Message.Type.GAUGE, timestamp, tags);
}

/**
* {@inheritDoc}
*/
@Override
public void gaugeWithTimestamp(final String aspect, final long value, final long timestamp, final String... tags) {
sendWithTimestamp(aspect, value, Message.Type.GAUGE, timestamp, tags);
}

/**
* Records an execution time in milliseconds for the specified named operation.
*
Expand Down
10 changes: 1 addition & 9 deletions src/main/java/com/timgroup/statsd/StatsDAggregator.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package com.timgroup.statsd;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

Expand All @@ -15,8 +13,6 @@ public class StatsDAggregator {
public static int DEFAULT_SHARDS = 4; // 4 partitions to reduce contention.

protected final String AGGREGATOR_THREAD_NAME = "statsd-aggregator-thread";
protected static final Set<Message.Type> AGGREGATE_SET = EnumSet.of(Message.Type.COUNT, Message.Type.GAUGE,
Message.Type.SET);
protected final ArrayList<Map<Message, Message>> aggregateMetrics;

protected final int shardGranularity;
Expand Down Expand Up @@ -81,10 +77,6 @@ public void stop() {
}
}

public boolean isTypeAggregate(Message.Type type) {
return AGGREGATE_SET.contains(type);
}

/**
* Aggregate a message if possible.
*
Expand All @@ -93,7 +85,7 @@ public boolean isTypeAggregate(Message.Type type) {
*
* */
public boolean aggregateMessage(Message message) {
if (flushInterval == 0 || !isTypeAggregate(message.getType()) || message.getDone()) {
if (flushInterval == 0 || !message.canAggregate() || message.getDone()) {
return false;
}

Expand Down
77 changes: 77 additions & 0 deletions src/main/java/com/timgroup/statsd/StatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,48 @@ public interface StatsDClient extends Closeable {
*/
void count(String aspect, double delta, double sampleRate, String... tags);

/**
* Set the counter metric at the given time to the specified value.
*
* <p>Values with an explicit timestamp are never aggregated and
* will be recorded as the metric value at the given time.</p>
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the counter to adjust
* @param value
* the amount to adjust the counter by
* @param timestamp
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
* @param tags
* array of tags to be added to the data
*/
void countWithTimestamp(String aspect, long value, long timestamp, String... tags);

/**
* Set the counter metric at the given time to the specified value.
*
* <p>Values with an explicit timestamp are never aggregated and
* will be recorded as the metric value at the given time.</p>
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the counter to adjust
* @param value
* the amount to adjust the counter by
* @param timestamp
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
* @param tags
* array of tags to be added to the data
*/
void countWithTimestamp(String aspect, double value, long timestamp, String... tags);

/**
* Increments the specified counter by one.
*
Expand Down Expand Up @@ -323,8 +365,43 @@ public interface StatsDClient extends Closeable {
* @param tags
* array of tags to be added to the data
*/

void gauge(String aspect, long value, double sampleRate, String... tags);

/**
* Set the gauge metric at the given time to the specified value.
*
* <p>Values with an explicit timestamp are never aggregated and
* will be recorded as the metric value at the given time.</p>
*
* @param aspect
* the name of the gauge
* @param value
* the new reading of the gauge
* @param timestamp
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
* @param tags
* array of tags to be added to the data
*/
void gaugeWithTimestamp(String aspect, double value, long timestamp, String... tags);

/**
* Set the gauge metric at the given time to the specified value.
*
* <p>Values with an explicit timestamp are never aggregated and
* will be recorded as the metric value at the given time.</p>
*
* @param aspect
* the name of the gauge
* @param value
* the new reading of the gauge
* @param timestamp
* timestamp of the value, as seconds from the epoch of 1970-01-01T00:00:00Z
* @param tags
* array of tags to be added to the data
*/
void gaugeWithTimestamp(String aspect, long value, long timestamp, String... tags);

/**
* Records an execution time in milliseconds for the specified named operation.
*
Expand Down
Loading