Flink 优化 (四) --------- 数据倾斜

news2025/5/19 10:26:15

目录

  • 一、判断是否存在数据倾斜
  • 二、数据倾斜的解决
    • 1. keyBy 后的聚合操作存在数据倾斜
    • 2. keyBy 之前发生数据倾斜
    • 3. keyBy 后的窗口聚合操作存在数据倾斜


一、判断是否存在数据倾斜

相同 Task 的多个 Subtask 中,个别 Subtask 接收到的数据量明显大于其他Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。通常,数据倾斜也会引起反压。

在这里插入图片描述

另外,有时 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。

二、数据倾斜的解决

1. keyBy 后的聚合操作存在数据倾斜

提交案例:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.SkewDemo1 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--local-keyby false

查看 webui:

在这里插入图片描述

1)为什么不能直接用二次聚合来处理

Flink 是实时流处理,如果 keyby 之后的聚合操作存在数据倾斜,且没有开窗口 (没攒批) 的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候 Flink 是来一条处理一条,且向下游发送一条结果,对于原来 keyby 的维度(第二阶段聚合)来讲,数据量并没有减少,且结果重复计算(非 FlinkSQL,未使用回撤流),如下图所示:

在这里插入图片描述
2)使用 LocalKeyBy 的思想

在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后,再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从 Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。

实现方式:

➢ DataStreamAPI 需要自己写代码实现
➢ SQL 可以指定参数,开启 miniBatch 和 LocalGlobal 功能(推荐,后续介绍)

3)DataStream API 自定义实现的案例

以计算每个 mid 出现的次数为例,keyby 之前,使用 flatMap 实现 LocalKeyby 功能

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class LocalKeyByFlatMapFunc extends RichFlatMapFunction<Tuple2<String,  Long>, Tuple2<String, Long>> implements CheckpointedFunction {
	//Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
	private ListState<Tuple2<String, Long>> listState;
	//本地 buffer,存放 local 端缓存的 mid 的 count 信息
	private HashMap<String, Long> localBuffer;
	//缓存的数据量大小,即:缓存多少数据再向下游发送
	private int batchSize;
	//计数器,获取当前批次接收的数据量
	private AtomicInteger currentSize;
	//构造器,批次大小传参
	public LocalKeyByFlatMapFunc(int batchSize) {
		this.batchSize = batchSize;
	}
	@Override
	public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> 	out) throws Exception {
		// 1、将新来的数据添加到 buffer 中
		Long count = localBuffer.getOrDefault(value, 0L);
		localBuffer.put(value.f0, count + 1);
		// 2、如果到达设定的批次,则将 buffer 中的数据发送到下游
		if (currentSize.incrementAndGet() >= batchSize) {
			// 2.1 遍历 Buffer 中数据,发送到下游
			for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
				out.collect(Tuple2.of(midAndCount.getKey(), 
				midAndCount.getValue()));
			}
			// 2.2 Buffer 清空,计数器清零
			localBuffer.clear();
			currentSize.set(0);
		}
	}
	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// 将 buffer 中的数据保存到状态中,来保证 Exactly Once
		listState.clear();
		for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
			listState.add(Tuple2.of(midAndCount.getKey(), midAndCount.getValue()));
		}
	}
	
	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		// 从状态中恢复 buffer 中的数据
		listState = context.getOperatorStateStore().getListState(
			new ListStateDescriptor<Tuple2<String, Long>>(
				"localBufferState",
				Types.TUPLE(Types.STRING, Types.LONG)
			)
		);
		localBuffer = new HashMap();
		if (context.isRestored()) {
		// 从状态中恢复数据到 buffer 中
		for (Tuple2<String, Long> midAndCount : listState.get()) {
			// 如果出现 pv != 0,说明改变了并行度,ListState 中的数据会被均匀分发到新的 subtask 中
			// 单个 subtask 恢复的状态中可能包含多个相同的 mid 的 count 数据
			// 所以每次先取一下 buffer 的值,累加再 put
			long count = localBuffer.getOrDefault(midAndCount.f0, 0L);
			localBuffer.put(midAndCount.f0, count + midAndCount.f1);
		}
		// 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
		currentSize = new AtomicInteger(batchSize);
		} else {
			currentSize = new AtomicInteger(0);
		}
	}
}

提交 localkeyby 案例:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo1 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--local-keyby true

查看 webui:

在这里插入图片描述
可以看到每个 subtask 处理的数据量基本均衡,另外处理的数据量相比原先少了很多。

