Skip to content

Commit

Permalink
Merge pull request #4 from lookout/fetch-some-jmx-stats
Browse files Browse the repository at this point in the history
Fetch some jmx stats
  • Loading branch information
rkuris committed May 29, 2015
2 parents b459a53 + 8459f0a commit 0891264
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 140 deletions.
36 changes: 33 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ make sure pom.xml has <version>2.2.0</version>.

Alternatively, grab the binary from bintray:

`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.1/agent-1.1.jar -o agent-1.1.jar`
`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.2/agent-1.2.jar -o agent-1.2.jar`

INSTALL
----------------

Copy the statsd library from the .m2 folder to cassandra/lib.
Add the following to your cassandra startup script:

Copy the agent-1.1.jar to a new directory cassandra/plugins
Copy the agent-1.2.jar to a new directory cassandra/plugins

Change cassandra startup to add this agent. This can be done in
a stock install by adding the following to /etc/default/cassandra:

`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.1.jar=localhost"`
`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.2.jar=localhost"`

Note the '=localhost' at the end. This supports the following syntaxes:
`hostname:port@interval`
Expand All @@ -51,6 +51,36 @@ confirm that everything is running, it looks like this:

`INFO [metrics-statsd-thread-1] 2014-12-19 19:05:37,120 StatsdReporter.java:65 - Statsd reporting to host localhost port 8125 every 10 seconds`

WHAT GETS REPORTED
------------------
Lots of stuff:

* Gossip statistics:
gossip.score.<IP>, which helps decide who is closer/faster for queries
gossip.severity, which indicates how busy this node reports itself to be to other nodes
* Per table statistics:
cfstats.<keyspace>.<columnfamily>.ReadCount
cfstats.<keyspace>.<columnfamily>.WriteCount
cfstats.<keyspace>.<columnfamily>.RecentReadLatencyMicros
cfstats.<keyspace>.<columnfamily>.RecentWriteLatencyMicros
cfstats.<keyspace>.<columnfamily>.TombstonesPerSlice
cfstats.<keyspace>.<columnfamily>.estimatedKeys
The last one (estimatedKeys) is great for monitoring general trends, but it is
only an estimate, so don't rely on that number being very accurate.
* PHI reporter
Also supported is the currently-experimental PHI reporter, in PHI.<IP>,
coming to a Cassandra cluster near you soon.
* JVM GC metrics
* Anything else registered with yammer-metrics

DEBUGGING
----------------
Not working? There's a lot of tracing and debugging available. Edit
log4j-server.properties and add a line like the following to get extremely
detailed traces of what the agent is doing in server.log.

`log4j.logger.com.github.lookout.metrics.agent.generators=TRACE`

TODO
----------------
Errors that happen during startup are not reported as well as they should
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>com.github.lookout.metrics</groupId>
<artifactId>agent</artifactId>

<version>1.1</version>
<version>1.2</version>
<packaging>jar</packaging>

<name>cassandra-statsd-reporter</name>
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/github/lookout/metrics/agent/ReportAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import java.util.concurrent.TimeUnit;

public class ReportAgent {
public static void premain(final String agentArgs, final Instrumentation inst) throws IOException {

public static void premain(final String agentArgs, final Instrumentation inst) {
String host;
try {
host = InetAddress.getLocalHost().getHostName();
Expand All @@ -26,13 +27,5 @@ public static void premain(final String agentArgs, final Instrumentation inst) t
reporter.start(hostPortInterval.getInterval(), TimeUnit.SECONDS);
}
}

/**
 * Stand-alone entry point that sleeps for ten seconds and then returns.
 *
 * <p>If the sleeping thread is interrupted, the sleep ends early and the
 * thread's interrupt status is restored so that the caller can still
 * observe the interruption.
 *
 * @param args command-line arguments; not used
 */
public static void main(final String[] args) {
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt flag when it throws; set it again
        // rather than silently discarding the interruption.
        Thread.currentThread().interrupt();
    }
}
}

149 changes: 22 additions & 127 deletions src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,164 +19,59 @@

package com.github.lookout.metrics.agent;

