From 01b5681b0d76e8ee89b16c51a2b1043430a589e0 Mon Sep 17 00:00:00 2001 From: jfarcand Date: Wed, 6 Feb 2013 13:57:43 -0500 Subject: [PATCH] Fix for #890 --- .../org/atmosphere/cpr/ApplicationConfig.java | 4 ++ .../atmosphere/cpr/DefaultBroadcaster.java | 52 +++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java b/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java index 90cbf376a82..5afdeca2041 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java @@ -321,4 +321,8 @@ public interface ApplicationConfig { * Set to true if order of message delivered to the client is not important */ String OUT_OF_ORDER_BROADCAST = Broadcaster.class.getName() + ".supportOutOfOrderBroadcast"; + /** + * The write operation timeout + */ + String WRITE_TIMEOUT = Broadcaster.class.getName() + ".writeTimeout"; } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java index 6feeec90c17..7d10b0a0f9a 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java @@ -137,6 +137,7 @@ public class DefaultBroadcaster implements Broadcaster { protected BroadcasterCache.STRATEGY cacheStrategy = BroadcasterCache.STRATEGY.AFTER_FILTER; private final Object[] awaitBarrier = new Object[0]; private final AtomicBoolean outOfOrderBroadcastSupported = new AtomicBoolean(false); + private int writeTimeoutInSecond = -1; public DefaultBroadcaster(String name, URI uri, AtmosphereConfig config) { this.name = name; @@ -156,6 +157,11 @@ public DefaultBroadcaster(String name, URI uri, AtmosphereConfig config) { if (s != null) { outOfOrderBroadcastSupported.set(Boolean.valueOf(s)); } + + s = config.getInitParameter(ApplicationConfig.WRITE_TIMEOUT); + if (s != null) { + writeTimeoutInSecond = Integer.valueOf(s); + } } public DefaultBroadcaster(String name, AtmosphereConfig config) { @@ -1011,10 +1017,48 @@ protected void trackBroadcastMessage(final AtmosphereResource r, Entry entry) { } protected void invokeOnStateChange(final AtmosphereResource r, final AtmosphereResourceEvent e) { - try { - r.getAtmosphereHandler().onStateChange(e); - } catch (Throwable t) { - onException(t, r); + if (writeTimeoutInSecond != -1) { + WriteOperation w = new WriteOperation(r, e); + bc.getScheduledExecutorService().schedule(w, writeTimeoutInSecond, TimeUnit.MILLISECONDS); + + try { + w.call(); + } catch (Exception ex) { + logger.warn("", ex); + } + } else { + try { + r.getAtmosphereHandler().onStateChange(e); + } catch (Throwable t) { + onException(t, r); + } + } + } + + final class WriteOperation implements Callable { + + private final AtmosphereResource r; + private final AtmosphereResourceEvent e; + private AtomicBoolean completed = new AtomicBoolean(); + + private WriteOperation(AtmosphereResource r, AtmosphereResourceEvent e) { + this.r = r; + this.e = e; + } + + @Override + public Object call() throws Exception { + if (!completed.getAndSet(true)) { + try { + r.getAtmosphereHandler().onStateChange(e); + } catch (Throwable t) { + onException(t, r); + } + } else { + onException(new IOException("Unable to write after " + writeTimeoutInSecond), r); + AtmosphereResourceImpl.class.cast(r).cancel(); + } + return null; } }