flink的安装与使用(ubuntu)

news2025/7/13 23:46:48

组件版本

虚拟机:ubuntu-20.04.6-live-server-amd64.iso

flink:flink-1.18.0-bin-scala_2.12.tgz

jdk:jdk-8u291-linux-x64.tar

flink 下载

1、官网:https://flink.apache.org/downloads/

2、清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

flink 安装

1、上传文件至服务器指定路径

/usr/local/myapp/flink

2、解压文件

tar -zxvf flink-1.18.0-bin-scala_2.12.tgz -C /usr/local/myapp/flink

jdk 安装

1、ubuntu 中自带了 jdk,先将其卸载

sudo apt-get remove *openjdk*
sudo apt-get autoremove

2、上传文件至服务器指定路径

/usr/local/myapp/jdk

3、解压文件

tar -zxvf jdk-8u291-linux-x64.tar -C /usr/local/myapp/jdk

4、配置环境变量

vim /etc/profile

在文末增加配置(路径根据自身情况进行调整)

export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_291
export JRE_HOME=/usr/local/myapp/jdk/jdk1.8.0_291/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib

5、测试 jdk

root@vm1:/usr/local/myapp/jdk# java -version
java version "1.8.0_291"
Java(TM) SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
root@vm1:/usr/local/myapp/jdk# javac -version
javac 1.8.0_291

测试 flink

1、进入到 flink 的安装路径下

cd /usr/local/myapp/flink/flink-1.18.0/

2、修改配置文件

vim conf/flink-conf.yaml

内容

jobmanager.bind-host: 0.0.0.0

3、关闭/禁用防火墙

systemctl stop ufw.service
systemctl disable ufw.service

4、启动 flink

./bin/start-cluster.sh

5、浏览器访问:http://ip:8081/

能看到内容说明正常

设置 flink 的 Standalone 模式集群并上传任务执行

1、机器规划

类型主机名IP
JobManagervm1192.168.141.120
TaskManagervm2192.168.141.121
TaskManagervm3192.168.141.122

2、设置每个服务器的机器名

vim /etc/hostname

3、设置每个服务器的 hosts 文件

vim /etc/hosts

增加三台服务器的机器名对照

192.168.141.120 vm1
192.168.141.121 vm2
192.168.141.122 vm3

使其立即生效(建议到这一步后,都重新启动下)

source /etc/hosts

4、设置服务器间的免密登录

4.1、自身免密

vm1 执行(vm2/vm3 同理)

ssh-keygen -t rsa

之后的内容全部回车即可

生成后,可在 /root/.ssh/ 中看到 id_rsa.pub 文件

通过命令设置到认证文件中

cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys 

重启服务器,通过命令测试是否可以免密登录自身

ssh vm1

通过 exit 命令可以退出当前的 ssh 登录

4.2、设置相互免密(以 vm1 为演示,其余服务器同理)

在 vm1 服务器中,将生成的自身密钥传输到其余两台服务器上

scp /root/.ssh/id_rsa.pub root@vm2:/root
scp /root/.ssh/id_rsa.pub root@vm3:/root

在 vm2/vm3 服务器中,将传输过来的密钥,通过命令设置到认证文件中

cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys 

vm1 设置完成,通过命令来测试能不能直接登录到 vm2/vm3 中

ssh vm2
ssh vm3

vm2/vm3 同理,都需要执行这些步骤:

A、生成自身密钥,添加到自身的认证文件中

B、将自身密钥传输到其余的服务器中,并在该服务器中通过命令设置自身密钥到其余服务器的认证文件中

注意:vm2 和 vm3 执行时,一个服务器完全执行结束/测试后,再进行下一个,不然会有密钥文件存在被覆盖的风险

5、设置主机时间同步

安装工具

apt-get install -y ntpdate

执行同步

ntpdate -u ntp.sjtu.edu.cn

6、配置 flink

以下以 vm1 为例,其他服务器的配置可将配置好的配置文件同步过去

6.1、masters 文件

vim masters

内容

vm1:8081

6.2、workers 文件

vim workers

内容

vm2
vm3

6.3、flink-conf.yaml 文件

vim flink-conf.yaml

内容(篇幅问题,去掉了注释)

env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED

jobmanager.rpc.address: vm1

jobmanager.rpc.port: 6123

jobmanager.bind-host: 0.0.0.0

jobmanager.memory.process.size: 1600m

taskmanager.bind-host: 0.0.0.0

taskmanager.memory.process.size: 1728m

taskmanager.numberOfTaskSlots: 3

parallelism.default: 1

jobmanager.execution.failover-strategy: region

rest.port: 8081

rest.address: vm1

rest.bind-address: vm1

blob.server.port: 45579

7、启动集群

只需在 vm1 上启动集群模式即可

root@vm1:/usr/local/myapp/flink/flink-1.18.0# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host vm1.
Starting taskexecutor daemon on host vm2.
Starting taskexecutor daemon on host vm3.

可以看到 vm2/vm3 的也会被启动,不需要手动去 vm2/vm3 再启动一次了

可以通过 java 的 jps 命令查看程序是否启动成功了

vm1 上

在这里插入图片描述

vm2 上

在这里插入图片描述

vm3 上

在这里插入图片描述

从图上可以分析出是以 Standalone 的集群模式启动了,其中 vm1 是 JobManager,vm2/vm3 是 TaskManager

8、页面查看状态

浏览器输入地址:http://192.168.141.120:8081/

可看到主页面

在这里插入图片描述

9、自定义一个任务

idea 创建一个 maven 项目

9.1、依赖及插件

<properties>
    <flink.version>1.18.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

9.2、程序内容

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// 无界流
public class UnboundStreamJob {

    public static void main(String[] args) throws Exception {
        //1 获取flink运行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.加载数据源为dataStream  ,绑定客户机的9999端口,将这个网络端口发送的数据加载为dataStream
        DataStreamSource<String> dataStream = environment.socketTextStream("192.168.141.122", 9999, "\n");

        //3.执行多个转换算子 ,SingleOutputStreamOperator是DataStreamSource子类
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            //value:表示一个待处理的数据,在这里就是一行字符串
            //out:  用于输出结果的工具对象
            public void flatMap(String value, Collector<String> out) throws Exception {
                //拆分value,通过out输出结果
                String[] words = value.split("//s+");   //去除一个或多个空格
                for (String word : words) {
                    out.collect(word);
                }
            }
        })  //执行一行字符串拆分为多个单词
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                }) //将多个单词转换为(单词,1) 这种tuple2对象
                .keyBy(0)  //根据单词为key分组,0表示tuple2中的第一个属性,也就是单词
                .sum(1);//统计每组单词的个数,  1表示tuple2中第2个属性,也就是次数

        //4.通过sink算子输出结果
        result.print();
        //5.发布执行
        environment.execute("flinkWordCount"); //为任务起别名
    }

}

9.3、程序说明

与 vm3 所在的 IP 为 192.168.141.122 在 9999 端口上进行 socket 通信,程序接收到消息后,进行计算并输出到控制台中

10、在 vm3 上开启一个 socket 通信(这一步一定要在上传任务之前进行)

netcat -lk 9999

11、提交任务(WebUI 方式)

11.1、打包刚才的程序,将打包好的 jar 包复制到某个好找的路径

11.2、打开网页中的 Submit New Job 选项,并点击 Add New

在这里插入图片描述

11.3、选择刚才打包的 jar 包进行上传,之后点击该 jar 包,填写启动类的路径,之后点击 Submit 提交按钮

在这里插入图片描述

11.4、正常情况下,任务就发布完成了,可以在 Task Managers 查看哪个节点的 Free Slots 相比 All Slots 减少了一个,那么这个节点的服务器就是执行该任务的服务器

在这里插入图片描述

12、提交任务(命令方式)

12.1、上传 jar 包到服务器中(任意一个服务器都行)

root@vm1:/usr/local/myapp/flink/task# ls
demo01-1.0-SNAPSHOT.jar

12.2、添加到任务中

../flink-1.18.0/bin/flink run -d -c xx.xx.xx.UnboundStreamJob demo01-1.0-SNAPSHOT.jar

说明:需要指定启动类

12.3、看到下面的信息,说明提交任务完成

Job has been submitted with JobID a893314f5efbb93bf3e6edefa578fd35

13、测试

13.1、点击该服务器,其中的 Stdout 就是控制台输出的地方

我们在 vm3 中开启的 socket 通信中,发送一条消息

在这里插入图片描述

13.2、回到页面中,刷新下控制台输出,会发现多了一个输出信息

在这里插入图片描述

13.3、至此,测试就完成了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1158738.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3+vite实现一个后台管理框架,毒蘑菇后台管理。

写后台管理的项目写了很多个了&#xff0c;虽说用的别人的模板&#xff0c;自己专注于自己的业务&#xff0c;保证自己的业务不出错就行了&#xff0c;但是自定义配置又不好去配置&#xff0c;大家用的模板都差不多&#xff0c;用模板自带的业务功能呢后台又得是模板自带的&…

Linu之VM及WindowsServer安装

# 1.操作系统 ## 1.1.什么是操作系统 操作系统&#xff08;Operating System&#xff0c;简称OS&#xff09;是一种系统软件&#xff0c;它是计算机硬件和应用软件之间的桥梁。它管理计算机的硬件和软件资源&#xff0c;为应用程序提供接口和服务&#xff0c;并协调应用程序的…

算法通过村第十八关-回溯|青铜笔记|什么叫回溯(后篇)

文章目录 前言回溯热身问题输出二叉树的所有路径&#xff1a;路径总和问题&#xff1a; 总结 前言 提示&#xff1a;今夜思量千条路&#xff0c;明朝依旧卖豆腐。 --谚语 回溯是非常重要的算法思想之一&#xff0c;主要解决一些暴力枚举也搞不定的问题&#xff08;这里埋个坑&a…

Ubuntu 20.04设置虚拟内存 (交换内存swap)解决内存不足

数据库服务器程序在运行起来之后&#xff0c;系统内存不足。 在系统监控中发现&#xff0c;当数据库服务程序启动后&#xff0c;占用了大量内存空间&#xff0c;导致系统的剩余的内存往往只有几十MB。 在ubuntu系统中&#xff0c;swap空间就是虚拟内存&#xff0c;所以考虑在磁…

2017年第三届 美亚杯电子取证 个人赛题解

记录做题个人赛题目 取证大师直接取证 1 Gary的笔记本电脑已成功取证并制作成镜像 (Forensic Image)&#xff0c;下列哪个是其MD5哈希值。 A.0CFB3A0BB016165F1BDEB87EE9F710C9 B.5F1BDEB87EE9F710C90CFB3A0BB01616 C.A0BB016160CFB3A0BB0161661670CFB3 D.16160CFB3A0BB0161…

〔001〕虚幻 UE5 发送 get、post 请求、读取 json 文件

✨ 目录 🎈 安装 varest 扩展🎈 开启 varest 扩展🎈 发送 get 请求🎈 发送 post 请求🎈 读取 json 文件🎈 安装 varest 扩展 打开 虚幻商城,搜索 varest 关键字进行检索, varest 是一个 api 调用插件,支持 http/https 请求,也支持 json 文件的读取,最关键是该…

【毕业设计】基于springboot+vue+ssm的家乡特色推荐系统【源码+LW+PPT】

摘 要 在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括家乡特色推荐的网络应用&#xff0c;在外国家乡特色推荐系统已经是很普遍的方式&#xff0c;不过国内的管理网站可能还处于起步阶段。家乡特色推荐系统采用java技术&am…

新工业革命?基于机器视觉技术分拣机器人的未来与发展

原创 | 文 BFT机器人 01 分拣机器人的应用 基于机器视觉技术的分拣机器人可以将工人从繁重的劳动中解放出来&#xff0c;大大提高了分拣的效率&#xff0c;因此被广泛地应用于食品、物流以及煤矿等多个行业。 1.1 分拣机器人在水果分拣中的应用 随着农业科技的发展和人民生活…

基于PyTorch的中文情绪分析器设计与开发

收藏和点赞&#xff0c;您的关注是我创作的动力 文章目录 概要 一、相关基础理论2.1 主流深度学习框架2.2 神经网络2.2.1 神经网络基础 二、中文情感分类模型构建3.1 开发环境3.2 数据部分3.3 文本特征提取3.3.1、过滤标点符号3.3.2 中文分词、单词过滤 三 运行结果与分析五 结…

解决找不到msvcr120.dll无法继续执行问题的5个方法,快速解决dll问题

在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是找不到msvcr120.dll的错误。这个错误通常发生在运行某些程序或游戏时&#xff0c;它会导致程序无法正常启动或运行。那么&#xff0c;如何解决找不到msvcr120.dll的问题呢&#xff1f;下面我…

现在软文发布平台都有哪些?如何在正规媒体发稿?

近年来,随着广告行业竞争愈加激烈,越来越多的企业开始注重软文宣传。软文推广平台是企业在网络上发布软文、传播信息和推广产品的重要工具。 媒介易软文平台介绍更好的品牌宣传和市场推广&#xff1a;软文推广发稿有哪些平台&#xff0c; 软文发稿好方法&#xff1f;软文不仅能…

SpringMVC Day 10 : 拦截器

前言 拦截器是Spring MVC框架提供的一种强大的机制&#xff0c;用于在请求到达控制器之前或之后进行预处理和后处理。它可以拦截并处理请求&#xff0c;对请求进行必要的修改或验证&#xff0c;以及在请求返回给客户端之前进行额外的操作。拦截器可以帮助我们实现各种需求&…

文心一言4.0对比ChatGPT4.0有什么优势?

目录 总结 文心一言4.0的优势 文心一言4.0的劣势 免费分享使用工具 后话 生成式AI的困境 “不会问”“不会用”“不敢信” 为什么要出收费版本&#xff1f; 目前使用过国内的文心一言3.5和WPS AI&#xff0c;国外的ChatGPT4.0。 文心一言和其他国内产品相比&#xff0…

Windows 开启 Kerberos 的火狐 Firefox 浏览器访问yarn、hdfs

背景&#xff1a;类型为IPA或者MIT KDC&#xff0c;windows目前只支持 firefoxMIT Kerberos客户端的形式&#xff0c;其他windows端浏览器IE、chrome、edge&#xff0c;没有办法去调用MIT Kerberos Windows客户端的GSSAPI验证方式&#xff0c;所以均无法使用 Windows 开启 Kerb…

stm32 模拟I2C

目录 简介 I2C 物理层 协议层 ①②&#xff1a;起始信号和结束信号 ③ 应答和非应答信号 ④数据有效性 ⑤数据传输 ⑥空闲状态 简介 I2C 物理层 一个 I2C 总线两条线组成&#xff0c;一个双向串行数据线SDA用来表示数据&#xff0c;一个串行时钟线SCL用于数据收发同步…

进口跨境电商商城源码(海关179接口+支持多平台搭建+提供多终端支持)

海关179接口 进口跨境电商商城源码提供了与海关179接口的集成&#xff0c;实现了便捷的报关操作。海关179接口是跨境电商进口的关键链接&#xff0c;通过该接口可以快速准确地完成商品的报关手续。进口商可以通过商城源码直接与海关进行数据交互&#xff0c;减少了繁琐的人工操…

taro微信小程序不支持热重新的一种解决思路

使用taro开发我觉得是挺好的&#xff0c;但是也有一个缺点&#xff0c;就是暂时使用不了微信的热重载功能&#xff0c;每次更新代码之后&#xff0c;都要手动重新编译才可以看到效果&#xff0c;这就很麻烦了&#xff0c;所以一种解决思路就是先开发h5的页面&#xff0c;因为h5…

【力扣】2127. (分类讨论 + 拓扑排序)参加会议的最多员工数

【力扣】2127. &#xff08;分类讨论 拓扑排序&#xff09;参加会议的最多员工数 文章目录 【力扣】2127. &#xff08;分类讨论 拓扑排序&#xff09;参加会议的最多员工数1. 题目介绍2. 思路&#xff08;**分类讨论 拓扑排序**&#xff09;3. 解题代码4. Danger参考 1. 题…

为什么时间跟踪对企业和员工很重要?

时间是每个企业主最宝贵的资产。如果员工不能正确管理自己的时间&#xff0c;就会出现延误&#xff0c;项目也会超出预算。 为了让员工获得公平的时间补偿&#xff0c;就必须记录他们的工作时间。工时管理系统可以帮助企业和员工更好地组织工作、提高效率和生产力&#xff0c;…

100量子比特启动实用化算力标准!玻色量子重磅发布相干光量子计算机

2023年5月16日&#xff0c;北京玻色量子科技有限公司&#xff08;以下简称“玻色量子”&#xff09;在北京正大中心成功召开了2023年首场新品发布会&#xff0c;重磅发布了自研100量子比特相干光量子计算机——“天工量子大脑”。 就在3个月前&#xff0c;因“天工量子大脑”在…