文章目录
- 一、什么是RxJava
 - 二、使用前的准备
 - 1、导入相关依赖
 - 2、字段含意
 - 3、Upstream/Downstream——上/下游
 - 4、BackPressure
 - 5、BackPressure策略
 - 6、“热” and “冷” Observables
 - 7、 基类
 - 8、事件调度器
 - 9、操作符是什么?
 
- 三、RxJava的简单用法
 - 1、Observable——Observer
 - 2、Flowable——Subscriber
 - 3、Completable——CompletableObserver
 - 4、Maybe——MaybeObserver
 - 5、Single——SingleObserver
 - **Single的操作符**
 - 5.1、just: 创建一个发射单一数据项的 `Single`
 - 5.2、error:创建一个发射错误通知的 `Single`
 - 5.3 map: 对 `Single` 中的数据进行转换
 - 5.4 flatMap: 将一个 `Single` 转换为另一个 `Single`
 - 5.5 zip:将多个 `Single` 组合成一个新的 `Single`,并在它们都成功时触发。
 - 5.6 Single的转化模式
 - 5.6.1 将 Single 转换为 Observable——single.toObservable
 - 5.6.2 将 Observable 转换为 Single
 - 5.6.3 将 Single转换为 Completable——single.ignoreElement
 - 5.6.4 将 Single转换为 Maybe——single.toMaybe
 
- 四、事件调度器释放事件
 - 五、Scheduler——调度者
 
一、什么是RxJava
ReactiveX 是一个使用可观察序列编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据和/或事件序列,并添加了运算符,允许以声明方式将序列组合在一起,同时抽象出对低级线程、同步、线程安全、并发数据结构和非线程等问题的关注和阻塞 I/O。
官网链接:ReactiveX
什么是响应式编程?
响应式编程是一种编程范式,旨在处理异步数据流和事件驱动的编程。它着重于数据流和变化的处理,使得在异步和事件驱动环境中更容易构建可维护、可伸缩和高响应的应用程序。
- 数据流: 响应式编程关注数据流,将数据视为一系列事件或变化。
 - 响应性: 响应式编程强调应用程序对事件和数据的即时响应能力。它允许应用程序根据数据流中的事件来触发操作,而不是等待数据的拉取或轮询。
 - 观察者模式: 响应式编程经常使用观察者模式,其中存在一个可观察对象(Observable)和一个或多个观察者(Observer)。可观察对象发出事件,观察者订阅并对这些事件作出反应。
 - 流式操作: 响应式编程提供了一组丰富的操作符,用于处理、过滤、转换和合并数据流。这些操作符允许开发人员以声明性方式构建数据流处理管道。
 - 背压处理: 响应式编程处理异步数据流时,考虑了背压问题,即生产者产生数据的速度大于消费者处理数据的速度。它提供了一些机制来处理背压,如缓冲、丢弃、错误处理等。
 - 异步性: 在响应式编程中,大部分操作都是异步执行的,这有助于避免应用程序的阻塞,提高性能和响应能力。
 
RxJava的观察者模式
RxJava有四个基本概念:Observer(观察者),Observable(被观察者),subscribe(订阅),事件。
Observer和Observable通过subscribe()实现订阅关系,从而Observable可以在需要的时候发出事件通知Observer。
- Observer: 观察者,它决定事件发生时有怎么样的行为;
 - Observable: 被观察者,它决定什么时候出发事件以及触发什么样的事件;
 - subscribe:订阅,将Observer和Observable关联起来
 
二、使用前的准备
1、导入相关依赖
最新依赖地址:Github-RxJava/RxAndroid
implementation "io.reactivex.rxjava3:rxjava:3.1.8"
 
2、字段含意
Reactive:根据上下文一般翻译为反应式、响应式。
Iterable :可迭代对象,支持以迭代器的形式遍历。
Observable: 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者。
Observer :观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现。
emit: 含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,翻译为发射。
items: 在Rx里是指Observable发射的数据项。
3、Upstream/Downstream——上/下游
在响应式编程中,"上游"和"下游"通常用于描述数据流的生产者和消费者之间的关系。
- 上游(Upstream):上游是数据流的生产者,它生成和发出数据项。上游通常是源(例如,传感器、数据库查询、文件读取等),它们生成数据并将其传递给下游。
 - 下游(Downstream):下游是数据流的消费者,它接收和处理来自上游的数据项。下游可以执行各种操作,如过滤、映射、转换、订阅等。它们通常是应用程序中的组件,用于处理和响应来自上游的数据。
 
