你可以在這里找到JVM平臺幾種語言的例子 language adaptor:
下面的示例從一個字符串列表創(chuàng)建一個Observable,然后使用一個方法訂閱這個Observable。
public static void hello(String... names) {
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello " + s + "!");
}
});
}
hello("Ben", "George");
Hello Ben!
Hello George!
def hello(String[] names) {
Observable.from(names).subscribe { println "Hello ${it}!" }
}
hello("Ben", "George")
Hello Ben!
Hello George!
(defn hello
[&rest]
(-> (Observable/from &rest)
(.subscribe #(println (str "Hello " % "!")))))
(hello ["Ben" "George"])
Hello Ben!
Hello George!
import rx.lang.scala.Observable
def hello(names: String*) {
Observable.from(names) subscribe { n =>
println(s"Hello $n!")
}
}
hello("Ben", "George")
Hello Ben!
Hello George!
要使用RxJava,首先你需要創(chuàng)建Observable(它們發(fā)射數(shù)據(jù)序列),使用Observable操作符變換那些Observables,獲取嚴(yán)格符合你要求的數(shù)據(jù),然后觀察并處理對這些數(shù)據(jù)序列(通過實(shí)現(xiàn)觀察者或訂閱者,然后訂閱變換后的Observable)。
要創(chuàng)建Observable,你可以手動實(shí)現(xiàn)Observable的行為,也可以傳遞一個函數(shù)給create(?),還可以使用這些 創(chuàng)建操作符 將一個已有的數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable。
你可以使用just(?) 和from(?) 方法將對象,列表,對象屬性轉(zhuǎn)換為發(fā)射那些對象的Observable:
Observable<String> o = Observable.from("a", "b", "c");
def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.from(list);
Observable<String> o = Observable.just("one object");
轉(zhuǎn)換后的Observable每發(fā)射一項(xiàng)數(shù)據(jù),會同步地調(diào)用任何訂閱者的onNext()方法,最后會調(diào)用訂閱者的onCompleted()方法。
create(?)創(chuàng)建一個Observable使用 create(?) 方法,你可以創(chuàng)建你自己的Observable,可以實(shí)現(xiàn)異步I/O,計(jì)算操作,甚至是無限的數(shù)據(jù)流。
/**
* 這個例子展示了一個自定義的Observable,當(dāng)有訂閱時(shí)他會阻塞當(dāng)前線程。
*/
def customObservableBlocking() {
return Observable.create { aSubscriber ->
50.times { i ->
if (!aSubscriber.unsubscribed) {
aSubscriber.onNext("value_${i}")
}
}
// after sending all values we complete the sequence
if (!aSubscriber.unsubscribed) {
aSubscriber.onCompleted()
}
}
}
// To see output:
customObservableBlocking().subscribe { println(it) }
The following example uses Groovy to create an Observable that emits 75 strings.
下面的例子使用Groovy創(chuàng)建了一個發(fā)射75個字符串的Observable。
為了讓它更清楚,例子很詳細(xì),使用靜態(tài)類型和匿名內(nèi)部類Func1:
/**
* This example shows a custom Observable that does not block
* when subscribed to as it spawns a separate thread.
*/
def customObservableNonBlocking() {
return Observable.create({ subscriber ->
Thread.start {
for (i in 0..<75) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext("value_${i}")
}
// after sending all values we complete the sequence
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
} as Observable.OnSubscribe)
}
// To see output:
customObservableNonBlocking().subscribe { println(it) }
這是一個用Clojure寫的例子,使用Future(而不是直接用線程),實(shí)現(xiàn)很簡潔:
(defn customObservableNonBlocking []
"This example shows a custom Observable that does not block
when subscribed to as it spawns a separate thread.
returns Observable<String>"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
; after sending all values we complete the sequence
(-> subscriber .onCompleted))
))
))
; To see output
(.subscribe (customObservableNonBlocking) #(println %))
這個例子從維基百科網(wǎng)站抓取文章,每抓取一篇會調(diào)用一次onNext:
(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.
return Observable<String> of HTML"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> subscriber .onCompleted))
))))
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))
回到Groovy,同樣是從維基百科抓取文章,這兒使用閉包代替匿名內(nèi)部類:
/*
* Fetch a list of Wikipedia articles asynchronously.
*/
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
return Observable.create { subscriber ->
Thread.start {
for (articleName in wikipediaArticleNames) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
}
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
return subscriber
}
}
fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
.subscribe { println "--- Article ---\n${it.substring(0, 125)}" }
結(jié)果:
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...
Note that all of the above examples ignore error handling, for brevity. See below for examples that include error handling.
More information can be found on the [[Observable]] and [[Creating Observables|Creating-Observables]] pages.
注意:為了簡潔,上面的所有例子都忽略了錯誤處理,查看下面包含錯誤處理的例子。
更多的信息可以在這里找到:Observable 和 Creating Observables。
RxJava讓你可以鏈?zhǔn)绞褂?code>操作符用來轉(zhuǎn)換和組合多個Observables。
The following example, in Groovy, uses a previously defined, asynchronous Observable that emits 75 items, skips over the first 10 of these (skip(10)), then takes the next 5 (take(5)), and transforms them (map(...)) before subscribing and printing the items:
下面是一個Groovy的例子,使用之前的定義,它會異步發(fā)射75個字符串,跳過最開始的10個((skip(10)),然后獲取接下來的5個(take(5)),在訂閱之前使用map()轉(zhuǎn)換它們,然后打印結(jié)果字符串。
/**
* Asynchronously calls 'customObservableNonBlocking' and defines
* a chain of operators to apply to the callback sequence.
*/
def simpleComposition() {
customObservableNonBlocking().skip(10).take(5)
.map({ stringValue -> return stringValue + "_xform"})
.subscribe({ println "onNext => " + it})
}
輸出結(jié)果
onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform
這里有一個圖例解釋了轉(zhuǎn)換過程:
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/Composition.1.png" width="640" height="536" />這一個例子使用Clojure,使用了三個異步的Observable,其中一個依賴另一個,使用zip組合這三個發(fā)射的數(shù)據(jù)項(xiàng)為一個單個數(shù)據(jù)項(xiàng),最后使用map()轉(zhuǎn)換這個結(jié)果:
(defn getVideoForUser [userId videoId]
"Get video metadata for a given userId
- video metadata
- video bookmark position
- user data
return Observable<Map>"
(let [user-observable (-> (getUser userId)
(.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
bookmark-observable (-> (getVideoBookmark userId videoId)
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
; getVideoMetadata requires :language from user-observable so nest inside map function
video-metadata-observable (-> user-observable
(.mapMany
; fetch metadata after a response from user-observable is received
(fn [user-map]
(getVideoMetadata videoId (:language user-map)))))]
; now combine 3 observables using zip
(-> (Observable/zip bookmark-observable video-metadata-observable user-observable
(fn [bookmark-map metadata-map user-map]
{:bookmark-map bookmark-map
:metadata-map metadata-map
:user-map user-map}))
; and transform into a single response object
(.map (fn [data]
{:video-id videoId
:video-metadata (:metadata-map data)
:user-id userId
:language (:language (:user-map data))
:bookmark (:viewed-position (:bookmark-map data))
})))))
輸出是這樣的:
{:video-id 78965,
:video-metadata {:video-id 78965, :title House of Cards: Episode 1,
:director David Fincher, :duration 3365},
:user-id 12345, :language es-us, :bookmark 0}
這里有一個圖例解釋了這個過程:
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/Composition.2.png" width="640" height="742" />The following example, in Groovy, comes from Ben Christensen’s QCon presentation on the evolution of the Netflix API. It combines two Observables with the merge operator, then uses the reduce operator to construct a single item out of the resulting sequence, then transforms that item with map before emitting it:
下面的例子使用Groovy,來自這里 Ben Christensen’s QCon presentation on the evolution of the Netflix API,它使用merge操作結(jié)合兩個Observables,使用reduce操作符從結(jié)果序列構(gòu)建一個單獨(dú)的結(jié)果數(shù)據(jù)項(xiàng),然后在發(fā)射之前,使用map()變換那個結(jié)果。
public Observable getVideoSummary(APIVideo video) {
def seed = [id:video.id, title:video.getTitle()];
def bookmarkObservable = getBookmark(video);
def artworkObservable = getArtworkImageUrl(video);
return( Observable.merge(bookmarkObservable, artworkObservable)
.reduce(seed, { aggregate, current -> aggregate << current })
.map({ [(video.id.toString() : it] }))
}
這里也有一個圖例解釋reduce從多個Observable的結(jié)果構(gòu)建一個單一結(jié)構(gòu)的過程:
這里是另一個版本的維基百科的例子,包含錯誤處理代碼:
/*
* Fetch a list of Wikipedia articles asynchronously, with error handling.
*/
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
return Observable.create({ subscriber ->
Thread.start {
try {
for (articleName in wikipediaArticleNames) {
if (true == subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
}
if (false == subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch(Throwable t) {
if (false == subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
return (subscriber);
}
});
}
下面的例子使用Groovy,注意錯誤發(fā)生時(shí)現(xiàn)在是如何調(diào)用onError(Throwable t)的,下面的代碼傳遞給subscribe()第二個方法用戶處理onError通知:
fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
.subscribe(
{ println "--- Article ---\n" + it.substring(0, 125) },
{ println "--- Error ---\n" + it.getMessage() })
查看 錯誤處理操作符 這一夜了解更多RxJava中的錯誤處理技術(shù),包括使用 onErrorResumeNext()和onErrorReturn()等方法,它們讓你可以從錯誤中恢復(fù)。
這里是一個Groovy的例子:
myModifiedObservable = myObservable.onErrorResumeNext({ t ->
Throwable myThrowable = myCustomizedThrowableCreator(t);
return (Observable.error(myThrowable));
});