Skip to content

Commit

Permalink
INT-4384: Micrometer Support
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4384

Add support for micrometer metrics collection.

Initial commit.

* Rebase; upgrade to Micrometer rc.8

* Polishing - PR Comments

* Final Polishing
  • Loading branch information
garyrussell authored and artembilan committed Feb 2, 2018
1 parent 3bd4930 commit 5475474
Show file tree
Hide file tree
Showing 19 changed files with 648 additions and 40 deletions.
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ subprojects { subproject ->

ext {
activeMqVersion = '5.15.2'
aspectjVersion = '1.8.13'
apacheSshdVersion = '1.6.0'
aspectjVersion = '1.8.13'
assertjVersion = '2.6.0'
boonVersion = '0.34'
commonsDbcp2Version = '2.1.1'
commonsIoVersion = '2.4'
Expand Down Expand Up @@ -119,6 +120,7 @@ subprojects { subproject ->
jythonVersion = '2.5.3'
kryoShadedVersion = '3.0.3'
log4jVersion = '2.10.0'
micrometerVersion = '1.0.0-rc.8'
mockitoVersion = '2.11.0'
mysqlVersion = '6.0.6'
pahoMqttClientVersion = '1.2.0'
Expand Down Expand Up @@ -272,6 +274,7 @@ project('spring-integration-test-support') {
compile ("org.mockito:mockito-core:$mockitoVersion") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
compile "org.assertj:assertj-core:$assertjVersion"
compile "org.springframework:spring-context:$springVersion"
compile "org.springframework:spring-messaging:$springVersion"
compile "org.springframework:spring-test:$springVersion"
Expand Down Expand Up @@ -316,6 +319,7 @@ project('spring-integration-core') {
}
compile("io.fastjson:boon:$boonVersion", optional)
compile("com.esotericsoftware:kryo-shaded:$kryoShadedVersion", optional)
compile "io.micrometer:micrometer-core:$micrometerVersion"

testCompile ("org.aspectj:aspectjweaver:$aspectjVersion")
testCompile "io.projectreactor:reactor-test:$reactorVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -433,12 +433,19 @@ public boolean send(Message<?> message, long timeout) {
}
}
if (countsEnabled) {
metrics = channelMetrics.beforeSend();
if (channelMetrics.getTimer() != null) {
final Message<?> messageToSend = message;
sent = channelMetrics.getTimer().recordCallable(() -> doSend(messageToSend, timeout));
}
else {
metrics = channelMetrics.beforeSend();
sent = doSend(message, timeout);
channelMetrics.afterSend(metrics, sent);
metricsProcessed = true;
}
}
sent = this.doSend(message, timeout);
if (countsEnabled) {
channelMetrics.afterSend(metrics, sent);
metricsProcessed = true;
else {
sent = doSend(message, timeout);
}

if (debugEnabled) {
Expand All @@ -452,7 +459,12 @@ public boolean send(Message<?> message, long timeout) {
}
catch (Exception e) {
if (countsEnabled && !metricsProcessed) {
channelMetrics.afterSend(metrics, false);
if (channelMetrics.getErrorCounter() != null) {
channelMetrics.getErrorCounter().increment();
}
else {
channelMetrics.afterSend(metrics, false);
}
}
if (interceptorStack != null) {
interceptors.afterSendCompletion(message, this, sent, e, interceptorStack);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.springframework.messaging.MessagingException;
import org.springframework.util.CollectionUtils;

import io.micrometer.core.instrument.Counter;

/**
* @author Mark Fisher
* @author Oleg Zhurakousky
Expand All @@ -51,11 +53,13 @@ public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluat

private volatile Map<String, Expression> headerExpressions = Collections.emptyMap();

private volatile String beanName;
private String beanName;

private String managedType;

private volatile String managedType;
private String managedName;

private volatile String managedName;
private Counter counter;

private volatile boolean countsEnabled;

Expand Down Expand Up @@ -118,6 +122,11 @@ public void setLoggingEnabled(boolean loggingEnabled) {
this.managementOverrides.loggingConfigured = true;
}

@Override
public void setCounter(Counter counter) {
this.counter = counter;
}

@Override
public void reset() {
this.messageCount.set(0);
Expand Down Expand Up @@ -182,7 +191,12 @@ else if (result != null) {
.build();
}
if (this.countsEnabled && message != null) {
this.messageCount.incrementAndGet();
if (this.counter != null) {
this.counter.increment();
}
else {
this.messageCount.incrementAndGet();
}
}
return message;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -136,16 +136,31 @@ public void handleMessage(Message<?> message) {
message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
}
if (countsEnabled) {
start = handlerMetrics.beforeHandle();
if (handlerMetrics.getTimer() != null) {
final Message<?> messageToSend = message;
handlerMetrics.getTimer().recordCallable(() -> {
handleMessageInternal(messageToSend);
return null;
});
}
else {
start = handlerMetrics.beforeHandle();
handleMessageInternal(message);
handlerMetrics.afterHandle(start, true);
}
}
this.handleMessageInternal(message);
if (countsEnabled) {
handlerMetrics.afterHandle(start, true);
else {
handleMessageInternal(message);
}
}
catch (Exception e) {
if (countsEnabled) {
handlerMetrics.afterHandle(start, false);
if (handlerMetrics.getErrorCounter() != null) {
handlerMetrics.getErrorCounter().increment();
}
else {
handlerMetrics.afterHandle(start, false);
}
}
if (e instanceof MessagingException) {
throw (MessagingException) e;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;

/**
* Abstract base class for channel metrics implementations.
*
Expand All @@ -32,10 +38,36 @@ public abstract class AbstractMessageChannelMetrics implements ConfigurableMetri

protected final String name;

private final Timer timer;

private final Counter errorCounter;

private volatile boolean fullStatsEnabled;

/**
* Construct an instance with the provided name.
* @param name the name.
*/
public AbstractMessageChannelMetrics(String name) {
this(name, null, null);
}

/**
* Construct an instance with the provided name, timer and error counter.
* A non-null timer requires a non-null error counter. When a timer is provided,
* Micrometer metrics are used and the legacy metrics are not maintained.
* @param name the name.
* @param timer the timer.
* @param errorCounter the error counter.
* @since 5.0.2
*/
public AbstractMessageChannelMetrics(String name, Timer timer, Counter errorCounter) {
if (timer != null) {
Assert.notNull(errorCounter, "'errorCounter' cannot be null if a timer is provided");
}
this.name = name;
this.timer = timer;
this.errorCounter = errorCounter;
}

/**
Expand Down Expand Up @@ -72,6 +104,26 @@ protected boolean isFullStatsEnabled() {
*/
public abstract void reset();

/**
* Return the timer if Micrometer metrics are being used.
* @return the timer, or null to indicate Micrometer is not being used.
* @since 5.0.2
*/
@Nullable
public Timer getTimer() {
return this.timer;
}

/**
* Return the error counter if Micrometer metrics are being used.
* @return the counter or null if Micrometer is not being used.
* @since 5.0.2
*/
@Nullable
public Counter getErrorCounter() {
return this.errorCounter;
}

public abstract int getSendCount();

public abstract long getSendCountLong();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;

/**
* Abstract base class for handler metrics implementations.
*
Expand All @@ -32,10 +35,20 @@ public abstract class AbstractMessageHandlerMetrics implements ConfigurableMetri

protected final String name;

private final Timer timer;

private final Counter errorCounter;

private volatile boolean fullStatsEnabled;

public AbstractMessageHandlerMetrics(String name) {
this(name, null, null);
}

public AbstractMessageHandlerMetrics(String name, Timer timer, Counter errorCounter) {
this.name = name;
this.timer = timer;
this.errorCounter = errorCounter;
}

/**
Expand Down Expand Up @@ -66,6 +79,14 @@ protected boolean isFullStatsEnabled() {

public abstract void reset();

public Timer getTimer() {
return this.timer;
}

public Counter getErrorCounter() {
return this.errorCounter;
}

public abstract long getHandleCountLong();

public abstract int getHandleCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2009-2017 the original author or authors.
* Copyright 2009-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,9 @@

import java.util.concurrent.atomic.AtomicLong;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;

/**
* Default implementation; use the full constructor to customize the moving averages.
*
Expand Down Expand Up @@ -61,13 +64,26 @@ public DefaultMessageChannelMetrics() {
* @param name the name.
*/
public DefaultMessageChannelMetrics(String name) {
this(name, null, null);
}

/**
* Construct an instance with default metrics with {@code window=10, period=1 second,
* lapsePeriod=1 minute}.
* @param name the name.
* @param timer a timer.
* @param errorCounter a counter.
* @since 5.0.2
*/
public DefaultMessageChannelMetrics(String name, Timer timer, Counter errorCounter) {
this(name, new ExponentialMovingAverage(DEFAULT_MOVING_AVERAGE_WINDOW, 1000000.),
new ExponentialMovingAverageRate(
ONE_SECOND_SECONDS, ONE_MINUTE_SECONDS, DEFAULT_MOVING_AVERAGE_WINDOW, true),
new ExponentialMovingAverageRatio(
ONE_MINUTE_SECONDS, DEFAULT_MOVING_AVERAGE_WINDOW, true),
new ExponentialMovingAverageRate(
ONE_SECOND_SECONDS, ONE_MINUTE_SECONDS, DEFAULT_MOVING_AVERAGE_WINDOW, true));
new ExponentialMovingAverageRate(
ONE_SECOND_SECONDS, ONE_MINUTE_SECONDS, DEFAULT_MOVING_AVERAGE_WINDOW, true),
new ExponentialMovingAverageRatio(
ONE_MINUTE_SECONDS, DEFAULT_MOVING_AVERAGE_WINDOW, true),
new ExponentialMovingAverageRate(
ONE_SECOND_SECONDS, ONE_MINUTE_SECONDS, DEFAULT_MOVING_AVERAGE_WINDOW, true),
timer, errorCounter);
}

/**
Expand All @@ -84,7 +100,26 @@ public DefaultMessageChannelMetrics(String name) {
public DefaultMessageChannelMetrics(String name, ExponentialMovingAverage sendDuration,
ExponentialMovingAverageRate sendErrorRate, ExponentialMovingAverageRatio sendSuccessRatio,
ExponentialMovingAverageRate sendRate) {
super(name);
this(name, sendDuration, sendErrorRate, sendSuccessRatio, sendRate, null, null);
}

/**
* Construct an instance with the supplied metrics. For proper representation of metrics, the
* supplied sendDuration must have a {@code factor=1000000.} and the the other arguments
* must be created with the {@code millis} constructor argument set to true.
* @param name the name.
* @param sendDuration an {@link ExponentialMovingAverage} for calculating the send duration.
* @param sendErrorRate an {@link ExponentialMovingAverageRate} for calculating the send error rate.
* @param sendSuccessRatio an {@link ExponentialMovingAverageRatio} for calculating the success ratio.
* @param sendRate an {@link ExponentialMovingAverageRate} for calculating the send rate.
* @param timer a timer.
* @param errorCounter a counter.
* @since 5.0.2
*/
public DefaultMessageChannelMetrics(String name, ExponentialMovingAverage sendDuration,
ExponentialMovingAverageRate sendErrorRate, ExponentialMovingAverageRatio sendSuccessRatio,
ExponentialMovingAverageRate sendRate, Timer timer, Counter errorCounter) {
super(name, timer, errorCounter);
this.sendDuration = sendDuration;
this.sendErrorRate = sendErrorRate;
this.sendSuccessRatio = sendSuccessRatio;
Expand Down
Loading

0 comments on commit 5475474

Please sign in to comment.