Wrapping Existing Code With RxJava

We use RxJava a lot on Android, for good reasons. However, we still need to work with code that was not built with RxJava, so let's wrap it.

Synchronous APIs

For very simple synchronous APIs, you can use Observable.just() to wrap them. For example, Observable.just(1, 2, 3, 4, 5) emits the integer sequence from 1 to 5.

If the API is blocking, wrap the call in Observable.defer(), so that it runs when the Observable is subscribed to rather than when it is created:

Observable<String> observable = Observable.defer(
  new Func0<Observable<String>>() {
    public Observable<String> call() {
      try {
        return Observable.just(getStringBlocking());
      } catch (Exception e) {
        return Observable.error(e);
      }
    }
  });

And if you want RxJava to do the try...catch for you, you can use Observable.fromCallable(), which forwards any thrown exception to onError():

Observable<String> observable = Observable.fromCallable(
  new Callable<String>() {
    public String call() throws Exception {
      return getStringBlocking();
    }
  });
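
To see what deferring buys you, here is a minimal plain-JDK sketch (no RxJava on the classpath) of the two things the wrappers above take care of: the blocking call does not run until someone asks for the result, and a thrown exception becomes an error signal instead of escaping. The class name DeferredCallSketch, the subscribe() helper, and the body of getStringBlocking() are hypothetical stand-ins for illustration, not RxJava APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferredCallSketch {
    // Counts how many times the blocking call has actually run.
    public static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for the article's getStringBlocking().
    public static String getStringBlocking() {
        CALLS.incrementAndGet();
        return "hello";
    }

    // Plays the subscriber's role: runs the work and reports either
    // a value or an error instead of letting the exception escape.
    public static String subscribe(Callable<String> source) {
        try {
            return "onNext: " + source.call();
        } catch (Exception e) {
            return "onError: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Wrapping the call does not execute it.
        Callable<String> deferred = new Callable<String>() {
            public String call() throws Exception {
                return getStringBlocking();
            }
        };
        System.out.println(CALLS.get());          // 0, nothing has run yet
        System.out.println(subscribe(deferred));  // onNext: hello
        System.out.println(CALLS.get());          // 1, ran once when asked

        // A failing call surfaces as an error signal.
        Callable<String> failing = new Callable<String>() {
            public String call() throws Exception {
                throw new IllegalStateException("boom");
            }
        };
        System.out.println(subscribe(failing));   // onError: boom
    }
}
```

With RxJava itself, the wrapped Observable is usually combined with subscribeOn(Schedulers.io()) so that the blocking call runs off the main thread.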

Asynchronous APIs

Existing code usually relies on callbacks to support asynchronous operations. For example, on Android we can request location updates like this:

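// locationManager is assumed to be a LocationManager instance
locationManager.requestLocationUpdates(
  LocationManager.GPS_PROVIDER, 1000L, 10.0F,
  new LocationListener() {
    public void onLocationChanged(Location location) {
      // use the new location here
    }
    // remaining LocationListener callbacks omitted
  });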

Advanced RxJava users, who don't need to read this article, can use Observable.create() to wrap it and fulfill the Observable contract themselves. The rest of us can simply use Observable.fromEmitter() to handle the case and let the framework help us:

// locationManager is assumed to be a final LocationManager instance in scope
Observable<Location> observable = Observable.fromEmitter(
  new Action1<AsyncEmitter<Location>>() {
    public void call(final AsyncEmitter<Location> emitter) {
      final LocationListener locationListener = new LocationListener() {
        public void onLocationChanged(Location location) {
          // emits location
          emitter.onNext(location);
        }
        // remaining LocationListener callbacks omitted
      };

      emitter.setCancellation(new AsyncEmitter.Cancellable() {
        public void cancel() throws Exception {
          // stops location updates when unsubscribed
          locationManager.removeUpdates(locationListener);
        }
      });

      locationManager.requestLocationUpdates(
        LocationManager.GPS_PROVIDER,
        1000L, 10.0F, locationListener);

      // if you also emit onError() or onCompleted(),
      // the framework will make sure the Observable
      // contract is fulfilled
    }
    // let the framework worry about backpressure
  }, AsyncEmitter.BackpressureMode.BUFFER);
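
The wrapper above has three steps: create a listener that forwards values, register it, and hand back a way to unregister it. As a rough illustration that runs without Android or RxJava, here is a plain-JDK sketch of that same lifecycle. Everything in it (CallbackBridgeSketch, Source, Listener, Cancellable, bridge()) is a hypothetical stand-in; Source plays the role of LocationManager, and the returned handle plays the role of the cancellation that runs on unsubscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CallbackBridgeSketch {
    // Hypothetical callback interface, standing in for LocationListener.
    public interface Listener { void onValue(int value); }

    // Hypothetical callback-style API, standing in for LocationManager.
    public static class Source {
        public final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
        public void register(Listener l) { listeners.add(l); }
        public void unregister(Listener l) { listeners.remove(l); }
        public void publish(int value) {
            for (Listener l : listeners) l.onValue(value);
        }
    }

    // Hypothetical handle given back to the caller.
    public interface Cancellable { void cancel(); }

    // Registers a listener that forwards values into sink and returns
    // a handle that unregisters that listener again.
    public static Cancellable bridge(final Source source, final List<Integer> sink) {
        final Listener listener = new Listener() {
            public void onValue(int value) {
                sink.add(value); // corresponds to emitter.onNext(value)
            }
        };
        source.register(listener);
        return new Cancellable() {
            public void cancel() {
                source.unregister(listener); // corresponds to setCancellation
            }
        };
    }

    public static void main(String[] args) {
        Source source = new Source();
        List<Integer> received = new ArrayList<Integer>();

        Cancellable handle = bridge(source, received);
        source.publish(1);
        source.publish(2);
        handle.cancel();
        source.publish(3); // published after cancellation, so it is not received

        System.out.println(received); // [1, 2]
    }
}
```

The point of registering the cancellation alongside the listener is that the callback source never keeps a reference to a listener nobody is reading from, which on Android is what keeps the GPS from running after the subscriber is gone.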

Happy coding!

