Address review comments
ahmed-mez committed Mar 11, 2022
1 parent 1705a9d commit b5d5c8b
Showing 10 changed files with 140 additions and 122 deletions.
3 changes: 0 additions & 3 deletions .gitignore
@@ -14,6 +14,3 @@ build

# rbenv for pimpmychangelog
.ruby-version

# vscode
.vscode/*
73 changes: 73 additions & 0 deletions src/main/java/com/timgroup/statsd/CgroupReader.java
@@ -0,0 +1,73 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A reader class that retrieves the current container ID, parsed from the cgroup file.
*
*/
public class CgroupReader {
private static final Path CGROUP_PATH = Paths.get("/proc/self/cgroup");
private static final String CONTAINER_SOURCE = "[0-9a-f]{64}";
private static final String TASK_SOURCE = "[0-9a-f]{32}-\\d+";
private static final Pattern LINE_RE = Pattern.compile("^\\d+:[^:]*:(.+)$", Pattern.MULTILINE | Pattern.UNIX_LINES);
private static final Pattern CONTAINER_RE =
Pattern.compile(
"(" + CONTAINER_SOURCE + "|" + TASK_SOURCE + ")(?:.scope)?$");

private boolean readOnce = false;
public String containerID;

/**
* Parses /proc/self/cgroup and returns the container ID if available.
*
* @throws IOException
* if /proc/self/cgroup is readable but an I/O error still occurs while reading it
*/
public String getContainerID() throws IOException {
if (readOnce) {
return containerID;
}

containerID = read(CGROUP_PATH);
return containerID;
}

private String read(Path path) throws IOException {
readOnce = true;
if (!Files.isReadable(path)) {
return null;
}

final String content = new String(Files.readAllBytes(path));
if (content.isEmpty()) {
return null;
}

return parse(content);
}

/**
* Parses the content of a cgroup file and returns the corresponding container ID.
*
* @param cgroupsContent
* Cgroup file content
*/
public static String parse(final String cgroupsContent) {
final Matcher lines = LINE_RE.matcher(cgroupsContent);
while (lines.find()) {
final String path = lines.group(1);
final Matcher matcher = CONTAINER_RE.matcher(path);
if (matcher.find()) {
return matcher.group(1);
}
}

return null;
}
}
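For context, a minimal usage sketch of the new reader follows. The example class below is illustrative and not part of the commit; it only exercises the public parse and getContainerID methods added above.

import com.timgroup.statsd.CgroupReader;

import java.io.IOException;

public class CgroupReaderExample {
    public static void main(String[] args) throws IOException {
        // parse() is static and operates on raw cgroup content; a Docker-style line is used here.
        String content =
                "1:name=systemd:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860\n";
        System.out.println(CgroupReader.parse(content));
        // prints: 3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860

        // getContainerID() reads /proc/self/cgroup at most once and caches the result;
        // it returns null when the file is unreadable or contains no container ID.
        String id = new CgroupReader().getContainerID();
        System.out.println(id != null ? id : "no container ID detected");
    }
}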
89 changes: 0 additions & 89 deletions src/main/java/com/timgroup/statsd/ContainerID.java

This file was deleted.

31 changes: 17 additions & 14 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
@@ -295,7 +295,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();

statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory);
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID);

Properties properties = new Properties();
properties.load(getClass().getClassLoader().getResourceAsStream(
@@ -313,7 +313,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final

// similar settings, but a single worker and non-blocking.
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
poolSize, 1, false, 0, aggregationShards, threadFactory);
poolSize, 1, false, 0, aggregationShards, threadFactory, containerID);
}

this.telemetry = new Telemetry.Builder()
@@ -336,9 +336,6 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
statsDProcessor.setTelemetry(this.telemetry);
statsDSender.setTelemetry(this.telemetry);

// set container ID
statsDProcessor.setContainerID(containerID);

} catch (final Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
@@ -378,14 +375,15 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) thr

protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final boolean blocking,
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory)
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory,
final String containerID)
throws Exception {
if (blocking) {
return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize,
workers, aggregationFlushInterval, aggregationShards, threadFactory);
workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID);
} else {
return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize,
workers, aggregationFlushInterval, aggregationShards, threadFactory);
workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID);
}
}

@@ -1221,13 +1219,13 @@ protected boolean isOriginDetectionEnabled(String containerID, boolean originDet
return false;
}

final String value = System.getenv(ORIGIN_DETECTION_ENABLED_ENV_VAR);
if (value != null && !value.trim().isEmpty()) {
Set<String> falseValues = new HashSet<String>(Arrays.asList("no", "false", "0", "n", "off"));
return !falseValues.contains(value.toLowerCase());
String value = System.getenv(ORIGIN_DETECTION_ENABLED_ENV_VAR);
value = value != null ? value.trim() : null;
if (value != null && !value.isEmpty()) {
return !Arrays.asList("no", "false", "0", "n", "off").contains(value.toLowerCase());
}

// DD_ORIGIN_DETECTION_ENABLED is not set
// DD_ORIGIN_DETECTION_ENABLED is not set or is empty
// default to true
return true;
}
@@ -1238,7 +1236,12 @@ protected String getContainerID(String containerID, boolean originDetectionEnabl
}

if (originDetectionEnabled) {
return ContainerID.get().getContainerID();
CgroupReader reader = new CgroupReader();
try {
return reader.getContainerID();
} catch (final IOException e) {
throw new StatsDClientException("Failed to get container ID", e);
}
}

return null;
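For clarity, here is a standalone restatement of the DD_ORIGIN_DETECTION_ENABLED handling introduced in the hunk above. The class and method names below are illustrative and not part of the commit; the raw environment value is taken as a parameter so the behaviour is easy to exercise.

import java.util.Arrays;

public class OriginDetectionFlagSketch {
    // Same semantics as the client code above: unset or empty means enabled;
    // "no", "false", "0", "n", "off" (any case, surrounding whitespace ignored) mean disabled.
    static boolean isEnabled(String rawValue) {
        String value = rawValue != null ? rawValue.trim() : null;
        if (value != null && !value.isEmpty()) {
            return !Arrays.asList("no", "false", "0", "n", "off").contains(value.toLowerCase());
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isEnabled(null));    // true:  DD_ORIGIN_DETECTION_ENABLED not set
        System.out.println(isEnabled("  "));    // true:  set but empty after trimming
        System.out.println(isEnabled(" Off ")); // false: trimmed and case-insensitive
        System.out.println(isEnabled("1"));     // true:  anything outside the false-list enables it
    }
}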
src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
@@ -30,10 +30,10 @@ protected boolean haveMessages() {
StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers,
final int aggregatorFlushInterval, final int aggregatorShards,
final ThreadFactory threadFactory) throws Exception {
final ThreadFactory threadFactory, final String containerID) throws Exception {

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers,
aggregatorFlushInterval, aggregatorShards, threadFactory);
aggregatorFlushInterval, aggregatorShards, threadFactory, containerID);
this.messages = new ArrayBlockingQueue<>(queueSize);
}

src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java
@@ -38,10 +38,10 @@ protected boolean haveMessages() {
StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers,
final int aggregatorFlushInterval, final int aggregatorShards,
final ThreadFactory threadFactory) throws Exception {
final ThreadFactory threadFactory, final String containerID) throws Exception {

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers,
aggregatorFlushInterval, aggregatorShards, threadFactory);
aggregatorFlushInterval, aggregatorShards, threadFactory, containerID);
this.qsize = new AtomicInteger(0);
this.messages = new ConcurrentLinkedQueue<>();
}
8 changes: 3 additions & 5 deletions src/main/java/com/timgroup/statsd/StatsDProcessor.java
@@ -159,7 +159,7 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers,
final int aggregatorFlushInterval, final int aggregatorShards,
final ThreadFactory threadFactory) throws Exception {
final ThreadFactory threadFactory, final String containerID) throws Exception {

this.handler = handler;
this.threadFactory = threadFactory;
@@ -172,6 +172,8 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
this.endSignal = new CountDownLatch(workers);
this.closeSignal = new CountDownLatch(workers);
this.aggregator = new StatsDAggregator(this, aggregatorShards, aggregatorFlushInterval);

this.containerID = containerID;
}

protected abstract ProcessingTask createProcessingTask();
@@ -217,10 +219,6 @@ public Telemetry getTelemetry() {
return telemetry;
}

public void setContainerID(final String containerID) {
this.containerID = containerID;
}

void shutdown(boolean blocking) throws InterruptedException {
shutdown = true;
aggregator.stop();
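Passing the container ID through the constructor (instead of the removed setContainerID setter) fixes it at construction time, so every worker created by a processor sees the same value. For orientation only: the DogStatsD datagram format carries that ID as a trailing |c:<container-id> field. The snippet below merely illustrates the expected wire shape; the actual serialization lives elsewhere in the client and is not part of this diff.

public class DatagramShapeSketch {
    public static void main(String[] args) {
        String containerID = "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860";
        // A counter sample with one tag; the container ID, when present, is appended last.
        String datagram = "page.views:1|c|#env:dev"
                + (containerID != null && !containerID.isEmpty() ? "|c:" + containerID : "");
        System.out.println(datagram);
        // page.views:1|c|#env:dev|c:3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
    }
}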
src/test/java/com/timgroup/statsd/CgroupReaderTest.java
@@ -1,11 +1,13 @@
package com.timgroup.statsd;

import static org.junit.Assert.assertNull;

import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

public class ContainerIDTest {
public class CgroupReaderTest {
@Test
public void containerID_parse() throws Exception {
// Docker
@@ -25,7 +27,7 @@ public void containerID_parse() throws Exception {
.append("1:cpuset:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860")
.toString();

assertThat(ContainerID.parse(docker).getContainerID(), equalTo("3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"));
assertThat(CgroupReader.parse(docker), equalTo("3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"));

// Kubernetes
String kubernetes = new StringBuilder()
@@ -42,7 +44,7 @@ public void containerID_parse() throws Exception {
.append("1:name=systemd:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1")
.toString();

assertThat(ContainerID.parse(kubernetes).getContainerID(), equalTo("3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"));
assertThat(CgroupReader.parse(kubernetes), equalTo("3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"));

// ECS EC2
String ecs = new StringBuilder()
@@ -57,7 +59,7 @@ public void containerID_parse() throws Exception {
.append("1:blkio:/ecs/name-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce")
.toString();

assertThat(ContainerID.parse(ecs).getContainerID(), equalTo("38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"));
assertThat(CgroupReader.parse(ecs), equalTo("38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"));

// ECS Fargate
String ecsFargate = new StringBuilder()
@@ -74,6 +76,40 @@ public void containerID_parse() throws Exception {
.append("1:name=systemd:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da\n")
.toString();

assertThat(ContainerID.parse(ecsFargate).getContainerID(), equalTo("432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"));
assertThat(CgroupReader.parse(ecsFargate), equalTo("432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"));

// ECS Fargate >= 1.4.0
String ecsFargate14 = new StringBuilder()
.append("11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("10:pids:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("9:cpuset:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("8:net_cls,net_prio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("7:cpu,cpuacct:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("6:perf_event:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("5:freezer:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("4:devices:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("3:blkio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.append("1:name=systemd:/ecs/34dc0b5e626f2c5c4c5170e34b10e765-1234567890\n")
.toString();

assertThat(CgroupReader.parse(ecsFargate14), equalTo("34dc0b5e626f2c5c4c5170e34b10e765-1234567890"));

// Linux non-containerized
String nonContainerized = new StringBuilder()
.append("11:blkio:/user.slice/user-0.slice/session-14.scope\n")
.append("10:memory:/user.slice/user-0.slice/session-14.scope\n")
.append("9:hugetlb:/\n")
.append("8:cpuset:/\n")
.append("7:pids:/user.slice/user-0.slice/session-14.scope\n")
.append("6:freezer:/\n")
.append("5:net_cls,net_prio:/\n")
.append("4:perf_event:/\n")
.append("3:cpu,cpuacct:/user.slice/user-0.slice/session-14.scope\n")
.append("2:devices:/user.slice/user-0.slice/session-14.scope\n")
.append("1:name=systemd:/user.slice/user-0.slice/session-14.scope\n")
.toString();

assertNull(CgroupReader.parse(nonContainerized));
}
}
@@ -86,7 +86,7 @@ public static class FakeProcessor extends StatsDProcessor {
private final AtomicInteger messageAggregated = new AtomicInteger(0);

FakeProcessor(final StatsDClientErrorHandler handler) throws Exception {
super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory());
super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory(), null);
this.messages = new ConcurrentLinkedQueue<>();
}
