Flink-经典案例WordCount快速上手以及安装部署

news2025/7/21 10:54:39

2 Flink快速上手

2.1 批处理api

  1. 经典案例WordCount
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建一个执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.从文件中读取数据
        //得到数据源,DataSource底层是DataSet这个数据集
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");

        //3.将每行数据进行分词,转换成二元组类型
        //FlatMapOperator返回的是一个算子,底层是DataSet这个数据集
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    //将一行文本进行分词
                    String[] words = line.split(" ");
                    for (String word : words) {
                        //collect是收集器的用法,of是构建二元组的实例,并输出
                        out.collect(Tuple2.of(word, 1L));
                    }
                })       //泛型擦除,指定tuple的类型
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        //4.按照word进行分组
        //和spark不一样,没有groupby,所以要根据索引指定key
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        //5.所以分组内进行聚合统计
        //也是需要索引指定需要对哪一个求和,然后得到一个聚合算子
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        //6.打印输出
        sum.print();
    }
}

  1. 说明

以上的代码还是基于DataSet的api,但是DataSet的api已经处于软弃用,默认流处理,需要批处理的时候,将提交任务时通过执行模式设为batch进行,脚本如下

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.2 流处理

2.2.1 有界流

  1. 代码
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.读取文件
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");

        //3.转换计算
        //底层是DataStream
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        //4.分组
        //keyby传一个lambda表达式,Tuple提取当前第一个字段,Tuple的第一个字段定义分别是f0,f1
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);

        //5.求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        //6.打印输出
        sum.print();

        //7.启动执行
        //一直处于流状态,需要给他启动
        env.execute();
    }
  1. 输出结果
10> (flink,1)
4> (hello,1)
2> (java,1)
4> (hello,2)
7> (world,1)
4> (hello,3)

最后结果和批处理一样,并伴随中间过程,而且乱序

代码使用多线程模拟的分布式集群,也就是并行度(默认是电脑cpu的核数),数字表示数字槽

2.2.2 无界流

  1. 代码

主要注意1,2,3到最后都是跟上面一样

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建流失执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.读取文本流
        DataStreamSource<String> lineDataStream = env.socketTextStream("192.168.60.132", 7777);

        //3.转换计算
        //底层是DataStream
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        //4.分组
        //keyby传一个lambda表达式,Tuple提取当前第一个字段,Tuple的第一个字段定义分别是f0,f1
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);

        //5.求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        //6.打印输出
        sum.print();

        //7.启动执行
        //一直处于流状态,需要给他启动
        env.execute();
    }
}
  1. 或者直接配置好主机名和端口号的参数
  • 代码编写
        //从参数提取主机名和端口号
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("host");
        Integer port = parameterTool.getInt("port");
  • idea设置

在这里插入图片描述

3 Flink部署

3.1 部署模式

  1. 会话模式
  • 内容

先确定集群,并且资源确定,提交的作业会竞争集群中的集群

  • 缺点

资源不够,提交作业失败

  • 使用范围

会话模式使用与单个规模小,执行时间短的大量作业

  1. 单作业模式
  • 内容

每个作业启动后启动集群,运行结束后,集群就会关闭

  • 条件

单作业需要借助资源管理器

  1. 应用模式
  • 内容

根据一个应用而后启动集群,直接交给JobManager

3.2 独立模式(standalone)的部署

  1. 会话模式

就刚刚的那些代码,先启动集群,在提交的作业

  1. 单作业模式

没有

  1. 应用模式部署
  • 先把jar包放到/lib下
  • 然后根据flink自带的jar包启动,会去自动扫描lib下的jar启动
  • 启动taskmanager
  • 停掉集群

3.3 yarn模式

3.3.1 总体流程

  • 客户端把flink应用提交给Yarn的ResourceManager,然后RM再向NodeManager申请容器
  • flink会部署JobManager和TaskManager的实例
  • Flink会根据运行在JobManager上的作业所需要的Slot数量动态分配TaskManager资源

3.3.2 安装部署

  1. 解压
[hadoop1@hadoop2 software]$ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/
  1. 分发
[hadoop1@hadoop2 module]$ xsync flink/
  1. 启动
[hadoop1@hadoop2 bin]$ ./start-cluster.sh 
  1. 网页看下

http://hadoop2:8081/

在这里插入图片描述

  1. 配置环境变量
  • hadoop
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath
  • 激活
source /etc/profile.d/my_env.sh
  • 分发
sudo /home/hadoop1/bin/xsync /etc/profile.d/my_env.sh 
  • 群起hadoop
[hadoop1@hadoop2 bin]$ hdp.sh start
 =================== 启动 hadoop集群 ===================
 --------------- 启动 hdfs ---------------
Starting namenodes on [hadoop2]
Starting datanodes
Starting secondary namenodes [hadoop4]
 --------------- 启动 yarn ---------------
Starting resourcemanager
Starting nodemanagers
hadoop4: nodemanager is running as process 2335.  Stop it first.
 --------------- 启动 historyserver ---------------

  • 检查情况
