ReactiveX的每種編程語言的實(shí)現(xiàn)都實(shí)現(xiàn)了一組操作符的集合。不同的實(shí)現(xiàn)之間有很多重疊的部分,也有一些操作符只存在特定的實(shí)現(xiàn)中。每種實(shí)現(xiàn)都傾向于用那種編程語言中他們熟悉的上下文中相似的方法給這些操作符命名。
本文首先會給出ReactiveX的核心操作符列表和對應(yīng)的文檔鏈接,后面還有一個決策樹用于幫助你根據(jù)具體的場景選擇合適的操作符。最后有一個語言特定實(shí)現(xiàn)的按字母排序的操作符列表。
如果你想實(shí)現(xiàn)你自己的操作符,可以參考這里:實(shí)現(xiàn)自定義操作符
用于創(chuàng)建Observable的操作符
Create — 通過調(diào)用觀察者的方法從頭創(chuàng)建一個ObservableDefer — 在觀察者訂閱之前不創(chuàng)建這個Observable,為每一個觀察者創(chuàng)建一個新的ObservableEmpty/Never/Throw — 創(chuàng)建行為受限的特殊ObservableFrom — 將其它的對象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為ObservableInterval — 創(chuàng)建一個定時發(fā)射整數(shù)序列的ObservableJust — 將對象或者對象集合轉(zhuǎn)換為一個會發(fā)射這些對象的ObservableRange — 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的ObservableRepeat — 創(chuàng)建重復(fù)發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的ObservableStart — 創(chuàng)建發(fā)射一個函數(shù)的返回值的ObservableTimer — 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable這些操作符可用于對Observable發(fā)射的數(shù)據(jù)進(jìn)行變換,詳細(xì)解釋可以看每個操作符的文檔
Buffer — 緩存,可以簡單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個FlatMap — 扁平映射,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個單獨(dú)的Observable,可以認(rèn)為是一個將嵌套的數(shù)據(jù)結(jié)構(gòu)展開的過程。GroupBy — 分組,將原來的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個Observable發(fā)射一組不同的數(shù)據(jù)Map — 映射,通過對序列的每一項(xiàng)都應(yīng)用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù),實(shí)質(zhì)是對序列中的每一項(xiàng)執(zhí)行一個函數(shù),函數(shù)的參數(shù)就是這個數(shù)據(jù)項(xiàng)Scan — 掃描,對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù),然后按順序依次發(fā)射這些值Window — 窗口,定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng)。類似于Buffer,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集這些操作符用于從Observable發(fā)射的數(shù)據(jù)中進(jìn)行選擇
Debounce — 只有在空閑了一段時間后才發(fā)射數(shù)據(jù),通俗的說,就是如果一段時間沒有操作,就執(zhí)行一次操作Distinct — 去重,過濾掉重復(fù)數(shù)據(jù)項(xiàng)ElementAt — 取值,取特定位置的數(shù)據(jù)項(xiàng)Filter — 過濾,過濾掉沒有通過謂詞測試的數(shù)據(jù)項(xiàng),只發(fā)射通過測試的First — 首項(xiàng),只發(fā)射滿足條件的第一條數(shù)據(jù)IgnoreElements — 忽略所有的數(shù)據(jù),只保留終止通知(onError或onCompleted)Last — 末項(xiàng),只發(fā)射最后一條數(shù)據(jù)Sample — 取樣,定期發(fā)射最新的數(shù)據(jù),等于是數(shù)據(jù)抽樣,有的實(shí)現(xiàn)里叫ThrottleFirstSkip — 跳過前面的若干項(xiàng)數(shù)據(jù)SkipLast — 跳過后面的若干項(xiàng)數(shù)據(jù)Take — 只保留前面的若干項(xiàng)數(shù)據(jù)TakeLast — 只保留后面的若干項(xiàng)數(shù)據(jù)組合操作符用于將多個Observable組合成一個單一的Observable
And/Then/When — 通過模式(And條件)和計(jì)劃(Then次序)組合兩個或多個Observable發(fā)射的數(shù)據(jù)集CombineLatest — 當(dāng)兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù)),然后發(fā)射這個函數(shù)的結(jié)果Join — 無論何時,如果一個Observable發(fā)射了一個數(shù)據(jù)項(xiàng),只要在另一個Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時間窗口內(nèi),就將兩個Observable發(fā)射的數(shù)據(jù)合并發(fā)射Merge — 將兩個Observable發(fā)射的數(shù)據(jù)組合并成一個StartWith — 在發(fā)射原來的Observable的數(shù)據(jù)序列之前,先發(fā)射一個指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng)Switch — 將一個發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個Observable:它逐個發(fā)射那些Observable最近發(fā)射的數(shù)據(jù)Zip — 打包,使用一個指定的函數(shù)將多個Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射這些操作符用于從錯誤通知中恢復(fù)
Catch — 捕獲,繼續(xù)序列操作,將錯誤替換為正常的數(shù)據(jù),從onError通知中恢復(fù)Retry — 重試,如果Observable發(fā)射了一個錯誤通知,重新訂閱它,期待它正常終止一組用于處理Observable的操作符
Delay — 延遲一段時間發(fā)射結(jié)果數(shù)據(jù)Do — 注冊一個動作占用一些Observable的生命周期事件,相當(dāng)于Mock某個操作Materialize/Dematerialize — 將發(fā)射的數(shù)據(jù)和通知都當(dāng)做數(shù)據(jù)發(fā)射,或者反過來ObserveOn — 指定觀察者觀察Observable的調(diào)度程序(工作線程)Serialize — 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且功能是有效的Subscribe — 收到Observable發(fā)射的數(shù)據(jù)和通知后執(zhí)行的操作SubscribeOn — 指定Observable應(yīng)該在哪個調(diào)度程序上執(zhí)行TimeInterval — 將一個Observable轉(zhuǎn)換為發(fā)射兩個數(shù)據(jù)之間所耗費(fèi)時間的ObservableTimeout — 添加超時機(jī)制,如果過了指定的一段時間沒有發(fā)射數(shù)據(jù),就發(fā)射一個錯誤通知Timestamp — 給Observable發(fā)射的每個數(shù)據(jù)項(xiàng)添加一個時間戳Using — 創(chuàng)建一個只在Observable的生命周期內(nèi)存在的一次性資源這些操作符可用于單個或多個數(shù)據(jù)項(xiàng),也可用于Observable
All — 判斷Observable發(fā)射的所有的數(shù)據(jù)項(xiàng)是否都滿足某個條件Amb — 給定多個Observable,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)Contains — 判斷Observable是否會發(fā)射一個指定的數(shù)據(jù)項(xiàng)DefaultIfEmpty — 發(fā)射來自原始Observable的數(shù)據(jù),如果原始Observable沒有發(fā)射數(shù)據(jù),就發(fā)射一個默認(rèn)數(shù)據(jù)SequenceEqual — 判斷兩個Observable是否按相同的數(shù)據(jù)序列SkipUntil — 丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個Observable發(fā)射了一個數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù)SkipWhile — 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個特定的條件為假,然后發(fā)射原始Observable剩余的數(shù)據(jù)TakeUntil — 發(fā)射來自原始Observable的數(shù)據(jù),直到第二個Observable發(fā)射了一個數(shù)據(jù)或一個通知TakeWhile — 發(fā)射原始Observable的數(shù)據(jù),直到一個特定的條件為真,然后跳過剩余的數(shù)據(jù)這些操作符可用于整個數(shù)據(jù)序列
Average — 計(jì)算Observable發(fā)射的數(shù)據(jù)序列的平均值,然后發(fā)射這個結(jié)果Concat — 不交錯的連接多個Observable的數(shù)據(jù)Count — 計(jì)算Observable發(fā)射的數(shù)據(jù)個數(shù),然后發(fā)射這個結(jié)果Max — 計(jì)算并發(fā)射數(shù)據(jù)序列的最大值Min — 計(jì)算并發(fā)射數(shù)據(jù)序列的最小值Reduce — 按順序?qū)?shù)據(jù)序列的每一個應(yīng)用某個函數(shù),然后返回這個值Sum — 計(jì)算并發(fā)射數(shù)據(jù)序列的和一些有精確可控的訂閱行為的特殊Observable
Connect — 指示一個可連接的Observable開始發(fā)射數(shù)據(jù)給訂閱者Publish — 將一個普通的Observable轉(zhuǎn)換為可連接的RefCount — 使一個可連接的Observable表現(xiàn)得像一個普通的ObservableReplay — 確保所有的觀察者收到同樣的數(shù)據(jù)序列,即使他們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱幾種主要的需求