Processing Multiple Observables

If you have used Angular for frontend development, you have almost certainly dealt with RxJS Observables. These seemingly magic async values can be hard to process, especially if you are not experienced with functional programming.

In this post, we will take a service that returns an Observable<number[]> and use each number in its response to call two more services that process our data further.

Modeling the process

We will use marble diagrams to model our process. I have used Swirly to generate the following diagrams. Please note that exact timing is not that important for our problem, so lines from left to right do not represent the timings accurately.

Modeling calls

Initially, we will have an observable that emits an array of values. Since a concrete example makes more sense to me, let’s make it return the array [1, 2, 3]. We will call this function getValues().

Figure: Our initial observable, which returns an Observable<number[]>.

We also need two processing functions, both of which return an Observable<number>. Let’s create one function that doubles its input and one that triples it. We will call these functions doubleValue() and tripleValue() respectively.

Figure: The doubleValue() function, which doubles the input and returns the output as an Observable<number>.

Figure: The tripleValue() function, which triples the input and returns the output as an Observable<number>.

Note that these functions do not map one value directly to another. They return Observables, which simulate external calls that process the values.
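For readers who prefer code, here is a minimal sketch of the three functions described above. The post does not show their implementations, so the bodies below are assumptions: each one uses of() to emit a result synchronously and complete, standing in for a real external call.

```typescript
import { Observable, of } from 'rxjs';

// Assumed stand-in for a service call: emits one array, then completes.
function getValues(): Observable<number[]> {
  return of([1, 2, 3]);
}

// Assumed stand-in for an external "double" service.
function doubleValue(value: number): Observable<number> {
  return of(value * 2);
}

// Assumed stand-in for an external "triple" service.
function tripleValue(value: number): Observable<number> {
  return of(value * 3);
}

// Collect what getValues() emits so we can inspect it.
const emitted: number[][] = [];
getValues().subscribe(values => emitted.push(values));
// emitted is now [[1, 2, 3]]: a single emission that holds the whole array
```

A real service would typically emit asynchronously, but the shape of the stream is the same.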

Modeling overall process

Now, let’s put everything together to model the whole process end to end. This will help us get a better overview of how everything will work together.

Figure: Initial modeling of the overall problem.

We need some “magic” functions, marked with ???, that convert our stream from one shape to the next. First, we need a way to turn the array into single values, so that we can pipe each number through both functions and get two values for it. Another thing to keep in mind is that doubleValue() and tripleValue() spawn new Observables, so we end up with an Observable of Observables, known as a “higher-order Observable”.
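To make the higher-order Observable concrete, here is a small sketch. It assumes doubleValue() is implemented with of(), which the post does not confirm, and it shows that mapping each number to a call leaves us with a stream of streams rather than a stream of numbers.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Assumed stand-in for the external "double" service.
const doubleValue = (value: number): Observable<number> => of(value * 2);

// Each number is mapped to the Observable returned by doubleValue(),
// so the result is an Observable of Observables.
const higherOrder$: Observable<Observable<number>> = of(1, 2, 3).pipe(
  map(value => doubleValue(value)),
);

// Subscribing gives us the inner Observables, not the doubled numbers.
const inner: Observable<number>[] = [];
higherOrder$.subscribe(inner$ => inner.push(inner$));
```

To reach the numbers themselves, something still has to subscribe to each inner Observable, which is exactly the job of the flattening operators.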

Finding the magic functions

RxJS offers many operators and functions that help us process Observables. We can transform values, flatten nested streams, and combine several streams to shape our pipe.

Mapping each value with map

The map operator takes an input Observable and maps each emitted value to a different one, similar to the Array.prototype.map() function. Keep in mind that both the input and the output are Observables.

Figure: Marble diagram of the map operator.
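As a quick illustration of map, the sketch below doubles every emitted number. The source Observable and the variable names are made up for this example.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Collect the mapped values so we can inspect them.
const doubled: number[] = [];

of(1, 2, 3)
  .pipe(map(value => value * 2)) // transform each emitted value
  .subscribe(value => doubled.push(value));
// doubled is now [2, 4, 6]
```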

Flattening values with mergeAll

The mergeAll operator flattens a higher-order Observable into a single Observable. It also has another, less well-documented use: because RxJS accepts an array wherever it accepts an inner Observable, mergeAll can flatten an Observable<T[]> into an Observable<T> that emits each element individually. The following marble diagrams explain both uses better.

Figure: Using the mergeAll operator for flattening higher-order Observables.

Figure: Using the mergeAll operator for flattening an Observable<number[]>.
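Both uses of mergeAll can be sketched in a few lines. The inputs here are illustrative, and the inner Observables are created with of() so that everything runs synchronously and in order.

```typescript
import { of } from 'rxjs';
import { map, mergeAll } from 'rxjs/operators';

// 1) Flattening a higher-order Observable.
const fromInner: number[] = [];
of(1, 2, 3)
  .pipe(
    map(value => of(value * 2)), // Observable<Observable<number>>
    mergeAll(),                  // Observable<number>
  )
  .subscribe(value => fromInner.push(value));
// fromInner is now [2, 4, 6]

// 2) Flattening an Observable that emits an array.
const fromArray: number[] = [];
of([1, 2, 3])
  .pipe(mergeAll()) // Observable<number[]> becomes Observable<number>
  .subscribe(value => fromArray.push(value));
// fromArray is now [1, 2, 3]
```

With inner Observables that emit asynchronously, mergeAll interleaves values as they arrive, so the output order is not guaranteed to follow the input order.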

Converting an array with forkJoin

The forkJoin function converts an Observable<T>[] into an Observable<T[]>. It accepts an object of Observables as well as an array. Unlike mergeAll, it waits for every input Observable to complete and then emits a single value: an array (or object) holding the last value each one emitted.

Figure: Using the forkJoin function to get a value from an array of Observables.
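Here is a small sketch of forkJoin with both an array and an object. The inputs and the key names double and triple are chosen only for illustration.

```typescript
import { forkJoin, of } from 'rxjs';

// Array form: one emission holding the last value of each source.
const arrayResults: number[][] = [];
forkJoin([of(1, 2), of(3), of(4, 5, 6)]).subscribe(result =>
  arrayResults.push(result),
);
// arrayResults is now [[2, 3, 6]]

// Object form: same behavior, but the result is keyed by name.
const objectResults: Array<{ double: number; triple: number }> = [];
forkJoin({ double: of(4), triple: of(6) }).subscribe(result =>
  objectResults.push(result),
);
// objectResults is now [{ double: 4, triple: 6 }]
```

Because forkJoin only emits once every source has completed, it suits request-style Observables that emit one value and finish, and it never emits for sources that stay open.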

All three of these are quite important for processing various kinds of Observables. We will use these (and a bonus one) for solving our problem.

A better model for our process

Now that we know what the magic functions might be, we can create our initial solution for the problem.

Figure: Our initial solution for the problem.
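In code, one plausible reading of this initial solution looks like the sketch below. It is not taken from the linked gist: the three service functions are assumed to be simple of()-based stand-ins, and the choice to collect each pair as an array is ours.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, mergeAll } from 'rxjs/operators';

// Assumed stand-ins for the three services.
const getValues = (): Observable<number[]> => of([1, 2, 3]);
const doubleValue = (value: number): Observable<number> => of(value * 2);
const tripleValue = (value: number): Observable<number> => of(value * 3);

const results: number[][] = [];

getValues()
  .pipe(
    // Observable<number[]> becomes Observable<number>
    mergeAll(),
    // Each number becomes an inner Observable of [double, triple]
    map(value => forkJoin([doubleValue(value), tripleValue(value)])),
    // Flatten the higher-order Observable back into plain pairs
    mergeAll(),
  )
  .subscribe(pair => results.push(pair));
// results is now [[2, 3], [4, 6], [6, 9]]
```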

It works! But we aren’t finished yet; we can still do better. Because the map and mergeAll operators are used together so frequently, RxJS provides another operator called mergeMap, which combines both. Our final solution now looks like the image below.

Figure: Our final solution for the problem.
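A code version of this final pipe might look like the following sketch. As before, the service functions are assumed of()-based stand-ins rather than the author's actual code; the only change from a map plus mergeAll pipe is that mergeMap does both steps at once.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { mergeAll, mergeMap } from 'rxjs/operators';

// Assumed stand-ins for the three services.
const getValues = (): Observable<number[]> => of([1, 2, 3]);
const doubleValue = (value: number): Observable<number> => of(value * 2);
const tripleValue = (value: number): Observable<number> => of(value * 3);

const results: number[][] = [];

getValues()
  .pipe(
    // Observable<number[]> becomes Observable<number>
    mergeAll(),
    // Map each number to its [double, triple] pair and flatten in one step
    mergeMap(value => forkJoin([doubleValue(value), tripleValue(value)])),
  )
  .subscribe(pair => results.push(pair));
// results is now [[2, 3], [4, 6], [6, 9]]
```

With real asynchronous services, mergeMap emits pairs in the order the calls finish; if the original order matters, concatMap is the usual alternative.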

Nice and concise! I hope this tutorial helped you get more comfortable with RxJS operators. Don’t forget to check out other flattening operators such as switchMap, concatMap, and exhaustMap.

If you prefer code to diagrams, you can check out this gist for the code of this solution. Note that it uses some tap operators to log the current state without affecting any of the values passing through. Good luck with handling Observables!