EMR-StarRocks 与 Flink 在汇量实时写入场景的最佳实践

news2025/7/12 15:09:49

作者:

刘腾飞 汇量后端开发工程师

阿里云开源OLAP研发团队

EMR-StarRocks介绍

阿里云EMR在年初推出了StarRocks服务,StarRocks是新一代极速全场景MPP(Massively Parallel Processing)数据仓库,致力于构建极速和统一分析体验。EMR StarRocks具备如下特点:

  • 兼容MySQL协议,可使用MySQL客户端和常用BI工具对接StarRocks来分析数据
  • 采用分布式架构:
    • 对数据表进行水平划分并以多副本存储
    • 集群规模可以灵活伸缩,支持10 PB级别的数据分析
    • 支持MPP框架,并行加速计算
    • 支持多副本,具有弹性容错能力
    • 支持向量化引擎和CBO
    • 支持弹性扩缩容
    • 支持明细模型、聚合模型、主键模型和更新模型

更多详细信息可以参考https://help.aliyun.com/document_detail/405463.html

Flink-CDC概念介绍

在这里插入图片描述

CDC的全称是Change Data Capture,面向的场景包括数据同步、数据分发、数据采集,Flink CDC 主要面向数据库的变更,可以将上游数据和Schema的变更同步到下游数据湖和数据仓库中。2020年7月,Flink CDC项目提交了第一个Commit,去年8月,Flink社区发布了CDC2.0,经过两年时间的打磨,在商业化使用上已经非常成熟。本文主要以Mysql CDC为例,介绍StarRocks+Flink CDC实时入仓中用户遇到的痛点,以及在Flink和StarRocks层面进行的对应优化和解决方案。

使用CDC将一张Mysql表中的数据导入到StarRocks的表中,首先需要在StarRocks上建立用来承接Mysql数据的目标表,然后在Flink上分别创建Mysql表和StarRocks表在Flink中Sink和Source表的映射,然后执行一条insert into sink_table from source_table语句。执行完Insert into之后,会生成一个CDC任务,CDC任务首先向目标表同步源表的全量数据,完成后继续基于Binlog进行增量数据的同步。通过一个任务,完成数据的全量+增量同步,对于用户来讲是非常友好的。但是在使用的过程中,依然发现了一些痛点。

实时写入场景的用户痛点

SQL开发工作量大

对于一些还没有完成数仓建设的新业务,或是刚刚开始依托StarRocks进行OLAP平台建设的用户而言,在StarRocks中建表以承载Mysql同步过来的数据是第一步。在一些复杂的业务中,Mysql中的表往往有几十上百张,每张表又有数十个字段,要把它们对应的StarRocks表的建表语句全部编写出来是一个很大的工作量。第一个痛点StarRocks建表的工作量大。

Flink字段的数据类型映射关系复杂易错

在StarRocks中建表是第一步,建表完成之后,为了启动CDC任务,还需要在Flink中建立Mysql对应的Source表,以及StarRocks对应的Sink表,其中Flink建表时,每个字段的字段类型与Mysql、与StarRocks的映射关系需要严格注意,对于动辄几十上百个需要字段的表,每个字段都需要查找对应在Flink的类型映射关系,尤其令开发人员痛苦。因此,第二个痛点是上下游表与Flink字段的数据类型映射关系复杂,容易出错。

Schema变更操作繁琐

第三个痛点来自于业务数据Schema的变化,据Fivetran公司调查,约有60%的公司数据Schema每个月都会发生变化,30%的公司数据Schema每周都会发生变化。对于Mysql表中字段的增删改,用户希望在不影响CDC任务的情况下,将Schema变化同步到下游的StarRocks。目前常用的方案,是在手动停止任务后,更改StarRocks和Mysql的Schema,更改Flink侧的Sink和Source表结构,通过指定savepoints的方式再次启动任务。Schema变更的操作繁琐,无法自动化是第三个痛点。

数据同步任务占用资源多

第四个痛点,是在表的数量多、实时增量数据量大的场景下,CDC任务占用的内存和cpu资源较高,出于节省成本的考虑,用户希望尽可能的在资源利用方面进行优化。

接下来,我们来看针对这些痛点,EMR-StarRocks在与Flink深度结合方面做了哪些优化,提供了什么样的解决方案。

CTAS&CDAS

EMR-StarRocks与Flink团队推出的CTAS&CDAS功能主要是针对前三个痛点研发的一个解决方案。通过CTAS&CDAS,可以使用一条SQL语句,完成StarRocks建表、Flink-CDC任务创建、实时同步Schema变更等原本需要多项繁杂操作的任务,令开发和运维的工作量大大降低。

