在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 問答/Android/ RxJava怎么定時依次發(fā)送集合的每個元素?

RxJava怎么定時依次發(fā)送集合的每個元素?

我有一個集合,其中包含了若干Message對象。
我想通過RxJava從集合中每隔一秒依次發(fā)送集合中的對象

我一開始的時候想這樣操作,發(fā)現(xiàn)一次也不發(fā)送對象

Flowable<Message> messageFlowable = Flowable.fromIterable(mock.messages);
Flowable<Long> timeFlowable = Flowable.interval(1, TimeUnit.SECONDS);
Flowable<String> flowable = Flowable.zip(messageFlowable, timeFlowable, new BiFunction<Message, Long, String>() {
            @Override
            public String apply(Message message, Long aLong) throws Exception {
                return message.content;
            }
        });
flowable.subscribe(/*....*/)

后來我就想用這種方式

 Flowable.fromIterable(mock.messages)
            .map(message -> message.content)
            .delay(1, TimeUnit.SECONDS)
            .subscribe(s -> Timber.d("s = %s", s));
                

但是發(fā)現(xiàn)集合一瞬間就被發(fā)送完了。并沒有被延時。

有什么辦法可以延時發(fā)送集合嗎?

    delay(1s)      delay(1s)      delay(1s)
 0 -----------> 1 -----------> 2 -----------> 3 ...
回答
編輯回答
孤慣

使用doOnNext() + sleep(2000)

  Flowable.fromIterable(this.mock.messages)
                .doOnNext(message -> SystemClock.sleep(2000))
                .subscribe(message -> Timber.d("message.content = %s", message.content));

使用Zip操作符

Observable<Message> listObservable = Observable.fromIterable(mock.messages);
Observable<Long> timeObservable = Observable.interval(300, TimeUnit.MILLISECONDS);
Observable<String> zip =
        Observable.zip(listObservable, timeObservable, (message, aLong) -> message.content);

zip.doOnComplete(() -> Timber.d("complete"))
        .subscribe(s -> Timber.d("s = %s", s));
2018年2月11日 11:07
編輯回答
心夠野

RxJava我沒用過,但我用過RxJS, 大體思想應該一下
你應該這樣先起一個定時器的流,然后每個流都去接上新的流。
因為interval的時間不保準,所以用flatMap使用流上所有的數(shù)據(jù)。
具體的API可能不一樣,但是大致是這個思想吧。

Flowable.interval(1, TimeUnit.SECONDS).flatMap(() => return messageFlowable);
2017年3月30日 13:14
編輯回答
糖果果

代碼用kotlin寫的,寫android的人不管會不會寫,大概應該能看懂
純kotlin工程,非android工程,線程隨便用的,android里根據(jù)你的線程不同自己考慮用不用主線程

package com.github.caijinglong.rxjava

import io.reactivex.Flowable
import io.reactivex.FlowableSubscriber
import io.reactivex.schedulers.Schedulers
import org.reactivestreams.Subscription

fun main(args: Array<String>) {

    var mSubscription: Subscription? = null 

    val list = arrayListOf(1, 2, 3, 4, 5)
    Flowable.fromIterable(list)
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.computation())
            .subscribe(object : FlowableSubscriber<Int> {
                override fun onComplete() {
                }

                override fun onSubscribe(s: Subscription) {
                    mSubscription = s
                    s.request(1)
                }

                override fun onNext(p0: Int?) {
                    println("${Date().toLocaleString()} : $p0" )
                    Thread.sleep(1000)
                    mSubscription?.request(1)
                }

                override fun onError(p0: Throwable?) {
                }
            })

    Thread.sleep(5000)//為了防止退出main函數(shù)結束,實際的android工程中不會退出,這里可以不用寫
}

日志

2018-1-15 15:53:38 : 1
2018-1-15 15:53:39 : 2
2018-1-15 15:53:40 : 3
2018-1-15 15:53:41 : 4
2018-1-15 15:53:42 : 5

Process finished with exit code 0
2017年12月31日 18:48