diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java index 9147a47578..cc78656603 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResource.java @@ -17,6 +17,7 @@ import javax.servlet.http.HttpSession; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -137,21 +138,51 @@ enum TRANSPORT {POLLING, LONG_POLLING, STREAMING, WEBSOCKET, JSONP, UNDEFINED, S AtmosphereConfig getAtmosphereConfig(); /** - * Return the current {@link Broadcaster}. + * Return the first added {@link Broadcaster}. * * @return the current {@link Broadcaster} */ Broadcaster getBroadcaster(); /** - * Set the current {@link Broadcaster}. If null, a new Broadcaster will be created with {@link Broadcaster.SCOPE#REQUEST} - * if that resource hasn't been suspended yet. + * Return an unmodifiable list of {@link Broadcaster}s associated with this resource + * + * @return an unmodifiable list of {@link Broadcaster} + */ + List broadcasters(); + + /** + * Remove this {@link org.atmosphere.cpr.AtmosphereResource} from all {@link org.atmosphere.cpr.Broadcaster} + * + * @return this + */ + public AtmosphereResource removeFromAllBroadcasters(); + + /** + * Set the first {@link Broadcaster} associated with this resource. This {@link org.atmosphere.cpr.Broadcaster} + * will be returned when {@link #getBroadcaster()} is invoked. * * @param broadcaster * @return this */ AtmosphereResource setBroadcaster(Broadcaster broadcaster); + /** + * Add/Associate a {@link org.atmosphere.cpr.Broadcaster} with this resource. + * + * @param broadcaster + * @return this + */ + AtmosphereResource addBroadcaster(Broadcaster broadcaster); + + /** + * Remove a {@link org.atmosphere.cpr.Broadcaster} with this resource. + * + * @param broadcaster + * @return this + */ + AtmosphereResource removeBroadcaster(Broadcaster broadcaster); + /** * Set the {@link Serializer} to use when {@link AtmosphereResource#write} execute the operation. * By default, the {@link Serializer} is null. diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceFactory.java index 20589b907e..86283a3d85 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceFactory.java @@ -167,7 +167,9 @@ AtmosphereResource create(AtmosphereConfig config, * * @param uuid the {@link org.atmosphere.cpr.AtmosphereResource#uuid()} * @return all {@link Broadcaster} associated with a {@link AtmosphereResource#uuid} - */ + * @deprecated Use {@link org.atmosphere.cpr.AtmosphereResourceFactory#find(String)}.broadcasters() instead +q */ + @Deprecated Set broadcasters(String uuid); /** diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java index f8a141ae47..34bc4cb747 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java @@ -24,9 +24,12 @@ import javax.servlet.http.HttpSession; import java.io.IOException; +import java.util.Collections; import java.util.Enumeration; +import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,7 +60,7 @@ public class AtmosphereResourceImpl implements AtmosphereResource { private AtmosphereRequest req; private AtmosphereResponse response; private final Action action = new Action(); - protected Broadcaster broadcaster; + protected final List broadcasters = new CopyOnWriteArrayList(); private AtmosphereConfig config; protected AsyncSupport asyncSupport; private Serializer serializer; @@ -112,12 +115,13 @@ public AtmosphereResource initialize(AtmosphereConfig config, Broadcaster broadc AsyncSupport asyncSupport, AtmosphereHandler atmosphereHandler) { this.req = req; this.response = response; - this.broadcaster = broadcaster; this.config = config; this.asyncSupport = asyncSupport; this.atmosphereHandler = atmosphereHandler; this.event = new AtmosphereResourceEventImpl(this); + broadcasters.add(broadcaster); + String s = (String) req.getAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID); if (s == null) { s = response.getHeader(HeaderConfig.X_ATMOSPHERE_TRACKING_ID); @@ -261,32 +265,24 @@ public AtmosphereResource resume() { action.type(Action.TYPE.RESUME); - // We need it as Jetty doesn't support timeout - Broadcaster b = getBroadcaster(false); - if (!b.isDestroyed() && b instanceof DefaultBroadcaster) { - ((DefaultBroadcaster) b).broadcastOnResume(this); - } - - notifyListeners(); - - try { - if (!b.isDestroyed()) { - broadcaster.removeAtmosphereResource(this); + boolean notify = true; + for (Broadcaster b : broadcasters) { + // We need it as Jetty doesn't support timeout + if (!b.isDestroyed() && b instanceof DefaultBroadcaster) { + ((DefaultBroadcaster) b).broadcastOnResume(this); } - } catch (IllegalStateException ex) { - logger.warn("Unable to resume", this); - logger.debug(ex.getMessage(), ex); - } - if (b.getScope() == Broadcaster.SCOPE.REQUEST) { - logger.debug("Broadcaster's scope is set to request, destroying it {}", b.getID()); - b.destroy(); - } + if (notify) { + notify = false; + notifyListeners(); + } - // Resuming here means we need to pull away from all other Broadcaster, if they exists. - if (config.getBroadcasterFactory() != null) { - config.getBroadcasterFactory().removeAllAtmosphereResource(this); + if (b.getScope() == Broadcaster.SCOPE.REQUEST) { + logger.debug("Broadcaster's scope is set to request, destroying it {}", b.getID()); + b.destroy(); + } } + removeFromAllBroadcasters(); try { req.setAttribute(ApplicationConfig.RESUMED_ON_TIMEOUT, Boolean.FALSE); @@ -393,6 +389,8 @@ public AtmosphereResource suspend(long timeout) { skipCreation = true; } + Broadcaster broadcaster = broadcasters.get(0); + // Null means SCOPE=REQUEST set by a Meteor if (!skipCreation && (broadcaster == null || broadcaster.getScope() == Broadcaster.SCOPE.REQUEST) && !isJersey) { String id = broadcaster != null ? broadcaster.getID() : ROOT_MASTER; @@ -447,7 +445,13 @@ public Broadcaster getBroadcaster() { return getBroadcaster(true); } + @Override + public List broadcasters() { + return Collections.unmodifiableList(broadcasters); + } + protected Broadcaster getBroadcaster(boolean autoCreate) { + Broadcaster broadcaster = broadcasters.size() == 0 ? null : broadcasters.get(0); if (broadcaster == null) { throw new IllegalStateException("No Broadcaster associated with this AtmosphereResource."); } @@ -477,7 +481,24 @@ protected Broadcaster getBroadcaster(boolean autoCreate) { @Override public AtmosphereResourceImpl setBroadcaster(Broadcaster broadcaster) { - this.broadcaster = broadcaster; + // For legacy + if (broadcasters.size() == 0) { + broadcasters.add(broadcaster); + } else { + broadcasters.set(0, broadcaster); + } + return this; + } + + @Override + public AtmosphereResource addBroadcaster(Broadcaster broadcaster) { + broadcasters.add(broadcaster); + return this; + } + + @Override + public AtmosphereResource removeBroadcaster(Broadcaster broadcaster) { + broadcasters.remove(broadcaster); return this; } @@ -637,10 +658,7 @@ public AtmosphereResource notifyListeners(AtmosphereResourceEvent event) { if (event.isSuspended()) { logger.warn("Exception during suspend() operation {}", t.toString()); logger.debug("", t); - broadcaster.removeAtmosphereResource(this); - if (config.getBroadcasterFactory() != null) { - config.getBroadcasterFactory().removeAllAtmosphereResource(this); - } + removeFromAllBroadcasters(); } else { logger.debug("Listener error {}", t); } @@ -654,6 +672,18 @@ public AtmosphereResource notifyListeners(AtmosphereResourceEvent event) { return this; } + @Override + public AtmosphereResource removeFromAllBroadcasters(){ + for (Broadcaster b: broadcasters) { + try { + b.removeAtmosphereResource(this); + } catch (Exception ex) { + logger.trace("", ex); + } + } + return this; + } + /** * Notify {@link AtmosphereResourceEventListener} thah an unexpected exception occured. * @@ -757,12 +787,12 @@ public void cancel() throws IOException { logger.trace("Cancelling {}", uuid); if (config.getBroadcasterFactory() != null) { - config.getBroadcasterFactory().removeAllAtmosphereResource(this); + removeFromAllBroadcasters(); if (transport.equals(TRANSPORT.WEBSOCKET)) { String parentUUID = (String) req.getAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID); AtmosphereResource p = config.resourcesFactory().find(parentUUID); if (p != null) { - config.getBroadcasterFactory().removeAllAtmosphereResource(p); + p.removeFromAllBroadcasters(); } } } @@ -806,12 +836,9 @@ private void unregister() { public void _destroy() { try { if (!isCancelled.get()) { - if (broadcaster != null) broadcaster.removeAtmosphereResource(this); - - if (config.getBroadcasterFactory() != null) { - config.getBroadcasterFactory().removeAllAtmosphereResource(this); - } + removeFromAllBroadcasters(); } + broadcasters.clear(); unregister(); removeEventListeners(); } catch (Throwable t) { @@ -832,7 +859,7 @@ public String toString() { ",\n\t isResumed=" + isResumed() + ",\n\t isCancelled=" + isCancelled() + ",\n\t isSuspended=" + isSuspended() + - ",\n\t broadcaster=" + broadcaster.getID() + " size: " + broadcaster.getAtmosphereResources().size() + + ",\n\t broadcasters=" + broadcasters + ",\n\t isClosedByClient=" + (event != null ? event.isClosedByClient() : false) + ",\n\t isClosedByApplication=" + (event != null ? event.isClosedByApplication() : false) + ",\n\t action=" + action + @@ -906,7 +933,15 @@ public AtmosphereResourceImpl cloneState(AtmosphereResource r) { addEventListener(l); } AtmosphereResourceImpl.class.cast(r).session(r.session()); - setBroadcaster(r.getBroadcaster()); + boolean isFirst = true; + for (Broadcaster b : broadcasters) { + if (isFirst) { + isFirst = false; + setBroadcaster(b); + } else { + addBroadcaster(b); + } + } atmosphereHandler(r.getAtmosphereHandler()); return this; } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java index 6617b255a5..732382beba 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/BroadcasterFactory.java @@ -124,7 +124,9 @@ public interface BroadcasterFactory { * Remove all instances of {@link AtmosphereResource} from all registered {@link Broadcaster}s. * * @param r an void {@link AtmosphereResource} + * */ + @Deprecated void removeAllAtmosphereResource(AtmosphereResource r); /** diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceFactory.java index d04e02f251..52c17ba48a 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceFactory.java @@ -257,7 +257,7 @@ public AtmosphereResource remove(String uuid) { logger.trace("Removing: {}", uuid); AtmosphereResource r = resources.remove(uuid); if (r != null) { - r.getAtmosphereConfig().getBroadcasterFactory().removeAllAtmosphereResource(r); + r.removeFromAllBroadcasters(); } return r; } @@ -290,19 +290,13 @@ public void locate(String uuid, Async async) { * * @param uuid the {@link org.atmosphere.cpr.AtmosphereResource#uuid()} * @return all {@link Broadcaster} associated with a {@link AtmosphereResource#uuid} + * @deprecated Use {@link org.atmosphere.cpr.AtmosphereResourceFactory#find(String)}.broadcasters() instead */ @Override + @Deprecated public Set broadcasters(String uuid) { - Collection l = broadcasterFactory.lookupAll(); - Set h = new HashSet(); - for (Broadcaster b : l) { - for (AtmosphereResource r : b.getAtmosphereResources()) { - if (r.uuid().equalsIgnoreCase(uuid)) { - h.add(b); - } - } - } - return h; + AtmosphereResource r = find(uuid); + return new HashSet(r.broadcasters()); } /** diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java index 2fcd8ac391..d3d8cb3b45 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java @@ -853,10 +853,10 @@ protected void executeAsyncWrite(final AsyncWriteToken token) { logger.trace("If you are using Tomcat 7.0.22 and lower, you're most probably hitting http://is.gd/NqicFT"); logger.trace("ApplicationConfig.CACHE_MESSAGE_ON_IO_FLUSH_EXCEPTION {}", cacheOnIOFlushException, t); - lostCandidate = cacheOnIOFlushException ? cacheOnIOFlushException : cacheMessageOnIOException(t); + lostCandidate = cacheOnIOFlushException ? cacheOnIOFlushException : cacheMessageOnIOException(t); // The Request/Response associated with the AtmosphereResource has already been written and commited removeAtmosphereResource(r, false); - config.getBroadcasterFactory().removeAllAtmosphereResource(r); + r.removeFromAllBroadcasters(); event.setCancelled(true); event.setThrowable(t); r.setIsInScope(false); @@ -905,7 +905,7 @@ protected void executeAsyncWrite(final AsyncWriteToken token) { } } - protected boolean cacheMessageOnIOException(Throwable cause){ + protected boolean cacheMessageOnIOException(Throwable cause) { for (StackTraceElement element : cause.getStackTrace()) { if (element.getMethodName().equals("flush") || element.getMethodName().equals("flushBuffer")) { return false; @@ -1423,6 +1423,7 @@ protected void cacheAndSuspend(AtmosphereResource r) { protected void notifyAndAdd(AtmosphereResource r) { resources.add(r); + r.addBroadcaster(this); notifyOnAddAtmosphereResourceListener(r); } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java index e9aba69f53..5438329e2a 100755 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcasterFactory.java @@ -235,6 +235,7 @@ public T lookup(Class c, Object id, boolean createIfN } } + @Deprecated @Override public void removeAllAtmosphereResource(AtmosphereResource r) { // Remove inside all Broadcaster as well. diff --git a/modules/cpr/src/main/java/org/atmosphere/interceptor/IdleResourceInterceptor.java b/modules/cpr/src/main/java/org/atmosphere/interceptor/IdleResourceInterceptor.java index 24b32a4e3d..bc48d90bab 100644 --- a/modules/cpr/src/main/java/org/atmosphere/interceptor/IdleResourceInterceptor.java +++ b/modules/cpr/src/main/java/org/atmosphere/interceptor/IdleResourceInterceptor.java @@ -82,7 +82,7 @@ protected void idleResources() { try { if (req.getAttribute(MAX_INACTIVE) == null) { logger.error("Invalid state {}", r); - config.getBroadcasterFactory().removeAllAtmosphereResource(r); + r.removeFromAllBroadcasters(); config.resourcesFactory().unRegisterUuidForFindCandidate(r); continue; } @@ -109,7 +109,7 @@ protected void idleResources() { AsynchronousProcessor.class.cast(config.framework().getAsyncSupport()).endRequest(AtmosphereResourceImpl.class.cast(r), true); } } finally { - config.getBroadcasterFactory().removeAllAtmosphereResource(r); + r.removeFromAllBroadcasters(); config.resourcesFactory().unRegisterUuidForFindCandidate(r); } }