CTAS介绍

CTAS的全称是create table as,语法结构如下:

CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
'database-name'='test_cdc',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:8030',
'table-name'='runoob_tbl_sr',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000'
)
 as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test_cdc',
  'table-name' = 'runoob_tbl'  )*/;

通过CTAS的语法结构可以看到,除了集群信息和DataBase信息外,还有一个特殊配置“starrocks.create.table.properties”,这是由于Mysql与StarRocks的表结构有一些不同,如Key Type、分区、Bucket Number等特殊配置,因此用它来承接StarRocks建表语句中字段定义后面的内容。

为了方便用户更快的建表,还设置了一个Simple Mode,配置方式如下:

CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'database-name'='test_cdc',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:8030',
'table-name'='runoob_tbl_sr',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000'
)
 as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test_cdc',
  'table-name' = 'runoob_tbl'  )*/;

开启Simple Mode之后,将默认使用Primary Key模型,默认使用Mysql中的主键作为Primary Key,默认使用哈希(主键)进行分桶,这样,用户在启动Simple Mode对表使用CTAS语句时,就完全不需要关心Mysql中原表有哪些字段,字段名称是什么,主键是什么,只需要知道表名,就可以高效的完成SQL编写。

CTAS的原理

在这里插入图片描述

如图所示,在执行了CTAS语句后,首先Flink会自动在StarRocks中创建一个与Mysql源表的Schema相同的目标表,然后建立Mysql与StarRocks表在Flink中的Sink和Source映射,接下来启动一个CDC任务,该任务将同步源表数据到目标表,并在运行时监测Mysql源表发送过来的数据发生的Schema变更,自动将Schema变更同步到StarRocks目标表中。CTAS功能实际上是用一个SQL,完成了原本需要手动编写SQL和执行的多项操作。

接下来介绍CTAS的实现原理。CTAS的实现主要依赖了Flink CDC、Flink Catalog和Schema Evolution。Flink的CDC功能前面已经介绍过了。其中的Catalog功能,使Flink可以感知到StarRocks中所有的DataBase和所有table的Schema,并对它们进行DDL操作。而Schema Evolution功能,是通过对数据的Schema变化进行检测和记录实现的,例如,当Mysql发生增列操作时,CTAS任务并不会根据Mysql的DDL变化,立刻对下游StarRocks进行添加列的操作,而是当第一条使用了新Schema的数据被处理时,才会通过对比新旧数据Schema的区别,生成对应的Alter Table Add Column语句,对StarRocks进行增列操作,在等待StarRocks的Schema变更完成之后,新的数据才会被推送到下游。

CDAS介绍

CDAS是CTAS的一个语法糖。通过CDAS语句,可以实现Mysql中的整库同步,即生成一个Flink Job,Source是Mysql中的database,目标表是StarRocks中对应的多张表。

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'database-name'='test_cdc',
'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
'load-url'='172.16.**.**:8030',
'username'='test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000'
)
 as table mysql.test_cdc.runoob_tbl including table
 'tabl1','tbl2','tbl3'   /*+ OPTIONS (   'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test_cdc' )*/;

由于我们期望使用一条SQL生成多张表的Schema和CDC任务,因此需要统一使用Simple模式。在实际使用过程中,一个DataBase中可能有些表不需要同步、有些表需要自定义配置,因此我们可以使用Including Table语法,只选择一个DataBase中的部分表进行CDAS操作,对于需要自定义属性配置的表,则使用CTAS语句进行操作。

重要特性

CTAS&CDAS的几个重要特性包括:

  • 支持将多个CDC任务使用同一个Job执行,节省了大量的内存和CPU资源。
  • 支持Source合并,在使用CDAS进行数据同步时,会使用一个Job管理所有表的同步任务,并自动将所有表的Source合并为一个,减少Mysql侧并发读取的压力。
  • 支持的Schema Change类型包括增加列、删除列和修改列名。这里需要注意的是,当前所支持的删除列操作,是通过将对应字段的值置空来实现的,例如上游Mysql表删除了一个字段,在Flink检测到数据Schema变更后,并不会将StarRocks中对应的列删除,而是在将数据写入到StarRocks时,把对应的字段的值填为空值。而修改列名的操作,也是通过增加一个新列,并把新数据中原来的列的值置空来实现的。

Connector-V2介绍

Connector-V2是为了解决第四个痛点而研发的,可以帮助用户降低通过Flink导入StarRocks时的内存消耗,提升任务的稳定性。

