Skip to content

Commit

Permalink
fix(WebSocketSubject): ensure WebSocketSubject can be resubscribed
Browse files Browse the repository at this point in the history
- Also fixes issue where Subject would throw if internal code nulled out `observers`.
  • Loading branch information
benlesh committed Jan 13, 2016
1 parent 58cd806 commit 861a0c1
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 8 deletions.
88 changes: 86 additions & 2 deletions spec/observables/dom/webSocket-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,90 @@ describe('Observable.webSocket', function () {
socket.triggerMessage('pong');
expect(messageReceived).toBe(true);
});

it ('receive multiple messages', function () {
var expected = ['what', 'do', 'you', 'do', 'with', 'a', 'drunken', 'sailor?'];
var results = [];
var subject = Observable.webSocket('ws://mysocket');

subject.subscribe(function (x) {
results.push(x);
});

var socket = MockWebSocket.lastSocket();

socket.open();

expected.forEach(function (x) {
socket.triggerMessage(x);
});

expect(results).toEqual(expected);
});

it ('should queue messages prior to subscription', function () {
var expected = ['make', 'him', 'walk', 'the', 'plank'];
var subject = Observable.webSocket('ws://mysocket');

expected.forEach(function (x) {
subject.next(x);
});

var socket = MockWebSocket.lastSocket();
expect(socket).not.toBeDefined();

subject.subscribe();

socket = MockWebSocket.lastSocket();
expect(socket.sent.length).toBe(0);

socket.open();
expect(socket.sent.length).toBe(expected.length);
});

it('should send messages immediately if alreayd open', function () {
var subject = Observable.webSocket('ws://mysocket');
subject.subscribe();
var socket = MockWebSocket.lastSocket();
socket.open();

subject.next('avast!');
expect(socket.lastMessageSent()).toBe('avast!');
subject.next('ye swab!');
expect(socket.lastMessageSent()).toBe('ye swab!');
});

it('should close the socket when completed', function () {
var subject = Observable.webSocket('ws://mysocket');
subject.subscribe();
var socket = MockWebSocket.lastSocket();
socket.open();

expect(socket.readyState).toBe(1); // open

spyOn(socket, 'close').and.callThrough();
expect(socket.close).not.toHaveBeenCalled();

subject.complete();
expect(socket.close).toHaveBeenCalled();
expect(socket.readyState).toBe(3); // closed
});

it('should allow resubscription after closure', function () {
var subject = Observable.webSocket('ws://mysocket');
subject.subscribe();
var socket1 = MockWebSocket.lastSocket();
socket1.open();
subject.complete();

subject.next('a mariner yer not. yarrr.');
subject.subscribe();
var socket2 = MockWebSocket.lastSocket();
socket2.open();

expect(socket2).not.toBe(socket1);
expect(socket2.lastMessageSent()).toBe('a mariner yer not. yarrr.');
});
});

var sockets = [];
Expand All @@ -44,7 +128,7 @@ function MockWebSocket(url, protocol) {
this.protocol = protocol;
this.sent = [];
this.handlers = {};
this.readyState = 1;
this.readyState = 0;
}

MockWebSocket.lastSocket = function () {
Expand Down Expand Up @@ -92,7 +176,7 @@ MockWebSocket.prototype = {
this.readyState = 2;
this.closeCode = code;
this.closeReason = reason;
this.triggerClose();
this.triggerClose({ wasClean: (!code || code === 1000) });
}
},

Expand Down
16 changes: 10 additions & 6 deletions src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,17 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
protected _finalError(err: any): void {
let index = -1;
const observers = this.observers;
const len = observers.length;

// optimization to block our SubjectSubscriptions from
// splicing themselves out of the observers list one by one.
this.observers = null;
this.isUnsubscribed = true;

while (++index < len) {
observers[index].error(err);
if (observers) {
const len = observers.length;
while (++index < len) {
observers[index].error(err);
}
}

this.isUnsubscribed = false;
Expand All @@ -179,15 +181,17 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
protected _finalComplete(): void {
let index = -1;
const observers = this.observers;
const len = observers.length;

// optimization to block our SubjectSubscriptions from
// splicing themselves out of the observers list one by one.
this.observers = null;
this.isUnsubscribed = true;

while (++index < len) {
observers[index].complete();
if (observers) {
const len = observers.length;
while (++index < len) {
observers[index].complete();
}
}

this.isUnsubscribed = false;
Expand Down
9 changes: 9 additions & 0 deletions src/observable/dom/webSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,26 @@ export class WebSocketSubject<T> extends Subject<T> {
}

_unsubscribe() {
this.socket = null;
this.source = null;
this.destination = new ReplaySubject();
this.isStopped = false;
this.hasErrored = false;
this.hasCompleted = false;
this.observers = null;
this.isUnsubscribed = false;
}

_subscribe(subscriber: Subscriber<T>) {
if (!this.observers) {
this.observers = [];
}

const subscription = <Subscription>super._subscribe(subscriber);
// HACK: For some reason transpilation wasn't honoring this in arrow functions below
// Doesn't seem right, need to reinvestigate.
const self = this;
const WebSocket = this.WebSocketCtor;

if (self.source || !subscription || (<Subscription>subscription).isUnsubscribed) {
return subscription;
Expand Down

0 comments on commit 861a0c1

Please sign in to comment.