ReactiveX

So, you have heard about RxJava. You see it ever so frequently in the wild, but whenever you try to wrap your head around it, it hurts. Here's what the GitHub page says about RxJava:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

That's a mouthful. Let's try to break it down. RxJava is an implementation of ReactiveX. In the simplest terms, the ReactiveX Observable model lets you treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. No more tangled webs of callbacks.
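To make the comparison with collections concrete, here is a minimal sketch that runs the same filter and map steps over a plain list and over an Observable. It assumes the RxJava 3 artifact (package io.reactivex.rxjava3); the function names are made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Plain collection: filter and map run eagerly over the whole list.
fun fiveLetterWords(words: List<String>): List<String> =
    words.filter { it.length == 5 }.map { it.uppercase() }

// Observable: the same operators, applied to each item as it is emitted.
fun fiveLetterWordsRx(words: List<String>): List<String> =
    Observable.fromIterable(words)
        .filter { it.length == 5 }
        .map { it.uppercase() }
        .toList()       // collect the emitted items into a Single<List<String>>
        .blockingGet()  // block only so the sketch can return a value

fun main() {
    val words = listOf("alpha", "beta", "gamma", "delta")
    println(fiveLetterWords(words))
    println(fiveLetterWordsRx(words))
}
```

Both functions return the same list. The difference is that the Observable version would work just as well if the words arrived one at a time over the network.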

When building a frontend application, most of your tasks involve reacting to user interactions and doing something in response to them. It's a perfect use case for RxJava. Let's take the example of a simple login flow. Here are the events that you usually need to take care of:

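  • Make a backend call to authenticate the user when they click login.
  • On success, fetch the user's info from the backend.
  • Handle errors and show messages on the UI.

With RxJava, the whole flow can be written as a single chain:

BackendService.login(email, password)
  // Run the login request on an IO thread; deliver its result on the main thread.
  .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
  .flatMap({ response ->
    if (response.isSuccess) {
      // Login succeeded: continue the chain with the user account request.
      return@flatMap BackendService.userAccount().subscribeOn(Schedulers.io())
    }
    // Login failed: update the UI, then end the chain with an error.
    handleError(response)
    Observable.error<Response<User>>(Exception("Failed to get access token"))
  })
  .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
  .subscribe({ response ->
    // Called with the user account response.
    if (response.isSuccess) {
      onUserLoginSuccess(user, device)
    } else {
      handleError(response)
    }
  }, { throwable ->
    // Called if any step in the chain signalled an error.
    handleError()
  })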

Let's break this down.

  1. .login makes the actual call to the backend to authenticate our user. You could use any implementation you prefer, as long as it returns an Observable (I have used Retrofit, which plays really nicely with RxJava). The implementation itself doesn't need to worry about which thread it runs on.
  2. .subscribeOn tells the Observable to do its work on an IO thread, so the request runs in the background.
  3. .observeOn specifies the thread on which the downstream operators and the Observer receive the results. We use the main thread here so that we can perform UI tasks.
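The split between subscribeOn and observeOn can be seen outside Android too. The sketch below assumes RxJava 3 and uses Schedulers.single() as a stand-in for AndroidSchedulers.mainThread(); threadsUsed is a hypothetical helper, not part of any library.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns (thread that produced the item, thread that observed it).
fun threadsUsed(): Pair<String, String> =
    Observable.fromCallable { Thread.currentThread().name } // the "work"; runs where subscribeOn says
        .subscribeOn(Schedulers.io())                       // produce on an IO thread
        .observeOn(Schedulers.single())                     // stand-in for the Android main thread
        .map { producer -> producer to Thread.currentThread().name } // runs on the observeOn thread
        .blockingFirst()                                    // block only so the sketch can return

fun main() {
    val (producer, observer) = threadsUsed()
    println("produced on: $producer")
    println("observed on: $observer")
}
```

The two names should differ from each other and from the calling thread: the callable runs on a thread owned by the IO scheduler, while everything after observeOn runs on the scheduler passed to it.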

When the login call emits a response, flatMap is invoked with that item. flatMap returns another Observable, which can in turn be subscribed to and observed further down the chain. This is where we make the second request to the backend if our login was successful.
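Here is a stripped-down sketch of that flatMap step with no network involved. It assumes RxJava 3; LoginResult, fakeLogin and fakeUserAccount are hypothetical stand-ins for the backend calls, not the real BackendService.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical result type for the first call.
data class LoginResult(val isSuccess: Boolean, val token: String)

// Hypothetical login: succeeds only for the password "secret".
fun fakeLogin(password: String): Observable<LoginResult> =
    Observable.just(LoginResult(password == "secret", "token-123"))

// Hypothetical second call that needs the token from the first.
fun fakeUserAccount(token: String): Observable<String> =
    Observable.just("user-for-$token")

// flatMap swaps each login result for the Observable of the follow-up call,
// or for an error Observable when the login was rejected.
fun loginThenFetchUser(password: String): Observable<String> =
    fakeLogin(password).flatMap { result ->
        if (result.isSuccess) fakeUserAccount(result.token)
        else Observable.error<String>(IllegalStateException("Login failed"))
    }

fun main() {
    loginThenFetchUser("secret").subscribe(
        { user -> println("got $user") },
        { error -> println("failed: ${error.message}") }
    )
}
```

The caller only ever sees one Observable<String>, even though two calls happen behind it.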

Finally, with .subscribe, we start listening to the results from this Observable chain. The first argument to subscribe is invoked for each item the Observable emits. If the Observable fails with an error, the second function is called instead with a Throwable.
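To see how items and errors are routed to the two callbacks, here is a small sketch assuming RxJava 3. The describe helper is hypothetical and only records what each callback receives.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Subscribes to the source and logs what each callback is given.
// The sources used below emit synchronously, so the log is complete on return.
fun describe(source: Observable<Int>): List<String> {
    val log = mutableListOf<String>()
    source.subscribe(
        { item -> log.add("item: $item") },             // once per emitted item
        { error -> log.add("error: ${error.message}") } // at most once; ends the stream
    )
    return log
}

fun main() {
    println(describe(Observable.just(1, 2)))
    // An Observable that emits one item and then fails.
    val failing = Observable.just(1)
        .concatWith(Observable.error<Int>(RuntimeException("boom")))
    println(describe(failing))
}
```

Items emitted before the failure still reach the first callback; the error then goes to the second one and nothing more is delivered.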

Let's assume we made some changes to the UI when the login failed. Now we would like to clear the UI errors when the text fields are edited. RxJava makes this very simple (with RxBinding, which converts UI interactions into RxJava Observables):

Observable.merge(emailEditText.textChanges(), passwordEditText.textChanges())  
  .subscribe { 
    updateButton(Progress.NORMAL) 
  }

We simply get the .textChanges Observables for the edit texts, merge them into one Observable (which means that the subscribe callback is called for events from either of these two Observables) and then reset the UI inside subscribe.
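The merge behaviour can be tried without any Android views. In this sketch, which assumes RxJava 3, two PublishSubjects stand in for the textChanges() Observables, and a counter stands in for the call to updateButton; countResets is a hypothetical name.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

fun countResets(): Int {
    // Stand-ins for emailEditText.textChanges() and passwordEditText.textChanges().
    val emailChanges = PublishSubject.create<CharSequence>()
    val passwordChanges = PublishSubject.create<CharSequence>()

    var resets = 0
    Observable.merge(emailChanges, passwordChanges)
        .subscribe { resets++ } // the real code would reset the button state here

    // Simulate the user typing in both fields.
    emailChanges.onNext("a")
    passwordChanges.onNext("x")
    emailChanges.onNext("ab")
    return resets
}

fun main() {
    println(countResets())
}
```

Every edit in either field triggers the same callback, so three simulated edits produce three resets.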

This is a very simple example of how to manage simple UI flows in your application with RxJava. It takes a bit of getting used to, but it's well worth the effort. It helps you write maintainable code that is easy to read and to refactor at a later stage. If at any point we want to add another call to the chain, we simply add another flatMap operator to the Observable chain.

This is meant to be a very simple primer on RxJava, and it comes nowhere close to covering the myriad operators available to you.