Skip to content

Commit

Permalink
Merge pull request #188 from DataDog/ahmed/dsd-1.2
Browse files Browse the repository at this point in the history
feat: origin detection with container ID field (dogstatsd 1.2)
  • Loading branch information
ahmed-mez authored Mar 18, 2022
2 parents 54f01ab + 2866eb1 commit 1fc57b6
Show file tree
Hide file tree
Showing 15 changed files with 548 additions and 28 deletions.
73 changes: 73 additions & 0 deletions src/main/java/com/timgroup/statsd/CgroupReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A reader class that retrieves the current container ID parsed from a the cgroup file.
*
*/
class CgroupReader {
private static final Path CGROUP_PATH = Paths.get("/proc/self/cgroup");
private static final String CONTAINER_SOURCE = "[0-9a-f]{64}";
private static final String TASK_SOURCE = "[0-9a-f]{32}-\\d+";
private static final Pattern LINE_RE = Pattern.compile("^\\d+:[^:]*:(.+)$", Pattern.MULTILINE | Pattern.UNIX_LINES);
private static final Pattern CONTAINER_RE =
Pattern.compile(
"(" + CONTAINER_SOURCE + "|" + TASK_SOURCE + ")(?:.scope)?$");

private boolean readOnce = false;
public String containerID;

/**
* Parses /proc/self/cgroup and returns the container ID if available.
*
* @throws IOException
* if /proc/self/cgroup is readable and still an I/O error occurs reading from the stream
*/
public String getContainerID() throws IOException {
if (readOnce) {
return containerID;
}

containerID = read(CGROUP_PATH);
return containerID;
}

private String read(Path path) throws IOException {
readOnce = true;
if (!Files.isReadable(path)) {
return null;
}

final String content = new String(Files.readAllBytes(path));
if (content.isEmpty()) {
return null;
}

return parse(content);
}

/**
* Parses a Cgroup file content and returns the corresponding container ID.
*
* @param cgroupsContent
* Cgroup file content
*/
public static String parse(final String cgroupsContent) {
final Matcher lines = LINE_RE.matcher(cgroupsContent);
while (lines.find()) {
final String path = lines.group(1);
final Matcher matcher = CONTAINER_RE.matcher(path);
if (matcher.find()) {
return matcher.group(1);
}
}

return null;
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/timgroup/statsd/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected Message(String aspect, Message.Type type, String[] tags) {
* @param builder
* StringBuilder the text representation will be written to.
*/
abstract void writeTo(StringBuilder builder);
abstract void writeTo(StringBuilder builder, String containerID);

/**
* Aggregate message.
Expand Down
98 changes: 86 additions & 12 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class NonBlockingStatsDClient implements StatsDClient {
public static final String DD_NAMED_PIPE_ENV_VAR = "DD_DOGSTATSD_PIPE_NAME";
public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID";
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ;
public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED";

enum Literal {
SERVICE,
Expand Down Expand Up @@ -87,6 +89,7 @@ String tag() {
public static final boolean DEFAULT_ENABLE_TELEMETRY = true;

public static final boolean DEFAULT_ENABLE_AGGREGATION = true;
public static final boolean DEFAULT_ENABLE_ORIGIN_DETECTION = true;

public static final String CLIENT_TAG = "client:java";
public static final String CLIENT_VERSION_TAG = "client_version:";
Expand Down Expand Up @@ -214,6 +217,21 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
* Aggregation flush interval integer, in milliseconds. 0 disables aggregation.
* @param aggregationShards
* Aggregation flush interval integer, in milliseconds. 0 disables aggregation.
* @param containerID
* Allows passing the container ID, this will be used by the Agent to enrich
* metrics with container tags.
* This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0.
* When configured, the provided container ID is prioritized over the container ID discovered
* via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored.
* @param originDetectionEnabled
* Enable/disable the client origin detection.
* This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0.
* When enabled, the client tries to discover its container ID and sends it to the Agent
* to enrich the metrics with container tags.
* Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false
* The client tries to read the container ID by parsing the file /proc/self/cgroup.
* This is not supported on Windows.
* The client prioritizes the value passed via or entityID or DD_ENTITY_ID (if set) over the container ID.
* @throws StatsDClientException
* if the client could not be started
*/
Expand All @@ -222,7 +240,8 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
final Callable<SocketAddress> telemetryAddressLookup, final int timeout, final int bufferSize,
final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval,
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory)
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory,
String containerID, final boolean originDetectionEnabled)
throws StatsDClientException {

if ((prefix != null) && (!prefix.isEmpty())) {
Expand All @@ -245,7 +264,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
}
}
// Support "dd.internal.entity_id" internal tag.
updateTagsWithEntityID(costantPreTags, entityID);
final boolean hasEntityID = updateTagsWithEntityID(costantPreTags, entityID);
for (final Literal literal : Literal.values()) {
final String envVal = literal.envVal();
if (envVal != null && !envVal.trim().isEmpty()) {
Expand All @@ -259,6 +278,13 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
costantPreTags.toArray(new String[costantPreTags.size()]), null, new StringBuilder()).toString();
}
costantPreTags = null;
// Origin detection
if (hasEntityID) {
containerID = null;
} else {
boolean originEnabled = isOriginDetectionEnabled(containerID, originDetectionEnabled, hasEntityID);
containerID = getContainerID(containerID, originEnabled);
}
}