在这里插入图片描述

如图所示,在V1版本中,为了保证Exactly-Once,我们需要将一次Checkpoint期间的所有数据都憋在Flink的Sink算子的内存中,由于Checkpoint时间不能设置的太短,且无法预测单位时间内数据的流量,因此不仅造成了内存资源的严重消耗,还经常因OOM带来稳定性问题。

V2版本通过两阶段提交的特性解决了这个问题,两阶段提交指的是,数据的提交分为两个阶段,第一阶段提交数据写入任务,在数据写入阶段数据都是不可见的,并且可以分批多次写入,第二阶段是提交阶段,通过Commit请求将之前多批次写入的数据同时置为可见。StarRocks侧提供了Begin、Prepare、Commit等接口,支持将多次数据写入请求作为同一个事务提交,保证了同一事务内数据的一致性。

通过显示的调用Transaction接口的方式,可以由原来在Flink侧积攒大批数据、一次性发送数据的方式,改进为连续小批量提交数据,在保证Exactly-Once的同时,大大降低了Flink侧用于存储数据Buffer的内存消耗问题,也提高了Flink任务的稳定性。

StarRocks + Flink在汇量的实践

在汇量的广告投放分析业务中,使用了CDAS特性来完成Mysql到Flink数据的实时变更。
在这里插入图片描述

此前,该业务主要依托某闭源数据仓库进行OLAP分析,随着数据量的增长,在单表查询和多表Join场景都出现了较大的瓶颈,查询耗时达到无法容忍的分钟级,因此重新选型采用了StarRocks进行数据分析,在对应场景下表现十分优异。

在汇量的业务场景下,StarRocks中有几十张涉及操作元数据的小表是使用CDAS进行实时同步的,另外几张数据量较大的明细表是以离线导入的形式按天更新的。使用CDAS的主要是数据更新和Schema变化较为频繁的小表和维度表,进行业务查询时,将这些实时更新的表与离线的数据表进行Join,通过小表实时更新、大表离线更新、大小表联合查询的方式,实现了实时性、成本以及导入与查询性能的取舍均衡。由于业务对数据的准确性要求较高,因此使用了Exactly-once语义,通过Flink的Checkpoint机制来保证数据的不丢不重。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/36574.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

帝国cms后台登录系统限制次数,60分钟过后重新登录解决办法

帝国cms后台登录系统一不小心登录频繁就提示: 系统限制的登录次数不得超过5次,请等60分钟过后,方可重新登录 主要原因就是频繁的输错用户名或者密码导致登录受限 解帝国cms后台登录系统限制次数方法一:等待60分钟,然后再尝试登录 解帝国cms后台登录系统限制次数方法二:修改…

Hive之DQL操作

Hive系列第六章 (实际是第七篇,就不改目录序号了,大家知道就行,后续的篇章类推即可) 第六章 DQL查询数据 DDL: Data Definition Language 数据定义语言 DML: Data Manipulation Language …

【科学文献计量】GC.networkCoInvestigator()和GC.networkCoInvestigator()中的参数解释

@TOC 1 数据 使用官网提供的基金数据导入到python环境中 2 GC.networkCoInvestigator()中的参数解释 GC.networkCoInvestigator()中的参数解释: targetTagsL: [list]数据类型。默认为None,可以指定为Grant中研究者的标签构成的列表,很多基金中作者没有已知的标签,需要自…

最新版本EasyRecovery15个人免费版电脑数据恢复工具

最新版本EasyRecovery15是一款是款恢复率高、速度快的数据恢复软件,Ontrack EasyRecovery (易恢复) 跨平台支持 Windows 以及 Mac 系统,能能够顺利找回因各种原因丢失的文件,比如文件误删除、误格式化、分区丢失等,且EasyRecovery…

一种获得离散型周期数据的变化周期的算法

400个数据像这样: 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 …

【Android 开发】 面试官刨根问底?教你如何避免翻车沟通表达能力

很久以前,凭借四大组件、Java基础等知识,便可开开心心的开发,轻松的上岗; 而随着Android的不断发展完善,各种组件库越来越成熟,学习资料越来越多,我们却慢慢的看不到了方向;信息爆炸…

Servlet(Cookie和Session)

目录 🐲 1. Cookie 的工作流程 🐲 2. Servlet中操作 Cookie 和 Session 的api 🐲 3. 案例1: 模拟登录 🐲 4. 上传文件 🐲 5. 案例2: 上传文件 🐲 1. Cookie 的工作流程 Cookie 是浏览器在本地持久化保…

