RxBus学习之旅--从入门到提高

2017-01-14 15:49:13来源:http://www.jianshu.com/p/f43903b28cc2作者:sugaryaruan人点击

第七城市

在公司的技术分享会上,做了关于RxBus的学习分享,记录如下:


一.RxBus与RxJava

一次RxJava调用过程可以划分为以下环节:


创建观察内容 (片段1)
数据处理/映射(片段2)
选择线程(片段3)
订阅(片段4,片段5)
完成/错误处理(片段6)

示例代码:


Observable
// 片段1
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onStart();
String trim = mainEd.getText().toString().trim();
subscriber.onNext(trim);
subscriber.onError(new Throwable());
subscriber.onCompleted();
}
})
// 片段2
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + " sugarya";
}
})
// 片段3
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
// 片段4
.subscribe(
// 片段5
new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
}
//片段6
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
mainTv2.setText(s);
}
});

二.RxBus与接口回调

一次完整的接口回调,包括四个步骤:


接口定义
接口调用
接口实现
接口注入

RxBus的使用过程,就是一个接口回调的过程。


接口定义,在RxJava定义好了。


上面示例代码片段1里的suscriber.onNext(),onStart(),onError(),onComplete()对应接口调用。


代码片段5,片段6这些是接口实现


注入的过程是调用suscribe()方法订阅的过程


三.RxBus源码分析

RxBus的代码实现如下:


public class RxBus {
private static volatile RxBus instance;
private final Subject<Object, Object> _bus;

private RxBus() {
_bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getInstance() {
if (null == instance) {
synchronized (RxBus.class) {
if (null == instance) {
instance = new RxBus();
}
}
}
return instance;
}
public void send(Object object) {
try{
_bus.onNext(object);
}catch (Exception e){
e.printStackTrace();
}
}
public boolean hasObservers() {
return _bus.hasObservers();
}

private <T> Observable<T> toObservable(final Class<T> type) {
return _bus.ofType(type);//filter + cast
}



public <T> Subscription toSubscription(final Class<T> type, Observer<T> observer) {
return toObservable(type).subscribe(observer);
}
public <T> Subscription toSubscription(final Class<T> type, Action1<T> action1) {
return toObservable(type).subscribe(action1);
}
public <T> Subscription toSubscription(final Class<T> type, Action1<T> action1, Action1<Throwable> errorAction1) {
return toObservable(type).subscribe(action1,errorAction1);
}
}

接下来对上述代码做些简要分析:


volatile
保证instance可见性
禁止指令重排

这里涉及到Java内存模型,相关资料: 传送门


SerializedSubject

SerializedSubject extends Subject extends Observable implements Observer,既是观察内容,又是观察者,起到桥梁/数据转发的作用



保证多线程安全,Subject 当作一个 Subscriber 使用,从多个线程中调用它的onNext方法(包括其它的on系列方法)



PublishSubject

主题,RxJava里有四种主题


PublishSubject
BehaviorSubject
ReplaySubject
AsyncSubject

PublishSubject的含义是:在订阅者订阅的时间点之后的数据发送给观察者


ofType操作符 = filter操作符 + cast操作符
filter只有符合过滤条件的数据才会被“发射”
cast将一个Observable转换成指定类型的Observable
CompositeSubscription

该对象作为subscription的容器,方便统一取消订阅


四.RxBus异常处理

当RxBus在执行过程中,任意环节发生了错误异常,订阅关系就会被取消。之后再次发送,将无法执行订阅后的回调。


做了一个数组越界的错误来演示


@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ButterKnife.bind(this);
onRxBus();
}

private void onRxBus() {
int[] array = new int[2];
RxBus.getInstance().toSubscription(Integer.class, new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//onRxBus();
Log.e(TAG, "mSubscriber17 onError: " + e.toString());
}
@Override
public void onNext(Integer integer) {
array[integer] = integer;
Log.e(TAG, "mSubscriber17 onNext:" + integer);
}
});
}
@OnClick(R.id.btn_main17)
void onClick17() {
RxBus.getInstance().send(2);
}

这时候点击button按钮,数组越界异常,第二次再点击Button发送消息,没有响应了。这里,RxBus,确切的说是RxJava捕获到错误异常,就会取消订阅关系。


E/MainActivity: mSubscriber17 onError: java.lang.ArrayIndexOutOfBoundsException: length=2; index=2

解决的思路:


使用try-catch捕获异常,不让异常被RxJava捕获
在onError里重新订阅。

接下来说说第二种方法,具体怎么操作,其实就是在onError方法里,重新执行一遍订阅,执行上述注释掉的代码onRxBus,就解决问题了。


五.小结

上述其实是这次技术分享大纲,技术分享准备功课前,刷了下面的文章,要对RxBus有更细致的学习和了解,可以阅读:(推荐)


谢三弟系列

RxBus简单实现


RxBus深入源码解析


RxJava里onError异常处理


Yokey系列

RxJava实现事件总线


RxBus异常处理


RxBus实现Sticky事件(粘性订阅)


其他

RxBus从基础实现到升级——打造属于自己的RxBus


EventBus和RxBus实现和性能比较




第七城市

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台