當(dāng)我們異步執(zhí)行一些復(fù)雜的事情,Java提供了傳統(tǒng)的類,例如Thread、Future、FutureTask、CompletableFuture來處理這些問題。當(dāng)復(fù)雜度提升,這些方案就會(huì)變得麻煩和難以維護(hù)。最糟糕的是,它們都不支持鏈?zhǔn)秸{(diào)用。
RxJava Observables被設(shè)計(jì)用來解決這些問題。它們靈活,且易于使用,也可以鏈?zhǔn)秸{(diào)用,并且可以作用于單個(gè)結(jié)果程序上,更有甚者,也可以作用于序列上。無論何時(shí)你想發(fā)射單個(gè)標(biāo)量值,或者一連串值,甚至是無窮個(gè)數(shù)值流,你都可以使用Observable。
Observable的生命周期包含了三種可能的易于與Iterable生命周期事件相比較的事件,下表展示了如何將Observable async/push 與 Iterable sync/pull相關(guān)聯(lián)起來。
| Event | Iterable(pull) | Observable(push) |
|---|---|---|
| 檢索數(shù)據(jù) | T next() |
onNext(T) |
| 發(fā)現(xiàn)錯(cuò)誤 | throws Exception |
onError(Throwable) |
| 完成 | !hasNext() |
onCompleted() |
使用Iterable時(shí),消費(fèi)者從生產(chǎn)者那里以同步的方式得到值,在這些值得到之前線程處于阻塞狀態(tài)。相反,使用Observable時(shí),生產(chǎn)者以異步的方式把值推給觀察者,無論何時(shí),這些值都是可用的。這種方法之所以更靈活是因?yàn)榧幢阒凳峭交虍惒椒绞降竭_(dá),消費(fèi)者在這兩種場景都可以根據(jù)自己的需要來處理。
為了更好地復(fù)用Iterable接口,RxJava Observable類擴(kuò)展了GOF觀察者模式的語義。引入了兩個(gè)新的接口:
從發(fā)射物的角度來看,有兩種不同的Observables:熱的和冷的。一個(gè)"熱"的Observable典型的只要一創(chuàng)建完就開始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個(gè)位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯(cuò)過了)。一個(gè)"冷"的Observable會(huì)一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù),因此這個(gè)觀察者可以確保會(huì)收到整個(gè)數(shù)據(jù)序列。
在接下來的小節(jié)中將討論Observables提供的兩種創(chuàng)建Observable的方法。
create()方法使開發(fā)者有能力從頭開始創(chuàng)建一個(gè)Observable。它需要一個(gè)OnSubscribe對(duì)象,這個(gè)對(duì)象繼承Action1,當(dāng)觀察者訂閱我們的Observable時(shí),它作為一個(gè)參數(shù)傳入并執(zhí)行call()函數(shù)。
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
Observable通過使用subscriber變量并根據(jù)條件調(diào)用它的方法來和觀察者通信。讓我們看一個(gè)“現(xiàn)實(shí)世界”的例子:
Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
例子故意寫的簡單,是因?yàn)榧幢闶悄愕谝淮我姷絉xJava的操作,我想讓你明白接下來要發(fā)生什么。
我們創(chuàng)建一個(gè)新的Observable<Integer>,它執(zhí)行了5個(gè)元素的for循環(huán),一個(gè)接一個(gè)的發(fā)射他們,最后完成。
另一方面,我們訂閱了Observable,返回一個(gè)Subscription 。一旦我們訂閱了,我們就開始接受整數(shù),并一個(gè)接一個(gè)的打印出它們。我們并不知道要接受多少整數(shù)。事實(shí)上,我們也無需知道是因?yàn)槲覀優(yōu)槊糠N場景都提供對(duì)應(yīng)的處理操作:
在上一個(gè)例子中,我們創(chuàng)建了一個(gè)整數(shù)序列并一個(gè)一個(gè)的發(fā)射它們。假如我們已經(jīng)有一個(gè)列表呢?我們是不是可以不用for循環(huán)而也可以一個(gè)接一個(gè)的發(fā)射它們呢?
在下面的例子代碼中,我們從一個(gè)已有的列表中創(chuàng)建一個(gè)Observable序列:
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
輸出的結(jié)果和上面的例子絕對(duì)是一樣的。
from()創(chuàng)建符可以從一個(gè)列表/數(shù)組來創(chuàng)建Observable,并一個(gè)接一個(gè)的從列表/數(shù)組中發(fā)射出來每一個(gè)對(duì)象,或者也可以從Java Future類來創(chuàng)建Observable,并發(fā)射Future對(duì)象的.get()方法返回的結(jié)果值。傳入Future作為參數(shù)時(shí),我們可以指定一個(gè)超時(shí)的值。Observable將等待來自Future的結(jié)果;如果在超時(shí)之前仍然沒有結(jié)果返回,Observable將會(huì)觸發(fā)onError()方法通知觀察者有錯(cuò)誤發(fā)生了。
如果我們已經(jīng)有了一個(gè)傳統(tǒng)的Java函數(shù),我們想把它轉(zhuǎn)變?yōu)橐粋€(gè)Observable又改怎么辦呢?我們可以用create()方法,正如我們先前看到的,或者我們也可以像下面那樣使用以此來省去許多模板代碼:
Observable<String> observableString = Observable.just(helloWorld());
Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
helloWorld()方法比較簡單,像這樣:
private String helloWorld(){
return "Hello World";
}
不管怎樣,它可以是我們想要的任何函數(shù)。在剛才的例子中,我們一旦創(chuàng)建了Observable,just()執(zhí)行函數(shù),當(dāng)我們訂閱Observable時(shí),它就會(huì)發(fā)射出返回的值。
just()方法可以傳入一到九個(gè)參數(shù),它們會(huì)按照傳入的參數(shù)的順序來發(fā)射它們。just()方法也可以接受列表或數(shù)組,就像from()方法,但是它不會(huì)迭代列表發(fā)射每個(gè)值,它將會(huì)發(fā)射整個(gè)列表。通常,當(dāng)我們想發(fā)射一組已經(jīng)定義好的值時(shí)會(huì)用到它。但是如果我們的函數(shù)不是時(shí)變性的,我們可以用just來創(chuàng)建一個(gè)更有組織性和可測性的代碼庫。
最后注意just()創(chuàng)建符,它發(fā)射出值后,Observable正常結(jié)束,在上面那個(gè)例子中,我們會(huì)在控制臺(tái)打印出兩條信息:“Hello World”和“Observable completed”。
當(dāng)我們需要一個(gè)Observable毫無理由的不再發(fā)射數(shù)據(jù)正常結(jié)束時(shí),我們可以使用empty()。我們可以使用never()創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)并且也永遠(yuǎn)不會(huì)結(jié)束的Observable。我們也可以使用throw()創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)并且以錯(cuò)誤結(jié)束的Observable。