Skip to content

Commit

Permalink
🐎 Fixes #1885
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Mar 14, 2015
1 parent f6822b7 commit 6a38386
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -137,21 +138,51 @@ enum TRANSPORT {POLLING, LONG_POLLING, STREAMING, WEBSOCKET, JSONP, UNDEFINED, S
AtmosphereConfig getAtmosphereConfig();

/**
* Return the current {@link Broadcaster}.
* Return the first added {@link Broadcaster}.
*
* @return the current {@link Broadcaster}
*/
Broadcaster getBroadcaster();

/**
* Set the current {@link Broadcaster}. If null, a new Broadcaster will be created with {@link Broadcaster.SCOPE#REQUEST}
* if that resource hasn't been suspended yet.
* Return an unmodifiable list of {@link Broadcaster}s associated with this resource
*
* @return an unmodifiable list of {@link Broadcaster}
*/
List<Broadcaster> broadcasters();

/**
* Remove this {@link org.atmosphere.cpr.AtmosphereResource} from all {@link org.atmosphere.cpr.Broadcaster}
*
* @return this
*/
public AtmosphereResource removeFromAllBroadcasters();

/**
* Set the first {@link Broadcaster} associated with this resource. This {@link org.atmosphere.cpr.Broadcaster}
* will be returned when {@link #getBroadcaster()} is invoked.
*
* @param broadcaster
* @return this
*/
AtmosphereResource setBroadcaster(Broadcaster broadcaster);

/**
* Add/Associate a {@link org.atmosphere.cpr.Broadcaster} with this resource.
*
* @param broadcaster
* @return this
*/
AtmosphereResource addBroadcaster(Broadcaster broadcaster);

/**
* Remove a {@link org.atmosphere.cpr.Broadcaster} with this resource.
*
* @param broadcaster
* @return this
*/
AtmosphereResource removeBroadcaster(Broadcaster broadcaster);

/**
* Set the {@link Serializer} to use when {@link AtmosphereResource#write} execute the operation.
* By default, the {@link Serializer} is null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ AtmosphereResource create(AtmosphereConfig config,
*
* @param uuid the {@link org.atmosphere.cpr.AtmosphereResource#uuid()}
* @return all {@link Broadcaster} associated with a {@link AtmosphereResource#uuid}
*/
* @deprecated Use {@link org.atmosphere.cpr.AtmosphereResourceFactory#find(String)}.broadcasters() instead
q */

This comment has been minimized.

Copy link
@slovdahl

slovdahl Mar 16, 2015

Contributor

Typo :)

@Deprecated
Set<Broadcaster> broadcasters(String uuid);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@

import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -57,7 +60,7 @@ public class AtmosphereResourceImpl implements AtmosphereResource {
private AtmosphereRequest req;
private AtmosphereResponse response;
private final Action action = new Action();
protected Broadcaster broadcaster;
protected final List<Broadcaster> broadcasters = new CopyOnWriteArrayList<Broadcaster>();
private AtmosphereConfig config;
protected AsyncSupport asyncSupport;
private Serializer serializer;
Expand Down Expand Up @@ -112,12 +115,13 @@ public AtmosphereResource initialize(AtmosphereConfig config, Broadcaster broadc
AsyncSupport asyncSupport, AtmosphereHandler atmosphereHandler) {
this.req = req;
this.response = response;
this.broadcaster = broadcaster;
this.config = config;
this.asyncSupport = asyncSupport;
this.atmosphereHandler = atmosphereHandler;
this.event = new AtmosphereResourceEventImpl(this);

broadcasters.add(broadcaster);

String s = (String) req.getAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
if (s == null) {
s = response.getHeader(HeaderConfig.X_ATMOSPHERE_TRACKING_ID);
Expand Down Expand Up @@ -261,32 +265,24 @@ public AtmosphereResource resume() {

action.type(Action.TYPE.RESUME);

// We need it as Jetty doesn't support timeout
Broadcaster b = getBroadcaster(false);
if (!b.isDestroyed() && b instanceof DefaultBroadcaster) {
((DefaultBroadcaster) b).broadcastOnResume(this);
}

notifyListeners();

try {
if (!b.isDestroyed()) {
broadcaster.removeAtmosphereResource(this);
boolean notify = true;
for (Broadcaster b : broadcasters) {
// We need it as Jetty doesn't support timeout
if (!b.isDestroyed() && b instanceof DefaultBroadcaster) {
((DefaultBroadcaster) b).broadcastOnResume(this);
}
} catch (IllegalStateException ex) {
logger.warn("Unable to resume", this);
logger.debug(ex.getMessage(), ex);
}

if (b.getScope() == Broadcaster.SCOPE.REQUEST) {
logger.debug("Broadcaster's scope is set to request, destroying it {}", b.getID());
b.destroy();
}
if (notify) {
notify = false;
notifyListeners();
}

// Resuming here means we need to pull away from all other Broadcaster, if they exists.
if (config.getBroadcasterFactory() != null) {
config.getBroadcasterFactory().removeAllAtmosphereResource(this);
if (b.getScope() == Broadcaster.SCOPE.REQUEST) {
logger.debug("Broadcaster's scope is set to request, destroying it {}", b.getID());
b.destroy();
}
}
removeFromAllBroadcasters();

try {
req.setAttribute(ApplicationConfig.RESUMED_ON_TIMEOUT, Boolean.FALSE);
Expand Down Expand Up @@ -393,6 +389,8 @@ public AtmosphereResource suspend(long timeout) {
skipCreation = true;
}

Broadcaster broadcaster = broadcasters.get(0);

// Null means SCOPE=REQUEST set by a Meteor
if (!skipCreation && (broadcaster == null || broadcaster.getScope() == Broadcaster.SCOPE.REQUEST) && !isJersey) {
String id = broadcaster != null ? broadcaster.getID() : ROOT_MASTER;
Expand Down Expand Up @@ -447,7 +445,13 @@ public Broadcaster getBroadcaster() {
return getBroadcaster(true);
}

@Override
public List<Broadcaster> broadcasters() {
return Collections.unmodifiableList(broadcasters);
}

protected Broadcaster getBroadcaster(boolean autoCreate) {
Broadcaster broadcaster = broadcasters.size() == 0 ? null : broadcasters.get(0);
if (broadcaster == null) {
throw new IllegalStateException("No Broadcaster associated with this AtmosphereResource.");
}
Expand Down Expand Up @@ -477,7 +481,24 @@ protected Broadcaster getBroadcaster(boolean autoCreate) {

@Override
public AtmosphereResourceImpl setBroadcaster(Broadcaster broadcaster) {
this.broadcaster = broadcaster;
// For legacy
if (broadcasters.size() == 0) {
broadcasters.add(broadcaster);
} else {
broadcasters.set(0, broadcaster);
}
return this;
}

@Override
public AtmosphereResource addBroadcaster(Broadcaster broadcaster) {
broadcasters.add(broadcaster);
return this;
}

@Override
public AtmosphereResource removeBroadcaster(Broadcaster broadcaster) {
broadcasters.remove(broadcaster);
return this;
}

Expand Down Expand Up @@ -637,10 +658,7 @@ public AtmosphereResource notifyListeners(AtmosphereResourceEvent event) {
if (event.isSuspended()) {
logger.warn("Exception during suspend() operation {}", t.toString());
logger.debug("", t);
broadcaster.removeAtmosphereResource(this);
if (config.getBroadcasterFactory() != null) {
config.getBroadcasterFactory().removeAllAtmosphereResource(this);
}
removeFromAllBroadcasters();
} else {
logger.debug("Listener error {}", t);
}
Expand All @@ -654,6 +672,18 @@ public AtmosphereResource notifyListeners(AtmosphereResourceEvent event) {
return this;
}

@Override
public AtmosphereResource removeFromAllBroadcasters(){
for (Broadcaster b: broadcasters) {
try {
b.removeAtmosphereResource(this);
} catch (Exception ex) {
logger.trace("", ex);
}
}
return this;
}

/**
* Notify {@link AtmosphereResourceEventListener} thah an unexpected exception occured.
*
Expand Down Expand Up @@ -757,12 +787,12 @@ public void cancel() throws IOException {
logger.trace("Cancelling {}", uuid);

if (config.getBroadcasterFactory() != null) {
config.getBroadcasterFactory().removeAllAtmosphereResource(this);
removeFromAllBroadcasters();
if (transport.equals(TRANSPORT.WEBSOCKET)) {
String parentUUID = (String) req.getAttribute(SUSPENDED_ATMOSPHERE_RESOURCE_UUID);
AtmosphereResource p = config.resourcesFactory().find(parentUUID);
if (p != null) {
config.getBroadcasterFactory().removeAllAtmosphereResource(p);
p.removeFromAllBroadcasters();
}
}
}
Expand Down Expand Up @@ -806,12 +836,9 @@ private void unregister() {
public void _destroy() {
try {
if (!isCancelled.get()) {
if (broadcaster != null) broadcaster.removeAtmosphereResource(this);

if (config.getBroadcasterFactory() != null) {
config.getBroadcasterFactory().removeAllAtmosphereResource(this);
}
removeFromAllBroadcasters();
}
broadcasters.clear();
unregister();
removeEventListeners();
} catch (Throwable t) {
Expand All @@ -832,7 +859,7 @@ public String toString() {
",\n\t isResumed=" + isResumed() +
",\n\t isCancelled=" + isCancelled() +
",\n\t isSuspended=" + isSuspended() +
",\n\t broadcaster=" + broadcaster.getID() + " size: " + broadcaster.getAtmosphereResources().size() +
",\n\t broadcasters=" + broadcasters +
",\n\t isClosedByClient=" + (event != null ? event.isClosedByClient() : false) +
",\n\t isClosedByApplication=" + (event != null ? event.isClosedByApplication() : false) +
",\n\t action=" + action +
Expand Down Expand Up @@ -906,7 +933,15 @@ public AtmosphereResourceImpl cloneState(AtmosphereResource r) {
addEventListener(l);
}
AtmosphereResourceImpl.class.cast(r).session(r.session());
setBroadcaster(r.getBroadcaster());
boolean isFirst = true;
for (Broadcaster b : broadcasters) {
if (isFirst) {
isFirst = false;
setBroadcaster(b);
} else {
addBroadcaster(b);
}
}
atmosphereHandler(r.getAtmosphereHandler());
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ public interface BroadcasterFactory {
* Remove all instances of {@link AtmosphereResource} from all registered {@link Broadcaster}s.
*
* @param r an void {@link AtmosphereResource}
*
*/
@Deprecated
void removeAllAtmosphereResource(AtmosphereResource r);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public AtmosphereResource remove(String uuid) {
logger.trace("Removing: {}", uuid);
AtmosphereResource r = resources.remove(uuid);
if (r != null) {
r.getAtmosphereConfig().getBroadcasterFactory().removeAllAtmosphereResource(r);
r.removeFromAllBroadcasters();
}
return r;
}
Expand Down Expand Up @@ -290,19 +290,13 @@ public void locate(String uuid, Async async) {
*
* @param uuid the {@link org.atmosphere.cpr.AtmosphereResource#uuid()}
* @return all {@link Broadcaster} associated with a {@link AtmosphereResource#uuid}
* @deprecated Use {@link org.atmosphere.cpr.AtmosphereResourceFactory#find(String)}.broadcasters() instead
*/
@Override
@Deprecated
public Set<Broadcaster> broadcasters(String uuid) {
Collection<Broadcaster> l = broadcasterFactory.lookupAll();
Set<Broadcaster> h = new HashSet<Broadcaster>();
for (Broadcaster b : l) {
for (AtmosphereResource r : b.getAtmosphereResources()) {
if (r.uuid().equalsIgnoreCase(uuid)) {
h.add(b);
}
}
}
return h;
AtmosphereResource r = find(uuid);
return new HashSet<Broadcaster>(r.broadcasters());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,10 +853,10 @@ protected void executeAsyncWrite(final AsyncWriteToken token) {
logger.trace("If you are using Tomcat 7.0.22 and lower, you're most probably hitting http://is.gd/NqicFT");
logger.trace("ApplicationConfig.CACHE_MESSAGE_ON_IO_FLUSH_EXCEPTION {}", cacheOnIOFlushException, t);

lostCandidate = cacheOnIOFlushException ? cacheOnIOFlushException : cacheMessageOnIOException(t);
lostCandidate = cacheOnIOFlushException ? cacheOnIOFlushException : cacheMessageOnIOException(t);
// The Request/Response associated with the AtmosphereResource has already been written and commited
removeAtmosphereResource(r, false);
config.getBroadcasterFactory().removeAllAtmosphereResource(r);
r.removeFromAllBroadcasters();
event.setCancelled(true);
event.setThrowable(t);
r.setIsInScope(false);
Expand Down Expand Up @@ -905,7 +905,7 @@ protected void executeAsyncWrite(final AsyncWriteToken token) {
}
}

protected boolean cacheMessageOnIOException(Throwable cause){
protected boolean cacheMessageOnIOException(Throwable cause) {
for (StackTraceElement element : cause.getStackTrace()) {
if (element.getMethodName().equals("flush") || element.getMethodName().equals("flushBuffer")) {
return false;
Expand Down Expand Up @@ -1423,6 +1423,7 @@ protected void cacheAndSuspend(AtmosphereResource r) {

protected void notifyAndAdd(AtmosphereResource r) {
resources.add(r);
r.addBroadcaster(this);
notifyOnAddAtmosphereResourceListener(r);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public <T extends Broadcaster> T lookup(Class<T> c, Object id, boolean createIfN
}
}

@Deprecated
@Override
public void removeAllAtmosphereResource(AtmosphereResource r) {
// Remove inside all Broadcaster as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected void idleResources() {
try {
if (req.getAttribute(MAX_INACTIVE) == null) {
logger.error("Invalid state {}", r);
config.getBroadcasterFactory().removeAllAtmosphereResource(r);
r.removeFromAllBroadcasters();
config.resourcesFactory().unRegisterUuidForFindCandidate(r);
continue;
}
Expand All @@ -109,7 +109,7 @@ protected void idleResources() {
AsynchronousProcessor.class.cast(config.framework().getAsyncSupport()).endRequest(AtmosphereResourceImpl.class.cast(r), true);
}
} finally {
config.getBroadcasterFactory().removeAllAtmosphereResource(r);
r.removeFromAllBroadcasters();
config.resourcesFactory().unRegisterUuidForFindCandidate(r);
}
}
Expand Down

0 comments on commit 6a38386

Please sign in to comment.