Android RxJava框架分析:它的执行流程是如何的?它的线程是如何切换的?如何自定义RxJava操作符?

news2025/7/19 15:32:43

目录

  1. RxJava是什么?为什么使用。
  2. RxJava是如何使用的呢?
  3. RxJava如何和Retrofit一起使用。
  4. RxJava源码分析。
  • (1)他执行流程是如何的。
  • (2)map
  • (3)线程的切换。
  1. 如何自定义RxJava操作符?

一、RxJava是什么?为什么使用

RxJava 是一个基于 ​​响应式编程范式​​ 的库,用于通过​​观察者模式​​和​​链式操作符​​,简化异步、事件驱动、多线程数据流处理的开发。

简单来说,RxJava 就像是一个​​“流水线工厂”​​,专门处理需要等待的任务(比如网络请求、数据库查询、复杂计算等)。它能把这些任务串成一条流水线,每个环节处理完数据后,自动传给下一个环节,还能灵活控制任务在哪个线程执行(比如后台线程干活,主线程更新UI)。

1.1 为什么要使用RxJava呢?

接下来,我们看看不使用的问题,以网络请求为例

需求​​:按顺序做三件事(登录 → 查询订单 → 更新UI)。

​传统写法​​:​​“回调地狱”​​,层层嵌套,像俄罗斯套娃!

// 伪代码:传统嵌套回调(问题代码)
api.login(new Callback<User>() {
    @Override
    public void onSuccess(User user) {
        api.getOrders(user.getId(), new Callback<List<Order>>() {
            @Override
            public void onSuccess(List<Order> orders) {
                runOnUiThread(() -> {
                    showOrders(orders); // 切主线程更新UI
                });
            }
            @Override
            public void onFailure(Throwable error) {
                showError(error); // 每个回调都要处理错误!
            }
        });
    }
    @Override
    public void onFailure(Throwable error) {
        showError(error);
    }
});

​问题总结​​:

  • ​代码缩进成“金字塔”​​,维护困难。
  • ​重复处理错误​​,每个回调都要写 onFailure
  • ​手动切换线程​​(如 runOnUiThread),容易遗漏。

​RxJava写法​​:

// RxJava 链式调用(解决方案)
api.rxLogin()                   // 1. 登录(被观察者)
    .flatMap(user -> api.rxGetOrders(user.getId())) // 2. 查询订单(操作符)
    .observeOn(AndroidSchedulers.mainThread()) // 3. 切到主线程
    .subscribe(orders -> showOrders(orders), // 4. 观察者消费数据
               error -> showError(error));   // 统一错误处理!

优势​​:
1️ ​​代码变“直线”​​,逻辑清晰。
2️ ​​统一错误处理​​,一个 onError 搞定所有。
3️ ​​自动线程切换​​,不用写 runOnUiThread


二、RxJava是如何使用的呢?

(1)添加依赖

implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2' // Android 需要

(2)使用

Observable.just("你好")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe: 订阅开始");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.d(TAG, "onNext: 拿到事件"+s);

            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError: 错误事件");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: 事件完成");
            }
        });

在这里插入图片描述

解释一下执行流程:

(1)首先我们先需要知道几个角色:观察者( Observer),被观察者( Observable)。

(2)当被观察者发送出数据(调用just方法)你好的时候,那么观察者就会收到消息(subscribe方法就是订阅)。

(3) subscribe() 将观察者与被观察者连接,触发 Observable 开始发射数据,Observer接收并处理事件(数据、错误、完成信号)。

(4)Observer的 onSubscribe 方法订阅时立即调用​​(最先执行)。 通知观察者订阅已建立。

(5) onNext 方法Observable 发射数据时调用。

(6) onComplete() 方法Observable ​​正常完成数据发射​​后调用 。onError 反之。


三、RxJava如何和Retrofit一起使用

其实就是将Retrofit的响应结果交给RxJava来处理

3.1 发起一个请求

(1)我们需要在Retrofit这里配置 RxJava适配器

public class RetrofitClient {

    private static final String BASE_URL = "https://www.wanandroid.com/";

    private static Retrofit retrofit;

    public static WanAndroidService getService() {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder()
                .baseUrl(BASE_URL)
                .addConverterFactory(GsonConverterFactory.create()) // Gson 解析
                .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // RxJava 支持
                .build();
        }
        return retrofit.create(WanAndroidService.class);
    }
}

