Skip to content

Commit

Permalink
feat(Subscriber.create): Removed.
Browse files Browse the repository at this point in the history
+ Removes `Subscriber.create`.
+ Removes `SafeSubscriber`, which is no longer necessary.

BREAKING CHANGE: Subscriber.create has been removed. Please use `new Subscriber` instead.
  • Loading branch information
benlesh committed Feb 21, 2022
1 parent bdab737 commit 826d807
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 36 deletions.
29 changes: 0 additions & 29 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,6 @@ import { isSubscriber } from './util/isSubscriber';
* @class Subscriber<T>
*/
export class Subscriber<T> extends Subscription implements Observer<T> {
/**
* A static factory for a Subscriber, given a (potentially partial) definition
* of an Observer.
* @param next The `next` callback of an Observer.
* @param error The `error` callback of an
* Observer.
* @param complete The `complete` callback of an
* Observer.
* @return A Subscriber wrapping the (partially defined)
* Observer represented by the given arguments.
* @nocollapse
* @deprecated Do not use. Will be removed in v8. There is no replacement for this
* method, and there is no reason to be creating instances of `Subscriber` directly.
* If you have a specific use case, please file an issue.
*/
static create<T>(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber<T> {
return new SafeSubscriber({ next, error, complete });
}

/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
protected isStopped: boolean = false;
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
Expand Down Expand Up @@ -180,16 +161,6 @@ export function createSafeSubscriber<T>(observerOrNext?: Partial<Observer<T>> |
return new Subscriber(observerOrNext);
}

class SafeSubscriber<T> extends Subscriber<T> {
constructor(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null) {
super();

// Wrap the partial observer to ensure it's a full observer, and
// make sure proper error handling is accounted for.
this.destination = createSafeObserver(observerOrNext);
}
}

/**
* An error handler used when no error handler was supplied
* to the SafeSubscriber -- meaning no error handler was supplied
Expand Down
14 changes: 7 additions & 7 deletions src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Subject, AnonymousSubject } from '../../Subject';
import { Subscriber } from '../../Subscriber';
import { createSafeSubscriber, Subscriber } from '../../Subscriber';
import { Observable } from '../../Observable';
import { Subscription } from '../../Subscription';
import { Operator } from '../../Operator';
Expand Down Expand Up @@ -296,8 +296,8 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {

const queue = this.destination;

this.destination = Subscriber.create<T>(
(x) => {
this.destination = createSafeSubscriber({
next: (x: T) => {
if (socket!.readyState === 1) {
try {
const { serializer } = this._config;
Expand All @@ -307,7 +307,7 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
}
}
},
(err) => {
error: (err: any) => {
const { closingObserver } = this._config;
if (closingObserver) {
closingObserver.next(undefined);
Expand All @@ -319,15 +319,15 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
}
this._resetState();
},
() => {
complete: () => {
const { closingObserver } = this._config;
if (closingObserver) {
closingObserver.next(undefined);
}
socket!.close();
this._resetState();
}
) as Subscriber<any>;
},
});

if (queue && queue instanceof ReplaySubject) {
subscription.add((queue as ReplaySubject<T>).subscribe(this.destination));
Expand Down

0 comments on commit 826d807

Please sign in to comment.