I like RxJava and use it every day. It is already a very mature library and helps a lot, but the documentation for some parts of it still needs improvement. One of them is the observeOn operator, which mobile developers use a lot. Its JavaDoc states the following:
Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous
So what does it mean? I guess many developers didn't pay attention to this sentence. Let's write the following simple test:
Scheduler newThreadScheduler = Schedulers.newThread();

// Emit five values and then fail.
Observable<Integer> stream = Observable.create(integerEmitter -> {
    integerEmitter.onNext(1);
    integerEmitter.onNext(2);
    integerEmitter.onNext(3);
    integerEmitter.onNext(4);
    integerEmitter.onNext(5);
    integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);

TestSubscriber<Integer> subscriber = new TestSubscriber<>();

// Produce on the computation scheduler, observe on a new thread.
stream.subscribeOn(Schedulers.computation())
        .observeOn(newThreadScheduler)
        .subscribe(subscriber);

subscriber.awaitTerminalEvent();
subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);
Do you think it will pass? No, it won't. The trick here is in the observeOn operator. When events from upstream are queued together with an error and a thread switch is needed, the error can cut ahead of all the earlier events in the queue, so the subscriber receives only the error. This behavior was implemented a long time ago, and the motivation for it can be found here: https://github.com/ReactiveX/RxJava/issues/1680.
I learned it the hard way, via a bug. Say you are developing an app that uses a "cache, then network" approach to show data. Once you have shown the data from the cache, you want to show an error stating that the latest version of the data could not be loaded. In this case you don't want the error to cut ahead of the data from the cache, and you don't want to use onErrorResumeNext, because the subscriber knows perfectly well how to handle such a case in onError. So what to do? Thanks to the creators of RxJava, there is an overloaded observeOn that accepts a delayError parameter:
indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
The test above will be fixed if we replace observeOn(newThreadScheduler) with observeOn(newThreadScheduler, true).
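To make the difference between the two modes concrete, here is a toy model in plain Java with no RxJava dependency. It is only a sketch and not how RxJava actually implements observeOn: the names ObserveOnModel and drain are made up for illustration, and the model assumes the worst-case timing in which all values and the error are already queued before the consuming thread starts draining.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy model of a scheduling boundary. Hypothetical names; this is NOT
// RxJava's real implementation, just an illustration of event ordering.
class ObserveOnModel {

    // Simulates the consumer side waking up after the producer has already
    // queued every value and (optionally) signalled an error.
    static List<String> drain(List<Integer> queued, Throwable error, boolean delayError) {
        Queue<Integer> queue = new ArrayDeque<>(queued);
        List<String> delivered = new ArrayList<>();

        if (error != null && !delayError) {
            // The error cuts ahead: values still waiting in the queue are dropped.
            queue.clear();
        }

        // Deliver whatever is left in the queue, in upstream order.
        while (!queue.isEmpty()) {
            delivered.add("onNext(" + queue.poll() + ")");
        }

        // Finish with the terminal event.
        if (error != null) {
            delivered.add("onError(" + error.getClass().getSimpleName() + ")");
        } else {
            delivered.add("onCompleted()");
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        Throwable error = new RuntimeException();

        // Models observeOn(scheduler): only the error reaches the subscriber.
        System.out.println(drain(values, error, false));

        // Models observeOn(scheduler, true): values first, then the error.
        System.out.println(drain(values, error, true));
    }
}
```

With delayError set to false the model delivers only onError, which matches the failing test above. With delayError set to true it delivers all five values and then the error, which is the order the test expects.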