DeveloperMemos

How to Use RxJava's zip Operator (Kotlin)

RxJava, Kotlin, Observables

RxJava's zip operator is a powerful tool for combining emissions from multiple Observables into a single stream of emissions. It achieves this by applying a user-defined function to corresponding items emitted by each Observable.

This article will guide you through using zip in Kotlin, providing clear explanations and examples.

Understanding zip

  • Combining Observables: zip takes two or more Observables as arguments. It waits for each Observable to emit at least one item before combining them.
  • Matching Emissions: zip pairs corresponding emissions from each Observable based on their order of emission. The first emitted item from each Observable is used for the first combination, and so on.
  • Zipping Function: You provide a function (zipFunction) that defines how to combine the matched emissions. This function takes the corresponding items from each Observable as arguments and returns a single value.
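  • Completion and Errors: zip emits only as many items as its shortest source. It completes once any source Observable has completed and none of that source's items are left waiting to be paired; unmatched items from longer sources are dropped. If any Observable emits an error, zip propagates the error immediately to its downstream subscriber, terminating the stream.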
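
As a rough mental model for the pairing rules above, Kotlin's standard-library zip on lists behaves similarly: it pairs items by position and stops at the shorter input. The sketch below uses plain collections only, and the helper name pairUp is made up for illustration. It is an analogy, not how RxJava implements zip, since Observables emit over time and zip has to buffer items until a partner arrives.

```kotlin
// Plain Kotlin collections, no RxJava: an analogy for zip's pairing rules only.
// pairUp is a hypothetical helper name used for illustration.
fun pairUp(numbers: List<Int>, letters: List<String>): List<String> =
    numbers.zip(letters) { num, letter -> "$num$letter" }

fun main() {
    // Equal lengths: every item finds a partner.
    println(pairUp(listOf(1, 2, 3), listOf("a", "b", "c"))) // prints [1a, 2b, 3c]
    // Unequal lengths: pairing stops at the shorter list, and 3 is dropped.
    println(pairUp(listOf(1, 2, 3), listOf("a", "b")))      // prints [1a, 2b]
}
```

The second call mirrors what happens when one Observable completes early: the leftover item from the longer input never reaches the combining function.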

Using zip (Examples)

1. Combining Two Observables:

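import io.reactivex.rxjava3.core.Observable // assumes RxJava 3; on RxJava 2 use io.reactivex.Observable

val observable1 = Observable.just(1, 2, 3)
val observable2 = Observable.just("a", "b", "c")

// zipWith is the instance-method form of zip for two sources
val zippedObservable = observable1.zipWith(observable2) { num, letter -> "$num$letter" }

zippedObservable.subscribe { combined -> println(combined) } // Prints 1a, 2b, 3c on separate lines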

This example combines two Observables emitting numbers and letters, respectively, using zipWith, the instance-method form of zip for two sources. The zip function creates a string by concatenating each number with its corresponding letter.

2. Combining More Than Two Observables:

import java.util.concurrent.TimeUnit

// Emits 0, 1, 2, ... once per second, starting immediately, on the computation scheduler
val observable3 = Observable.interval(0, 1, TimeUnit.SECONDS)

val combinedObservable = Observable.zip(
    observable1,
    observable2,
    observable3
) { num, letter, tick -> "[$tick] $num$letter" }

// interval emits on a background thread, so block here until zip completes
combinedObservable.blockingSubscribe { combined -> println(combined) }
// Prints [0] 1a, [1] 2b, [2] 3c on separate lines, about one second apart

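This example demonstrates zip with three Observables. It combines each number and letter with the matching tick count from the interval Observable (0, 1, 2, and so on, not a wall-clock timestamp). Because zip can only emit once every source has supplied an item, the output arrives at the pace of the slowest source, here one string per second, and stops when the shortest source completes.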

3. Handling Incomplete Emissions or Errors:

// A shorter source: only two items to pair with observable1's three
val incompleteObservable = Observable.just("a", "b")

val shortZipped = observable1.zipWith(incompleteObservable) { num, letter -> "$num$letter" }

shortZipped.subscribe { combined -> println(combined) }
// Prints 1a, 2b, then completes; the unmatched 3 is dropped

// A source that fails immediately; the explicit type argument helps inference
val errorObservable = Observable.error<String>(Exception("Oops!"))

val errorZipped = observable1.zipWith(errorObservable) { num, letter -> "$num$letter" }

errorZipped
    // Must come before subscribe; this lambda form assumes RxJava 3
    .onErrorResumeNext { throwable -> Observable.just("Error: $throwable") }
    .subscribe { combined -> println(combined) } // Prints Error: java.lang.Exception: Oops!

Here, we explore scenarios where one Observable does not emit enough items or an error occurs. The first example shows that zip stops once the shorter Observable runs out: only two pairs are emitted, the remaining item from the longer Observable is dropped, and the zip function is never called with a missing or null item. The second example shows how onErrorResumeNext, applied before subscribe, recovers from an error by switching to a fallback Observable that emits an error message.
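
When no recovery operator is applied, the error reaches the subscriber's error callback instead. The following is a minimal sketch of that path, assuming RxJava 3 on the classpath; observeFailure and the events list are hypothetical names introduced only to make the outcome visible.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumes RxJava 3

// Hypothetical helper: records what a subscriber sees when one source fails.
fun observeFailure(): List<String> {
    val events = mutableListOf<String>()
    val numbers = Observable.just(1, 2, 3)
    val failing = Observable.error<String>(IllegalStateException("Oops!"))

    numbers.zipWith(failing) { num, letter -> "$num$letter" }
        .subscribe(
            { combined -> events.add("Next: $combined") },      // never called here
            { error -> events.add("Failed: ${error.message}") } // receives the source's error
        )
    return events
}

fun main() {
    println(observeFailure()) // prints [Failed: Oops!]
}
```

No pair is emitted because the failing source never produces an item to match against the numbers, so the only event the subscriber sees is the error.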

Key Points:

  • zip is ideal for combining emissions that have a meaningful relationship and need to be processed together.
  • Consider error handling strategies when using zip to ensure your application behaves gracefully in case of issues with source Observables.
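
One possible error handling strategy is to recover on the individual source before it reaches zip, so a failure in one input does not surface as an error downstream. The sketch below assumes RxJava 3; zipWithFallback and the "?" placeholder are illustrative choices, not part of the original examples.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumes RxJava 3

// Hypothetical helper: zips 1, 2, 3 with a letter source that may fail.
fun zipWithFallback(letters: Observable<String>): List<String> {
    val numbers = Observable.just(1, 2, 3)
    // Recover on the failing source itself: emit "?" once, then complete.
    val safeLetters = letters.onErrorReturnItem("?")
    return numbers.zipWith(safeLetters) { num, letter -> "$num$letter" }
        .toList()      // collect all zipped items into a Single<List<String>>
        .blockingGet() // block for the result; acceptable for a small demo
}

fun main() {
    println(zipWithFallback(Observable.just("a", "b", "c")))                 // prints [1a, 2b, 3c]
    println(zipWithFallback(Observable.error<String>(Exception("Oops!"))))   // prints [1?]
}
```

Note the trade-off: after emitting the fallback item the recovered source completes, so zip also completes and the remaining numbers go unpaired.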

By effectively utilizing zip, you can streamline your RxJava code and create more complex data processing pipelines.