Flink 并行度的设置

news2025/5/20 19:46:36

在 Apache Flink 中,并行度(Parallelism) 是控制任务并发执行的核心参数之一。Flink 提供了 多个层级设置并行度的方式,优先级从高到低如下:


🧩 一、Flink 并行度的四个设置层级

层级描述设置方式
Operator Level为某个具体的算子设置并行度operator.setParallelism(n)
Execution Environment Level为整个流处理环境设置默认并行度env.setParallelism(n)
Client Level(提交作业时)通过命令行指定全局并行度flink run -p n
System Level(系统配置)flink-conf.yaml 中定义全局默认值parallelism.default: n

✅ 二、各层级设置详解与示例

1. Operator Level(算子级别)

  • 优先级最高
  • 可以为特定算子设置不同并行度,适用于数据倾斜或资源敏感操作
🔧 示例:
DataStream<String> stream = env.fromElements("a", "b", "c");

// 单独为 map 算子设置并行度为4
stream.map(new MyMapFunction())
      .setParallelism(4)
      .print();
✅ 适用场景:
  • 某个算子计算密集,需要更多资源
  • 数据源分区数较少,但后续算子可并行化处理

2. Execution Environment Level(执行环境级别)

  • 设置整个 Job 的默认并行度
  • 如果未对某些算子单独设置,并使用此值
🔧 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 所有算子默认并行度为4

DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction()).print(); // 默认并行度为4
✅ 适用场景:
  • 多数算子使用相同并行度
  • 统一配置便于管理和维护

3. Client Level(客户端提交作业时)

  • 使用命令行参数动态设置并行度
  • 不修改代码即可适配不同运行环境(如测试/生产)
🔧 示例:
flink run -p 4 -c com.example.MyJob ./myjob.jar
✅ 适用场景:
  • 快速调整不同集群资源配置
  • 测试阶段快速验证性能

4. System Level(系统级别)

  • flink-conf.yaml 中设置全局默认并行度
  • 对所有提交的作业生效(除非被更高级别覆盖)
🔧 示例(flink-conf.yaml):
parallelism.default: 4
✅ 适用场景:
  • 所有作业共享相同的默认资源配置
  • 避免手动重复设置

📊 三、并行度优先级对比表

设置方式是否推荐场景覆盖关系
Operator Level✅✅✅特定算子优化最高优先级
Execution Environment Level✅✅整体统一配置被 Operator 覆盖
Client Level (-p)动态部署被前两者覆盖
System Level (flink-conf.yaml)⚠️兜底默认值最低优先级

💡 四、并行度设置建议

✅ 推荐做法:

  • 开发/测试环境:使用 .setParallelism()-p 命令行设置较小值(如1~4)
  • 生产环境
    • 使用 flink-conf.yaml 设置基础并行度
    • 使用 env.setParallelism() 明确控制默认值
    • 为关键算子单独设置更高并行度(如窗口聚合、复杂逻辑)

⚙️ 示例组合:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 默认并行度

env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
   .setParallelism(8) // Kafka Source 并行度设为8(等于topic分区数)
   .map(new MyMapFunction()) // 使用默认并行度4
   .keyBy(keySelector)
   .window(TumblingEventTimeWindows.of(Time.seconds(5)))
   .process(new MyProcessWindowFunction()) // 可选 setParallelism()
   .print();

🧠 五、并行度与资源的关系

并行度TaskManager 数量Slot 数量资源要求
≤ TM × slot✅ 正常运行✅ 正常运行资源充足
> TM × slot❌ 无法启动❌ 无法启动资源不足

✅ 建议:确保总并行度 ≤ 总 slot 数量


📈 六、实际调优建议

场景建议设置
Kafka Source并行度 = Kafka Topic 分区数
Map / FlatMap根据 CPU 利用率设置
Keyed Window Aggregation可适当提高并行度提升吞吐
Join / CoGroup视数据分布决定是否提高并行度
Sink若写入慢可适当增加并行度

✅ 七、完整示例(Java + Shell)

Java 设置(Env + Operator):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

env.fromElements("a", "b", "c")
   .map(x -> x)
   .setParallelism(2) // 覆盖默认值
   .print();

env.execute("Parallelism Example");

Shell 设置(Client Level):