2. keyBy 之前发生数据倾斜

如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少对于不存在 keyBy 的 Flink 任务也会出现该情况。

这种情况,需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。

3. keyBy 后的窗口聚合操作存在数据倾斜

因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:

1)实现思路:

➢ 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合

注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)

➢ 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合

2)提交原始案例

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo2 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--two-phase false

查看 WebUI:

在这里插入图片描述
3)提交两阶段聚合的案例

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.SkewDemo2 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--two-phase true \
--random-num 16

查看 WebUI:可以看到第一次打散的窗口聚合,比较均匀

在这里插入图片描述
第二次聚合,也比较均匀:

在这里插入图片描述
随机数范围,需要自己去测,因为 keyby 的分区器是(两次 hash*下游并行度/最大并行度)
SQL 写法参考:https://zhuanlan.zhihu.com/p/197299746

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/414151.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【DevOps】GitOps之痛 -不完美的GitOps

前言 在前两篇文章中&#xff0c;我们对GitOps进行了大致的介绍&#xff1a; 【DevOps】GitOps初识(上) - 让DevOps变得更好 【DevOps】GitOps初识(下) - 让DevOps变得更好 GitOps 作为软件发布实践方式&#xff0c;有着许多的优点&#xff0c;然而&#xff0c;世上并没有完美…

【Linux】Centos安装mvn命令(maven)

&#x1f341;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; 文章目录一、下载maven包方法一&#xff1a;官…

CTF流量分析

在CTF里&#xff0c;一些pcapng或pcap文件后缀的数据 不同的数据包有不同的协议&#xff0c;常见的有HTTP&#xff0c;TCP协议 Wireshark 简介 是一个网络封包分析软件。网络封包分析软件的功能是获取网络封包&#xff0c;并尽可能显示出最为详细的网络封包资料 使用WinPC…

舔狗日记:学姐生日快到了,使用Python把她的照片做成视频当礼物

舔狗日记1前言一、需要调入的模块二、实现合并多张图片转成 mp4 视频三、优化改进一下总结前言 这不是学姐生日快到了&#xff0c;于是我学了一手使用Python来把学姐的照片生成为视频&#xff0c;到时候给她一个惊喜&#xff01; 好了先不舔了&#xff0c;下面分享一下用pytho…

基于朴素贝叶斯分类器的钞票真伪识别模型

基于朴素贝叶斯分类器的钞票真伪识别模型 内容 本实验通过实现钞票真伪判别案例来展开学习朴素贝叶斯分类器的原理及应用。 本实验的主要技能点&#xff1a; 1、 朴素贝叶斯分类器模型的构建 2、 模型的评估与预测 3、 分类概率的输出 源码下载 环境 操作系统&#xf…

Leetcode.130 被围绕的区域

题目链接 Leetcode.130 被围绕的区域 mid 题目描述 给你一个 m x n的矩阵 board&#xff0c;由若干字符 X和 O&#xff0c;找到所有被 X围绕的区域&#xff0c;并将这些区域里所有的 O用 X填充。 示例 1&#xff1a; 输入&#xff1a;board [[“X”,“X”,“X”,“X”],[“X…

stm32霸道-lvgl移植学习(一)

文章目录效果有用链接要求创建工程屏幕驱动以及触屏驱动LVGL PortWidgets demo其它效果 目前显示驱动显示较慢&#xff0c;后续会优化。 有用链接 LVGL官网 代码下载 要求 要求最低要求 建议要求架构16、32、64位微控制器或微处理器时钟 > 16 MHz > 48 MHzFlash/RO…

《低代码PaaS驱动集团企业数字化创新白皮书》-平台化加低代码提供破解之道(1)

平台化加低代码提供破解之道 大型企业亟需通过下一代平台开发技术实现软件创新&#xff0c;实现对海量数据的采集加工&#xff0c;以及企业内部数据的互联互通&#xff0c;帮助客户以低成本、短周期、高效率的方式实现数字化应用&#xff0c;进而赋能业务创新。基于此&#xf…

408--计算机网络--网络层总结1

目录 一、网络层概述&#xff1a; 1、网络层的主要任务&#xff1a; 2、网络层向上提供两种服务&#xff1a; 二、IPV4地址分类与子网划分&#xff1a; 1、分类编址&#xff1a; 一、网络层概述&#xff1a; 1、网络层的主要任务&#xff1a; 络层的主要任务就是将分组从…

