Apache Flink 快速入门
Flink开发环境准备学习一门新的编程语言时往往会从hello world程序开始而接触一套新的大数据计算框架时则一般会从WordCount案例入手下面以大数据中最经典入门案例WordCount为例来编写Flink代码Flink底层源码是基于Java代码进行开发在Flink编程中我们除了可以使用Java语言来进行编写Flink程序外还可以使用Scala、Python语言来进行编写Flink程序在后续章节中我们将会主要使用Java和Scala来编写Flink程序。下面来准备下Flink开发环境。Flink版本本套课程中我们采用Flink最新版本1.16.0Flink1.16.0版本官方文档地址https://nightlies.apache.org/flink/flink-docs-release-1.16/JDK环境Flink核心模块均采用Java开发所以运行环境需要依赖JDK,Flink可以基于类UNIX 环境中运行例如Linux、Max OS、Windows等在这些系统上运行Flink时都需要配置JDK环境Flink 1.16.0版本需要JDK版本为JDK11,目前版本也支持使用JDK8后续版本对JDK8的支持将会移除。考虑到Flink后期与一些大数据框架进行整合这些大数据框架对JDK11的支持并不完善例如Hive3.1.3版本还不支持JDK11所以本课程采用JDK8来开发Flink。对JDK8安装及配置不再详述。附JDK11 下载地址如下https://www.oracle.com/java/technologies/javase-jdk11-downloads.html。开发工具我们可以选择IntelliJ IDEA或者Eclipse作为Flink应用的开发IDEFlink开发官方建议使用IntelliJ IDEA因为它默认集成了Scala和Maven环境使用更加方便我们这门课使用IntelliJ IDEA开发工具具体安装步骤不再详述。Maven环境通过IntelliJ IDEA进行开发Flink Application时可以使用Maven来作为项目jar包管理工具需要在本地安装Maven及配置Maven的环境变量需要注意的是Maven版本需要使用3.0.4及以上否则编译或开发过程中会有问题。这里使用Maven 3.2.5版本。Scala环境Flink开发语言可以选择Java、Scala、Python如果用户选择使用Scala作为Flink应用开发语言则需要安装Scala执行环境。在Flink1.15之前版本如果只是使用Flink的Java api 对于一些没有Scala模块的包和表相关模块的包需要在Maven引入对应的包中加入scala后缀例如flink-table-planner_2.11后缀2.11代表的就是Scala版本。在Flink1.15.0版本后Flink添加对opting-out排除 Scala的支持如果你只使用Flink的Java api导入包也不必包含scala后缀你可以使用任何Scala版本。如果使用Flink的Scala api需要选择匹配的Scala版本。从Flink1.7版本往后支持Scala 2.11和2.12版本从Flink1.15.0版本后只支持Scala 2.12不再支持Scala 2.11。Scala环境可以通过本地安装Scala执行环境也可以通过Maven依赖Scala-lib引入如果本地安装了Scala某个版本建议在Maven中添加Scala-lib依赖。Scala2.12.8之后的版本与之前的2.12.x版本不兼容,建议使用Scala2.12.8之后版本。Hadoop环境Flink可以操作HDFS中的数据及基于Yarn进行资源调度所以需要对应的Hadoop环境Flink1.16.0版本支持的Hadoop最低版本为2.8.5本课程中我们使用Hadoop3.3.4版本。关于Hadoop3.3.4版本搭建参照第三章节Flink入门案例需求读取本地数据文件统计文件中每个单词出现的次数。IDEA Project创建及配置本课程编写Flink代码选择语言为Java和Scala所以这里我们通过IntelliJ IDEA创建一个目录其中包括Java项目模块和Scala项目模块将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下打开IDEA创建空项目在IntelliJ IDEA 中安装Scala插件使用IntelliJ IDEA开发Flink如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件如果已经安装可以忽略此步骤下图为以安装Scala插件。打开Structure,创建项目新模块FlinkScalaCode模块导入Flink Maven依赖如下properties project.build.sourceEncodingUTF-8/project.build.sourceEncoding maven.compiler.source1.8/maven.compiler.source maven.compiler.target1.8/maven.compiler.target flink.version1.16.0/flink.version slf4j.version1.7.31/slf4j.version log4j.version2.17.1/log4j.version scala.version2.12.10/scala.version scala.binary.version2.12/scala.binary.version /properties dependencies !-- Flink批和流开发依赖包 -- dependency groupIdorg.apache.flink/groupId artifactIdflink-scala_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-scala_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-clients/artifactId version${flink.version}/version /dependency !-- Scala包 -- dependency groupIdorg.scala-lang/groupId artifactIdscala-library/artifactId version${scala.version}/version /dependency dependency groupIdorg.scala-lang/groupId artifactIdscala-compiler/artifactId version${scala.version}/version /dependency dependency groupIdorg.scala-lang/groupId artifactIdscala-reflect/artifactId version${scala.version}/version /dependency !-- slf4jlog4j 日志相关包 -- dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-to-slf4j/artifactId version${log4j.version}/version /dependency /dependencies案例数据准备Flink案例实现数据源分为有界和无界之分有界数据源可以编写批处理程序无界数据源可以编写流式程序。DataSet API用于批处理DataStream API用于流式处理。批处理使用ExecutionEnvironment和DataSet流式处理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示数据的特殊类DataSet处理的数据是有界的DataStream处理的数据是无界的这两个类都是不可变的一旦创建出来就无法添加或者删除数据元。Flink 批数据处理案例以上输出结果开头展示的是处理当前数据的线程一个Flink应用程序执行时默认的线程数与当前节点cpu的总线程数有关。Flink批和流案例总结关于以上Flink 批数据处理和流式数据处理案例有以下几个点需要注意Flink程序编写流程总结编写Flink代码要符合一定的流程Flink代码编写流程如下a. 获取flink的执行环境批和流不同Execution Environment。b. 加载数据数据-- soure。c. 对加载的数据进行转换-- transformation。d. 对结果进行保存或者打印-- sink。e. 触发flink程序的执行 --env.execute()在Flink批处理过程中不需要执行execute触发执行在流式处理过程中需要执行env.execute触发程序执行。关于Flink的批处理和流处理上下文环境创建Flink批和流上下文环境有以下三种方式批处理上下文创建环境如下//设置Flink运行环境如果在本地启动则创建本地环境如果是在集群中启动则创建集群环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();//指定并行度创建本地环境LocalEnvironment localEnv ExecutionEnvironment.createLocalEnvironment(10);//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包ExecutionEnvironment remoteEnv ExecutionEnvironment.createRemoteEnvironment(JobManagerHost, 6021, 5, application.jar);流处理上下文创建环境如下//设置Flink运行环境如果在本地启动则创建本地环境如果是在集群中启动则创建集群环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//指定并行度创建本地环境LocalStreamEnvironment localEnv StreamExecutionEnvironment.createLocalEnvironment(5);//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包StreamExecutionEnvironment remoteEnv StreamExecutionEnvironment.createRemoteEnvironment(JobManagerHost, 6021, 5, application.jar);同样在Scala api 中批和流创建Flink 上下文环境也有以上三种方式在实际开发中建议批处理使用ExecutionEnvironment.getExecutionEnvironment()方式创建。流处理使用StreamExecutionEnvironment.getExecution-Environment()方式创建。Flink批和流 Java 和 Scala导入包不同在编写Flink Java api代码和Flink Scala api代码处理批或者流数据时引入的ExecutionEnvironment或StreamExecutionEnvironment包不同在编写代码时导入错误的包会导致编程有问题。批处理不同API引入ExecutionEnvironment如下//Flink Java api 引入的包import org.apache.flink.api.java.ExecutionEnvironment;//Flink Scala api 引入的包import org.apache.flink.api.scala.ExecutionEnvironment流处理不同API引入StreamExecutionEnvironment如下//Flink Java api 引入的包import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;//Flink Scala api 引入的包import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentFlink Java Api中创建 Tuple 方式在Flink Java api中创建Tuple2时可以通过new Tuple2方式也可以通过Tuple2.of方式两者本质一样。Flink Scala api需要导入隐式转换在Flink Scala api中批处理和流处理代码编写过程中需要导入对应的隐式转换来推断函数操作后的类型在批和流中导入隐式转换不同具体如下//Scala 批处理导入隐式转换使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.api.scala._//Scala 流处理导入隐式转换使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.streaming.api.scala._关于Flink Java api 中的 returns 方法Flink Java api中可以使用Lambda表达式当涉及到使用泛型Java会擦除泛型类型信息需要最后调用returns方法指定类型明确声明类型告诉系统函数生成的数据集或者数据流的类型。批和流对数据进行分组方法不同批和流处理中都是通过readTextFile来读取数据文件对数据进行转换处理后Flink批处理过程中通过groupBy指定按照什么规则进行数据分组,groupBy中可以根据字段位置指定key例如groupBy(0)如果数据是POJO自定义类型也可以根据字段名称指定key(例如groupBy(name))对于复杂的数据类型也可以通过定义key的选择器KeySelector来实现分组的key。Flink流处理过程中通过keyBy指定按照什么规则进行数据分组keyBy中也有以上三种方式指定分组key建议使用通过KeySelector来选择key其他方式已经过时。关于DataSet Api (Legacy)软弃用Flink架构可以处理批和流Flink 批处理数据需要使用到Flink中的DataSet API此API 主要是支持Flink针对批数据进行操作本质上Flink处理批数据也是看成一种特殊的流处理有界流所以没有必要分成批和流两套API从Flink1.12版本往后Dataset API 已经标记为Legacy(已过时)已被官方软弃用官方建议使用Table API 或者SQL 来处理批数据我们也可以使用带有Batch执行模式的DataStream API来处理批数据在未来Flink版本中DataSet API 将会被删除。关于这些API 具体使用后续章节会进行讲解。DataStream BATCH模式除了在代码中设置处理模式外还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式也可以在集群中提交Flink任务时指定execution.runtime-mode来指定Flink官方建议在提交Flink任务时指定执行模式这样减少了代码配置给Flink Application提供了更大的灵活性提交任务指定参数如下$FLINK_HOME/bin/flink run -Dexecution.runtime-modeBATCH -c xxx xxx.jar关于Flink集群提交任务及Flink flink-conf.yaml配置文件在下个章节集群搭建会进行介绍。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2635902.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!