Skip to content

Commit

Permalink
Solved multi-threading issues in DisruptorReference
Browse files Browse the repository at this point in the history
  • Loading branch information
sirchia committed May 2, 2013
1 parent 803174a commit f457399
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
package com.github.camel.component.disruptor;

import com.lmax.disruptor.dsl.ProducerType;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.camel.*;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.impl.DefaultEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* TODO: documentation
*/
Expand Down Expand Up @@ -58,7 +59,7 @@ public int getSize() {
}

@ManagedOperation(description = "Remaining capacity in ring buffer")
public long remainingCapacity() {
public long remainingCapacity() throws DisruptorNotStartedException {
return getDisruptor().remainingCapacity();
}

Expand Down Expand Up @@ -205,7 +206,7 @@ Collection<LifecycleAwareExchangeEventHandler> createConsumerEventHandlers() {
*
* @param exchange
*/
void publish(final Exchange exchange) {
void publish(final Exchange exchange) throws DisruptorNotStartedException {
disruptorReference.publish(exchange);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2012 Riccardo Sirchia
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.camel.component.disruptor;

/**
* This exception is thrown when a producer attempts to publish an exchange while the Disruptor is not yet started or
* already shut down
*/
public class DisruptorNotStartedException extends Exception {
public DisruptorNotStartedException() {
super();
}

public DisruptorNotStartedException(String message) {
super(message);
}

public DisruptorNotStartedException(String message, Throwable cause) {
super(message, cause);
}

public DisruptorNotStartedException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.github.camel.component.disruptor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
Expand All @@ -26,6 +24,9 @@
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ExchangeHelper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* A Producer for the Disruptor component.
*/
Expand Down Expand Up @@ -110,7 +111,12 @@ public String toString() {
});

log.trace("Publishing Exchange to disruptor ringbuffer: {}", copy);
endpoint.publish(copy);
try {
endpoint.publish(copy);
} catch (DisruptorNotStartedException e) {
log.trace("Exception while publishing Exchange to disruptor ringbuffer", e);
copy.setException(e);
}

if (timeout > 0) {
if (log.isTraceEnabled()) {
Expand Down Expand Up @@ -156,7 +162,12 @@ public String toString() {
// handover the completion so its the copy which performs that, as we do not wait
final Exchange copy = prepareCopy(exchange, true);
log.trace("Publishing Exchange to disruptor ringbuffer: {}", copy);
endpoint.publish(copy);
try {
endpoint.publish(copy);
} catch (DisruptorNotStartedException e) {
log.trace("Exception while publishing Exchange to disruptor ringbuffer", e);
copy.setException(e);
}
}

// we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
Expand Down
Loading

0 comments on commit f457399

Please sign in to comment.