[hadoop1@hadoop2 bin]$ xcall jps
--------- hadoop2 ----------
3092 JobHistoryServer
2901 NodeManager
3174 Jps
2366 NameNode
2527 DataNode
--------- hadoop3 ----------
2032 DataNode
2258 ResourceManager
2888 Jps
2478 NodeManager
--------- hadoop4 ----------
2149 Jps
2070 SecondaryNameNode
1945 DataNode
2854 NodeManager

3.3.3 会话模式部署

  1. 去bin下启动会话
[hadoop1@hadoop2 flink]$ ls bin/
bash-java-utils.jar  jobmanager.sh              pyflink-shell.sh           stop-zookeeper-quorum.sh
config.sh            kubernetes-entry.sh        sql-client.sh              taskmanager.sh
find-flink-home.sh   kubernetes-session.sh      standalone-job.sh          yarn-session.sh
flink                mesos-appmaster-job.sh     start-cluster.sh           zookeeper.sh
flink-console.sh     mesos-appmaster.sh         start-scala-shell.sh
flink-daemon.sh      mesos-taskmanager.sh       start-zookeeper-quorum.sh
historyserver.sh     pyflink-gateway-server.sh  stop-cluster.sh
[hadoop1@hadoop2 flink]$ ./bin/yarn-session.sh -nm test -d
  1. 网页

在这里插入图片描述

http://hadoop2:46082/ UI对应随着启动而变

在这里插入图片描述

  1. 提交参数

在这里插入图片描述

-n参数和-s参数表示TaskManager和slot数量,原来可以指定,到了flink1.11.0版本后,进行动态分配,避免资源设置过大造成的浪费

  1. 提交作业
  • 将jar包打包到flink
  • 执行脚本
 ./bin/flink run -c com.atguigu.wc.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

会出现启动running的网页

在这里插入图片描述

  • 在hadoop2的7777端口输入一些数据

在这里插入图片描述

会出现结果

在这里插入图片描述

  • 继续提交作业并且并行度设为2
[hadoop1@hadoop3 flink]$  ./bin/flink run -c com.atguigu.wc.StreamWordCount -p 2 ./FlinkTutorial-1.0-SNAPSHOT.jar

在这里插入图片描述
在这里插入图片描述

3.3.4 单作业模式部署

  1. 提交

yarn-per-job表示作业模式

in/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCount 
FlinkTutorial-1.0-SNAPSHOT.jar

3.3.5 应用模式

  1. 提交

run-application表示应用模式

$ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCount 
FlinkTutorial-1.0-SNAPSHOT.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/7825.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]java毕业设计基于Web留学管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Linux下C++开发笔记--编译静态链接库和动态链接库

目录 1--前言 2--生成静态链接库 3--生成动态链接库 1--前言 承接上一篇Linux下C开发笔记&#xff08;g命令的使用笔记&#xff09;&#xff0c;依据教程记录学习笔记。 2--生成静态链接库 ①回顾项目结构&#xff1a; ​ ②汇编&#xff0c;生成swap.o文件 cd srcg sw…

基于simulink的牛鞭效应模型建模与仿真

欢迎订阅《FPGA学习入门100例教程》、《MATLAB学习入门100例教程》 目录 一、理论基础 二、核心程序 三、测试结果 一、理论基础 牛鞭效应&#xff0c;是经济学中的一个术语&#xff0c;它也被称为需求放大效应。牛鞭效应指的是当信息流从最终客户端传输到原始供应商时&…

9.行为建模(Behavioral modeling)

9.1行为模型概述 Verilog行为模型包含控制模拟和操纵先前描述的数据类型变量的过程语句。这些语句包含在程序中。每个过程都有一个与其关联的活动流。活动开始于initial和always语句。每个initial语句和每个always语句都会启动一个单独的活动流。所有活动流都是并发的&…

【机器学习】线性分类【上】广义线性模型

主要参考了B站UP主“shuhuai008”&#xff0c;包含自己的理解。 有任何的书写错误、排版错误、概念错误等&#xff0c;希望大家包含指正。 由于字数限制&#xff0c;分成两篇博客。 【机器学习】线性分类【上】广义线性模型 【机器学习】线性分类【下】经典线性分类算法 1. 线…

C语言实现线索化二叉树(先序、中序、后序)

》》如何用C语言构建一颗二叉树? 第一种方法: ThreadTree A = (ThreadTree)malloc(sizeof(ThreadNode));A->data = { A };A->ltag = 0;A->rtag = 0;A->lchild = NULL;A->rchild = NULL;ThreadTree B = (ThreadTree)malloc(sizeof(ThreadNode));B->data =…

【python自动化】使用关键字驱动实现appium自动化

在写app自动化用例时&#xff0c;尝试用了关键字驱动的框架 记录一下自己对关键字驱动的理解&#xff1a; 1 关键字驱动指将用例步骤的操作封装为关键字&#xff0c;比如定位元素、点击元素、获取元素属性值、断言&#xff0c;这些都是操作关键字 2 在excel中按照用例执行过程&…

Java8方法引用和Lambda表达式实例源码+笔记分享

前言 Java8的lambda表达式&#xff0c;通过lambda表达式可以替代我们之前写的匿名内部类来实现接口。lambda表达式本质是一个匿名函数。 1、lambda表达式本质是一个匿名函数。 1 package com.demo.main;2 3 public class LambdaMain {4 5 public static void main(String[…

环辛炔衍生物DBCO-NH2,amine,Acid,NHS,Maleimide无铜点击反应

DBCO对叠氮化物具有非常高的反应选择性&#xff0c;可用于修饰生物分子&#xff0c;包括肽、蛋白质、酶、活细胞、整个生物体等。在生理温度和pH值范围内&#xff0c;DBCO基团不与胺或羟基反应&#xff0c;DBCO也与叠氮化物基团发生反应DBCO也称为ADIBO&#xff08;氮杂二苯并环…

2022.11.15-二分图专练

目录 50 years, 50 colors(HDU-1498) Uncle Toms Inherited Land*(HDU-1507) Matrix(HDU-2119) Arbiter(HDU-3118) [ZJOI2007]矩阵游戏(黑暗爆炸1059) Jimmy’s Assignment(HDU-1845) 50 years, 50 colors(HDU-1498) 原题链接:传送门 题意:一个n*n的矩阵中&#xff0c;…

第四章. Pandas进阶—数据格式化

第四章. Pandas进阶 4.1 数据格式化 1.设置小数位数(round函数) DataFrame.round(decimals0&#xff0c;*args&#xff0c;**kwargs)参数说明: decimals:用于设置保留的小数位数 args&#xff0c;kwargs:附加关键字的参数 返回值&#xff1a;返回DataFrame对象 1).示例&#…

HTML常用标签的使用

HTML常用标签的使用 文章目录HTML常用标签的使用1.排版标签1.1 标题标签&#xff08;h&#xff09;1.2 段落标签&#xff08;p&#xff09;1.3 换行标签&#xff08;br&#xff09;1.4 水平线标签&#xff08;hr&#xff09;2.文本格式化标签&#xff08;strong、ins、em、del&…

Vue(七)——Vue中的Ajax

目录 Vue脚手架配置代理 插槽 默认插槽 具名插槽 作用域插槽 Vue脚手架配置代理 本案例需要下载axios库&#xff1a;npm install axios 1.配置类方式(实现WebMvcConfigurer) 2.使用CrossOrigin注解 3.使用nginx反向代理解决跨域 4.Vue中配置代理服务器 代理服务器怎…

懒人的法宝——办公自动化!

没错&#xff01;办公自动化他来了&#xff01;果然&#xff0c;代码都是懒人发明出来的。接下来让我们一起来看看这个批改作业的自动化脚本吧&#xff01;学会了这种思想可以帮助我们高效解决许多重复性的工作&#xff0c;比如说批量修改文件的名称、类型、位置等等&#xff0…

【附源码】计算机毕业设计JAVA计算机系教师教研科研管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis Maven Vue 等等组成&#xff0c;B/…

计算机网络-网络层(BGP协议,IP组播,IGMP协议与组播路由选择协议)

文章目录1. BGP协议BGP协议报文格式2. RIP&#xff0c;OSPF&#xff0c;BGP协议对比3. IP组播4. IGMP协议与组播路由选择协议1. BGP协议 与其他自治系统的邻站BGP发言人&#xff08;BGP边界路由器&#xff09;交换信息 BGP边界路由器之间交换网络可达性的信息&#xff0c;即要…

C++Qt开发——SMTP协议

1. SMTP协议简介 SMTP协议&#xff1a;全称为 Simple Mail Transfer Protocol&#xff0c;简单邮件传输协议。它定义了邮件客户端软件和SMTP邮件服务器之间&#xff0c;以及两台SMTP邮件服务器之间的通信规则。 SMTP是一个相对简单的基于文本的协议。在其之上指定了一条消息的…

网络安全之命令执行漏洞复现

0x01 漏洞介绍 漏洞等级&#xff1a;严重 Webmin是功能最强大的基于Web的Unix系统管理工具。管理员通过浏览器访问Webmin的各种管理功能并完成相应的管理动作。在版本1.997之前的Webmin中存在一个任意命令注入漏洞&#xff0c;触发该漏洞需登录Webmin。 0x02 漏洞影响范围 …

GitHub最新发布 阿里十年架构师手写版spring全家桶笔记全新开源

嗨咯&#xff0c;大家好&#xff01; 没错&#xff0c;又是我&#xff0c;还跟前面一样&#xff0c;有好东西我才会出现。那是什么好东西呢&#xff1f;今天啊&#xff0c;给他分享阿里在Github最新发布的spring全家桶笔记第九版&#xff0c;这份笔记一共分三份&#xff1a;sp…

自学前端开发 - VUE 框架 (二):渲染、事件处理、表单输入绑定、生命周期、侦听器、组件基础

[TOC](自学前端开发 - VUE 框架 (二) 事件处理、表单输入绑定、生命周期、侦听器、组件基础) 事件处理 可以使用 v-on 指令 (简写为 ) 来监听 DOM 事件&#xff0c;并在事件触发时执行对应的 JavaScript。用法&#xff1a;v-on:click"methodName" 或 click"ha…