目录
- RxJava是什么?为什么使用。
- RxJava是如何使用的呢?
- RxJava如何和Retrofit一起使用。
- RxJava源码分析。
- (1)他执行流程是如何的。
- (2)map
- (3)线程的切换。
- 如何自定义RxJava操作符?
一、RxJava是什么?为什么使用
RxJava 是一个基于 响应式编程范式 的库,用于通过观察者模式和链式操作符,简化异步、事件驱动、多线程数据流处理的开发。
简单来说,RxJava 就像是一个“流水线工厂”,专门处理需要等待的任务(比如网络请求、数据库查询、复杂计算等)。它能把这些任务串成一条流水线,每个环节处理完数据后,自动传给下一个环节,还能灵活控制任务在哪个线程执行(比如后台线程干活,主线程更新UI)。
1.1 为什么要使用RxJava呢?
接下来,我们看看不使用的问题,以网络请求为例。
需求:按顺序做三件事(登录 → 查询订单 → 更新UI)。
传统写法:“回调地狱”,层层嵌套,像俄罗斯套娃!
// 伪代码:传统嵌套回调(问题代码)
api.login(new Callback<User>() {
@Override
public void onSuccess(User user) {
api.getOrders(user.getId(), new Callback<List<Order>>() {
@Override
public void onSuccess(List<Order> orders) {
runOnUiThread(() -> {
showOrders(orders); // 切主线程更新UI
});
}
@Override
public void onFailure(Throwable error) {
showError(error); // 每个回调都要处理错误!
}
});
}
@Override
public void onFailure(Throwable error) {
showError(error);
}
});
问题总结:
- 代码缩进成“金字塔”,维护困难。
- 重复处理错误,每个回调都要写
onFailure。 - 手动切换线程(如
runOnUiThread),容易遗漏。
RxJava写法:
// RxJava 链式调用(解决方案)
api.rxLogin() // 1. 登录(被观察者)
.flatMap(user -> api.rxGetOrders(user.getId())) // 2. 查询订单(操作符)
.observeOn(AndroidSchedulers.mainThread()) // 3. 切到主线程
.subscribe(orders -> showOrders(orders), // 4. 观察者消费数据
error -> showError(error)); // 统一错误处理!
优势:
1️ 代码变“直线”,逻辑清晰。
2️ 统一错误处理,一个 onError 搞定所有。
3️ 自动线程切换,不用写 runOnUiThread。
二、RxJava是如何使用的呢?
(1)添加依赖
implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2' // Android 需要
(2)使用
Observable.just("你好")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: 订阅开始");
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG, "onNext: 拿到事件"+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: 错误事件");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 事件完成");
}
});