try {
Expand All @@ -267,7 +293,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();

statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory);
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID);

Properties properties = new Properties();
properties.load(getClass().getClassLoader().getResourceAsStream(
Expand All @@ -285,7 +311,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final

// similar settings, but a single worker and non-blocking.
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
poolSize, 1, false, 0, aggregationShards, threadFactory);
poolSize, 1, false, 0, aggregationShards, threadFactory, containerID);
}

this.telemetry = new Telemetry.Builder()
Expand Down Expand Up @@ -341,19 +367,21 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) thr
builder.bufferPoolSize, builder.processorWorkers, builder.senderWorkers,
builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval,
(builder.enableAggregation ? builder.aggregationFlushInterval : 0),
builder.aggregationShards, builder.threadFactory);
builder.aggregationShards, builder.threadFactory, builder.containerID,
builder.originDetectionEnabled);
}

protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final boolean blocking,
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory)
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory,
final String containerID)
throws Exception {
if (blocking) {
return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize,
workers, aggregationFlushInterval, aggregationShards, threadFactory);
workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID);
} else {
return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize,
workers, aggregationFlushInterval, aggregationShards, threadFactory);
workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID);
}
}

Expand Down Expand Up @@ -466,14 +494,17 @@ protected StatsDMessage(String aspect, Message.Type type, T value, double sample
}