flink run -p 8 -c com.example.MyJob ./myjob.jar

✅ 八、总结

层级用途是否推荐使用
Operator Level控制单个算子并行度✅✅✅ 强烈推荐用于关键路径优化
Execution Environment Level设置默认并行度✅✅ 推荐作为基础配置
Client Level动态设置并行度✅ 适合多环境部署
System Level全局兜底配置⚠️ 推荐配合其他方式使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2380207.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

get请求使用数组进行传参

get请求使用数组进行传参,无需添加中括号 mvc接口要添加参数名&#xff0c;使用array承接。不能用list, 否则会报错 这里是用apifox模拟前端调用。 前端调用代码 // 根据项目ID和角色ID查询相关审批人 export function findRelativeApproverByProjectIdAndRoleId(roleIds, p…

【MySQL成神之路】MySQL常用语法总结

目录 MySQL 语法总结 数据库操作 表操作 数据操作 查询语句 索引操作 约束 事务控制 视图操作 存储过程和函数 触发器 用户和权限管理 数据库操作 创建数据库&#xff1a; CREATE DATABASE database_name; 选择数据库&#xff1a; USE database_name; 删除数…

Linux动静态库制作与原理

什么是库 库是写好的现有的&#xff0c;成熟的&#xff0c;可以复用的代码。现实中每个程序都要依赖很多基础的底层库&#xff0c;不可能每个人的代码都从零开始&#xff0c;因此库的存在意义非同寻常。 本质上来说库是一种可执行代码的二进制形式&#xff0c;可以被操作系统…

ffmpeg 把一个视频复制3次

1. 起因&#xff0c; 目的: 前面我写过&#xff0c;使用 python 把一个视频复制3次但是速度太慢了&#xff0c;我想试试看能否改进。而且我想换一种新的视频处理思路&#xff0c;并试试看速度如何。 2. 先看效果 效果就是能行&#xff0c;而且速度也快。 3. 过程: 代码 1…

GPT/Claude3国内免费镜像站更新 亲测可用

无限次使用&#xff1a;无限制的提问次数&#xff0c;不设上限&#xff0c;随心所欲。 无需魔法、稳定流畅&#xff1a;操作简便&#xff0c;无需复杂设置&#xff0c;即可享受稳定流畅的服务。 手机和电脑均能用&#xff1a;轻松适配手机和电脑&#xff0c;使用体验更佳。 …

Python:操作Excel按行写入

Python按行写入Excel数据,5种实用方法大揭秘! 在日常的数据处理和分析工作中,我们经常需要将数据写入到Excel文件中。Python作为一门强大的编程语言,提供了多种库和方法来实现将数据按行写入Excel文件的功能。本文将详细介绍5种常见的Python按行写入Excel数据的方法,并附上…

Redis进阶知识

Redis 1.事务2. 主从复制2.1 如何启动多个Redis服务器2.2 监控主从节点的状态2.3 断开主从复制关系2.4 额外注意2.5拓扑结构2.6 复制过程2.6.1 数据同步 3.哨兵选举原理注意事项 4.集群4.1 数据分片算法4.2 故障检测 5. 缓存5.1 缓存问题 6. 分布式锁 1.事务 Redis的事务只能保…

12.vue整合springboot首页显示数据库表-实现按钮:【添加修改删除查询】

vue整合springboot首页显示数据库表&#xff1a;【添加修改删除查询】 提示&#xff1a;帮帮志会陆续更新非常多的IT技术知识&#xff0c;希望分享的内容对您有用。本章分享的是node.js和vue的使用。前后每一小节的内容是存在的有&#xff1a;学习and理解的关联性。【帮帮志系…

bisheng系列(一)- 本地部署(Docker)

目录 一、导读 二、说明 1、镜像说明 2、本节内容 三、docker部署 1、克隆代码 2、运行镜像 3、可能的错误信息 四、页面测试 1、注册用户 2、登陆成功 3、添加模型 一、导读 环境&#xff1a;Ubuntu 24.04、Windows 11、WSL 2、Python 3.10 、bisheng 1.1.1 背景…

如何用Python批量解压ZIP文件?快速解决方案

