专家之路上的Flow高级秘籍

news2025/5/22 13:23:23

公众号「稀有猿诉」        原文链接 专家之路上的Flow高级秘籍

『君不见,黄河之水天上来,奔流到海不复回。』

学习与河流一样,一方面学无止境,又是逆水行舟,不进则退,因为其他人都在卷。前文一篇文章讲了Flow的基础,大多数情况下够用了,但是不能停止卷,因为你不卷,就会被别人卷。一旦涉及到复杂的应用场景,就需要用到一些高级的API。今天就来学习一下Flow的高级特性,当遇到问题时也能更从容的应对。

上下文切换

Flow是基于协程的,是用协程来实现并发,前面也提到过像flow {…},在上游生产数据,以及中游做变幻时,都是可以直接调用suspend,耗时甚至是阻塞的函数的。而终端操作符如collect则是suspend的,调用者(也就是消费者)需要负责确保collect是在协程中调用。我们还知道Flow是是冷流,消费者终端才会触发上游生产者生产,所以对于flow {…}来说,它的上游和中游运行的上下文来自于终端调用者的上下文,这个叫做『上下文保留』(context preservation),我们可以用一个🌰 来验证一下:

fun main() = runBlocking {
    // Should be main by default
    simple().collect { log("Got: $it") }

    // Collect in a specified context
    withContext(Dispatchers.Default) {
        simple().collect { log("Now got: $it") }
    }
}

private fun simple(): Flow<Int> = flow {
    log("Started the simple flow")
    for (i in 1..3) {
        delay(100)
        log("Producing $i")
        emit(i)
    }
}

输出如下:

[main @coroutine#1] Started the simple flow
[main @coroutine#1] Producing 1
[main @coroutine#1] Got: 1
[main @coroutine#1] Producing 2
[main @coroutine#1] Got: 2
[main @coroutine#1] Producing 3
[main @coroutine#1] Got: 3
[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow
[DefaultDispatcher-worker-1 @coroutine#1] Producing 1
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 1
[DefaultDispatcher-worker-1 @coroutine#1] Producing 2
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 2
[DefaultDispatcher-worker-1 @coroutine#1] Producing 3
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 3

从这个🌰 可以清楚的看到,Flow的context是来自于终端调用者的。

用flowOn来指定上下文

有时候使用终端调用者的上下文可能不太方便,因为生产者与消费者的模式其实是解耦的,它们不应该相互受制于对方,对于关键的并发的上下文更是如此。比如说在GUI的应用中,明显应该在工作线程中生产数据,在UI线程中消费数据,从上面的例子来看,由终端调用者来决定上游上下文明显不可取。有同学举手了,欺负我没学过协程是吧?我可以在Flow内部使用withContext来指定上下文啊,我们来试试:

fun main() = runBlocking {
    // Should be main by default
    simple().collect { log("Got: $it") }
}

private fun simple(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        log("Started the simple flow")
        for (i in 1..3) {
            delay(100)
            log("Producing $i")
            emit(i)
        }
    }
}

这位同学可以直接出去了,因为你的代码crash 了:

[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow
[DefaultDispatcher-worker-1 @coroutine#1] Producing 1
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@545486c7, BlockingEventLoop@13bfcf14],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@27015c5a, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead

意思大概是说Flow内部不让直接用withContext来切上下文,破坏了Flow的不变式,想切上下文要用flowOn。而且仔细看,异常是由emit函数抛出来的。

其实Flow的设计者已经考虑到了这个问题,并且给出了优雅的方式,如果想切换Flow内部(也即上游和中游)的运行上下文,要用flowOn函数:

fun main() = runBlocking {
    // Should be main by default
    simple().collect { log("Got: $it") }
}

private fun simple(): Flow<Int> = flow {
    log("Started the simple flow")
    for (i in 1..3) {
        delay(100)
        log("Producing $i")
        emit(i)
        Thread.sleep(50)
    }
}.flowOn(Dispatchers.Default)
//[DefaultDispatcher-worker-1 @coroutine#2] Started the simple flow
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 1
//[main @coroutine#1] Got: 1
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 2
//[main @coroutine#1] Got: 2
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 3
//[main @coroutine#1] Got: 3

这回就和谐多了,后台搞生产,UI只展示,完美!还需要特别注意的是函数flowOn只影响它的上游,不影响它的下游,更不会影响终端,终端永远都在其调用者的上下文中,来看一个🌰 :

withContext(Dispatchers.Main) {
    val singleValue = intFlow // will be executed on IO if context wasn't specified before
        .map { ... } // Will be executed in IO
        .flowOn(Dispatchers.IO)
        .filter { ... } // Will be executed in Default
        .flowOn(Dispatchers.Default)
        .single() // Will be executed in the Main
}

第一个flowOn切到IO,只影响到它前面的创建和map,第二次切换到Default,只影响filter。single是终端,是在Main,因为它的调用者是在Main里面。

注意,注意: Flow是一个数据流,保持其数据流的特点是相当重要的,无论是正常数据,异常数据,还是出错都是一种数据,应该让其自上而下的流动,在中游变幻时或者终端时通过操作符来处理。所以,像硬性的上下文切换,或者异常的try/catch都是不允许的。这就是所谓的流的不变性(Flow invariant)。后面讲异常时还会提到这点。

任意上下文的Flow builders

从前面的学习我们知道了,下下文保留的特性,终端会决定上游生产者的上下文,当然也可以通过flowOn来改变上下文。Flow builder其实就是一个生产者,异步的emit数据。但有些时候生产数据时的上下文,也就是调用emit时的上下文,是不确定的。比如说安卓 上面的各种回调(callback)有些是回调在调用者的线程里,有些则不是。flow {…}中的emit就不能在异步的回调里面调用,这时就要用callbackFlow {…}。callbackFlow专门适用于把现有的一些回调转为Flow,最典型的应用就是位置信息:

fun locationFlow(): Flow<Location> = callbackFlow {
	val listener = object : LocationListener {
		override fun onLocationUpdate(loc: Location) {
			trySend(location)
		}
	}
	locationManager.reqisterLocaitonUpdates(listener)
	
	awaitClose {
		locationManager.unregisterLocationUpdates(listener)
	}
}

如果这个Flow,用flow {}去创建会抛异常,因为emit没法在回调中使用。callbackFlow会在回调中发射数据,并在awaitClose代码块中反注册回调以清理资源。awaitClose会在这个流结束时(完成或者被取消)被回调到,以有机会进行资源清理。

其实,无论是flow {}还是callbackFlow {}都是channelFlow {}的简单化,channelFlow非常复杂,也超级强大,它可以自带buffer,自带并发,适用于创建一些非常复杂的Flow。在多数时候flow {}和callbackFlow {}就够我们用了。

扩展阅读:

  • Android Kotlin Coroutines: what is the difference between flow, callbackFlow, channelFlow,… other flow constructors
  • Kotlin 协程四 —— Flow 和 Channel 的应用
  • [译]轻松学习Kotlin的Flow、ChannelFlow和CallbackFlow
  • 轻松搞定Kotlin的Flow, ChannelFlow和CallbackFlow - 2

副作用函数

Flow是一个数据流,核心思想是把数据的生产和处理和最终消费分开,上游只负责生产数据,各种操作都应该由中游操作符来做,最终数据由终端消费掉。需要加强数据的封装性,和流的不变性,不破坏管道,用各种转换器来对数据进行操作。那么,对于流何时开始,每个数据何时产生,流什么时候终止,这些事件对于调试来说是很有帮助的。Flow的设计者给出了一系列副作用函数来做之些事情。副作用的意思就是这些函数不会对流本身产生影响。

  • onStart Flow开始生产之前会调用此函数。
  • onEach 在生产(emit)每个数据之前调用此函数,这个函数最常用被用来打日志,以查看每个产生的数据。
  • onCompletion 当Flow终止时或者被取消后会调用此函数。
  • onSubscritpion 有消费者了时调用此函数(也就是有人collect了此Flow时)。

异常,取消和错误处理

这一小节重点来看看非正常代码逻辑的处理。先来看看异常处理(Exception handling)。

用catch函数来处理Flow过程中的异常

代码随时都可能抛出异常,所以异常处理是一个必须要考虑的事情。当然可以在Flow的各个节点如上游生产,中游变幻和下游终端的代码块里面各种try/catch。一来是不够优雅,再者这会破坏Flow的不变性或者说一致性,它就是管道,数据在里面流动,不应该加以过多的干扰,想要对数据处理应该用操作符。也就是说要让异常(包括其他错误也是如此)对Flow是透明的,意思是说Flow并不关心是否有异常。所以提供了一个catch函数,它的作用是捕获并处理上游操作中发生的异常:

simple()
    .catch { e -> emit("Caught $e") } // emit on exception
    .collect { value -> println(value) }

需要注意catch与flowOn一样,只影响上游发生的异常,管不了下游:

flow { emitData() }
    .map { computeOne(it) }
    .catch { ... } // catches exceptions in emitData and computeOne
    .map { computeTwo(it) }
    .collect { process(it) } // throws exceptions from process and computeTwo

取消Flow

Flow没有显式的取消函数。Flow是冷流,有消费者时才会去生产数据,消费者停止消费了,Flow自然也就被取消了。终端操作都是suspend的,也就是要在协程中调用,因此取消终端调用的协程,就会取消Flow。

错误处理

其实没有特别的错误处理函数,前面的异常算是一个,如果上游没有抛出异常,就不会有其他错误了,因为错误也是数据的一种类型,并且是由我们自己根据场景来定义的。比如说从网络获取新闻列表,正常时的数据当然是一个个的新闻条目。出错了,比如无网络,或者服务器无响应,这时可能返回一个空的条目,里面有错误的具体信息。但这都是由业务逻辑决定的,是业务逻辑层面的东西。对于Flow而言,都还是有数据的,都是一种数据,具体数据的解读,那是消费者终端的事情,Flow并不关心。

唯一算得上错误处理的函数就是onEmpty,它会在Flow是空的时候,也就是不生产任何数据的时候被回调。可以在onEmpty里面生产emit数据,比如产生一个带有错误信息的数据,或者产生一个默认值。因为Flow为空,不产生emit任何数据时,管子是空的数据没有流动,Flow的整个链路,特别是终端collect是不会被执行的,这时可能会有问题,比如UI根本无法做出任何react,除非你设置过了默认UI状态,否则可能会不对。这个时候如果用onEmpty去产生一些默认值或者错误信息的话,就能激活整个Flow,终端能做出预期的响应。

重试机制

另一个非常有用的函数就是retry,它可以预设一个条件,当条件满足时就会触发重新collect。Flow是冷流,有消费者collect时才会触发生产者emit数据,因此重新collect就能让Flow重新emit数据流。

背压

Flow是异步数据流,响应式编程范式,上游生产数据,下游终端消费数据。有时候可能会遇到这样一种情况,就是上游数据生产的速度超过了下游终端的消费速度,这会造成数据流积压在管道中,终端无法及时响应。这种情况称为『背压(Back pressure)』。想像一下一个水管,如果进水速度大于水龙头流出的速度,水就会积压在水管里,如果水管是比较薄弱的(如气球),那么它会膨胀,最后爆掉。

通常情况下,当上游是较为可控的生产者时,不会产生背压,但如果是一些不是开发人员可控的,如硬件(触摸事件,位置信息,传感器,摄像头),其他系统(系统框架的回调,或者服务器的Push)等等,就会产生背压,这时必须进行相应的处理。所有的FRP式异步数据流API都必须处理『背压』,Flow也有相应的API来处理:

  • buffer 把生产者的emit的数据缓存,然后用Channel以并发的方式流向中游和下游,可以简单理解为并发地调用collect。正常情况下Flow是顺序的(Sequentially),就是数据从上游到中游再到终端,按顺序流动,先生产的数据先流到collect,这就是顺序的数据流sequentially。用上buffer后,就是会是并发的流,先emit的数据不一定先到collect,这就是concurrently。明显,能用buffer的前提是终端处理数据时没有对数据顺序的依赖。
  • conflate 也会像buffer一样启动并发式emit数据,但未能及时被终端消费掉的数据会被丢弃,终端只处理最新数据。
  • collectLatest 当有新的数据流出来时,终端只处理最新的数据,此之的终端处理会被取消掉(如果还没有处理完)。

转为热流

常规的Flow都是冷的(cold flow),但有时热流(hot flow)也有它的应用场景,Flow API中也有创建热流的方法。

StateFlow

StateFlow是一个『状态持有』流,它仅包含一个当前元素value,可以用过update来更新此状态。它是一个热流,可以有多个终端colloctor,每次更新都会把当前的值emit给所有的终端。

可以用构造方法MutableStateFlow创建一个StateFlow,或者通过函数stateIn来把一个冷流转化为一个StateFlow。

StateFlow是比较常用的,在安卓开发中,几乎所有的ViewModel都会用StateFlow来暂存UI状态数据。

SharedFlow

比StateFlow更为通用的便是通用的热流SharedFlow。可以通过构造方法MutableSharedFlow来创建SharedFlow,或者通过函数sharedIn把一个冷流转为SharedFlow。

SharedFlow可以有多个终端collector,所以可以实现一对多的通知,如实现观察者模式,或者像设置/配置更新,或者广播等等就可以考虑用SharedFlow来实现。

扩展阅读:

  • StateFlow and SharedFlow
  • SharedFlow vs StateFlow,一篇看懂选择和使用技巧
  • Kotlin SharedFlow&StateFlow 热流到底有多热?
  • ShareFlow与StateFlow实战

欢迎搜索并关注 公众号「稀有猿诉」 获取更多的优质文章!

原创不易,「打赏」「点赞」「在看」「收藏」「分享」 总要有一个吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1459427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单片机02_寄存器_GPIO设置

芯片概述 C51&#xff1a;0口、1口、2口、3口&#xff0c;P00~p07、P10~P17、P20~P27、P30~P37 STM32&#xff1a;A口、B口、C口、D口&#xff0c;PA0~PA15/PA5 GPIOA.5 STM32F407ZGT6有7组GPIO端口&#xff0c;分别是&#xff1a;A B C D E F G&#xff0c;每组均有16个GPIO端…

com.alibaba.fastjson.JSONException: toJSON error的原因

问题&#xff1a; 导出接口报错&#xff0c;显示json格式化异常 发现问题&#xff1a; 第一个参数为HttpResponse,转换成json的时候报错 修改方法&#xff1a; 1.调换两个参数的位置 2.在aop判断里边 把ServletAPI过滤掉 Before("excudeWebController()")pub…

Leetcode1206(设计跳表)

例题&#xff1a; 分析&#xff1a; 我们先来找一找跳表与单链表的相同点和不同点。 相同点&#xff1a; 跳表和单链表一样&#xff0c;都是由一个一个的节点组成的链表。 不同点&#xff1a; ①&#xff1a;跳表中的元素已经是排好序的&#xff08;图中从小到大&#xff09;&…

突破性创新:OpenAI推出Sora视频模型,预示视频制作技术的未来已到来!

一、前言 此页面上的所有视频均由 Sora 直接生成&#xff0c;未经修改。 OpenAI - Sora is an AI model that can create realistic and imaginative scenes from text instructions. 2024 年 2 月 16 日&#xff0c;OpenAI 发布 AI 视频模型 Sora&#xff0c;60 秒的一镜到底…

STM32-启用蜂鸣器

目录 1 、电路构成及原理图 2、编写实现代码 main.c beep.c beep.h 3、代码讲解 4、 烧录到开发板调试、验证代码 5、检验效果 本人使用的是朗峰 STM32F103 系列开发板&#xff0c;此笔记基于这款开发板记录。 1 、电路构成及原理图 首先&#xff0c;通过朗峰 F1 开…

VILT算法解读

VILT是一种典型的单塔结构&#xff0c;不同于双塔结构由两个独立的Image Encoder以及Text Encoder组成&#xff08;比如clip&#xff09;&#xff0c;单塔结构的模型一般只有一个共用的编码器&#xff0c;称为Multi-Modal Encoder。 1、VILT算法原理 VILT被认为是最简单的…

SpringBoot中使用PageHelper插件实现Mybatis分页

场景 SpringBoot中整合Mybatis时一般添加的依赖为 <dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.1</version></dependency> 如果要实现分页查…

PostgreSQL按日期列创建分区表

在PostgreSQL中&#xff0c;实现自动创建分区表主要依赖于表的分区功能&#xff0c;这一功能从PostgreSQL 10开始引入。分区表可以帮助管理大量数据&#xff0c;通过分布数据到不同的分区来提高查询效率和数据维护的便捷性。以下是在PostgreSQL中自动创建分区表的一般步骤&…

docker之安装mongo创建运行环境

目录 一、docker pull 最新资源 二、启动mongo镜像 启动命令查看日志拉取低版本镜像成功启动 三、进入mongo容器 进入容器进入mongo环境查询当前所在库切换库至admin随意切换库 并 创建用户登录用户新增文档数据等 五、总结 版本兼容可备份操作 一、docker pull 最新资源…

java面试集合篇

上面是java中集合的整体框架图。 集合使用的数据结构 算法复杂度分析 时间复杂度分析 时间复杂度分析&#xff1a;来评估代码的执行耗时的 /*** 求1~n的累加和* param n* return*/ public int sum(int n) {int sum 0;for ( int i 1; i < n; i) {sum sum i;}return …

【常识】大数据设计基础知识

底层存储&#xff1a;hadoop&#xff08;hdfsmapreduce&#xff09; Hadoop已经有十几年的历史&#xff0c;它是大数据领域的存储基石&#xff0c;HDFS目前仍然没有成熟替代品;MapR 文件系统在业内已经具有一定知名度了&#xff0c;不仅 MapR 宣布它自己的文件系统比 HDFS 快2-…

模板(函数模板)---C++

模板目录 模板1.模板概念&#xff12;.泛型编程 1.函数模板1.1 函数模板语法1.2 函数模板注意事项1.3 普通函数与函数模板的区别1.4 普通函数与函数模板的调用规则1.5 模板的局限性1.6 函数模板案例 模板 1.模板概念 模板就是建立通用的模具&#xff0c;大大提高复用性。 模板…

如何在本地服务器部署TeslaMate并远程查看特斯拉汽车数据无需公网ip

文章目录 1. Docker部署TeslaMate2. 本地访问TeslaMate3. Linux安装Cpolar4. 配置TeslaMate公网地址5. 远程访问TeslaMate6. 固定TeslaMate公网地址7. 固定地址访问TeslaMate TeslaMate是一个开源软件&#xff0c;可以通过连接特斯拉账号&#xff0c;记录行驶历史&#xff0c;统…

解决Edge浏览器,微博无法查看大图(Edge Image Viewer)

使用Edge浏览器浏览微博或其它带校验的图片时&#xff0c;会导致无法查看。 主要原因为Edge自带了一个Edge Image Viewer, 但是该图片查看器无法查看带校验数据的图片&#xff0c;所以导致查看时一片空白。 解决方法 地址栏输入 edge://flags/搜索 Edge Image Viewer选择 Disa…

爬虫学习笔记-scrapy爬取电影天堂(双层网址嵌套)

1.终端运行scrapy startproject movie,创建项目 2.接口查找 3.终端cd到spiders,cd scrapy_carhome/scrapy_movie/spiders,运行 scrapy genspider mv https://dy2018.com/ 4.打开mv,编写代码,爬取电影名和网址 5.用爬取的网址请求,使用meta属性传递name ,callback调用自定义的…

JVM原理学习

一.栈上的数据存储P95 二.堆上的数据存储 标记字段 指针压缩(节省空间 内存对齐(提高CPU缓存行效率 字段重排列方便内存对齐 类排在基本类型之后 三.JIT实时编译 优化手段 C2编译器&#xff0c;直接将循环相加求和优化为乘法。 方法内联 逃逸分析 四.G1垃圾回收器原理 年轻代…

Vue | (三)使用Vue脚手架(中)| 尚硅谷Vue2.0+Vue3.0全套教程

文章目录 &#x1f4da;Todo-list 案例&#x1f407;组件化编码流程&#xff08;通用&#xff09;&#x1f407;实现静态组件&#x1f407;展示动态数据&#x1f407;交互⭐️添加一个todo⭐️todo勾选实现⭐️删除功能实现⭐️底部统计功能实现⭐️底部全选功能实现⭐️底部一…

Vue报错,xxx is defined #变量未定义

vue.js:5129 [Vue warn]: Error in v-on handler: "ReferenceError: count is not defined" 浏览器将这个变量 当做全局变量了&#xff0c;事实上它只是实例中的变量 加上this指定&#xff0c;是vue实例中的变量

Android之Android.bp文件格式语法(一百八十六)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

代码随想录算法训练营day20

题目&#xff1a;530.二叉搜索树的最小绝对差、501.二叉搜索树中的众数、236. 二叉树的最近公共祖先 参考链接&#xff1a;代码随想录 530.二叉搜索树的最小绝对差 思路&#xff1a;我一开始想到的方法是先生成中序序列&#xff0c;然后对相邻两项的差进行计算&#xff0c;取…