An Interest In:
Web News this Week
- April 24, 2024
- April 23, 2024
- April 22, 2024
- April 21, 2024
- April 20, 2024
- April 19, 2024
- April 18, 2024
Leak investigation: Rx disposal race in SQLDelight
Header image: The In-Between by Romain Guy.
In this blog we'll look into how an easy mistake when using Observable.create()
can lead to subtle leaks.
I recently investigated the following leak, which I couldn't reproduce systematically:
... com.example.hockey.PlayerQueries$selectAllQuery instance Query.listeners ~~~~~~~~~ java.util.concurrent.CopyOnWriteArrayList instance CopyOnWriteArrayList.array ~~~~~ java.lang.Object[] array Object[].[0] ~~~ sqldelight.runtime.rx.QueryListenerAndDisposable instance Retaining 4.3 kB in 56 objects QueryListenerAndDisposable.emitter ~~~~~~~... RxJava observer chain com.example.hockey.PlayersView$onAttachedToWindow$1 instance Anonymous class implementing io.reactivex.functions.Function PlayersView$onAttachedToWindow$1.this$0 ~~~~~~ com.example.hockey.view.PlayersView instance Leaking: YES (View.mContext references a destroyed activity)
In the above leaktrace, PlayerQueries$selectAllQuery
is a generated SQLDelight query. Our PlayersView
is listening for updates to that query while the view is attached by leveraging Query.asObservable(). Once the view is detached, the observable chain is disposed and the query is expected to let go of the corresponding listener.
I inspected the heap dump and found that the view was indeed detached, the observable chain was correctly disposed, and yet the QueryListenerAndDisposable
listener had not been removed from the query. Let's look at the Query.asObservable()
implementation:
fun <T : Any> Query<T>.asObservable(): Observable<Query<T>> { return Observable.create(QueryOnSubscribe(this))}private class QueryOnSubscribe<T : Any>( private val query: Query<T>) : ObservableOnSubscribe<Query<T>> { override fun subscribe(emitter: ObservableEmitter<Query<T>>) { val listener = QueryListenerAndDisposable(emitter, query) emitter.setDisposable(listener) query.addListener(listener) emitter.onNext(query) }}private class QueryListenerAndDisposable<T : Any>( private val emitter: ObservableEmitter<Query<T>>, private val query: Query<T>) : AtomicBoolean(), Query.Listener, Disposable { override fun queryResultsChanged() { emitter.onNext(query) } override fun isDisposed() = get() override fun dispose() { if (compareAndSet(false, true)) { query.removeListener(this) } }}
Let's now look at the implementation example from Observable.create():
val observable = Observable.create<Event> { emitter -> val closeable = api.someMethod { event -> emitter.onNext(event) } emitter.setCancellable(closeable::close)}
We can rewrite the above example code to use a listener instead of a closeable:
val observable = Observable.create<Event> { emitter -> // 1. Create a listener val listener: (Event) -> Unit = { event -> emitter.onNext(event) } // 2. Set the listener api.addListener(listener) // 3. Remove the listener on dispose emitter.setCancellable { api.removeListener(listener) }}
The above implementation is fairly close to the SQLDelight implementation, with one major difference: SQLDelight sets the disposable before adding the listener to the query, i.e. something like this:
val observable = Observable.create<Event> { emitter -> // 1. Create a listener val listener: (Event) -> Unit = { event -> emitter.onNext(event) } // 2. Remove the listener on dispose emitter.setCancellable { api.removeListener(listener) } // 3. Set the listener api.addListener(listener)}
It turns out that's a mistake! If emitter
is already disposed when the subscription runs, then we'll add the listener but never remove it:
val observable = Observable.create<Event> { emitter -> // 1. Create a listener val listener: (Event) -> Unit = { event -> emitter.onNext(event) } // 2. if emitter is currently already disposed, // the cancellable callback fires immediately and // there's no listener to remove yet. emitter.setCancellable { api.removeListener(listener) } // 3. Set the listener, which will never be removed. Leak! api.addListener(listener)}
Can emitter
be already disposed when the subscription runs? Yes! This can happen if the subscription runs on a separate thread from the thread that called subscribe()
:
val subscription = observable.subscribeOn(Schedulers.io()) .subscribe()// `dispose()` might execute at the time as the subscription// callback.subscription.dispose()
Take aways
- When using
Observable.create()
, check the order in which you set a listener vs set the disposable. - I opened a PR to fix SQLDelight. In the meantime, remove
subscribeOn
calls for observables that originate fromQuery.asObservable()
. As you saw from the code above, allQuery.asObservable()
is doing is setting a listener so there's no need to be on any special scheduler here.
Original Link: https://dev.to/pyricau/leak-investigation-rx-disposal-race-in-sqldelight-3n06
Dev To
An online community for sharing and discovering great ideas, having debates, and making friendsMore About this Source Visit Dev To