在响应式编程中,数据通过流动的方式从上游传递到下游,这是一种异步的、非阻塞的方式。
上游和下游之间的通信通常是通过观察者模式或发布-订阅模式进行的,以实现数据的异步传递和处理。这种方式使得可以构建高效的、响应式的应用程序,能够处理异步数据流。
4、BackPressure
BackPressure直译为:背压,也叫做反压。
背压(Backpressure)是指在异步编程中,当生产者(Producer)生成数据的速度快于消费者(Consumer)处理数据的速度时,数据压力会在系统中积累,可能导致一些问题,如内存溢出或性能下降。
在RxJava中也就是被观察者(Observable)发送事件的速度快于观察者(Observer)的速度。
背压问题通常出现在处理数据流的情况下,其中数据生产速度不受消费速度的限制。
5、BackPressure策略

MISSING:缺省设置,不做任何操作,而不进行任何缓冲或丢弃。ERROR: 当订阅者无法处理来自发布者的数据时,会引发MissingBackpressureException异常,表示出现了背压问题。BUFFER:当订阅者无法处理来自发布者的数据时,数据会被缓冲在内存中,直到订阅者可以处理它们。DROP: 把存不下的事件丢弃。LATEST:只保留最新的数据项,丢弃之前的数据。
6、“热” and “冷” Observables
Observable 何时开始发出其items?这取决于Observable。一个“热”Observable 可能会在创建后立即开始发射items,因此任何后续订阅该 Observable 的观察者都可能会开始观察中间某个位置的序列。另一方面,“冷”Observable 会等到观察者订阅它之后才开始发射items,因此这个观察者可以确保会收到整个数据序列。
7、 基类
RxJava 3 中的基类相比RxJava 2 没啥改变,主要有以下几个基类:
-  
io.reactivex.Observable:发送0个/N个的数据,不支持BackPressure,有
onNext和onComplete -  
io.reactivex.Flowable:发送0个/N个的数据,支持Reactive-Streams和支持BackPressure,有
onNext和onComplete -  
io.reactivex.Single:只能发送单个数据或者一个错误,有
onSuccess。 -  
io.reactivex.Completable:没有发送任何数据,但只处理 onComplete 和 onError 事件。有
onComplete -  
io.reactivex.Maybe:能够发射0或者1个数据,要么成功,要么失败。有
onSuccess和onComplete 
8、事件调度器
RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进行释放事件,这样才不会导致内存泄漏,这里的管理者我们称为事件调度器CompositeDisposable。
9、操作符是什么?
RxJava 提供了各种操作符,用于对观察序列进行转换、过滤、组合和处理。这些操作符可帮助你更灵活地处理异步数据流。
常见的操作符有:create、just、error、map、flatMap。在后面介绍Single的时候会简单的介绍。更多关于操作符的使用在下一篇博客这里简单了解概念就行了。
三、RxJava的简单用法
**RxJava以观察者模式为骨架,**有两种常见的观察者模式:
- Observable(被观察者)/Observer(观察者)
 - Flowable(被观察者)/Subscriber(观察者)
 

使用流程:
- 创建被观察者
 - 创建观察者
 - 订阅被观察者
 - 取消订阅(这一步可以省略)
 
1、Observable——Observer
一般用法:
//创建被观察者/事件源
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onNext("c");
        emitter.onComplete();
    }
});
//创建观察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.e("TAG", "onSubscribe == 订阅");
    }
    @Override
    public void onNext(@NonNull String s) {
        Log.e("TAG", "onNext == " + s);
    }
    @Override
    public void onError(@NonNull Throwable e) {
        Log.e("TAG", "onError == " + e.toString());
    }
    @Override
    public void onComplete() {
        Log.e("TAG", "onComplete");
    }
};
//订阅(观察者监视被观察着)
observable.subscribe(observer);
//取消订阅
observable.distinct();
 