[附源码]SSM计算机毕业设计朋辈帮扶系统JAVA

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

[毕业设计]opencv机器学习双目测距精度测量系统

前言 📅大四是整个大学期间最忙碌的时光,一边要忙着备考或实习为毕业后面临的就业升学做准备,一边要为毕业设计耗费大量精力。近几年各个学校要求的毕设项目越来越难,有不少课题是研究生级别难度的,对本科同学来说是充满挑战。为帮助大家顺利通过和节省时间与精力投…

Flutter高仿微信-第43篇-群聊列表

Flutter高仿微信系列共59篇,从Flutter客户端、Kotlin客户端、Web服务器、数据库表结构、Xmpp即时通讯服务器、视频通话服务器、腾讯云服务器全面讲解。 详情请查看 效果图: 实现代码: /*** Author : wangning* Email : maoning20080809163.c…

泰克Tektronix 信号发生器AFG31022 ,2频道,25MHz

AFG31022 泰克函数发生器 AFG31022 是 Tektronix 的 25 MHz 函数发生器。 产品特征: 2个频道 25 MHz 正弦波 输出幅度范围:1 mVP-P 至 10 VP-P,50 Ω 负载 14 位垂直分辨率 内置波形包括正弦波、方波、斜波、脉冲、噪声和其他常用波形…

数据库设计规范

一、概述 系统数据库表的设计如果有问题,可能造成数据冗余、信息重复、存储空间浪费、数据插入更新异常等。设计一个好的数据表可尽量避免上述问题的发生,如何设计一个好的数据库是有一定的规范的。而这些设计表的基本原则规范称为范式 二、范式 1、范…

实战渗透--一次对后台登录系统的简单渗透测试

某网站后台登录界面 发现有验证码框 猜想会不会存在验证码绕过的漏洞 首先随意输入用户名密码(用于抓包) 打开burp抓包 分析数据包后 找到对应的传参点 即输入的账号密码还有验证码 这里可以看到 账号和密码全都是明文传输 并没有进行加密 所以更改起来还…

【微服务】CORS跨越问题网关请求转发时进行路径重写问题

一、跨域 CORS 简述 跨域官方文档: https://developer.mozilla.org/zh-CN/docs/Web/HTTP/CORS 浏览器跨越问题的英文简写为CORS,其出现问题的截图如下: 浏览器拒绝跨域,是通过同源策略限制的。同源策略是指,发送请求…

文献学习03_GloVe: Global Vectors for Word Representation_20221124

论文信息 Subjects:《2014年自然语言处理经验方法会议论文集》(EMNLP),第1532–1543页,2014年10月25日至29日, (1)题目:GloVe: Global Vectors for Word Representation &#xff0…

本地GitLab服务器搭建

一、简介 GitLab 是一个用于仓库管理系统的开源项目,使用Git作为代码管理工具,并在此基础上搭建起来的Web服务。安装方法是参考GitLab在GitHub上的Wiki页面。Gitlab是被广泛使用的基于git的开源代码管理平台, 基于Ruby on Rails构建, 主要针对软件开发过…

把握性能测试重点,5步解决问题!

一、引言 很多做性能测试的同学都问过我这样一个问题:鱼哥(Carl_奕然),你说性能测试的重点是什么? 我的回答很简单:瓶颈分析与问题定位。 在性能项目的整个周期,不管是脚本设计,脚本编写还是脚本执行,都…

什么时候可以用到强化学习?强化学习怎么用?

我相信很多像我一样的初学者在学习强化学习的的过程会有一种困惑:强化学习内容搞懂了,算法流程也明白了,但是怎么用在自己的研究领域或者应用上呢?换句话说,什么样的情况可以用强化学习解决呢? 什么是强化…

基于MxNet实现目标检测-CenterNet【附部分源码及模型】

文章目录前言目标检测发展史及意义一、数据集的准备1.标注工具的安装2.数据集的准备3.标注数据4.解释xml文件的内容二、网络结构的介绍三、代码实现0.工程目录结构如下1.导入库2.配置GPU/CPU环境3.数据加载器4.模型构建5.模型训练1.学习率设置2.优化器设置3.损失设置4.循环训练…

24.java- File类的常用方法:遍历目录里的文件

遍历目录 通过遍历目录可以在指定的目录中查找文件,或者显示所有的文件列表。 1.File 类的 list() File 类的 list() 方法提供了遍历目录功能,该方法有如下两种重载形式。 String[] list() 该方法表示返回由 File 对象表示目录中所有文件和子目录名称…