From de086446af2c49677a6d3e375bf746b6846a80e1 Mon Sep 17 00:00:00 2001 From: Riccardo Sirchia Date: Tue, 16 Apr 2013 00:32:48 +0200 Subject: [PATCH] Merged ladoe00-test branch --- .gitignore | 2 + .settings/.gitignore | 3 + .../disruptor/DisruptorBufferingTest.java | 116 ++++++++++++++++++ .../disruptor/SedaDisruptorCompareTest.java | 102 +++++++++------ 4 files changed, 188 insertions(+), 35 deletions(-) create mode 100644 .settings/.gitignore create mode 100644 src/test/java/com/github/camel/component/disruptor/DisruptorBufferingTest.java diff --git a/.gitignore b/.gitignore index 2f7896d..88b5123 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ target/ +/.classpath +/.project diff --git a/.settings/.gitignore b/.settings/.gitignore new file mode 100644 index 0000000..1de83a6 --- /dev/null +++ b/.settings/.gitignore @@ -0,0 +1,3 @@ +/org.eclipse.core.resources.prefs +/org.eclipse.jdt.core.prefs +/org.eclipse.m2e.core.prefs diff --git a/src/test/java/com/github/camel/component/disruptor/DisruptorBufferingTest.java b/src/test/java/com/github/camel/component/disruptor/DisruptorBufferingTest.java new file mode 100644 index 0000000..ef9ea9b --- /dev/null +++ b/src/test/java/com/github/camel/component/disruptor/DisruptorBufferingTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2012 Riccardo Sirchia + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.camel.component.disruptor; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * This test suite is testing different scenarios where a disruptor is forced to + * buffer exchanges locally until a consumer is registered. + */ +public class DisruptorBufferingTest extends CamelTestSupport { + + @Test + public void testDisruptorBufferingWhileWaitingOnFirstConsumer() throws Exception { + template.sendBody("disruptor:foo", "A"); + template.sendBody("disruptor:foo", "B"); + template.sendBody("disruptor:foo", "C"); + + DisruptorEndpoint disruptorEndpoint = getMandatoryEndpoint("disruptor:foo", DisruptorEndpoint.class); + + assertEquals(5, disruptorEndpoint.getDisruptor().remainingCapacity()); + + // Add a first consumer on the endpoint + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("disruptor:foo").routeId("bar").to("mock:bar"); + } + }); + + // Now that we have a consumer, the disruptor should send the buffered + // events downstream. Expect to receive the 3 original exchanges. + MockEndpoint mockEndpoint = getMockEndpoint("mock:bar"); + mockEndpoint.expectedMessageCount(3); + mockEndpoint.assertIsSatisfied(200); + } + + @Test + public void testDisruptorBufferingWhileWaitingOnNextConsumer() throws Exception { + template.sendBody("disruptor:foo", "A"); + template.sendBody("disruptor:foo", "B"); + template.sendBody("disruptor:foo", "C"); + + DisruptorEndpoint disruptorEndpoint = getMandatoryEndpoint("disruptor:foo", DisruptorEndpoint.class); + + assertEquals(5, disruptorEndpoint.getDisruptor().remainingCapacity()); + + // Add a first consumer on the endpoint + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("disruptor:foo").routeId("bar1").delay(200).to("mock:bar"); + } + }); + + // Now that we have a consumer, the disruptor should send the buffered + // events downstream. Wait until we have processed at least one + // exchange. + MockEndpoint mockEndpoint = getMockEndpoint("mock:bar"); + mockEndpoint.expectedMinimumMessageCount(1); + mockEndpoint.assertIsSatisfied(200); + + // Stop route and make sure all exchanges have been flushed. + context.stopRoute("bar1"); + mockEndpoint.expectedMessageCount(3); + mockEndpoint.assertIsSatisfied(); + + resetMocks(); + template.sendBody("disruptor:foo", "D"); + template.sendBody("disruptor:foo", "E"); + template.sendBody("disruptor:foo", "F"); + + // Add a new consumer on the endpoint + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("disruptor:foo").routeId("bar2").to("mock:bar"); + } + }); + template.sendBody("disruptor:foo", "G"); + + // Make sure we have received the 3 buffered exchanges plus the one + // added late. + mockEndpoint = getMockEndpoint("mock:bar"); + mockEndpoint.expectedMessageCount(4); + mockEndpoint.assertIsSatisfied(100); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("foo").to("disruptor:foo?size=8"); + } + }; + + } +} diff --git a/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java b/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java index 85b4c5c..d3c8f61 100644 --- a/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java +++ b/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java @@ -27,6 +27,7 @@ import org.apache.camel.*; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,7 +36,7 @@ /** * This class does not perform any functional test, but instead makes a comparison between the performance of the * Disruptor and SEDA component in several use cases. - * + *

* As memory management may have great impact on the results, it is adviced to run this test with a large, fixed heap * (e.g. run with -Xmx1024m -Xms1024m JVM parameters) */ @@ -47,7 +48,7 @@ public class SedaDisruptorCompareTest extends CamelTestSupport { protected ProducerTemplate producerTemplate; private static final int SPEED_TEST_EXCHANGE_COUNT = 80000; - private static final long[] LATENCY_HISTOGRAM_BOUNDS = new long[] {1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}; + private static final long[] LATENCY_HISTOGRAM_BOUNDS = new long[]{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}; private ExchangeAwaiter[] exchangeAwaiters; private final String componentName; @@ -55,9 +56,21 @@ public class SedaDisruptorCompareTest extends CamelTestSupport { private final int amountProducers; private final int amountConsumers; + @BeforeClass + public static void legend() { + System.out.println("-----------------------"); + System.out.println("- Tests output legend -"); + System.out.println("-----------------------"); + System.out.println("P: Number of concurrent Producer(s) sharing the load for publishing exchanges to the disruptor."); + System.out.println("C: Number of Consumer(s) receiving a copy of each exchange from the disruptor (pub/sub)."); + System.out.println("CCT: Number of ConcurrentConsumerThreads sharing the load for consuming exchanges from the disruptor."); + System.out.println("Each test is creating " + SPEED_TEST_EXCHANGE_COUNT + " exchanges."); + System.out.println(); + } + public SedaDisruptorCompareTest(String componentName, String endpointUri, int amountProducers, int amountConsumers, int concurrentConsumerThreads) { - this.componentName = componentName + " ("+amountProducers+"P, " + amountConsumers+"C, " + concurrentConsumerThreads+"CCT)"; + this.componentName = componentName + " (" + amountProducers + "P, " + amountConsumers + "C, " + concurrentConsumerThreads + "CCT)"; this.endpointUri = endpointUri; this.amountProducers = amountProducers; this.amountConsumers = amountConsumers; @@ -67,34 +80,54 @@ public SedaDisruptorCompareTest(String componentName, String endpointUri, int am } } + private static int singleProducer() { + return 1; + } + + private static int multipleProducers() { + return 4; + } + + private static int singleConsumer() { + return 1; + } + + private static int multipleConsumers() { + return 4; + } + + private static int singleConcurrentConsumerThread() { + return 1; + } + + private static int multipleConcurrentConsumerThreads() { + return 2; + } + @Parameterized.Parameters public static Collection parameters() { List parameters = new ArrayList(); - int multipleProducers = 4; - int multipleConsumers = 4; - int parallelConsumerThreads = 2; - - //This parameter set can be compared to the next and shows the impact of a 'long' endpoint name - //It defines all parameters to the same values as the default, so the result should be the same as - //'seda:speedtest'. This shows that disruptor has a slight disadvantage as its name is longer than 'seda' :) - parameters.add(new Object[] {"SEDA LONG (1P, 1C, 1CCT)", "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=false", 1, 1, 1}); - addParameterPair(parameters, 1,1,1); - addParameterPair(parameters, 1,1,parallelConsumerThreads); - addParameterPair(parameters, 1,multipleConsumers,1); - addParameterPair(parameters, 1,multipleConsumers,parallelConsumerThreads); - addParameterPair(parameters, multipleProducers,1,1); - addParameterPair(parameters, multipleProducers,1,parallelConsumerThreads); - addParameterPair(parameters, multipleProducers,multipleConsumers,1); - addParameterPair(parameters, multipleProducers,multipleConsumers,parallelConsumerThreads); + // This parameter set can be compared to the next and shows the impact of a 'long' endpoint name + // It defines all parameters to the same values as the default, so the result should be the same as + // 'seda:speedtest'. This shows that disruptor has a slight disadvantage as its name is longer than 'seda' :) + parameters.add(new Object[]{"SEDA LONG (1P, 1C, 1CCT)", "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=false", singleProducer(), singleConsumer(), + singleConcurrentConsumerThread()}); + addParameterPair(parameters, singleProducer(), singleConsumer(), singleConcurrentConsumerThread()); + addParameterPair(parameters, singleProducer(), singleConsumer(), multipleConcurrentConsumerThreads()); + addParameterPair(parameters, singleProducer(), multipleConsumers(), singleConcurrentConsumerThread()); + addParameterPair(parameters, singleProducer(), multipleConsumers(), multipleConcurrentConsumerThreads()); + addParameterPair(parameters, multipleProducers(), singleConsumer(), singleConcurrentConsumerThread()); + addParameterPair(parameters, multipleProducers(), singleConsumer(), multipleConcurrentConsumerThreads()); + addParameterPair(parameters, multipleProducers(), multipleConsumers(), singleConcurrentConsumerThread()); + addParameterPair(parameters, multipleProducers(), multipleConsumers(), multipleConcurrentConsumerThreads()); return parameters; } - private static void addParameterPair(List parameters, int producers, int consumers, - int parallelConsumerThreads) { + private static void addParameterPair(List parameters, int producers, int consumers, int parallelConsumerThreads) { String multipleConsumerOption = (consumers > 1 ? "multipleConsumers=true" : ""); - String concurrentConsumerOptions = (parallelConsumerThreads > 1 ? "concurrentConsumers="+parallelConsumerThreads : ""); + String concurrentConsumerOptions = (parallelConsumerThreads > 1 ? "concurrentConsumers=" + parallelConsumerThreads : ""); String options = ""; if (!multipleConsumerOption.isEmpty() || !concurrentConsumerOptions.isEmpty()) { @@ -111,17 +144,17 @@ private static void addParameterPair(List parameters, int producers, i options += concurrentConsumerOptions; } - parameters.add(new Object[] {"SEDA", "seda:speedtest"+options, producers, consumers, parallelConsumerThreads}); - parameters.add(new Object[] {"Disruptor", "disruptor:speedtest"+options, producers, consumers, parallelConsumerThreads}); + parameters.add(new Object[]{"SEDA", "seda:speedtest" + options, producers, consumers, parallelConsumerThreads}); + parameters.add(new Object[]{"Disruptor", "disruptor:speedtest" + options, producers, consumers, parallelConsumerThreads}); } @Test public void speedTestDisruptor() throws InterruptedException { - System.out.println("Warming up for test of: " +componentName); + System.out.println("Warming up for test of: " + componentName); performTest(true); - System.out.println("Starting real test of: " +componentName); + System.out.println("Starting real test of: " + componentName); forceGC(); Thread.sleep(1000); @@ -130,8 +163,8 @@ public void speedTestDisruptor() throws InterruptedException { } private void forceGC() { - //unfortunately there is no nice API that forces the Garbage collector to run, but it may consider our request - //more seriously if we ask it twice :) + // unfortunately there is no nice API that forces the Garbage collector to run, but it may consider our request + // more seriously if we ask it twice :) System.gc(); System.gc(); } @@ -145,7 +178,7 @@ private void resetExchangeAwaiters() { private void awaitExchangeAwaiters() throws InterruptedException { for (ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) { while (!exchangeAwaiter.awaitMessagesReceived(10, TimeUnit.SECONDS)) { - System.err.println("Processing takes longer then expected: " + componentName + " " + exchangeAwaiter.getStatus() ); + System.err.println("Processing takes longer then expected: " + componentName + " " + exchangeAwaiter.getStatus()); } } } @@ -155,7 +188,7 @@ private void outputExchangeAwaitersResult(long start) throws InterruptedExceptio long stop = exchangeAwaiter.getCountDownReachedTime(); Histogram histogram = exchangeAwaiter.getLatencyHistogram(); - System.out.println(componentName + " time spent = " + (stop-start) + " ms. Latency (ms): "+histogram.toString()); + System.out.printf("%-45s time spent = %5d ms. Latency (ms): %s %n", componentName, (stop - start), histogram.toString()); } } @@ -164,7 +197,7 @@ private void performTest(boolean warmup) throws InterruptedException { ProducerThread[] producerThread = new ProducerThread[amountProducers]; for (int i = 0; i < producerThread.length; ++i) { - producerThread[i] = new ProducerThread(SPEED_TEST_EXCHANGE_COUNT/amountProducers); + producerThread[i] = new ProducerThread(SPEED_TEST_EXCHANGE_COUNT / amountProducers); } long start = System.currentTimeMillis(); @@ -181,7 +214,6 @@ private void performTest(boolean warmup) throws InterruptedException { } - @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -218,7 +250,7 @@ public boolean awaitMessagesReceived(long timeout, TimeUnit unit) throws Interru public String getStatus() { StringBuilder sb = new StringBuilder(100); sb.append("processed "); - sb.append(count-latch.getCount()); + sb.append(count - latch.getCount()); sb.append('/'); sb.append(count); sb.append(" messages"); @@ -244,7 +276,7 @@ public long getCountDownReachedTime() { public Histogram getLatencyHistogram() { Histogram histogram = new Histogram(LATENCY_HISTOGRAM_BOUNDS); for (Long latencyValue : latencyQueue) { - histogram.addObservation(latencyValue/1000000); + histogram.addObservation(latencyValue / 1000000); } return histogram; } @@ -269,7 +301,7 @@ public void run() { public String getStatus() { StringBuilder sb = new StringBuilder(100); sb.append("produced "); - sb.append(producedMessageCount-1); + sb.append(producedMessageCount - 1); sb.append('/'); sb.append(totalMessageCount); sb.append(" messages");