將其它種類的對象和數(shù)據(jù)類型轉(zhuǎn)換為Observable
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/from.c.png" alt="from" />
當你使用Observable時,如果你要處理的數(shù)據(jù)都可以轉(zhuǎn)換成展現(xiàn)為Observables,而不是需要混合使用Observables和其它類型的數(shù)據(jù),會非常方便。這讓你在數(shù)據(jù)流的整個生命周期中,可以使用一組統(tǒng)一的操作符來管理它們。
例如,Iterable可以看成是同步的Observable;Future,可以看成是總是只發(fā)射單個數(shù)據(jù)的Observable。通過顯式地將那些數(shù)據(jù)轉(zhuǎn)換為Observables,你可以像使用Observable一樣與它們交互。
因此,大部分ReactiveX實現(xiàn)都提供了將語言特定的對象和數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observables的方法。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/from.png" alt="from" />
在RxJava中,from操作符可以轉(zhuǎn)換Future、Iterable和數(shù)組。對于Iterable和數(shù)組,產(chǎn)生的Observable會發(fā)射Iterable或數(shù)組的每一項數(shù)據(jù)。
示例代碼
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);
myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);
輸出
0
1
2
3
4
5
Sequence complete
對于Future,它會發(fā)射Future.get()方法返回的單個數(shù)據(jù)。from方法有一個可接受兩個可選參數(shù)的版本,分別指定超時時長和時間單位。如果過了指定的時長Future還沒有返回一個值,這個Observable會發(fā)射錯誤通知并終止。
from默認不在任何特定的調(diào)度器上執(zhí)行。然而你可以將Scheduler作為可選的第二個參數(shù)傳遞給Observable,它會在那個調(diào)度器上管理這個Future。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/fromFunc0.png" alt="from func)" />
此外,在可選包 RxJavaAsyncUtil 中,你還可以用下面這些操作符將actions,callables,functions和runnables轉(zhuǎn)換為發(fā)射這些動作的執(zhí)行結(jié)果的Observable:
在這個頁面 Start 查看關(guān)于這些操作符的更多信息。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/St.from.png" alt="from" />
注意:還有一個可選的StringObservable類中也有一個from方法,它將一個字符流或者一個REader轉(zhuǎn)換為一個發(fā)射字節(jié)數(shù)組或字符串的Observable。
注意:這里與后面start操作符里的runAsync說明重復了
在單獨的RxJavaAsyncUtil包中(默認不包含在RxJava中),還有一個runAsync函數(shù)。傳遞一個Action和一個Scheduler給runAsync,它會返回一個StoppableObservable,這個Observable使用Action產(chǎn)生發(fā)射的數(shù)據(jù)項。
傳遞一個Action和一個Scheduler給runAsync,它返回一個使用這個Action產(chǎn)生數(shù)據(jù)的StoppableObservable。這個Action接受一個Observable和一個Subscription作為參數(shù),它使用Subscription檢查unsubscribed條件,一旦發(fā)現(xiàn)條件為真就立即停止發(fā)射數(shù)據(jù)。在任何時候你都可以使用unsubscribe方法手動停止一個StoppableObservable(這會同時取消訂閱與這個StoppableObservable關(guān)聯(lián)的Subscription)。
由于runAsync會立即調(diào)用Action并開始發(fā)射數(shù)據(jù),在你創(chuàng)建StoppableObservable之后到你的觀察者準備好接受數(shù)據(jù)之前這段時間里,可能會有一部分數(shù)據(jù)會丟失。如果這不符合你的要求,可以使用runAsync的一個變體,它也接受一個Subject參數(shù),傳遞一個ReplaySubject給它,你可以獲取其它丟失的數(shù)據(jù)了。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/St.decode.png" alt="decode" />
StringObservable類不是默認RxJava的一部分,包含一個decode操作符,這個操作符將一個多字節(jié)字符流轉(zhuǎn)換為一個發(fā)射字節(jié)數(shù)組的Observable,這些字節(jié)數(shù)組按照字符的邊界劃分。