如何用Python批量解压ZIP文件&#xff1f;快速解决方案 文章目录 **如何用Python批量解压ZIP文件&#xff1f;快速解决方案**代码结果详细解释 话不多说&#xff0c;先上干货&#xff01;&#xff01;&#xff01; 代码 import os import zipfiledef unzip_file(dir_path: str…

DriveGenVLM:基于视觉-语言模型的自动驾驶真实世界视频生成

《DriveGenVLM: Real-world Video Generation for Vision Language Model based Autonomous Driving》2024年8月发表&#xff0c;来自哥伦比亚大学的论文。 自动驾驶技术的进步需要越来越复杂的方法来理解和预测现实世界的场景。视觉语言模型&#xff08;VLM&#xff09;正在成…

企业标准信息公共服务平台已开放标准通编辑器访问入口

标准通 数字化标准编辑器 专业、高效、便捷 企业标准信息公共服务平台 近日&#xff0c;企业标准信息公共服务平台已开放标准通编辑器访问入口&#xff0c;可进入官网指定版块使用&#xff01; 核心功能亮点 解决企业痛点 传统标准编制&#xff0c;需反复核对格式、逐条…

进阶-数据结构部分:1、数据结构入门

飞书文档https://x509p6c8to.feishu.cn/wiki/HRLkwznHiiOgZqkqhLrcZNqVnLd 一、存储结构 顺序存储 链式存储 二、常用数据结构 2.1、栈 先进后出 场景&#xff1a; 后退/前进功能&#xff1a;网页浏览器中的后退和前进按钮可以使用栈来实现。在浏览网页时&#xff0c;每次…

React 19中useContext不需要Provider了。

文章目录 前言一、React 19中useContext移除了Provider&#xff1f;二、使用步骤总结 前言 在 React 19 中&#xff0c;useContext 的使用方式有所更新。开发者现在可以直接使用 作为提供者&#xff0c;而不再需要使用 <Context.Provider>。这一变化简化了代码结构&…

Json schema校验json字符串(networknt/json-schema-validator库)

学习链接 json-schema官网 - 英文 jsonschemavalidator 可在线校验网站 networknt的json-schema-validator github地址 networknt的json-schema-validator 个人gitee地址 - 里面有md文档说明和代码示例 JSON Schema 入门指南&#xff1a;如何定义和验证 JSON 数据结构 JS…

交易所开发:构建功能完备的金融基础设施全流程指南

交易所开发&#xff1a;构建功能完备的金融基础设施全流程指南 ——从技术架构到合规安全的系统性解决方案 一、开发流程&#xff1a;从需求分析到运维优化 开发一款功能完备的交易所需要遵循全生命周期管理理念&#xff0c;涵盖市场定位、技术实现、安全防护和持续迭代四大阶…

Axure疑难杂症:统计分析页面引入Echarts示例动态效果

亲爱的小伙伴,在您浏览之前,烦请关注一下,在此深表感谢! Axure产品经理精品视频课已登录CSDN可点击学习https://edu.csdn.net/course/detail/40420 课程主题:统计分析页面引入Echarts示例动态效果 主要内容:echart示例引入、大小调整、数据导入 应用场景:统计分析页面…

展锐Android14及更新版本split_build编译方法

更改split_build.py文件内容后按照下面方法编译&#xff1a; zip -r sys/vendor/sprd/release/split_build.zip sys/vendor/sprd/release/split_build/ rm -r sys/vendor/sprd/release/split_build/ cp -r vnd/vendor/sprd/release/split_build/ sys/vendor/sprd/release/cd s…

青少年ctf平台应急响应-应急响应2

题目&#xff1a; 当前服务器被创建了一个新的用户&#xff0c;请提交新用户的用户名&#xff0c;得到的结果 ssh rootchallenge.qsnctf.com -p 30327 这个命令用于通过 SSH 协议连接到指定的远程服务器。具体解释如下&#xff1a; ssh&#xff1a;这是在 Unix-like 系统中…

k8s监控方案实践补充(二):使用kube-state-metrics获取资源状态指标

k8s监控方案实践补充&#xff08;二&#xff09;&#xff1a;使用kube-state-metrics获取资源状态指标 文章目录 k8s监控方案实践补充&#xff08;二&#xff09;&#xff1a;使用kube-state-metrics获取资源状态指标一、Metrics Server简介二、kube-state-metrics实战部署1. 创…