Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collapser metrics #571

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/**
* Copyright 2012 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.servopublisher;

import java.util.ArrayList;
import java.util.List;

import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserMetrics;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCollapser;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.annotations.DataSourceLevel;
import com.netflix.servo.monitor.BasicCompositeMonitor;
import com.netflix.servo.monitor.Monitor;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.tag.Tag;

/**
* Implementation of {@link HystrixMetricsPublisherCollapser} using Servo (https://github.com/Netflix/servo)
*/
public class HystrixServoMetricsPublisherCollapser extends HystrixServoMetricsPublisherAbstract implements HystrixMetricsPublisherCollapser {

private final HystrixCollapserKey key;
private final HystrixCollapserMetrics metrics;
private final HystrixCollapserProperties properties;
private final Tag servoInstanceTag;
private final Tag servoTypeTag;

public HystrixServoMetricsPublisherCollapser(HystrixCollapserKey threadPoolKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
this.key = threadPoolKey;
this.metrics = metrics;
this.properties = properties;

this.servoInstanceTag = new Tag() {

@Override
public String getKey() {
return "instance";
}

@Override
public String getValue() {
return key.name();
}

@Override
public String tagString() {
return key.name();
}

};
this.servoTypeTag = new Tag() {

@Override
public String getKey() {
return "type";
}

@Override
public String getValue() {
return "HystrixCollapser";
}

@Override
public String tagString() {
return "HystrixCollapser";
}

};
}

@Override
public void initialize() {
/* list of monitors */
List<Monitor<?>> monitors = getServoMonitors();

// publish metrics together under a single composite (it seems this name is ignored)
MonitorConfig commandMetricsConfig = MonitorConfig.builder("HystrixCollapser_" + key.name()).build();
BasicCompositeMonitor commandMetricsMonitor = new BasicCompositeMonitor(commandMetricsConfig, monitors);

DefaultMonitorRegistry.getInstance().register(commandMetricsMonitor);
}

@Override
protected Tag getServoTypeTag() {
return servoTypeTag;
}

@Override
protected Tag getServoInstanceTag() {
return servoInstanceTag;
}

/**
* Servo will flatten metric names as: getServoTypeTag()_getServoInstanceTag()_monitorName
*/
private List<Monitor<?>> getServoMonitors() {

List<Monitor<?>> monitors = new ArrayList<Monitor<?>>();

monitors.add(new InformationalMetric<String>(MonitorConfig.builder("name").build()) {
@Override
public String getValue() {
return key.name();
}
});

// allow Servo and monitor to know exactly at what point in time these stats are for so they can be plotted accurately
monitors.add(new GaugeMetric(MonitorConfig.builder("currentTime").withTag(DataSourceLevel.DEBUG).build()) {
@Override
public Number getValue() {
return System.currentTimeMillis();
}
});

monitors.add(new CounterMetric(MonitorConfig.builder("countRequestsBatched").build()) {
@Override
public Number getValue() {
return metrics.getCumulativeCountRequestsBatched();
}
});

monitors.add(new CounterMetric(MonitorConfig.builder("countBatches").build()) {
@Override
public Number getValue() {
return metrics.getCumulativeCountBatches();
}
});

monitors.add(new CounterMetric(MonitorConfig.builder("countResponsesFromCache").build()) {
@Override
public Number getValue() {
return metrics.getCumulativeCountResponsesFromCache();
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_25").build()) {
@Override
public Number getValue() {
return metrics.getBatchSizePercentile(25);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_50").build()) {
@Override
public Number getValue() {
return metrics.getBatchSizePercentile(50);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_75").build()) {
@Override
public Number getValue() {
return metrics.getBatchSizePercentile(75);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_95").build()) {
@Override
public Number getValue() {
return metrics.getBatchSizePercentile(95);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_99").build()) {
@Override
public Number getValue() {
return metrics.getBatchSizePercentile(99);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_99_5").build()) {
@Override
public Number getValue() {
return metrics.getBatchSizePercentile(99.5);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("batchSize_percentile_100").build()) {
@Override
public Number getValue() {
return metrics.getBatchSizePercentile(100);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_25").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(25);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_50").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(50);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_75").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(75);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_90").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(90);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_95").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(95);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_99").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(99);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_99_5").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(99.5);
}
});

monitors.add(new GaugeMetric(MonitorConfig.builder("shardSize_percentile_100").build()) {
@Override
public Number getValue() {
return metrics.getShardSizePercentile(100);
}
});

return monitors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCollapser;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,6 +69,7 @@ public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArg
private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;
private final HystrixRequestCache requestCache;
private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;
private final HystrixCollapserMetrics metrics;

/**
* The scope of request collapsing.
Expand Down Expand Up @@ -108,36 +112,52 @@ protected HystrixCollapser(HystrixCollapserKey collapserKey) {
* Fluent interface for constructor arguments
*/
protected HystrixCollapser(Setter setter) {
this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter);
this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter, null);
}

/* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) {
this(collapserKey, scope, timer, propertiesBuilder, null);
}

/* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {
if (collapserKey == null || collapserKey.name().trim().equals("")) {
String defaultKeyName = getDefaultNameFromClass(getClass());
collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);
}

this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, propertiesBuilder);
HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);
this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, properties);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

if (metrics == null) {
this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);
} else {
this.metrics = metrics;
}

final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;

/* strategy: HystrixMetricsPublisherCollapser */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);

/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
*/
collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {

@Override
public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return self.shardRequests(requests);
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);
self.metrics.markShards(shards.size());
return shards;
}

@Override
public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
HystrixCommand<BatchReturnType> command = self.createCommand(requests);

// mark the number of requests being collapsed together
command.markAsCollapsedCommand(requests.size());
self.metrics.markBatch(requests.size());

return command.toObservable();
}
Expand Down Expand Up @@ -196,6 +216,14 @@ public Scope getScope() {
return Scope.valueOf(collapserFactory.getScope().name());
}

/**
* Return the {@link HystrixCollapserMetrics} for this collapser
* @return {@link HystrixCollapserMetrics} for this collapser
*/
public HystrixCollapserMetrics getMetrics() {
return metrics;
}

/**
* The request arguments to be passed to the {@link HystrixCommand}.
* <p>
Expand Down Expand Up @@ -298,7 +326,7 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#computation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* Use {@link #toObservable(rx.Scheduler)} to schedule the callback differently.
Expand All @@ -323,7 +351,7 @@ public Observable<ResponseType> observe() {
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#computation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* <p>
Expand Down Expand Up @@ -354,17 +382,14 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
if (getProperties().requestCachingEnabled().get()) {
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
/* mark that we received this response from cache */
// TODO Add collapser metrics so we can capture this information
// we can't add it to the command metrics because the command can change each time (dynamic key for example)
// and we don't have access to it when responding from cache
// collapserMetrics.markResponseFromCache();
metrics.markResponseFromCache();
return fromCache;
}
}

RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
metrics.markRequestBatched();
if (getProperties().requestCachingEnabled().get()) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
Expand Down
Loading