这种观察者模型不支持背压:当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException。
消耗内存过大只会OOM。所以,当我们使用Observable——Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线作为参考)。
并且观察者具有多个重载方法:
    //观察者不对被观察者发送的事件做出响应(但是被观察者还可以继续发送事件)
    public final Disposable subscribe()
 
    //观察者对被观察者发送的任何事件都做出响应
    public final void subscribe(Observer<? super T> observer)
 
    //表示观察者只对被观察者发送的Next事件做出响应
    public final Disposable subscribe(Consumer<? super T> onNext)
 
    //表示观察者只对被观察者发送的Next & Error事件做出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
 
    //表示观察者只对被观察者发送的Next & Error & Complete事件做出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                      Action onComplete)
 
    //表示观察者只对被观察者发送的Next & Error & Complete & onSubscribe事件做出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                      Action onComplete, Consumer<? super Disposable> onSubscribe)
 
2、Flowable——Subscriber
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onNext(4);
        emitter.onComplete();
    }
}, BackpressureStrategy.BUFFER);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    Subscription sub;
    @Override
    public void onSubscribe(Subscription s) {
        Log.w("TAG", "onsubscribe start");
        sub = s;
        s.request(1);
        Log.w("TAG", "onsubscribe end");
    }
    @Override
    public void onNext(Integer integer) {
        Log.e("TAG", "onNext == " + integer);
        sub.request(1);
    }
    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }
    @Override
    public void onComplete() {
        Log.e("TAG", "onComplete");
    }
};
flowable.subscribe(subscriber);
 

Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
Flowable使用create()创建时,必须指定BackPressure策略。
注意
尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。
3、Completable——CompletableObserver
它只有onComplete和onError两个事件
//被观察者
Completable completable = Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable {
        emitter.onComplete();
    }
});
//订阅观察者
completable.subscribe(new CompletableObserver() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onComplete() {
        Log.e("TAG","onComplete");
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
});
 

要转换成其他类型的被观察者,也是可以使用toFlowable()、toObservable()等方法去转换。
4、Maybe——MaybeObserver
如果你的需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。
        //被观察者
        Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull MaybeEmitter<String> emitter) throws Throwable {
                emitter.onSuccess("have Data"); //发送一个数据的情况
//                emitter.onComplete();   //不发送数据的情况
            }
        });
        //订阅观察者
        maybe.subscribe(new MaybeObserver<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onSuccess(@NonNull String s) {
                Log.e("TAG",s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
                Log.e("TAG","无数据");
            }
        });
 

5、Single——SingleObserver
Single类似于Observable,不同的是,它总是只发射一个值,而不是发射一系列的值(并不存在MissingBackpressureException问题),所以当你使用一个单一连续事件流,这样可以使用Single。
Single观察者只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError。

Single<String> stringSingle = Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {
        emitter.onSuccess("success1");
        emitter.onSuccess("success2");
    }
});
stringSingle.subscribe(new SingleObserver<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.e("TAG", "onSubscribe: "+d);
    }
    @Override
    public void onSuccess(@NonNull String s) {
        Log.e("TAG", "onSuccess: "+s);
    }
    @Override
    public void onError(@NonNull Throwable e) {
        e.printStackTrace();
    }
});
 

可以看见数据只会发送一次,Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
Single 类型的操作符用于处理这些单一的数据项或错误。
Single的操作符
更多关于操作符的使用在下一篇博客这里简单了解概念就行了。
| 操作符 | 返回值 | 说明 | 
|---|---|---|
| compose | Single | 创建一个自定义的操作符 | 
| concat and concatWith | Observable | 连接多个 Single 和 Observable 发射的数据 | 
| create | Single | 调用观察者的 create 方法创建一个 Single | 
| error | Single | 返回一个立即给订阅者发射错误通知的 Single | 
| flatMap | Single | 返回一个 Single,它发射对原 Single 的数据执行 flatMap 操作后的结果 | 
| flatMapObservable | Observable | 返回一个 Observable,它发射对原 Single 的数据执行 flatMap 操作后的结果 | 
| from | Single | 将 Future 转换成 Single | 
| just | Single | 返回一个发射一个指定值的 Single | 
| map | Single | 返回一个 Single,它发射对原 Single 的数据执行 map 操作后的结果 | 
| merge | Single | 将一个 Single(它发射的数据是另一个 Single,假设为 B)转换成另一个 Single(它发射来自另一个 Single(B) 的数据) | 
| merge and mergeWith | Observable | 合并发射来自多个 Single 的数据 | 
| observeOn | Single | 指示 Single 在指定的调度程序上调用订阅者的方法 | 
| onErrorReturn | Single | 将一个发射错误通知的 Single 转换成一个发射指定数据项的 Single | 
| subscribeOn | Single | 指示 Single 在指定的调度程序上执行操作 | 
| timeout | Single | 它给原有的 Single 添加超时控制,如果超时了就发射一个错误通知 | 
| toSingle | Single | 将一个发射单个值的 Observable 转换为一个 Single | 
| zip and zipWith | Single | 将多个 Single 转换为一个,后者发射的数据是对前者应用一个函数后的结果 | 
用法示例:
5.1、just: 创建一个发射单一数据项的 Single
 

