Compartir Observables o compartir Observers en RxSwift, RxJS, RxJava.

Cuando se diseñan operaciones a través de Rx quizás pueda surgir dos tipos de necesidades:

  • Compartir el Observable
  • Compartir el Observer

Repasemos un poco en qué consisten estos dos tipos de mecanismos y por qué podrían ser necesarios.

Compartir el Observable

Compartir el Observable podría ser necesario cuando se define una operación que ejecuta múltiples tareas y cuyo resultado le interesa a múltiples observadores, es decir, el resultado entregado por el Observable será usado para múltiples decisiones.



¿Cuál es la motivación principal de compartir el Observable

Cada vez que se vincula un Observer a un Observable se crea un stream independiente. De tal forma que si por ejemplo el Observable encapsula una tarea que implica consultar a una API remota, dicha tarea será ejecutada tantas veces como Observers vinculados haya.


let sourceObservable = Observable<Int>

            .interval(.seconds(1), scheduler: serialScheduler)

            .flatMap { id -> Observable<String> in

                return Observable.just("Request to API with userId = \(id)")}

            .do(onNext: { result in print(result) })

            .take(1)


        let subscriptionA = sourceObservable

            .subscribe(onNext: { element in

                // A Observing code

            })


        let subscriptionB = sourceObservable

            .subscribe(onNext: { element in

                // B Observing code

            })


        let subscriptionC = sourceObservable

            .subscribe(onNext: { element in

                // C Observing code

            })


        // To avoid immediately dispose of the

        // emission and to see the share effect

        Thread.sleep(forTimeInterval: 5.0)

        subscriptionA.dispose()

        subscriptionB.dispose()

        subscriptionC.dispose()


Tenemos 1 Observable y 3 Observers vinculados. La respuesta es la siguiente cuando se ejecuta el código:

Request to API with userId = 0

Request to API with userId = 0

Request to API with userId = 0


La operación .do(onNext: ...) es simplemente para hacer la traza y evidenciar la ejecución de la operación del Observable 3 veces.

Acá el lector podría hacerse la siguiente pregunta:

¿No sería más óptimo ejecutar la lógica contenida por el Observable una sola vez y compartir la operación con múltiples Observers?

Efectivamente. Una buena práctica es compartir la emisión de dicho Observable a través de multicasting.

Para dicho propósito, se tiene a disposición los operadores multicasting. Por ejemplo, en el siguiente código se usará el operador share para modificar el ejemplo anterior:

let sourceObservable = Observable<Int>

            .interval(.seconds(1), scheduler: serialScheduler)

            .flatMap { id -> Observable<String> in

                return Observable.just("Request to API with userId = \(id)")}

            .do(onNext: { result in print(result) })

            .take(1)

            .share()


        let subscriptionA = sourceObservable

            .subscribe(onNext: { element in

                // A Observing code

            })


        let subscriptionB = sourceObservable

            .subscribe(onNext: { element in

                // B Observing code

            })


        let subscriptionC = sourceObservable

            .subscribe(onNext: { element in

                // C Observing code

            })


        Thread.sleep(forTimeInterval: 5.0)

        subscriptionA.dispose()

        subscriptionB.dispose()

        subscriptionC.dispose()


Con dicha modificación, el resultado es el siguiente: 


Request to API with userId = 0


En este caso hipotético, la consulta a una API remota se ejecuta una sola vez y es compartido entre sus múltiples subscriptores.

El mismo efecto se podría obtener a través de los operadores publish y connect aplicados en cold observables así:

let sourceObservable = Observable<Int>

            .interval(.seconds(1), scheduler: serialScheduler)

            .flatMap { id -> Observable<String> in

                return Observable.just("Request to API with userId = \(id)")}

            .do(onNext: { result in print(result) })

            .take(1)

            .publish()


        sourceObservable

            .subscribe(onNext: { element in

                // A Observing code

            }).disposed(by: disposeBag)


        sourceObservable

            .subscribe(onNext: { element in

                // B Observing code

            }).disposed(by: disposeBag)


        sourceObservable

            .subscribe(onNext: { element in

                // C Observing code

            }).disposed(by: disposeBag)


        //Apply Connect

        sourceObservable.connect()


