我們學(xué)到了如何在一個調(diào)度器上運行一個任務(wù)。但是我們?nèi)绾卫盟鼇砗蚈bservables一起工作呢?RxJava提供了subscribeOn()方法來用于每個Observable對象。subscribeOn()方法用Scheduler來作為參數(shù)并在這個Scheduler上執(zhí)行Observable調(diào)用。
在“真實世界”這個例子中,我們調(diào)整loadList()函數(shù)。首先,我們需要一個新的getApps()方法來檢索已安裝的應(yīng)用列表:
private Observable<AppInfo> getApps() {
return Observable.create(subscriber -> {
List<AppInfo> apps = new ArrayList<>();
SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
String serializedApps = sharedPref.getString("APPS", "");
if (!"".equals(serializedApps)) {
apps = new Gson().fromJson(serializedApps,appInfoType);
}
for (AppInfo app : apps) {
subscriber.onNext(app);
}
subscriber.onCompleted();
});
}
getApps()方法返回一個AppInfo的Observable。它先從Android的SharePreferences讀取到已安裝的應(yīng)用程序列表。反序列化,并一個接一個的發(fā)射AppInfo數(shù)據(jù)。使用新的方法來檢索列表,loadList()函數(shù)改成下面這樣:
private void loadList() {
mRecyclerView.setVisibility(View.VISIBLE);
getApps().subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfo appInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
}
});
}
如果我們運行代碼,StrictMode將會報告一個不合規(guī)操作,這是因為SharePreferences會減慢I/O操作。我們所需要做的是指定getApps()需要在調(diào)度器上執(zhí)行:
getApps().subscribeOn(Schedulers.io())
.subscribe(new Observer<AppInfo>() { [...]
Schedulers.io()將會去掉StrictMode的不合規(guī)操作,但是我們的App現(xiàn)在崩潰了是因為:
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.jav a:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors. java:422)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolEx ecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolE xecutor.java:587)
at java.lang.Thread.run(Thread.java:841) Caused by:
android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
Only the original thread that created a view hierarchy can touch its views.
我們再次回到Android的世界。這條信息簡單的告訴我們我們試圖在一個非UI線程來修改UI操作。意思是我們需要在I/O調(diào)度器上執(zhí)行我們的代碼。因此我們需要和I/O調(diào)度器一起執(zhí)行代碼,但是當(dāng)結(jié)果返回時我們需要在UI線程上操作。RxJava讓你能夠訂閱一個指定的調(diào)度器并觀察它。我們只需在loadList()函數(shù)添加幾行代碼,那么每一項就都準(zhǔn)備好了:
getApps()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() { [...]
observeOn()方法將會在指定的調(diào)度器上返回結(jié)果:如例子中的UI線程。onBackpressureBuffer()方法將告訴Observable發(fā)射的數(shù)據(jù)如果比觀察者消費的數(shù)據(jù)要更快的話,它必須把它們存儲在緩存中并提供一個合適的時間給它們。做完這些工作之后,如果我們運行App,就會出現(xiàn)已安裝的程序列表:
http://wiki.jikexueyuan.com/project/rxjava/images/chapter7_2.png" alt="" />