From 2775e060d05b629cba7119ccc1a2873c00af34b0 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 14 Oct 2016 17:01:57 -0700 Subject: [PATCH] fix(WebSocketSubject.prototype.multiplex): no longer nulls out socket after first unsubscribe This fix ensure the observers count goes to zero before the state is reset on the WebSocketSubject instance fixes #2037 --- spec/observables/dom/webSocket-spec.ts | 55 ++++++++++++++++++++++++++ src/observable/dom/WebSocketSubject.ts | 8 ++-- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index eb00be6646..8dc4cfe9fc 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -507,6 +507,61 @@ describe('Observable.webSocket', () => { (socket.close).restore(); }); + it('should keep the same socket for multiple multiplex subscriptions', () => { + const socketSubject = Rx.Observable.webSocket({url: 'ws://mysocket'}); + const results = []; + const socketMessages = [ + {id: 'A'}, + {id: 'B'}, + {id: 'A'}, + {id: 'B'}, + {id: 'B'}, + ]; + + const sub1 = socketSubject.multiplex( + () => 'no-op', + () => results.push('A unsub'), + (req: any) => req.id === 'A') + .takeWhile((req: any) => !req.complete) + .subscribe( + () => results.push('A next'), + (e) => results.push('A error ' + e), + () => results.push('A complete') + ); + + socketSubject.multiplex( + () => 'no-op', + () => results.push('B unsub'), + (req: any) => req.id === 'B') + .subscribe( + () => results.push('B next'), + (e) => results.push('B error ' + e), + () => results.push('B complete') + ); + + // Setup socket and send messages + let socket = MockWebSocket.lastSocket; + socket.open(); + socketMessages.forEach((msg, i) => { + if (i === 1) { + sub1.unsubscribe(); + expect(socketSubject.socket).to.equal(socket); + } + socket.triggerMessage(JSON.stringify(msg)); + }); + socket.triggerClose({ wasClean: true }); + + expect(results).to.deep.equal([ + 'A next', + 'A unsub', + 'B next', + 'B next', + 'B next', + 'B complete', + 'B unsub', + ]); + }); + it('should not close the socket until all subscriptions complete', () => { const socketSubject = Rx.Observable.webSocket({url: 'ws://mysocket'}); const results = []; diff --git a/src/observable/dom/WebSocketSubject.ts b/src/observable/dom/WebSocketSubject.ts index 3dea185c29..247a85357f 100644 --- a/src/observable/dom/WebSocketSubject.ts +++ b/src/observable/dom/WebSocketSubject.ts @@ -220,10 +220,12 @@ export class WebSocketSubject extends AnonymousSubject { subscription.add(this._output.subscribe(subscriber)); subscription.add(() => { const { socket } = this; - if (this._output.observers.length === 0 && socket && socket.readyState === 1) { - socket.close(); + if (this._output.observers.length === 0) { + if (socket && socket.readyState === 1) { + socket.close(); + } + this._resetState(); } - this._resetState(); }); return subscription; }