It’s 2020, I still write a topic about converting Android callback API to RxJava stream
Originally published on Medium
I never write any blog posts, but seeing many of my colleagues wrote their blogs and built their open-sources, encouraging me to choose a topic to start.
Why do I choose an old topic to start
Nowadays, everyone keeps talking about Kotlin Coroutine with Channels, Flow, and so on. But until now, the beginning of 2020, RxJava still a potent tool for building Android apps that I have not found any alternative. Flow may be a good option in the feature, but not at this moment. And I still see inadequate approaches from others when they create reactive objects. Especially when they use Observable.create()
method. I could list some of them:
- Use the Subject instead create an Observable. This is a good topic regarding the problem with subjects from Thomas Nield
- Does not call
onComplete
/onError
/tryOnError
when using theObservable.create
factory method to create Observable or Flowable which could lead to leaking - Does not care about threading when using “create”, what if the client calls
subscribeOn
. - Execute the logic outside of the factories method, which causes the logic being executed before the reactive stream being subscribe.
- It does not use the power of Rxjava operation to build a clean Rxjava API. Mix the logic to convert Android API to Rxjava with the UI
How to create an Observable/Single/Flowable/Completable
As many of you may know that we can create observable via multiple factory methods.
Observable.fromCallable()
Observable.fromPublisher()
Observable.fromFuture()
Observable.fromArray()
Observable.just()
Observable.fromIterable()
Observable.defer()
- …
It recommended using these methods, if possible. There are a lot of great posts about this topic on the Internet, and I should not talk about them here. What I will focus on this post are:
- Convert Android callback-style API to the reactive world with
Observable.create()
method - Use Rxjava operations to design a clean API
Let’s see an example of converting location API from Android & Google Play Service to RxJava stream. I choose this example because it is complicated enough to present the solution to have a clean API.
TL;DR: you can check out the source code in this post from here
How to get location from Android
For Android devices have Google Play Service, we can fetch the location or observe location change from two sources:
- Google Play Service (Fused Location Provider API)
- Android API, that is LocationManager
These two sources provide us the locations with the callback API. To convert to a reactive stream, we have to use Observable.create()
method.
Let define the API that we want to build.
Most of the apps request locations for two scenarios.
- Getting the last known location, you can check more detail here
- Listen to the location change, you can check more detail here
From that requirement, we build this simple interface
interface RxLocationManager {
fun singleLocation(): Single<Location>
fun observeLocationChange(): Observable<Location>
}
The interface has two methods:
fun singleLocation(): Single<Location>
This method will return a Single, which emits the last known location value. We get the last known location by:
- If there is other(s) call to observe the location change from RxLocationManager, the last received location(a cache value) should be emitted
- If there is no other call & cached location value, we will fetch the last know location from our sources(either Google Play Service or LocationManager)
fun observeLocationChange(): Observable<Location>
Return an observable that will emit the location after some time if there are any observer subscribes to it (the period should be configurable)
- It should be a hot Observable. Multiple observers should subscribe to the same observable and got the same emitted item.
- The last emitted item should be cached. If there is a new observer subscribe to the observable, it will receive the cached location (if available) as the first item
- The observable should cancel itself if all the observers are disposed
- We can tell the observable to emit locations on a specific thread
The implementation detail
First of all, as mentioned earlier, there are two sources of getting locations, from either Google Play Service or Android Location manager. They provide quite similar APIs. Therefore, I create another interface present for the location services (sources). Let call it LocationService
interface LocationService {
fun requestLocationUpdates(attributes: RxLocationAttributes): Observable<Location>
}
The LocationService.requestLocationUpdates()
method will return an Observable, which will emit the locations based on the RxLocationAttributes
values. You can think of RxLocationAttributes
as a configuration object tell to the location services what do we want. Let focus on the implementations.
Implementation with FusedLocationProviderClient
Google Play Service provides us with FusedLocationProviderClient
. There are two main methods that we will call:
FusedLocationProviderClient.getLastLocation()
FusedLocationProviderClient.requestLocationUpdates()
I create FusedLocationService
class to implement the LocationService
interface.
class FusedLocationService(
private val context: Context,
private val fusedLocationProviderClient: FusedLocationProviderClient
) : LocationService {
override fun requestLocationUpdates(attributes: RxLocationAttributes): Observable<Location> {
return createLocationObservable(attributes)
.retry(attributes.retryAttempt) { throwable ->
throwable !is GooglePlayServicesNotAvailableException &&
throwable !is SecurityException
}
}
private fun createLocationObservable(attributes: RxLocationAttributes): Observable<Location> {
return Observable.create { emitter ->
val listener = object : LocationCallback() {
override fun onLocationResult(locationResult: LocationResult?) {
locationResult?.locations?.forEach { location ->
if (!emitter.isDisposed) {
emitter.onNext(location)
}
}
}
override fun onLocationAvailability(locationAvailability: LocationAvailability?) {
if (locationAvailability?.isLocationAvailable == false && !emitter.isDisposed) {
emitter.tryOnError(LocationNotAvailableException())
}
}
}
// 1. Fetch last known location first
fusedLocationProviderClient.lastLocation
.addOnSuccessListener { location ->
if (location != null && !emitter.isDisposed) {
emitter.onNext(location)
}
}
.addOnFailureListener { exception ->
if (!emitter.isDisposed) {
emitter.tryOnError(exception)
}
}
// 2. Register for location updates
try {
fusedLocationProviderClient.requestLocationUpdates(
getLocationRequest(attributes),
listener,
if (attributes.useCalledThreadToEmitValue) null else Looper.getMainLooper()
)
} catch (e: Exception) {
emitter.tryOnError(e)
return@create
}
// 3. Set disposal callback
emitter.setDisposable(Disposables.fromAction {
fusedLocationProviderClient.removeLocationUpdates(listener)
})
}
}
private fun getLocationRequest(attributes: RxLocationAttributes): LocationRequest {
return LocationRequest.create().apply {
interval = attributes.updateInterval
fastestInterval = attributes.fastestInterval
priority = attributes.priority
}
}
}
Let focus on createLocationObservable()
method.
- Fetch last known location from Google Play service and emit to the stream
- Listen to locations change from Google Play Service and emit to the stream.
Here, we create an Observable by passing an ObservableOnSubscribe
implementation to the Observable.create
method. Inside the ObservableOnSubscribe
, we have to implement the ObservableOnSubscribe.subscribe
method. This method will be called when we subscribe to the observable, it receives an ObservableEmitter
that allows pushing events in a cancellation-safe manner.
When creating an Observable from Observable.create
, we should make sure:
1. Clean up resources if the ObservableEmitter is canceled or disposed. To clean up, we have a two options, ObservableEmitter.setDisposable()
or ObservableEmitter.setCancellable()
. They are equivalence. The main differences are setCancellable
will deliver the exceptions during clean up to RxJavaPlugins.onError(Throwable)
. For our case, it’s remove the LocationUpdatesListener
from FusedLocationProviderClient
2. Catch the exception and make sure calling ObservableEmitter.tryOnError()
or ObservableEmitter.onError()
to pass the error to the stream. The main difference between these two methods is that ObservableEmitter.tryOnError
will not forward the error to RxJavaPlugins.onError
if the error could not be delivered.
try {
// register the callback
} catch (e: Exception) {
emitter.tryOnError(e)
}
3. Make sure you call ObservableEmitter.onComplete()
if the source finishes emitting the items (applicable for Observable & Flowable). For our example, it’s a hot observable, and we don’t have any complete points.
4. These methods ObservableEmitter.onNext()
, ObservableEmitter.onError()
, ObservableEmitter.tryOnError()
and ObservableEmitter.onComplete()
should be called in a sequential
5. What thread your observable will emit the value on. Most of the callback API provided will emit the value to the register thread (the thread execute the registration callback to the framework) by default. In Android many callback API will require an HandlerThread
for the default register thread and also if you want to tell to the API to call the callback in another thread, then you have to pass a looper or an handler to the API. In our example you can see:
fusedLocationProviderClient.requestLocationUpdates(
getLocationRequest(attributes),
listener,
if (attributes.useCalledThreadToEmitValue) null else Looper.getMainLooper()
)
Base on the attributes.useCalledThreadToEmitValue
, we will pass a null value or the main looper to the method.
- Null value: The callback will be called from the thread, which executes the
ObservableOnSubscribe.subscribe
method. Usually, it will be the thread that you subscribe to the stream, or you can tell the observable to emit the value from which thread bysubscribeOn(Thread)
. Please be aware, in our case. The thread must be aHandlerThread
, otherwise an exception is throwed. You can read more aboutHandlerThread
here or here. - Looper.getMainLooper(): We default the callback will be called from the main thread. It means that the Observable will emit location from the main thread even you applied
subscribeOn(Thread)
to the stream.
That is, regarding create the Observable by Observable.create
method.
Let look more detail on the FusedLocationService.requestLocationUpdates
method
override fun requestLocationUpdates(attributes: RxLocationAttributes): Observable<Location> {
return createLocationObservable(attributes)
.retry(attributes.retryAttempt) { throwable ->
throwable !is GooglePlayServicesNotAvailableException &&
throwable !is SecurityException
}
}
We will retry the observable attribute.retryAttempt
times, if any errors occurred excepts GooglePlayServicesNotAvailableException
(Google play services is not available) or SecurityException
(permission has not granted). Be aware that we can set attribute.retryAttempt
via RxLocationAttributes
.
So, with the same approach, I have an implementation to getting the location from Android API (LocationManager
)
class AndroidLocationService(
private val context: Context,
private val locationManager: LocationManager
) : LocationService {
override fun requestLocationUpdates(attributes: RxLocationAttributes): Observable<Location> {
return createLocationObservable(attributes)
.retry(attributes.retryAttempt) { throwable ->
throwable !is SecurityException
}
}
private fun createLocationObservable(attributes: RxLocationAttributes): Observable<Location> {
return Observable.create { emitter ->
val listener = object : LocationListener {
override fun onLocationChanged(location: Location) {
if (!emitter.isDisposed) {
emitter.onNext(location)
}
}
override fun onStatusChanged(provider: String?, status: Int, extras: Bundle?) {}
override fun onProviderEnabled(provider: String?) {}
override fun onProviderDisabled(provider: String?) {
if (!emitter.isDisposed) {
emitter.tryOnError(LocationProviderDisabledException(provider))
}
}
}
// Get last known location first
try {
val lastKnownLocation = locationManager.getLastKnownLocation(LocationManager.GPS_PROVIDER)
if (lastKnownLocation != null && !emitter.isDisposed) {
emitter.onNext(lastKnownLocation)
}
} catch (e: SecurityException) {
emitter.tryOnError(e)
return@create
}
// Register for location updates
try {
locationManager.requestLocationUpdates(
LocationManager.GPS_PROVIDER,
attributes.updateInterval,
attributes.minDistance.toFloat(),
listener,
if (attributes.useCalledThreadToEmitValue) null else Looper.getMainLooper()
)
} catch (e: Exception) {
emitter.tryOnError(e)
return@create
}
emitter.setDisposable(Disposables.fromAction {
locationManager.removeUpdates(listener)
})
}
}
}
Implement the RxLocationManager interface
Now it’s the time for us to integrate the two services, and implement the interface RxLocationManager
that we designed at the beginning.
class RxLocationManagerImpl(
private val fusedLocationService: FusedLocationService,
private val androidLocationService: AndroidLocationService
) : RxLocationManager {
override fun observeLocationChange(): Observable<Location> {
return createLocationObservable(getDefaultLocationAttributes())
}
override fun singleLocation(): Single<Location> {
return createLocationObservable(getDefaultLocationAttributes())
.firstOrError()
.timeout(30, TimeUnit.SECONDS)
}
private fun createLocationObservable(locationAttributes: RxLocationAttributes): Observable<Location> {
return fusedLocationService.requestLocationUpdates(locationAttributes)
.onErrorResumeNext { it: Throwable ->
when (it) {
is GooglePlayServicesNotAvailableException -> {
androidLocationService.requestLocationUpdates(locationAttributes)
}
else -> Observable.error(it)
}
}
.replay(1)
.refCount()
}
private fun getDefaultLocationAttributes(): RxLocationAttributes {
return RxLocationAttributes.Builder()
.setUpdateInterval(10000)
.setFastestInterval(5000)
.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
.setRetryAttempt(3)
.build()
}
}
Let look into RxLocationManagerImpl.createLocationObservable
method
private fun createLocationObservable(locationAttributes: RxLocationAttributes): Observable<Location> {
return fusedLocationService.requestLocationUpdates(locationAttributes)
.onErrorResumeNext { it: Throwable ->
when (it) {
is GooglePlayServicesNotAvailableException -> {
androidLocationService.requestLocationUpdates(locationAttributes)
}
else -> Observable.error(it)
}
}
.replay(1)
.refCount()
}
First we will observe the location update from FusedLocationService
, if there is any error occurred, we will switch to observe location change from AndroidLocationService
. For example, if we run this piece of code in any Chinese devices without Google Play Service, then AndroidLocationService
will be used.
Next, we apply replay
and refCount
operations
fusedLocationService.requestLocationUpdates(locationAttributes)
.onErrorResumeNext { it: Throwable ->...}
.replay(1)
.refCount()
replay(1)
turn the observable to aConnectableObservable
, the latest item will be replay, if any new observer subscribe to the stream..refCount()
make theConnectableObservable
behave like an ordinary Observable. RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has done so.
So, in the end, our target only registers one callback to the API, even multiple observers subscribe to the Observable. And all observers should receive the same locations.
Let move to the RxLocationManager.singleLocation
method
override fun singleLocation(): Single<Location> {
return createLocationObservable(getDefaultLocationAttributes())
.firstOrError()
.timeout(30, TimeUnit.SECONDS)
}
We reuse the observable from RxLocationManager.observeLocationChange()
method, convert it to a single by calling Observable.firstOrError()
. For this approach, we will benefit from the caching value from the observeLocationChange
if there are any observers currently subscribe to the Observable. But the downside is if your app does not actively observe location change, and keep calling the singleLocation
multiple times. We may consume more battery than expected. Because if there is no observer subscribe to the observable, then every time you call singleLocation
, we will register the callback to location provides, which high chance will fire our device GPS or other tools.
Secondly, I add the option to set up a location request time out for singleLocation
. Most of the time we call to get one single location, we expect to get it fast, and possibly there is delaying/hanging on getting the location. For that case, we should fire an error instead of keeping waiting. So, the downstream could show the user an error message.
That it, it’s very appreciate that you reach to this point of my post
As mention earlier, the source code is available here. I create a sample in the repo for you to play with the implementation, it’s recommended to check it out.
Comments