Skip to content

Commit

Permalink
Merged ladoe00-test branch
Browse files Browse the repository at this point in the history
  • Loading branch information
sirchia committed Apr 15, 2013
1 parent f15c0fe commit de08644
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
target/
/.classpath
/.project
3 changes: 3 additions & 0 deletions .settings/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/org.eclipse.core.resources.prefs
/org.eclipse.jdt.core.prefs
/org.eclipse.m2e.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2012 Riccardo Sirchia
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.camel.component.disruptor;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

/**
* This test suite is testing different scenarios where a disruptor is forced to
* buffer exchanges locally until a consumer is registered.
*/
public class DisruptorBufferingTest extends CamelTestSupport {

@Test
public void testDisruptorBufferingWhileWaitingOnFirstConsumer() throws Exception {
template.sendBody("disruptor:foo", "A");
template.sendBody("disruptor:foo", "B");
template.sendBody("disruptor:foo", "C");

DisruptorEndpoint disruptorEndpoint = getMandatoryEndpoint("disruptor:foo", DisruptorEndpoint.class);

assertEquals(5, disruptorEndpoint.getDisruptor().remainingCapacity());

// Add a first consumer on the endpoint
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("disruptor:foo").routeId("bar").to("mock:bar");
}
});

// Now that we have a consumer, the disruptor should send the buffered
// events downstream. Expect to receive the 3 original exchanges.
MockEndpoint mockEndpoint = getMockEndpoint("mock:bar");
mockEndpoint.expectedMessageCount(3);
mockEndpoint.assertIsSatisfied(200);
}

@Test
public void testDisruptorBufferingWhileWaitingOnNextConsumer() throws Exception {
template.sendBody("disruptor:foo", "A");
template.sendBody("disruptor:foo", "B");
template.sendBody("disruptor:foo", "C");

DisruptorEndpoint disruptorEndpoint = getMandatoryEndpoint("disruptor:foo", DisruptorEndpoint.class);

assertEquals(5, disruptorEndpoint.getDisruptor().remainingCapacity());

// Add a first consumer on the endpoint
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("disruptor:foo").routeId("bar1").delay(200).to("mock:bar");
}
});

// Now that we have a consumer, the disruptor should send the buffered
// events downstream. Wait until we have processed at least one
// exchange.
MockEndpoint mockEndpoint = getMockEndpoint("mock:bar");
mockEndpoint.expectedMinimumMessageCount(1);
mockEndpoint.assertIsSatisfied(200);

// Stop route and make sure all exchanges have been flushed.
context.stopRoute("bar1");
mockEndpoint.expectedMessageCount(3);
mockEndpoint.assertIsSatisfied();

resetMocks();
template.sendBody("disruptor:foo", "D");
template.sendBody("disruptor:foo", "E");
template.sendBody("disruptor:foo", "F");

// Add a new consumer on the endpoint
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("disruptor:foo").routeId("bar2").to("mock:bar");
}
});
template.sendBody("disruptor:foo", "G");

