定期收集Observable的數據放進一個數據包裹,然后發(fā)射這些數據包裹,而不是一次發(fā)射一個值。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer.png" alt="buffer" />
Buffer操作符將一個Observable變換為另一個,原來的Observable正常發(fā)射數據,變換產生的Observable發(fā)射這些數據的緩存集合。Buffer操作符在很多語言特定的實現(xiàn)中有很多種變體,它們在如何緩存這個問題上存在區(qū)別。
注意:如果原來的Observable發(fā)射了一個onError通知,Buffer會立即傳遞這個通知,而不是首先發(fā)射緩存的數據,即使在這之前緩存中包含了原始Observable發(fā)射的數據。
Window操作符與Buffer類似,但是它在發(fā)射之前把收集到的數據放進單獨的Observable,而不是放進一個數據結構。
在RxJava中有許多Buffer的變體:
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer3.png" alt="buffer3" />
buffer(count)以列表(List)的形式發(fā)射非重疊的緩存,每一個緩存至多包含來自原始Observable的count項數據(最后發(fā)射的列表數據可能少于count項)
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer4.png" alt="buffer4" />
buffer(count,?skip)從原始Observable的第一項數據開始創(chuàng)建新的緩存,此后每當收到skip項數據,用count項數據填充緩存:開頭的一項和后續(xù)的count-1項,它以列表(List)的形式發(fā)射緩存,取決于count和skip的值,這些緩存可能會有重疊部分(比如skip < count時),也可能會有間隙(比如skip > count時)。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer1.png" alt="buffer1" />
當它訂閱原來的Observable時,buffer(bufferClosingSelector)開始將數據收集到一個List,然后它調用bufferClosingSelector生成第二個Observable,當第二個Observable發(fā)射一個TClosing時,buffer發(fā)射當前的List,然后重復這個過程:開始組裝一個新的List,然后調用bufferClosingSelector創(chuàng)建一個新的Observable并監(jiān)視它。它會一直這樣做直到原來的Observable執(zhí)行完成。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer8.png" alt="buffer8" />
buffer(boundary)監(jiān)視一個名叫boundary的Observable,每當這個Observable發(fā)射了一個值,它就創(chuàng)建一個新的List開始收集來自原始Observable的數據并發(fā)射原來的List。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer2.png" alt="buffer2" />
buffer(bufferOpenings,?bufferClosingSelector)監(jiān)視這個叫bufferOpenings的Observable(它發(fā)射BufferOpening對象),每當bufferOpenings發(fā)射了一個數據時,它就創(chuàng)建一個新的List開始收集原始Observable的數據,并將bufferOpenings傳遞給closingSelector函數。這個函數返回一個Observable。buffer監(jiān)視這個Observable,當它檢測到一個來自這個Observable的數據時,就關閉List并且發(fā)射它自己的數據(之前的那個List)。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer5.png" alt="buffer5" />
buffer(timespan,?unit)定期以List的形式發(fā)射新的數據,每個時間段,收集來自原始Observable的數據(從前面一個數據包裹之后,或者如果是第一個數據包裹,從有觀察者訂閱原來的Observale之后開始)。還有另一個版本的buffer接受一個Scheduler參數,默認情況下會使用computation調度器。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer6.png" alt="buffer6" />
每當收到來自原始Observable的count項數據,或者每過了一段指定的時間后,buffer(timespan,?unit,?count)就以List的形式發(fā)射這期間的數據,即使數據項少于count項。還有另一個版本的buffer接受一個Scheduler參數,默認情況下會使用computation調度器。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer7.png" alt="buffer7" />
buffer(timespan,?timeshift,?unit)在每一個timeshift時期內都創(chuàng)建一個新的List,然后用原始Observable發(fā)射的每一項數據填充這個列表(在把這個List當做自己的數據發(fā)射前,從創(chuàng)建時開始,直到過了timespan這么長的時間)。如果timespan長于timeshift,它發(fā)射的數據包將會重疊,因此可能包含重復的數據項。
還有另一個版本的buffer接受一個Scheduler參數,默認情況下會使用computation調度器。
你可以使用Buffer操作符實現(xiàn)反壓backpressure(意思是,處理這樣一個Observable:它產生數據的速度可能比它的觀察者消費數據的速度快)。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.buffer2.png" alt="bp.buffer2" />
Buffer操作符可以將大量的數據序列縮減為較少的數據緩存序列,讓它們更容易處理。例如,你可以按固定的時間間隔,定期關閉和發(fā)射來自一個爆發(fā)性Observable的數據緩存。這相當于一個緩沖區(qū)。
示例代碼
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.buffer1.png" alt="bp.buffer1" />
或者,如果你想更進一步,可以在爆發(fā)期將數據收集到緩存,然后在爆發(fā)期終止時發(fā)射這些數據,使用 Debounce 操作符給buffer操作符發(fā)射一個緩存關閉指示器(buffer closing indicator)可以做到這一點。
代碼示例:
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);