diff --git a/src/main/java/com/timgroup/statsd/CgroupReader.java b/src/main/java/com/timgroup/statsd/CgroupReader.java new file mode 100644 index 00000000..31baa4b7 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/CgroupReader.java @@ -0,0 +1,73 @@ +package com.timgroup.statsd; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A reader class that retrieves the current container ID parsed from a the cgroup file. + * + */ +class CgroupReader { + private static final Path CGROUP_PATH = Paths.get("/proc/self/cgroup"); + private static final String CONTAINER_SOURCE = "[0-9a-f]{64}"; + private static final String TASK_SOURCE = "[0-9a-f]{32}-\\d+"; + private static final Pattern LINE_RE = Pattern.compile("^\\d+:[^:]*:(.+)$", Pattern.MULTILINE | Pattern.UNIX_LINES); + private static final Pattern CONTAINER_RE = + Pattern.compile( + "(" + CONTAINER_SOURCE + "|" + TASK_SOURCE + ")(?:.scope)?$"); + + private boolean readOnce = false; + public String containerID; + + /** + * Parses /proc/self/cgroup and returns the container ID if available. + * + * @throws IOException + * if /proc/self/cgroup is readable and still an I/O error occurs reading from the stream + */ + public String getContainerID() throws IOException { + if (readOnce) { + return containerID; + } + + containerID = read(CGROUP_PATH); + return containerID; + } + + private String read(Path path) throws IOException { + readOnce = true; + if (!Files.isReadable(path)) { + return null; + } + + final String content = new String(Files.readAllBytes(path)); + if (content.isEmpty()) { + return null; + } + + return parse(content); + } + + /** + * Parses a Cgroup file content and returns the corresponding container ID. + * + * @param cgroupsContent + * Cgroup file content + */ + public static String parse(final String cgroupsContent) { + final Matcher lines = LINE_RE.matcher(cgroupsContent); + while (lines.find()) { + final String path = lines.group(1); + final Matcher matcher = CONTAINER_RE.matcher(path); + if (matcher.find()) { + return matcher.group(1); + } + } + + return null; + } +} diff --git a/src/main/java/com/timgroup/statsd/Message.java b/src/main/java/com/timgroup/statsd/Message.java index cfa716ac..0a29e99e 100644 --- a/src/main/java/com/timgroup/statsd/Message.java +++ b/src/main/java/com/timgroup/statsd/Message.java @@ -56,7 +56,7 @@ protected Message(String aspect, Message.Type type, String[] tags) { * @param builder * StringBuilder the text representation will be written to. */ - abstract void writeTo(StringBuilder builder); + abstract void writeTo(StringBuilder builder, String containerID); /** * Aggregate message. diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 9f13875b..b9f28223 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -10,6 +10,7 @@ import java.text.DecimalFormatSymbols; import java.text.NumberFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Properties; @@ -54,6 +55,7 @@ public class NonBlockingStatsDClient implements StatsDClient { public static final String DD_NAMED_PIPE_ENV_VAR = "DD_DOGSTATSD_PIPE_NAME"; public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID"; private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ; + public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED"; enum Literal { SERVICE, @@ -87,6 +89,7 @@ String tag() { public static final boolean DEFAULT_ENABLE_TELEMETRY = true; public static final boolean DEFAULT_ENABLE_AGGREGATION = true; + public static final boolean DEFAULT_ENABLE_ORIGIN_DETECTION = true; public static final String CLIENT_TAG = "client:java"; public static final String CLIENT_VERSION_TAG = "client_version:"; @@ -214,6 +217,21 @@ protected static String format(ThreadLocal formatter, Number value * Aggregation flush interval integer, in milliseconds. 0 disables aggregation. * @param aggregationShards * Aggregation flush interval integer, in milliseconds. 0 disables aggregation. + * @param containerID + * Allows passing the container ID, this will be used by the Agent to enrich + * metrics with container tags. + * This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0. + * When configured, the provided container ID is prioritized over the container ID discovered + * via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored. + * @param originDetectionEnabled + * Enable/disable the client origin detection. + * This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0. + * When enabled, the client tries to discover its container ID and sends it to the Agent + * to enrich the metrics with container tags. + * Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false + * The client tries to read the container ID by parsing the file /proc/self/cgroup. + * This is not supported on Windows. + * The client prioritizes the value passed via or entityID or DD_ENTITY_ID (if set) over the container ID. * @throws StatsDClientException * if the client could not be started */ @@ -222,7 +240,8 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final final Callable telemetryAddressLookup, final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval, - final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory) + final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory, + String containerID, final boolean originDetectionEnabled) throws StatsDClientException { if ((prefix != null) && (!prefix.isEmpty())) { @@ -245,7 +264,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final } } // Support "dd.internal.entity_id" internal tag. - updateTagsWithEntityID(costantPreTags, entityID); + final boolean hasEntityID = updateTagsWithEntityID(costantPreTags, entityID); for (final Literal literal : Literal.values()) { final String envVal = literal.envVal(); if (envVal != null && !envVal.trim().isEmpty()) { @@ -259,6 +278,13 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final costantPreTags.toArray(new String[costantPreTags.size()]), null, new StringBuilder()).toString(); } costantPreTags = null; + // Origin detection + if (hasEntityID) { + containerID = null; + } else { + boolean originEnabled = isOriginDetectionEnabled(containerID, originDetectionEnabled, hasEntityID); + containerID = getContainerID(containerID, originEnabled); + } } try { @@ -267,7 +293,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory(); statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, - processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory); + processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID); Properties properties = new Properties(); properties.load(getClass().getClassLoader().getResourceAsStream( @@ -285,7 +311,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, - poolSize, 1, false, 0, aggregationShards, threadFactory); + poolSize, 1, false, 0, aggregationShards, threadFactory, containerID); } this.telemetry = new Telemetry.Builder() @@ -341,19 +367,21 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) thr builder.bufferPoolSize, builder.processorWorkers, builder.senderWorkers, builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval, (builder.enableAggregation ? builder.aggregationFlushInterval : 0), - builder.aggregationShards, builder.threadFactory); + builder.aggregationShards, builder.threadFactory, builder.containerID, + builder.originDetectionEnabled); } protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final boolean blocking, - final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory) + final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory, + final String containerID) throws Exception { if (blocking) { return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards, threadFactory); + workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID); } else { return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards, threadFactory); + workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID); } } @@ -466,7 +494,7 @@ protected StatsDMessage(String aspect, Message.Type type, T value, double sample } @Override - public final void writeTo(StringBuilder builder) { + public final void writeTo(StringBuilder builder, String containerID) { builder.append(prefix).append(aspect).append(':'); writeValue(builder); builder.append('|').append(type); @@ -474,6 +502,9 @@ public final void writeTo(StringBuilder builder) { builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate)); } tagString(this.tags, builder); + if (containerID != null && !containerID.isEmpty()) { + builder.append("|c:").append(containerID); + } builder.append('\n'); } @@ -1019,7 +1050,7 @@ private StringBuilder eventMap(final Event event, StringBuilder res) { @Override public void recordEvent(final Event event, final String... eventTags) { statsDProcessor.send(new AlphaNumericMessage(Message.Type.EVENT, "") { - @Override public void writeTo(StringBuilder builder) { + @Override public void writeTo(StringBuilder builder, String containerID) { final String title = escapeEventString(prefix + event.getTitle()); final String text = escapeEventString(event.getText()); builder.append(Message.Type.EVENT.toString()) @@ -1037,6 +1068,9 @@ public void recordEvent(final Event event, final String... eventTags) { eventMap(event, builder); tagString(eventTags, builder); + if (containerID != null && !containerID.isEmpty()) { + builder.append("|c:").append(containerID); + } builder.append('\n'); } @@ -1071,7 +1105,7 @@ private int getUtf8Length(final String text) { @Override public void recordServiceCheckRun(final ServiceCheck sc) { statsDProcessor.send(new AlphaNumericMessage(Message.Type.SERVICE_CHECK, "") { - @Override public void writeTo(StringBuilder sb) { + @Override public void writeTo(StringBuilder sb, String containerID) { // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks sb.append(Message.Type.SERVICE_CHECK.toString()) .append("|") @@ -1088,6 +1122,9 @@ public void recordServiceCheckRun(final ServiceCheck sc) { if (sc.getMessage() != null) { sb.append("|m:").append(sc.getEscapedMessage()); } + if (containerID != null && !containerID.isEmpty()) { + sb.append("|c:").append(containerID); + } sb.append('\n'); } @@ -1154,11 +1191,14 @@ protected void writeValue(StringBuilder builder) { builder.append(getValue()); } - @Override protected final void writeTo(StringBuilder builder) { + @Override protected final void writeTo(StringBuilder builder, String containerID) { builder.append(prefix).append(aspect).append(':'); writeValue(builder); builder.append('|').append(type); tagString(this.tags, builder); + if (containerID != null && !containerID.isEmpty()) { + builder.append("|c:").append(containerID); + } builder.append('\n'); } @@ -1169,5 +1209,39 @@ protected boolean isInvalidSample(double sampleRate) { return sampleRate != 1 && ThreadLocalRandom.current().nextDouble() > sampleRate; } + boolean isOriginDetectionEnabled(String containerID, boolean originDetectionEnabled, boolean hasEntityID) { + if (!originDetectionEnabled || hasEntityID || (containerID != null && !containerID.isEmpty())) { + // origin detection is explicitly disabled + // or DD_ENTITY_ID was found + // or a user-defined container ID was provided + return false; + } + + String value = System.getenv(ORIGIN_DETECTION_ENABLED_ENV_VAR); + value = value != null ? value.trim() : null; + if (value != null && !value.isEmpty()) { + return !Arrays.asList("no", "false", "0", "n", "off").contains(value.toLowerCase()); + } + // DD_ORIGIN_DETECTION_ENABLED is not set or is empty + // default to true + return true; + } + + private String getContainerID(String containerID, boolean originDetectionEnabled) { + if (containerID != null && !containerID.isEmpty()) { + return containerID; + } + + if (originDetectionEnabled) { + CgroupReader reader = new CgroupReader(); + try { + return reader.getContainerID(); + } catch (final IOException e) { + throw new StatsDClientException("Failed to get container ID", e); + } + } + + return null; + } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index a1153790..74ef6b0a 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -32,6 +32,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable { public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL; public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL; public int aggregationShards = StatsDAggregator.DEFAULT_SHARDS; + public boolean originDetectionEnabled = NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION; public Callable addressLookup; public Callable telemetryAddressLookup; @@ -42,6 +43,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable { public String prefix; public String entityID; public String[] constantTags; + public String containerID; public StatsDClientErrorHandler errorHandler; public ThreadFactory threadFactory; @@ -173,6 +175,16 @@ public NonBlockingStatsDClientBuilder threadFactory(ThreadFactory val) { return this; } + public NonBlockingStatsDClientBuilder containerID(String val) { + containerID = val; + return this; + } + + public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) { + originDetectionEnabled = val; + return this; + } + /** * NonBlockingStatsDClient factory method. * @return the built NonBlockingStatsDClient. diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index 25a94768..f32005b9 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -30,10 +30,10 @@ protected boolean haveMessages() { StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory) throws Exception { + final ThreadFactory threadFactory, final String containerID) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, - aggregatorFlushInterval, aggregatorShards, threadFactory); + aggregatorFlushInterval, aggregatorShards, threadFactory, containerID); this.messages = new ArrayBlockingQueue<>(queueSize); } diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 51faa969..0ff9d8ca 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -38,10 +38,10 @@ protected boolean haveMessages() { StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory) throws Exception { + final ThreadFactory threadFactory, final String containerID) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, - aggregatorFlushInterval, aggregatorShards, threadFactory); + aggregatorFlushInterval, aggregatorShards, threadFactory, containerID); this.qsize = new AtomicInteger(0); this.messages = new ConcurrentLinkedQueue<>(); } diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index b6579006..be06ffbd 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -43,6 +43,8 @@ public abstract class StatsDProcessor { protected volatile boolean shutdown; volatile boolean shutdownAgg; + String containerID; + protected abstract class ProcessingTask implements Runnable { protected StringBuilder builder = new StringBuilder(); protected CharBuffer buffer = CharBuffer.wrap(builder); @@ -97,7 +99,7 @@ protected void processLoop() { } builder.setLength(0); - message.writeTo(builder); + message.writeTo(builder, containerID); int lowerBoundSize = builder.length(); if (sendBuffer.capacity() < lowerBoundSize) { @@ -157,7 +159,7 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory) throws Exception { + final ThreadFactory threadFactory, final String containerID) throws Exception { this.handler = handler; this.threadFactory = threadFactory; @@ -170,6 +172,8 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { this.endSignal = new CountDownLatch(workers); this.closeSignal = new CountDownLatch(workers); this.aggregator = new StatsDAggregator(this, aggregatorShards, aggregatorFlushInterval); + + this.containerID = containerID; } protected abstract ProcessingTask createProcessingTask(); diff --git a/src/main/java/com/timgroup/statsd/Telemetry.java b/src/main/java/com/timgroup/statsd/Telemetry.java index 1c6b58db..0f01a0d2 100644 --- a/src/main/java/com/timgroup/statsd/Telemetry.java +++ b/src/main/java/com/timgroup/statsd/Telemetry.java @@ -69,14 +69,19 @@ protected TelemetryMessage(String metric, Integer value, String tags) { } @Override - public final void writeTo(StringBuilder builder) { + public final void writeTo(StringBuilder builder, String containerID) { builder.append(aspect) .append(':') .append(this.value) .append('|') .append(type) - .append(tagsString) - .append('\n'); // already has the statsd separator baked-in + .append(tagsString); + + if (containerID != null && !containerID.isEmpty()) { + builder.append("|c:").append(containerID); + } + + builder.append('\n'); // already has the statsd separator baked-in } } diff --git a/src/test/java/com/timgroup/statsd/CgroupReaderTest.java b/src/test/java/com/timgroup/statsd/CgroupReaderTest.java new file mode 100644 index 00000000..8d6036da --- /dev/null +++ b/src/test/java/com/timgroup/statsd/CgroupReaderTest.java @@ -0,0 +1,123 @@ +package com.timgroup.statsd; + +import static org.junit.Assert.assertNull; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class CgroupReaderTest { + @Test + public void containerID_parse() throws Exception { + // Docker + String docker = new StringBuilder() + .append("13:name=systemd:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("12:pids:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("11:hugetlb:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("10:net_prio:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("9:perf_event:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("8:net_cls:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("7:freezer:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("6:devices:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("5:memory:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("4:blkio:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("3:cpuacct:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("2:cpu:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n") + .append("1:cpuset:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860") + .toString(); + + assertThat(CgroupReader.parse(docker), equalTo("3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860")); + + // Kubernetes + String kubernetes = new StringBuilder() + .append("11:perf_event:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("10:pids:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("9:memory:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("8:cpu,cpuacct:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("7:blkio:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("6:cpuset:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("5:devices:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("4:freezer:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append(" 3:net_cls,net_prio:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("2:hugetlb:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1\n") + .append("1:name=systemd:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1") + .toString(); + + assertThat(CgroupReader.parse(kubernetes), equalTo("3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1")); + + // ECS EC2 + String ecs = new StringBuilder() + .append("9:perf_event:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("8:memory:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("7:hugetlb:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("6:freezer:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("5:devices:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("4:cpuset:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("3:cpuacct:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("2:cpu:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce\n") + .append("1:blkio:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce") + .toString(); + + assertThat(CgroupReader.parse(ecs), equalTo("38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce")); + + // ECS Fargate + String ecsFargate = new StringBuilder() + .append("11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("10:pids:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("9:cpuset:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("8:net_cls,net_prio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("7:cpu,cpuacct:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("6:perf_event:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("5:freezer:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("4:devices:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("3:blkio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .append("1:name=systemd:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") + .toString(); + + assertThat(CgroupReader.parse(ecsFargate), equalTo("432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da")); + + // ECS Fargate >= 1.4.0 + String ecsFargate14 = new StringBuilder() + .append("11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("10:pids:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("9:cpuset:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("8:net_cls,net_prio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("7:cpu,cpuacct:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("6:perf_event:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("5:freezer:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("4:devices:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("3:blkio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("1:name=systemd:/ecs/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .toString(); + + assertThat(CgroupReader.parse(ecsFargate14), equalTo("34dc0b5e626f2c5c4c5170e34b10e765-1234567890")); + + // Linux non-containerized + String nonContainerized = new StringBuilder() + .append("11:blkio:/user.slice/user-0.slice/session-14.scope\n") + .append("10:memory:/user.slice/user-0.slice/session-14.scope\n") + .append("9:hugetlb:/\n") + .append("8:cpuset:/\n") + .append("7:pids:/user.slice/user-0.slice/session-14.scope\n") + .append("6:freezer:/\n") + .append("5:net_cls,net_prio:/\n") + .append("4:perf_event:/\n") + .append("3:cpu,cpuacct:/user.slice/user-0.slice/session-14.scope\n") + .append("2:devices:/user.slice/user-0.slice/session-14.scope\n") + .append("1:name=systemd:/user.slice/user-0.slice/session-14.scope\n") + .toString(); + + assertNull(CgroupReader.parse(nonContainerized)); + + // Linux 4.4 + String linux44 = new StringBuilder() + .append("11:pids:/system.slice/docker-cde7c2bab394630a42d73dc610b9c57415dced996106665d427f6d0566594411.scope\n") + .append("1:name=systemd:/system.slice/docker-cde7c2bab394630a42d73dc610b9c57415dced996106665d427f6d0566594411.scope\n") + .toString(); + + assertThat(CgroupReader.parse(linux44), equalTo("cde7c2bab394630a42d73dc610b9c57415dced996106665d427f6d0566594411")); + } +} diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java index 6a01d4e5..e6cc922a 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java @@ -27,6 +27,7 @@ public final class NonBlockingStatsDClientPerfTest { .blocking(true) // non-blocking processors will drop messages if the queue fills up .enableTelemetry(false) .enableAggregation(false) + .originDetectionEnabled(false) .build(); private static final NonBlockingStatsDClient clientAggr = new NonBlockingStatsDClientBuilder().prefix("my.prefix.aggregated") @@ -34,6 +35,7 @@ public final class NonBlockingStatsDClientPerfTest { .port(STATSD_SERVER_PORT) .blocking(true) // non-blocking processors will drop messages if the queue fills up .enableTelemetry(false) + .originDetectionEnabled(false) .build(); private ExecutorService executor; diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index ce27a0d2..17ea2cd9 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @FixMethodOrder(MethodSorters.NAME_ASCENDING) @@ -34,6 +35,7 @@ public class NonBlockingStatsDClientTest { private static final int STATSD_SERVER_PORT = 17254; private static NonBlockingStatsDClient client; private static NonBlockingStatsDClient clientUnaggregated; + private static NonBlockingStatsDClient clientWithContainerID; private static DummyStatsDServer server; private static Logger log = Logger.getLogger("NonBlockingStatsDClientTest"); @@ -49,6 +51,7 @@ public static void start() throws IOException { .hostname("localhost") .port(STATSD_SERVER_PORT) .enableTelemetry(false) + .originDetectionEnabled(false) .build(); clientUnaggregated = new NonBlockingStatsDClientBuilder() .prefix("my.prefix") @@ -56,6 +59,14 @@ public static void start() throws IOException { .port(STATSD_SERVER_PORT) .enableTelemetry(false) .enableAggregation(false) + .originDetectionEnabled(false) + .build(); + clientWithContainerID = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .port(STATSD_SERVER_PORT) + .enableTelemetry(false) + .containerID("fake-container-id") .build(); } @@ -64,6 +75,7 @@ public static void stop() { try { client.stop(); clientUnaggregated.stop(); + clientWithContainerID.stop(); server.close(); } catch (java.io.IOException ignored) { } @@ -444,6 +456,7 @@ public void sends_gauge_mixed_tags_deprecated() throws Exception { .port(STATSD_SERVER_PORT) .queueSize(Integer.MAX_VALUE) .constantTags("instance:foo", "app:bar") + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423, "baz"); @@ -464,6 +477,7 @@ public void sends_gauge_mixed_tags() throws Exception { .port(STATSD_SERVER_PORT) .queueSize(Integer.MAX_VALUE) .constantTags("instance:foo", "app:bar") + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423, "baz"); @@ -484,6 +498,7 @@ public void sends_gauge_mixed_tags_with_sample_rate_deprecated() throws Exceptio .port(STATSD_SERVER_PORT) .queueSize(Integer.MAX_VALUE) .constantTags("instance:foo", "app:bar") + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423, 1, "baz"); @@ -506,6 +521,7 @@ public void sends_gauge_mixed_tags_with_sample_rate() throws Exception { .queueSize(Integer.MAX_VALUE) .constantTags("instance:foo", "app:bar") .enableAggregation(false) + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423, 1, "baz"); @@ -526,6 +542,7 @@ public void sends_gauge_constant_tags_only_deprecated() throws Exception { .port(STATSD_SERVER_PORT) .queueSize(Integer.MAX_VALUE) .constantTags("instance:foo", "app:bar") + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423); @@ -546,6 +563,7 @@ public void sends_gauge_constant_tags_only() throws Exception { .port(STATSD_SERVER_PORT) .queueSize(Integer.MAX_VALUE) .constantTags("instance:foo", "app:bar") + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423); @@ -629,6 +647,7 @@ public void sends_gauge_entityID_from_env_and_constant_tags() throws Exception { .hostname("localhost") .port(STATSD_SERVER_PORT) .constantTags(constantTags) + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423); @@ -694,6 +713,7 @@ public void init_client_from_env_vars() throws Exception { environmentVariables.set(NonBlockingStatsDClient.DD_AGENT_HOST_ENV_VAR, "localhost"); final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() .prefix("my.prefix") + .originDetectionEnabled(false) .build(); try { client.gauge("value", 423); @@ -716,6 +736,7 @@ public void checkEnvVars() { .prefix("checkEnvVars") .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); server.clear(); client.gauge("value", 42); @@ -740,6 +761,7 @@ public void sends_gauge_empty_prefix_deprecated() throws Exception { .prefix("") .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); try { @@ -759,6 +781,7 @@ public void sends_gauge_empty_prefix() throws Exception { .prefix("") .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); try { client.gauge("top.level.value", 423); @@ -777,6 +800,7 @@ public void sends_gauge_null_prefix_deprecated() throws Exception { .prefix(null) .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); try { @@ -796,6 +820,7 @@ public void sends_gauge_null_prefix() throws Exception { .prefix(null) .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); try { client.gauge("top.level.value", 423); @@ -813,6 +838,7 @@ public void sends_gauge_no_prefix() throws Exception { final NonBlockingStatsDClient no_prefix_client = new NonBlockingStatsDClientBuilder() .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); try { no_prefix_client.gauge("top.level.value", 423); @@ -914,6 +940,7 @@ public void sends_event_empty_prefix_deprecated() throws Exception { .prefix("") .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); final Event event = Event.builder() .withTitle("title1") @@ -942,6 +969,7 @@ public void sends_event_empty_prefix() throws Exception { .prefix("") .hostname("localhost") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .build(); final Event event = Event.builder() @@ -1038,6 +1066,7 @@ public void sends_too_large_message_deprecated() throws Exception { .hostname("localhost") .port(STATSD_SERVER_PORT) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build()) { final byte[] messageBytes = new byte[1600]; @@ -1077,6 +1106,7 @@ public void sends_too_large_message() throws Exception { .hostname("localhost") .port(STATSD_SERVER_PORT) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build()) { final byte[] messageBytes = new byte[1600]; @@ -1119,6 +1149,7 @@ public void sends_telemetry_elsewhere() throws Exception { .telemetryPort(STATSD_SERVER_PORT+10) .telemetryFlushInterval(3000) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build(); try { @@ -1158,6 +1189,7 @@ public void testBasicGaugeAggregation() throws Exception { .enableAggregation(true) .aggregationFlushInterval(3000) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build(); try { @@ -1187,6 +1219,7 @@ public void testBasicCountAggregation() throws Exception { .enableAggregation(true) .aggregationFlushInterval(3000) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build(); try { @@ -1220,6 +1253,7 @@ public void testSampledCountAggregation() throws Exception { .enableAggregation(true) .aggregationFlushInterval(3000) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build(); try { @@ -1255,6 +1289,7 @@ public void testBasicSetAggregation() throws Exception { .enableAggregation(true) .aggregationFlushInterval(3000) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build(); try { @@ -1286,6 +1321,7 @@ public void testAggregationTelemetry() throws Exception { .aggregationFlushInterval(3000) .telemetryFlushInterval(3000) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build(); try { @@ -1322,6 +1358,7 @@ public void testBasicUnaggregatedMetrics() throws Exception { .enableAggregation(true) .aggregationFlushInterval(3000) .errorHandler(errorHandler) + .originDetectionEnabled(false) .build(); try { @@ -1385,6 +1422,7 @@ public void shutdown_test() throws Exception { final NonBlockingStatsDClientBuilder builder = new SlowStatsDNonBlockingStatsDClientBuilder().prefix("") .hostname("localhost") + .originDetectionEnabled(false) .port(port); final SlowStatsDNonBlockingStatsDClient client = ((SlowStatsDNonBlockingStatsDClientBuilder)builder).build(); @@ -1400,6 +1438,138 @@ public void shutdown_test() throws Exception { } } + @Test(timeout = 5000L) + public void sends_gauge_with_containerID() throws Exception { + clientWithContainerID.gauge("value", 423, "foo"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.value:423|g|#foo|c:fake-container-id"))); + } + + @Test(timeout = 5000L) + public void sends_counter_with_containerID() throws Exception { + clientWithContainerID.count("value", 423, "foo"); + server.waitForMessage("my.prefix"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.value:423|c|#foo|c:fake-container-id"))); + } + + @Test(timeout = 5000L) + public void sends_set_with_containerID() throws Exception { + clientWithContainerID.recordSetValue("myset", "myuserid", "foo"); + server.waitForMessage("my.prefix.myset"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.myset:myuserid|s|#foo|c:fake-container-id"))); + } + + @Test(timeout = 5000L) + public void sends_service_check_with_containerID() throws Exception { + final String inputMessage = "\u266c \u2020\u00f8U \n\u2020\u00f8U \u00a5\u00bau|m: T0\u00b5 \u266a"; // "♬ †øU \n†øU ¥ºu|m: T0µ ♪" + final String outputMessage = "\u266c \u2020\u00f8U \\n\u2020\u00f8U \u00a5\u00bau|m\\: T0\u00b5 \u266a"; // note the escaped colon + final String[] tags = {"key1:val1", "key2:val2"}; + final ServiceCheck sc = ServiceCheck.builder() + .withName("my_check.name") + .withStatus(ServiceCheck.Status.WARNING) + .withMessage(inputMessage) + .withHostname("i-abcd1234") + .withTags(tags) + .withTimestamp(1420740000) + .build(); + + assertEquals(outputMessage, sc.getEscapedMessage()); + + clientWithContainerID.serviceCheck(sc); + server.waitForMessage("_sc"); + + assertThat(server.messagesReceived(), hasItem(comparesEqualTo(String.format("_sc|my_check.name|1|d:1420740000|h:i-abcd1234|#key2:val2,key1:val1|m:%s|c:fake-container-id", + outputMessage)))); + } + + @Test(timeout = 5000L) + public void sends_event_with_containerID() throws Exception { + + final Event event = Event.builder() + .withTitle("title1") + .withText("text1\nline2") + .withDate(1234567000) + .withHostname("host1") + .withPriority(Event.Priority.LOW) + .withAggregationKey("key1") + .withAlertType(Event.AlertType.ERROR) + .withSourceTypeName("sourcetype1") + .build(); + + clientWithContainerID.recordEvent(event); + server.waitForMessage(); + + assertThat( + server.messagesReceived(), + hasItem(comparesEqualTo("_e{16,12}:my.prefix.title1|text1\\nline2|d:1234567|h:host1|k:key1|p:low|t:error|s:sourcetype1|c:fake-container-id")) + ); + } + + @Test(timeout = 5000L) + public void origin_detection_env_false() throws Exception { + environmentVariables.set(NonBlockingStatsDClient.ORIGIN_DETECTION_ENABLED_ENV_VAR, "false"); + + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .port(STATSD_SERVER_PORT) + .queueSize(Integer.MAX_VALUE) + .errorHandler(null) + .enableTelemetry(false) + .build(); + + assertFalse(client.isOriginDetectionEnabled(null, NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION, false)); + environmentVariables.clear(NonBlockingStatsDClient.ORIGIN_DETECTION_ENABLED_ENV_VAR); + } + + @Test(timeout = 5000L) + public void origin_detection_env_unknown() throws Exception { + environmentVariables.set(NonBlockingStatsDClient.ORIGIN_DETECTION_ENABLED_ENV_VAR, "unknown"); // default to true + + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .port(STATSD_SERVER_PORT) + .queueSize(Integer.MAX_VALUE) + .errorHandler(null) + .enableTelemetry(false) + .build(); + + assertTrue(client.isOriginDetectionEnabled(null, NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION, false)); + environmentVariables.clear(NonBlockingStatsDClient.ORIGIN_DETECTION_ENABLED_ENV_VAR); + } + + @Test(timeout = 5000L) + public void origin_detection_env_unset() throws Exception { + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .port(STATSD_SERVER_PORT) + .queueSize(Integer.MAX_VALUE) + .errorHandler(null) + .enableTelemetry(false) + .build(); + + assertTrue(client.isOriginDetectionEnabled(null, NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION, false)); + } + + @Test(timeout = 5000L) + public void origin_detection_arg_false() throws Exception { + final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .port(STATSD_SERVER_PORT) + .queueSize(Integer.MAX_VALUE) + .errorHandler(null) + .enableTelemetry(false) + .build(); + + assertFalse(client.isOriginDetectionEnabled(null, false, false)); + } + private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsDClient { private CountDownLatch lock; @@ -1423,6 +1593,7 @@ private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsD .senderWorkers(senderWorkers) .processorWorkers(processorWorkers) .maxPacketSizeBytes(maxPacketSizeBytes) + .originDetectionEnabled(false) .resolve()); lock = new CountDownLatch(1); @@ -1487,6 +1658,7 @@ public void nonsampling_client_test() throws Exception { final NonBlockingStatsDClientBuilder builder = new NonsamplingClientBuilder() .prefix("") .hostname("localhost") + .originDetectionEnabled(false) .port(port); final NonsamplingClient client = ((NonsamplingClientBuilder)builder).build(); @@ -1512,6 +1684,7 @@ public void blocking_close_test() throws Exception { NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder() { @Override public NonBlockingStatsDClient build() { + this.originDetectionEnabled(false); return new NonBlockingStatsDClient(resolve()) { @Override ClientChannel createByteChannel(Callable addressLookup, int timeout, int bufferSize) throws Exception { diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index 3edcfa2c..5b0f9b57 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -65,7 +65,7 @@ protected FakeMessage(String aspect, Message.Type type, T value) { } @Override - protected void writeTo(StringBuilder builder){} + protected void writeTo(StringBuilder builder, String containerID){} } public static class FakeAlphaMessage extends AlphaNumericMessage { @@ -74,7 +74,7 @@ protected FakeAlphaMessage(String aspect, Message.Type type, String value) { } @Override - protected void writeTo(StringBuilder builder){} + protected void writeTo(StringBuilder builder, String containerID){} } @@ -86,7 +86,7 @@ public static class FakeProcessor extends StatsDProcessor { private final AtomicInteger messageAggregated = new AtomicInteger(0); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory()); + super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory(), null); this.messages = new ConcurrentLinkedQueue<>(); } diff --git a/src/test/java/com/timgroup/statsd/StatsDTestMessage.java b/src/test/java/com/timgroup/statsd/StatsDTestMessage.java index ed4f0a9a..7bf2b210 100644 --- a/src/test/java/com/timgroup/statsd/StatsDTestMessage.java +++ b/src/test/java/com/timgroup/statsd/StatsDTestMessage.java @@ -9,7 +9,7 @@ protected StatsDTestMessage(String aspect, Message.Type type, T value, double sa } @Override - public final void writeTo(StringBuilder builder) { + public final void writeTo(StringBuilder builder, String containerID) { builder.append("test.").append(aspect).append(':'); writeValue(builder); builder.append('|').append(type); @@ -17,6 +17,9 @@ public final void writeTo(StringBuilder builder) { builder.append('|').append('@').append(NonBlockingStatsDClient.format(NonBlockingStatsDClient.SAMPLE_RATE_FORMATTER, sampleRate)); } NonBlockingStatsDClient.tagString(this.tags, "", builder); + if (containerID != null && !containerID.isEmpty()) { + builder.append("|c:").append(containerID); + } builder.append('\n'); } diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index feab6d2a..9f4d1a8d 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -26,7 +26,7 @@ public static class FakeProcessor extends StatsDProcessor { public final List messages = new ArrayList<>(); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory()); + super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory(), null); } @@ -61,7 +61,7 @@ protected synchronized List getMessagesAsStrings() { ArrayList stringMessages = new ArrayList<>(messages.size()); for(Message m : messages) { sb.setLength(0); - m.writeTo(sb); + m.writeTo(sb, this.containerID); stringMessages.add(sb.toString()); } return stringMessages; @@ -80,6 +80,7 @@ public void clear() { .hostname("localhost") .constantTags("test") .port(STATSD_SERVER_PORT) + .originDetectionEnabled(false) .enableTelemetry(false); // disable telemetry so we can control calls to "flush" private static NonBlockingStatsDClient client = builder.build(); @@ -90,6 +91,7 @@ public void clear() { .constantTags("test") .port(STATSD_SERVER_PORT) .enableAggregation(false) + .originDetectionEnabled(false) .enableTelemetry(false); // disable telemetry so we can control calls to "flush" private static NonBlockingStatsDClient telemetryClient = telemetryBuilder.build(); @@ -131,6 +133,7 @@ public void clear() { client.telemetry.reset(); telemetryClient.telemetry.reset(); fakeProcessor.clear(); + fakeProcessor.containerID = null; } @Test(timeout = 5000L) @@ -382,6 +385,7 @@ public void telemetry_droppedData() throws Exception { .constantTags("test") .port(0) .enableTelemetry(false) // disable telemetry so we can control calls to "flush" + .originDetectionEnabled(false) .build(); assertThat(clientError.statsDProcessor.bufferPool.getBufferSize(), equalTo(8192)); @@ -492,4 +496,49 @@ public void telemetry_DevModeData() throws Exception { assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:0|c|#test," + telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); } + + @Test(timeout = 5000L) + public void telemetry_containerID() throws Exception { + final String fakeContainerID = "fake-container-id"; + client.count("mycount", 24); + fakeProcessor.containerID = fakeContainerID; + + // wait for the "mycount" to be sent + server.waitForMessage("mycount"); + server.clear(); + fakeProcessor.clear(); + + assertThat(client.telemetry.metricsSent.get(), equalTo(1)); + client.telemetry.flush(); + assertThat(client.telemetry.metricsSent.get(), equalTo(0)); + + List statsdMessages = fakeProcessor.getMessagesAsStrings() ; + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.bytes_sent:29|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.packets_sent:1|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + + assertThat(statsdMessages, + hasItem("datadog.dogstatsd.client.aggregated_context:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + } } diff --git a/src/test/java/com/timgroup/statsd/UnixSocketTest.java b/src/test/java/com/timgroup/statsd/UnixSocketTest.java index 8dc47ae2..ca100bde 100644 --- a/src/test/java/com/timgroup/statsd/UnixSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixSocketTest.java @@ -71,6 +71,7 @@ public void start() throws IOException { .socketBufferSize(1024 * 1024) .enableAggregation(false) .errorHandler(this) + .originDetectionEnabled(false) .build(); clientAggregate = new NonBlockingStatsDClientBuilder().prefix("my.prefix") @@ -81,6 +82,7 @@ public void start() throws IOException { .socketBufferSize(1024 * 1024) .enableAggregation(false) .errorHandler(this) + .originDetectionEnabled(false) .build(); }