// Make sure we have received the 3 buffered exchanges plus the one
// added late.
mockEndpoint = getMockEndpoint("mock:bar");
mockEndpoint.expectedMessageCount(4);
mockEndpoint.assertIsSatisfied(100);
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start").routeId("foo").to("disruptor:foo?size=8");
}
};

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -35,7 +36,7 @@
/**
* This class does not perform any functional test, but instead makes a comparison between the performance of the
* Disruptor and SEDA component in several use cases.
*
* <p/>
* As memory management may have great impact on the results, it is adviced to run this test with a large, fixed heap
* (e.g. run with -Xmx1024m -Xms1024m JVM parameters)
*/
Expand All @@ -47,17 +48,29 @@ public class SedaDisruptorCompareTest extends CamelTestSupport {
protected ProducerTemplate producerTemplate;

private static final int SPEED_TEST_EXCHANGE_COUNT = 80000;
private static final long[] LATENCY_HISTOGRAM_BOUNDS = new long[] {1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000};
private static final long[] LATENCY_HISTOGRAM_BOUNDS = new long[]{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000};

private ExchangeAwaiter[] exchangeAwaiters;
private final String componentName;
private final String endpointUri;
private final int amountProducers;
private final int amountConsumers;

@BeforeClass
public static void legend() {
System.out.println("-----------------------");
System.out.println("- Tests output legend -");
System.out.println("-----------------------");
System.out.println("P: Number of concurrent Producer(s) sharing the load for publishing exchanges to the disruptor.");
System.out.println("C: Number of Consumer(s) receiving a copy of each exchange from the disruptor (pub/sub).");
System.out.println("CCT: Number of ConcurrentConsumerThreads sharing the load for consuming exchanges from the disruptor.");
System.out.println("Each test is creating " + SPEED_TEST_EXCHANGE_COUNT + " exchanges.");
System.out.println();
}

public SedaDisruptorCompareTest(String componentName, String endpointUri, int amountProducers, int amountConsumers, int concurrentConsumerThreads) {

this.componentName = componentName + " ("+amountProducers+"P, " + amountConsumers+"C, " + concurrentConsumerThreads+"CCT)";
this.componentName = componentName + " (" + amountProducers + "P, " + amountConsumers + "C, " + concurrentConsumerThreads + "CCT)";
this.endpointUri = endpointUri;
this.amountProducers = amountProducers;
this.amountConsumers = amountConsumers;
Expand All @@ -67,34 +80,54 @@ public SedaDisruptorCompareTest(String componentName, String endpointUri, int am
}
}

private static int singleProducer() {
return 1;
}

private static int multipleProducers() {
return 4;
}

private static int singleConsumer() {
return 1;
}

private static int multipleConsumers() {
return 4;
}

private static int singleConcurrentConsumerThread() {
return 1;
}

private static int multipleConcurrentConsumerThreads() {
return 2;
}

@Parameterized.Parameters
public static Collection<Object[]> parameters() {
List<Object[]> parameters = new ArrayList<Object[]>();

int multipleProducers = 4;
int multipleConsumers = 4;
int parallelConsumerThreads = 2;

//This parameter set can be compared to the next and shows the impact of a 'long' endpoint name
//It defines all parameters to the same values as the default, so the result should be the same as
//'seda:speedtest'. This shows that disruptor has a slight disadvantage as its name is longer than 'seda' :)
parameters.add(new Object[] {"SEDA LONG (1P, 1C, 1CCT)", "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=false", 1, 1, 1});
addParameterPair(parameters, 1,1,1);
addParameterPair(parameters, 1,1,parallelConsumerThreads);
addParameterPair(parameters, 1,multipleConsumers,1);
addParameterPair(parameters, 1,multipleConsumers,parallelConsumerThreads);
addParameterPair(parameters, multipleProducers,1,1);
addParameterPair(parameters, multipleProducers,1,parallelConsumerThreads);
addParameterPair(parameters, multipleProducers,multipleConsumers,1);
addParameterPair(parameters, multipleProducers,multipleConsumers,parallelConsumerThreads);
// This parameter set can be compared to the next and shows the impact of a 'long' endpoint name
// It defines all parameters to the same values as the default, so the result should be the same as
// 'seda:speedtest'. This shows that disruptor has a slight disadvantage as its name is longer than 'seda' :)
parameters.add(new Object[]{"SEDA LONG (1P, 1C, 1CCT)", "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=false", singleProducer(), singleConsumer(),
singleConcurrentConsumerThread()});
addParameterPair(parameters, singleProducer(), singleConsumer(), singleConcurrentConsumerThread());
addParameterPair(parameters, singleProducer(), singleConsumer(), multipleConcurrentConsumerThreads());
addParameterPair(parameters, singleProducer(), multipleConsumers(), singleConcurrentConsumerThread());
addParameterPair(parameters, singleProducer(), multipleConsumers(), multipleConcurrentConsumerThreads());
addParameterPair(parameters, multipleProducers(), singleConsumer(), singleConcurrentConsumerThread());
addParameterPair(parameters, multipleProducers(), singleConsumer(), multipleConcurrentConsumerThreads());
addParameterPair(parameters, multipleProducers(), multipleConsumers(), singleConcurrentConsumerThread());
addParameterPair(parameters, multipleProducers(), multipleConsumers(), multipleConcurrentConsumerThreads());

return parameters;
}

private static void addParameterPair(List<Object[]> parameters, int producers, int consumers,
int parallelConsumerThreads) {
private static void addParameterPair(List<Object[]> parameters, int producers, int consumers, int parallelConsumerThreads) {
String multipleConsumerOption = (consumers > 1 ? "multipleConsumers=true" : "");
String concurrentConsumerOptions = (parallelConsumerThreads > 1 ? "concurrentConsumers="+parallelConsumerThreads : "");
String concurrentConsumerOptions = (parallelConsumerThreads > 1 ? "concurrentConsumers=" + parallelConsumerThreads : "");

String options = "";
if (!multipleConsumerOption.isEmpty() || !concurrentConsumerOptions.isEmpty()) {
Expand All @@ -111,17 +144,17 @@ private static void addParameterPair(List<Object[]> parameters, int producers, i
options += concurrentConsumerOptions;
}

parameters.add(new Object[] {"SEDA", "seda:speedtest"+options, producers, consumers, parallelConsumerThreads});
parameters.add(new Object[] {"Disruptor", "disruptor:speedtest"+options, producers, consumers, parallelConsumerThreads});
parameters.add(new Object[]{"SEDA", "seda:speedtest" + options, producers, consumers, parallelConsumerThreads});
parameters.add(new Object[]{"Disruptor", "disruptor:speedtest" + options, producers, consumers, parallelConsumerThreads});
}

@Test
public void speedTestDisruptor() throws InterruptedException {

System.out.println("Warming up for test of: " +componentName);
System.out.println("Warming up for test of: " + componentName);

performTest(true);
System.out.println("Starting real test of: " +componentName);
System.out.println("Starting real test of: " + componentName);

forceGC();
Thread.sleep(1000);
Expand All @@ -130,8 +163,8 @@ public void speedTestDisruptor() throws InterruptedException {
}

private void forceGC() {
//unfortunately there is no nice API that forces the Garbage collector to run, but it may consider our request
//more seriously if we ask it twice :)
// unfortunately there is no nice API that forces the Garbage collector to run, but it may consider our request
// more seriously if we ask it twice :)
System.gc();
System.gc();
}
Expand All @@ -145,7 +178,7 @@ private void resetExchangeAwaiters() {
private void awaitExchangeAwaiters() throws InterruptedException {
for (ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) {
while (!exchangeAwaiter.awaitMessagesReceived(10, TimeUnit.SECONDS)) {
System.err.println("Processing takes longer then expected: " + componentName + " " + exchangeAwaiter.getStatus() );
System.err.println("Processing takes longer then expected: " + componentName + " " + exchangeAwaiter.getStatus());
}
}
}
Expand All @@ -155,7 +188,7 @@ private void outputExchangeAwaitersResult(long start) throws InterruptedExceptio
long stop = exchangeAwaiter.getCountDownReachedTime();
Histogram histogram = exchangeAwaiter.getLatencyHistogram();

System.out.println(componentName + " time spent = " + (stop-start) + " ms. Latency (ms): "+histogram.toString());
System.out.printf("%-45s time spent = %5d ms. Latency (ms): %s %n", componentName, (stop - start), histogram.toString());
}
}

Expand All @@ -164,7 +197,7 @@ private void performTest(boolean warmup) throws InterruptedException {

ProducerThread[] producerThread = new ProducerThread[amountProducers];
for (int i = 0; i < producerThread.length; ++i) {
producerThread[i] = new ProducerThread(SPEED_TEST_EXCHANGE_COUNT/amountProducers);
producerThread[i] = new ProducerThread(SPEED_TEST_EXCHANGE_COUNT / amountProducers);
}

long start = System.currentTimeMillis();
Expand All @@ -181,7 +214,6 @@ private void performTest(boolean warmup) throws InterruptedException {

}


@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
Expand Down Expand Up @@ -218,7 +250,7 @@ public boolean awaitMessagesReceived(long timeout, TimeUnit unit) throws Interru
public String getStatus() {
StringBuilder sb = new StringBuilder(100);
sb.append("processed ");
sb.append(count-latch.getCount());
sb.append(count - latch.getCount());
sb.append('/');
sb.append(count);
sb.append(" messages");
Expand All @@ -244,7 +276,7 @@ public long getCountDownReachedTime() {
public Histogram getLatencyHistogram() {
Histogram histogram = new Histogram(LATENCY_HISTOGRAM_BOUNDS);
for (Long latencyValue : latencyQueue) {
histogram.addObservation(latencyValue/1000000);
histogram.addObservation(latencyValue / 1000000);
}
return histogram;
}
Expand All @@ -269,7 +301,7 @@ public void run() {
public String getStatus() {
StringBuilder sb = new StringBuilder(100);
sb.append("produced ");
sb.append(producedMessageCount-1);
sb.append(producedMessageCount - 1);
sb.append('/');
sb.append(totalMessageCount);
sb.append(" messages");
Expand Down

0 comments on commit de08644

Please sign in to comment.