没经过我同意,flink window就把数据存到state里的了?

news2025/5/15 21:49:11

欢迎关注我

不知道大家在初次使用Flink的时候,是否对Flink中定义本地变量和状态比较好奇,这俩有啥区别?

而且在使用Window API时明明没有显式地创建状态,也没调用getState(),却依然把每个窗口里的所有元素都自动缓存到 StateBackend里,这到底是怎么做到的?它怎么可以自作主张呢。

本地变量 vs Managed State

我们先来看看第一个问题,怎么清楚的区分本地变量和状态的区别呢?我们看下表:

特性本地变量(Local Variable / Field)Managed State (ValueState, ListState…)
生命周期仅在当前 processElement() 或算子实例初始化时有效;Task 重启或 failover 后重置为初始值跨事件、跨 checkpoint 持久化;重启后按最新 checkpoint 恢复
容错不参与 Flink 容错;Task 重启或 Job 恢复后丢失参与 checkpoint/savepoint;保证 Exactly-once 语义
序列化不会被 Flink 自动序列化;只存在 JVM 堆栈或算子对象里通过 TypeSerializer 序列化到 StateBackend(内存或 RocksDB)
使用场景临时计数、方法内临时缓存,无需跨事件保留需要累积、聚合、窗口缓存、跨事件关联时使用

简单写个代码看看

//本地字段无法容错 
public class MyMapFunction extends RichMapFunction<Event, Integer> {
    private int counter = 0;  // 普通字段

    @Override
    public Integer map(Event value) {
        counter += 1;
        return counter;       // 失败重启后,counter 会被重置为 0
    }
}

//使用 Managed State
public class MyStatefulMap extends RichMapFunction<Event, Integer> {
    private transient ValueState<Integer> counterState;

    @Override
    public void open(Configuration cfg) {
        counterState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Integer.class));
    }

    @Override
    public Integer map(Event value) throws Exception {
        Integer cnt = Optional.ofNullable(counterState.value()).orElse(0);
        cnt += 1;
        counterState.update(cnt);  // 这个值会被 checkpoint 序列化并恢复
        return cnt;
    }
}

Window怎么存元素到state的?

Flink 的 Window 算子并没有让我们在代码里手动 getState(),我们一般都只是这样写:

dataStream
  .keyBy(Event::getUserId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
      @Override
      public void process(String key,
                          Context ctx,
                          Iterable<Event> elems,
                          Collector<Result> out) {}
  });

却能自动把 5 分钟窗口里的所有 Event 缓存起来。而且在强哥之前的文章中也提到,如果Window定义的时间跨度太长,缓存在state里面的数据过多,可能会对服务性能造成影响,官网也是提到了的:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

那么,Window背后究竟发生了什么?我们来看关键源码位置(注:以下源码基于flink-streaming-java:1.18.1)。

1. 隐式注册StateDescriptor

我们先看示例代码

source
.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) {
        return Tuple2.of(value, 1);
    }
})
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new WindowFunctionExample())
;

在执行process方法的时候,Flink会调用org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder#apply(org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction<java.lang.Iterable<T>,R,K,W>),创建WindowOperator

   private <R> WindowOperator<K, T, ?, R, W> apply(
            InternalWindowFunction<Iterable<T>, R, K, W> function) {
        if (evictor != null) {
            return buildEvictingWindowOperator(function);
        } else {
            ListStateDescriptor<T> stateDesc =
                    new ListStateDescriptor<>(
                            WINDOW_STATE_NAME, inputType.createSerializer(config));

            return buildWindowOperator(stateDesc, function);
        }
    }

可以看到这里创建了ListStateDescriptor

然后,在执行Window的生命周期方法org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#open(),Flink 会为每个 key + window 分配一个内部缓存state:

// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
if (windowStateDescriptor != null) {
    windowState =
            (InternalAppendingState<K, W, IN, ACC, ACC>)
                    getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}

 public <N, S extends State, V> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
        Preconditions.checkNotNull(this.keySerializer, "State key serializer has not been configured in the config. This operation cannot use partitioned state.");
        InternalKvState<K, ?, ?> kvState = (InternalKvState)this.keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            }

            kvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled((InternalKvState)TtlStateFactory.createStateAndWrapWithTtlIfEnabled(namespaceSerializer, stateDescriptor, this, this.ttlTimeProvider), stateDescriptor, this.latencyTrackingStateConfig);
            this.keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            this.publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }

        return kvState;
    }

这里的 windowState 就是 Flink 为你隐藏起来的“窗口元素缓存”:

  • windowState 存放当前 window 的所有元素
  • Flink 会在每次触发窗口(watermark 到达或触发器触发)时,把这个windowState 内容取出来,交给你的 process() 方法
  • 当窗口关闭后,会 clear()这个windowState

2. 数据到达:write to state

WindowOperator#processElement() 中,Flink 收到新的事件时,会执行:

// 把元素追加到 windowState
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
……
//触发器触发,则获取窗口状态内容
TriggerResult triggerResult = triggerContext.onElement(element);

if (triggerResult.isFire()) {
    ACC contents = windowState.get();
    if (contents == null) {
        continue;
    }
    emitWindowContents(window, contents);
    
   if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
}

这段代码每来一条事件,就把它 序列化 并写入到 StateBackend(Memory/RocksDB),而不是保存在 Java内存对象里。

同时,如果是触发了触发器,则会返回窗口内容。还会创建一个定时器,定时执行窗口计算。

3. 定时触发:read from state

当定时器触发时,WindowOperator#onEventTime() 会调用:

TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
//触发器触发
if (triggerResult.isFire()) {
    // 读取并反序列化所有元素
    ACC contents = windowState.get();
    if (contents != null) {
        emitWindowContents(triggerContext.window, contents);
    }
}

// 清空 state
if (triggerResult.isPurge()) {
    windowState.clear();
}                          

add())到 get())再到 clear()),整个过程都是通过 Flink 的 StateBackend 完成的——这保证了:

  1. Exactly-once 语义:即使 TaskManager 宕机或网络抖动,StateBackend 里缓存的窗口数据都能在恢复后自动重新加载。
  2. 水平扩容/重分区:当做 rescale 操作或 job 重启时,Flink 会把 state 分片迁移到新的并行度实例,保证窗口数据完整。

为什么 Window 必须序列化?

  1. 分布式容错
    Window 算子可能在多个 TM 上并行,节点故障、网络分区、作业重调度都可能发生,只有把窗口数据写到 StateBackend,才能在恢复时不丢失任何事件。

  2. Checkpoint & Savepoint
    Flink 借助 Kafka、文件系统做 checkpoint/savepoint;所有 state(包括窗口元素)都会被打包到 checkpoint 里,保证 Exactly-once 与故障恢复。

  3. 弹性伸缩
    扩容、缩容时需要重新分配并行任务,StateBackend 会把各个 key + window 的数据按照新的并行度迁移到对应实例。

总结

源码分析完了,写个小总结吧

  • 本地变量 只能在当前算子实例、当前方法调用中生存,不会参与序列化;重启或缩容后会丢失。
  • Managed State(包括我们手动声明的 ValueState、也包括 WindowOperator 背后隐式的 ListState)会被 Flink 序列化到 StateBackend,参与 checkpoint/savepoint、支持容错恢复和重分区。
  • 虽然 Window API 没让你在代码里 getState(),但其核心实现却在算子初始化时自动注册了 ListStateDescriptor,并在 processElement() / onTimer() 里读写、清理这个 state。

了解了这套机制,你就能:

  1. 在自定义算子里灵活选择到底用本地变量还是 Managed State;
  2. 明白为什么 Window API 自带的“隐式状态”一定要序列化到后端,以及如何通过 StateBackend 配置(Memory vs RocksDB)来优化性能

欢迎关注我

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2376383.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

家用或办公 Windows 电脑玩人工智能开源项目配备核显的必要性(含 NPU 及显卡类型补充)