Estos casos son también conocidos en Rx como escenarios de multicasting y para tal caso existen además de publish y connect otros operadores adicionales:


Compartir el Observer

Por otro lado, existen los escenarios en los que se necesita compartir el Observer en lugar del Observable, es decir, cuando se necesita que un Observador reciba y procese los resultados enviados desde múltiples Observables.


En estos casos ya no hablamos de multicasting. Lo que se necesita ahora es recibir la información de múltiples fuentes de generación de datos y consolidarlas en una única subscripción.

Para estos propósitos también se puede acudir a un conjunto de operadores que permitan dicha tarea. Dichos operadores son los llamados operadores de combinación.

Veamos el siguiente ejemplo de código:

let observableA = Observable<Int>

            .interval(.seconds(1), scheduler: serialScheduler)

            .flatMap { id -> Observable<String> in

                return Observable.just("Request to API A with userId = \(id)")}

            .take(1)



        let observableB = Observable<Int>

            .interval(.seconds(1), scheduler: serialScheduler)

            .flatMap { id -> Observable<String> in

                return Observable.just("Request to API B with userId = \(id)")}

            .take(1)


        let observableC = Observable<Int>

            .interval(.seconds(1), scheduler: serialScheduler)

            .flatMap { id -> Observable<String> in

                return Observable.just("Request to API C with userId = \(id)")}

            .take(1)


        let subscription = Observable.merge(observableA, observableB, observableC)

            .subscribe(onNext: { result in

                print(result)

            })


        Thread.sleep(forTimeInterval: 5.0)

        subscription.dispose()


Tenemos 3 Observables y 1 Observer. La emisión de las tres fuentes se consolidan en una única subscripción procesadas por el Observer.

Request to API A with userId = 0

Request to API B with userId = 0

Request to API C with userId = 0


Ahora, hay varias formas de procesar las emisiones de las fuentes. Se pueden combinar o mezclar de acuerdo a un algoritmo establecido por el tipo de operador.

Por ejemplo, si se tienen 3 Observables y se combinan a través del operador merge, el Observador recibirá las emisiones provenientes de los 3 Observables

Si utiliza el operador combineLatest, el Observador recibirá las emisiones consolidadas con el último ítem de emitido de cada fuente.

Pero en cambio, se usa el operador amb, el Observador recibirá solo la emisión del primer Observable que emita más rápido, es decir un escenario de race condition.

Cuando se usan los operadores de combinación, se debe verificar la naturaleza del stream, considerar si es infinite o finite, ya que podría presentarse escenarios no deseados. 


Compartir múltiples Observables a múltiples Observers

Otros escenarios muy comunes que resultan cuando se diseña con Rx, son aquellos cuando se tienen múltiples fuentes emitiendo datos y se requiere que múltiples observadores sean notificados.

En estos diseños es importante un componente que funcione tanto de Observable como de Observer.

Estamos hablando nada más ni nada menos que de los Subjects, los cuales tienen la capacidad de trabajar casi de proxy para interconectar estos múltiples eventos.






Veamos un ejemplo de aplicación bastante común. La paginación de resultados con el patrón infinite scrolling.

En este caso Paginator es el componente responsable de manejar la paginación. Es un claro candidato para ser Subject y sus funciones podrían ser las siguientes:

Como Observador, debe estar atento a una petición de cargue de página inicial con los resultados, también debe estar atento a cuando el scroll alcanza el button de la página e iniciar la carga de resultados de la siguiente página. Y adicionalmente, debe estar atento cuando el usuario manualmente solicita actualizar los resultados.

Como Observable, debe notificar a un observer que se encarga de actualizar una lista en la vista con los resultados encontrados. También debe notificar a otro observer que se encarga de ocultar el componente UI que notifica el progreso de la obtención de la información, un progress bar, por ejemplo.
Además, podría necesitar notificar a un tercer observer que hará cualquier otra operación en la vista con los resultados obtenidos.



En Rx hay tres clases de Subjects: 
  • PublishSubjects.
  • BehaviorSubjects.
  • ReplaySubjects.

¿Cómo se decide que tipo de Subject utilizar o definir?

