RxJava 2 zip operator example in Android

The zip operator is used to “combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function”.
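To make the pairing rule in that definition concrete, here is a minimal plain-Java sketch. It is not RxJava code and not how the library implements zip; it only models the idea with lists. The class name ZipSketch and its zip helper are made up for this illustration. The n-th item of each source is combined into one result, and the output ends as soon as the shorter source runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of zip's pairing rule. Hypothetical helper, NOT part of RxJava.
final class ZipSketch {

    // Combines the i-th element of each list and stops at the shorter list.
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int count = Math.min(first.size(), second.size());
        List<R> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(combiner.apply(first.get(i), second.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("A", "B", "C");
        List<Integer> numbers = Arrays.asList(1, 2);
        // "C" has no partner in the second list, so it is dropped.
        List<String> zipped = zip(letters, numbers, (letter, number) -> letter + number);
        System.out.println(zipped); // prints [A1, B2]
    }
}
```

In the example further down, each source emits exactly one list, so zip produces exactly one combined item.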

A CompositeDisposable holds the subscriptions so they can all be disposed of together. Subscriptions should be cleared in onPause or onDestroy of an Activity or Fragment in Android.

private final CompositeDisposable disposables = new CompositeDisposable();

This method returns an Observable that emits a list of strings.

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            // Simulate a computationally expensive operation
            for (int i=0; i<1000000000; i++) {}
            Log.d("rxzip", Thread.currentThread().getName() + " " + str1 + " " + str2);
            List<String> strings = new ArrayList<>();
            strings.add(str1);
            strings.add(str2);
            return strings;
        }
    });
}

This method returns the function that combines the results passed in by the zip operator in RxJava 2.

private Function3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Function3<List<String>, List<String>, List<String>, List<String>>() {
        @Override
        public List<String> apply(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d("rxzip", "...");

            // Append the second and third lists to the first one.
            strings.addAll(strings2);
            strings.addAll(strings3);

            return strings;
        }
    };
}
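The combiner above appends to its first argument. That works here because getStrings builds a fresh ArrayList on every call, but it would throw if a source ever emitted an unmodifiable list. As a hedged alternative, here is a minimal plain-Java sketch of a merge that copies into a new list instead. MergeSketch and merge are hypothetical names for this illustration; the same body could go inside the apply method of the Function3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: merges three lists without modifying any of them.
final class MergeSketch {

    static List<String> merge(List<String> first, List<String> second, List<String> third) {
        // Copy everything into a new list so the inputs stay untouched.
        List<String> merged = new ArrayList<>(first.size() + second.size() + third.size());
        merged.addAll(first);
        merged.addAll(second);
        merged.addAll(third);
        return merged;
    }

    public static void main(String[] args) {
        // Arrays.asList returns a fixed-size list; calling add on it would throw.
        List<String> merged = merge(
                Arrays.asList("One", "Two"),
                Arrays.asList("Three", "Four"),
                Arrays.asList("Five", "Six"));
        System.out.println(merged); // prints [One, Two, Three, Four, Five, Six]
    }
}
```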

The zip operator in action. It puts the 6 strings into a single list of strings. Three threads run, each one builds a list of 2 strings, and all of them are combined at the end in mergeStringLists().

disposables.add(
        Observable
                .zip(getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
                        getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
                        getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
                        mergeStringLists())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<List<String>>() {
                    @Override
                    public void onNext(List<String> value) {
                        displayTvShows(value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Log the failure instead of silently swallowing it
                        Log.e("rxzip", "zip failed", e);
                    }

                    @Override
                    public void onComplete() {

                    }
                })
);
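For readers who want to try the "three workers, one combined result" idea outside of Android, here is a rough analogy using only java.util.concurrent. It swaps RxJava for CompletableFuture, so it is a sketch of the same shape and not the code above; ThreeSourcesSketch, pair, concat and runAll are names invented for this example. As with zip, the combined list follows the order of the arguments, not the order in which the threads happen to finish.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only: uses CompletableFuture, not RxJava. All names are hypothetical.
final class ThreeSourcesSketch {

    // Stands in for getStrings: builds a two-element list.
    static List<String> pair(String a, String b) {
        return new ArrayList<>(Arrays.asList(a, b));
    }

    // Returns a new list holding left followed by right.
    static List<String> concat(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    static List<String> runAll() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Each task runs on one of the three pool threads.
            CompletableFuture<List<String>> first =
                    CompletableFuture.supplyAsync(() -> pair("One", "Two"), pool);
            CompletableFuture<List<String>> second =
                    CompletableFuture.supplyAsync(() -> pair("Three", "Four"), pool);
            CompletableFuture<List<String>> third =
                    CompletableFuture.supplyAsync(() -> pair("Five", "Six"), pool);

            // The combining steps run only once the needed results are available.
            return first.thenCombine(second, ThreeSourcesSketch::concat)
                    .thenCombine(third, ThreeSourcesSketch::concat)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll()); // prints [One, Two, Three, Four, Five, Six]
    }
}
```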

Dispose of all the subscriptions in the onDestroy method.

@Override
protected void onDestroy() {
    super.onDestroy();
    if (disposables != null) {
        disposables.clear();
    }
}
