Handle asynchronous processes more elegantly with RxJS

Angular

Any web developer who has worked with Angular as their front-end framework lately has probably encountered the RxJS (Reactive Extensions for JavaScript) library. This is because the Angular HTTP module depends on RxJS: its methods for HTTP requests such as get, put, post and patch return an RxJS ‘Observable’, which is the key component and concept of RxJS.

I was, until recently, used to working with promises to handle asynchronous JavaScript operations. Although I was familiar with the observer design pattern in general, the way RxJS works and how to work with it confused me at first. I decided to write down my recent experience with RxJS in the form of this blog. This is partly to benefit my own learning process and partly in the hope that it may aid that of the reader. At the very least, I hope it will pique the reader’s interest in making use of RxJS when writing code.

RxJS turns out to be an elegant and useful tool when structuring JavaScript applications that need to handle asynchronous processes. As such, I feel it is worth any JavaScript developer’s attention.

So what is an RxJS observable?

Let’s start off with the most basic example of what handling an HTTP GET request may look like in an Angular app, taken straight from the angular.io documentation:

javascript
getConfigResponse(): Observable<HttpResponse<Config>> {
  return this.http.get<Config>(
    this.configUrl, { observe: 'response' });
}

Let’s try to put into words what this code is for:

This is a piece of TypeScript that shows a method, getConfigResponse(), that returns an observable of an HttpResponse whose body is a Config object. In this piece of code, this.http is the Angular HTTP client. If we look at the get() methods of this class, we can see that they return an Observable object from the RxJS library. And, courtesy of TypeScript, we can check out the type definitions of Observable ourselves. At the very top of the code in Observable.d.ts it says the following about the Observable<T> class:

javascript
/**
 * A representation of any set of values over any amount of time. This is the most basic building block
 * of RxJS.
 * @class Observable<T>
*/

The official RxJS documentation describes an observable like this: “An observable represents the idea of an invokable collection of future values or events”. So we can infer that the Observable class is at the heart of RxJS and that this building block always functions as a representation, or wrapper, of something else. Moreover, it represents the things it wraps as values or events, that are bound to become available at some unknown time in the future.
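To make the “invokable collection of future values” idea a bit more tangible, here is a minimal sketch of a toy observable. It is my own stand-in and not the RxJS implementation: the names ToyObservable and ToyObserver are made up for this illustration, and the real class does a lot more (error handling, unsubscribing, operators). The point it tries to show is that creating the observable runs nothing; the values are only produced once someone subscribes.

```typescript
// Toy stand-in for illustration only; this is NOT how RxJS implements Observable.
type ToyObserver<T> = {
  next: (value: T) => void;
  complete: () => void;
};

class ToyObservable<T> {
  private producer: (observer: ToyObserver<T>) => void;

  // The producer function is only stored here, it is not executed yet.
  constructor(producer: (observer: ToyObserver<T>) => void) {
    this.producer = producer;
  }

  // Subscribing is what "invokes" the collection of values.
  subscribe(observer: ToyObserver<T>): void {
    this.producer(observer);
  }
}

let producerRuns = 0;
const numbers = new ToyObservable<number>(observer => {
  producerRuns++;
  observer.next(1);
  observer.next(2);
  observer.complete();
});

// Nothing has been produced so far, because nobody subscribed yet.
const runsBeforeSubscribe = producerRuns;

const received: string[] = [];
numbers.subscribe({
  next: value => received.push('next ' + value),
  complete: () => received.push('complete'),
});

console.log(runsBeforeSubscribe, received);
```

In this sketch the values arrive synchronously, but the subscriber would not notice the difference if the producer called next() from a timer or an HTTP callback instead.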

Let’s explore some examples of RxJS observables used in code from an actual application!

In the interface definition of Observable we see the definitions of some methods: subscribe() and pipe() are the most interesting ones and will be discussed shortly. Apparently, an RxJS observable can be interacted with via a limited set of methods defined on the Observable class.

And how should I use this thing?

javascript
import { Observable } from 'rxjs/Rx'; // RxJS 5 style: this import also adds operators such as .do to Observable

@Injectable()
export class AuthService {

...//other AuthService code

private userSettings: UserSettings;

...//other AuthService code

public getProfile(): Observable<UserSettings> {
        if (!this.userSettings) {
            // Nothing cached yet: fetch the settings and store them as a side effect.
            return this.http.get<UserSettings>(this.config.userServiceUrl + '/users/usersettings', { withCredentials: true }).do(userSettings => {
                this.userSettings = userSettings;
            });
        }
        // Already cached: wrap the existing value in an Observable that emits it once.
        return Observable.of(this.userSettings);
    }
}

This code fragment already illustrates the powerful things we can do with RxJS observables. Here, we return an observable of some user’s profile settings (note that the user id is not passed via the URL in this specific case). If there are no user settings in memory yet, we return the observable of the HTTP response and, as a side effect, assign the object it delivers to the userSettings variable of the class. If instead there are user settings in memory already, we simply create a new observable of the existing userSettings with Observable.of(). (Observable.create() would not work here: it expects a function that feeds values to the subscriber, not the value itself.)

The latter is a very handy aspect of RxJS observables: you can wrap anything in an observable and subsequently program against that observable in a reactive manner, which makes it easy to mix synchronous and asynchronous code.

The .do method is the other new concept in the above piece of code. It is not part of the core Observable class (the type that this.http.get<UserSettings>() returns). It is an ‘operator’ that gets added to Observable by the 'rxjs/Rx' import at the top of the file. Since RxJS 5.5 the same operator is also available under the name tap from ‘rxjs/operators’, to be used inside .pipe(), and in RxJS 6 and later that is the standard way to apply it. The official RxJS documentation describes pretty clearly what this specific operator is for: “Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source”.

RxJS has a large number of operators similar to .do that can filter, buffer, delay, map (etc.) the values of your observables, and each of them returns a new observable as output. Observable.pipe(), which I mentioned before, is defined on the Observable class itself. It takes one or more operator functions, passes the source observable through them in order, and returns the resulting new observable; the source itself is left unchanged. I’ll refer to this external article for readers who want more info on piping observables.
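What helped me understand piping is the idea that an operator is just a function that takes an observable and returns a new one. The sketch below shows that idea with a toy observable rather than the real library: ToyObservable, fromArray and this two-operator pipe() are simplified stand-ins I made up for illustration, and only the names filter and map are borrowed from RxJS.

```typescript
// Toy model for illustration only; the real RxJS classes and operators are far richer.
type Operator<A, B> = (source: ToyObservable<A>) => ToyObservable<B>;

class ToyObservable<T> {
  private producer: (onNext: (value: T) => void) => void;

  constructor(producer: (onNext: (value: T) => void) => void) {
    this.producer = producer;
  }

  subscribe(onNext: (value: T) => void): void {
    this.producer(onNext);
  }

  // Simplified to exactly two operators; applies them left to right.
  pipe<B, C>(first: Operator<T, B>, second: Operator<B, C>): ToyObservable<C> {
    return second(first(this));
  }
}

// Hypothetical helper: an observable that emits each array item in order.
function fromArray<T>(items: T[]): ToyObservable<T> {
  return new ToyObservable<T>(onNext => items.forEach(item => onNext(item)));
}

// Operator: only lets values through for which keep() returns true.
function filter<T>(keep: (value: T) => boolean): Operator<T, T> {
  return source => new ToyObservable<T>(onNext =>
    source.subscribe(value => { if (keep(value)) { onNext(value); } }));
}

// Operator: transforms every value with project().
function map<A, B>(project: (value: A) => B): Operator<A, B> {
  return source => new ToyObservable<B>(onNext =>
    source.subscribe(value => onNext(project(value))));
}

const source = fromArray([1, 2, 3, 4]);
const evenTimesTen = source.pipe(
  filter((n: number) => n % 2 === 0),
  map((n: number) => n * 10),
);

const piped: number[] = [];
evenTimesTen.subscribe(value => { piped.push(value); });

// The source is untouched: subscribing to it still yields the raw values.
const untouched: number[] = [];
source.subscribe(value => { untouched.push(value); });

console.log(piped, untouched);
```

With the real library the shape of the call is the same: operators imported from ‘rxjs/operators’ are passed to .pipe() on an existing observable.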

Now let’s cover how to actually observe an RxJS Observable. This is where, gasp, observable.subscribe() comes into play. I’ll give a hypothetical example based on the code above for getProfile():

javascript
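import { Component } from '@angular/core';
// AuthService is the service shown earlier; its import is omitted here.

@Component({ selector: 'some-view', template: '' }) // placeholder metadata
export class SomeViewComponent {

backGroundColor: string;

constructor (private authService: AuthService){
}

private userSettingsOnScreen(): void {
   // The arrow function runs once the user settings arrive.
   this.authService.getProfile().subscribe(userSettings => {
     this.backGroundColor = userSettings.preferences.backGroundColor;
   });
}
}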

The subscribe method in the above example takes a function as its parameter. This function acts like an observer in a classic observer design pattern, and in fact RxJS calls whatever you pass to subscribe the ‘observer’ as well; a bare function is shorthand for an observer that only handles values. This makes a lot of sense given that:

  • You can subscribe several of these functions to the same Observable
  • You can unsubscribe them again, via the Subscription object that subscribe() returns

The ‘observer’ can listen to only 3 kinds of ‘events’ emitted by the Observable. In RxJS 5 and later the callbacks are called next, error and complete (RxJS 4 code and older articles call them onNext, onError and onCompleted):

‘next’: The Observable delivering its wrapped value (as in the above example).

‘error’: The Observable delivering an error result due to an unhandled exception.

‘complete’: The Observable has finished emitting; no further next calls will follow.

The next callback confused me for a while, but it makes more sense knowing you can do this (example adapted from here):

javascript
// Builds an observer object; RxJS calls these three callbacks by name.
function listener(name: string) {
  return {
    next: function (v: any) {
      console.log(name + ".next:", v)
    },
    error: function (err: any) {
      console.log(name + ".error:", err.message)
    },
    complete: function () {
      console.log(name + ".complete!")
    },
  }
}

let obs = Observable.of(1, 2)
obs.subscribe(listener("obs"));

//output
obs.next: 1
obs.next: 2
obs.complete!

Not only can we subscribe multiple times to a single Observable but, as the example above should clarify, we can also observe multiple values in sequence with the same Observable. Pretty nice! I'll round off this write-up by showing error handling in RxJS:

javascript
// Try up to three times (the initial attempt plus two retries) to get the data, then give up
var observableOfResponse = this.http.get('someurl').retry(2);

var subscription = observableOfResponse.subscribe(
  data => console.log(data),
  err => console.log(err) // called only once all retries have failed
);


// Catch a potential error after 2 retries with the catch operator.
// catch expects a function that returns a replacement Observable,
// so trySomethingElse is wrapped in a function instead of being called right away.
var observableWithFallback = this.http.get('someurl').retry(2).catch(err => trySomethingElse());

var fallbackSubscription = observableWithFallback.subscribe(
  data => {
    // Displays the data from the URL or cached data
    console.log(data);
  }
);

Here we see that we can either let the error (maybe a code 500?) happen like in the top example, or catch and manage it like in the bottom one. It’s pretty intuitive, once you know what operators are available. Note that in the first scenario the error still reaches the subscriber: once the retries are used up, the err callback runs and no data is delivered.
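If you are wondering what retry and catch actually do under the hood, the following sketch mimics their behaviour on a toy observable. It is not the RxJS source code: ToyObservable, ToyObserver, catchWith, failingRequest and cached are names I invented for this illustration, and my retry is a plain function instead of a chained operator. It assumes a request that always fails, so we can count how many attempts are made before the fallback kicks in.

```typescript
// Toy model for illustration only; not the RxJS implementation.
type ToyObserver<T> = {
  next: (value: T) => void;
  error: (err: Error) => void;
};

class ToyObservable<T> {
  private producer: (observer: ToyObserver<T>) => void;

  constructor(producer: (observer: ToyObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: ToyObserver<T>): void {
    this.producer(observer);
  }
}

// On error, resubscribe to the source up to `count` more times,
// and only then pass the error on to the subscriber.
function retry<T>(source: ToyObservable<T>, count: number): ToyObservable<T> {
  return new ToyObservable<T>(observer => {
    let retriesLeft = count;
    const attempt = (): void => {
      source.subscribe({
        next: value => observer.next(value),
        error: err => {
          if (retriesLeft > 0) {
            retriesLeft--;
            attempt();
          } else {
            observer.error(err);
          }
        },
      });
    };
    attempt();
  });
}

// On error, switch over to the observable returned by fallback().
function catchWith<T>(
  source: ToyObservable<T>,
  fallback: (err: Error) => ToyObservable<T>,
): ToyObservable<T> {
  return new ToyObservable<T>(observer => {
    source.subscribe({
      next: value => observer.next(value),
      error: err => fallback(err).subscribe(observer),
    });
  });
}

// Hypothetical request that always fails; we count how often it is attempted.
let attempts = 0;
const failingRequest = new ToyObservable<string>(observer => {
  attempts++;
  observer.error(new Error('HTTP 500'));
});

// Hypothetical fallback, e.g. data we still have in a cache.
const cached = new ToyObservable<string>(observer => observer.next('cached data'));

const outcome: string[] = [];
catchWith(retry(failingRequest, 2), () => cached).subscribe({
  next: value => { outcome.push(value); },
  error: err => { outcome.push('error: ' + err.message); },
});

console.log(attempts, outcome);
```

The thing to take away is that retry(2) means two extra attempts on top of the first one, and that the catching step receives a function, so the fallback is only created when an error actually occurs.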

Rounding up

RxJS is an extensive subject and deserves more attention than I’m able to give it in this little blog. I have tried to condense a vast amount of information into a short overview that covers the core ideas of the library but at the same time stays very practical: the information above is enough to start working with it immediately. So, rounding up, I’ll say good luck and have fun to anyone using RxJS in their front-end. I know I will.
