烦恼一般都是想太多了。

0%

关于RxJava操作符链式调用的理解

源于想要知道,对于链式调用背后的工作原理是什么,上游与下游间的通信是如何进行的。

前言

经常我们会看到这样的写法:

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

站在 operator2 的位置,左边称为 UpStream,右边直到 subscriber/consumer 都称作 DownStream

如果我们把每个操作符单独放在一行的话看起来会更明了:

source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)

Observable的三个状态

对于每个 Observable 都会经历三个状态,配置、订阅、运行

配置(Assembly Time)

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

这个状态会对 Observable 应用多个操作符,在这个时候,数据还没有开始流动。

订阅(Subscription Time)

一个中间状态,表示当 subscribe() 被调用的时候。

运行时(Runtime)

这个时候, Observable 就开始发射数据了。

Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);

例子

我们以一个简单的例子来分析:

Observable.fromArray("A","B","C")
.map( v-> v + "1")
.subscribe(System.out::println);

配置阶段

也即是对 Observable 应用 map() 操作符的阶段:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

每个操作符,其实都是返回了一个全新的 Observable,同时返回的每个 Observable 都保留了其操作符应用的对象,也就是其 UpStream

这个例子中,map() 操作符,新建了一个 ObservableMap,同时其对我们最开始的 source 有一个引用。

订阅阶段

当我们调用 subscribeOn() 的时候,实际上,是在我们最后一个操作符返回的 Observable 上进行的。

同时我们也知道,对于每个 Observable.subscribeOn() 其实最终执行的都是由实现类的 subscribeActual() 方法。

对于 ObservableMap:

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

其用一个 MapObserver 来订阅了其 UpStream

对于我们例子中的 ObservableFromArray, 其会建立一个 FromArrayDisposable 对象:

@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

observer.onSubscribe(d);

if (d.fusionMode) {
return;
}

d.run();
}

很惊喜的发现,其会将 MapObserver 称作 DownStream

FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}

运行时

当数据开始流动,我们的 FromArrayDisposable 开始发射数据:

void run() {
T[] a = array;
int n = a.length;

for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}

可以看到,数据都是直接丢给了 MapObserver的,接着 MapObserver 又把数据丢给了我们定义的 Consumer:

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != NONE) {
downstream.onNext(null);
return;
}

U v;

try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}

总结

  1. 在配置阶段。按照操作符应用的顺序,形成 Observable 的链式关系。
  2. 在订阅阶段。从最后一个 Observable 开始,内部以不同类型的 Observer 来从后往前传递订阅关系。
  3. 发射阶段。最开始的 Observable 会通过上下游间的 Observer 来传递数据,直到我们定义的消费者。