上一篇文章介绍了RxJava
的执行原理。下面继续介绍一下操作符的执行原理,但是操作符太多了,这里用map
来进行说明。
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello ");
subscriber.onNext("World !");
subscriber.onCompleted();
}
});
observable.map(new Func1<String, String>() {
@Override
public String call(String s) {
return "say" + s;
}
});
observable.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("@@@", s);
}
});
执行结果很显然是say Hello
和say World !
。
我们直接进入Observable.map()
方法的源码:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
map
的内部调用了create()
方法,而create()
方法的源码我们再上一个版本已经介绍了,也就是说map
内部会创建一个新的Observable
对象,而且用一个新的OnSubscribeMap
对象作为参数。
OnSubscribeMap()
对象的参数分别是之前通过create()
方法创建的Observable
对象,以及map
中传递过来的Func1
类的对象。接下来就直接看OnSubscribeMap
类的源码,他实现了OnSubscribe
接口,并重写了call()
方法:
/**
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
* this transformation as a new {@code Observable}.
*/
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
// 最初Observable.create()创建的Observable对象
final Observable<T> source;
// map方法传递过来的func1对象,它是一个转换功能
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
// 把新创建的MapSubscriber添加到Observable.subscribe(subscribe)方法传递的参数subscriber中
o.add(parent);
// unsafeSubscribe是subscribe方法的一个安全性不高的操作,可以简单理解为subscribe方法
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
// 先会执行以下转换函数的call方法,然后将结果再传递给Subscribe对象调用它的onNext方法
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
因为在执行observable.subscribe(subscriber)
方法会调用到call()
方法,这里看一下call()
方法的核心:
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
// 把新创建的MapSubscriber添加到Observable.subscribe(subscribe)方法传递的参数subscriber中
o.add(parent);
// unsafeSubscribe是subscribe方法的一个安全性不高的操作,可以简单理解为subscribe方法,注意这里传递的是parent,也就是先创建的MapSubscriber对象,而这里的source是谁呢? 它是最初Observable.create创建的Observable对象
source.unsafeSubscribe(parent);
add()
及unsasfeSubscribe()
方法如下:
private final SubscriptionList subscriptions;
public final void add(Subscription s) {
subscriptions.add(s);
}
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
return Subscriptions.unsubscribed();
}
}
这一块代码就很简单了,因为和前面一篇我们分析的subscribe()
方法类似,相当于直接调用了最初Observable.create()
创建的Observable
对象的call(subscriber)
方法,而这里的subscriber
又是我们创建的MapSubscriber
的子类,所以这里相当于调用了MapSubscriber
类中的onNext()
、onComplete()
和onError()
方法:
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
// 先会执行以下转换函数的call方法,这个就是我们把Hello修改为say Hello的部分
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
// 将转换函数的call方法的执行结果交给最初的Subscriber.onNext()方法的参数来执行
actual.onNext(result);
}
...
}
乱不乱?
梳理一下:
- 我们把不使用
map
操作符时正常的操作创建的Observable
和Subscriber
分别称为1号。 - 而
map
又会通过create
分辨创建一个Observable
2号和Subscriber
2号,当我们执行map
的时候,会最终执行到Subscriber
2号的onNext()
方法中,而该方法内部会先执行一些转换操作,然后将执行完的结果作为参数传递给并调用最初的Subscriber
1号的onNext()
方法。懂不? 多看两遍,这里有点绕。
更多内容请看下一篇文章RxJava详解(六)
- 邮箱 :charon.chui@gmail.com
- Good Luck!