我們學(xué)到了如何在一個(gè)調(diào)度器上運(yùn)行一個(gè)任務(wù)。但是我們?nèi)绾卫盟鼇砗蚈bservables一起工作呢?RxJava提供了subscribeOn()方法來用于每個(gè)Observable對象。subscribeOn()方法用Scheduler來作為參數(shù)并在這個(gè)Scheduler上執(zhí)行Observable調(diào)用。
在“真實(shí)世界”這個(gè)例子中,我們調(diào)整loadList()函數(shù)。首先,我們需要一個(gè)新的getApps()方法來檢索已安裝的應(yīng)用列表:
private Observable<AppInfo> getApps() {
return Observable.create(subscriber -> {
List<AppInfo> apps = new ArrayList<>();
SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
String serializedApps = sharedPref.getString("APPS", "");
if (!"".equals(serializedApps)) {
apps = new Gson().fromJson(serializedApps,appInfoType);
}
for (AppInfo app : apps) {
subscriber.onNext(app);
}
subscriber.onCompleted();
});
}
getApps()方法返回一個(gè)AppInfo的Observable。它先從Android的SharePreferences讀取到已安裝的應(yīng)用程序列表。反序列化,并一個(gè)接一個(gè)的發(fā)射AppInfo數(shù)據(jù)。使用新的方法來檢索列表,loadList()函數(shù)改成下面這樣:
private void loadList() {
mRecyclerView.setVisibility(View.VISIBLE);
getApps().subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfo appInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
}
});
}
如果我們運(yùn)行代碼,StrictMode將會(huì)報(bào)告一個(gè)不合規(guī)操作,這是因?yàn)?code>SharePreferences會(huì)減慢I/O操作。我們所需要做的是指定getApps()需要在調(diào)度器上執(zhí)行:
getApps().subscribeOn(Schedulers.io())
.subscribe(new Observer<AppInfo>() { [...]
Schedulers.io()將會(huì)去掉StrictMode的不合規(guī)操作,但是我們的App現(xiàn)在崩潰了是因?yàn)椋?/p>
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.jav a:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors. java:422)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolEx ecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolE xecutor.java:587)
at java.lang.Thread.run(Thread.java:841) Caused by:
android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
Only the original thread that created a view hierarchy can touch its views.
我們再次回到Android的世界。這條信息簡單的告訴我們我們試圖在一個(gè)非UI線程來修改UI操作。意思是我們需要在I/O調(diào)度器上執(zhí)行我們的代碼。因此我們需要和I/O調(diào)度器一起執(zhí)行代碼,但是當(dāng)結(jié)果返回時(shí)我們需要在UI線程上操作。RxJava讓你能夠訂閱一個(gè)指定的調(diào)度器并觀察它。我們只需在loadList()函數(shù)添加幾行代碼,那么每一項(xiàng)就都準(zhǔn)備好了:
getApps()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() { [...]
observeOn()方法將會(huì)在指定的調(diào)度器上返回結(jié)果:如例子中的UI線程。onBackpressureBuffer()方法將告訴Observable發(fā)射的數(shù)據(jù)如果比觀察者消費(fèi)的數(shù)據(jù)要更快的話,它必須把它們存儲在緩存中并提供一個(gè)合適的時(shí)間給它們。做完這些工作之后,如果我們運(yùn)行App,就會(huì)出現(xiàn)已安裝的程序列表:
http://wiki.jikexueyuan.com/project/rxjava/images/chapter7_2.png" alt="" />