一、GPU 与显卡的概念澄清 首先需要明确一个容易误解的概念&#xff1a;GPU 不等同于显卡。 显卡和GPU是两个不同的概念。 【概念区分】 在讨论图形计算领域时&#xff0c;需首先澄清一个常见误区&#xff1a;GPU&#xff08;图形处理单元&#xff09;与显卡&#xff08;视…

实现一个简单的 TCP 客户端/服务器

注意&#xff1a; TCP 三次握手建立连接建立连接后&#xff0c;TCP 提供全双工的通信服务&#xff0c;也就是在同一个连接中&#xff0c;通信双方 可以在同一时刻同时写数据&#xff0c;相对的概念叫做半双工&#xff0c;同一个连接的同一时刻&#xff0c;只能由一方来写数据T…

对抗帕金森:在疾病阴影下,如何重掌生活主动权?

帕金森病&#xff0c;一种影响全球超 1000 万人的神经退行性疾病&#xff0c;正无声地改变着患者的生活轨迹。随着大脑中多巴胺分泌减少&#xff0c;患者逐渐出现肢体震颤、肌肉僵硬、步态迟缓等症状&#xff0c;甚至连扣纽扣、端水杯这类日常动作都变得艰难。更棘手的是&#…

鸿蒙 UIAbility组件与UI的数据同步和窗口关闭

使用 EventHub 进行数据通信 Stage模型概念图 根据 Stage 模型概念图 UIAbility 先于 ArkUI Page 创建 所以&#xff0c;事件要先 .on 订阅 再 emit 发布 假如现在有页面 Page1 和他的 UIAbility // src/main/ets/page1ability/Page1Ability.ets onCreate(want: Want, laun…

Vue3学习(组合式API——计算属性computed详解)

目录 一、计算属性computed。 Vue官方提供的案例。(普通写法与计算属性写法) 使用计算属性computed重构——>简化描述响应式状态的复杂逻辑。 &#xff08;1&#xff09;计算属性computed小案例。 <1>需求说明。&#xff08;筛选原数组——>得新数组&#xff09; &…

Android Studio 模拟器配置方案

Android Studio 模拟器配置方案 1.引言2.使用Android Studio中的模拟器3.使用国产模拟器1.引言 前面介绍【React Native基础环境配置】的时候需要配置模拟器,当时直接使用了USB调试方案,但是有些时候可能不太方便连接手机调试,比如没有iPhone调不了ios。接下来说明另外两种可…

k8s中ingress-nginx介绍

1. 介绍 Ingress是一种Kubernetes资源&#xff0c;用于将外部流量路由到Kubernetes集群内的服务。与NodePort相比&#xff0c;它提供了更高级别的路由功能和负载平衡&#xff0c;可以根据HTTP请求的路径、主机名、HTTP方法等来路由流量。可以说Ingress是为了弥补NodePort在流量…

字节DeerFlow开源框架:多智能体深度研究框架,实现端到端自动化研究流程

&#x1f98c; DeerFlow DeerFlow&#xff08;Deep Exploration and Efficient Research Flow&#xff09;是一个社区驱动的深度研究框架&#xff0c;它建立在开源社区的杰出工作基础之上。目标是将语言模型与专业工具&#xff08;如网络搜索、爬虫和Python代码执行&#xff0…

算法第十八天|530. 二叉搜索树的最小绝对差、501.二叉搜索树中的众数、236. 二叉树的最近公共祖先

530. 二叉搜索树的最小绝对差 题目 思路与解法 第一想法&#xff1a; 一个二叉搜索树的最小绝对差&#xff0c;从根结点看&#xff0c;它的结点与它的最小差值一定出现在 左子树的最右结点&#xff08;左子树最大值&#xff09;和右子树的最左结点&#xff08;右子树的最小值…

微服务调试问题总结

本地环境调试。 启动本地微服务&#xff0c;使用公共nacos配置。利用如apifox进行本地代码调试解决调试问题。除必要的业务微服务依赖包需要下载到本地。使用mvn clean install -DskipTests进行安装启动前选择好profile环境进行启动&#xff0c;启动前记得mvn clean清理项目。…

美SEC主席:探索比特币上市证券交易所