这行代码的作用是 ​​让 Retrofit 支持返回 RxJava 3 的响应式类型​​(如 ObservableFlowableSingle 等),使得网络请求的结果可以直接通过 RxJava 的流式操作符处理。

(2)在接口这里,我们也是使用Observable来接收。

public interface WanAndroidService {

    // 示例1:登录接口(POST)
    @FormUrlEncoded
    @POST("/user/login")
    Observable<ApiResponse<User>> login(
        @Field("username") String username,
        @Field("password") String password
    );

    // 示例2:获取首页文章列表(GET)
    @GET("/article/list/{page}/json")
    Observable<ApiResponse<List<Article>>> getHomeArticles(
        @Path("page") int page
    );
}

(3)调用接口

private void login(String username, String password) {
    WanAndroidService service = RetrofitClient.getService();

    service.login(username, password)
            .subscribeOn(Schedulers.io()) // 在IO线程发起请求
            .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果
            .subscribe(new Observer<ApiResponse<User>>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    compositeDisposable.add(d); // 统一管理订阅
                    Log.d(TAG, "onSubscribe: ");
                }

                @Override
                public void onNext(@NonNull ApiResponse<User> response) {
                    Log.d(TAG, "onNext: "+response);
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    // 网络错误
                    Log.d(TAG, "onError: ");
                }

                @Override
                public void onComplete() {
                    // 请求完成
                    Log.d(TAG, "onComplete: ");
                }
            });
}

执行逻辑:

  1. service.login(username, password)​:
    通过 Retrofit 定义的接口发起网络请求,返回一个 Observable<ApiResponse<User>>。注意 ​​此时网络请求尚未执行​​,只是定义了数据源。
  2. .subscribeOn(Schedulers.io())​:
    指定 Observable 的工作线程为 ​​IO 线程​
  3. observeOn(AndroidSchedulers.mainThread())​:
    指定 Observer 的回调方法(onNextonErroronComplete)在 ​​主线程​​ 执行。
  4. .subscribe(Observer)​:
    订阅 Observable,触发网络请求执行,并绑定观察者处理结果。 ​​此时网络请求正式启动​​。

3.2 发起两个请求,网络嵌套

需求:先登录,登录成功后再获取用户的收藏列表。

// 获取收藏列表 (GET, 需要登录态)
@GET("/lg/collect/list/{page}/json")
Observable<CollectListResponse> getCollectList(
        @Path("page") int page
);
private void nestedNetworkRequest() {
    WanAndroidService service = RetrofitClient.getService();

    service.login("xxx", "xxx")
            .flatMap(loginResponse -> {
                Log.d(TAG, "nestedNetworkRequest: "+loginResponse);
                // 登录成功后,获取收藏列表
                if (loginResponse.getErrorCode() == 0) {
                    return service.getCollectList(0); // 第0页
                } else {
                    return Observable.error(new Throwable("登录失败"));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<CollectListResponse>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull CollectListResponse response) {
                    Log.d(TAG, "onNext: "+response);

                }

                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d(TAG, "onError: "+e.getMessage());
                }

                @Override
                public void onComplete() {
                    // 请求完成
                }
            });
}

flatMap方法是当 login 请求成功返回数据后才会调用。

在这里插入图片描述

四、源码分析:

4.1 先从Observer(观察者)开始

里面会有一个泛型,onNext也是使用这个泛型。

在这里插入图片描述

4.2 Observable(被观察者)

(1)调用create方法的时候,就会创建一个ObservableCreate
在这里插入图片描述

ObservableCreate里面,包裹了source,按照上面的例子,就类似于调用了我们的login方法,将要发送的请求包裹起来。那么包裹起来干嘛?因为他不是现在执行的,我们都知道,需要调用订阅,才会触发整个流程执行。

4.3 Observable的subscribe订阅过程

在这里插入图片描述

​订阅发生​​:调用 subscribe() 时,触发 subscribeActual

在这里插入图片描述

进来以后,我们就可以看到,先执行了我们的onSubscribe方法,然后再去执行source,source就是我们前面说的,将请求包裹起来的内容。

我们看一个非常原始的代码。
在这里插入图片描述

然后再里面调用了onNext,就是执行了观察者的onNext方法,然后执行onCompleter那么到这里,整个执行流程就结束了。

4.4 Map变换操作符原理

为什么map可以改变onnext的接收类型呢?我们继续看看。

在这里插入图片描述

可以看到,这里的类型就进行了转换。但是为什么观察者也跟着变了呢?
在这里插入图片描述

