前言
这篇文章主要介绍Flow的一些基础使用方法
 同时介绍如何用Flow请求网络数据
 下面开始!
什么是Flow
Flow翻译过来,是“流”的意思
 举例说明,在大自然中,常见的如水流
 是从高往低流动的
 那么在计算机世界里,所谓的“流”
 其实指的是数据流
 也就是从获取原始数据,到进行处理,最后使用的过程
 比如拿到一个json,转换为bean
 然后进行筛选和过滤
 拿到最后要用的最终数据
 这个过程就称之为数据的流动
 如下图:
 
 而为了处理这个过程
 我们就可以使用到Flow这个工具
Flow流的使用
简单使用
要使用Flow,首先需要导入协程相关工具类:
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7-mpp-dev-11'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7-mpp-dev-11'
 
最简单的Flow使用:
suspend fun flow1() {
    flow<Int> {
        (0..4).forEach {
            emit(it)//生产者发送数据
        }
    }.collect {
        Log.d("flow1", "it:$it")//消费者处理数据
    }
}
 
我们分析下这段代码:
 1:(0…4)是一个0-4的列表,这个是原始数据
 2:emit其实就是把原始数据发送出去
 3:collect用来收集发送的原始数据,并且内部自己打印了收到的数据
 4:flow { } 函数包裹的代码块就是负责发送数据的,这个函数返回一个Flow对象
 注意:Flow流是“冷流”
 意思是 collect 被调用后 flow 内的方法体才会被调用
 如果没有collect方法,不管如何emit,数据都是不会发送出去的
流操作符
Flow对于数据的操作,为我们提供了一系列的API
 我们称之为“操作符”
 一般分为“流构建器”、“中间操作符”和“末端操作符”
 流构建器一般用来构建Flow流对象
 中间操作符仅仅只是预先定义一些对流的操作方式,
 比如过滤,转换等
 并不会主动触发动作执行
 而末端操作符则是对流的最终处理
 比如collect就是末端操作符
 下面就介绍一些操作符
流构建器
flowof
可以将 flowOf 内的可变长参数一一发射
flowOf(1, 2, 5, 4).collect {
        println(it)
}
 
asFlow
flowOf 可以将集合转换成 flow 发射
suspend fun asFlowM(){
    listOf(1,2,9,0,8).asFlow().collect{
        println(it)
    }
}
 
中间操作符
map
我们可以再 map 中执行一些过渡操作,
 比如本例中将生产者发送的数据*9,然后再发射给消费者
 值得一提的是,我们是可以再 map 中进行异步操作的
 注意,这个map和集合没什么关系,别被误导了
suspend fun mapM(){
    (1..9).asFlow().map {
        it*9
    }.collect{
        println(it)
    }
}
 
transform
transform 主要强调的是类型的转换
(1..3).asFlow() // 一个请求流
        //transform中的泛型<Int,String> 表示将Int类型转换为String后,继续发射
        .transform<Int, String> { request ->
            emit("transform Int to String $request")
        }
        .collect { response -> println(response) }
 
take
限长操作符 take 可以限定我们要消费的数据的数量,见代码
(1..9).asFlow().take(3).collect {
        println(it)
}
 
conflate
当生产者发射数据速度大于消费者的时候,消费者只能拿到生产者最新发射的数据
suspend fun conflate(){
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.conflate().collect {
        delay(300)
        println(it)
    }
}
 
比如上面这段代码,因为有conflate的存在,输出如下:
1
3
6
9
 
如果没有conflate存在输出如下:
1
2
3
4
5
6
7
8
9
 
两者对比,明显能发现使用conflate的例子替我们忽略了很多无法即时处理的数据
collectLast
这个操作符的意思:如果生产者数据以及发射过来了,消费者还没有把上一个数据处理完,那么直接停止处理上一条数据,直接处理最新的数据
suspend fun collectLastM(){
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.collectLatest {
        delay(800)
        println(it)
    }
}
 
比如本例的输出为9
zip
zip操作符可以把两个流合并为一个流,然后再zip方法中将两个流发射的数据进行处理组合后继续发射给消费者,
 如果两个流长度不一致,按比较短的流来处理:
 1.两个流长度一致,都是3
suspend fun zipM(){
    val flow1 = (1..3).asFlow()
    val flow2 = flowOf("李白","杜甫","安安安安卓")
    flow1.zip(flow2){a,b->
        "$a : $b"
    }.collect {
        println(it)
    }
}
 
输出:
1 : 李白
2 : 杜甫
3 : 安安安安卓
 
上面的代码我们进行一下改变,将flow1的长度改为5
val flow1 = (1..5).asFlow()
 
查看输出结果:
1 : 李白
2 : 杜甫
3 : 安安安安卓
 
