RxJava2: Consejos - Tips

1. En RxJava2 es ilegal emitir valores nulos, tratar de emitir un null generará un error. Esto significa que ahora no es posible manejar o checkear un valor null en el subscribe (if value != null) ya que antes de llegar al subscribe se generará un error.


Observable obs = Observable.just(null);
obs.subscribe(System.out::println);

Exception in thread "main" java.lang.NullPointerException: The item is null
 at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
 at io.reactivex.Observable.just(Observable.java:2249)
 at Launcher.main(Launcher.java:6)

2. onError deber ser siempre implementado, es una mala practica dejar este evento sin implementación ya que si llegase a ocurrir una excepción no tendríamos la trazabilidad del mismo.

List items = Arrays.asList(1, 2, 3, 4, 5);

Observable source = Observable.fromIterable(items);

          source.map(i -> i / 0)
                .subscribe(s -> System.out.println("Received data: " + s),
                        throwable -> System.err.println("Received error: " + throwable));

3. Recordemos que cuando se requiera ejecutar una actividad final después de finalizar las tareas del Observer, siempre se tiene como opción usar el evento completed.


List items = Arrays.asList(1, 2, 3, 4, 5);

Observable source = Observable.fromIterable(items);

        source.subscribe(s -> System.out.println("Received data: " + s),
                throwable -> System.err.println("Received error: " + throwable),
                () -> System.out.println("Finished stream! "));

4. Recordar que map es un operador destinado a transformaciones one-to-one, flatMap/concatMap son operadores útiles para transformaciones one-to-many.

5. Los operadores de reducción como por ejemplo count, reduce, all, contains deben aplicarse sobre streams finitos ya que dependen de la ejecución de onComplete. Para los casos donde sea necesario trabajar con streams infinitos se recomienda otro tipo de operadores como por ejemplo scan.

6. switchMap es un operador con el cuál es posible evitar múltiples emisiones innecesarias, por ejemplo evitando click múltiples en un botón o cualquier Widget UI. Una buena estrategia es combinarlo con el operador doOnNext para inhabilitar la vista y no recibir eventos. Otros casos donde es muy útil es para evitar peticiones duplicadas a una API.

switchMap es similar a flatMap, con la diferencia de que sólo será tenido en cuenta el dato emitido por el último Observable, los datos emitidos por todos los Observables previos serán cancelados y no tenidos en cuenta. Esto evitará redundancia en la emisión de datos.

7. merge es un operador que trabaja con streams infinitos. A través de merge es posible mezclar múltiples stream infinitos.

8. Al usar concat se debe tener cuidado con los streams infinitos, ya que si el primer Observable es infinito el segundo Observable nunca emitirá.


9. Con ConnectableObservable y publish/connect es posible generar multicasting, es decir emitir simultáneamente hacia cada uno de los Observers suscritos. ConnectableObservable genera Observables de tipo hot, incluso si el Observable inicial es cold a través de esta subclase se convierte en un Observable hot.

//Create and Publish the Observable
ConnectableObservable observable =
                Observable.just("One", "Two", "Three", "Four")
                        .publish();

        //Observer 1
        observable.subscribe(s -> System.out.println("Observer 1: " + s));

        //Observer 2
        observable.map(String::toUpperCase)
                .subscribe(i -> System.out.println("Observer 2: " + i));

        //Connect Observable
        observable.connect();

Observer 1: One
Observer 2: ONE
Observer 1: Two
Observer 2: TWO
Observer 1: Three
Observer 2: THREE
Observer 1: Four
Observer 2: FOUR 


10. Otra forma de generar multicasting es compartir la suscripción a múltiples Observables, es decir consolidando las emisiones de múltiples Observables en uno sólo Observer suscrito. Esto es posible con Subjects, por ejemplo en este caso se utiliza el PublishSubject:

//Create the Subject
Subject subject = PublishSubject.create();

    //Apply any transformation
    subject.map(String::toUpperCase)
           .subscribe(System.out::println);

    // Emit, emit , emit
    subject.onNext("One");
    subject.onNext("Two");
    subject.onNext("Three");
    subject.onNext("Four");
    subject.onNext("Five");
    subject.onComplete();

ONE
TWO
THREE
FOUR
FIVE        


11. publish().refCount() = autoConnect(1) + auto dispose = share().

12. AsyncSubject no debería usarse en streams infinitos.

13. autoConnect permite reactivar la suscripción de un Observable.

14. Hay operadores como por ejemplo interval, throttleLast, debounce, buffer que vienen con un scheduler predeterminado, sin embargo es posible reasignarle otro tipo de scheduler a través del método sobrecargado que recibe el tipo de scheduler como argumento.

Por ejemplo, en el siguiente código vemos como las operaciones se realizan sobre scheduler Schedulers.computation() que por defecto tiene definido el operador interval:

Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(s -> System.out.println(s + " Mississippi" + " on thread "
                        + Thread.currentThread().getName()));

0 Mississippi on thread RxComputationThreadPool-1
1 Mississippi on thread RxComputationThreadPool-1
2 Mississippi on thread RxComputationThreadPool-1
3 Mississippi on thread RxComputationThreadPool-1
4 Mississippi on thread RxComputationThreadPool-1

Sin embargo, si especificamos el scheduler vía argumento como en el siguiente código notaremos como las operaciones ahora se ejecutan sobre Schedulers.newThread():

Observable.interval(1, TimeUnit.SECONDS, Schedulers.newThread())
            .subscribe(s -> System.out.println(s + " Mississippi" + " on thread "
                    + Thread.currentThread().getName()));

0 Mississippi on thread RxNewThreadScheduler-1
1 Mississippi on thread RxNewThreadScheduler-1
2 Mississippi on thread RxNewThreadScheduler-1
3 Mississippi on thread RxNewThreadScheduler-1
4 Mississippi on thread RxNewThreadScheduler-1

15. observeOn puede inducir a escenarios backpressure cuando se ubica entre operadores. En el código de ejemplo, si Operación A llega a emitir más rápido que Operación B se estaría frente a un escenario de backpressure.


Observable.just(1, 2, 3, 4)
            .subscribeOn(Schedulers.io())
            //Operation A
            .map(i -> i * 2)
            .observeOn(Schedulers.computation())
            //Operation B
            .map(Launcher::calculateBigNumber)
            .subscribe(i -> System.out.println("Received " + i + " on thread "
                        + Thread.currentThread().getName()));

16. Las operaciones de efecto colateral tales como doOnNext, doOnSuccess, doOnComplete deben contener código sincrónico no bloqueante. A veces se comete el error de asociar una operación reactive dentro del doOnXXX.

Por ejemplo, supongamos que se tiene las siguientes operaciones:

private static Completable firstOperation() {
   return Completable.fromRunnable(() -> runProcess());
}

private static Completable lastOperation() {
   return Completable.fromRunnable(() -> runProcess());
}

private static void runProcess() {
    //Any process here
    System.out.println("Any process");
}

La forma correcta de usarlas podría ser:

firstOperation()
          .doOnComplete(() -> runProcess())
          .subscribeOn(Schedulers.io())
          .subscribe(() -> 
             System.out.println("Done!"));

La forma incorrecta de usarla sería:

firstOperation()
          .doOnComplete(() -> lastOperation())
          .subscribeOn(Schedulers.io())
          .subscribe(() -> 
                System.out.println("Done!"));

Referencias:

Learning Rxjava by Thomas Nield
Reactivex - RxJava
Thanks for your comment