DIY Reactive Model Store using RxJava
May 5, 2019
In the last few years on Android, we’ve seen an explosion of architectures based on the idea of a unidirectional data flow.
I was first exposed to this idea while working on an MVI-based app using RxJava. One of the key concepts in MVI is managing changes to your application state cleanly, and the Model Store pattern is central to achieving this.
Working with Immutable State
So, what does a Model Store do, exactly? Basically, it stores immutable state.
That’s a pretty abstract concept. To make this idea more concrete, let’s imagine we’re building an upvoting application. This app will keep track of upvotes from our users.

In Kotlin, our app-state could look something like this:
data class UpvoteModel(val hearts: Int, val thumbsUp: Int)
Each time a vote is recorded, we’ll want to update the stored state. Because UpvoteModel declares only val properties, an instance can’t be changed once created, so we use the copy() function to build a new instance with the updated count:
val initialState = UpvoteModel(hearts = 0, thumbsUp = 0)
val newState = initialState.copy(thumbsUp = initialState.thumbsUp + 1)
Reducers
So the big idea here is to build newState from oldState. Let’s formalize this with the Reducer interface:
// Reducer takes an old (immutable) state and builds a new state from it.
interface Reducer<S> {
    fun reduce(oldState: S): S
}
Each reducer instance represents one “action” we received from the user. For our Android app, we have two buttons, so we’ll need two actions: one to increment ❤️ by 1 and one to increment 👍 by 1.
class AddHeart : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(hearts = oldState.hearts + 1)
}
class AddThumbsUp : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(thumbsUp = oldState.thumbsUp + 1)
}
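To see how reducers chain together, here is a minimal sketch that applies a list of them in order using only the Kotlin standard library. The applyAll helper is hypothetical and not part of the original sample; the other declarations repeat the ones above so the snippet stands on its own.

```kotlin
data class UpvoteModel(val hearts: Int, val thumbsUp: Int)

interface Reducer<S> {
    fun reduce(oldState: S): S
}

class AddHeart : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(hearts = oldState.hearts + 1)
}

class AddThumbsUp : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(thumbsUp = oldState.thumbsUp + 1)
}

// Hypothetical helper: applies each reducer in order, feeding the state
// produced by one reducer into the next.
fun <S> applyAll(start: S, reducers: List<Reducer<S>>): S =
    reducers.fold(start) { state, reducer -> reducer.reduce(state) }
```

Calling applyAll(UpvoteModel(0, 0), listOf(AddHeart(), AddThumbsUp(), AddHeart())) yields UpvoteModel(hearts=2, thumbsUp=1), and the starting instance is never modified along the way.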
Model Store
Let’s turn our focus back to the Model Store. We’ll want our store to process Reducer instances on the input side, and we’ll need some reactive stream on the output side. Using RxJava, the following interface works nicely:
interface ModelStore<S> {
    fun process(reducer: Reducer<S>)
    fun modelState(): Observable<S>
}
Let’s combine all these ideas together, and visualize them as a marble diagram:
How do we implement this? Let’s start with the process(r) function. The first thing we need to do is take that function call, and turn it into an Observable<Reducer>. The RxRelay library is ideal here.
val reducers = PublishRelay.create<Reducer<S>>()
fun process(reducer: Reducer<S>) = reducers.accept(reducer)
Once an Observer subscribes to a PublishRelay, it receives every item the relay accepts from that point on; earlier items are not replayed. That said, we’ll only be using this reducers: PublishRelay internally:
private val store = reducers
    .observeOn(AndroidSchedulers.mainThread())
    .scan(startingState) { oldState, reducer -> reducer.reduce(oldState) }
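If scan is unfamiliar, a rough analogy on plain lists may help. The sketch below uses the Kotlin standard library’s runningFold (Kotlin 1.4+) rather than RxJava. Like scan with a seed, it emits the starting state first and then one state per reducer. The stateHistory name is hypothetical, and the other declarations repeat the ones from earlier so the snippet stands on its own.

```kotlin
data class UpvoteModel(val hearts: Int, val thumbsUp: Int)

interface Reducer<S> {
    fun reduce(oldState: S): S
}

class AddHeart : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(hearts = oldState.hearts + 1)
}

class AddThumbsUp : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(thumbsUp = oldState.thumbsUp + 1)
}

// Hypothetical helper: returns every intermediate state, starting with the
// seed, the way scan(seed) would emit them one by one on a stream.
fun <S> stateHistory(start: S, reducers: List<Reducer<S>>): List<S> =
    reducers.runningFold(start) { state, reducer -> reducer.reduce(state) }
```

For the reducers AddHeart then AddThumbsUp and a starting state of (0, 0), the history is (0, 0), (1, 0), (1, 1). The difference in the real store is that the reducers arrive over time instead of sitting in a list.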
Single Source of Truth
We are missing one important piece of the puzzle. We need our model store to be a shared, single source of truth. To get the behaviour we’re looking for, we need to turn our store into a ConnectableObservable.
The replay(1) operator gives us this ConnectableObservable. It caches the most recent state, which ensures that subscribers get the current state of our ModelStore on subscription. Then, we call connect() on initialization, priming our ModelStore immediately.
Here’s a generic RxModelStore implementation:
// RxJava 2, RxAndroid and RxRelay 2 imports.
import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Observable
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.observables.ConnectableObservable

open class RxModelStore<S>(startingState: S) : ModelStore<S> {
    /**
     * Gives us an Observable<Reducer> sourced from process(reducer)
     */
    private val reducers = PublishRelay.create<Reducer<S>>()
    /**
     * Our shared single source of truth.
     */
    private val store: ConnectableObservable<S> = reducers
        .observeOn(AndroidSchedulers.mainThread())
        .scan(startingState) { oldState, reducer -> reducer.reduce(oldState) }
        .replay(1)
        .apply { connect() }
    /**
     * Model will receive reducers to be processed via this function.
     */
    override fun process(reducer: Reducer<S>) = reducers.accept(reducer)
    /**
     * Observable stream of states.
     */
    override fun modelState(): Observable<S> = store
}
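To make the “current state on subscription” behaviour easier to see without any Rx or Android dependency, here is a deliberately simplified stand-in. SimpleModelStore is a hypothetical class, not the implementation above: it is synchronous, not thread-safe, and offers no way to unsubscribe. It only mimics the replay(1) idea that a new subscriber immediately receives the latest state.

```kotlin
data class UpvoteModel(val hearts: Int, val thumbsUp: Int)

interface Reducer<S> {
    fun reduce(oldState: S): S
}

class AddHeart : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(hearts = oldState.hearts + 1)
}

class AddThumbsUp : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(thumbsUp = oldState.thumbsUp + 1)
}

// Hypothetical, Rx-free illustration only. Single-threaded and synchronous.
class SimpleModelStore<S>(startingState: S) {
    private var current: S = startingState
    private val listeners = mutableListOf<(S) -> Unit>()

    // Builds the next state from the current one and notifies every listener.
    fun process(reducer: Reducer<S>) {
        current = reducer.reduce(current)
        listeners.forEach { it(current) }
    }

    // Mimics replay(1): a new listener gets the latest state right away,
    // then every state produced afterwards.
    fun subscribe(listener: (S) -> Unit) {
        listeners.add(listener)
        listener(current)
    }
}
```

A listener that subscribes after one AddHeart has been processed first sees (1, 0), not the starting state, and then sees each later update. The Rx version gives the same guarantee while also handling threading and disposal.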
Subscribing to a ModelStore
The last step here is to subscribe to a model store. If we go back to our upvoting app example, your subscriptions will look something like this:
store.modelState().subscribe { model ->
    textView.text =
        resources.getString(
            R.string.upvotes,
            model.hearts,
            model.thumbsUp
        )
}
Key Benefits
By keeping your state immutable and processing every change on a single thread, you avoid race conditions when updating state.
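As a rough illustration of the single-thread point, the sketch below uses plain java.util.concurrent executors instead of RxJava schedulers. Several caller threads request votes, but every state change runs on one dedicated thread, so no increment is lost. The function name, the pool size of 4, and the 10-second timeouts are arbitrary choices made for this example.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

data class UpvoteModel(val hearts: Int, val thumbsUp: Int)

// Hypothetical demo: returns the final heart count after `votes` requests
// arrive from several threads at once.
fun countHeartsFromManyThreads(votes: Int): Int {
    val stateThread = Executors.newSingleThreadExecutor()
    val callers = Executors.newFixedThreadPool(4)
    var state = UpvoteModel(hearts = 0, thumbsUp = 0) // mutated only on stateThread

    repeat(votes) {
        callers.execute {
            // Callers never touch the state; they only hand work to stateThread.
            stateThread.execute { state = state.copy(hearts = state.hearts + 1) }
        }
    }

    // Wait for the callers to hand off all work, then for stateThread to drain.
    callers.shutdown()
    callers.awaitTermination(10, TimeUnit.SECONDS)
    stateThread.shutdown()
    stateThread.awaitTermination(10, TimeUnit.SECONDS)
    return state.hearts
}
```

Calling countHeartsFromManyThreads(1000) returns 1000. If the caller threads ran the copy-and-assign step themselves, two of them could read the same old state and one increment would be overwritten.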
You can apply this idea anywhere. Be it MVVM, MVI, or MVP, if you deal with shared model state, the Model Store pattern can come in handy.
Interested in learning more? Check out this full Upvote MVI Sample App on GitHub, from my Simple MVI Architecture talk at Droidcon Boston 2019.