目录
- 一、观察者Observer创建过程
- 二、被观察者Observable创建过程
- 三、subscribe订阅过程
- 四、map操作符
- 五、线程切换原理
简单示例1:
private Disposable mDisposable;
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
}
特别注意:上面示例代码中的mDisposable最后必须要释放掉,不然会出现内存泄漏
一、观察者Observer创建过程
首先对观察者Observer源码开始进行简单分析下:
Observer.java
public interface Observer<T> {
//表示一执行subscribe订阅就会执行该函数,这个函数一定执行在主线程中
void onSubscribe(@NonNull Disposable d);
// 表示拿到上一个流程的数据
void onNext(@NonNull T t);
// 表示拿到上一个流程的错误数据
void onError(@NonNull Throwable e);
// 表示事件流程结束
void onComplete();
}
具体的对象创建是在上面示例代码1中的new Observer<String>()
操作,这个称这个为自定义观察者。
二、被观察者Observable创建过程
分析完观察者Observer的创建,现在来分析下被观察者Observable的创建流程,
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
})
将new ObservableOnSubscribe()过程可以理解为是自定义source的过程。
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}
执行Observable.create()代码流程
Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null"); //校验是否为null
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
其中,RxJavaPlugins.onAssembly()采用了hook技术,如果没有重写RxJavaPlugins.setOnObservableAssembly()
方法,这个可以不要考虑。
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source; // 自定义source
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
/**
* Serializes calls to onNext, onError and onComplete.
*
* @param <T> the value type
*/
static final class SerializedEmitter<T>
extends AtomicInteger
implements ObservableEmitter<T> {
private static final long serialVersionUID = 4883307006032401862L;
final ObservableEmitter<T> emitter;
final AtomicThrowable error;
final SpscLinkedArrayQueue<T> queue;
volatile boolean done;
SerializedEmitter(ObservableEmitter<T> emitter) {
this.emitter = emitter;
this.error = new AtomicThrowable();
this.queue = new SpscLinkedArrayQueue<T>(16);
}
@Override
public void onNext(T t) {
if (emitter.isDisposed() || done) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() == 0 && compareAndSet(0, 1)) {
emitter.onNext(t);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<T> q = queue;
synchronized (q) {
q.offer(t);
}
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (emitter.isDisposed() || done) {
return false;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (error.addThrowable(t)) {
done = true;
drain();
return true;
}
return false;
}
@Override
public void onComplete() {
if (emitter.isDisposed() || done) {
return;
}
done = true;
drain();
}
void drain() {
if (getAndIncrement() == 0) {
drainLoop();
}
}
void drainLoop() {
ObservableEmitter<T> e = emitter;
SpscLinkedArrayQueue<T> q = queue;
AtomicThrowable error = this.error;
int missed = 1;
for (;;) {
for (;;) {
if (e.isDisposed()) {
q.clear();
return;
}
if (error.get() != null) {
q.clear();
e.onError(error.terminate());
return;
}
boolean d = done;
T v = q.poll();
boolean empty = v == null;
if (d && empty) {
e.onComplete();
return;
}
if (empty) {
break;
}
e.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void setDisposable(Disposable s) {
emitter.setDisposable(s);
}
@Override
public void setCancellable(Cancellable c) {
emitter.setCancellable(c);
}
@Override
public boolean isDisposed() {
return emitter.isDisposed();
}
@Override
public ObservableEmitter<T> serialize() {
return this;
}
}
}
这里将ObservableCreate的源码全部放在这,作为一个埋点
其实,Observable.create()方法主要功能就是创建了一个ObservableCreate对象,并将自定义的source传给ObservableCreate。该方法最终返回的是ObserverableCreate对象。
三、subscribe订阅过程
分析执行subscribe()订阅流程,并将自定义观察者作为参数传入。
Observable.java
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null"); // 功能校验,判定observer是否为null
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
首先会执行一些功能校验,最后执行到subscribeActual()方法中。
Observable.java
protected abstract void subscribeActual(Observer<? super T> observer);
subscribeActual()是一个抽象类,从而最终调用的是ObservableCreate的subscribeActual()方法中。
ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) { // observer为自定义观察者
// 自定义一个CreateEmitter发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 执行该方法就会执行自定义观察者的onSubscribe()方法中
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual()方法里面会执行如下三个操作:
1)CreateEmitter<T> parent = new CreateEmitter<T>(observer);
--> 首先会创建一个CreateEmitter发射器,并将自定义观察者传入该发射器中
2)observer.onSubscribe(parent);
–> 执行自定义观察者的onSubscribe()方法,所以该方法也是最先执行调用,并且一定在主线程中
3)source.subscribe(parent);
-->执行自定义source的subscribe()订阅操作,从而跳转到示例代码1中ObservableOnSubscribe的subscribe()方法,并将CreateEmitter发射器作为参数传入进去
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}
执行e.onNext("test")
就会跳转到CreateEmitter发射器中的onNext()
方法
ObservableCreate.java
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t); //执行该流程,observer为自定义观察者
}
}
...
}
该observer为上面流程中自定义的CreateEmitter发射器CreateEmitter<T> parent = new CreateEmitter<T>(observer);
传入进来的自定义观察者对象,执行observer.onNext(t)
该语句就调到示例代码1中的
@Override
public void onNext(String s) {
}
Observable与Observer订阅的过程时序图如下:
在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所以的“观察者”才能观察到
在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要 起点(被观察者) 和 终点(观察者) 在“订阅”一次后,才发出改变通知,终点(观察者)才能观察到
图1:RxJava简单订阅过程:
四、map操作符
加入map操作符之后的简单示例代码2:
private Disposable mDisposable;
// 创建ObserverCreate
Observable.create(new ObservableOnSubscribe<String>() { //自定义source
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
})
// ObservableCreate.map
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
// ObservableMap.subscribe
.subscribe(new Observer<String>() { //自定义观察者
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
if (!mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
}
这个示例代码2写法采用装饰模型
图2加入map操作符之后的流程:
从①~⑥流程简称为封包裹,⑦ ~⑨流程简称为拆包裹
其实图1与图2的区别不大,主要就是多了一个ObservableMap封包裹的流程,其他流程都类似。针对这个区别进行代码流程阐述下:
从示例代码2中执行map()
操作进行分析:
Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
进行创建ObservableMap对象
ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source); //source指ObservableCreate
this.function = function; // 自定义的Function方法
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); //这里面的t为下一层包裹即图2中的自定义观察者,source指上一层ObservableCreate
}
...
}
这里需要注意,在ObservableMap()构造函数中,参数source指从上一层传过来的ObservableCreate对象,参数function指示例代码2中的new Function()方法。
.map(new Function<String, String>()
执行示例代码2中的.subscribe()
其实就是执行到了ObservableMap类的subscribeActual()
方法,在这个方法中会对MapObserver进行封装一层包裹,并将下一层的包裹即自定义观察者也就是参数t
传入。
MapObserver为ObservableMap的内部类。
ObservableMap.java
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual); // actual为自定义观察者
this.mapper = mapper;
}
...
}
在执行图2的第⑧步流程时,就会调用执行包裹1的onNext()方法,即MapObserver类的onNext();
ObservableMap.java
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
// 代码1
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 代码2
actual.onNext(v);
}
1:代码1
执行mapper.apply(t)流程的时候,其实就是调用了示例代码2中的apply()方法。
Function.java
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
@Override
public String apply(String s) throws Exception {
return s;
}
2:代码2
actual.onNext(v);中的actual是在ObservableMap构造函数传过来的,actual对应图2中的自定义观察者对象,也就是对应图2中的第9步流程。
五、线程切换原理
subscribeOn:给上面代码分配线程
observeOn:给下面代码分配线程
Scheduler分类:
调度器类型 | 效果 |
---|---|
Schedulers.computation() | 用于计算任务,如事件循环或回调处理,不要用于IO操作(IO操作使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate() | 在当前线程立即开始执行任务 |
Schedulers.io() | 用于IO密集型任务 |
Schedulers.newThread() | 为每个任务创建一个新任务 |
Schedulers.trampoline() | 当其他排队的任务完成后,在当前线程排队开始执行 |
AndroidSchedulers.mainThread() | 用于Android的UI更新操作 |