Topic: 12 Understanding ReactiveX in Android

This is the twelfth topic in our Learn Android from Basic to Advanced series.

Hello devs! In our previous topic we talked about Coroutines, right? In this blog we talk about ReactiveX, which, like coroutines, is used for running and handling async tasks. Let's explore ReactiveX.

ReactiveX

ReactiveX is a library for composing asynchronous and event-based programs using observable sequences. These sequences can represent any data stream and can be transformed, filtered, and manipulated using a rich set of operators. ReactiveX is commonly known as Rx.

Rx is not only for the Android platform or Java and Kotlin languages.

Rx Supported Languages:

Rx Supported Platform and framework:

In this blog, we talk about RxJava, RxKotlin, and RxAndroid. Alright devs, let's explore them one by one.

Imagine an app where there’s a button on the screen. Clicking the button changes the color of the background. This is an event-based application: clicking the button triggers a method that changes the background color, so the app reacts to the user input (the click).

What makes Reactive Programming different is that anything can be observed and reacted to: an input, a data change, an event, a click, a network request, anything! These sources of values over time are called streams. Streams can be merged, filtered, mapped, or even react to each other, and all of this can be moved to another thread very easily.
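To make the button idea concrete, here is a minimal sketch that models clicks as a stream using RxJava's PublishSubject (RxJava itself is covered in the next section). The palette, the helper name backgroundColorsFor, and the simulated clicks are assumptions made up for this illustration; a real app would push into the subject from a click listener.

```kotlin
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: simulates `clickCount` button clicks and returns
// the background colors the screen would have shown, in order.
fun backgroundColorsFor(clickCount: Int): List<String> {
    val palette = listOf("RED", "GREEN", "BLUE") // assumed colors
    val clicks = PublishSubject.create<Unit>()   // the stream of click events
    val shown = mutableListOf<String>()

    clicks
        .scan(0) { count, _ -> count + 1 }       // running click count; emits the seed 0 first
        .skip(1)                                 // drop the seed so only real clicks remain
        .map { count -> palette[(count - 1) % palette.size] }
        .subscribe { color -> shown.add(color) } // "react" by recording the new color

    repeat(clickCount) { clicks.onNext(Unit) }   // simulate the user tapping the button
    return shown
}

fun main() {
    println(backgroundColorsFor(4)) // prints [RED, GREEN, BLUE, RED]
}
```

The click stream is transformed (scan, skip, map) before anyone reacts to it, which is the "streams can be mapped and filtered" idea in a few lines.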

RxJava

RxJava introduces the concept of reactive programming, where data streams are treated as first-class citizens. Its main building blocks are Observables, Operators, and Observers, also called the triple O’s. An observable represents a sequence of data or events over time, while an observer subscribes to these sequences and reacts to emitted items.

Here are some reasons why RxJava is commonly used in Android development:

  1. Asynchronous programming: In Android development, asynchronous programming is crucial to avoid blocking the UI thread and to keep the app responsive. RxJava provides an easy and efficient way to handle asynchronous operations using Observable and Observer.

  2. Data streams: RxJava provides a way to handle data streams in a reactive way, making it easy to handle events such as user input or network responses. This allows developers to easily chain together multiple operations, such as filtering, mapping, and combining data streams.

  3. Error handling: RxJava provides an easy way to handle errors and exceptions that occur during data processing or network operations. By using operators such as onErrorResumeNext or onErrorReturn, developers can gracefully handle errors and continue processing data streams.

  4. Testing: RxJava provides a way to test reactive code using TestObserver (for Observable, Single, Maybe, and Completable) and TestSubscriber (for Flowable). This allows developers to write unit tests for reactive code, making it easier to identify and fix bugs.
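As a small sketch of the error-handling point, the snippet below shows onErrorReturnItem (a sibling of onErrorReturn that takes a fixed fallback value) and onErrorResumeNext. The parsePrices helper and its input strings are hypothetical and exist only for this illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical source: parses strings to numbers and fails on bad input,
// because toInt() throws NumberFormatException and map() forwards it to onError.
fun parsePrices(raw: List<String>): Observable<Int> =
    Observable.fromIterable(raw).map { it.toInt() }

// Replace the error with a single fallback item, then complete.
fun withFallbackItem(raw: List<String>): List<Int> =
    parsePrices(raw)
        .onErrorReturnItem(-1)
        .toList()
        .blockingGet()

// Replace the error with a whole fallback stream.
fun withFallbackStream(raw: List<String>): List<Int> =
    parsePrices(raw)
        .onErrorResumeNext { Observable.just(0, 0) }
        .toList()
        .blockingGet()

fun main() {
    val raw = listOf("10", "20", "oops", "40")
    println(withFallbackItem(raw))   // prints [10, 20, -1]
    println(withFallbackStream(raw)) // prints [10, 20, 0, 0]
}
```

Note that "40" never shows up: an error ends the original stream, and the fallback takes over from that point.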

Alright devs, let's talk about the triple O’s. Think of it this way: the Observable is the Speaker, the Operator is the Translator, and the Observer is the Listener.

Types of Observables in RxJava

  1. Observable

    Represents a stream of data that can emit zero or more items, and optionally terminates with either a completion or an error. Observable is commonly used when dealing with asynchronous data streams or events that can emit multiple items.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create an observable that emits a sequence of movie titles
         Observable.just("Titanic", "Conjuring", "Cars")
             .subscribe(
                 // onNext: Action to take when an item is emitted
                 { item -> println("Latest item is $item") },
                 // onError: Action to take if an error occurs
                 { error -> println("Error is $error") },
                 // onComplete: Action to take when the observable completes
                 { println("Task got completed") }
             )
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create an Observable using the just() method, which emits the specified movie titles ("Titanic", "Conjuring", "Cars") and then completes.

    • We subscribe to the Observable using the subscribe() method, passing three lambda expressions:

      • The first lambda expression defines what to do when an item is emitted. Here, we print each emitted movie title.

      • The second lambda expression defines what to do if an error occurs during emission.

      • The third lambda expression defines what to do when the Observable completes successfully.

    • When the program runs, the Observable emits each movie title in the sequence ("Titanic", "Conjuring", "Cars"), and we print each title. Finally, the program prints "Task got completed" to indicate that the Observable has completed its emission.

  2. Single

    Represents a stream that emits a single item or an error.

    • Subscribers of a Single can handle the emitted item via the onSuccess method or handle an error via the onError method.

    • Single is useful when you expect only a single result from an operation, such as making a network request and receiving a response.

        import io.reactivex.rxjava3.core.Single
      
        fun main() {
            // Create a Single observable that emits a single item
            Single.just("Example of Single Observable")
                .subscribe(
                    // onSuccess: Action to take when the item is successfully emitted
                    { result -> println("Result is $result") },
                    // onError: Action to take if an error occurs during emission
                    { error -> println("Error is $error") }
                )
        }
      

      In this example:

      • We import the Single class from the RxJava library.

      • We create a Single observable using the just() method, which emits a single item ("Example of Single Observable").

      • We subscribe to the Single observable using the subscribe() method, passing two lambda expressions:

        • The first lambda expression defines what to do when the item is successfully emitted. Here, we print the emitted item.

        • The second lambda expression defines what to do if an error occurs during emission. Here, we print the error message.

      • When the program runs, the Single observable emits the single item, and we print it. If an error occurs during emission, we print the error message.

  3. Maybe

    Represents a stream that can emit either a single item, no item at all, or terminate with an error.

    • Subscribers of a Maybe can handle the emitted item via the onSuccess method, handle the absence of an item via the onComplete method, or handle an error via the onError method.

    • Maybe is suitable when you need to handle situations where the result may or may not be present.

        import io.reactivex.rxjava3.core.Maybe
      
        fun main() {
            // Create a Maybe observable that emits a single optional item
            Maybe.just("Example of Maybe Observable")
                .subscribe(
                    // onSuccess: Action to take when the item is successfully emitted
                    { result -> println("Result is $result") },
                    // onError: Action to take if an error occurs during emission
                    { error -> println("Error is $error") },
                    // onComplete: Action to take when the observable completes without emitting an item
                    { println("Completed the task") }
                )
        }
      

      In this example:

      • We import the Maybe class from the RxJava library.

      • We create a Maybe observable using the just() method, which emits a single optional item ("Example of Maybe Observable").

      • We subscribe to the Maybe observable using the subscribe() method, passing three lambda expressions:

        • The first lambda expression defines what to do when the item is successfully emitted. Here, we print the emitted item.

        • The second lambda expression defines what to do if an error occurs during emission. Here, we print the error message.

        • The third lambda expression defines what to do when the observable completes without emitting an item. Here, we print a completion message.

      • When the program runs, the Maybe observable emits the single optional item, and we print it. If an error occurs during emission, we print the error message. If the observable completes without emitting an item, we print a completion message.
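The example above always has an item, so only the first lambda runs. As a minimal sketch of the "no item" case, assume a hypothetical findUser lookup (the user map and names are made up) that returns Maybe.empty() for an unknown id:

```kotlin
import io.reactivex.rxjava3.core.Maybe

// Hypothetical lookup: the name for a known id, or an empty Maybe otherwise.
fun findUser(id: Int): Maybe<String> {
    val users = mapOf(1 to "Asha", 2 to "Ravi") // assumed sample data
    val name = users[id]
    return if (name != null) Maybe.just(name) else Maybe.empty()
}

// Reports which of the three Maybe callbacks fired for the given id.
fun describe(id: Int): String {
    var outcome = "no callback fired"
    findUser(id).subscribe(
        { name -> outcome = "onSuccess: $name" },  // an item was found
        { error -> outcome = "onError: $error" },  // the lookup failed
        { outcome = "onComplete: no user" }        // finished without an item
    )
    return outcome
}

fun main() {
    println(describe(1))  // prints onSuccess: Asha
    println(describe(42)) // prints onComplete: no user
}
```

Exactly one of the three callbacks fires per subscription, which is what separates Maybe from Single (no "empty" outcome) and Completable (no item at all).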

  4. Completable

    Represents an asynchronous operation that either completes successfully without emitting any item or terminates with an error.

    • Subscribers of a Completable only need to handle the completion event or an error, as it does not emit any items.

    • Completable is useful when you are only interested in the completion status of an operation, rather than receiving data.

        import io.reactivex.rxjava3.core.Completable
      
        fun main() {
            // Create a Completable observable
            Completable.create { completableEmitter ->
                // A Completable accepts exactly one terminal event, so signal
                // either completion or an error, never both
                val succeeded = true // Illustrative flag; set to false to see the error path
                if (succeeded) {
                    completableEmitter.onComplete()
                } else {
                    completableEmitter.onError(Exception("Something wrong!!"))
                }
            }
            .subscribe(
                // onComplete: Action to take when the Completable completes
                { println("Completable completed") },
                // onError: Action to take if an error occurs during emission
                { error -> println("Error is $error") }
            )
        }


      In this example:

      • We import the Completable class from the RxJava library.

      • We create a Completable observable using the create() method, passing a lambda that takes a CompletableEmitter as a parameter.

      • Inside the lambda, we manually signal exactly one terminal event: onComplete() on success, or onError(Exception("Something wrong!!")) on failure. Calling onError() after onComplete() would not reach the subscriber; RxJava treats such an error as undeliverable and hands it to its global error handler.

      • We subscribe to the Completable observable using the subscribe() method, passing two lambda expressions:

        • The first lambda expression defines what to do when the Completable completes. Here, we print a completion message.

        • The second lambda expression defines what to do if an error occurs during emission. Here, we print the error message.

      • When the program runs, the Completable observable signals completion, so "Completable completed" is printed. With the flag set to false, it signals the error instead and the second lambda prints it.

  5. Flowable

    Similar to Observable, but designed to handle backpressure, which is a mechanism to handle situations where an Observable is emitting data faster than the subscriber can consume.

    • Flowable supports asynchronous data streams that can emit zero or more items, and optionally terminate with either a completion or an error.

    • It provides additional operators for dealing with backpressure and controlling the flow of data between the producer and the consumer.

        import io.reactivex.rxjava3.core.Flowable
      
        fun main() {
            // Create a Flowable observable that emits a single item
            Flowable.just("Example of Flowable Observable")
                .subscribe(
                    // onNext: Action to take when an item is emitted
                    { result -> println("Result is $result") },
                    // onError: Action to take if an error occurs during emission
                    { error -> println("Error is $error") },
                    // onComplete: Action to take when the observable completes
                    { println("Completed the task") }
                )
        }
      

      In this example:

      • We import the Flowable class from the RxJava library.

      • We create a Flowable observable using the just() method, which emits a single item ("Example of Flowable Observable").

      • We subscribe to the Flowable observable using the subscribe() method, passing three lambda expressions:

        • The first lambda expression defines what to do when an item is emitted. Here, we print the emitted item.

        • The second lambda expression defines what to do if an error occurs during emission. Here, we print the error message.

        • The third lambda expression defines what to do when the observable completes successfully. Here, we print a completion message.

      • When the program runs, the Flowable observable emits a single item, and we print it. If an error occurs during emission, we print the error message. If the observable completes successfully, we print a completion message.
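Flowable.just() with one item does not really show backpressure at work. Here is a hedged sketch of the idea: the subscriber tells the source how many items it is ready for by calling request(n). It uses RxJava's DefaultSubscriber; the helper name receiveWithDemand and the range 1..10 are assumptions for illustration.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.subscribers.DefaultSubscriber

// Hypothetical helper: subscribes to Flowable.range(1, 10) but asks for only
// `demand` items. Returns what arrived and whether onComplete was reached.
fun receiveWithDemand(demand: Long): Pair<List<Int>, Boolean> {
    val received = mutableListOf<Int>()
    var completed = false

    Flowable.range(1, 10).subscribe(object : DefaultSubscriber<Int>() {
        // onStart() runs once on subscription; by default it requests everything,
        // so we override it to request a limited amount instead
        override fun onStart() {
            request(demand)
        }

        override fun onNext(item: Int) {
            received.add(item)
        }

        override fun onError(e: Throwable) {
            println("Error is $e")
        }

        override fun onComplete() {
            completed = true
        }
    })
    return received to completed
}

fun main() {
    println(receiveWithDemand(3))  // prints ([1, 2, 3], false)
    println(receiveWithDemand(20)) // prints ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], true)
}
```

With a demand of 3, the source stops after three items and waits; it never completes because the subscriber never asks for the rest. That "producer only sends what the consumer requested" contract is what Observable lacks.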

Types of Operators

  1. just()

    The just() operator in RxJava is used to create an Observable that emits the specified items one by one and then completes.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create an Observable using the just operator
         val observable = Observable.just("Apple", "Banana", "Orange")
    
         // Subscribe to the Observable and define actions for emitted items
         observable.subscribe(
             { item -> println("Received item: $item") }, // onNext: Action to take when an item is emitted
             { error -> println("Error: $error") }, // onError: Action to take if an error occurs
             { println("Observable completed") } // onComplete: Action to take when the Observable completes
         )
     }
    
  2. map()

    The map() operator in RxJava transforms the items emitted by an Observable by applying a function to each item. It's commonly used to convert one type of data into another or to perform some computation on the emitted items.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create an Observable that emits a sequence of numbers
         val observable = Observable.just(1, 2, 3, 4, 5)
    
         // Apply the map operator to double each emitted number
         val mappedObservable = observable.map { number -> number * 2 }
    
         // Subscribe to the mapped Observable and print the transformed items
         mappedObservable.subscribe { doubledNumber ->
             println("Transformed item: $doubledNumber")
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create an Observable using the just operator, emitting the numbers 1, 2, 3, 4, and 5.

    • We apply the map operator to the Observable, which doubles each emitted number by multiplying it by 2.

    • We subscribe to the mapped Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each transformed item.

    • When the program runs, the original Observable emits each number in the sequence (1, 2, 3, 4, 5), and the map operator doubles each emitted number. Finally, we print each transformed item (2, 4, 6, 8, 10).

  3. flatMap()

    The flatMap() operator in RxJava is used to transform the items emitted by an Observable into Observables, and then flatten the emissions into a single Observable. It's particularly useful for scenarios where each emitted item needs to be mapped to another Observable, and you want to merge the emissions of all these Observables into one.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create an Observable that emits a sequence of numbers
         val observable = Observable.just(1, 2, 3)
    
         // Apply the flatMap operator to transform each number into an Observable emitting its square and cube
         val transformedObservable = observable.flatMap { number ->
             Observable.just("Square of $number: ${number * number}", "Cube of $number: ${number * number * number}")
         }
    
         // Subscribe to the transformed Observable and print the emitted items
         transformedObservable.subscribe { transformedItem ->
             println(transformedItem)
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create an Observable using the just operator, emitting the numbers 1, 2, and 3.

    • We apply the flatMap operator to the Observable. Inside the lambda expression, we transform each number into an Observable that emits strings representing the square and cube of that number.

    • The emissions from all the inner Observables are flattened into a single Observable.

    • We subscribe to the transformed Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each transformed item, which includes both the square and cube of each number.

    • When the program runs, the original Observable emits each number in the sequence (1, 2, 3), and the flatMap operator transforms each number into two strings representing its square and cube. Finally, we print all the transformed items.

  4. filter()

    The filter() operator in RxJava is used to selectively emit items emitted by an Observable based on a predicate function. It allows you to filter out items that don't meet certain criteria, letting only the desired items pass through.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create an Observable that emits a sequence of numbers
         val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
         // Apply the filter operator to emit only even numbers
         val filteredObservable = observable.filter { number ->
             number % 2 == 0 // Filter condition: emit only even numbers
         }
    
         // Subscribe to the filtered Observable and print the emitted items
         filteredObservable.subscribe { filteredItem ->
             println("Filtered item: $filteredItem")
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create an Observable using the just operator, emitting the numbers 1 through 10.

    • We apply the filter operator to the Observable. Inside the lambda expression, we define a filter condition to emit only even numbers.

    • Only items that satisfy the filter condition (even numbers) are allowed to pass through.

    • We subscribe to the filtered Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each filtered item.

    • When the program runs, the original Observable emits each number in the sequence (1 through 10), and the filter operator allows only the even numbers to pass through. Finally, we print all the filtered items (2, 4, 6, 8, 10).

  5. zip()

    The zip() operator in RxJava is used to combine the emissions of multiple Observables together, using a function to combine the items emitted by each Observable pairwise. It emits items only when each of the source Observables has emitted an item and the corresponding items have been combined.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create two Observables emitting sequences of letters and numbers respectively
         val lettersObservable = Observable.just("A", "B", "C")
         val numbersObservable = Observable.just(1, 2, 3)
    
         // Apply the zip operator to combine items from both Observables pairwise
         val zippedObservable = Observable.zip(
             lettersObservable,
             numbersObservable,
             { letter, number -> "$letter$number" } // Function to combine items from both Observables
         )
    
         // Subscribe to the zipped Observable and print the combined items
         zippedObservable.subscribe { combinedItem ->
             println("Combined item: $combinedItem")
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create two Observables using the just operator, one emitting the letters "A", "B", "C" and the other emitting the numbers 1, 2, 3.

    • We apply the zip operator to the Observables, passing them as arguments along with a function to combine the items from both Observables.

    • The function combines each letter emitted by the first Observable with the corresponding number emitted by the second Observable.

    • We subscribe to the zipped Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each combined item.

    • When the program runs, the zipped Observable emits combined items by pairing each letter with its corresponding number (e.g., "A1", "B2", "C3").

  6. concat()

    The concat() operator in RxJava is used to concatenate emissions from multiple Observables, emitting all the items from the first Observable, then all the items from the second Observable, and so on, preserving the order of emission.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create two Observables emitting sequences of letters and numbers respectively
         val lettersObservable = Observable.just("A", "B", "C")
         val numbersObservable = Observable.just(1, 2, 3)
    
         // Apply the concat operator to concatenate emissions from both Observables
         val concatenatedObservable = Observable.concat(lettersObservable, numbersObservable)
    
         // Subscribe to the concatenated Observable and print the emitted items
         concatenatedObservable.subscribe { item ->
             println("Emitted item: $item")
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create two Observables using the just operator, one emitting the letters "A", "B", "C" and the other emitting the numbers 1, 2, 3.

    • We apply the concat operator to the Observables, passing them as arguments.

    • The concat operator concatenates emissions from both Observables, emitting all the items from the first Observable (lettersObservable), followed by all the items from the second Observable (numbersObservable).

    • We subscribe to the concatenated Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each emitted item.

    • When the program runs, the concatenated Observable emits all the items from the first Observable (lettersObservable), followed by all the items from the second Observable (numbersObservable), preserving the order of emission.

  7. merge() with take()

    The merge() operator in RxJava is used to merge emissions from multiple Observables into a single Observable. Items are forwarded as they arrive, so emissions from different sources may interleave. The take() operator, on the other hand, emits only the first n items from an Observable and then completes.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create two Observables emitting sequences of letters and numbers respectively
         val lettersObservable = Observable.just("A", "B", "C")
         val numbersObservable = Observable.just(1, 2, 3)
    
         // Apply the merge operator to merge emissions from both Observables
         // Apply the take operator to limit the emission to 5 items
         val mergedObservable = Observable.merge(lettersObservable, numbersObservable).take(5)
    
         // Subscribe to the merged Observable and print the emitted items
         mergedObservable.subscribe { item ->
             println("Emitted item: $item")
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create two Observables using the just operator, one emitting the letters "A", "B", "C", and the other emitting the numbers 1, 2, 3.

    • We apply the merge operator to merge emissions from both Observables into a single Observable.

    • We apply the take operator to limit the emission to 5 items.

    • We subscribe to the merged Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each emitted item.

    • When the program runs, take(5) lets the first 5 merged items through and then completes the stream. Because just() emits synchronously, merge() drains the first Observable before it subscribes to the second, so this program prints A, B, C, 1, 2; sources that emit asynchronously can interleave. Note that merge() on its own completes only after all of its sources have completed.
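To actually see merge() interleave, the sources need to emit over time. The sketch below is one way to do that deterministically: it uses Observable.interval on RxJava's TestScheduler, which runs on virtual time that we advance by hand. The periods (100 ms and 150 ms) and the "fast"/"slow" labels are assumptions chosen for the illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Merges two timed sources on a virtual clock and returns the first 3 items.
fun mergedTimeline(): List<String> {
    val clock = TestScheduler() // virtual time: nothing happens until we advance it

    // interval() emits 0, 1, 2, ... once per period, starting after one period
    val fast = Observable.interval(100, TimeUnit.MILLISECONDS, clock).map { "fast-$it" }
    val slow = Observable.interval(150, TimeUnit.MILLISECONDS, clock).map { "slow-$it" }

    val seen = mutableListOf<String>()
    Observable.merge(fast, slow)
        .take(3)                      // stop (and dispose both sources) after 3 items
        .subscribe { seen.add(it) }

    clock.advanceTimeBy(1, TimeUnit.SECONDS) // fast fires at 100 ms and 200 ms, slow at 150 ms
    return seen
}

fun main() {
    println(mergedTimeline()) // prints [fast-0, slow-0, fast-1]
}
```

Here the items arrive in time order regardless of which source produced them, which is the difference from concat().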

  8. take()

    The take() operator in RxJava is used to emit only the first n items emitted by an Observable before completing. It's particularly useful when you're only interested in a limited number of items from a potentially infinite sequence.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create an Observable emitting a sequence of numbers
         val observable = Observable.just(1, 2, 3, 4, 5)
    
         // Apply the take operator to emit only the first 3 items
         val takenObservable = observable.take(3)
    
         // Subscribe to the taken Observable and print the emitted items
         takenObservable.subscribe { item ->
             println("Emitted item: $item")
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create an Observable using the just operator, emitting the numbers 1 through 5.

    • We apply the take operator to the Observable, specifying that we want to take only the first 3 items emitted.

    • We subscribe to the taken Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each emitted item.

    • When the program runs, only the first 3 items (1, 2, 3) are emitted before the stream completes. take() then disposes the upstream, so the remaining items (4 and 5) are never delivered.

  9. repeat()

    The repeat() operator in RxJava is used to create an Observable that repeats the sequence of emissions from the source Observable a specified number of times or indefinitely. It's particularly useful when you want to create a repeating sequence of emissions.

     import io.reactivex.rxjava3.core.Observable
    
     fun main() {
         // Create an Observable emitting a sequence of numbers
         val observable = Observable.just(1, 2, 3)
    
         // Apply the repeat operator to repeat the sequence 3 times
         val repeatedObservable = observable.repeat(3)
    
         // Subscribe to the repeated Observable and print the emitted items
         repeatedObservable.subscribe { item ->
             println("Emitted item: $item")
         }
     }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create an Observable using the just operator, emitting the numbers 1, 2, and 3.

    • We apply the repeat operator to the Observable, specifying that we want to repeat the sequence 3 times.

    • We subscribe to the repeated Observable and define an action to take when an item is emitted. Inside the lambda expression, we print each emitted item.

    • When the program runs, the Observable emits the sequence of numbers (1, 2, 3) three times in total, and we print each emitted item. If the repeat operator was given no arguments, it would repeat the sequence indefinitely.

  10. buffer()

    The buffer() operator in RxJava is used to periodically gather items emitted by an Observable into a list or buffer, and then emit that buffer as a single item. It's useful when you want to process or batch multiple items at once.

    import io.reactivex.rxjava3.core.Observable
    
    fun main() {
        // Create an Observable emitting a sequence of numbers
        val observable = Observable.range(1, 10)
    
        // Apply the buffer operator to gather items into buffers of size 3
        val bufferedObservable = observable.buffer(3)
    
        // Subscribe to the buffered Observable and print the emitted items
        bufferedObservable.subscribe { buffer ->
            println("Buffered items: $buffer")
        }
    }
    

    In this example:

    • We import the Observable class from the RxJava library.

    • We create an Observable using the range operator, emitting numbers from 1 to 10.

    • We apply the buffer operator to the Observable, specifying that we want to gather items into buffers of size 3.

    • The Observable emits buffers containing the specified number of items (3 in this case).

    • We subscribe to the buffered Observable and define an action to take when a buffer is emitted. Inside the lambda expression, we print each emitted buffer.

    • When the program runs, the Observable emits buffers of size 3 containing the emitted numbers (1, 2, 3), (4, 5, 6), (7, 8, 9), and (10).

Many other operators can be used as per requirement, like buffer() with take(), concatMap(), switchMap(), and debounce(). (mergeMap() is the RxJS name for what RxJava calls flatMap().)
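Of these, debounce() is the one Android devs meet most often, for example on a search box. Here is a hedged sketch: a PublishSubject stands in for the text field, and a TestScheduler provides virtual time so the result does not depend on real delays. The 300 ms window and the typed strings are assumptions for illustration.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Simulates typing into a search box and returns the queries that
// survive a 300 ms debounce window.
fun debouncedQueries(): List<String> {
    val clock = TestScheduler()                  // virtual time, advanced manually
    val typed = PublishSubject.create<String>()  // stands in for text-change events
    val searches = mutableListOf<String>()

    typed
        .debounce(300, TimeUnit.MILLISECONDS, clock) // emit only after 300 ms of silence
        .subscribe { searches.add(it) }

    typed.onNext("r")
    clock.advanceTimeBy(100, TimeUnit.MILLISECONDS)  // too soon: "r" is replaced
    typed.onNext("rx")
    clock.advanceTimeBy(100, TimeUnit.MILLISECONDS)  // too soon: "rx" is replaced
    typed.onNext("rxj")
    clock.advanceTimeBy(300, TimeUnit.MILLISECONDS)  // quiet for 300 ms: "rxj" is emitted
    typed.onNext("rxjava")
    clock.advanceTimeBy(300, TimeUnit.MILLISECONDS)  // quiet again: "rxjava" is emitted
    return searches
}

fun main() {
    println(debouncedQueries()) // prints [rxj, rxjava]
}
```

Four keystroke events turn into two searches, which is the whole point of debouncing user input.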

Observer

An observer subscribes to an Observable to receive notifications of items being emitted, errors occurring, or the completion of emissions. In RxJava, the Observer interface first receives onSubscribe(), which hands over a Disposable that can be used to cancel the subscription, and then defines three event callbacks:

  1. onNext(): Invoked when the Observable emits an item. This method receives the emitted item as a parameter.

  2. onError(): Invoked when an error occurs during the emission of items. This method receives the error object as a parameter.

  3. onComplete(): Invoked when the Observable completes its emission, indicating that no more items will be emitted.

Observers are responsible for handling the emitted items, errors, and completion signals emitted by the Observable they are subscribed to.

import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Observer
import io.reactivex.rxjava3.disposables.Disposable

fun main() {
    // Create an Observable emitting a sequence of numbers
    val observable = Observable.just(1, 2, 3, 4, 5)

    // Create an Observer to subscribe to the Observable
    val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
            // Called when the Observer subscribes to the Observable
            println("Subscribed")
        }

        override fun onNext(item: Int) {
            // Called when the Observable emits an item
            println("Received item: $item")
        }

        override fun onError(e: Throwable) {
            // Called when an error occurs during emission
            println("Error: ${e.message}")
        }

        override fun onComplete() {
            // Called when the Observable completes its emission
            println("Completed")
        }
    }

    // Subscribe the Observer to the Observable
    observable.subscribe(observer)
}

In this example:

  • We import the necessary classes from the RxJava library, including Observable, Observer, and Disposable.

  • We create an Observable using the just operator, emitting numbers from 1 to 5.

  • We create an Observer by implementing the Observer interface and overriding its callback methods (onSubscribe, onNext, onError, and onComplete).

  • We subscribe the Observer to the Observable using the subscribe method.

  • When the program runs, the Observer receives notifications from the Observable: each emitted item (1, 2, 3, 4, 5), any errors that occur, and the completion signal. The Observer then prints these notifications accordingly.
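The Disposable passed to onSubscribe is not used in the example above. As a small sketch of what it is for, the Observer below keeps it and calls dispose() once it has seen enough items; the helper name eventsUntilDisposed and the limit parameter are made up for this illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Observer
import io.reactivex.rxjava3.disposables.Disposable

// Subscribes to 1..5 and cancels the subscription after the item equal to `limit`.
// Returns a log of every callback that fired.
fun eventsUntilDisposed(limit: Int): List<String> {
    val events = mutableListOf<String>()

    Observable.just(1, 2, 3, 4, 5).subscribe(object : Observer<Int> {
        private lateinit var subscription: Disposable

        override fun onSubscribe(d: Disposable) {
            subscription = d // keep the handle so we can cancel later
            events.add("subscribed")
        }

        override fun onNext(item: Int) {
            events.add("item $item")
            if (item == limit) subscription.dispose() // stop receiving further events
        }

        override fun onError(e: Throwable) {
            events.add("error ${e.message}")
        }

        override fun onComplete() {
            events.add("completed")
        }
    })
    return events
}

fun main() {
    println(eventsUntilDisposed(2)) // prints [subscribed, item 1, item 2]
}
```

After dispose() neither the remaining items nor onComplete arrive. On Android, disposing in a lifecycle callback such as onDestroy is a common way to avoid leaking a screen that is no longer visible.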

Schedulers

Although RxJava is heavily marketed as an asynchronous way of doing reactive programming, it’s important to clarify that RxJava is single-threaded by default: everything runs on the thread that calls subscribe() unless you specify otherwise. That’s where Schedulers come in.

subscribeOn and observeOn

These methods let you control which thread the subscription (the source's work) runs on and which thread you receive the emissions on.
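Here is a minimal sketch of the two working together: subscribeOn moves the source's work to one Scheduler, and observeOn switches everything below it to another. The helper name threadNames is an assumption, and blockingSubscribe() is used only so a plain main() waits for the background work to finish.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns the thread that produced the item and the thread that consumed it.
fun threadNames(): Pair<String, String> {
    var producerThread = ""
    var consumerThread = ""

    Observable.fromCallable {
        producerThread = Thread.currentThread().name // where the source runs
        "Apple"
    }
        .subscribeOn(Schedulers.io())           // run the source on the IO pool
        .observeOn(Schedulers.computation())    // deliver downstream on the computation pool
        .doOnNext { consumerThread = Thread.currentThread().name }
        .blockingSubscribe()                    // block the caller until the stream ends

    return producerThread to consumerThread
}

fun main() {
    val (producer, consumer) = threadNames()
    // Typical names look like RxCachedThreadScheduler-1 and RxComputationThreadPool-1
    println("Produced on $producer, consumed on $consumer")
}
```

In an Android app the usual target for observeOn is AndroidSchedulers.mainThread() from RxAndroid, so results land on the UI thread; it is left out here because it needs the Android runtime.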

subscribeOn

With subscribeOn you get to decide which thread your source (such as Observable, Flowable, Single, etc.) is executed on.

subscribeOn (as well as observeOn) takes a Scheduler parameter to know which thread to run on. Let’s talk about the differences between the Schedulers.

Schedulers.io() This is the most commonly used Scheduler. It is generally used for IO-related work, such as network requests and file system operations, and it is backed by a thread pool. A Java thread pool is a group of worker threads that wait for jobs and are reused many times.

Observable.just("Apple", "Orange", "Banana")
    .subscribeOn(Schedulers.io())
    .subscribe{ v -> println("Received: $v") }

Schedulers.computation() This is similar to io() in that it is also backed by a thread pool, but the number of threads is fixed to the number of cores available on the device. With 2 cores you get 2 threads, with 4 cores you get 4 threads, and so on. It is intended for CPU-bound work.

Observable.just("Apple", "Orange", "Banana")
    .subscribeOn(Schedulers.computation())
    .subscribe{ v -> println("Received: $v") }

Schedulers.newThread() The name is self-explanatory: it creates a new thread for each active Observable. Be careful with this one, because a high number of Observables means a high number of threads, which may cause instability.

Observable.just("Apple", "Orange", "Banana")
    .subscribeOn(Schedulers.newThread())
    .subscribe{ v -> println("Received: $v") }

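Remember that Schedulers.newThread() puts no upper limit on the number of threads it creates, and subscribeOn does not accept a thread count. If you want to cap concurrency at, say, 8 threads, build a Scheduler from a fixed-size executor instead: Schedulers.from(Executors.newFixedThreadPool(8)).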

Schedulers.single() This Scheduler is backed by a single thread. No matter how many Observables there are, their work runs one task at a time on that one thread. Think of it as a background replacement for the main thread.

Observable.just("Apple", "Orange", "Banana")
    .subscribeOn(Schedulers.single())
    .subscribe{ v -> println("Received: $v") }

Schedulers.trampoline() This runs the work on whatever the current thread is, queuing tasks and executing them one after another. If the current thread is the main thread, the code runs in the main thread's queue. Like the Immediate Scheduler from RxJava 1.x, it blocks the thread. The trampoline Scheduler may be used when we have more than one Observable and we want them to execute in order.

Observable.just("Apple", "Orange", "Banana")
    .subscribeOn(Schedulers.trampoline())
    .subscribe{ v -> println("Received: $v") }

Executor Scheduler This is a custom Scheduler for which we supply our own pool of threads, specifying how many threads we want in that pool. It can be used when the number of Observables could be too large for the IO thread pool.

val executor = Executors.newFixedThreadPool(10)
val pooledScheduler = Schedulers.from(executor)

Observable.just("Apple", "Orange", "Banana")
    .subscribeOn(pooledScheduler)
    .subscribe{ v -> println("Received: $v") }

AndroidSchedulers.mainThread() Passing this to observeOn brings the emissions back to the main (UI) thread, so you can make any modifications you need to your UI. This Scheduler comes from RxAndroid.

observeOn

The subscribeOn() method tells the source Observable which thread to do its work and emit items on. With nothing else in the chain, those emissions are pushed to our Observer on that same thread. If there is an observeOn() in the chain, everything downstream of it receives the emissions on the Scheduler passed to observeOn().

Observable.just("Apple", "Orange", "Banana")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe{ v -> println("Received: $v") }

Usually, the observing thread in Android is the Main UI thread.
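The thread switch can be checked on a plain JVM, without Android. The sketch below assumes RxJava 3 and uses Schedulers.single() as a stand-in for AndroidSchedulers.mainThread(); the function name threadsUsed is hypothetical, and blockingFirst() is used only to keep the sketch self-contained.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns (thread that produced the item, thread that processed it after observeOn).
fun threadsUsed(): Pair<String, String> =
    Observable.fromCallable { Thread.currentThread().name }  // runs on the subscribeOn Scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single())                       // stand-in for the Android main thread
        .map { upstream -> upstream to Thread.currentThread().name } // runs on the observeOn Scheduler
        .blockingFirst()                                      // blocks the caller; fine for a demo only
```

The two names in the returned pair should differ from each other and from the calling thread: the first belongs to the IO pool, the second to the single-thread Scheduler.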

RxJava Use-Cases in Android during development

Case 1: You want to call an API and save the response data to local storage or a file. This is a long-running task, and doing it on the main thread leads to problems such as the app not responding (ANR). With RxJava you can run the work on a background Scheduler and come back to the main thread only to update the UI.

Case 2: You want to combine the results of two API calls. The zip() operator combines the results of multiple different API calls and emits a single combined response.

Case 3: You make an API call that returns a collection and want only the content for a specific user or condition. You can apply the filter() operator directly in the chain and keep the thread management in that same chain.
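Cases 2 and 3 can be sketched as follows, assuming RxJava 3. The User class and the fetchUsers and fetchTeamName functions are hypothetical stand-ins for real API calls (for example, ones returned by Retrofit). blockingGet() is used only so the sketch runs on its own; in an app you would subscribe and observe on the main thread instead.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical model and fake "API calls" used only for illustration
data class User(val id: Int, val name: String, val active: Boolean)

fun fetchUsers(): Single<List<User>> = Single.fromCallable {
    listOf(User(1, "Asha", true), User(2, "Ben", false), User(3, "Chen", true))
}

fun fetchTeamName(): Single<String> = Single.fromCallable { "Mobile" }

// Case 2: zip() waits for both calls and combines their results into one value
fun teamSummary(): String =
    Single.zip(
        fetchUsers().subscribeOn(Schedulers.io()),
        fetchTeamName().subscribeOn(Schedulers.io()),
        BiFunction<List<User>, String, String> { users, team ->
            "$team has ${users.size} users"
        }
    ).blockingGet()

// Case 3: turn the collection into a stream and keep only matching items
fun activeUserNames(): List<String> =
    fetchUsers()
        .subscribeOn(Schedulers.io())
        .flattenAsObservable { it }   // emit each User of the list individually
        .filter { it.active }         // keep only active users
        .map { it.name }
        .toList()
        .blockingGet()
```

With the sample data above, teamSummary() returns "Mobile has 3 users" and activeUserNames() returns [Asha, Chen].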

Cold vs Hot Observables

| Characteristic | Cold Observables | Hot Observables |
|---|---|---|
| Definition | Emits items only when subscribed to; each subscriber gets its own sequence of items | Emits items regardless of whether there are subscribers; subscribers may miss items emitted before they subscribed |
| Example | Observable.create { ... } | ConnectableObservable (for example, via publish()) or a Subject |
| Multiple Subscriptions | Each subscription starts its own independent sequence | Subscribers share the same sequence of items |
| Start of Emission | Starts emitting items when a subscriber subscribes | Starts emitting items immediately or at a specific time |
| Backpressure | Easier to handle, since each subscription has its own independent stream that can be produced at the consumer's pace | Harder to handle, since the shared stream cannot be slowed down for one subscriber; items are typically buffered or dropped |
| Hotter/Colder Analogy | Colder, as the sequence is generated anew for each subscriber | Hotter, as the sequence is generated independently of subscribers |
| Examples | Reading from a file, making an HTTP request | Mouse clicks, sensor data, stock prices |
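The difference is easy to observe in code. This is a minimal sketch assuming RxJava 3; coldDemo and hotDemo are names made up for this example, and a PublishSubject plays the role of the hot source.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// Cold: every subscriber triggers its own run of the source and sees every item.
fun coldDemo(): Pair<List<Int>, List<Int>> {
    val cold = Observable.just(1, 2, 3)
    val first = mutableListOf<Int>()
    val second = mutableListOf<Int>()
    cold.subscribe { first.add(it) }
    cold.subscribe { second.add(it) }
    return first to second
}

// Hot: the subject emits whether or not anyone listens, so late subscribers miss items.
fun hotDemo(): Pair<List<Int>, List<Int>> {
    val hot = PublishSubject.create<Int>()
    val early = mutableListOf<Int>()
    val late = mutableListOf<Int>()
    hot.subscribe { early.add(it) }
    hot.onNext(1)
    hot.subscribe { late.add(it) }   // subscribes after 1 has already been emitted
    hot.onNext(2)
    hot.onNext(3)
    return early to late
}
```

coldDemo() returns [1, 2, 3] for both subscribers, while hotDemo() returns [1, 2, 3] for the early subscriber and only [2, 3] for the late one.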

RxKotlin

RxKotlin is a library that builds upon RxJava to provide Kotlin-specific extensions and syntactic sugar for working with reactive programming in Kotlin. It allows developers to write reactive code in a more idiomatic and concise way using Kotlin's language features. RxKotlin complements RxJava by providing Kotlin-friendly APIs and extensions, making it easier and more intuitive to work with reactive streams in Kotlin.

Here are some key features and benefits of RxKotlin:

  1. Kotlin Extensions: RxKotlin adds Kotlin extensions to RxJava types, allowing developers to use Kotlin's language features such as extension functions, lambdas, and Kotlin collections seamlessly with RxJava observables, singles, and other types.

  2. Coroutines Interop: RxKotlin itself does not ship coroutine integration, but Kotlin projects can bridge RxJava and Kotlin Coroutines with the kotlinx-coroutines-rx2 / kotlinx-coroutines-rx3 modules of kotlinx.coroutines. This lets developers leverage the benefits of both reactive and coroutine-based programming paradigms in their applications.

  3. Functional Programming: Kotlin's support for functional programming aligns well with the reactive programming paradigm. RxKotlin leverages Kotlin's functional programming features to provide a more concise and expressive syntax for composing reactive streams.

  4. Null Safety: Kotlin's null safety features help prevent null pointer exceptions at compile time. RxKotlin takes advantage of Kotlin's null safety features to provide safer and more robust reactive code.

  5. Helpers and Builders: RxKotlin provides helpers, such as subscribeBy with named arguments and addTo for collecting Disposables, that allow developers to create and compose observables and operators in a more declarative and readable manner.

Here's a simple example demonstrating the usage of RxKotlin:

// These imports match RxKotlin 2.x; RxKotlin 3.x uses the io.reactivex.rxjava3.kotlin package instead
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

fun main() {
    val list = listOf("Apple", "Banana", "Orange")

    list.toObservable()
        .map { it.uppercase() } // Transform items to uppercase (toUpperCase() is deprecated since Kotlin 1.5)
        .filter { it.startsWith("A") } // Filter items starting with "A"
        .subscribeBy( // Subscribe with lambda functions
            onNext = { println("Received item: $it") },
            onError = { println("Error: $it") },
            onComplete = { println("Completed") }
        )
}

In this example:

  • We import extensions from io.reactivex.rxkotlin to work with RxKotlin.

  • We create a list of fruits.

  • We convert the list to an Observable using the toObservable() extension function provided by RxKotlin.

  • We chain operators such as map and filter to transform and filter the items emitted by the Observable.

  • We subscribe to the Observable using the subscribeBy extension function, which takes lambda functions for handling onNext, onError, and onComplete events.

RxKotlin provides a more Kotlin-friendly API for working with RxJava, making it easier to write reactive code in Kotlin projects. It's a powerful tool for building asynchronous and event-driven applications in Kotlin.

RxAndroid

RxAndroid is an extension of RxJava specifically tailored for Android development. It provides additional functionalities and utilities to make reactive programming with RxJava more seamless and efficient in Android applications. RxAndroid builds upon RxJava and provides Android-specific features and integration points, making it easier to work with reactive streams in Android apps.

Here are some key features and benefits of RxAndroid:

  1. Schedulers: RxAndroid provides Android-specific schedulers that allow developers to specify on which thread certain Observable operations should be performed. For example, AndroidSchedulers.mainThread() allows developers to specify that a particular operation should be executed on the main (UI) thread.

  2. Lifecycle Awareness: RxAndroid itself does not track Activity or Fragment lifecycles. Subscriptions are usually collected in a CompositeDisposable and cleared when the component is destroyed, or managed with companion libraries such as RxLifecycle or AutoDispose. Either way, the goal is to prevent memory leaks and make sure that subscriptions are disposed of when the associated component is destroyed.

  3. UI Thread Access: RxAndroid provides utilities to safely access the UI thread from background threads. This is particularly useful for updating UI components in response to events emitted by observables running on background threads.

  4. Android-specific Observables and Operators: Current versions of RxAndroid do not ship UI observables; very early versions included classes such as ViewObservable, but these were removed. Observables for Android UI events such as clicks and touch events are provided by companion libraries such as RxBinding.

  5. Integration with Android Components: RxJava works well with other Android libraries, such as Retrofit for network requests and Room for database operations, through their own RxJava adapters. RxAndroid's part in that setup is to supply the main-thread Scheduler used to deliver results to the UI.
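One common way to tie subscriptions to a component's lifecycle is to collect them in RxJava's CompositeDisposable and clear it when the component is destroyed. The sketch below assumes RxJava 3. Screen is a hypothetical stand-in for an Activity or Fragment so that the idea can run on a plain JVM; in a real component you would typically also add observeOn(AndroidSchedulers.mainThread()) before subscribing.

```kotlin
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical stand-in for an Activity or Fragment; only the lifecycle idea matters here.
class Screen(private val events: PublishSubject<String>) {
    private val disposables = CompositeDisposable()
    val shown = mutableListOf<String>()

    fun onCreate() {
        // Keep the Disposable so the subscription can be cancelled later
        disposables.add(events.subscribe { shown.add(it) })
    }

    fun onDestroy() {
        // Disposes every subscription collected above
        disposables.clear()
    }
}
```

After onDestroy() is called, items emitted by events no longer reach the Screen, so it no longer receives updates it cannot display and the subscription no longer holds a reference to it.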

Here's a simple example demonstrating the usage of RxAndroid:

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Call this from Android code (for example, from an Activity).
// AndroidSchedulers.mainThread() needs the Android main Looper, so it will not run on a plain JVM.
fun loadResponse() {
    // Simulate a network request
    val networkObservable = Observable.just("Response from server")
        .subscribeOn(Schedulers.io()) // Perform operation on IO thread
        .observeOn(AndroidSchedulers.mainThread()) // Observe on main thread

    // Subscribe to the network observable
    networkObservable.subscribe { response ->
        // Update UI with the response
        println("Received response: $response")
    }
}

In this example:

  • We import classes from io.reactivex.rxjava3.android.schedulers to work with RxAndroid schedulers.

  • We create an observable representing a network request that emits a single response.

  • We specify that the network request should be performed on the IO thread using subscribeOn(Schedulers.io()).

  • We specify that the observation of the response should be done on the main thread using observeOn(AndroidSchedulers.mainThread()).

  • We subscribe to the network observable and update the UI with the response.

RxAndroid simplifies the development of reactive Android applications by providing Android-specific utilities and integration points. It enables developers to write concise, efficient, and responsive code for handling asynchronous and event-driven tasks in Android apps.

I know, devs, that this blog is long, but it's finally time to wrap up. I hope it helps you understand ReactiveX in detail. Our next topic will be Data Storage in Android, which I will cover in detail in the next blog post. I should have covered it among the basic topics, but I forgot to include it there.


Connect with Me:

Hey there! If you enjoyed reading this blog and found it informative, why not connect with me on LinkedIn? 😊 You can also follow my Instagram page for more mobile development-related content. 📲👨‍💻 Let’s stay connected, share knowledge and have some fun in the exciting world of app development! 🌟
