diff --git a/.gitignore b/.gitignore index a506f68c..64d86c2d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,3 @@ build # rbenv for pimpmychangelog .ruby-version - -# vscode -.vscode/* diff --git a/src/main/java/com/timgroup/statsd/CgroupReader.java b/src/main/java/com/timgroup/statsd/CgroupReader.java new file mode 100644 index 00000000..a2343115 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/CgroupReader.java @@ -0,0 +1,73 @@ +package com.timgroup.statsd; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A reader class that retrieves the current container ID parsed from a the cgroup file. + * + */ +public class CgroupReader { + private static final Path CGROUP_PATH = Paths.get("/proc/self/cgroup"); + private static final String CONTAINER_SOURCE = "[0-9a-f]{64}"; + private static final String TASK_SOURCE = "[0-9a-f]{32}-\\d+"; + private static final Pattern LINE_RE = Pattern.compile("^\\d+:[^:]*:(.+)$", Pattern.MULTILINE | Pattern.UNIX_LINES); + private static final Pattern CONTAINER_RE = + Pattern.compile( + "(" + CONTAINER_SOURCE + "|" + TASK_SOURCE + ")(?:.scope)?$"); + + private boolean readOnce = false; + public String containerID; + + /** + * Parses /proc/self/cgroup and returns the container ID if available. + * + * @throws IOException + * if /proc/self/cgroup is readable and still an I/O error occurs reading from the stream + */ + public String getContainerID() throws IOException { + if (readOnce) { + return containerID; + } + + containerID = read(CGROUP_PATH); + return containerID; + } + + private String read(Path path) throws IOException { + readOnce = true; + if (!Files.isReadable(path)) { + return null; + } + + final String content = new String(Files.readAllBytes(path)); + if (content.isEmpty()) { + return null; + } + + return parse(content); + } + + /** + * Parses a Cgroup file content and returns the corresponding container ID. + * + * @param cgroupsContent + * Cgroup file content + */ + public static String parse(final String cgroupsContent) { + final Matcher lines = LINE_RE.matcher(cgroupsContent); + while (lines.find()) { + final String path = lines.group(1); + final Matcher matcher = CONTAINER_RE.matcher(path); + if (matcher.find()) { + return matcher.group(1); + } + } + + return null; + } +} diff --git a/src/main/java/com/timgroup/statsd/ContainerID.java b/src/main/java/com/timgroup/statsd/ContainerID.java deleted file mode 100644 index 2f6efa95..00000000 --- a/src/main/java/com/timgroup/statsd/ContainerID.java +++ /dev/null @@ -1,89 +0,0 @@ -package com.timgroup.statsd; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * A reader class that retrieves the current container ID parsed from a the cgroup file. - * - */ -public class ContainerID { - private static final Path CGROUP_PATH = Paths.get("/proc/self/cgroup"); - private static final String UUID_SOURCE = "[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}"; - private static final String CONTAINER_SOURCE = "[0-9a-f]{64}"; - private static final String TASK_SOURCE = "[0-9a-f]{32}-\\d+"; - private static final Pattern LINE_RE = Pattern.compile("(\\d+):([^:]*):(.+)$"); - private static final Pattern CONTAINER_RE = - Pattern.compile( - "(?:.+)?(" + UUID_SOURCE + "|" + CONTAINER_SOURCE + "|" + TASK_SOURCE + ")(?:.scope)?$"); - - private static final ContainerID INSTANCE; - - public String containerID; - - public String getContainerID() { - return containerID; - } - - public void setContainerID(String containerID) { - this.containerID = containerID; - } - - static { - INSTANCE = ContainerID.read(CGROUP_PATH); - } - - public static ContainerID get() { - return INSTANCE; - } - - static ContainerID read(Path path) { - final String content; - try { - content = new String(Files.readAllBytes(path)); - } catch (final IOException e) { - return new ContainerID(); - } - - if (content.isEmpty()) { - return new ContainerID(); - } - - return parse(content); - } - - /** - * Parses a Cgroup file content and returns the corresponding container ID. - * - * @param cgroupsContent - * Cgroup file content - */ - public static ContainerID parse(final String cgroupsContent) { - final ContainerID containerID = new ContainerID(); - final String[] lines = cgroupsContent.split("\n"); - for (final String line : lines) { - final Matcher matcher = LINE_RE.matcher(line); - - if (!matcher.matches()) { - continue; - } - - final String path = matcher.group(3); - final String[] pathParts = path.split("/"); - - if (pathParts.length >= 1) { - final Matcher containerIDMatcher = CONTAINER_RE.matcher(pathParts[pathParts.length - 1]); - if (containerIDMatcher.matches()) { - containerID.setContainerID(containerIDMatcher.group(1)); - return containerID; - } - } - } - - return containerID; - } -} diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index d878646b..17869d5c 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -295,7 +295,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory(); statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, - processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory); + processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID); Properties properties = new Properties(); properties.load(getClass().getClassLoader().getResourceAsStream( @@ -313,7 +313,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, - poolSize, 1, false, 0, aggregationShards, threadFactory); + poolSize, 1, false, 0, aggregationShards, threadFactory, containerID); } this.telemetry = new Telemetry.Builder() @@ -336,9 +336,6 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final statsDProcessor.setTelemetry(this.telemetry); statsDSender.setTelemetry(this.telemetry); - // set container ID - statsDProcessor.setContainerID(containerID); - } catch (final Exception e) { throw new StatsDClientException("Failed to start StatsD client", e); } @@ -378,14 +375,15 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) thr protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final boolean blocking, - final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory) + final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory, + final String containerID) throws Exception { if (blocking) { return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards, threadFactory); + workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID); } else { return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards, threadFactory); + workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID); } } @@ -1221,13 +1219,13 @@ protected boolean isOriginDetectionEnabled(String containerID, boolean originDet return false; } - final String value = System.getenv(ORIGIN_DETECTION_ENABLED_ENV_VAR); - if (value != null && !value.trim().isEmpty()) { - Set falseValues = new HashSet(Arrays.asList("no", "false", "0", "n", "off")); - return !falseValues.contains(value.toLowerCase()); + String value = System.getenv(ORIGIN_DETECTION_ENABLED_ENV_VAR); + value = value != null ? value.trim() : null; + if (value != null && !value.isEmpty()) { + return !Arrays.asList("no", "false", "0", "n", "off").contains(value.toLowerCase()); } - // DD_ORIGIN_DETECTION_ENABLED is not set + // DD_ORIGIN_DETECTION_ENABLED is not set or is empty // default to true return true; } @@ -1238,7 +1236,12 @@ protected String getContainerID(String containerID, boolean originDetectionEnabl } if (originDetectionEnabled) { - return ContainerID.get().getContainerID(); + CgroupReader reader = new CgroupReader(); + try { + return reader.getContainerID(); + } catch (final IOException e) { + throw new StatsDClientException("Failed to get container ID", e); + } } return null; diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index 25a94768..f32005b9 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -30,10 +30,10 @@ protected boolean haveMessages() { StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory) throws Exception { + final ThreadFactory threadFactory, final String containerID) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, - aggregatorFlushInterval, aggregatorShards, threadFactory); + aggregatorFlushInterval, aggregatorShards, threadFactory, containerID); this.messages = new ArrayBlockingQueue<>(queueSize); } diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 51faa969..0ff9d8ca 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -38,10 +38,10 @@ protected boolean haveMessages() { StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory) throws Exception { + final ThreadFactory threadFactory, final String containerID) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, - aggregatorFlushInterval, aggregatorShards, threadFactory); + aggregatorFlushInterval, aggregatorShards, threadFactory, containerID); this.qsize = new AtomicInteger(0); this.messages = new ConcurrentLinkedQueue<>(); } diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 6bc66e3c..08c37de4 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -159,7 +159,7 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory) throws Exception { + final ThreadFactory threadFactory, final String containerID) throws Exception { this.handler = handler; this.threadFactory = threadFactory; @@ -172,6 +172,8 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { this.endSignal = new CountDownLatch(workers); this.closeSignal = new CountDownLatch(workers); this.aggregator = new StatsDAggregator(this, aggregatorShards, aggregatorFlushInterval); + + this.containerID = containerID; } protected abstract ProcessingTask createProcessingTask(); @@ -217,10 +219,6 @@ public Telemetry getTelemetry() { return telemetry; } - public void setContainerID(final String containerID) { - this.containerID = containerID; - } - void shutdown(boolean blocking) throws InterruptedException { shutdown = true; aggregator.stop(); diff --git a/src/test/java/com/timgroup/statsd/ContainerIDTest.java b/src/test/java/com/timgroup/statsd/CgroupReaderTest.java similarity index 69% rename from src/test/java/com/timgroup/statsd/ContainerIDTest.java rename to src/test/java/com/timgroup/statsd/CgroupReaderTest.java index 6b5d389c..692d36b2 100644 --- a/src/test/java/com/timgroup/statsd/ContainerIDTest.java +++ b/src/test/java/com/timgroup/statsd/CgroupReaderTest.java @@ -1,11 +1,13 @@ package com.timgroup.statsd; +import static org.junit.Assert.assertNull; + import org.junit.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -public class ContainerIDTest { +public class CgroupReaderTest { @Test public void containerID_parse() throws Exception { // Docker @@ -25,7 +27,7 @@ public void containerID_parse() throws Exception { .append("1:cpuset:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860") .toString(); - assertThat(ContainerID.parse(docker).getContainerID(), equalTo("3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860")); + assertThat(CgroupReader.parse(docker), equalTo("3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860")); // Kubernetes String kubernetes = new StringBuilder() @@ -42,7 +44,7 @@ public void containerID_parse() throws Exception { .append("1:name=systemd:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1") .toString(); - assertThat(ContainerID.parse(kubernetes).getContainerID(), equalTo("3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1")); + assertThat(CgroupReader.parse(kubernetes), equalTo("3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1")); // ECS EC2 String ecs = new StringBuilder() @@ -57,7 +59,7 @@ public void containerID_parse() throws Exception { .append("1:blkio:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce") .toString(); - assertThat(ContainerID.parse(ecs).getContainerID(), equalTo("38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce")); + assertThat(CgroupReader.parse(ecs), equalTo("38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce")); // ECS Fargate String ecsFargate = new StringBuilder() @@ -74,6 +76,40 @@ public void containerID_parse() throws Exception { .append("1:name=systemd:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n") .toString(); - assertThat(ContainerID.parse(ecsFargate).getContainerID(), equalTo("432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da")); + assertThat(CgroupReader.parse(ecsFargate), equalTo("432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da")); + + // ECS Fargate >= 1.4.0 + String ecsFargate14 = new StringBuilder() + .append("11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("10:pids:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("9:cpuset:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("8:net_cls,net_prio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("7:cpu,cpuacct:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("6:perf_event:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("5:freezer:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("4:devices:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("3:blkio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .append("1:name=systemd:/ecs/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n") + .toString(); + + assertThat(CgroupReader.parse(ecsFargate14), equalTo("34dc0b5e626f2c5c4c5170e34b10e765-1234567890")); + + // Linux non-containerized + String nonContainerized = new StringBuilder() + .append("11:blkio:/user.slice/user-0.slice/session-14.scope\n") + .append("10:memory:/user.slice/user-0.slice/session-14.scope\n") + .append("9:hugetlb:/\n") + .append("8:cpuset:/\n") + .append("7:pids:/user.slice/user-0.slice/session-14.scope\n") + .append("6:freezer:/\n") + .append("5:net_cls,net_prio:/\n") + .append("4:perf_event:/\n") + .append("3:cpu,cpuacct:/user.slice/user-0.slice/session-14.scope\n") + .append("2:devices:/user.slice/user-0.slice/session-14.scope\n") + .append("1:name=systemd:/user.slice/user-0.slice/session-14.scope\n") + .toString(); + + assertNull(CgroupReader.parse(nonContainerized)); } } diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index e5b3d44f..5b0f9b57 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -86,7 +86,7 @@ public static class FakeProcessor extends StatsDProcessor { private final AtomicInteger messageAggregated = new AtomicInteger(0); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory()); + super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory(), null); this.messages = new ConcurrentLinkedQueue<>(); } diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index 00bc0508..9f4d1a8d 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -26,7 +26,7 @@ public static class FakeProcessor extends StatsDProcessor { public final List messages = new ArrayList<>(); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory()); + super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory(), null); }