在这里的时候,就已经处理。 返回一个R类型的Observable实例,那么T也就变成了R。

4.5 异步线程切换:subscribeOn(Schedulers.io())

我们拿一个代码来进行分析。

service.login(username, password)
        .subscribeOn(Schedulers.io()) // 在IO线程发起请求
        .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果
        .subscribe(new Observer<ApiResponse<User>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                compositeDisposable.add(d); // 统一管理订阅
                Log.d(TAG, "onSubscribe: ");
            }
            ...
        }
}

subscribeOn接收一个Scheduler参数,Scheduler类它封装了线程池和线程切换逻辑。

在这里插入图片描述

在这里插入图片描述

那么Schedulers.io()做了什么?Schedulers.io()​是一种策略,他会将内部的线程池配置成IO密集型的。我们会发现里面有很多种策略。

在这里插入图片描述

那么我们看看subscribeOn做了什么呢?他拿到线程池以后,做什么呢?先保存起来。我们先记住subscribeOn返回了一个ObservableSubscribeOn类
在这里插入图片描述

因为我们知道任务需要靠订阅方法才触发的,所以我们来看看ObservableSubscribeOn中的
subscribeActual订阅方法
在这里插入图片描述

scheduler.scheduleDirect

在这里插入图片描述

createWorker是一个抽象方法,因为前面我们配置的是Schedulers.io(),所以打开IoScheduler的createWorker
,然后会调用schedult方法执行现成。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

所以我们的任务就被异步线程执行了。

4.5 主线程切换

那么他如何从异步线程,又切换回主线程的?

.observeOn(AndroidSchedulers.mainThread())

AndroidSchedulers.mainThread()也是一个Scheduler,这里就不多介绍了。我们主要看看他返回的Scheduler

在这里插入图片描述

最终切换主线程,还是使用到了handler

在这里插入图片描述

我们记住HandlerScheduler类

在这里插入图片描述

我们来到observeOn方法

在这里插入图片描述

所以,然后最终交给了HandlerScheduler类来执行。

好了,到这里,源码分析就结束了。

那么我们看源码是为了什么?我们可以自定义RxJava操作符来玩玩,也会让我们对前面学习更加的通透理解。

五、自定义RxJava操作符

我们之定义RxJava操作符,并不是说我们自己实现Observable,而是继承他去实现一些功能。

我们看过just方法就知道,其实继承了Observable
在这里插入图片描述

在这里插入图片描述

然后重写subscribeActual方法,将value包裹进行处理,然后再调用观察者进行分发。

下面我们就写一个防抖操作符,帮助我们理解整个流程。

5.1 自定义防抖操作符

public class RxView {

    private final static String TAG = RxView.class.getSimpleName();

    // 我们自己的操作符 == 函数
    public static Observable<Object> clicks(View view) {
        return new ViewClickObservable(view);
    }

}

public class ViewClickObservable extends Observable<Object> {

    private final View view;

    private static final Object EVENT = new Object();

    public ViewClickObservable(View view) {
        this.view = view;
    }

    @Override
    protected void subscribeActual(Observer<? super Object> observer) {
        MyListener myListener = new MyListener(view, observer);//1.拿到view进行处理
        observer.onSubscribe(myListener);
        this.view.setOnClickListener(myListener);
    }

    // 拿到view
    static final class MyListener implements View.OnClickListener, Disposable {

        private final View view;
        private Observer<Object> observer;
        
        private final AtomicBoolean isDisposable = new AtomicBoolean();

        public MyListener(View view, Observer<Object> observer) {
            this.view = view;
            this.observer = observer;
        }

        @Override
        public void onClick(View v) {
            if (isDisposed() == false) {
                observer.onNext(EVENT);
            }
        }

        // 如果用调用了 中断
        @Override
        public void dispose() {
            // 如果没有中断过,才有资格,   取消view.setOnClickListener(null);
            if (isDisposable.compareAndSet(false, true)) {
                // 主线程 很好的中断
                if (Looper.myLooper() == Looper.getMainLooper()) {
                    view.setOnClickListener(null);

                } else { 
                // 主线程,通过Handler的切换
                    /*new Handler(Looper.getMainLooper()) {
                        @Override
                        public void handleMessage(@NonNull Message msg) {
                            super.handleMessage(msg);
                            view.setOnClickListener(null);
                        }
                    };*/

                    //放到主线程执行。
                    AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
                        @Override
                        public void run() {
                            view.setOnClickListener(null);
                        }
                    });
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return isDisposable.get();
        }
    }
}

