RxJava to Kotlin Coroutines: The Ultimate Migration Guide

Matthew Shoemaker
Published in ITNEXT
12 min read · Jul 7, 2024

As a developer working in large legacy applications, I’ve had the pleasure (and sometimes pain) of leading the modernization of large codebases that relied heavily on RxJava for their reactive programming. This modernization effort revolved heavily around translating RxJava code into equivalent Coroutines and Flow. I learned a lot during the migrations, and I’m going to share everything I’ve learned in this article.

The focus of this article is to provide a comprehensive guide for developers to use when migrating legacy RxJava code to Kotlin Coroutines and Flow. My hope is to document the common use cases of RxJava and provide a step by step and thorough guide to translating RxJava code into the more modern and native Kotlin Coroutines and Flow.

Learning

The first and most important step to migrating RxJava to coroutines is quite obviously learning how to use Coroutines and Flow. To this end, I have some resources that I have found helpful in my learning of Coroutines and Flow.

Dependencies

In order to use Kotlin Coroutines and Flow in your applications, you will need several dependencies, outlined below with descriptions of what they provide so you can determine which you need. Of course, the versions may change by the time you read this, so use your best judgment there.

dependencies {
    // Keep all kotlinx-coroutines artifacts on the same version
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0-RC" // Essential to use Coroutines
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.9.0-RC" // Essential to use Coroutines in Android
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.9.0-RC" // Utilities for interop with RxJava 2 and Coroutines
    implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.8.1" // Utilities for interop with LiveData and Coroutines

    testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.9.0-RC" // Essential to test Coroutines (runTest, test dispatchers)
    testImplementation "app.cash.turbine:turbine:1.1.0" // Library to simplify testing of Flows
}

Special Discussion on Single and Coroutines

Before we get to the nitty gritty, we need to place special emphasis on something that I’ve seen many developers struggle with when adopting coroutines after using RxJava for years.

There will never be a situation where you convert an RxJava Single to a Flow. Every time you see RxJava's Single type, it is always directly equivalent to a suspend function or plain coroutine.

This is by far the most common anti-pattern that I’ve seen developers struggle to understand.

If I ever catch any of you converting a Single to a Flow I will summon the Coroutines Gods down to smite thee immediately :)

CoroutineScope == CompositeDisposable

For all practical purposes, you can imagine that CoroutineScope is directly equivalent to RxJava’s CompositeDisposable.

Both CompositeDisposable and CoroutineScope serve the exact same purpose. They both boil down to a container to track active resources and cancel them at the appropriate time. So for all migration purposes, it is useful to treat them as directly equivalent.

The CoroutineScope interface does provide some additional information via its CoroutineContext, but an explanation of that structure is likely better served by reading the learning material linked above.

TL;DR
RxJava’s CompositeDisposable.dispose() is directly equivalent to Coroutines’ CoroutineScope.cancel()
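To make the equivalence concrete, here is a minimal sketch in plain Kotlin. The function name cancelScopeDemo and the two placeholder jobs are assumptions made for illustration; the comments point at the CompositeDisposable call each line stands in for.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: returns true if cancelling the scope cancelled all of its work.
fun cancelScopeDemo(): Boolean = runBlocking {
    // Plays the role of `val disposables = CompositeDisposable()`.
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Plays the role of `disposables.add(source.subscribe())`:
    // anything launched in the scope is tracked by it.
    val first = scope.launch { awaitCancellation() }
    val second = scope.launch { awaitCancellation() }

    // Plays the role of `disposables.dispose()`.
    scope.cancel()
    joinAll(first, second)

    first.isCancelled && second.isCancelled
}

fun main() {
    println(cancelScopeDemo())
}
```

As with dispose(), a cancelled scope will not run anything launched in it afterwards. If you are looking for something closer to CompositeDisposable.clear(), which keeps the container usable, scope.coroutineContext.cancelChildren() is likely the nearer match.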

Interop with RxJava

Before we get into a migration strategy, we need to cover the equivalencies between all of the RxJava data types and how to convert back and forth between RxJava and Coroutines.

The next few sections should serve as a reference for you while doing your development to help remember which scenarios require which solutions.

There are 3 distinct scenarios in which you may find yourself during your migrations:

  1. Re-writing RxJava as Coroutines code.
  2. Bridging RxJava code to work within your Coroutines code.
  3. Bridging Coroutines code to work within your RxJava code.

Scenario 1: Re-writing RxJava as Coroutines

This is the most typical scenario, but perhaps the least simple. The table below provides a concise reference for how to convert each data type.

Conversion table for converting RxJava directly to Coroutines
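As a rough illustration of the mapping that is commonly used for such a rewrite (Single to a suspend function, Maybe to a suspend function with a nullable result, Completable to a suspend function returning Unit, Observable and Flowable to Flow), here is a sketch. UserStore, InMemoryUserStore and User are hypothetical names invented for this example, and the comments show the RxJava signature each function would replace.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class User(val id: Int, val name: String)

// Hypothetical API as it might look after the rewrite.
interface UserStore {
    suspend fun user(id: Int): User           // was: Single<User>
    suspend fun findUser(name: String): User? // was: Maybe<User>
    suspend fun delete(id: Int)               // was: Completable
    fun users(): Flow<User>                   // was: Observable<User> or Flowable<User>
}

// Hypothetical in-memory implementation, only here to make the sketch runnable.
class InMemoryUserStore(initial: List<User>) : UserStore {
    private val byId = initial.associateBy { it.id }.toMutableMap()

    // A failed Single (onError) becomes a thrown exception.
    override suspend fun user(id: Int): User =
        byId[id] ?: throw NoSuchElementException("No user with id $id")

    // An empty Maybe becomes null.
    override suspend fun findUser(name: String): User? =
        byId.values.firstOrNull { it.name == name }

    override suspend fun delete(id: Int) {
        byId.remove(id)
    }

    // Cold stream over a snapshot taken when users() is called.
    override fun users(): Flow<User> = byId.values.toList().asFlow()
}

fun main() = runBlocking {
    val store = InMemoryUserStore(listOf(User(1, "Ada"), User(2, "Grace")))
    println(store.user(1))
    println(store.findUser("Nobody"))
    store.delete(2)
    println(store.users().toList())
}
```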

Scenario 2: Integrating RxJava into Coroutines code

In this scenario, there may be some RxJava code that you don’t have access to, and as such, you cannot directly convert it to Coroutines. In this case, there are very simple operators that allow you to integrate RxJava directly into your Coroutines. The table below provides a concise reference for how to convert each data type.

Conversion table for integrating RxJava into your Coroutines
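A minimal sketch of this direction, assuming RxJava 2 (the io.reactivex package) and the kotlinx-coroutines-rx2 artifact are on the classpath. LegacyApi and the wrapper function names are hypothetical; the await, awaitSingleOrNull and asFlow extensions come from kotlinx.coroutines.rx2.

```kotlin
import io.reactivex.Completable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.asFlow
import kotlinx.coroutines.rx2.await
import kotlinx.coroutines.rx2.awaitSingleOrNull

// Hypothetical RxJava API that we cannot change.
object LegacyApi {
    fun answer(): Single<Int> = Single.just(42)
    fun nickname(): Maybe<String> = Maybe.empty()
    fun ping(): Completable = Completable.complete()
    fun ticks(): Observable<Int> = Observable.just(1, 2, 3)
}

// Single -> suspend function: await() suspends until the value arrives.
suspend fun loadAnswer(): Int = LegacyApi.answer().await()

// Maybe -> suspend function with a nullable result: an empty Maybe becomes null.
suspend fun loadNickname(): String? = LegacyApi.nickname().awaitSingleOrNull()

// Completable -> suspend function returning Unit.
suspend fun pingAndWait(): Unit = LegacyApi.ping().await()

// Observable -> Flow.
fun ticksFlow(): Flow<Int> = LegacyApi.ticks().asFlow()

fun main() = runBlocking {
    println(loadAnswer())
    println(loadNickname())
    pingAndWait()
    println(ticksFlow().toList())
}
```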

Scenario 3: Integrating Coroutines into RxJava code

In this scenario, you may need to bridge some existing Coroutines code to work with some RxJava code that you just don’t have the bandwidth to migrate yet. In that case, there are very simple operators available to integrate Coroutines directly into your RxJava code. The table below provides a concise reference for how to convert each data type.

Conversion table for integrating Coroutines into RxJava code
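A minimal sketch of the opposite direction, again assuming RxJava 2 and the kotlinx-coroutines-rx2 artifact. ModernApi and the *Rx wrapper names are hypothetical; rxSingle, rxCompletable and asObservable come from kotlinx.coroutines.rx2.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.rx2.asObservable
import kotlinx.coroutines.rx2.rxCompletable
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical code that has already been migrated to Coroutines and Flow.
object ModernApi {
    suspend fun answer(): Int {
        delay(10) // stands in for real asynchronous work
        return 42
    }

    suspend fun save(value: Int) {
        delay(10) // stands in for persisting `value`
    }

    fun ticks(): Flow<Int> = flowOf(1, 2, 3)
}

// suspend function -> Single, for RxJava callers that have not been migrated yet.
fun answerRx(): Single<Int> = rxSingle { ModernApi.answer() }

// suspend function returning Unit -> Completable.
fun saveRx(value: Int): Completable = rxCompletable { ModernApi.save(value) }

// Flow -> Observable.
fun ticksRx(): Observable<Int> = ModernApi.ticks().asObservable()

fun main() {
    println(answerRx().blockingGet())
    saveRx(1).blockingAwait()
    println(ticksRx().toList().blockingGet())
}
```

The blocking calls in main are only there to keep the sketch self-contained; real RxJava callers would subscribe as usual.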

Interop with LiveData

As you get to the end of each phase of your migration, you’ll likely be replacing LiveData with Flow. This step is not strictly mandatory in order to migrate to Coroutines and Flow, but it is nice to be able to rely on the same data structure throughout your app, from the network layer to the UI layer. It can be tedious forwarding events between LiveData and other streams, so having one data structure tends to simplify things.

Just as with RxJava, there are 3 distinct scenarios in which you may find yourself when converting LiveData to Flow:

  1. Replacing LiveData with Flow
  2. Convert a LiveData to a Flow
  3. Convert a Flow to a LiveData

Scenario 1: Replacing LiveData with Flow

In the ideal case, you own all the code and you can directly replace a LiveData with a Flow. In this case, you should almost always be converting all of your LiveData to StateFlow.

StateFlow has some important differences from LiveData to be aware of as you do your conversions:

  • It requires an initial value. This allows StateFlow to ensure there is always something for consumers to consume.
  • It behaves as if distinctUntilChanged were applied to all emissions: setting a value equal to the current one does not produce a new emission.
  • StateFlow is not inherently lifecycle-aware, so your collection must be done in a lifecycle-aware way. Shown below is a very simple way to accomplish this:

viewModel.someDataFlow
    // flowWithLifecycle ensures emissions are only received during the specified lifecycle state
    .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
    .onEach {
        // Process emission
    }
    .launchIn(lifecycleScope)
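The first two differences can be observed without any Android classes. The sketch below is plain Kotlin; observedValues is a hypothetical helper that records what a single collector sees while the same value is written twice in a row.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns every value a collector received.
fun observedValues(): List<Int> = runBlocking {
    // Unlike MutableLiveData(), the constructor will not compile without an initial value.
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // UNDISPATCHED starts the collector right away, so it receives the initial value
    // before any of the writes below happen.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }

    for (value in listOf(1, 1, 2, 2, 3)) {
        state.value = value // writing an equal value is a no-op
        yield()             // give the collector a chance to run
    }

    collector.cancel()
    seen
}

fun main() {
    println(observedValues())
}
```

The repeated writes of 1 and 2 each show up only once in the result, which is the built-in distinctUntilChanged behavior described above.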

Scenario 2: Convert a LiveData to a Flow

Sometimes you cannot change a particular LiveData, either because you don’t have access to it or because changing it would be too impactful for the effort you’re allotted. In that case, the .asFlow() extension converts that LiveData directly to a Flow. If you need a StateFlow or SharedFlow instead, you can chain the .stateIn(...) or .shareIn(...) operators with viewModelScope as the scope. Note that both also expect a SharingStarted strategy, and stateIn additionally needs an initial value.

Scenario 3: Convert a Flow to a LiveData

Sometimes, you will find that you have some API that requires you to use a LiveData even though your code is all using Flow. In this case, you can just use the .asLiveData() operator available to bridge that gap.

Approaching your migration

Now that we’ve covered exactly how to convert each RxJava type to Coroutines and vice versa, as well as LiveData, we should have all the tools we need to start our migration. Anything beyond what was covered above will have been covered by your courses on Coroutines and Flow.

The most important thing to remember about the migration is that it is exceedingly easy to perform the migration in very small pieces. With large applications, it is simply not possible to migrate everything all at once. It may be a years-long process of phasing out RxJava support, so the fantastic interop support is what really makes the migration possible.

Given the extensive and simple interoperability between Coroutines/Flow and RxJava and LiveData, there should be few to no obstacles preventing you from migrating your application piece by piece.

Start with the service layer

Starting your migration with the service layer simplifies many things. This allows us to begin our migration at the root of the reactive chain and work our way up to the UI layer in steps.

This guide is going to assume you are using Retrofit and OkHttp for your network layer, as that is by far the most common pattern in use in Android Architecture.

I recommend that you start small with the least impactful service to get your feet wet. Follow that service all the way up to the UI layer as a POC to work out the kinks before you convert more and more impactful services. There is little benefit to migrating your entire data layer at once.

Converting Retrofit Interfaces to Coroutines

A typical Retrofit service interface implementation with RxJava will look something like this:

interface MyService {
    @POST("/my/service/url/foo/bar")
    fun getSomeData(@Query("id") id: Int): Single<SomeData>
}

Converting this to use Coroutines is very simple. The only things that need to be modified are:

  • Add the suspend modifier
  • Return the data directly instead of wrapping with Single

interface MyService {
    @POST("/my/service/url/foo/bar")
    suspend fun getSomeData(@Query("id") id: Int): SomeData
}

Fixing the broken service consumers (temporarily)

Now, obviously, you’ve just broken the contract for the consumers of your services. Your repositories are going to have compilation errors now. I recommend you stop your migration here and resolve those errors, and run your application to validate things are still working as expected.

Resolving the error is as simple as wrapping the call to the broken method with the rxSingle builder from kotlinx-coroutines-rx2.

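import io.reactivex.Single
import kotlinx.coroutines.rx2.rxSingle

class MyRepository @Inject constructor(
    private val myService: MyService // must be a property to be usable in getSomeData
) {
    // The public contract is unchanged, so existing RxJava consumers keep compiling
    fun getSomeData(id: Int): Single<SomeData> {
        return rxSingle {
            myService.getSomeData(id)
        }
    }
}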

Once you’ve validated that your app still works with these changes, we can move on to the next steps.

Work your way up the data layer

Every application’s data layer is going to look different, and so I can’t give you the silver bullet step-by-step guide to converting your application’s data layer specifically.

But let’s imagine an application with a very simple data layer that consists of a Repository that holds 2 services with RxJava:

class MyRepository @Inject constructor(
    private val myFirstService: MyFirstService,
    private val mySecondService: MySecondService
) {
    fun getSomeData(): Single<Int> {
        return myFirstService
            .getSomeData()
            .flatMap { id ->
                mySecondService.getMoreData(id)
            }
    }
}

As you can see, we’re using the result of one service call to make a new service call.

Converting this to use Coroutines becomes as simple as the following:

class MyRepository @Inject constructor(
    private val myFirstService: MyFirstService,
    private val mySecondService: MySecondService
) {
    suspend fun getSomeData(): Int {
        val firstResponse = myFirstService.getSomeData()
        return mySecondService.getMoreData(firstResponse)
    }
}

As you can see, this is much easier to read and has fewer lines than its RxJava counterpart.
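One side effect worth noting is that the suspend version can be exercised with plain fakes, no schedulers or test observers required. The sketch below is self-contained and makes several assumptions: the two service interfaces are hypothetical stand-ins (the real ones are not shown here), @Inject is dropped, and fakeRepository is a helper invented for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical service contracts, standing in for the real Retrofit interfaces.
interface MyFirstService {
    suspend fun getSomeData(): Int
}

interface MySecondService {
    suspend fun getMoreData(id: Int): Int
}

class MyRepository(
    private val myFirstService: MyFirstService,
    private val mySecondService: MySecondService,
) {
    // Sequential code replaces the flatMap chain: the second call simply uses the first result.
    suspend fun getSomeData(): Int {
        val firstResponse = myFirstService.getSomeData()
        return mySecondService.getMoreData(firstResponse)
    }
}

// Hypothetical helper that wires the repository with in-memory fakes.
fun fakeRepository(): MyRepository = MyRepository(
    myFirstService = object : MyFirstService {
        override suspend fun getSomeData(): Int {
            delay(10) // stands in for network latency
            return 7
        }
    },
    mySecondService = object : MySecondService {
        override suspend fun getMoreData(id: Int): Int = id * 2
    },
)

fun main() = runBlocking {
    println(fakeRepository().getSomeData())
}
```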

Stop!

Once you’ve made your way through the data layer and you’re comfortable with the changes you’ve made (and resolved compilation errors), I strongly recommend stopping here before moving into your business layer. Take some time and do some regression testing on your features to ensure there are no strange or subtle bugs happening.

Moving On to the Business Layer

For this tutorial, I will assume the next logical step of your migration would be your ViewModel. But, of course, your architecture may differ so use your best judgment on where to start in your business layer. For example, if you’re implementing Clean Architecture then the next step would likely be to convert some associated UseCase implementations.

viewModelScope

ViewModels have built-in support for coroutines, so you should not need to manually manage any CoroutineScope cancellation in your ViewModel. From within your ViewModel, you have access to a viewModelScope property. This property is a CoroutineScope instance managed by the androidx libraries “automagically”. So the viewModelScope is guaranteed to always be cancelled upon destruction of your ViewModel instance. This ensures cancellation of any associated resources (provided you abide by cooperative cancellation in your coroutines).

ViewModel with RxJava

Suppose you have a pretty simple ViewModel that uses 2 repositories to provide some data to your view. It has one repository with an Observable stream of data, and each time it emits, we want to launch a service call from a different repository to get some related data.

Let’s take a look at what that would look like with RxJava:

class MyViewModel @Inject constructor(
    private val myFirstRepository: MyFirstRepository,
    private val mySecondRepository: MySecondRepository
) : ViewModel() {
    private val disposableContainer = CompositeDisposable()

    private val _state = MutableLiveData<UiState>(UiState.Loading)
    val state: LiveData<UiState> = _state

    init {
        disposableContainer.add(
            myFirstRepository.data
                .flatMapSingle {
                    mySecondRepository.makeSomeServiceCall(it)
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    _state.value = UiState.Success(it)
                }
        )
    }

    fun loadData(id: Int) {
        myFirstRepository.init(id)
    }

    override fun onCleared() {
        super.onCleared()
        disposableContainer.dispose()
    }
}

Now let’s take a look at the relevant aspects of this to understand how things will change:

As you can see in the example above, we have to manually track the Disposable created by subscribing to the Observable from MyFirstRepository. We then need to make sure we override the onCleared() function so we can dispose of the CompositeDisposable. Also note that we are forced to specify the relevant schedulers (in reality there are ways around using observeOn if we use postValue instead of .value but this is for demonstration purposes).

Same ViewModel with Coroutines

Now let’s take a look at how things change with Coroutines:

class MyViewModel @Inject constructor(
    private val myFirstRepository: MyFirstRepository,
    private val mySecondRepository: MySecondRepository
) : ViewModel() {

    private val _state = MutableStateFlow<UiState>(UiState.Loading)
    val state: StateFlow<UiState> = _state

    init {
        myFirstRepository.data
            .map {
                mySecondRepository.makeSomeServiceCall(it)
            }
            .onEach { data ->
                _state.update { UiState.Success(data) }
            }
            .launchIn(viewModelScope)
    }

    fun loadData(id: Int) {
        viewModelScope.launch {
            myFirstRepository.init(id)
        }
    }
}

At first glance, you can see how there is much less code to maintain here. That is one of the great parts of using Coroutines. It tends to reduce the amount of code you need to write to accomplish a given task.

Looking closer, let’s see what has changed:

  • We no longer need a disposable container, since viewModelScope is managed “automagically”. We don’t need to maintain our own mechanism to dispose of active resources.
  • We also don’t need to remember to call disposableContainer.add or any equivalent, since viewModelScope tracks resources for us and we’ve deleted disposableContainer.
  • We no longer need to override onCleared() because viewModelScope is managed for us, as mentioned several times already
  • We do not need to manually provide any schedulers (although you can, it is strongly recommended that you leave Dispatcher switching to the IO thread within your data layer, not your business layer)
  • We are processing emissions in our onEach method instead of a subscribe block. This ensures that any exceptions are caught by downstream catch operators and it reduces the bracket-hell that can come with using collect
  • We’ve changed LiveData to StateFlow. There are some minor differences to be aware of, but for the majority of cases a direct replacement is perfectly advisable
  • We are using viewModelScope.launch to invoke myFirstRepository.init(id). This is because the init method is now a suspend function, whereas it was a Completable function before.
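Two of the points above, keeping Dispatcher switching inside the data layer and letting a downstream catch handle failures raised in onEach, can be sketched in plain Kotlin. NumberRepository, runPipeline and the deliberately failing third item are assumptions made for this illustration, and runBlocking stands in for viewModelScope so the sketch runs outside Android.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical data-layer class: it owns the switch to the IO dispatcher,
// so callers never have to think about threads.
class NumberRepository(
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
) {
    val ids: Flow<Int> = flowOf(1, 2, 3)

    suspend fun describe(id: Int): String = withContext(ioDispatcher) {
        // Blocking work (disk, network) would go here.
        "item-$id"
    }
}

// Hypothetical business-layer pipeline; returns a log of what happened.
fun runPipeline(repository: NumberRepository): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        repository.ids
            .map { repository.describe(it) } // no dispatcher mentioned at this level
            .onEach { value ->
                if (value == "item-3") error("cannot handle $value")
                log += "received $value"
            }
            // Sits downstream of onEach, so it also sees exceptions thrown there.
            .catch { e -> log += "caught ${e.message}" }
            .launchIn(this)
    }
    return log
}

fun main() {
    runPipeline(NumberRepository()).forEach(::println)
}
```

Had the processing lived in a trailing collect block instead, the catch operator above it would not have seen the exception, which is the practical argument for the onEach plus launchIn shape.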

Updating your ViewModel to use Coroutines is largely an exercise in applying the RxJava Interop section outlined above, so be sure you are referring to that section frequently to find equivalencies.

Stop!

Just as we had done with the data layer, this is an important place to stop and do some regression testing to ensure your code still works the way you expect it to. It’s important to try to avoid biting off more than you can chew at one time!

Once you’ve resolved all compilation errors and you’re confident you haven’t broken your app, it’s time to move on to the final piece of the puzzle!

Finally, Migrating your View layer

Now that we’ve integrated coroutines into our data layer and our business layer, it’s time to start wrapping things up with the final piece of the puzzle: the View layer.

Implementations of the View layer tend to vary wildly, with many different technologies being used. However, there are 2 major, clearly distinct cases: XML Views and Jetpack Compose.

For this article, I am going to assume that you are using XML views. Since you’re still using RxJava, it’s probably a safe assumption most of your code is also using XML views.

Fragments and Activities

Fragments and Activities function largely the same in the context of Coroutines. Just as with ViewModels and viewModelScope, we are provided with a very convenient lifecycleScope instance that manages CoroutineScope cancellation for us!

lifecycleScope

You will often find yourself needing to observe emissions from a Flow with coroutines, and in order to do so you can use the lifecycleScope as your CoroutineScope very similarly to the way you use viewModelScope.

Let’s consider the ViewModel discussed previously. I’ll simplify the boilerplate like ViewModel factory and view manipulation, etc. Below is how this would look in a Fragment:

class MyFragment : Fragment() {
    lateinit var viewModel: MyViewModel

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        // In a Fragment, tie view-related collection to the view's lifecycle
        viewModel.state
            .flowWithLifecycle(viewLifecycleOwner.lifecycle, Lifecycle.State.STARTED)
            .onEach { state ->
                // `is` checks are needed because UiState.Success carries data
                when (state) {
                    is UiState.Success -> handleSuccess(state)
                    is UiState.Loading -> handleLoading(state)
                    is UiState.Error -> handleError(state)
                }
            }
            .launchIn(viewLifecycleOwner.lifecycleScope)
    }
}

Let’s analyze the important aspects of this:

  • Most importantly, flowWithLifecycle. This ensures that we only receive emissions from the underlying Flow while the Fragment is in the Started state. This gives us lifecycle-aware emissions, just as LiveData would give us.
  • We are using lifecycleScope to manage the cancellation of our resources.

XML DataBinding

Data binding supports only StateFlow at the time of writing this article, so you cannot use SharedFlow or a plain Flow. You can use StateFlow in exactly the same way you would have used LiveData.

Conclusion

Migrating away from RxJava to use Coroutines is a great way to modernize your application. And because Coroutines use threads so efficiently, with the memory savings that brings, you may even see performance improvements. At the very least, your code will be much simpler and easier to maintain for years to come!

Thanks for reading, and I am always open to feedback or corrections, so if you found this article helpful, please leave a like and a comment to help me get some visibility! :)

Senior Android Developer driven by a passion for learning and sharing knowledge. Engaging with the developer community through blogs and open-source projects.