RxJava: Behavior of observeOn

I like RxJava and use it every day. It is already a very mature library and helps a lot, but the documentation for some parts of it could still be better. One of them is the observeOn operator, which mobile developers use a lot. Its JavaDoc states the following:

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous

So what does it mean? I guess many developers haven't paid attention to this sentence. Let's write the following simple test:

    Scheduler newThreadScheduler = Schedulers.newThread();

    Observable<Integer> stream = Observable.create(integerEmitter -> {
        integerEmitter.onNext(1);
        integerEmitter.onNext(2);
        integerEmitter.onNext(3);
        integerEmitter.onNext(4);
        integerEmitter.onNext(5);
        integerEmitter.onError(new RuntimeException());
    }, Emitter.BackpressureMode.NONE);

    TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    stream.subscribeOn(Schedulers.computation())
            .observeOn(newThreadScheduler).subscribe(subscriber);

    subscriber.awaitTerminalEvent();

    subscriber.assertValues(1, 2, 3, 4, 5);
    subscriber.assertError(RuntimeException.class);

Do you think it will pass? No, it usually won't. The trick here is in the observeOn operator. When events from upstream are still queued at the thread switch and an error arrives behind them, observeOn can deliver the error ahead of the queued events and drop them, so the subscriber receives only the error. Whether this happens depends on timing, which is why the JavaDoc talks about a "truly asynchronous" Scheduler. This behavior was implemented a long time ago, and the motivation for it can be found here: https://github.com/ReactiveX/RxJava/issues/1680.
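To make the mechanism easier to picture, here is a minimal, single-threaded sketch in plain Java. It is only a model and not RxJava's actual source: the class ObserveOnModel and its methods are hypothetical names, and the two "threads" are simulated by simply calling drain() after all upstream events have been recorded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified model of an observeOn-style boundary (an illustration, not RxJava code). */
public class ObserveOnModel {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private Throwable error; // recorded by the upstream side
    final List<String> received = new ArrayList<>();

    // Upstream side: events are only recorded here, nothing is delivered yet.
    void onNext(int value) {
        queue.offer(value);
    }

    void onError(Throwable e) {
        error = e;
    }

    // Downstream side: in real life this runs later, on the target scheduler's thread.
    void drain() {
        // The error is checked before the queue is read, so it cuts ahead
        // of every value that is still waiting, and those values are dropped.
        if (error != null) {
            queue.clear();
            received.add("onError(" + error.getMessage() + ")");
            return;
        }
        Integer value;
        while ((value = queue.poll()) != null) {
            received.add("onNext(" + value + ")");
        }
    }

    public static void main(String[] args) {
        ObserveOnModel boundary = new ObserveOnModel();
        for (int i = 1; i <= 5; i++) {
            boundary.onNext(i);
        }
        boundary.onError(new RuntimeException("boom"));
        boundary.drain(); // the consumer only gets to run now
        System.out.println(boundary.received); // prints [onError(boom)]
    }
}
```

In the model the consumer always runs too late, so the values are always lost. With real schedulers the consumer may get to drain some or all of the values before the error arrives, which is why the outcome depends on timing.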

I learned it the hard way, through a bug. Say you are developing an app that uses a cache-then-network approach to show data. Once you have shown the data from the cache, you want to show an error stating that the latest version of the data cannot be loaded. In this case you don't want the error to cut ahead of the data from the cache, and you don't want to use onErrorResumeNext, because the subscriber knows perfectly well how to handle this case in onError. So what to do? Thanks to the creators of RxJava, there is an overloaded observeOn that takes a delayError parameter:

indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream

The test above will be fixed if we replace observeOn(newThreadScheduler) with observeOn(newThreadScheduler, true).
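The same overload covers the cache-then-network case described earlier. The following is only a sketch against RxJava 1.x: cache() and network() are hypothetical stand-ins for real data sources, and the failing network call is simulated with Observable.error.

```java
import java.io.IOException;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class CacheThenNetworkExample {

    // Hypothetical cache source: always has one stale value.
    static Observable<String> cache() {
        return Observable.just("cached data");
    }

    // Hypothetical network source: always fails, as if the device were offline.
    static Observable<String> network() {
        return Observable.error(new IOException("offline"));
    }

    public static void main(String[] args) {
        TestSubscriber<String> subscriber = new TestSubscriber<>();

        Observable.concat(cache(), network())
                .subscribeOn(Schedulers.io())
                // delayError = true: the error stays behind the cached value
                .observeOn(Schedulers.newThread(), true)
                .subscribe(subscriber);

        subscriber.awaitTerminalEvent();

        // The cached value arrives first, then the error reaches onError.
        subscriber.assertValues("cached data");
        subscriber.assertError(IOException.class);
    }
}
```

With the single-argument observeOn, the cached value in this sketch could be dropped whenever the error reaches the boundary before the value has been delivered.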