【数据库基操】启动与连接MySQL数据库

一、启动与关闭 只介绍一种方法&#xff1a; 打开命令行工具&#xff0c;以管理员身份运行 1.启动数据库 net start mysql80 //80是在安装的时候设置的名字&#xff08;默认&#xff09;&#xff0c;不用在意 2.关闭数据库 net stop mysql80 如题已经成功&#…

java获取本机ip的方法

Java中有一个类叫 Application&#xff0c;可以用来获取本机 ip&#xff0c;也可以用来获取网络连接的信息&#xff0c;例如网络上有什么主机、需要访问本机的主机名等。但是这个类只能在本机上使用&#xff0c;如果要访问外部的主机&#xff0c;还需要使用其它的方法。 首先在…

教育大数据总体解决方案(5)

&#xff08;4&#xff09;错题整理 将学生的本次考试错题进行集中整理&#xff0c;提炼出所有题目的题干和正确的答案。 &#xff08;5&#xff09;提高方案 分析学生对知识点掌握情况&#xff0c;推算出学生的进步空间以及下次考试的预测拔高分数。根据学生本次考试错误知识点…

你的APP内存还在暴增吗?试着用Bitmap管理下内存~

作者&#xff1a;layz4android 相信伙伴们在日常的开发中&#xff0c;一定对图片加载有所涉猎&#xff0c;而且对于图片加载现有的第三方库也很多&#xff0c;例如Glide、coil等&#xff0c;使用这些三方库我们好像就没有啥担忧的&#xff0c;他们内部的内存管理和缓存策略做的…

Java垃圾回收机制GC完全指南,让你彻底理解JVM运行原理

1、GC过程 1&#xff09;先判断对象是否存活(是否是垃圾) 可以通过引用计数算法和可达性分析算法来判断&#xff0c;由于引用计数算法无法解决循环引用的问题&#xff0c;所以目前使用的都是可达性分析算法 2&#xff09;再遍历并回收对象(回收垃圾) 可以通过垃圾收集器&…

使用Schrödinger Python API系列教程 -- 介绍 (一)

使用Schrdinger Python API系列教程 – 介绍 (一) 本文档可从Schrdinger网站www.schrodinger.com/pythonapi访问。 从Python文档字符串生成的完整API文档可以在这里访问 介绍 在最高级别上&#xff0c;Schrdinger Python API提供了一个基本的分子结构类&#xff0c;并允许与…

redis总结之-jedis

redis总结之-jedis4. Jedis4.1 Jedis简介4.1.1 编程语言与redis4.1.2 准备工作4.1.3 代码实现4.2 Jedis简易工具类开发4.2.1 基于连接池获取连接4.2.2 封装连接参数4.2.3 加载配置信息4.2.4 获取连接4.3 可视化客户端总结计划 1. Redis 入 门&#xff08;了解&#xff09;&…

LNMP网站框架搭建(编译安装的方式)

1. Nginx的工作原理 php-fpm.conf 是控制php-fpm守护进程的 php.ini是php解析器 工作进程&#xff1a; 1.客户端通过域名进行请求访问时&#xff0c;会找Nginx对应的虚拟主机 2. Nginx对该请求进行判断&#xff0c;如果是静态请求,Nginx会自行处理&#xff0c;并将处理结果…

因果推断14--DRNet论文和代码学习

目录 论文介绍 代码实现 DRNet ReadMe 因果森林 论文介绍 因果推断3--DRNet&#xff08;个人笔记&#xff09;_万三豹的博客-CSDN博客 摘要&#xff1a;估计个体在不同程度的治疗暴露下的潜在反应&#xff0c;对于医疗保健、经济学和公共政策等几个重要领域具有很高的实…

ERP系统有什么用?主要是这三方面

ERP 是Enterprise Resource Planning 的缩写&#xff0c;即企业资源计划系统,是建立在信息技术基础上,以系统化的管理思想,为企业决策层及员工提供决策运行手段的管理平台。它实现了企业内部资源和企业相关的外部资源的整合。通过软件把企业的人、财、物、产、供、销及相应的物…

socket 到底是个啥

哈喽大家好&#xff0c;我是咸鱼 我相信大家在面试过程中或多或少都会被问到这样一个问题&#xff1a;你能解释一下什么是 socket 吗 我记得我当初的回答很是浅显&#xff1a;socket 也叫套接字&#xff0c;用来负责不同主机程序之间的网络通信连接&#xff0c;socket 的表现…