Depende del tipo de memoria que necesita el componente:

  • Si el componente necesita tener memoria de largo plazo, se utiliza ReplaySubject.
  • Si el componente necesita tener memoria de corto plazo, se utiliza BehaviorSubject.
  • Si el componente no necesita tener memoria, se utiliza PublishSubject.

Se muestra a continuación un ejemplo de aplicación de un componente basado en Subjects y que requiere actuar como proxy.

A nivel de la vista se podría tener una implementación como la siguiente:

import UIKit

import RxSwift


class PaginatorViewController: UIViewController {


    private let disposeBag = DisposeBag()

    private var paginator: Paginator?


    @IBOutlet weak var resultLabel: UILabel!

    

    override func viewDidLoad() {

        super.viewDidLoad()

        bindPaginator()

    }


    override func viewWillDisappear(_ animated: Bool) {

        super.viewWillDisappear(animated)

    }


    private func bindPaginator() {


        paginator = Paginator.instance


        paginator?.getResults(query: "Query 1")

            .map { result in "Updating the list with => \(result)" }

            .subscribe(onNext:  { result in

                self.showResults(result)

            })

            .disposed(by: disposeBag)


        paginator?.getResults(query: "Query 2")

            .map { result in "Hiding the loader after getting => \(result)" }

            .subscribe(onNext:  { result in

                self.showResults(result)

            })

            .disposed(by: disposeBag)


        paginator?.getResults(query: "Query 3")

            .map { result in "Doing any other update with => \(result)" }

            .subscribe(onNext:  { result in

                self.showResults(result)

            })

            .disposed(by: disposeBag)

    }


    @IBAction func loadInitialPage(_ sender: Any) {

        self.queryData(page: 1)

    }

    

    @IBAction func getNewPageByScrolling(_ sender: Any) {

        self.queryData(page: 5)

    }

    

    @IBAction func getNewPageManually(_ sender: Any) {

        self.queryData(page: 21)

    }


    private func queryData(page: Int) {

        paginator?.pushPage(numberPage: page)

    }


    private func showResults(_ result: String) {

        print(result)

        resultLabel.text = result

    }

}

El componente Paginator sería algo como:

import Foundation

import RxSwift


struct Paginator {


    private let page: PublishSubject<Int>

    static let instance = Paginator()


    private init() {

        page = PublishSubject<Int>()

    }


    func getResults(query: String) -> Observable<String> {

        

        return getPage()

            .flatMap { numberPage -> Observable<String> in

                return Observable.just("page: \(numberPage) and query: \(query)")

        }

        .share()

    }


    func pushPage(numberPage: Int) {

        page.onNext(numberPage)

    }


    func complete() {

        page.onCompleted()

    }


    private func getPage() -> Observable<Int> {

        return page.asObserver()

    }

}


Cuando se ejecuta el evento loadInitialPage


Updating the list with => page: 1 and query: Query 1

Hiding the loader after getting => page: 1 and query: Query 2

Doing any other update with => page: 1 and query: Query 3


Cuando se hace click en el botón getNewPageByScrolling:

Updating the list with => page: 5 and query: Query 1

Hiding the loader after getting => page: 5 and query: Query 2

Doing any other update with => page: 5 and query: Query 3


Cuando se hace click en el botón getNewPageByManually:

Updating the list with => page: 21 and query: Query 1

Hiding the loader after getting => page: 21 and query: Query 2

Doing any other update with => page: 21 and query: Query 3


El uso de Subjects también permite agregar cierta flexibilidad al diseño. En el ejemplo anterior el lector podrá apreciar cómo la subscripción se hace en el momento en el que se requiere, es decir en la ejecución del método. De igual forma en otros escenarios se podrá hacer a través de los operadores de combinación como ya se mencionaron.


Espero que este artículo sea de utilidad y haya permitido aclarar dudas sobre el uso de múltiples Observables y Observers.

Si desea mayor informarción sobre cómo moderar el uso de Subjects y utilizarlo únicamente en los casos que sean necesarios, les recomiendo la lectura del libro guía:

The Clean Way to Use Rx, versión tanto en Inglés como Español. Allí también encontrarán código equivalente del ejemplo con RxJS, RxJava.

 




Previous
Next Post »
Thanks for your comment