在这里插入图片描述

RxView.clicks(button)
        .throttleFirst(2000, TimeUnit.MILLISECONDS)
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
            }
        });
    }
}

执行流程:

  • 用户通过 RxView.clicks(view) 创建 ViewClickObservable
  • 调用 subscribe() 后触发 subscribeActual,创建 MyListener 并绑定到 View 的点击事件。
  • 点击事件触发 onClick,通过 Observer 发送 onNext 事件。然后Observeraccept方法就收到了事件(object)
  • 调用 dispose() 时移除 View 的点击监听。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2375086.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL及线程关于锁的面试题

目录 1.了解过 MySQL 死锁问题吗&#xff1f; 2.什么是线程死锁&#xff1f;死锁相关面试题 2.1 什么是死锁&#xff1a; 2.2 形成死锁的四个必要条件是什么&#xff1f; 2.3 如何避免线程死锁&#xff1f; 3. MySQL 怎么排查死锁问题&#xff1f; 4.Java线上死锁问题如…

【工作记录】crmeb后端项目打开、运行

1、下载代码 1&#xff09;安装git 不再详述 2&#xff09;git拉代码 项目地址如下&#xff0c;在vscode-分支中拉代码 # 克隆项目 git clone https://gitee.com/ZhongBangKeJi/crmeb_java/ 截图如下是已经成功拉下来 注意安装对应版本 2、maven配置 安装配置见&#x…

智能手表测试计划文档(软/硬件)

&#x1f4c4; 智能手表测试计划文档&#xff08;软/硬件&#xff09; 项目名称&#xff1a;Aurora Watch S1 文档编号&#xff1a;AW-S1-QA-TP-001 编制日期&#xff1a;2025-xx-xx 版本&#xff1a;V1.0 编写人&#xff1a;xxx&#xff08;测试主管&#xff09; 一、测试目标…

k8s监控方案实践(三):部署与配置Grafana可视化平台

k8s监控方案实践&#xff08;三&#xff09;&#xff1a;部署与配置Grafana可视化平台 文章目录 k8s监控方案实践&#xff08;三&#xff09;&#xff1a;部署与配置Grafana可视化平台一、Grafana简介1. 什么是Grafana&#xff1f;2. Grafana与Prometheus的关系3. Grafana应用场…

嵌入式系统架构验证工具:AADL Inspector v1.10 全新升级

软件架构建模与早期验证是嵌入式应用的关键环节。架构分析与设计语言&#xff08;AADL&#xff09;是专为应用软件及执行平台架构模型设计的语言&#xff0c;兼具文本与图形化的双重特性。AADL Inspector是一款轻量级的独立工具&#xff1a; 核心处理能力包括 √ 支持处理AA…

STM32-模电

目录 一、MOS管 二、二极管 三、IGBT 四、运算放大器 五、推挽、开漏、上拉电阻 一、MOS管 1. MOS简介 这里以nmos管为例&#xff0c;注意箭头方向。G门极/栅极&#xff0c;D漏极&#xff0c;S源极。 当给G通高电平时&#xff0c;灯泡点亮&#xff0c;给G通低电平时&a…

华为云Flexus+DeepSeek征文|从开通到应用:华为云DeepSeek-V3/R1商用服务深度体验

前言 本文章主要讲述在华为云ModelArts Studio上 开通DeepSeek-V3/R1商用服务的流程&#xff0c;以及开通过程中的经验分享和使用感受帮我更多开发者&#xff0c;在华为云平台快速完成 DeepSeek-V3/R1商用服务的开通以及使用入门注意&#xff1a;避免测试过程中出现部署失败等问…

鸿蒙NEXT开发动画案例5

1.创建空白项目 2.Page文件夹下面新建Spin.ets文件&#xff0c;代码如下&#xff1a; /*** TODO SpinKit动画组件 - Pulse 脉冲动画* author: CSDN—鸿蒙布道师* since: 2024/05/09*/ ComponentV2 export struct SpinFive {// 参数定义Require Param spinSize: number 48;Re…

ctfshow——web入门351~356

SSRF没有出网的部分 web入门351 $ch curl_init($url); 作用&#xff1a;初始化一个 cURL 会话&#xff0c;并设置目标 URL。解释&#xff1a; curl_init($url) 创建一个新的 cURL 资源&#xff0c;并将其与 $url 关联。这里的 $url 是用户提供的&#xff0c;因此目标地址完全…

