在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ Android/ Buffer
調度器 Scheduler
Empty/Never/Throw
Replay
這個頁面展示了創(chuàng)建Observable的各種方法。
ObserveOn
ReactiveX
TimeInterval
Window
本頁展示的操作符用于對整個序列執(zhí)行算法操作或其它操作,由于這些操作必須等待數據發(fā)射完成(通常也必須緩存這些數據),它們對于非常長
IgnoreElements
Distinct
Last
Start
And/Then/When
Switch
創(chuàng)建操作
Materialize/Dematerialize
CombineLatest
Catch
實現(xiàn)自己的操作符
StringObservable
Map
ConnectableObservable
Using
Take
BlockingObservable
TakeLast
Defer
RxJavaSchedulersHook
First
FlatMap
這個頁面的操作符可用于根據條件發(fā)射或變換Observables,或者對它們做布爾運算:
Do
Repeat
Serialize
這個頁面展示的操作符可用于過濾和選擇Observable發(fā)射的數據序列。
這個頁面列出了很多用于Observable的輔助操作符
Single
Retry
從錯誤中恢復的技術
Sample
Merge
算術和聚合操作
Range
Timestamp
RxJava Issues
From
Subscribe
Subject
Delay
Skip
SubscribeOn
Filter
按字母順序排列的全部操作符列表
Timeout
Scan
onError
Zip
RxJava文檔和教程
Publish
ElementAt
第一個例子
SkipLast
Just
Timer
Debounce
GroupBy
條件和布爾操作
這個頁面展示了可用于對Observable發(fā)射的數據執(zhí)行變換操作的各種操作符。
Introduction
rxjava-async
介紹響應式編程
這個頁面展示的操作符可用于組合多個Observables。
ReactiveX
Connect
操作符分類
StartWith
Interval
Join
To
Buffer
RefCount
介紹
Observable

Buffer

Buffer

定期收集Observable的數據放進一個數據包裹,然后發(fā)射這些數據包裹,而不是一次發(fā)射一個值。

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer.png" alt="buffer" />

Buffer操作符將一個Observable變換為另一個,原來的Observable正常發(fā)射數據,變換產生的Observable發(fā)射這些數據的緩存集合。Buffer操作符在很多語言特定的實現(xiàn)中有很多種變體,它們在如何緩存這個問題上存在區(qū)別。

注意:如果原來的Observable發(fā)射了一個onError通知,Buffer會立即傳遞這個通知,而不是首先發(fā)射緩存的數據,即使在這之前緩存中包含了原始Observable發(fā)射的數據。

Window操作符與Buffer類似,但是它在發(fā)射之前把收集到的數據放進單獨的Observable,而不是放進一個數據結構。

在RxJava中有許多Buffer的變體:

buffer(count)

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer3.png" alt="buffer3" />

buffer(count)以列表(List)的形式發(fā)射非重疊的緩存,每一個緩存至多包含來自原始Observable的count項數據(最后發(fā)射的列表數據可能少于count項)

buffer(count, skip)

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer4.png" alt="buffer4" />

buffer(count,?skip)從原始Observable的第一項數據開始創(chuàng)建新的緩存,此后每當收到skip項數據,用count項數據填充緩存:開頭的一項和后續(xù)的count-1項,它以列表(List)的形式發(fā)射緩存,取決于countskip的值,這些緩存可能會有重疊部分(比如skip < count時),也可能會有間隙(比如skip > count時)。

buffer(bufferClosingSelector)

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer1.png" alt="buffer1" />

當它訂閱原來的Observable時,buffer(bufferClosingSelector)開始將數據收集到一個List,然后它調用bufferClosingSelector生成第二個Observable,當第二個Observable發(fā)射一個TClosing時,buffer發(fā)射當前的List,然后重復這個過程:開始組裝一個新的List,然后調用bufferClosingSelector創(chuàng)建一個新的Observable并監(jiān)視它。它會一直這樣做直到原來的Observable執(zhí)行完成。

buffer(boundary)

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer8.png" alt="buffer8" />

buffer(boundary)監(jiān)視一個名叫boundary的Observable,每當這個Observable發(fā)射了一個值,它就創(chuàng)建一個新的List開始收集來自原始Observable的數據并發(fā)射原來的List

buffer(bufferOpenings, bufferClosingSelector)

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer2.png" alt="buffer2" />

buffer(bufferOpenings,?bufferClosingSelector)監(jiān)視這個叫bufferOpenings的Observable(它發(fā)射BufferOpening對象),每當bufferOpenings發(fā)射了一個數據時,它就創(chuàng)建一個新的List開始收集原始Observable的數據,并將bufferOpenings傳遞給closingSelector函數。這個函數返回一個Observable。buffer監(jiān)視這個Observable,當它檢測到一個來自這個Observable的數據時,就關閉List并且發(fā)射它自己的數據(之前的那個List)。

buffer(timespan, unit[, scheduler])

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer5.png" alt="buffer5" />

buffer(timespan,?unit)定期以List的形式發(fā)射新的數據,每個時間段,收集來自原始Observable的數據(從前面一個數據包裹之后,或者如果是第一個數據包裹,從有觀察者訂閱原來的Observale之后開始)。還有另一個版本的buffer接受一個Scheduler參數,默認情況下會使用computation調度器。

buffer(timespan, unit, count[, scheduler])

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer6.png" alt="buffer6" />

每當收到來自原始Observable的count項數據,或者每過了一段指定的時間后,buffer(timespan,?unit,?count)就以List的形式發(fā)射這期間的數據,即使數據項少于count項。還有另一個版本的buffer接受一個Scheduler參數,默認情況下會使用computation調度器。

buffer(timespan, timeshift, unit[, scheduler])

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer7.png" alt="buffer7" />

buffer(timespan,?timeshift,?unit)在每一個timeshift時期內都創(chuàng)建一個新的List,然后用原始Observable發(fā)射的每一項數據填充這個列表(在把這個List當做自己的數據發(fā)射前,從創(chuàng)建時開始,直到過了timespan這么長的時間)。如果timespan長于timeshift,它發(fā)射的數據包將會重疊,因此可能包含重復的數據項。

還有另一個版本的buffer接受一個Scheduler參數,默認情況下會使用computation調度器。

buffer-backpressure

你可以使用Buffer操作符實現(xiàn)反壓backpressure(意思是,處理這樣一個Observable:它產生數據的速度可能比它的觀察者消費數據的速度快)。

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.buffer2.png" alt="bp.buffer2" />

Buffer操作符可以將大量的數據序列縮減為較少的數據緩存序列,讓它們更容易處理。例如,你可以按固定的時間間隔,定期關閉和發(fā)射來自一個爆發(fā)性Observable的數據緩存。這相當于一個緩沖區(qū)。

示例代碼

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);

http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.buffer1.png" alt="bp.buffer1" />

或者,如果你想更進一步,可以在爆發(fā)期將數據收集到緩存,然后在爆發(fā)期終止時發(fā)射這些數據,使用 Debounce 操作符給buffer操作符發(fā)射一個緩存關閉指示器(buffer closing indicator)可以做到這一點。

代碼示例:


// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

參見

上一篇:Single下一篇:Publish