@Override
public final void writeTo(StringBuilder builder) {
public final void writeTo(StringBuilder builder, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
}
tagString(this.tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}
Expand Down Expand Up @@ -1019,7 +1050,7 @@ private StringBuilder eventMap(final Event event, StringBuilder res) {
@Override
public void recordEvent(final Event event, final String... eventTags) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.EVENT, "") {
@Override public void writeTo(StringBuilder builder) {
@Override public void writeTo(StringBuilder builder, String containerID) {
final String title = escapeEventString(prefix + event.getTitle());
final String text = escapeEventString(event.getText());
builder.append(Message.Type.EVENT.toString())
Expand All @@ -1037,6 +1068,9 @@ public void recordEvent(final Event event, final String... eventTags) {

eventMap(event, builder);
tagString(eventTags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}
Expand Down Expand Up @@ -1071,7 +1105,7 @@ private int getUtf8Length(final String text) {
@Override
public void recordServiceCheckRun(final ServiceCheck sc) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.SERVICE_CHECK, "") {
@Override public void writeTo(StringBuilder sb) {
@Override public void writeTo(StringBuilder sb, String containerID) {
// see http://docs.datadoghq.com/guides/dogstatsd/#service-checks
sb.append(Message.Type.SERVICE_CHECK.toString())
.append("|")
Expand All @@ -1088,6 +1122,9 @@ public void recordServiceCheckRun(final ServiceCheck sc) {
if (sc.getMessage() != null) {
sb.append("|m:").append(sc.getEscapedMessage());
}
if (containerID != null && !containerID.isEmpty()) {
sb.append("|c:").append(containerID);
}

sb.append('\n');
}
Expand Down Expand Up @@ -1154,11 +1191,14 @@ protected void writeValue(StringBuilder builder) {
builder.append(getValue());
}

@Override protected final void writeTo(StringBuilder builder) {
@Override protected final void writeTo(StringBuilder builder, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
tagString(this.tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}
Expand All @@ -1169,5 +1209,39 @@ protected boolean isInvalidSample(double sampleRate) {
return sampleRate != 1 && ThreadLocalRandom.current().nextDouble() > sampleRate;
}

boolean isOriginDetectionEnabled(String containerID, boolean originDetectionEnabled, boolean hasEntityID) {
if (!originDetectionEnabled || hasEntityID || (containerID != null && !containerID.isEmpty())) {
// origin detection is explicitly disabled
// or DD_ENTITY_ID was found
// or a user-defined container ID was provided
return false;
}

String value = System.getenv(ORIGIN_DETECTION_ENABLED_ENV_VAR);
value = value != null ? value.trim() : null;
if (value != null && !value.isEmpty()) {
return !Arrays.asList("no", "false", "0", "n", "off").contains(value.toLowerCase());
}

// DD_ORIGIN_DETECTION_ENABLED is not set or is empty
// default to true
return true;
}

private String getContainerID(String containerID, boolean originDetectionEnabled) {
if (containerID != null && !containerID.isEmpty()) {
return containerID;
}

if (originDetectionEnabled) {
CgroupReader reader = new CgroupReader();
try {
return reader.getContainerID();
} catch (final IOException e) {
throw new StatsDClientException("Failed to get container ID", e);
}
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL;
public int aggregationShards = StatsDAggregator.DEFAULT_SHARDS;
public boolean originDetectionEnabled = NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION;

public Callable<SocketAddress> addressLookup;
public Callable<SocketAddress> telemetryAddressLookup;
Expand All @@ -42,6 +43,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
public String prefix;
public String entityID;
public String[] constantTags;
public String containerID;

public StatsDClientErrorHandler errorHandler;
public ThreadFactory threadFactory;
Expand Down Expand Up @@ -173,6 +175,16 @@ public NonBlockingStatsDClientBuilder threadFactory(ThreadFactory val) {
return this;
}

public NonBlockingStatsDClientBuilder containerID(String val) {
containerID = val;
return this;
}

public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) {
originDetectionEnabled = val;
return this;
}

/**
* NonBlockingStatsDClient factory method.
* @return the built NonBlockingStatsDClient.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ protected boolean haveMessages() {
StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers,
final int aggregatorFlushInterval, final int aggregatorShards,
final ThreadFactory threadFactory) throws Exception {
final ThreadFactory threadFactory, final String containerID) throws Exception {

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers,
aggregatorFlushInterval, aggregatorShards, threadFactory);
aggregatorFlushInterval, aggregatorShards, threadFactory, containerID);
this.messages = new ArrayBlockingQueue<>(queueSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ protected boolean haveMessages() {
StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers,
final int aggregatorFlushInterval, final int aggregatorShards,
final ThreadFactory threadFactory) throws Exception {
final ThreadFactory threadFactory, final String containerID) throws Exception {

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers,
aggregatorFlushInterval, aggregatorShards, threadFactory);
aggregatorFlushInterval, aggregatorShards, threadFactory, containerID);
this.qsize = new AtomicInteger(0);
this.messages = new ConcurrentLinkedQueue<>();
}
Expand Down
Loading

0 comments on commit 1fc57b6

Please sign in to comment.