【PostgreSQL数据分析实战:从数据清洗到可视化全流程】金融风控分析案例-10.1 风险数据清洗与特征工程

&#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 文章大纲 PostgreSQL金融风控分析案例&#xff1a;风险数据清洗与特征工程实战一、案例背景&#xff1a;金融风控数据处理需求二、风险数据清洗实战&#xff08;一&#xff09;缺失值…

美女热舞混剪视频批量剪辑生产技术实践:智能处理与原创性提升方案解析

一、引言&#xff1a;短视频工业化生产的技术转型 在美女类短视频内容运营中&#xff0c;通过标准化技术流程实现「高质量、规模化」产出成为核心需求。本文结合实战经验&#xff0c;解析如何通过智能素材重组、AI 语音合成、动态元素叠加等技术手段&#xff0c;构建自动化生产…

神经网络基础-从零开始搭建一个神经网络

一、什么是神经网络 人工神经网络(Articial Neural Network,简写为ANN)也称为神经网络(NN),是一种模仿生物神经网络和功能的计算模型,人脑可以看做是一个生物神经网络,由众多的神经元连接而成,各个神经元传递复杂的电信号,树突接收到输入信号,然后对信号进行处理,通…

#Redis黑马点评#(五)Redisson原理详解

目录 一 基于Redis的分布式锁优化 二 Redisson 1 实现步骤 2 Redisson可重入锁机制 3 Redisson可重试机制 4 Redisson超时释放机制 5 RedissonMultiLock解决主从一致性 三 trylock与lock两者有何区别 四 Redis优化秒杀 一 基于Redis的分布式锁优化 二 Redisson Redis…

23.(vue3.x+vite)引入组件并动态切换(component)

让多个组件使用同一个挂载点,并动态切换,这就是动态组件 效果截图 A组件代码: <template><div><div>{{message }}</</

VBA会被Python代替吗

VBA不会完全被Python取代、但Python在自动化、数据分析与跨平台开发等方面的优势使其越来越受欢迎、两者将长期并存且各具优势。 Python以其易于学习的语法、强大的开源生态系统和跨平台支持&#xff0c;逐渐成为自动化和数据分析领域的主流工具。然而&#xff0c;VBA依旧在Exc…

SEMI E40-0200 STANDARD FOR PROCESSING MANAGEMENT(加工管理标准)-(三)完结

10 消息服务详情 10.1 本章定义实现加工管理概念所需的消息服务。这些消息已在第8.1节中初步介绍。 协议无关性&#xff1a;这些服务独立于所使用的消息协议&#xff0c;可映射至SECS-II&#xff08;SEMI E5&#xff09;或其他类似协议。 10.1.1 消息服务定义内容包括&#…

MySQL数据库创建、删除、修改

一&#xff1a;建库建表 我们以学校体系进行建表。将数据库命名为school。 以下代码中的大写均可小写不影响。如CREATE DATABASE与create database相同 四个关键的实体分别是学院、老师、学生和课程&#xff0c;其中&#xff0c;学生跟学院是从属关系&#xff0c;这个关系从…

【氮化镓】GaN在不同电子能量损失的SHI辐射下的损伤

该文的主要发现和结论如下: GaN的再结晶特性 :GaN在离子撞击区域具有较高的再结晶倾向,这导致其形成永久损伤的阈值较高。在所有研究的电子能量损失 regime 下,GaN都表现出这种倾向,但在电子能量损失增加时,其效率会降低,尤其是在材料发生解离并形成N₂气泡时。 能量损失…

防火墙来回路径不一致导致的业务异常

案例拓扑&#xff1a; 拓扑描述&#xff1a; 服务器有2块网卡&#xff0c;内网网卡2.2.2.1/24 网关2.2.254 提供内网用户访问&#xff1b; 外网网卡1.1.1.1/24&#xff0c;外网网关1.1.1.254 80端口映射到公网 这个时候服务器有2条默认路由&#xff0c;分布是0.0.0.0 0.0.0.0 1…

WTK6900C-48L:离线语音芯片重构玩具DNA,从“按键操控”到“声控陪伴”的交互跃迁

一&#xff1a;开发背景 随着消费升级和AI技术进步&#xff0c;传统玩具的机械式互动已难以满足市场需求。语音控制芯片的引入使玩具实现了从被动玩耍到智能交互的跨越式发展。通过集成高性价比的语音识别芯片&#xff0c;现代智能玩具不仅能精准响应儿童指令&#xff0c;还能实…