Single<Integer> single = Single.just(42);
 
5.2、error:创建一个发射错误通知的 Single
 

Single<String> single = Single.error(new RuntimeException("Something went wrong"));
 
5.3 map: 对 Single 中的数据进行转换
 

        Single<Integer> source = Single.just(5);
        Single<String> mapped = source.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Throwable {
                return "integer : "+integer;
            }
        });
 
这意味着原始整数数据 5 经过映射操作转变为了字符串数据,带有特定的前缀。
这种操作在响应式编程中非常有用,因为它允许你对数据进行转换和处理,而不改变数据流的类型。你可以将原始数据映射为需要的格式,以满足应用程序的需求。
5.4 flatMap: 将一个 Single 转换为另一个 Single
 

        Single<Integer> source = Single.just(5);
        Single<String> mapped = source.flatMap(new Function<Integer, SingleSource<? extends String>>() {
            @Override
            public SingleSource<? extends String> apply(Integer integer) throws Throwable {
                return Single.just("Return : "+integer);
            }
        });
 
5.5 zip:将多个 Single 组合成一个新的 Single,并在它们都成功时触发。
 

Single<Integer> source = Single.just(5);
Single<String> mapped = source.flatMap(new Function<Integer, SingleSource<? extends String>>() {
    @Override
    public SingleSource<? extends String> apply(Integer integer) throws Throwable {
        return Single.just("Return : " + integer);
    }
});
Single single = Single.zip(source, mapped, new BiFunction<Integer, String, Object>() {
    @Override
    public Object apply(Integer integer, String s) throws Throwable {
        return "Return : " + integer + s;
    }
});
 
5.6 Single的转化模式
5.6.1 将 Single 转换为 Observable——single.toObservable
Single<Integer> source = Single.just(5);
// 将 Single 转换为 Observable
Observable<Integer> observable = source.toObservable();
// 现在你可以将 Single 的结果集成到 Observable 中
observable.subscribe(
    value -> Log.e("TAG","Received value: " + value),
    error -> Log.e("TAG","Error: " + error),
    () -> Log.e("TAG","Completed")
);
 
5.6.2 将 Observable 转换为 Single
Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
// 将 Observable 转换为 Single,只发射第一个数据项或错误
Single<Integer> single = observable.first(0);
// 现在你可以将 Observable 的结果集成到 Single 中
single.subscribe(
    value -> System.out.println("Received value: " + value),
    error -> System.err.println("Error: " + error)
);
 
5.6.3 将 Single转换为 Completable——single.ignoreElement
Single<Integer> single = Single.just(42);
// 将 Single 转换为 Completable,忽略结果,只关注完成或错误
Completable completable = single.ignoreElement();
// 现在你可以使用 Completable 来执行某些操作
completable.subscribe(
    () -> System.out.println("Completed"),
    error -> System.err.println("Error: " + error)
);
 
5.6.4 将 Single转换为 Maybe——single.toMaybe
Single<Integer> single = Single.just(42);
// 将 Single 转换为 Maybe,考虑成功结果、错误或没有结果
Maybe<Integer> maybe = single.toMaybe();
// 现在你可以使用 Maybe 来处理这三种情况
maybe.subscribe(
    value -> System.out.println("Received value: " + value),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("No result")
);
 
