rxjava 2 zip operator example in Android

The zip operator “combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function”

This is for adding and disposing observables. Observalbes should be cleared onPause or onDestroy of an Activity or Fragment in Android.

private final CompositeDisposable disposables = new CompositeDisposable();

This returns a function that returns an Observable of list of strings.

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            //simulating a heavy duty computational expensive operation
            for (int i=0; i<1000000000; i++) {}
            Log.d("rxzip", Thread.currentThread().getName() + " " + str1 + " " + str2);
            List<String> strings = new ArrayList<>();
            strings.add(str1);
            strings.add(str2);
            return strings;
        }
    });
}

This returns a function which will combine the results from a zip operator in rxjava 2

private Function3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Function3<List<String>, List<String>, List<String>, List<String>>() {
        @Override
        public List<String> apply(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d("rxzip", "...");

            for (String s : strings2) {
                strings.add(s);
            }

            for (String s : strings3) {
                strings.add(s);
            }

            return strings;
        }
    };
}

The zip operator in action. It puts the 6 strings into a single list of strings. There are 3 threads running, each combines 2 strings, and all of the are combined at the end in the mergeStringLists().

disposables.add(
        Observable
                .zip(getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
                        getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
                        getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
                        mergeStringLists())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<List<String>>() {
                    @Override
                    public void onNext(List<String> value) {
                        displayTvShows(value);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                })
);

Disposes all observables in the onDestroy method

@Override
protected void onDestroy() {
    super.onDestroy();
    if (disposables != null) {
        disposables.clear();
    }
}

The rx zip operator in Kotlin

import android.os.Bundle
import android.support.v7.app.AppCompatActivity
import android.util.Log
import android.view.View
import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_main.*
import io.reactivex.disposables.CompositeDisposable
import java.util.concurrent.Callable
import io.reactivex.observers.DisposableObserver
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import io.reactivex.functions.Function3
import kotlinx.android.synthetic.main.content_main.*

class MainActivity : AppCompatActivity() {

    private val disposables = CompositeDisposable()

    private fun getStrings(str1: String, str2: String): Observable<List<String>> {
        return Observable.fromCallable(object : Callable<List<String>> {
            override fun call(): List<String> {
                for (i in 0..99999999) {
                    //simulating a heavy duty computational expensive operation
                }
                log(Thread.currentThread().name + " " + str1 + " " + str2)
                val strings = ArrayList<String>()
                strings.add(str1)
                strings.add(str2)
                return strings
            }
        })
    }

    private fun mergeStringLists(): Function3<List<String>, List<String>, List<String>, List<String>> {
        return Function3 { strings, strings2, strings3 ->
            log("...")
            strings as ArrayList<String>
            strings2.forEach({
                strings.add(it)
            })

            strings3.forEach({
                strings.add(it)
            })

            strings
        }
    }

    private fun runRxZip() {
        disposables.clear()
        disposables.add(
                Observable
                        .zip(getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
                                getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
                                getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
                                mergeStringLists())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribeWith(object : DisposableObserver<List<String>>() {
                            override fun onNext(value: List<String>) {
                                progressbar.visibility = View.GONE
                                log("combined result: " + value.toString())
                            }

                            override fun onError(e: Throwable) {
                                e.printStackTrace()
                                progressbar.visibility = View.GONE
                            }

                            override fun onComplete() {

                            }
                        })
        )
    }

    private fun log(text : String) {
        runOnUiThread {
            Log.d("rxzip", text)
            tv_display.text = "${tv_display.text.toString()}\n$text"
        }
    }

    private fun clearLog() {
        tv_display.text = ""
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        setSupportActionBar(toolbar)

        fab.setOnClickListener { view ->
            clearLog()
            progressbar.visibility = View.VISIBLE
            runRxZip()
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        disposables?.clear()
    }

}

Complete example in Github

Search within Codexpedia

Custom Search

Search the entire web

Custom Search