rxjava 2 zip operator example in Android
The zip operator “combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function”
A CompositeDisposable is used for collecting and disposing of subscriptions. Observables should be cleared in onPause or onDestroy of an Activity or Fragment in Android.
/** Container for this screen's subscriptions so they can all be released with a single call. */
private final CompositeDisposable disposables = new CompositeDisposable();
This method returns an Observable that emits a single list of strings.
private Observable> getStrings(final String str1, final String str2) { return Observable.fromCallable(new Callable
>() { @Override public List
call() { //simulating a heavy duty computational expensive operation for (int i=0; i<1000000000; i++) {} Log.d("rxzip", Thread.currentThread().getName() + " " + str1 + " " + str2); List strings = new ArrayList<>(); strings.add(str1); strings.add(str2); return strings; } }); }
This method returns the combiner function that the zip operator in RxJava 2 uses to merge the three results into one.
private Function3, List
, List , List > mergeStringLists() { return new Function3 , List
, List , List >() { @Override public List apply(List strings, List strings2, List strings3) { Log.d("rxzip", "..."); for (String s : strings2) { strings.add(s); } for (String s : strings3) { strings.add(s); } return strings; } }; }
The zip operator in action. It puts the 6 strings into a single list of strings. There are 3 threads running, each producing a list of 2 strings, and all of them are combined at the end in mergeStringLists().
// Run the three sources on their own threads, merge their lists once all three
// have emitted, and deliver the merged list on the main thread. The returned
// observer is registered in `disposables` so it can be cancelled later.
disposables.add(
        Observable
                .zip(getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
                        getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
                        getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
                        mergeStringLists())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<List<String>>() {
                    @Override
                    public void onNext(List<String> value) {
                        displayTvShows(value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Do not swallow failures silently; at least record them.
                        Log.e("rxzip", "zip failed", e);
                    }

                    @Override
                    public void onComplete() {
                        // Nothing to do: the single merged list was already handled in onNext.
                    }
                })
);
Disposes all observables in the onDestroy method
/**
 * Releases every subscription held in {@code disposables} when the Activity is destroyed.
 */
@Override
protected void onDestroy() {
    super.onDestroy();
    // `disposables` is a final field initialised at declaration, so it can never be null here.
    disposables.clear();
}
The rx zip operator in Kotlin
import android.os.Bundle
import android.support.v7.app.AppCompatActivity
import android.util.Log
import android.view.View
import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_main.*
import io.reactivex.disposables.CompositeDisposable
import java.util.concurrent.Callable
import io.reactivex.observers.DisposableObserver
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import io.reactivex.functions.Function3
import kotlinx.android.synthetic.main.content_main.*
/**
 * Demo screen for the RxJava 2 `zip` operator.
 *
 * Tapping the floating action button starts three Observables on separate threads,
 * zips their results into one list, and prints progress both to Logcat and to the
 * on-screen text view.
 *
 * The view references used here (`toolbar`, `fab`, `progressbar`, `tv_display`) are
 * presumably supplied by the `kotlinx.android.synthetic` imports of this file.
 */
class MainActivity : AppCompatActivity() {

    /** Holds the active zip subscription so it can be cancelled on re-run or destroy. */
    private val disposables = CompositeDisposable()

    /**
     * Builds an Observable that, when subscribed, spins through a busy loop and then
     * emits one list containing [str1] and [str2].
     *
     * The work runs on whichever scheduler the caller subscribes on.
     */
    private fun getStrings(str1: String, str2: String): Observable<List<String>> =
        Observable.fromCallable<List<String>> {
            // Simulating a heavy-duty, computationally expensive operation.
            repeat(BUSY_LOOP_ITERATIONS) { }
            log("${Thread.currentThread().name} $str1 $str2")
            listOf(str1, str2)
        }

    /**
     * Creates the combiner handed to `Observable.zip`: it concatenates the three lists
     * in argument order.
     *
     * A new list is returned, so no downcast to a mutable type is needed and the
     * upstream emissions are left unmodified.
     */
    private fun mergeStringLists(): Function3<List<String>, List<String>, List<String>, List<String>> =
        Function3<List<String>, List<String>, List<String>, List<String>> { first, second, third ->
            log("...")
            first + second + third
        }

    /**
     * Cancels any run still in flight, then starts the three sources on new threads
     * and reports the zipped result on the main thread.
     */
    private fun runRxZip() {
        // Drop a previous run, if any, so repeated taps do not stack up subscriptions.
        disposables.clear()
        disposables.add(
            Observable
                .zip(
                    getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
                    getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
                    getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
                    mergeStringLists()
                )
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(object : DisposableObserver<List<String>>() {
                    override fun onNext(value: List<String>) {
                        progressbar.visibility = View.GONE
                        log("combined result: $value")
                    }

                    override fun onError(e: Throwable) {
                        Log.e(TAG, "zip failed", e)
                        progressbar.visibility = View.GONE
                    }

                    override fun onComplete() {
                        // Nothing to do: the single merged list was already handled in onNext.
                    }
                })
        )
    }

    /**
     * Writes [text] to Logcat and appends it as a new line to the on-screen log.
     *
     * The work is posted via `runOnUiThread` because this is also called from the
     * worker threads of the sources.
     */
    private fun log(text: String) {
        runOnUiThread {
            Log.d(TAG, text)
            tv_display.text = "${tv_display.text}\n$text"
        }
    }

    /** Empties the on-screen log. */
    private fun clearLog() {
        tv_display.text = ""
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        setSupportActionBar(toolbar)
        fab.setOnClickListener {
            clearLog()
            progressbar.visibility = View.VISIBLE
            runRxZip()
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        // `disposables` is a non-null val, so no safe call is required.
        disposables.clear()
    }

    companion object {
        /** Logcat tag used for every message of this demo. */
        private const val TAG = "rxzip"

        /** Number of empty iterations used to fake expensive work in each source. */
        private const val BUSY_LOOP_ITERATIONS = 100_000_000
    }
}
Search within Codexpedia
Search the entire web