import com.github.lookout.metrics.agent.generators.CassandraJMXGenerator;
import com.github.lookout.metrics.agent.generators.JavaVMGenerator;
import com.github.lookout.metrics.agent.generators.MetricGenerator;
import com.github.lookout.metrics.agent.generators.YammerMetricsGenerator;
import com.timgroup.statsd.StatsDClient;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.*;
import com.yammer.metrics.reporting.AbstractPollingReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor<Long> {
public class StatsdReporter extends AbstractPollingReporter {

private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class);

protected final VirtualMachineMetrics vm;

private final StatsDClient statsd;
protected final Clock clock;
private static final MetricPredicate predicate = MetricPredicate.ALL;

private boolean reportedStartup = false;
private final HostPortInterval hostPortInterval;
private final HashMap<String, Integer> previous_run_times;
private final HashMap<String, Integer> previous_run_counts;

private final Set<MetricGenerator> generators = new HashSet<>();

public StatsdReporter(final HostPortInterval hostPortInterval, final StatsDClient statsd) {
super(Metrics.defaultRegistry(), "statsd");
this.hostPortInterval = hostPortInterval;
this.statsd = statsd;
vm = VirtualMachineMetrics.getInstance();
clock = Clock.defaultClock();
previous_run_times = new HashMap<String, Integer>();
previous_run_counts = new HashMap<String, Integer>();
}

// This really should be done with an injection framework, but that's too heavy for this
generators.add(new CassandraJMXGenerator());
generators.add(new JavaVMGenerator());
generators.add(new YammerMetricsGenerator());
}

@Override
public void run() {
if (!reportedStartup || LOG.isDebugEnabled()) {
LOG.info("Statsd reporting to {}", hostPortInterval);
reportedStartup = true;
}
try {
final long epoch = clock.time() / 1000;
printMetrics(epoch);
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error writing to statsd", e);
} else {
LOG.warn("Error writing to statsd: {}", e.getMessage());
}
}
}

/**
 * Converts a byte count to megabytes (1024 * 1024 bytes each).
 *
 * @param bytes number of bytes
 * @return the number of megabytes, with the fractional part dropped by the int cast
 */
private int bytesToMB(double bytes) {
    final double bytesPerMB = 1024 * 1024;
    final double megabytes = bytes / bytesPerMB;
    return (int) megabytes;
}
/**
 * Converts a fraction to a percentage rounded to the nearest integer.
 *
 * @param pct the fraction to convert (for example 0.5)
 * @return the value multiplied by 100 and rounded (for example 50)
 */
private int doubleToPct(double pct) {
    final double scaled = 100 * pct;
    final long rounded = Math.round(scaled);
    return (int) rounded;
}

