Cumulations Logo

While developing an Android application I was introduced to RxJava, a reactive programming pattern in Java for composing asynchronous and event-based programs by using observable sequences. RxJava uses the Observer pattern to deliver asynchronous data stream.

RxJava allows us to represent anything as an asynchronous data stream that can be created and consumed on any thread. This helps us to avoid nested callbacks with clean, concise and readable code.

The basic building blocks of reactive code or observer pattern are Observables and Subscribers. An Observable emits items; a Subscriber consumes those items.
An Observable may emit any number of items (including zero), then it terminates either by successfully completing or due to an error. For each Subscriber, it has, an Observable calls Subscriber.onNext() any number of times (depending on the number of items it emits), followed by either Subscriber.onComplete() or Subscriber.onError().

Let’s understand how RxJava works with a simple retrofit example by converting callback into subscriber methods.

Public interface Apiservice {
@GET("/user/{id}/details")
Observable
getUserDetails(@Path("id") int id); 
}

 

Creating Observable

Observable

myObservable = apiservice.getUserDetails(userId) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedules.mainThread());

Our Observable emits details then completes. Now let's create a Subscriber to consume the data.

RxJava subscriber

Subscriber

mySubscriber = new Subscriber

() { @Override public void onNext(Details details) { } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; myObservable.subscribe(mySubscriber);

 

The above code can be simplified to handle only onNext(), onError() and avoid onCompleted() as below

 

Simplified RxJava subscriber

apiservice.getUserDetails(userId)
    .subscribeOn(Schedulers.newThread)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1

() { @Override public void call(Details details) { } }, new Action1(){ @Override public void call(Throwable throwable) { } });

 

Now we have understood how reactive programming works, let's make things a bit more interesting. Consider a use case where we need to display data received from the response of 2 APIs as described below.

API 1. getUserDetails() gets the details about the user.
API 2. getUserPhotos() gets the list of photos uploaded by a user.

The above behaviour can be achieved by using RxJava zip operator. This allows to fire more than one API at a time and emit responses only when each of the response is available.

The zip method is interesting merge feature. The zip operator takes the combination of observable and brings together emitted items and allows you to compose a result from results of different observable.

Example:
class Combined {
UserDetails details;
  	List photos;
}


Observable

userDetails = api.getUserDetails((userId) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); Observable> userPhotos = api.getUserPhotos(userId) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); Observable.zip(userDetails, userPhotos , new Func2, Combined>() { @Override public Combined call(UserDetails details, List photos) { Combined r = new Combined(); r.details = details; r.photos = photos; return r; } }).subscribe(new Action1() { @Override public void call(Combined combined) { Log.d(TAG, "!inside onNext"); }, new Action1() { @Override public void call(Throwable throwable) { Log.d(TAG, "!inside onError"); } });

In the above code, we have received the combined response from 2 APIs inside a subscriber. The main disadvantage of zip operator is if any one of the API fails or if any observable doesn't emit data, onError() is called. To recover from error notifications, you can use any of the below few error handling operators of Observable.

ionErrorResumeNext( ) — if Observable encounters error then this function is handled to return a new Observable so that new observable will emit sequence of item and onNext(), onComplete() is invoked.

Example:
 Observable
.onErrorResumeNext(new Func1>() {
@Override
public Observable call(Throwable throwable) {
return Observable;      //return observable which emit the sequence of original observable type.
 } }) 
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
 

ii. onErrorReturn( ) — if Observable encounter error, then this function is used to return particular item after which onNext(), onComplete() is invoked.

Example:					
Observable
.onErrorReturn(new Func1() {
@Override
public T call(Throwable throwable) {
return t;
}}) 
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);

iii. retry( ) — if Observable encounters error, this function re-subscribes to the same observable.


Example:
 observable
.retry() 
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);