用RxJava快速獲取海量數(shù)據(jù)
試想,需要一些動(dòng)態(tài)數(shù)據(jù)的時(shí)候,只要每次都請(qǐng)求網(wǎng)絡(luò)就可以了。但是,更有效率的做法是,把聯(lián)網(wǎng)得到的數(shù)據(jù),緩存到磁盤(pán)或內(nèi)存。
具體的說(shuō),計(jì)劃如下:
-
偶爾的聯(lián)網(wǎng)操作,只為獲取***數(shù)據(jù)。
-
盡可能快的讀取到數(shù)據(jù)(通過(guò)獲取之前緩存的網(wǎng)絡(luò)數(shù)據(jù))。
我將通過(guò)使用 RxJava,來(lái)實(shí)現(xiàn)這個(gè)計(jì)劃。
基本模式
為每一個(gè)數(shù)據(jù)源(網(wǎng)絡(luò),磁盤(pán)和內(nèi)存)創(chuàng)建Observable<Data>,使用concat()和first()操作符,構(gòu)造一個(gè)簡(jiǎn)單的實(shí)現(xiàn)方式。
concat()操作符持有多個(gè)Observable對(duì)象,并將它們按順序串聯(lián)成隊(duì)列。 first()操作符只從串聯(lián)隊(duì)列中取出并發(fā)送***個(gè)事件。因此,如果使用concat().first(),無(wú)論多少個(gè)數(shù)據(jù)源,只有***個(gè)事件會(huì)被檢索出并發(fā)送。
- // Our sources (left as an exercise for the reader)
- Observable<Data> memory = ...;
- Observable<Data> disk = ...;
- Observable<Data> network = ...;
- // Retrieve the first source with data
- Observable<Data> source = Observable
- .concat(memory, disk, network)
- .first();
這種模式的關(guān)鍵在于concat()操作符只有需要數(shù)據(jù)的時(shí)候才會(huì)訂閱所有的Observable數(shù)據(jù)源。由于first()操作符會(huì)較早的停止檢索隊(duì)列,所以,如果存在緩存數(shù)據(jù),就沒(méi)有必要訪(fǎng)問(wèn)較慢的數(shù)據(jù)源。 也就是說(shuō),如果memory返回結(jié)果,就不必?fù)?dān)心disk和network會(huì)被訪(fǎng)問(wèn)。相反地,如果內(nèi)存和磁盤(pán)都沒(méi)有數(shù)據(jù),才執(zhí)行網(wǎng)絡(luò)請(qǐng)求。
注意concat()所持有的Observable數(shù)據(jù)源,是按照一個(gè)接一個(gè)的順序被檢索的。
持久化數(shù)據(jù)
很明顯,下一步是緩存數(shù)據(jù)。如果不把網(wǎng)絡(luò)請(qǐng)求后的結(jié)果緩存到磁盤(pán),磁盤(pán)訪(fǎng)問(wèn)后的結(jié)果緩存到內(nèi)存,那么這根本不就不叫緩存。接下來(lái)要寫(xiě)的代碼就是,網(wǎng)絡(luò)數(shù)據(jù)的持久化操作。
我的解決方案是,讓每個(gè)數(shù)據(jù)源在發(fā)送完事件后,都保存或者緩存數(shù)據(jù)。
- Observable<Data> networkWithSave = network.doOnNext(new Action1<Data>() {
- @Override public void call(Data data) {
- saveToDisk(data);
- cacheInMemory(data);
- }
- });
- Observable<Data> diskWithCache = disk.doOnNext(new Action1<Data>() {
- @Override public void call(Data data) {
- cacheInMemory(data);
- }
- });
現(xiàn)在,如果你使用networkWithSave和diskWithCache,數(shù)據(jù)將會(huì)在加載后自動(dòng)保存。
(這個(gè)策略的另一個(gè)優(yōu)勢(shì)在于networkWithSave和diskWithCache可以在任何地方被使用,不局限于我們的多數(shù)據(jù)模式下。)
陳舊的數(shù)據(jù)
不幸的,現(xiàn)在我們保存數(shù)據(jù)的那些代碼,執(zhí)行的有點(diǎn)過(guò)頭了。無(wú)論數(shù)據(jù)是否過(guò)時(shí),它總是返回相同的數(shù)據(jù)。我們希望做到,偶爾連接服務(wù)器抓取***的數(shù)據(jù)。
解決方法在于,使用first()操作符進(jìn)行過(guò)濾。就是設(shè)置它拒絕接收毫無(wú)價(jià)值的數(shù)據(jù)。
- Observable<Data> source = Observable
- .concat(memory, diskWithCache, networkWithSave)
- .first(new Func1<Data, Boolean>() {
- @Override public Boolean call(Data data) {
- return data.isUpToDate();
- }
- });
現(xiàn)在,我們只需要發(fā)送被斷定為***數(shù)據(jù)的事件就OK了。因此,只要有一個(gè)數(shù)據(jù)源的數(shù)據(jù)過(guò)期,就繼續(xù)檢索下一個(gè)數(shù)據(jù)源,直到找到***數(shù)據(jù)為止。
first()和takeFirst()操作符的比較
對(duì)于這種設(shè)計(jì)模式,first()和takeFirst()操作符可以二選其一。
兩種調(diào)用方式的區(qū)別在于,如果所有數(shù)據(jù)源的數(shù)據(jù)均過(guò)期,沒(méi)有任何的有效數(shù)據(jù)作為事件發(fā)送,first()會(huì)拋出NoSuchElementException異常(譯者注:first()操作符均 return false),而takeFirst()操作符則直接調(diào)用完成操作,不會(huì)拋出任何異常。
使用哪個(gè)操作符,完全取決于是否需要明確處理缺失的數(shù)據(jù)。
代碼示例
可以從這里檢出,以上所有代碼的實(shí)現(xiàn)示例:https://github.com/dlew/rxjava-multiple-sources-sample。
如果需要一個(gè)真實(shí)示例,檢出 Gfycat App,它在獲取數(shù)據(jù)的時(shí)候使用了這種模式。項(xiàng)目并沒(méi)有使用以上展示的所有功能(因?yàn)椴恍枰?,但是,示范了concat().first()的基本用法。