RxJava入门(4):组合合并操作符

2018-02-27 11:31:53来源:https://www.jianshu.com/p/058e5836b677作者:tmyzh人点击

分享


concat/concatArray

组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
区别:concat()组合被观察者数量<=4个,concatArray数量大于4个


 Observable.concat(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});

打印结果





concat
 Observable.concatArray(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12),
Observable.just(11,12,13))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});

打印结果





concatArray
merge()/mergeArray()

组合多个被观察者一起发送数据,将同一时刻的事件合并然后发送,再顺序合并下面的事件
区别与concat/concatArray一样


 Observable.merge(Observable.intervalRange(0,3,1,1, TimeUnit.SECONDS),
Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubScribe");
}
@Override
public void onNext(Long aLong) {
Log.e("yzh","onNext--"+aLong);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});

打印结果





merge
concatDelayError() / mergeDelayError()

当合并的被观察中有一个发出onError事件时,其他的被观察者的事件也会被阻止发送,使用上面这两个方法可以将onError事件推迟到其他被观察者发送事件结束后才触发


 Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
}
}),Observable.just(1,2,3))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer serializable) {
Log.e("yzh","onNext--"+serializable);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError-"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});

打印结果





concatArrayDelayError

如果直接使用concat结果如下


onNext--1
onNext--2
onError--java.lang.NullPointException

zip

合并多个被观察者发送的事件,生成一个新的事件序列,然后发送


 Observable<Integer> observable1 =Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.e("yzh","被观察者1发送事件1");
e.onNext(1);
Thread.sleep(1000);
Log.e("yzh","被观察者1发送事件2");
e.onNext(2);
Thread.sleep(1000);
// e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 =Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e("yzh","观察者2发送事件1");
e.onNext("a");
Thread.sleep(1000);
Log.e("yzh","观察者2发送事件2");
e.onNext("b");
Thread.sleep(1000);
Log.e("yzh","被观察者2发送事件3");
e.onNext("c");
Thread.sleep(1000);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String string) throws Exception {
Log.e("yzh","apply") ;
return integer + string;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh", "onSubscribe");
}
@Override
public void onNext(String value) {
Log.e("yzh", "onNext = " + value);
}
@Override
public void onError(Throwable e) {
Log.e("yzh", "onError");
}
@Override
public void onComplete() {
Log.e("yzh", "onComplete");
}
});

打印结果




zip

注意 例子中的两个观察者用subscribeOn使用了不同的 线程,如果不加上这句代码,zip效果与concat一样,可以试一试。
combineLatest()

对两个被观察者中的事件组合再发送,特点是将第一个被观察者中最后一个事件分别与另一个被观察者中的事件组合再发送。


Observable.combineLatest(Observable.just(1L, 2L, 3L),
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long integer, Long aLong) throws Exception {
Log.e("yzh","合并的对象--"+integer+"--"+aLong);
return integer+aLong;
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Long s) {
Log.e("yzh","onNext--"+s);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});

打印结果





combineLatest
reduce()

把被观察者需要发送的事件聚合成1个事件然后发送,有点斐波那契数列的意思


Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e("yzh","操作数据--"+integer+"---"+integer2);
return integer*integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("yzh","接受到的数据--"+integer);
}
});

打印结果





reduce
collect()

将被观察者Observable发送的数据事件收集到一个数据结构里


 Observable.just(1,2,3,4,5)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
Log.e("yzh","accept--"+integers.toString());
}
});

打印结果





collect
startWith() / startWithArray()

在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
注意 后面的方法添加的数据在前面


 Observable.just(4,5,6)
.startWith(0)
.startWithArray(1,2,3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("yzh","onNext--"+integer);
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
Log.e("yzh","onComplete");
}
});

打印结果





startWith







最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台