所以验证一下我们开头的结论,两个长度不同的流zip合并,消费者输出的数据长度是较短的流的长度
combine
上一节zip的缺点我们清楚了,就是两个流长度不等的时候,较长的流后面部分无法输出
那么combine就是用来解决zip这个缺点的(也很难说是缺点,只是应用场景不同罢了,你姑且可以认为是缺点)
suspend fun combineM(){
    val flowA = (1..5).asFlow()
    val flowB = flowOf("李白","杜甫","安安安安卓")
    flowA.combine(flowB){a,b->
        "$a : $b"
    }.collect {
        println(it)
    }
}
 
输出日志:
1 : 李白
2 : 李白
2 : 杜甫
3 : 杜甫
3 : 安安安安卓
4 : 安安安安卓
5 : 安安安安卓
 
我们的两个流,数字流长度为5,字符串流为3。
实现的效果简单逻辑分析:
flow发射1,flow2发射 ”李白“ ,打印:1 : 李白
flow发射2,flow2未发射数据  ,打印:2 : 李白
flow未发射,flow2发射 ”杜甫“ ,2 : 杜甫
flow发射3,flow2未发射 ,打印:3 : 杜甫
flow未发射,flow2发射 ”安安安安卓“ ,打印:3 : 安安安安卓
flow发射4,flow2发射完成  ,打印:4 : 安安安安卓
flow发射5,flow2发射完成  ,打印:5 : 安安安安卓
 
onCompletion
使用onCompletion可以再流完成的时候再发送一个值
 flowOf(1, 23, 5, 3, 4).onCompletion {
        println("流操作完成")
        emit(12344)//这里不返回值也没关系
    }.collect {
        println(it)
    }
 
输出:
1
23
5
3
4
流操作完成
12344
 
末端操作符
toList
会把数据消费到一个 List 列表中
suspend fun toList():List<Int> {
   return (1..9).asFlow().filter { it % 2 == 0 }.toList()
}
 
toSet
同 toList
frist
获取第一个元素
suspend fun firstM(): Int {
    return (2..9).asFlow().filter { it % 2 == 1 }.first()
}
 
reduce
reduce 的兰布达表达式会提供运算公式负责计算。
在 reduce 的兰布达表达式中,可以对当前要消费的值和之前计算的值进行计算,得到新值返回。所有值消费完成后返回最终值
suspend fun reduceM():Int {
    return (1..9).asFlow().reduce { accumulator, value ->
        println("$accumulator : $value")
        accumulator + value
    }
}
 
buffer
buffer可以缓存生产者数据,不会被消费者阻塞
suspend fun bufferM() {
    val startMillis = System.currentTimeMillis()
    flow<Int> {
        (1..3).forEach {
            delay(300)
            emit(it)
        }
    }.buffer(4)
        .collect {
            delay(400)
            println(it)
            println("时间已经过了${System.currentTimeMillis() - startMillis}")
        }
}
 
代码执行打印日志:
1
时间已经过了745
2
时间已经过了1148
3
时间已经过了1552
 
如果我们没有用buffer,那么总时长应该2100ms
 使用了buffer总时长是:1552=300+400*3
 所以使用buffer的时候生产者可以并发发射数据,不会被消费者阻塞
流异常
使用try/catch包裹流
 我们是可以使用try/catch来收集流异常的,但是不建议用这种方法
 使用flow的catch操作符处理流
 使用flow 的catch操作符处理异常更优雅
 不过catch也有缺点,它只能捕获生产者的异常不能捕获消费者的异常
suspend fun trycatch() {
    flow<Int> {
        (1..3).forEach {
            if (it == 2) {//故意抛出一个异常
                throw NullPointerException("强行空指针,嘿嘿嘿嘿")
            }
            emit(it)
        }
    }.catch {e->
        e.printStackTrace()
        emit(-1)//异常的情况下发射一个-1
    }.collect{
        println(it)
    }
}
 
消费者的异常如何处理
 尝试在消费者中抛出异常,查看是否可以被捕获
 flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }.catch {
        emit(-1)
    }.collect {
        if(it==2){//在消费者中抛出数据
            throw IllegalArgumentException("数据不合法")
        }
        println(it)
    }
 
输出:
1
Exception in thread "main" java.lang.IllegalArgumentException: 数据不合法
	at HahaKt$consumerCatch$$inlined$collect$1.emit(Collect.kt:138)
 
将异常代码放在onEach中catch异常
suspend fun consumerCatch() {
    flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }.onEach {
        if (it == 2) {//与上面的不同,在消费之前先用onEach处理一下
            throw IllegalArgumentException("数据不合法")
        }
    }.catch {
        emit(-1)
    }.collect {
        println(it)
    }
}
 
输出:
1
-1
 
相关资料
Kotlin Flow详解
 Kotlin Flow啊,你将流向何方?
 官方 flow 地址
 使用 Kotlin Flow 构建数据流 “管道”



















