Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added metrics to HystrixObservableCollapser #599

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,6 +72,7 @@ public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseTyp
private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;
private final HystrixRequestCache requestCache;
private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;
private final HystrixCollapserMetrics metrics;

/**
* The scope of request collapsing.
Expand Down Expand Up @@ -112,28 +115,41 @@ protected HystrixObservableCollapser(HystrixCollapserKey collapserKey) {
* Fluent interface for constructor arguments
*/
protected HystrixObservableCollapser(Setter setter) {
this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter);
this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter, null);
}

/* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) {
/* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {
if (collapserKey == null || collapserKey.name().trim().equals("")) {
String defaultKeyName = getDefaultNameFromClass(getClass());
collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);
}

this.collapserFactory = new RequestCollapserFactory<>(collapserKey, scope, timer, propertiesBuilder);
HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);
this.collapserFactory = new RequestCollapserFactory<>(collapserKey, scope, timer, properties);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

if (metrics == null) {
this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);
} else {
this.metrics = metrics;
}

final HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> self = this;

/* strategy: HystrixMetricsPublisherCollapser */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);


/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
*/
collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {

@Override
public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return self.shardRequests(requests);
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);
self.metrics.markShards(shards.size());
return shards;
}

@Override
Expand All @@ -142,7 +158,7 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR

// mark the number of requests being collapsed together
command.markAsCollapsedCommand(requests.size());

self.metrics.markBatch(requests.size());
return command.toObservable();
}

Expand Down Expand Up @@ -223,6 +239,14 @@ public Scope getScope() {
return Scope.valueOf(collapserFactory.getScope().name());
}

/**
* Return the {@link HystrixCollapserMetrics} for this collapser
* @return {@link HystrixCollapserMetrics} for this collapser
*/
public HystrixCollapserMetrics getMetrics() {
return metrics;
}

/**
* The request arguments to be passed to the {@link HystrixCommand}.
* <p>
Expand Down Expand Up @@ -373,17 +397,14 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
if (getProperties().requestCacheEnabled().get()) {
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
/* mark that we received this response from cache */
// TODO Add collapser metrics so we can capture this information
// we can't add it to the command metrics because the command can change each time (dynamic key for example)
// and we don't have access to it when responding from cache
// collapserMetrics.markResponseFromCache();
metrics.markResponseFromCache();
return fromCache;
}
}

RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
metrics.markRequestBatched();
if (getProperties().requestCacheEnabled().get()) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
package com.netflix.hystrix;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -38,11 +42,8 @@
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public class HystrixObservableCollapserTest {
static AtomicInteger counter = new AtomicInteger();

@Before
public void init() {
counter.set(0);
// since we're going to modify properties of the same class between tests, wipe the cache each time
HystrixCollapser.reset();
/* we must call this to simulate a new request lifecycle running and clearing caches */
Expand All @@ -61,57 +62,66 @@ public void cleanup() {
@Test
public void testTwoRequests() throws Exception {
TestCollapserTimer timer = new TestCollapserTimer();
Future<String> response1 = new TestRequestCollapser(timer, counter, 1).observe().toBlocking().toFuture();
Future<String> response2 = new TestRequestCollapser(timer, counter, 2).observe().toBlocking().toFuture();
HystrixObservableCollapser<String, String, String, String> collapser1 = new TestRequestCollapser(timer, 1);
HystrixObservableCollapser<String, String, String, String> collapser2 = new TestRequestCollapser(timer, 2);
Future<String> response1 = collapser1.observe().toBlocking().toFuture();
Future<String> response2 = collapser2.observe().toBlocking().toFuture();
timer.incrementTime(10); // let time pass that equals the default delay/period

assertEquals("1", response1.get());
assertEquals("2", response2.get());

assertEquals(1, counter.get());

assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());

HystrixCollapserMetrics metrics = collapser1.getMetrics();
assertSame(metrics, collapser2.getMetrics());
assertEquals(2L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED));
assertEquals(1L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_BATCH));
assertEquals(0L, metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));
}

private static class TestRequestCollapser extends HystrixObservableCollapser<String, String, String, String> {

private final AtomicInteger count;
private final String value;
private ConcurrentLinkedQueue<HystrixObservableCommand<String>> commandsExecuted;

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value) {
this(timer, counter, String.valueOf(value));
public TestRequestCollapser(TestCollapserTimer timer, int value) {
this(timer, String.valueOf(value));
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value) {
this(timer, counter, value, 10000, 10);
public TestRequestCollapser(TestCollapserTimer timer, String value) {
this(timer, value, 10000, 10);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(timer, counter, value, 10000, 10, executionLog);
public TestRequestCollapser(TestCollapserTimer timer, String value, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(timer, value, 10000, 10, executionLog);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, counter, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds);
public TestRequestCollapser(TestCollapserTimer timer, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
}

public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(scope, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(scope, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(Scope.REQUEST, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog);
public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(Scope.REQUEST, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog);
}

public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
private static HystrixCollapserMetrics createMetrics() {
HystrixCollapserKey key = HystrixCollapserKey.Factory.asKey("COLLAPSER_ONE");
return HystrixCollapserMetrics.getInstance(key, new HystrixPropertiesCollapserDefault(key, HystrixCollapserProperties.Setter()));
}

public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
// use a CollapserKey based on the CollapserTimer object reference so it's unique for each timer as we don't want caching
// of properties to occur and we're using the default HystrixProperty which typically does caching
super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds));
this.count = counter;
super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds), createMetrics());
this.value = value;
this.commandsExecuted = executionLog;
}
Expand All @@ -133,9 +143,6 @@ public HystrixObservableCommand<String> createCommand(final Collection<Collapsed

@Override
protected Func1<String, String> getBatchReturnTypeToResponseTypeMapper() {
// count how many times a batch is executed (this method is executed once per batch)
System.out.println("increment count: " + count.incrementAndGet());

return new Func1<String, String>() {

@Override
Expand Down