Skip to content

Commit

Permalink
Merge branch '1.6.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Nov 25, 2020
2 parents 84a26c6 + 8f2ba8a commit 133d359
Show file tree
Hide file tree
Showing 25 changed files with 203 additions and 215 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ buildscript {
classpath 'me.champeau.gradle:jmh-gradle-plugin:0.5.0'
classpath 'com.netflix.nebula:nebula-project-plugin:3.4.0'
classpath "io.spring.nohttp:nohttp-gradle:0.0.4.RELEASE"
classpath "org.gradle:test-retry-gradle-plugin:1.1.6"
classpath "org.gradle:test-retry-gradle-plugin:1.1.9"

constraints {
classpath('org.ow2.asm:asm:7.3.1') {
Expand Down Expand Up @@ -243,7 +243,7 @@ subprojects {
}

wrapper {
gradleVersion = '6.7'
gradleVersion = '6.7.1'
}

defaultTasks 'build'
4 changes: 1 addition & 3 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ def VERSIONS = [
'io.dropwizard.metrics:metrics-graphite:4.1.+',
'io.dropwizard.metrics:metrics-jmx:4.0.+',
'info.ganglia.gmetric4j:gmetric4j:latest.release',
'io.projectreactor:reactor-core:3.3.+',
'io.projectreactor:reactor-test:3.3.+',
'io.projectreactor.netty:reactor-netty:0.9.+',
'io.prometheus:simpleclient_common:latest.release',
'io.prometheus:simpleclient_pushgateway:latest.release',
'javax.cache:cache-api:latest.release',
Expand Down Expand Up @@ -87,6 +84,7 @@ subprojects {
runtimeOnly version
}
}
implementation platform('io.projectreactor:reactor-bom:2020.0.+')
}
}
}
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 1 addition & 1 deletion implementations/micrometer-registry-graphite/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ dependencies {
api 'io.dropwizard.metrics:metrics-graphite'

testImplementation project(':micrometer-test')
testImplementation 'io.projectreactor.netty:reactor-netty'
testImplementation 'io.projectreactor.netty:reactor-netty-core'
}
4 changes: 2 additions & 2 deletions implementations/micrometer-registry-statsd/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies {
api project(':micrometer-core')

implementation 'io.projectreactor:reactor-core'
implementation 'io.projectreactor.netty:reactor-netty'
implementation 'io.projectreactor.netty:reactor-netty-core'

testImplementation project(':micrometer-test')
testImplementation 'io.projectreactor:reactor-test'
Expand Down Expand Up @@ -48,7 +48,7 @@ publishing {
.dependencies
.dependency
.findAll {
['reactor-core', 'reactor-netty'].contains(it.artifactId.text())
['reactor-core', 'reactor-netty-core'].contains(it.artifactId.text())
}
.each { it.parent().remove(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.micrometer.core.instrument.AbstractMeter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.util.MeterEquivalence;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.atomic.DoubleAdder;

Expand All @@ -27,11 +27,11 @@
*/
public class StatsdCounter extends AbstractMeter implements Counter {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private DoubleAdder count = new DoubleAdder();
private volatile boolean shutdown;

StatsdCounter(Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink) {
StatsdCounter(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
super(id);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -41,7 +41,7 @@ public class StatsdCounter extends AbstractMeter implements Counter {
public void increment(double amount) {
if (!shutdown && amount > 0) {
count.add(amount);
sink.next(lineBuilder.count((long) amount));
sink.emitNext(lineBuilder.count((long) amount), Sinks.EmitFailureHandler.FAIL_FAST);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.micrometer.core.lang.Nullable;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -32,10 +31,10 @@ public class StatsdDistributionSummary extends AbstractDistributionSummary {
private final DoubleAdder amount = new DoubleAdder();
private final TimeWindowMax max;
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private volatile boolean shutdown;

StatsdDistributionSummary(Meter.Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink, Clock clock,
StatsdDistributionSummary(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, Clock clock,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
super(id, clock, distributionStatisticConfig, scale, false);
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
Expand All @@ -49,7 +48,7 @@ protected void recordNonNegative(double amount) {
count.increment();
this.amount.add(amount);
max.record(amount);
sink.next(lineBuilder.histogram(amount));
sink.emitNext(lineBuilder.histogram(amount), Sinks.EmitFailureHandler.FAIL_FAST);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.micrometer.statsd;

import io.micrometer.core.instrument.cumulative.CumulativeFunctionCounter;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToDoubleFunction;
Expand All @@ -30,10 +30,10 @@
*/
public class StatsdFunctionCounter<T> extends CumulativeFunctionCounter<T> implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private final AtomicReference<Long> lastValue = new AtomicReference<>(0L);

StatsdFunctionCounter(Id id, T obj, ToDoubleFunction<T> f, StatsdLineBuilder lineBuilder, FluxSink<String> sink) {
StatsdFunctionCounter(Id id, T obj, ToDoubleFunction<T> f, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
super(id, obj, f);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -43,7 +43,7 @@ public class StatsdFunctionCounter<T> extends CumulativeFunctionCounter<T> imple
public void poll() {
lastValue.updateAndGet(prev -> {
long count = (long) count();
sink.next(lineBuilder.count(count - prev));
sink.emitNext(lineBuilder.count(count - prev), Sinks.EmitFailureHandler.FAIL_FAST);
return count;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.micrometer.statsd;

import io.micrometer.core.instrument.cumulative.CumulativeFunctionTimer;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -25,13 +25,13 @@

public class StatsdFunctionTimer<T> extends CumulativeFunctionTimer<T> implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;
private final AtomicReference<Long> lastCount = new AtomicReference<>(0L);
private final AtomicReference<Double> lastTime = new AtomicReference<>(0.0);

StatsdFunctionTimer(Id id, T obj, ToLongFunction<T> countFunction, ToDoubleFunction<T> totalTimeFunction,
TimeUnit totalTimeFunctionUnit, TimeUnit baseTimeUnit,
StatsdLineBuilder lineBuilder, FluxSink<String> sink) {
StatsdLineBuilder lineBuilder, Sinks.Many<String> sink) {
super(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, baseTimeUnit);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -53,7 +53,7 @@ public void poll() {
// occurrences.
double timingAverage = newTimingsSum / newTimingsCount;
for (int i = 0; i < newTimingsCount; i++) {
sink.next(lineBuilder.timing(timingAverage));
sink.emitNext(lineBuilder.timing(timingAverage), Sinks.EmitFailureHandler.FAIL_FAST);
}

return totalTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.micrometer.core.lang.Nullable;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToDoubleFunction;

public class StatsdGauge<T> extends AbstractMeter implements Gauge, StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;

private final WeakReference<T> ref;
private final ToDoubleFunction<T> value;
private final AtomicReference<Double> lastValue = new AtomicReference<>(Double.NaN);
private final boolean alwaysPublish;

StatsdGauge(Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
StatsdGauge(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, @Nullable T obj, ToDoubleFunction<T> value, boolean alwaysPublish) {
super(id);
this.lineBuilder = lineBuilder;
this.sink = sink;
Expand All @@ -53,7 +53,7 @@ public double value() {
public void poll() {
double val = value();
if (Double.isFinite(val) && (alwaysPublish || lastValue.getAndSet(val) != val)) {
sink.next(lineBuilder.gauge(val));
sink.emitNext(lineBuilder.gauge(val), Sinks.EmitFailureHandler.FAIL_FAST);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdPollable {
private final StatsdLineBuilder lineBuilder;
private final FluxSink<String> sink;
private final Sinks.Many<String> sink;

private final AtomicReference<Long> lastActive = new AtomicReference<>(Long.MIN_VALUE);
private final AtomicReference<Double> lastDuration = new AtomicReference<>(Double.NEGATIVE_INFINITY);

private final boolean alwaysPublish;

StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, FluxSink<String> sink, Clock clock, boolean alwaysPublish,
StatsdLongTaskTimer(Id id, StatsdLineBuilder lineBuilder, Sinks.Many<String> sink, Clock clock, boolean alwaysPublish,
DistributionStatisticConfig distributionStatisticConfig, TimeUnit baseTimeUnit) {
super(id, clock, baseTimeUnit, distributionStatisticConfig, false);
this.lineBuilder = lineBuilder;
Expand All @@ -45,17 +45,17 @@ public class StatsdLongTaskTimer extends DefaultLongTaskTimer implements StatsdP
public void poll() {
long active = activeTasks();
if (alwaysPublish || lastActive.getAndSet(active) != active) {
sink.next(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS));
sink.emitNext(lineBuilder.gauge(active, Statistic.ACTIVE_TASKS), Sinks.EmitFailureHandler.FAIL_FAST);
}

double duration = duration(TimeUnit.MILLISECONDS);
if (alwaysPublish || lastDuration.getAndSet(duration) != duration) {
sink.next(lineBuilder.gauge(duration, Statistic.DURATION));
sink.emitNext(lineBuilder.gauge(duration, Statistic.DURATION), Sinks.EmitFailureHandler.FAIL_FAST);
}

double max = max(TimeUnit.MILLISECONDS);
if (alwaysPublish || lastDuration.getAndSet(duration) != duration) {
sink.next(lineBuilder.gauge(max, Statistic.MAX));
sink.emitNext(lineBuilder.gauge(max, Statistic.MAX), Sinks.EmitFailureHandler.FAIL_FAST);
}
}
}
Loading

0 comments on commit 133d359

Please sign in to comment.