作者/演讲者&#xff1a;美SEC主席Paul S. Atkins 编译&#xff1a;Liam 5月12日&#xff0c;由美国SEC加密货币特别工作组发起的主题为《资产上链&#xff1a;TradFi与DeFi的交汇点》系列圆桌会议如期举行。 会议期间&#xff0c;现任美SEC主席Paul S. Atkins发表了主旨演讲。…

MySQL Join连接算法深入解析

引言 在关系型数据库中&#xff0c;Join操作是实现多表数据关联查询的关键手段&#xff0c;直接影响查询性能和资源消耗。MySQL支持多种Join算法&#xff0c;包括经典的索引嵌套循环连接&#xff08;Index Nested-Loop Join&#xff09;、块嵌套循环连接&#xff08;Block Nes…

http请求卡顿

接口有时出现卡顿&#xff0c;而且抓包显示有时tcp目标机器没有响应&#xff0c; 但nginx和java应用又没有错误日志&#xff0c;让人抓耳挠腮&#xff0c;最终还是请运维大哥帮忙&#xff0c;一顿操作后系统暂时无卡顿了&#xff0c;佩服的同时感觉疑惑到底调整了啥东…

vite+vue建立前端工程

​ 参考 开始 | Vite 官方中文文档 VUE教程地址 https://cn.vuejs.org/tutorial/#step-1 第一个工程 https://blog.csdn.net/qq_35221977/article/details/137171497 脚本 chcp 65001 echo 建立vite工程 set PRO_NAMEmy-vue-appif not exist %PRO_NAME% (call npm i…

vue使用路由技术实现登录成功后跳转到首页

文章目录 一、概述二、使用步骤安装vue-router在src/router/index.js中创建路由器&#xff0c;并导出在vue应用实例中使用router声明router-view标签&#xff0c;展示组件内容 三、配置登录成功后跳转首页四、参考资料 一、概述 路由&#xff0c;决定从起点到终点的路径的进程…

day20-线性表(链表II)

一、调试器 1.1 gdb&#xff08;调试器&#xff09; 在程序指定位置停顿 1.1.1 一般调试 gcc直接编译生成的是发布版&#xff08;Release&#xff09; gcc -g //-g调式版本&#xff0c;&#xff08;体积大&#xff0c;内部有源码&#xff09;&#xff08;DeBug&#…

HTTP 连接复用机制详解

文章目录 HTTP 连接复用机制详解为什么需要连接复用&#xff1f;连接复用的实现方式HTTP/1.1 的 Keep-AliveHTTP/2 多路复用 HTTP/1.1 的队头阻塞问题 HTTP 连接复用机制详解 HTTP 连接复用是 HTTP/1.1 及更高版本中的核心优化机制&#xff0c;旨在减少 TCP 连接建立和关闭的开…

网络协议分析 实验六 TCP和端口扫描

文章目录 实验6.1 TCP(Transfer Control Protocol)练习二 利用仿真编辑器编辑并发送TCP数据包实验6.2 UDP端口扫描实验6.3 TCP端口扫描练习一 TCP SYN扫描练习二 TCP FIN扫描 实验6.1 TCP(Transfer Control Protocol) 建立&#xff1a;syn,syn ack,ack 数据传送&#xff1a;tcp…

Spring Web MVC————入门(2)

1&#xff0c;请求 我们接下来继续讲请求的部分&#xff0c;上期将过很多了&#xff0c;我们来给请求收个尾。 还记得Cookie和Seesion吗&#xff0c;我们在HTTP讲请求和响应报文的时候讲过&#xff0c;现在再给大家讲一遍&#xff0c;我们HTTP是无状态的协议&#xff0c;这次的…

每日算法-250514

每日算法学习记录 (2024-05-14) 今天记录三道 LeetCode 算法题的解题思路和代码。 1. 两数之和 题目截图: 解题思路 这道题要求我们从一个整数数组中找出两个数&#xff0c;使它们的和等于一个给定的目标值 target&#xff0c;并返回这两个数的下标。 核心思路是使用 哈希…