解释一下执行流程:
(1)首先我们先需要知道几个角色:观察者( Observer),被观察者( Observable)。
(2)当被观察者发送出数据(调用just方法)你好的时候,那么观察者就会收到消息(subscribe方法就是订阅)。
(3) subscribe() 将观察者与被观察者连接,触发 Observable 开始发射数据,Observer接收并处理事件(数据、错误、完成信号)。
(4)Observer的 onSubscribe 方法订阅时立即调用(最先执行)。 通知观察者订阅已建立。
(5) onNext 方法Observable 发射数据时调用。
(6) onComplete() 方法Observable 正常完成数据发射后调用 。onError 反之。
三、RxJava如何和Retrofit一起使用
其实就是将Retrofit的响应结果交给RxJava来处理
3.1 发起一个请求
(1)我们需要在Retrofit这里配置 RxJava适配器
public class RetrofitClient {
private static final String BASE_URL = "https://www.wanandroid.com/";
private static Retrofit retrofit;
public static WanAndroidService getService() {
if (retrofit == null) {
retrofit = new Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(GsonConverterFactory.create()) // Gson 解析
.addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // RxJava 支持
.build();
}
return retrofit.create(WanAndroidService.class);
}
}
这行代码的作用是 让 Retrofit 支持返回 RxJava 3 的响应式类型(如 Observable、Flowable、Single 等),使得网络请求的结果可以直接通过 RxJava 的流式操作符处理。
(2)在接口这里,我们也是使用Observable来接收。
public interface WanAndroidService {
// 示例1:登录接口(POST)
@FormUrlEncoded
@POST("/user/login")
Observable<ApiResponse<User>> login(
@Field("username") String username,
@Field("password") String password
);
// 示例2:获取首页文章列表(GET)
@GET("/article/list/{page}/json")
Observable<ApiResponse<List<Article>>> getHomeArticles(
@Path("page") int page
);
}
(3)调用接口
private void login(String username, String password) {
WanAndroidService service = RetrofitClient.getService();
service.login(username, password)
.subscribeOn(Schedulers.io()) // 在IO线程发起请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果
.subscribe(new Observer<ApiResponse<User>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
compositeDisposable.add(d); // 统一管理订阅
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(@NonNull ApiResponse<User> response) {
Log.d(TAG, "onNext: "+response);
}
@Override
public void onError(@NonNull Throwable e) {
// 网络错误
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
// 请求完成
Log.d(TAG, "onComplete: ");
}
});
}
执行逻辑:
service.login(username, password):
通过 Retrofit 定义的接口发起网络请求,返回一个Observable<ApiResponse<User>>。注意 此时网络请求尚未执行,只是定义了数据源。.subscribeOn(Schedulers.io()):
指定 Observable 的工作线程为 IO 线程observeOn(AndroidSchedulers.mainThread()):
指定 Observer 的回调方法(onNext、onError、onComplete)在 主线程 执行。-
.subscribe(Observer):
订阅 Observable,触发网络请求执行,并绑定观察者处理结果。 此时网络请求正式启动。
3.2 发起两个请求,网络嵌套
需求:先登录,登录成功后再获取用户的收藏列表。
// 获取收藏列表 (GET, 需要登录态)
@GET("/lg/collect/list/{page}/json")
Observable<CollectListResponse> getCollectList(
@Path("page") int page
);
private void nestedNetworkRequest() {
WanAndroidService service = RetrofitClient.getService();
service.login("xxx", "xxx")
.flatMap(loginResponse -> {
Log.d(TAG, "nestedNetworkRequest: "+loginResponse);
// 登录成功后,获取收藏列表
if (loginResponse.getErrorCode() == 0) {
return service.getCollectList(0); // 第0页
} else {
return Observable.error(new Throwable("登录失败"));
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CollectListResponse>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull CollectListResponse response) {
Log.d(TAG, "onNext: "+response);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: "+e.getMessage());
}
@Override
public void onComplete() {
// 请求完成
}
});
}
flatMap方法是当 login 请求成功返回数据后才会调用。

四、源码分析:
4.1 先从Observer(观察者)开始
里面会有一个泛型,onNext也是使用这个泛型。

4.2 Observable(被观察者)
(1)调用create方法的时候,就会创建一个ObservableCreate

ObservableCreate里面,包裹了source,按照上面的例子,就类似于调用了我们的login方法,将要发送的请求包裹起来。那么包裹起来干嘛?因为他不是现在执行的,我们都知道,需要调用订阅,才会触发整个流程执行。
4.3 Observable的subscribe订阅过程

订阅发生:调用 subscribe() 时,触发 subscribeActual。

进来以后,我们就可以看到,先执行了我们的onSubscribe方法,然后再去执行source,source就是我们前面说的,将请求包裹起来的内容。
我们看一个非常原始的代码。

然后再里面调用了onNext,就是执行了观察者的onNext方法,然后执行onCompleter那么到这里,整个执行流程就结束了。
4.4 Map变换操作符原理
为什么map可以改变onnext的接收类型呢?我们继续看看。

可以看到,这里的类型就进行了转换。但是为什么观察者也跟着变了呢?

