當(dāng)我們異步執(zhí)行一些復(fù)雜的事情,Java提供了傳統(tǒng)的類,例如Thread、Future、FutureTask、CompletableFuture來處理這些問題。當(dāng)復(fù)雜度提升,這些方案就會變得麻煩和難以維護(hù)。最糟糕的是,它們都不支持鏈?zhǔn)秸{(diào)用。
RxJava Observables被設(shè)計(jì)用來解決這些問題。它們靈活,且易于使用,也可以鏈?zhǔn)秸{(diào)用,并且可以作用于單個結(jié)果程序上,更有甚者,也可以作用于序列上。無論何時你想發(fā)射單個標(biāo)量值,或者一連串值,甚至是無窮個數(shù)值流,你都可以使用Observable。
Observable的生命周期包含了三種可能的易于與Iterable生命周期事件相比較的事件,下表展示了如何將Observable async/push 與 Iterable sync/pull相關(guān)聯(lián)起來。
| Event | Iterable(pull) | Observable(push) |
|---|---|---|
| 檢索數(shù)據(jù) | T next() |
onNext(T) |
| 發(fā)現(xiàn)錯誤 | throws Exception |
onError(Throwable) |
| 完成 | !hasNext() |
onCompleted() |
使用Iterable時,消費(fèi)者從生產(chǎn)者那里以同步的方式得到值,在這些值得到之前線程處于阻塞狀態(tài)。相反,使用Observable時,生產(chǎn)者以異步的方式把值推給觀察者,無論何時,這些值都是可用的。這種方法之所以更靈活是因?yàn)榧幢阒凳峭交虍惒椒绞降竭_(dá),消費(fèi)者在這兩種場景都可以根據(jù)自己的需要來處理。
為了更好地復(fù)用Iterable接口,RxJava Observable類擴(kuò)展了GOF觀察者模式的語義。引入了兩個新的接口:
從發(fā)射物的角度來看,有兩種不同的Observables:熱的和冷的。一個"熱"的Observable典型的只要一創(chuàng)建完就開始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù),因此這個觀察者可以確保會收到整個數(shù)據(jù)序列。
在接下來的小節(jié)中將討論Observables提供的兩種創(chuàng)建Observable的方法。
create()方法使開發(fā)者有能力從頭開始創(chuàng)建一個Observable。它需要一個OnSubscribe對象,這個對象繼承Action1,當(dāng)觀察者訂閱我們的Observable時,它作為一個參數(shù)傳入并執(zhí)行call()函數(shù)。
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
Observable通過使用subscriber變量并根據(jù)條件調(diào)用它的方法來和觀察者通信。讓我們看一個“現(xiàn)實(shí)世界”的例子:
Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
例子故意寫的簡單,是因?yàn)榧幢闶悄愕谝淮我姷絉xJava的操作,我想讓你明白接下來要發(fā)生什么。
我們創(chuàng)建一個新的Observable<Integer>,它執(zhí)行了5個元素的for循環(huán),一個接一個的發(fā)射他們,最后完成。
另一方面,我們訂閱了Observable,返回一個Subscription 。一旦我們訂閱了,我們就開始接受整數(shù),并一個接一個的打印出它們。我們并不知道要接受多少整數(shù)。事實(shí)上,我們也無需知道是因?yàn)槲覀優(yōu)槊糠N場景都提供對應(yīng)的處理操作:
在上一個例子中,我們創(chuàng)建了一個整數(shù)序列并一個一個的發(fā)射它們。假如我們已經(jīng)有一個列表呢?我們是不是可以不用for循環(huán)而也可以一個接一個的發(fā)射它們呢?
在下面的例子代碼中,我們從一個已有的列表中創(chuàng)建一個Observable序列:
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
輸出的結(jié)果和上面的例子絕對是一樣的。
from()創(chuàng)建符可以從一個列表/數(shù)組來創(chuàng)建Observable,并一個接一個的從列表/數(shù)組中發(fā)射出來每一個對象,或者也可以從Java Future類來創(chuàng)建Observable,并發(fā)射Future對象的.get()方法返回的結(jié)果值。傳入Future作為參數(shù)時,我們可以指定一個超時的值。Observable將等待來自Future的結(jié)果;如果在超時之前仍然沒有結(jié)果返回,Observable將會觸發(fā)onError()方法通知觀察者有錯誤發(fā)生了。
如果我們已經(jīng)有了一個傳統(tǒng)的Java函數(shù),我們想把它轉(zhuǎn)變?yōu)橐粋€Observable又改怎么辦呢?我們可以用create()方法,正如我們先前看到的,或者我們也可以像下面那樣使用以此來省去許多模板代碼:
Observable<String> observableString = Observable.just(helloWorld());
Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
helloWorld()方法比較簡單,像這樣:
private String helloWorld(){
return "Hello World";
}
不管怎樣,它可以是我們想要的任何函數(shù)。在剛才的例子中,我們一旦創(chuàng)建了Observable,just()執(zhí)行函數(shù),當(dāng)我們訂閱Observable時,它就會發(fā)射出返回的值。
just()方法可以傳入一到九個參數(shù),它們會按照傳入的參數(shù)的順序來發(fā)射它們。just()方法也可以接受列表或數(shù)組,就像from()方法,但是它不會迭代列表發(fā)射每個值,它將會發(fā)射整個列表。通常,當(dāng)我們想發(fā)射一組已經(jīng)定義好的值時會用到它。但是如果我們的函數(shù)不是時變性的,我們可以用just來創(chuàng)建一個更有組織性和可測性的代碼庫。
最后注意just()創(chuàng)建符,它發(fā)射出值后,Observable正常結(jié)束,在上面那個例子中,我們會在控制臺打印出兩條信息:“Hello World”和“Observable completed”。
當(dāng)我們需要一個Observable毫無理由的不再發(fā)射數(shù)據(jù)正常結(jié)束時,我們可以使用empty()。我們可以使用never()創(chuàng)建一個不發(fā)射數(shù)據(jù)并且也永遠(yuǎn)不會結(jié)束的Observable。我們也可以使用throw()創(chuàng)建一個不發(fā)射數(shù)據(jù)并且以錯誤結(jié)束的Observable。