Skip to content

Commit

Permalink
Add vararg of Subscriptions to composite subscription.
Browse files Browse the repository at this point in the history
  • Loading branch information
Klemen Kresnik authored and Klemen Kresnik committed Apr 6, 2016
1 parent c8e1b03 commit 114f50c
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 16 deletions.
42 changes: 37 additions & 5 deletions src/main/java/rx/subscriptions/CompositeSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
*/
package rx.subscriptions;

import rx.Subscription;
import rx.exceptions.Exceptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import rx.Subscription;
import rx.exceptions.*;

/**
* Subscription that represents a group of Subscriptions that are unsubscribed together.
* <p>
Expand Down Expand Up @@ -54,7 +54,7 @@ public boolean isUnsubscribed() {
* well.
*
* @param s
* the {@link Subscription} to add
* the {@link Subscription} to add
*/
public void add(final Subscription s) {
if (s.isUnsubscribed()) {
Expand All @@ -75,12 +75,44 @@ public void add(final Subscription s) {
s.unsubscribe();
}

/**
* Adds collection of {@link Subscription} to this {@code CompositeSubscription} if the
* {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} <em>is</em>
* unsubscribed, {@code addAll} will indicate this by explicitly unsubscribing all {@code Subscription} in collection as
* well.
*
* @param subscriptions
* the collection of {@link Subscription} to add
*/
public void addAll(final Subscription... subscriptions) {
if (!unsubscribed) {
synchronized (this) {
if (!unsubscribed) {
if (this.subscriptions == null) {
this.subscriptions = new HashSet<Subscription>(subscriptions.length);
}

for (Subscription s : subscriptions) {
if (!s.isUnsubscribed()) {
this.subscriptions.add(s);
}
}
return;
}
}
}

for (Subscription s : subscriptions) {
s.unsubscribe();
}
}

/**
* Removes a {@link Subscription} from this {@code CompositeSubscription}, and unsubscribes the
* {@link Subscription}.
*
* @param s
* the {@link Subscription} to remove
* the {@link Subscription} to remove
*/
public void remove(final Subscription s) {
if (!unsubscribed) {
Expand Down
139 changes: 128 additions & 11 deletions src/test/java/rx/subscriptions/CompositeSubscriptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@
*/
package rx.subscriptions;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
import rx.Subscription;
import rx.exceptions.CompositeException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Subscription;
import rx.exceptions.CompositeException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CompositeSubscriptionTest {

Expand Down Expand Up @@ -324,17 +323,18 @@ public void run() {
// we should have only unsubscribed once
assertEquals(1, counter.get());
}

@Test
public void testTryRemoveIfNotIn() {
CompositeSubscription csub = new CompositeSubscription();

CompositeSubscription csub1 = new CompositeSubscription();
CompositeSubscription csub2 = new CompositeSubscription();

csub.add(csub1);
csub.remove(csub1);
csub.add(csub2);

csub.remove(csub1); // try removing again
}

Expand All @@ -344,4 +344,121 @@ public void testAddingNullSubscriptionIllegal() {
csub.add(null);
}

@Test
public void testAddAll() {
final AtomicInteger counter = new AtomicInteger();
CompositeSubscription s = new CompositeSubscription();
s.addAll(new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
});

s.unsubscribe();

assertEquals(4, counter.get());
}

@Test(timeout = 1000)
public void testAddAllConcurrent() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final CompositeSubscription s = new CompositeSubscription();

final int count = 10;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(10);
final List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() {
@Override
public void run() {
try {
start.await();
s.addAll(new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
}, new Subscription() {
@Override
public void unsubscribe() {
counter.incrementAndGet();
}

@Override
public boolean isUnsubscribed() {
return false;
}
});
end.countDown();
} catch (final InterruptedException e) {
fail(e.getMessage());
}
}
};
t.start();
threads.add(t);
}

start.countDown();
end.await();
s.unsubscribe();
for (final Thread t : threads) {
t.join();
}

assertEquals(30, counter.get());
}

}

0 comments on commit 114f50c

Please sign in to comment.