如何正確使用 ReactiveX 的 Timeout Operator

使用了 RxJava 來開發 Android App 已經一段時間了,最近在追一個問題時才發現自己一直都沒有正確理解 Timeout operator,並且大部分時候都誤用了。這篇文章簡短分享一下這個經驗。
Observable
.interval(6, TimeUnit.SECONDS)
.startWith(0)
.subscribeOn(Schedulers.io())
.timeout(5, TimeUnit.SECONDS)
.subscribeBy(onNext = {
Log.i(tag, "onNext $it")
}, onError = {
Log.e(tag, "onError $it")
}, onComplete = {
Log.e(tag, "onComplete")
})
.disposeTo(compositeDisposable)
上面這一段 code,一開始的 interval 用來模擬任意 observable streams,最近遇到的 case 是前端 stream 來源是一個自己封裝的 network request,而 timeout 則是用來判斷是否 network request 超時未回應。
這裡會產生的問題是當 network request 在第 3 秒 respond 時,onNext 會正確收到 response 值,但在第 8 秒時,onError 又會收到一次 TimeoutException,這就導致此處的功能狀態錯亂了。
這裡的問題在於第一個一開始的 network request 封裝沒有正確的在 respond 後送出 complete 的 event。 第二是 timeout operator 根據文件是指定時間內沒收到任何 stream,所以每收到一個 emitted item 之後,在指定時間內必須再收到下一個 emitted item,否則就拋出 error。

一開始沒有認真讀文件,一直以為 timeout 就是一次性的。如果確實就是需要一次性的 timeout operator 可以加一個 extension:
fun <T> Observable<T>.timeoutFirstMessage(timeout: Long, unit: TimeUnit): Observable<T> {
return this.timeout<Long, Long>(
Observable.timer(timeout, unit),
Function { o -> Observable.never<Long>() }
)
}
上面 timeout 的參數第一個參數是指定第一個 item 的 timeout window,第二個參數則是後續的 timeout indicator,第二個返回 never 後,第二個之後的 timeout 就不會為再被 emit 了。
於是上面有問題的 code 就可以改寫為:
Observable
.interval(6, TimeUnit.SECONDS)
.startWith(0)
.subscribeOn(Schedulers.io())
.timeoutFirstMessage(5, TimeUnit.SECONDS)
.subscribeBy(onNext = {
Log.i(tag, "onNext $it")
}, onError = {
Log.e(tag, "onError $it")
}, onComplete = {
Log.e(tag, "onComplete")
})
.disposeTo(compositeDisposable)
小結
RxJava 已經發展到使用上非常複雜的階段了,在使用任何 opertator 時還是要確認看懂文件再使用。這篇文章是分享平常工作時遇到的一些小問題,其實也是一開始 iCoding blog 設立的初衷,希望對剛好搜索到的人有幫助。
Image Credit: Pixabay License
Free for commercial use
No attribution required
(https://pixabay.com/photos/timeout-clock-morning-pointer-hour-3373341/)
我是 K,以前幻想著可以做些偉大的事。但發現把一件簡單的事情做到很好就是一件很難又很厲害的事。所以現在正處於一個把簡單的事情做好的狀態。