四、事件调度器释放事件
-  
Disposable:
Disposable是 RxJava 的通用接口,用于表示订阅关系。- 它与所有的 RxJava 数据类型都相关,包括 
Observable、Flowable、Single、Completable和Maybe。 - 当你订阅一个数据流时,RxJava 会返回一个 
Disposable对象,你可以使用它来取消订阅或检查订阅状态。 
// 创建一个简单的 Observable,发射一些数据 Observable stringObservable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Xiyou"); emitter.onNext("3G"); emitter.onNext("Android"); emitter.onComplete(); } }); // 订阅 Observable 并获取 Disposable 对象 Disposable disposable = stringObservable.subscribe( value -> Log.e("TAG", value.toString()), error -> Log.e("TAG", "ERROR" + error), () -> Log.e("TAG", "Completed") ); disposable.dispose(); //在需要的时候取消订阅 -  
CompositeDisposable:
CompositeDisposable是Disposable接口的实现。- 它特别用于管理多个订阅关系,以便一次性取消多个订阅。
 CompositeDisposable可以添加多个Disposable对象,并在需要时一次性取消它们。- 这在管理多个订阅关系时非常有用,例如在 Android 中管理多个异步任务的订阅。
 
//创建一个简单的 Observable,发射一些数据 Observable stringObservable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Xiyou"); emitter.onNext("3G"); emitter.onNext("Android"); emitter.onComplete(); } }); // 创建一个 CompositeDisposable 来管理订阅关系 CompositeDisposable compositeDisposable = new CompositeDisposable(); // 订阅 Observable 并获取 Disposable 对象 Disposable disposable = stringObservable.subscribe( value -> Log.e("TAG", value.toString()), error -> Log.e("TAG", "ERROR" + error), () -> Log.e("TAG", "Completed") ); // 将 Disposable 对象添加到 CompositeDisposable 中 compositeDisposable.add(disposable); // 在不再需要订阅关系时,可以取消它们 // compositeDisposable.clear(); // 取消所有订阅 // 或者单独取消某个订阅 // disposable.dispose(); // 在不再需要 CompositeDisposable 时,清理它 compositeDisposable.dispose();CompositeDisposable提供的方法中,都是对事件的管理
- dispose():释放所有事件
 - clear():释放所有事件,实现同dispose()
 - add():增加某个事件
 - addAll():增加所有事件
 - remove():移除某个事件并释放
 - delete():移除某个事件
 
 
五、Scheduler——调度者
在RxJava默认规则中,事件的发出和消费都是在同一个线程中发生的,那么上面的这些例子来说,就是一个同步的观察者模式。
在RxJava中Scheduler(调度器)相当于线程控制器,RxJava通过Scheduler来指定那一部分代码执行在哪一个线程。我们来看看简单的例子:
 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("RxJava:e.onNext== 第一次");
                        emitter.onComplete();
                        Log.d("TAG", "subscribe()线程==" + Thread.currentThread().getId());
                    }
                }).subscribeOn(Schedulers.io())//指定被观察者subscribe()(发射事件的线程)在IO线程()
                .observeOn(AndroidSchedulers.mainThread());//指定观察者接收响应事件的线程在主线程
        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull String s) {
                // 接收到数据时的回调,s 是传递的数据
                Log.d("TAG", "Received data: " + s);
                Log.d("TAG", "onNext()线程==" + Thread.currentThread().getId());
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
 
- subscribeOn():用于指定
Observable被观察者subscribe()时所发生的线程,即指定发生事件的线程 - observeOn():指定
Observer观察者接收&响应事件的线程,即观察者接收事件的线程 
注意:多次指定发射事件的线程只有第一次指定有效,也就是说多次调用subscribeOn()只有第一次有效,其余的会被忽略;但是多次指定订阅者接收事件的线程是可以的,也就是说每observeOn()一次,接收事件的线程就会切换一次。
- Schedulers.io():代表IO操作的线程,通常用于网络、读写文件等IO密集型的操作。行为模式和new Thread()差不多,只是IO的内部是一个无上限的线程池,可重用空闲的线程,更高效(不要把计算工作放在IO内,可以避免创建不必要的线程)
 - AndroidSchedulers.mainThread():Android的主线程;用于更新UI
 - Schedulers.newThread():总是启用新线程,并在新线程中执行操作;多用于耗时操作
 - Schedulers.computation(): 代表CPU计算密集型的操作,即不会被IO等操作限制性能的操作。
 



















