RxJava 2 zip operator example in Android
The zip operator is used to “combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function”.
This CompositeDisposable collects the disposables returned by each subscription so they can be disposed of together. Observables should be cleared in onPause or onDestroy of an Activity or Fragment in Android.
/** Holds the subscriptions created by this screen so they can all be cleared together in onDestroy(). */
private final CompositeDisposable disposables = new CompositeDisposable();
This method returns an Observable that emits a single list of strings.
private Observable> getStrings(final String str1, final String str2) { return Observable.fromCallable(new Callable
>() { @Override public List
call() { //simulating a heavy duty computational expensive operation for (int i=0; i<1000000000; i++) {} Log.d("rxzip", Thread.currentThread().getName() + " " + str1 + " " + str2); List strings = new ArrayList<>(); strings.add(str1); strings.add(str2); return strings; } }); }
This method returns the function that the RxJava 2 zip operator uses to combine the three lists into one.
private Function3, List
, List , List > mergeStringLists() { return new Function3 , List
, List , List >() { @Override public List apply(List strings, List strings2, List strings3) { Log.d("rxzip", "..."); for (String s : strings2) { strings.add(s); } for (String s : strings3) { strings.add(s); } return strings; } }; }
The zip operator in action. It puts the 6 strings into a single list of strings. There are 3 threads running, each of which combines 2 strings, and all of them are combined at the end in mergeStringLists().
// Run the three sources on their own threads, merge their lists with zip, and deliver the
// combined list on the main thread. The subscription is tracked in `disposables`.
disposables.add(
        Observable
                .zip(
                        getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
                        getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
                        getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
                        mergeStringLists())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<List<String>>() {
                    @Override
                    public void onNext(List<String> value) {
                        displayTvShows(value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Do not swallow failures silently; make them visible in logcat.
                        Log.e("rxzip", "zip failed", e);
                    }

                    @Override
                    public void onComplete() {
                        // No completion handling is needed here.
                    }
                })
);
Disposes all observables in the onDestroy method
/**
 * Clears every tracked subscription when the Activity is destroyed.
 *
 * <p>{@code disposables} is a final field initialised at declaration, so no null check is needed.
 */
@Override
protected void onDestroy() {
    super.onDestroy();
    disposables.clear();
}
The rx zip operator in Kotlin
import android.os.Bundle
import android.support.v7.app.AppCompatActivity
import android.util.Log
import android.view.View
import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_main.*
import io.reactivex.disposables.CompositeDisposable
import java.util.concurrent.Callable
import io.reactivex.observers.DisposableObserver
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import io.reactivex.functions.Function3
import kotlinx.android.synthetic.main.content_main.*

/**
 * Demo screen for the RxJava 2 zip operator: tapping the FAB runs three sources on separate
 * threads, zips their lists into one, and shows the log output in `tv_display`.
 */
class MainActivity : AppCompatActivity() {

    /** Tracks the running zip subscription so it can be cleared on re-run and in onDestroy. */
    private val disposables = CompositeDisposable()

    /**
     * Creates an Observable that emits one list holding [str1] followed by [str2].
     * The work happens inside the Callable, so it runs only when subscribed to.
     */
    private fun getStrings(str1: String, str2: String): Observable<List<String>> {
        return Observable.fromCallable(object : Callable<List<String>> {
            override fun call(): List<String> {
                for (i in 0..99999999) {
                    // simulating a heavy duty computational expensive operation
                }
                log(Thread.currentThread().name + " " + str1 + " " + str2)
                val strings = ArrayList<String>()
                strings.add(str1)
                strings.add(str2)
                return strings
            }
        })
    }

    /**
     * Creates the zip combiner: concatenates the three lists, in argument order, into a new
     * list. The inputs are not modified and no downcast to ArrayList is required.
     */
    private fun mergeStringLists(): Function3<List<String>, List<String>, List<String>, List<String>> {
        return Function3 { strings, strings2, strings3 ->
            log("...")
            strings + strings2 + strings3
        }
    }

    /** Cancels any previous run, then zips the three sources and reports the result on the main thread. */
    private fun runRxZip() {
        disposables.clear()
        disposables.add(
            Observable
                .zip(
                    getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
                    getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
                    getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
                    mergeStringLists()
                )
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(object : DisposableObserver<List<String>>() {
                    override fun onNext(value: List<String>) {
                        progressbar.visibility = View.GONE
                        log("combined result: $value")
                    }

                    override fun onError(e: Throwable) {
                        e.printStackTrace()
                        progressbar.visibility = View.GONE
                    }

                    override fun onComplete() {
                        // No completion handling is needed here.
                    }
                })
        )
    }

    /**
     * Writes [text] to logcat and appends it to the on-screen log.
     * Callers include background threads, hence the hop to the UI thread.
     */
    private fun log(text: String) {
        runOnUiThread {
            Log.d("rxzip", text)
            tv_display.text = "${tv_display.text}\n$text"
        }
    }

    /** Empties the on-screen log. */
    private fun clearLog() {
        tv_display.text = ""
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        setSupportActionBar(toolbar)
        fab.setOnClickListener {
            clearLog()
            progressbar.visibility = View.VISIBLE
            runRxZip()
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        // `disposables` is a non-null val, so a safe call is unnecessary.
        disposables.clear()
    }
}
Search within Codexpedia
Search the entire web