Skip to content

Commit

Permalink
Fix for #890
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Feb 11, 2013
1 parent 5ea5722 commit 01b5681
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,8 @@ public interface ApplicationConfig {
* Set to true if order of message delivered to the client is not important
*/
String OUT_OF_ORDER_BROADCAST = Broadcaster.class.getName() + ".supportOutOfOrderBroadcast";
/**
* The write operation timeout
*/
String WRITE_TIMEOUT = Broadcaster.class.getName() + ".writeTimeout";
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public class DefaultBroadcaster implements Broadcaster {
protected BroadcasterCache.STRATEGY cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER;
private final Object[] awaitBarrier = new Object[0];
private final AtomicBoolean outOfOrderBroadcastSupported = new AtomicBoolean(false);
private int writeTimeoutInSecond = -1;

public DefaultBroadcaster(String name, URI uri, AtmosphereConfig config) {
this.name = name;
Expand All @@ -156,6 +157,11 @@ public DefaultBroadcaster(String name, URI uri, AtmosphereConfig config) {
if (s != null) {
outOfOrderBroadcastSupported.set(Boolean.valueOf(s));
}

s = config.getInitParameter(ApplicationConfig.WRITE_TIMEOUT);
if (s != null) {
writeTimeoutInSecond = Integer.valueOf(s);
}
}

public DefaultBroadcaster(String name, AtmosphereConfig config) {
Expand Down Expand Up @@ -1011,10 +1017,48 @@ protected void trackBroadcastMessage(final AtmosphereResource r, Entry entry) {
}

protected void invokeOnStateChange(final AtmosphereResource r, final AtmosphereResourceEvent e) {
try {
r.getAtmosphereHandler().onStateChange(e);
} catch (Throwable t) {
onException(t, r);
if (writeTimeoutInSecond != -1) {
WriteOperation w = new WriteOperation(r, e);
bc.getScheduledExecutorService().schedule(w, writeTimeoutInSecond, TimeUnit.MILLISECONDS);

try {
w.call();
} catch (Exception ex) {
logger.warn("", ex);
}
} else {
try {
r.getAtmosphereHandler().onStateChange(e);
} catch (Throwable t) {
onException(t, r);
}
}
}

final class WriteOperation implements Callable<Object> {

private final AtmosphereResource r;
private final AtmosphereResourceEvent e;
private AtomicBoolean completed = new AtomicBoolean();

private WriteOperation(AtmosphereResource r, AtmosphereResourceEvent e) {
this.r = r;
this.e = e;
}

@Override
public Object call() throws Exception {
if (!completed.getAndSet(true)) {
try {
r.getAtmosphereHandler().onStateChange(e);
} catch (Throwable t) {
onException(t, r);
}
} else {
onException(new IOException("Unable to write after " + writeTimeoutInSecond), r);
AtmosphereResourceImpl.class.cast(r).cancel();
}
return null;
}
}

Expand Down

0 comments on commit 01b5681

Please sign in to comment.