在这里的时候,就已经处理。 返回一个R类型的Observable实例,那么T也就变成了R。
4.5 异步线程切换:subscribeOn(Schedulers.io())
我们拿一个代码来进行分析。
service.login(username, password)
.subscribeOn(Schedulers.io()) // 在IO线程发起请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果
.subscribe(new Observer<ApiResponse<User>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
compositeDisposable.add(d); // 统一管理订阅
Log.d(TAG, "onSubscribe: ");
}
...
}
}
subscribeOn接收一个Scheduler参数,Scheduler类它封装了线程池和线程切换逻辑。


那么Schedulers.io()做了什么?Schedulers.io()是一种策略,他会将内部的线程池配置成IO密集型的。我们会发现里面有很多种策略。

那么我们看看subscribeOn做了什么呢?他拿到线程池以后,做什么呢?先保存起来。我们先记住subscribeOn返回了一个ObservableSubscribeOn类

因为我们知道任务需要靠订阅方法才触发的,所以我们来看看ObservableSubscribeOn中的
subscribeActual订阅方法

scheduler.scheduleDirect

createWorker是一个抽象方法,因为前面我们配置的是Schedulers.io(),所以打开IoScheduler的createWorker
,然后会调用schedult方法执行现成。



所以我们的任务就被异步线程执行了。
4.5 主线程切换
那么他如何从异步线程,又切换回主线程的?
.observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers.mainThread()也是一个Scheduler,这里就不多介绍了。我们主要看看他返回的Scheduler

最终切换主线程,还是使用到了handler

我们记住HandlerScheduler类

我们来到observeOn方法

所以,然后最终交给了HandlerScheduler类来执行。
好了,到这里,源码分析就结束了。
那么我们看源码是为了什么?我们可以自定义RxJava操作符来玩玩,也会让我们对前面学习更加的通透理解。
五、自定义RxJava操作符
我们之定义RxJava操作符,并不是说我们自己实现Observable,而是继承他去实现一些功能。
我们看过just方法就知道,其实继承了Observable


然后重写subscribeActual方法,将value包裹进行处理,然后再调用观察者进行分发。
下面我们就写一个防抖操作符,帮助我们理解整个流程。
5.1 自定义防抖操作符
public class RxView {
private final static String TAG = RxView.class.getSimpleName();
// 我们自己的操作符 == 函数
public static Observable<Object> clicks(View view) {
return new ViewClickObservable(view);
}
}
public class ViewClickObservable extends Observable<Object> {
private final View view;
private static final Object EVENT = new Object();
public ViewClickObservable(View view) {
this.view = view;
}
@Override
protected void subscribeActual(Observer<? super Object> observer) {
MyListener myListener = new MyListener(view, observer);//1.拿到view进行处理
observer.onSubscribe(myListener);
this.view.setOnClickListener(myListener);
}
// 拿到view
static final class MyListener implements View.OnClickListener, Disposable {
private final View view;
private Observer<Object> observer;
private final AtomicBoolean isDisposable = new AtomicBoolean();
public MyListener(View view, Observer<Object> observer) {
this.view = view;
this.observer = observer;
}
@Override
public void onClick(View v) {
if (isDisposed() == false) {
observer.onNext(EVENT);
}
}
// 如果用调用了 中断
@Override
public void dispose() {
// 如果没有中断过,才有资格, 取消view.setOnClickListener(null);
if (isDisposable.compareAndSet(false, true)) {
// 主线程 很好的中断
if (Looper.myLooper() == Looper.getMainLooper()) {
view.setOnClickListener(null);
} else {
// 主线程,通过Handler的切换
/*new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
view.setOnClickListener(null);
}
};*/
//放到主线程执行。
AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
@Override
public void run() {
view.setOnClickListener(null);
}
});
}
}
}
@Override
public boolean isDisposed() {
return isDisposable.get();
}
}
}

RxView.clicks(button)
.throttleFirst(2000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
}
});
}
}
执行流程:
- 用户通过
RxView.clicks(view)创建ViewClickObservable。 - 调用
subscribe()后触发subscribeActual,创建MyListener并绑定到View的点击事件。 - 点击事件触发
onClick,通过Observer发送onNext事件。然后Observer的accept方法就收到了事件(object) - 调用
dispose()时移除View的点击监听。



















