Share Observables, share Observers, or both in RxJS, RxSwift and RxJava


When designing operations with Rx, two kinds of needs can arise:

  • Share the Observables.
  • Share the Observers.

Let’s review what these two mechanisms consist of and why they might be necessary.

Share Observables

Sharing the Observable can be necessary when an operation executes multiple tasks and its result is of interest to several Observers. That is, the result delivered by the Observable will feed several independent decisions.


What is the primary motivation for sharing the Observable?

Whenever an Observer subscribes to a (cold) Observable, a separate execution of the stream is created. This means that the code defined in the Observable runs once for every subscribed Observer.

Consider the following example:

import { interval, of } from 'rxjs';
import { mergeMap, take, tap } from 'rxjs/operators';

// mergeMap is the same operator as flatMap (flatMap is a deprecated alias).
const sourceObservable = interval(1000)
  .pipe(
    mergeMap(id => of(`Request to API with userId = ${id}`)),
    tap(result => console.log(result)),
    take(1));

const subscriptionA = sourceObservable
  .subscribe(element => {
    // A Observing code
  });

const subscriptionB = sourceObservable
  .subscribe(element => {
    // B Observing code
  });

const subscriptionC = sourceObservable
  .subscribe(element => {
    // C Observing code
  });

// Release the subscriptions after 5 seconds.
setTimeout(() => {
  subscriptionA.unsubscribe();
  subscriptionB.unsubscribe();
  subscriptionC.unsubscribe();
}, 5000);

There is one Observable with three Observers subscribed. Running the code produces the following output:

Console output:
---------------
Request to API with userId = 0
Request to API with userId = 0
Request to API with userId = 0

The tap(...) operator is there only to trace the execution and show that the Observable runs three times. At this point the reader might ask:

Wouldn’t it be more efficient to run the logic contained in the Observable once and share the result with multiple Observers?

Indeed. A good practice is to share the emissions of that Observable through multicasting.

Multicasting operators exist for this purpose. For example, the following code modifies the previous example to use the share operator:

import { interval, of } from 'rxjs';
import { mergeMap, share, take, tap } from 'rxjs/operators';

// share() multicasts a single execution of the pipeline to the subscribers.
const sourceObservable = interval(1000)
  .pipe(
    mergeMap(id => of(`Request to API with userId = ${id}`)),
    tap(result => console.log(result)),
    take(1),
    share());

const subscriptionA = sourceObservable
  .subscribe(element => {
    // A Observing code
  });

const subscriptionB = sourceObservable
  .subscribe(element => {
    // B Observing code
  });

const subscriptionC = sourceObservable
  .subscribe(element => {
    // C Observing code
  });

// Release the subscriptions after 5 seconds.
setTimeout(() => {
  subscriptionA.unsubscribe();
  subscriptionB.unsubscribe();
  subscriptionC.unsubscribe();
}, 5000);

With this modification, the result is as follows:

Console output:
---------------
Request to API with userId = 0

In this hypothetical case, the query to the remote API runs only once, and its result is shared among the multiple subscribers.

The same effect can be obtained with the publish and connect operators applied to a cold Observable, like this:

import { ConnectableObservable, interval, of } from 'rxjs';
import { mergeMap, publish, take, tap } from 'rxjs/operators';

const sourceObservable = interval(1000)
  .pipe(
    mergeMap(id => of(`Request to API with userId = ${id}`)),
    tap(result => console.log(result)),
    take(1),
    publish()) as ConnectableObservable<string>;

const subscriptionA = sourceObservable
  .subscribe(element => {
    // A Observing code
  });

const subscriptionB = sourceObservable
  .subscribe(element => {
    // B Observing code
  });

const subscriptionC = sourceObservable
  .subscribe(element => {
    // C Observing code
  });

// Nothing runs until connect() starts the single shared execution.
sourceObservable.connect();

// Release the subscriptions after 5 seconds.
setTimeout(() => {
  subscriptionA.unsubscribe();
  subscriptionB.unsubscribe();
  subscriptionC.unsubscribe();
}, 5000);


Console output:
---------------
Request to API with userId = 0

These cases are known in Rx as multicasting scenarios. Besides share, publish, and connect, Rx offers additional multicasting operators.
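
To make the difference between the two delivery modes concrete, here is a minimal sketch in plain TypeScript. It is a simplified model and not how RxJS is implemented: coldSource, connectableSource, and fakeRequest are names invented for this illustration, and the "API call" is just a counter.

```typescript
// Simplified model of unicast vs. multicast delivery (not the RxJS implementation).
type Handler<T> = (value: T) => void;

// Cold source: the producer runs again for every subscriber.
function coldSource<T>(producer: () => T): (handler: Handler<T>) => void {
  return (handler) => handler(producer());
}

// Connectable source: subscribers are only registered; the producer runs
// once, when connect() is called, and its value is fanned out to everyone.
function connectableSource<T>(producer: () => T) {
  const handlers: Handler<T>[] = [];
  return {
    subscribe(handler: Handler<T>): void {
      handlers.push(handler);
    },
    connect(): void {
      const value = producer();
      handlers.forEach((handler) => handler(value));
    },
  };
}

// Hypothetical stand-in for the remote API call; it counts its executions.
let apiCalls = 0;
const fakeRequest = (): string => {
  apiCalls += 1;
  return "Request to API with userId = 0";
};

// Three subscribers on the cold source: the request runs three times.
const subscribeCold = coldSource(fakeRequest);
const coldReceived: string[] = [];
subscribeCold((value) => coldReceived.push(`A: ${value}`));
subscribeCold((value) => coldReceived.push(`B: ${value}`));
subscribeCold((value) => coldReceived.push(`C: ${value}`));
const coldCalls = apiCalls; // 3

// Three subscribers on the connectable source: the request runs once.
apiCalls = 0;
const shared = connectableSource(fakeRequest);
const sharedReceived: string[] = [];
shared.subscribe((value) => sharedReceived.push(`A: ${value}`));
shared.subscribe((value) => sharedReceived.push(`B: ${value}`));
shared.subscribe((value) => sharedReceived.push(`C: ${value}`));
shared.connect();
const sharedCalls = apiCalls; // 1
```

In both cases every subscriber receives the value; what changes is how many times the producer runs. The sketch also shows why the order matters with publish and connect: a handler registered after connect() would miss the value.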

Share Observers

On the other hand, there are scenarios in which the Observer needs to be shared, that is, when a single Observer needs to receive and process the results sent from multiple Observables.


In these cases, we no longer talk about multicasting. What is needed now is to receive the information from multiple data sources and consolidate it into a single subscription.

A set of operators exists for this task as well: the so-called combination operators. The following example uses merge:

import { interval, merge, of } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

const observableA = interval(1000)
  .pipe(
    mergeMap(id => of(`Request to API A with userId = ${id}`)),
    take(1));

const observableB = interval(1000)
  .pipe(
    mergeMap(id => of(`Request to API B with userId = ${id}`)),
    take(1));

const observableC = interval(1000)
  .pipe(
    mergeMap(id => of(`Request to API C with userId = ${id}`)),
    take(1));

// A single subscription receives the emissions of all three sources.
const subscription = merge(
  observableA,
  observableB,
  observableC
).subscribe(result =>
  console.log(result));

setTimeout(() => {
  subscription.unsubscribe();
}, 5000);

There are three Observables and one Observer. The emissions of the three sources are consolidated into a single subscription processed by the Observer:

Console output:
---------------
Request to API A with userId = 0
Request to API B with userId = 0
Request to API C with userId = 0

Now, there are several ways to process the emissions from the sources. They can be combined or interleaved according to the algorithm that each operator defines.

For example, with the merge operator, the Observer receives the emissions from all three sources interleaved, in the order in which they occur.

With the combineLatest operator, once every source has emitted at least once, the Observer receives a consolidated emission containing the latest item from each source every time any of them emits.

If instead the amb operator is used (called race in RxJS), the Observer receives only the emissions of the Observable that emits first; the sources race against each other.
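
The three behaviours can be compared side by side with a small marble-style simulation. This is only a sketch in plain TypeScript under simplifying assumptions: each source is modelled as a finite list of timestamped emissions with distinct times, and mergeModel, combineLatestModel, and ambModel are names made up for this illustration, not RxJS functions.

```typescript
// Each source is modelled as a finite list of timestamped emissions.
interface Emission { time: number; value: string; }
type Timeline = Emission[];
interface Tagged extends Emission { source: number; }

// Flattens all timelines into one list ordered by time, remembering
// which source produced each emission.
function tagAndSort(sources: Timeline[]): Tagged[] {
  const all: Tagged[] = [];
  sources.forEach((timeline, source) =>
    timeline.forEach((e) => all.push({ time: e.time, value: e.value, source })));
  return all.sort((x, y) => x.time - y.time);
}

// merge: every emission from every source, in the order it occurs.
function mergeModel(sources: Timeline[]): string[] {
  return tagAndSort(sources).map((e) => e.value);
}

// combineLatest: once all sources have emitted, output the latest value
// of each source every time any of them emits.
function combineLatestModel(sources: Timeline[]): string[][] {
  const latest: (string | undefined)[] = sources.map(() => undefined);
  const out: string[][] = [];
  tagAndSort(sources).forEach((e) => {
    latest[e.source] = e.value;
    if (latest.every((v) => v !== undefined)) {
      out.push(latest.map((v) => v as string));
    }
  });
  return out;
}

// amb / race: only the source whose first emission comes earliest is kept.
function ambModel(sources: Timeline[]): string[] {
  let winner: Timeline = [];
  let best = Infinity;
  sources.forEach((timeline) => {
    if (timeline.length > 0 && timeline[0].time < best) {
      best = timeline[0].time;
      winner = timeline;
    }
  });
  return winner.map((e) => e.value);
}

const sourceA: Timeline = [{ time: 10, value: "a1" }, { time: 40, value: "a2" }];
const sourceB: Timeline = [{ time: 20, value: "b1" }, { time: 30, value: "b2" }];

const merged = mergeModel([sourceA, sourceB]);
// ["a1", "b1", "b2", "a2"]
const combined = combineLatestModel([sourceA, sourceB]);
// [["a1", "b1"], ["a1", "b2"], ["a2", "b2"]]
const raced = ambModel([sourceA, sourceB]);
// ["a1", "a2"]
const combinedWithSilentSource = combineLatestModel([sourceA, []]);
// []
```

The last line hints at why the nature of each stream matters: in this model, as with the real combineLatest operator, a source that never emits keeps the combined stream silent, whereas merge would still deliver the emissions of the other sources.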


When using combination operators, check the nature of each stream and consider whether it is finite or infinite, as this can lead to unwanted scenarios. For example, merge completes only when all of its sources have completed.

Sharing multiple Observables to multiple Observers

Another common scenario when designing with Rx is having multiple sources emitting data and multiple Observers that need to be notified.

These designs need a component that acts as both an Observable and an Observer.

In Rx, the component with these characteristics is known as a Subject, which can act as a proxy to interconnect these multiple events.






A practical example is the pagination of results with the infinite scrolling pattern.

In this case, Paginator is the component responsible for handling pagination. That component is a clear candidate to be a Subject, and its responsibilities could be the following:

As an Observer, it must listen for the request to load the initial page of results. It must also detect when the scroll reaches the bottom of the page, in order to start loading the next page of results. Additionally, it must react when the user manually requests a refresh of the results.

As an Observable, it must notify an Observer responsible for updating a list in the view with the results found. It must also notify another Observer responsible for hiding the UI component that reports the loading progress, a progress bar, for example.

It may also need to notify a third Observer that performs any other operation on the view with the results obtained.


In Rx there are three main kinds of Subject:
  • PublishSubject (called simply Subject in RxJS).
  • BehaviorSubject.
  • ReplaySubject.

What type of Subject should be used?

It depends on how much memory the component needs, that is, how many past emissions a new subscriber should receive:
  • If the component needs long-term memory (replaying earlier emissions), ReplaySubject is used.
  • If the component needs short-term memory (only the latest value), BehaviorSubject is used.
  • If the component does not need memory, PublishSubject is used.
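
The "memory" idea can be sketched with a single toy class whose buffer size decides what a late subscriber sees. This is a simplified plain-TypeScript model, not the RxJS classes: MemorySubject, its memorySize parameter, and lateSubscriberSees are hypothetical names introduced only for this illustration.

```typescript
// Toy subject whose buffer size models the "memory" of each Subject kind.
class MemorySubject<T> {
  private readonly handlers: Array<(value: T) => void> = [];
  private readonly memorySize: number;
  private buffer: T[];

  // memorySize: how many past values a late subscriber receives.
  // seed: optional initial contents of the buffer.
  constructor(memorySize: number, seed: T[] = []) {
    this.memorySize = memorySize;
    this.buffer = memorySize > 0 ? seed.slice(-memorySize) : [];
  }

  // Observer side: remember the value (within the limit) and notify everyone.
  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.memorySize) {
      this.buffer.shift();
    }
    this.handlers.forEach((handler) => handler(value));
  }

  // Observable side: replay what is remembered, then register for new values.
  subscribe(handler: (value: T) => void): void {
    this.buffer.forEach((value) => handler(value));
    this.handlers.push(handler);
  }
}

// Pushes pages 1, 2, 3 and only then subscribes, returning what arrives.
function lateSubscriberSees(subject: MemorySubject<number>): number[] {
  [1, 2, 3].forEach((page) => subject.next(page));
  const seen: number[] = [];
  subject.subscribe((page) => seen.push(page));
  return seen;
}

const publishLike = lateSubscriberSees(new MemorySubject<number>(0));
// [] : no memory, like PublishSubject
const behaviorLike = lateSubscriberSees(new MemorySubject<number>(1, [0]));
// [3] : only the latest value, like BehaviorSubject (seeded with an initial value)
const replayLike = lateSubscriberSees(new MemorySubject<number>(Infinity));
// [1, 2, 3] : everything so far, like an unbounded ReplaySubject
```

For the Paginator, this is the question to ask: should a view that subscribes after a page was requested still receive that page? If not, a memoryless Subject is enough.
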

Below is an example of a Subject-based component that acts as a proxy.

At the View level, you could have an implementation like the following:

import { OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';
import { map } from 'rxjs/operators';
// Paginator is the class shown further below; import it from wherever it
// lives in your project. The @Component decorator is omitted here.

export class PaginatorComponent implements OnInit, OnDestroy {

  paginator: Paginator;
  resultsText: string;
  subscriptions: Subscription[] = [];

  constructor() { }

  ngOnInit(): void {
    this.bindPaginator();
    this.resultsText = "Results will be shown here!";
  }

  ngOnDestroy() {
    // Release every subscription when the view is destroyed.
    this.subscriptions.forEach(subscription => subscription.unsubscribe());
  }

  bindPaginator() {

    this.paginator = new Paginator();

    // First observer: updates the list.
    this.subscriptions.push(
      this.paginator.getResults("Query 1")
        .pipe(
          map(result => `Updating the list with => ${result}`)
        )
        .subscribe(message =>
          this.showResults(message)));

    // Second observer: hides the loader.
    this.subscriptions.push(
      this.paginator.getResults("Query 2")
        .pipe(
          map(result => `Hiding the loader after getting => ${result}`)
        )
        .subscribe(message =>
          this.showResults(message)));

    // Third observer: any other update of the view.
    this.subscriptions.push(
      this.paginator.getResults("Query 3")
        .pipe(
          map(result => `Doing any other update with => ${result}`)
        )
        .subscribe(message =>
          this.showResults(message)));
  }

  loadInitialPage() {
    this.queryData(1);
  }

  getNewPageByScrolling() {
    this.queryData(5);
  }

  getNewPageManually() {
    this.queryData(21);
  }

  private queryData(page: number) {
    this.paginator.pushPage(page);
  }

  private showResults(result: string) {
    this.resultsText = result;
  }
}

The Paginator class would be something like this:


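import { Observable, Subject, of } from 'rxjs';
import { mergeMap, share } from 'rxjs/operators';

export class Paginator {

  // The Subject is the proxy: pages are pushed in, results flow out.
  private page: Subject<number>;

  constructor() {
    this.page = new Subject<number>();
  }

  // Observable side: each caller gets the results for every pushed page.
  getResults(query: string): Observable<string> {
    return this.getPage()
      .pipe(
        // mergeMap is the same operator as flatMap (a deprecated alias).
        mergeMap(numberPage => of(`page: ${numberPage} Comment: ${query}`)),
        share()
      );
  }

  // Observer side: a new page request enters the stream.
  pushPage(numberPage: number) {
    this.page.next(numberPage);
  }

  complete() {
    this.page.complete();
  }

  private getPage(): Observable<number> {
    return this.page.asObservable();
  }
}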

Using Subjects also adds some flexibility to the design.

In the previous example, the reader can see how each subscription is made at the moment it is required, that is, when the method is executed.

In other scenarios, the same can be achieved with the combination operators already mentioned.

I hope this article is useful and has clarified doubts about the use of multiple Observables and Observers.

If you want more information on how to moderate the use of Subjects and use them only when necessary, I recommend reading the guide book The Clean Way to Use Rx. There you will also find the equivalent code of the example in RxSwift and RxJava.