protected void printMetrics(long epoch) {
// Memory
statsd.gauge("jvm.memory.totalInitInMB", bytesToMB(vm.totalInit()));
statsd.gauge("jvm.memory.totalUsedInMB", bytesToMB(vm.totalUsed()));
statsd.gauge("jvm.memory.heapUsedInMB", bytesToMB(vm.heapUsed()));

statsd.gauge("jvm.memory.heapUsagePercent", doubleToPct(vm.heapUsage()));

for (Map.Entry<String, Double> pool : vm.memoryPoolUsage().entrySet()) {
statsd.gauge("jvm.memory.memory_pool_usages." + pool.getKey() + "Percent", doubleToPct(pool.getValue()));
}

statsd.gauge("jvm.fdUsagePercent", doubleToPct(vm.fileDescriptorUsage()));

for (Map.Entry<String, VirtualMachineMetrics.GarbageCollectorStats> entry : vm.garbageCollectors().entrySet()) {
// we only care about the delta times for the GC time and GC runs

final String name = "jvm.gc." + entry.getKey();
String stat_name_time = name + ".timeInMS";

int total_run_time = (int) entry.getValue().getTime(TimeUnit.MILLISECONDS);
Integer previous_total_run_time = previous_run_times.get(stat_name_time);

if (previous_total_run_time == null) {
previous_total_run_time = 0;
}
int delta_run_time = total_run_time - previous_total_run_time;
previous_run_times.put(stat_name_time, total_run_time);

statsd.gauge(stat_name_time, delta_run_time);
String stat_run_count = name + ".runs";

int total_runs = (int) entry.getValue().getRuns();

Integer previous_total_runs = previous_run_counts.get(stat_run_count);

if (previous_total_runs == null) {
previous_total_runs = 0;
}

statsd.gauge(stat_run_count, total_runs - previous_total_runs);
previous_run_counts.put(stat_run_count, total_runs);
}

final Set<Map.Entry<String, SortedMap<MetricName, Metric>>> entries = getMetricsRegistry().groupedMetrics(predicate).entrySet();

for (final Map.Entry<String, SortedMap<MetricName, Metric>> entry : entries) {
for (final Map.Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) {

final Metric metric = subEntry.getValue();

if (metric != null) {
try {
metric.processWith(this, subEntry.getKey(), epoch);
} catch (final Exception exception) {
LOG.error("Error processing key {}", subEntry.getKey(), exception);
}
for (MetricGenerator generator : generators) {
try {
generator.generate(statsd);
} catch (RuntimeException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error writing to statsd", e);
} else {
LOG.warn("Error writing to statsd: {}", e.getMessage());
}
}
}
}

/**
 * Handles a meter metric: nothing is sent to statsd, the meter's name and
 * current count are only written to the debug log.
 *
 * @param name  identifier of the meter
 * @param meter the meter being visited
 * @param epoch context value supplied by the caller; not used here
 */
@Override
public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception {
    final String meterName = name.getName();
    LOG.debug("Meter {} {} skipped", meterName, meter.count());
}

/**
 * Handles a counter metric by sending its current count to statsd as a
 * gauge named after the metric.
 *
 * @param name    identifier of the counter; its name becomes the gauge name
 * @param counter the counter being visited
 * @param epoch   context value supplied by the caller; not used here
 */
@Override
public void processCounter(MetricName name, Counter counter, Long epoch) throws Exception {
    final String counterName = name.getName();
    statsd.gauge(counterName, counter.count());
}

/**
 * Handles a histogram metric: nothing is sent to statsd, the histogram's
 * name and mean are only written to the debug log.
 *
 * @param name      identifier of the histogram
 * @param histogram the histogram being visited
 * @param epoch     context value supplied by the caller; not used here
 */
@Override
public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception {
    final String histogramName = name.getName();
    LOG.debug("Histogram {} mean {} skipped", histogramName, histogram.mean());
}

/**
 * Handles a timer metric: nothing is sent to statsd, the timer's name is
 * only written to the debug log.
 *
 * @param name    identifier of the timer
 * @param timer   the timer being visited; not read here
 * @param context context value supplied by the caller; not used here
 */
@Override
public void processTimer(MetricName name, Timer timer, Long context) throws Exception {
    final String timerName = name.getName();
    LOG.debug("Timer {} skipped", timerName);
}

/**
 * Handles a gauge metric by passing its name and current value on to
 * {@code reportGaugeValue}.
 *
 * @param name    identifier of the gauge; its name becomes the statsd key
 * @param gauge   the gauge being visited
 * @param context context value supplied by the caller; not used here
 */
@Override
public void processGauge(MetricName name, Gauge<?> gauge, Long context) throws Exception {
    final String gaugeName = name.getName();
    final Object currentValue = gauge.value();
    reportGaugeValue(gaugeName, currentValue);
}
/**
 * Sends a gauge reading to statsd, choosing the call by the value's runtime type.
 *
 * <ul>
 *   <li>{@code Long}, {@code Integer}, {@code Short} and {@code Byte} values are
 *       reported as a long gauge.</li>
 *   <li>{@code Double} and {@code Float} values are reported as a double gauge.</li>
 *   <li>{@code Map} values are expanded recursively: each entry is reported under
 *       {@code name + "." + key}.</li>
 *   <li>Any other value (including {@code null}) is ignored.</li>
 * </ul>
 *
 * @param name       statsd key to report under
 * @param gaugeValue the value read from the gauge; may be {@code null}
 */
private void reportGaugeValue(String name, Object gaugeValue) {
    if (gaugeValue instanceof Long || gaugeValue instanceof Integer
            || gaugeValue instanceof Short || gaugeValue instanceof Byte) {
        // Smaller integral types widen to long without loss, so they share the long gauge.
        statsd.gauge(name, ((Number) gaugeValue).longValue());
    } else if (gaugeValue instanceof Double || gaugeValue instanceof Float) {
        statsd.gauge(name, ((Number) gaugeValue).doubleValue());
    } else if (gaugeValue instanceof Map) {
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) gaugeValue).entrySet()) {
            // String concatenation tolerates a null key, unlike calling toString() on it.
            reportGaugeValue(name + "." + entry.getKey(), entry.getValue());
        }
    }
}
}
Loading

0 comments on commit 0891264

Please sign in to comment.