From f45739997edff519e9adadc3ad183ecb3511a094 Mon Sep 17 00:00:00 2001 From: Riccardo Sirchia Date: Fri, 3 May 2013 00:44:17 +0200 Subject: [PATCH] Solved multi-threading issues in DisruptorReference --- .../disruptor/DisruptorEndpoint.java | 9 +- .../DisruptorNotStartedException.java | 39 +++ .../disruptor/DisruptorProducer.java | 19 +- .../disruptor/DisruptorReference.java | 241 +++++++++++------- ...ruptorReconfigureWithBlockingProducer.java | 112 ++++++++ .../disruptor/SedaDisruptorCompareTest.java | 20 +- 6 files changed, 332 insertions(+), 108 deletions(-) create mode 100644 src/main/java/com/github/camel/component/disruptor/DisruptorNotStartedException.java create mode 100644 src/test/java/com/github/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java diff --git a/src/main/java/com/github/camel/component/disruptor/DisruptorEndpoint.java b/src/main/java/com/github/camel/component/disruptor/DisruptorEndpoint.java index e540539..7800db2 100644 --- a/src/main/java/com/github/camel/component/disruptor/DisruptorEndpoint.java +++ b/src/main/java/com/github/camel/component/disruptor/DisruptorEndpoint.java @@ -17,8 +17,6 @@ package com.github.camel.component.disruptor; import com.lmax.disruptor.dsl.ProducerType; -import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; import org.apache.camel.*; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; @@ -26,6 +24,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; + /** * TODO: documentation */ @@ -58,7 +59,7 @@ public int getSize() { } @ManagedOperation(description = "Remaining capacity in ring buffer") - public long remainingCapacity() { + public long remainingCapacity() throws DisruptorNotStartedException { return getDisruptor().remainingCapacity(); } @@ -205,7 +206,7 @@ Collection createConsumerEventHandlers() { * * @param exchange */ - void publish(final Exchange exchange) { + void publish(final Exchange exchange) throws DisruptorNotStartedException { disruptorReference.publish(exchange); } diff --git a/src/main/java/com/github/camel/component/disruptor/DisruptorNotStartedException.java b/src/main/java/com/github/camel/component/disruptor/DisruptorNotStartedException.java new file mode 100644 index 0000000..ff6f9e1 --- /dev/null +++ b/src/main/java/com/github/camel/component/disruptor/DisruptorNotStartedException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2012 Riccardo Sirchia + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.camel.component.disruptor; + +/** + * This exception is thrown when a producer attempts to publish an exchange while the Disruptor is not yet started or + * already shut down + */ +public class DisruptorNotStartedException extends Exception { + public DisruptorNotStartedException() { + super(); + } + + public DisruptorNotStartedException(String message) { + super(message); + } + + public DisruptorNotStartedException(String message, Throwable cause) { + super(message, cause); + } + + public DisruptorNotStartedException(Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/com/github/camel/component/disruptor/DisruptorProducer.java b/src/main/java/com/github/camel/component/disruptor/DisruptorProducer.java index 65aa1ad..9556e92 100644 --- a/src/main/java/com/github/camel/component/disruptor/DisruptorProducer.java +++ b/src/main/java/com/github/camel/component/disruptor/DisruptorProducer.java @@ -16,8 +16,6 @@ package com.github.camel.component.disruptor; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; @@ -26,6 +24,9 @@ import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.ExchangeHelper; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + /** * A Producer for the Disruptor component. */ @@ -110,7 +111,12 @@ public String toString() { }); log.trace("Publishing Exchange to disruptor ringbuffer: {}", copy); - endpoint.publish(copy); + try { + endpoint.publish(copy); + } catch (DisruptorNotStartedException e) { + log.trace("Exception while publishing Exchange to disruptor ringbuffer", e); + copy.setException(e); + } if (timeout > 0) { if (log.isTraceEnabled()) { @@ -156,7 +162,12 @@ public String toString() { // handover the completion so its the copy which performs that, as we do not wait final Exchange copy = prepareCopy(exchange, true); log.trace("Publishing Exchange to disruptor ringbuffer: {}", copy); - endpoint.publish(copy); + try { + endpoint.publish(copy); + } catch (DisruptorNotStartedException e) { + log.trace("Exception while publishing Exchange to disruptor ringbuffer", e); + copy.setException(e); + } } // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done diff --git a/src/main/java/com/github/camel/component/disruptor/DisruptorReference.java b/src/main/java/com/github/camel/component/disruptor/DisruptorReference.java index d178f71..38c3476 100644 --- a/src/main/java/com/github/camel/component/disruptor/DisruptorReference.java +++ b/src/main/java/com/github/camel/component/disruptor/DisruptorReference.java @@ -19,12 +19,15 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import java.util.*; -import java.util.concurrent.*; import org.apache.camel.Exchange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicMarkableReference; +import java.util.concurrent.locks.LockSupport; + /** * Holder for Disruptor references. *

@@ -38,23 +41,29 @@ class DisruptorReference { private final DisruptorComponent component; private final String uri; - //TODO ascertain thread safe access to disruptor - private Disruptor disruptor; + //The mark on the reference indicates if we are in the process of reconfiguring the Disruptor: + //(ref, mark) : Description + //(null, false) : not started or completely shut down + //(null, true) : in process of reconfiguring + //( x , false) : normally functioning Disruptor + //( x , true) : never set + private final AtomicMarkableReference> disruptor = new AtomicMarkableReference>(null, false); private final DelayedExecutor delayedExecutor = new DelayedExecutor(); - private ExecutorService executor; - private final ProducerType producerType; private final int size; private final DisruptorWaitStrategy waitStrategy; - private LifecycleAwareExchangeEventHandler[] handlers = new LifecycleAwareExchangeEventHandler[0]; - private final Queue temporaryExchangeBuffer; + //access guarded by this + private ExecutorService executor; + + private LifecycleAwareExchangeEventHandler[] handlers = new LifecycleAwareExchangeEventHandler[0]; + DisruptorReference(final DisruptorComponent component, final String uri, final int size, final ProducerType producerType, final DisruptorWaitStrategy waitStrategy) throws Exception { this.component = component; @@ -66,54 +75,54 @@ class DisruptorReference { reconfigure(); } - private void createDisruptor() throws Exception { - disruptor = new Disruptor(ExchangeEventFactory.INSTANCE, size, delayedExecutor, producerType, - waitStrategy.createWaitStrategyInstance()); + public boolean hasNullReference() { + return disruptor.getReference() == null; } - public int getEndpointCount() { - return endpoints.size(); - } + private Disruptor getCurrentDisruptor() throws DisruptorNotStartedException { + Disruptor currentDisruptor = disruptor.getReference(); + + if (currentDisruptor == null) { + // no current Disruptor reference, we may be reconfiguring or it was not started + // check which by looking at the reference mark... + boolean[] changeIsPending = new boolean[1]; + + while (currentDisruptor == null) { + currentDisruptor = disruptor.get(changeIsPending); + //Check if we are reconfiguring + if (currentDisruptor == null && !changeIsPending[0]) { + throw new DisruptorNotStartedException("Disruptor is not yet started or already shut down."); + } else if (currentDisruptor == null && changeIsPending[0]) { + //We should be back shortly...keep trying but spare CPU resources + LockSupport.parkNanos(1L); + } + } + } - public boolean hasNullReference() { - return disruptor == null; + return currentDisruptor; } - public void publish(final Exchange exchange) { - final RingBuffer ringBuffer = disruptor.getRingBuffer(); + public void publish(final Exchange exchange) throws DisruptorNotStartedException { + publishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer()); + } + private static void publishExchangeOnRingBuffer(final Exchange exchange, + final RingBuffer ringBuffer) { final long sequence = ringBuffer.next(); ringBuffer.get(sequence).setExchange(exchange); ringBuffer.publish(sequence); } - public void reconfigure() throws Exception { - /* - TODO handle reconfigure correctly with a full buffer and producers blocking - Instead of blocking until a fre spot on the ringbuffer is avaiable, they now - probably get an exception - */ - shutdown(); - - createDisruptor(); - - final ArrayList eventHandlers = new ArrayList(); - - for (final DisruptorEndpoint endpoint : endpoints) { - final Collection consumerEventHandlers = endpoint.createConsumerEventHandlers(); - - if (consumerEventHandlers != null) { - eventHandlers.addAll(consumerEventHandlers); - } - } - - handleEventsWith(eventHandlers.toArray(new LifecycleAwareExchangeEventHandler[eventHandlers.size()])); + public synchronized void reconfigure() throws Exception { + shutdownDisruptor(true); start(); } - private void start() { - disruptor.start(); + private void start() throws Exception { + Disruptor newDisruptor = createDisruptor(); + + newDisruptor.start(); if (executor != null) { //and use our delayed executor to really really execute the event handlers now @@ -131,7 +140,7 @@ private void start() { if (!handler.awaitStarted(10, TimeUnit.SECONDS)) { //we wait for a relatively long, but limited amount of time to prevent an application using //this component from hanging indefinitely - //Please report a bug if you can repruduce this + //Please report a bug if you can reproduce this LOGGER.error("Disruptor/event handler failed to start properly, PLEASE REPORT"); } eventHandlerStarted = true; @@ -141,27 +150,93 @@ private void start() { } } + publishBufferedExchanges(newDisruptor); + + disruptor.set(newDisruptor, false); + } + + private Disruptor createDisruptor() throws Exception { + //create a new Disruptor + final Disruptor newDisruptor = new Disruptor(ExchangeEventFactory.INSTANCE, size, delayedExecutor, producerType, + waitStrategy.createWaitStrategyInstance()); + + //determine the list of eventhandlers to be associated to the Disruptor + final ArrayList eventHandlers = new ArrayList(); + + for (final DisruptorEndpoint endpoint : endpoints) { + final Collection consumerEventHandlers = endpoint.createConsumerEventHandlers(); + + if (consumerEventHandlers != null) { + eventHandlers.addAll(consumerEventHandlers); + } + } + + handleEventsWith(newDisruptor, eventHandlers.toArray(new LifecycleAwareExchangeEventHandler[eventHandlers.size()])); + + return newDisruptor; + } + + private void handleEventsWith(Disruptor newDisruptor, final LifecycleAwareExchangeEventHandler[] newHandlers) { + if (newHandlers == null || newHandlers.length == 0) { + handlers = new LifecycleAwareExchangeEventHandler[1]; + handlers[0] = new BlockingExchangeEventHandler(); + } else { + handlers = newHandlers; + } + resizeThreadPoolExecutor(handlers.length); + newDisruptor.handleEventsWith(handlers); + } + + private void publishBufferedExchanges(Disruptor newDisruptor) { //now empty out all buffered Exchange if we had any final List exchanges = new ArrayList(temporaryExchangeBuffer.size()); while (!temporaryExchangeBuffer.isEmpty()) { exchanges.add(temporaryExchangeBuffer.remove()); } + RingBuffer ringBuffer = newDisruptor.getRingBuffer(); //and offer them again to our new ringbuffer for (final Exchange exchange : exchanges) { - publish(exchange); + publishExchangeOnRingBuffer(exchange, ringBuffer); } } - private void shutdown() { - if (disruptor != null) { + private void resizeThreadPoolExecutor(final int newSize) { + if (executor == null && newSize > 0) { + //no thread pool executor yet, create a new one + executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri, + newSize); + } else if (executor != null && newSize <= 0) { + //we need to shut down our executor + component.getCamelContext().getExecutorServiceManager().shutdown(executor); + executor = null; + } else if (executor instanceof ThreadPoolExecutor) { + //our thread pool executor is of type ThreadPoolExecutor, we know how to resize it + final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + threadPoolExecutor.setCorePoolSize(newSize); + threadPoolExecutor.setMaximumPoolSize(newSize); + } else if (newSize > 0) { + //hmmm...no idea what kind of executor this is...just kill it and start fresh + component.getCamelContext().getExecutorServiceManager().shutdown(executor); + + executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri, + newSize); + } + } + + private synchronized void shutdownDisruptor(boolean isReconfiguring) { + Disruptor currentDisruptor = disruptor.getReference(); + disruptor.set(null, isReconfiguring); + + if (currentDisruptor != null) { //check if we had a blocking event handler to keep an empty disruptor 'busy' if (handlers != null && handlers.length == 1 && handlers[0] instanceof BlockingExchangeEventHandler) { - //yes we did, unblock it so we can get rid of our backlog empty its pending exchanged in our temporary buffer + // yes we did, unblock it so we can get rid of our backlog, + // The eventhandler will empty its pending exchanges in our temporary buffer final BlockingExchangeEventHandler blockingExchangeEventHandler = (BlockingExchangeEventHandler) handlers[0]; blockingExchangeEventHandler.unblock(); } - disruptor.shutdown(); + currentDisruptor.shutdown(); //they have already been given a trigger to halt when they are done by shutting down the disruptor //we do however want to await their completion before they are scheduled to process events from the new @@ -187,29 +262,18 @@ private void shutdown() { } handlers = new LifecycleAwareExchangeEventHandler[0]; - - disruptor = null; } + } + private synchronized void shutdownExecutor() { if (executor != null) { component.getCamelContext().getExecutorServiceManager().shutdown(executor); executor = null; } } - private void handleEventsWith(final LifecycleAwareExchangeEventHandler[] newHandlers) { - if (newHandlers == null || newHandlers.length == 0) { - handlers = new LifecycleAwareExchangeEventHandler[1]; - handlers[0] = new BlockingExchangeEventHandler(); - } else { - handlers = newHandlers; - } - resizeThreadPoolExecutor(handlers.length); - disruptor.handleEventsWith(handlers); - } - - public long remainingCapacity() { - return disruptor.getRingBuffer().remainingCapacity(); + public long remainingCapacity() throws DisruptorNotStartedException { + return getCurrentDisruptor().getRingBuffer().remainingCapacity(); } public DisruptorWaitStrategy getWaitStrategy() { @@ -221,48 +285,37 @@ ProducerType getProducerType() { } public int getBufferSize() { - return disruptor.getRingBuffer().getBufferSize(); + return size; } - public void addEndpoint(final DisruptorEndpoint disruptorEndpoint) { + public int getPendingExchangeSize() { + try { + if (!hasNullReference()) { + return (int) (getBufferSize() - remainingCapacity() + temporaryExchangeBuffer.size()); + } + } catch (DisruptorNotStartedException e) { + //fall through... + } + return temporaryExchangeBuffer.size(); + } + + public synchronized void addEndpoint(final DisruptorEndpoint disruptorEndpoint) { endpoints.add(disruptorEndpoint); } - public void removeEndpoint(final DisruptorEndpoint disruptorEndpoint) { + public synchronized void removeEndpoint(final DisruptorEndpoint disruptorEndpoint) { if (getEndpointCount() == 1) { - this.shutdown(); - } - endpoints.remove(disruptorEndpoint); - } + //Shutdown our disruptor + shutdownDisruptor(false); - public int getPendingExchangeSize() { - if (disruptor != null) { - return (int) (getBufferSize() - remainingCapacity() + temporaryExchangeBuffer.size()); + //As there are no endpoints dependent on this Disruptor, we may also shutdown our executor + shutdownExecutor(); } - return temporaryExchangeBuffer.size(); + endpoints.remove(disruptorEndpoint); } - private void resizeThreadPoolExecutor(final int newSize) { - if (executor == null && newSize > 0) { - //no thread pool executor yet, create a new one - executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri, - newSize); - } else if (executor != null && newSize <= 0) { - //we need to shut down our executor - component.getCamelContext().getExecutorServiceManager().shutdown(executor); - executor = null; - } else if (executor instanceof ThreadPoolExecutor) { - //our thread pool executor is of type ThreadPoolExecutor, we know how to resize it - final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; - threadPoolExecutor.setCorePoolSize(newSize); - threadPoolExecutor.setMaximumPoolSize(newSize); - } else if (newSize > 0) { - //hmmm...no idea what kind of executor this is...just kill it and start fresh - component.getCamelContext().getExecutorServiceManager().shutdown(executor); - - executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri, - newSize); - } + public synchronized int getEndpointCount() { + return endpoints.size(); } /** diff --git a/src/test/java/com/github/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java b/src/test/java/com/github/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java new file mode 100644 index 0000000..a50b4f1 --- /dev/null +++ b/src/test/java/com/github/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java @@ -0,0 +1,112 @@ +/* + * Copyright 2012 Riccardo Sirchia + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.camel.component.disruptor; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @version + */ +public class DisruptorReconfigureWithBlockingProducer extends CamelTestSupport { + + @Test + public void testDisruptorReconfigureWithBlockingProducer() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(20); + getMockEndpoint("mock:b").expectedMinimumMessageCount(10); + + long beforeStart = System.currentTimeMillis(); + ProducerThread producerThread = new ProducerThread(); + producerThread.start(); + + //synchronize with the producer to the point that the buffer is full + assertTrue(producerThread.awaitFullBufferProduced()); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("disruptor:foo?multipleConsumers=true&size=8").id("testRoute").to("mock:b"); + } + }); + + // adding the consumer may take place after the current buffer is flushed + // which will take approximately 8*200=1600 ms because of delay on route. + // If the reconfigure does not correctly hold back the producer thread on this request, + // it will take approximately 20*200=4000 ms. + // be on the safe side and check that it was at least faster than 2 seconds. + assertTrue("Reconfigure of Disruptor blocked", (System.currentTimeMillis() - beforeStart) < 2000); + + //Wait and check that the producer has produced all messages without throwing an exception + assertTrue(producerThread.checkResult()); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("disruptor:foo?multipleConsumers=true&size=8").delay(200).to("mock:a"); + } + }; + } + + private class ProducerThread extends Thread { + private final CountDownLatch startedLatch = new CountDownLatch(1); + private final CountDownLatch resultLatch = new CountDownLatch(1); + private Exception exception; + + @Override + public void run() { + for (int i = 0; i < 8; i++) { + template.sendBody("disruptor:foo", "Message"); + } + + startedLatch.countDown(); + + try { + for (int i = 0; i < 12; i++) { + template.sendBody("disruptor:foo", "Message"); + } + } catch (Exception e) { + exception = e; + } + + resultLatch.countDown(); + } + + public boolean awaitFullBufferProduced() throws InterruptedException { + return startedLatch.await(5, TimeUnit.SECONDS); + } + + public boolean checkResult() throws Exception { + if (exception != null) { + throw exception; + } + boolean result = resultLatch.await(5, TimeUnit.SECONDS); + if (exception != null) { + throw exception; + } + + return result; + } + } +} diff --git a/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java b/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java index 476f634..c844092 100644 --- a/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java +++ b/src/test/java/com/github/camel/component/disruptor/SedaDisruptorCompareTest.java @@ -17,11 +17,6 @@ package com.github.camel.component.disruptor; import com.lmax.disruptor.collections.Histogram; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.*; import org.apache.camel.*; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.seda.SedaEndpoint; @@ -31,6 +26,12 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + /** * This class does not perform any functional test, but instead makes a comparison between the performance of the * Disruptor and SEDA component in several use cases. @@ -265,7 +266,14 @@ public void run() { endpointSizeQueue.offer(sedaEndpoint.getCurrentQueueSize()); } else if (endpoint instanceof DisruptorEndpoint) { final DisruptorEndpoint disruptorEndpoint = (DisruptorEndpoint) endpoint; - endpointSizeQueue.offer((int) (disruptorEndpoint.getSize() - disruptorEndpoint.remainingCapacity())); + + long remainingCapacity = 0; + try { + remainingCapacity = disruptorEndpoint.remainingCapacity(); + } catch (DisruptorNotStartedException e) { + //ignore + } + endpointSizeQueue.offer((int) (disruptorEndpoint.getSize() - remainingCapacity)); } } };