Subject可以看成是一個(gè)橋梁或者代理,在某些ReactiveX實(shí)現(xiàn)中(如RxJava),它同時(shí)充當(dāng)了Observer和Observable的角色。因?yàn)樗且粋€(gè)Observer,它可以訂閱一個(gè)或多個(gè)Observable;又因?yàn)樗且粋€(gè)Observable,它可以轉(zhuǎn)發(fā)它收到(Observe)的數(shù)據(jù),也可以發(fā)射新的數(shù)據(jù)。
由于一個(gè)Subject訂閱一個(gè)Observable,它可以觸發(fā)這個(gè)Observable開始發(fā)射數(shù)據(jù)(如果那個(gè)Observable是"冷"的--就是說(shuō),它等待有訂閱才開始發(fā)射數(shù)據(jù))。因此有這樣的效果,Subject可以把原來(lái)那個(gè)"冷"的Observable變成"熱"的。
針對(duì)不同的場(chǎng)景一共有四種類型的Subject。他們并不是在所有的實(shí)現(xiàn)中全部都存在,而且一些實(shí)現(xiàn)使用其它的命名約定(例如,在RxScala中Subject被稱作PublishSubject)。
一個(gè)AsyncSubject只在原始Observable完成后,發(fā)射來(lái)自原始Observable的最后一個(gè)值。(如果原始Observable沒(méi)有發(fā)射任何值,AsyncObject也不發(fā)射任何值)它會(huì)把這最后一個(gè)值發(fā)射給任何后續(xù)的觀察者。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.AsyncSubject.png" alt="" />
然而,如果原始的Observable因?yàn)榘l(fā)生了錯(cuò)誤而終止,AsyncSubject將不會(huì)發(fā)射任何數(shù)據(jù),只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.AsyncSubject.e.png" alt="" />
當(dāng)觀察者訂閱BehaviorSubject時(shí),它開始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時(shí)還沒(méi)有收到任何數(shù)據(jù),它會(huì)發(fā)射一個(gè)默認(rèn)值),然后繼續(xù)發(fā)射其它任何來(lái)自原始Observable的數(shù)據(jù)。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.BehaviorSubject.png" alt="" />
然而,如果原始的Observable因?yàn)榘l(fā)生了一個(gè)錯(cuò)誤而終止,BehaviorSubject將不會(huì)發(fā)射任何數(shù)據(jù),只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.BehaviorSubject.e.png" alt="" />
PublishSubject只會(huì)把在訂閱發(fā)生的時(shí)間點(diǎn)之后來(lái)自原始Observable的數(shù)據(jù)發(fā)射給觀察者。需要注意的是,PublishSubject可能會(huì)一創(chuàng)建完成就立刻開始發(fā)射數(shù)據(jù)(除非你可以阻止它發(fā)生),因此這里有一個(gè)風(fēng)險(xiǎn):在Subject被創(chuàng)建后到有觀察者訂閱它之前這個(gè)時(shí)間段內(nèi),一個(gè)或多個(gè)數(shù)據(jù)可能會(huì)丟失。如果要確保來(lái)自原始Observable的所有數(shù)據(jù)都被分發(fā),你需要這樣做:或者使用Create創(chuàng)建那個(gè)Observable以便手動(dòng)給它引入"冷"Observable的行為(當(dāng)所有觀察者都已經(jīng)訂閱時(shí)才開始發(fā)射數(shù)據(jù)),或者改用ReplaySubject。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.PublishSubject.png" alt="" />
如果原始的Observable因?yàn)榘l(fā)生了一個(gè)錯(cuò)誤而終止,PublishSubject將不會(huì)發(fā)射任何數(shù)據(jù),只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.PublishSubject.e.png" alt="" />
ReplaySubject會(huì)發(fā)射所有來(lái)自原始Observable的數(shù)據(jù)給觀察者,無(wú)論它們是何時(shí)訂閱的。也有其它版本的ReplaySubject,在重放緩存增長(zhǎng)到一定大小的時(shí)候或過(guò)了一段時(shí)間后會(huì)丟棄舊的數(shù)據(jù)(原始Observable發(fā)射的)。
如果你把ReplaySubject當(dāng)作一個(gè)觀察者使用,注意不要從多個(gè)線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導(dǎo)致同時(shí)(非順序)調(diào)用,這會(huì)違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性。
http://wiki.jikexueyuan.com/project/rx-docs/images/S.ReplaySubject.png" alt="" />
假設(shè)你有一個(gè)Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你可以調(diào)用它的asObservable方法,這個(gè)方法返回一個(gè)Observable。具體使用方法可以參考Javadoc文檔。
如果你把 Subject 當(dāng)作一個(gè) Subscriber 使用,注意不要從多個(gè)線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導(dǎo)致同時(shí)(非順序)調(diào)用,這會(huì)違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性。
要避免此類問(wèn)題,你可以將 Subject 轉(zhuǎn)換為一個(gè) SerializedSubject ,類似于這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );