Flink-输出算子(Sink)使用

news2025/7/30 5:26:42

5.5 输出算子

5.5.1 概述

  1. print也是一种输出类PrintSinkFunction
    image.png

创建了一个PrintSinkFunction操作,然后调用addSink方法的作为传入参数
image.png

PrintSinkFunction这个类继承自RichSinkFunction富函数类

  1. RichSourceFunction类

image.png

  • 继承了AbstractRichFunction富函数类

因此就可以调用富函数类的声明周期方法,例如open,close,以及获取运行时上下文,运行环境,定义状态等等

  • RichSourceFunction类又实现了SinkFunction这个接口,所以本质上也是SinkFunction

image.png

image.png

  • SinkFunction接口的抽象方法有invoke,传入是value,以及当前的上下文
  1. 如果需要自定义输出算子
    image.png

可以调用DataStream的addSink方法

image.png
然后传入自己实现的SinkFunction

  1. flink提供的第三方系统连接器

image.png

5.5.2 输出到文件

  1. StreamFileSink流失文件输出类
  • 来源

继承RichSinkFunction类,并实现CheckpointedFunction,CheckpointListener(检查点)

image.png

  • 底层

底层将数据写入bucket(桶),桶里面分大小存储分区文件,实现了分布式存储

  • 创建实例

使用Builder构建器构建

image.png

image.png)

RowFormatBuilder是行编码

image.png

BulkFormatBuilder是列存储编码格式

  1. 代码

image.png

public class SinkToFileTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));


        //2.为了得到并传入SinkFunction,需要构建StreamingFileSink的一个对象
        //调用forRowFormat方法或者forBulkformat方法得到一个DefaultRowFormatBuilder
            //  其中forBulkformat方法前面还有类型参数,以及传参要求一个目录名称,一个编码器
                //写入文件需要序列化,需要定义序列化方法并进行编码转换,当成Stream写入文件
        //然后再使用builder创建实例
        StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(//指定滚动策略,根据事件或者文件大小新产生文件归档保存
                        DefaultRollingPolicy.builder()//使用builder构建实例
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .withRolloverInterval(TimeUnit.MINUTES.toMinutes(15))//事件间隔毫秒数
                                .withInactivityInterval(TimeUnit.MINUTES.toMinutes(15))//当前不活跃的间隔事件,隔多长事件没有数据到来
                                .build()
                )
                .build();
        //1.写入文件调用addSink()方法,并传入SinkFunction
        stream
                .map(data -> data.toString())//把Event类型转换成String
                .addSink(streamingFileSink);

        env.execute();

    }
}
  • 结果

image.png

5.5.3 输出到kafka

image.png

  1. 代码
public class SinkToKafka {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.从kafka中读取数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","hadoop2:9092");
        properties.setProperty("group.id", "consumer-group");

        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));

        //2.用flink进行简单的etl处理转换
        SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {

                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();

            }
        });

        //3.结果数据写入kafka
            //FlinkKafkaProducer传参borckList,topicid,序列化
        result.addSink(new FlinkKafkaProducer<String>(
                "hadoop2:9092","events",new SimpleStringSchema()));

        env.execute();
    }
}

  1. kafka输出结果
    image.png

5.5.4 输出到redis

  1. 引入依赖
<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
  1. 代码
    image.png
    继承自RichSinkFunction
    image.png

去调构造方法,换入redis集群的配置FlinkJedisConfigBase以及RedisMapper写入命令

image.png

FlinkJedisPoolConfig用这个没毛病,直接继承的FlinkJedisConfigBase

image.png

  1. 代码
public class SinkToRedis {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.输入ClickSource是自定义输入
        DataStreamSource<Event> stream = env.addSource(new ClickSource());


        //2.创建一个jedis连接配置
        //FlinkJedisPoolConfig直接继承的FlinkJedisConfigBase
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop2")
                .build();



        //3.写入redis
        stream.addSink(new RedisSink<>(config,new MyRedisMapper()));

        env.execute();

    }

    //3.自定义类实现 redisMapper接口
    public static class MyRedisMapper implements RedisMapper<Event>{

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET,"clicks");//写入哈希表
        }

        @Override
        public String getKeyFromData(Event data) {
            return data.user;
        }

        @Override
        public String getValueFromData(Event data) {
            return data.url;
        }
    }
}
  1. 结果

image.png

5.5.5 输出到ElasticSearch

  1. 引入依赖
<dependency>
 <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifact
Id>
<version>${flink.version}</version>
</dependency>
  1. 代码
    image.png

image.png

传入参数是List和ElasticsearchSinkFunction


image.png

image.png

public class SinToES {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.输入
        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));



        
        //2.定义hosts的列表
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop",9200));

        //3.定义ElasticsearchSinkFunction<T>,是个接口,重写process方法
        //向es发送请求,并插入数据
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            //输入,运行上下文,发送任务请求
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> map = new HashMap<>();
                map.put(element.user, element.url);

                //构建一个indexrequest
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("types")
                        .source(map);

                indexer.add(request);
            }
        };

        //4.写入es
        //传入参数是List<HttpHost>和ElasticsearchSinkFunction<T>
        stream.addSink(new ElasticsearchSink.Builder<>(httpHosts,elasticsearchSinkFunction).build());

        env.execute();

    }
}
  1. 结果

image.png

image.png

5.5.6 输入到Mysql

  1. 引入依赖
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.47</version>
</dependency>
  1. 代码

image.png

三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置
image.png

image.png

单一抽象方法,lambda使用

public class SinkToMysql {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.输入
        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        //三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置
        stream.addSink(JdbcSink.sink(
                "INSERT INTO clicks (user,url) VALUES(?,?)",
                ((statement,event)->{
                    statement.setString(1,event.user);
                    statement.setString(2,event.url);
                }),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test2")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        env.execute();

    }
}
  1. mysql前期准备
  • 创建mysql的test2
  • 创建clicks表
mysql> create table clicks(
    -> user varchar(20) not null,
    -> url varchar(100) not null);
Query OK, 0 rows affected (0.02 sec)
  1. 结果
    image.png

5.5.7 自定义Sink输出

  1. 分析

调用DataStream的addSink()方法,并传入自定义好的SinkFunction(采用富函数类),重写关键方法invoke(),并且重写富函数类的生命周期相关方法open和close

  1. 导入依赖
<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>${hbase.version}</version>
</dependency>
  1. 代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/16241.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot2整合开发流程示例

1.整体开发逻辑 初始配置示例 开发文件示例 2.开发过程问题 ①默认配置路径 thymeleaf默认解析静态资源路径为 /templates/** ②mybatisplus核心工作原理 mapper继承的BaseMapper中声明了CRUD操作service接口继承的IService接口中抽象方法简单封装了CRUD操作 传入了类&a…

Win10系统如何安装配置maven

【原文链接】Win10系统如何安装配置maven &#xff08;1&#xff09;若未装jdk&#xff0c;可先参考 Win10系统下载安装配置JDK1.8 安装jdk8 &#xff08;2&#xff09;从 maven下载地址 &#xff0c;下载maven&#xff0c;如下&#xff0c;可以下载最新版本&#xff0c;也可…

【JVM】JVM详解

目录一.JVM概述1.jvm简介2.jvm作用3.jvm的内存模型二.类加载器1.类加载器的作用2.加载器的类型3.双亲委派机制的运行过程(面试题)三.JVM内存模块1.方法区2.堆3.栈(虚拟机栈)4.栈(本地方法栈)5.OutOfMemoryError内存溢出和StackOverFlowError栈溢出及解决方法(面试题)(1).OutOfM…

利用ogg微服务版将oracle同步到kafka

ogg微服务版可以再界面上配置抽取、复制进程&#xff0c;不必进入到shell中进行配置&#xff0c;并且图形化界面可以看到更多信息。 系统架构 源端安装ogg for oracle 19C , 目标端安装ogg for bigdata 21C kafka 2.2 数据库&#xff1a;19C 所有软件安装在同台服务器上&#…

操作系统4小时速成:文件管理,文件结构,属性,基本操作,逻辑有无结构,目录结构,文件系统

操作系统4小时速成&#xff1a;文件管理&#xff0c;文件结构&#xff0c;属性&#xff0c;基本操作&#xff0c;逻辑有无结构&#xff0c;目录结构&#xff0c;文件系统 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;可能…

2397. 被列覆盖的最多行数-深度优先枚举+二进制检索

2397. 被列覆盖的最多行数-深度优先枚举二进制检索 给你一个下标从 0 开始的 m x n 二进制矩阵 mat 和一个整数 cols &#xff0c;表示你需要选出的列数。 如果一行中&#xff0c;所有的 1 都被你选中的列所覆盖&#xff0c;那么我们称这一行 被覆盖 了。 请你返回在选择 co…

Spring Security内部工作原理

定义 Spring 安全性是 Spring提供的一个框架&#xff0c;有助于自定义访问和身份验证过程。它在保护应用程序方面起着非常关键的作用。 Spring 安全性&#xff0c;主要侧重于身份验证和授权&#xff0c;为 Java 应用程序提供所有好处。它非常有用&#xff0c;并提供了一种在实…

抑制细胞代谢紊乱的抑制剂

作者团队发现&#xff0c;缺乏 CD4 T 细胞能保护小鼠免受应激诱导的焦虑样行为&#xff0c;物理应激诱导的白三烯 B4 (LTB4) 触发 CD4 T 细胞中的严重线粒体裂变&#xff0c;进而导致各种行为异常&#xff0c;包括焦虑&#xff0c;抑郁和社交障碍。代谢组和单细胞转录组学显示…

Android App实战项目之实现手写签名APP功能(附源码,简单易懂 可直接实用)

运行有问题或需要源码请点赞关注收藏后评论区留言~~~ 一、跟踪滑动轨迹实现手写签名 手写签名的原理是把手机屏幕当作画板&#xff0c;把用户手指当作画笔&#xff0c;手指在屏幕上划来划去&#xff0c;屏幕就会显示手指的移动轨迹&#xff0c;就像画笔在画板上写字一样&#…

安装free IPA与CDH6.3.2结合

主机名之类的应该在cdh安装的时候就配好了 不再赘述 安装freeipa yum -y install nscd 修改 enable-cache netgroup no enable-cache group no enable-cache passwd no 云主机默认不开启IPv6&#xff0c;根据提…

小 A 的卡牌游戏(Gym - 103186B)

题 小A最近沉迷于-款名为Hearthverse的卡牌游戏。在这款游戏中&#xff0c;卡被分为了三个种类(随从、法术和魔法阵)&#xff0c;在组卡时&#xff0c;这款游戏严格规定了卡组中每种卡牌的数量&#xff0c;具体来说&#xff0c;-副n张卡的卡组需要包含恰好a张随从卡&#xff0…

Tdengine技术实践

1. 什么是时序数据库&#xff1f; 时序数据库全称为时间序列数据库。 即时间序列数据&#xff0c;按时间维度顺序记录且索引的数据。 时间序列数据主要由 电力行业、化工行业、气象行业、地理信息 等各类型实时监测、检查与分析设备所采集、产生的数据&#xff0c;这些工业数…

配置Maven环境

Maven官网 Maven所有发行版本 一、windows配置maven环境 想要使用maven就必须要有JDK JDK安装 百度网盘下载&#xff08;二进制文件&#xff0c;一直点下去&#xff09; 链接&#xff1a;https://pan.baidu.com/s/1y1AutzJeQGdNHa2ml_bk8w 提取码&#xff1a;scyc验证JDK是…

【Java八股文总结】之Java基础

写在前面&#xff1a; 整份Java八股文的整理贯穿我的整个秋招&#xff0c;希望可以给大家带来帮助&#xff0c;如果对你有一定的作用&#xff0c;欢迎大家转发点赞&#xff0c;谢谢&#xff01;在复习Java八股文的过程中&#xff0c;有两位老哥的博文对我帮助比较大&#xff0c…

力扣(LeetCode)10. 正则表达式匹配(C++)

动态规划 基于闫式dp分析法。 综上 , f[i][j]{f[i−1][j−1]&&(s[i]p[j]∣∣′.′p[j])if p[j]≠∗f[i][j−2]∣∣(f[i−1][j]&&(s[i]p[j−1]∣∣′.′p[j−1])if p[j]∗f[i][j] \begin {cases} f[i-1][j-1] ~~\&\& ~~(s[i]p[j] ~||~ .p[j])&…

鲁棒局部均值分解 (RLMD)附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

安卓手机丢了,危险了!意外的7万美元的谷歌Pixel绕过锁屏

我发现了一个影响似乎所有谷歌Pixel手机的漏洞&#xff0c;如果你把任何锁定的Pixel设备给我&#xff0c;我可以把它解锁还给你。这个漏洞刚刚在2022年11月5日的安全更新中得到修复。 该问题允许有物理权限的攻击者绕过锁屏保护&#xff08;指纹、PIN等&#xff09;&#xff0c…

英国博士后招聘|林肯大学—植物-土壤相互作用

英国林肯大学博士后职位—植物 -土壤相互作用 林肯大学&#xff08;University of Lincoln&#xff09;&#xff0c;简称“UoL”&#xff0c;是英国一所公立综合性研究型大学&#xff0c;创办于1861年&#xff0c;后与多所大学合并更名为林肯大学。 《独立报》曾评价林肯大学为…

Mysql之性能优化分析

一、避免死锁 1.1、导致mysql死锁的要素 1、两个或者两个以上事务。 2、锁资源只能被同一个事务持有或者多个事务竞争的锁是不兼容的&#xff0c;比如排他锁和共享锁、排他锁和排他锁。 3、每个事务都已经持有锁并且申请新的锁。 4、事务之间因为持有锁和申请锁导致彼此循环等…

Cesium 简介

Cesium 简介 一、Cesium 是什么&#xff1f; Cesium 是一个开源 JavaScript 库&#xff0c;用于 3D , 2D , 2.5D 地图可视化。 Cesium 由 AGI 公司计算机图形开发小组与 2011 年研发的。 Cesium 一词来源于化学元素铯&#xff0c;铯是制造原子钟的关键元素&#xff0c;研发小组…