大数据(9f)Flink双流JOIN

news2025/7/5 17:34:21

文章目录

      • 概述
      • 开发环境
      • 使用状态列表实现 INNER JOIN(双流connect后CoProcessFunction)
      • 基于间隔的JOIN(Interval Join)
      • 基于窗口的JOIN(Window Join)

概述

Flink双流JOIN可用算子或SQL实现,FlinkSQL的JOIN在另一篇讲
算子JOIN中较常用的是intervalJoin

开发环境

WIN10+IDEA

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <lombok.version>1.18.24</lombok.version>
</properties>
<dependencies>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- 简化JavaBean书写 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>

使用状态列表实现 INNER JOIN(双流connect后CoProcessFunction)

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class Hello {
    public static void main(String[] args) throws Exception {
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建双流
        DataStreamSource<Tuple2<String, Long>> d1 = env.fromElements(
                Tuple2.of("a", 2L),
                Tuple2.of("a", 3L),
                Tuple2.of("b", 5L)
        );
        DataStreamSource<Tuple2<String, String>> d2 = env.fromElements(
                Tuple2.of("a", "A"),
                Tuple2.of("b", "B"),
                Tuple2.of("c", "C")
        );
        //双流KeyBy
        KeyedStream<Tuple2<String, Long>, String> kd1 = d1.keyBy(t -> t.f0);
        KeyedStream<Tuple2<String, String>, String> kd2 = d2.keyBy(t -> t.f0);
        //connect
        ConnectedStreams<Tuple2<String, Long>, Tuple2<String, String>> c = kd1.connect(kd2);
        //CoProcessFunction<IN1, IN2, OUT>
        c.process(new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, String>, String>() {
            ListState<Tuple2<String, Long>> l1;
            ListState<Tuple2<String, String>> l2;
            @Override
            public void open(Configuration parameters) {
                RuntimeContext r = getRuntimeContext();
                l1 = r.getListState(new ListStateDescriptor<>("L1", Types.TUPLE(Types.STRING, Types.LONG)));
                l2 = r.getListState(new ListStateDescriptor<>("L2", Types.TUPLE(Types.STRING, Types.STRING)));
            }

            @Override
            public void processElement1(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
                l1.add(value);
                for (Tuple2<String, String> value2 : l2.get()) {
                    out.collect(value + "==>" + value2);
                }
            }

            @Override
            public void processElement2(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                l2.add(value);
                for (Tuple2<String, Long> value1 : l1.get()) {
                    out.collect(value1 + "==>" + value);
                }
            }
        }).print();
        //流环境执行
        env.execute();
    }
}

基于间隔的JOIN(Interval Join)

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class Hello {
    public static void main(String[] args) throws Exception {
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建双流和时间时间水位线策略
        SingleOutputStreamOperator<U> d1 = env.fromElements(
                new U("a", 3 * 1000L),
                new U("b", 8 * 1000L),
                new U("c", 13 * 1000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<U>forMonotonousTimestamps().withTimestampAssigner(
                (SerializableTimestampAssigner<U>) (element, recordTimestamp) -> element.timestamp));
        SingleOutputStreamOperator<U> d2 = env.fromElements(
                new U("a", 4 * 1000L),
                new U("b", 6 * 1000L),
                new U("b", 7 * 1000L),
                new U("c", 10 * 1000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<U>forMonotonousTimestamps().withTimestampAssigner(
                (SerializableTimestampAssigner<U>) (element, recordTimestamp) -> element.timestamp));
        //键控流
        KeyedStream<U, String> k1 = d1.keyBy(u -> u.id);
        KeyedStream<U, String> k2 = d2.keyBy(u -> u.id);
        //基于间隔进行联合
        k1.intervalJoin(k2).between(Time.seconds(-2L), Time.seconds(1L)).process(
                new ProcessJoinFunction<U, U, String>() {
                    @Override
                    public void processElement(U left, U right, Context ctx, Collector<String> out) {
                        out.collect(left + " ==> " + right);
                    }
                }).print();
        //流环境执行
        env.execute();
    }

    @Data
    @AllArgsConstructor
    public static class U {
        String id;
        Long timestamp;
    }
}

结果
Hello.U(id=a, timestamp=3000) ==> Hello.U(id=a, timestamp=4000)
Hello.U(id=b, timestamp=8000) ==> Hello.U(id=b, timestamp=6000)
Hello.U(id=b, timestamp=8000) ==> Hello.U(id=b, timestamp=7000)

双流JOIN是双向的,下面两种写法是等价的

k1.intervalJoin(k2).between(Time.seconds(-2L), Time.seconds(1L))
k2.intervalJoin(k1).between(Time.seconds(-1L), Time.seconds(2L))

基于窗口的JOIN(Window Join)

窗口JOIN包括滚动窗口、滑动窗口、会话窗口

滚动窗口JOIN

滑动窗口JOIN

会话窗口JOIN

语法

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

下面只展示滚动窗口JOIN

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Hello {
    public static void main(String[] args) throws Exception {
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //创建双流和时间时间水位线策略
        SingleOutputStreamOperator<U> d1 = env.fromElements(
                new U("a", 2000L),
                new U("b", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<U>forMonotonousTimestamps().withTimestampAssigner(
                (SerializableTimestampAssigner<U>) (element, recordTimestamp) -> element.timestamp));
        SingleOutputStreamOperator<U> d2 = env.fromElements(
                new U("a", 3999L),
                new U("b", 3999L),
                new U("b", 5999L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<U>forMonotonousTimestamps().withTimestampAssigner(
                (SerializableTimestampAssigner<U>) (element, recordTimestamp) -> element.timestamp));
        //窗口JOIN
        d1
                .join(d2)
                .where(u -> u.id)
                .equalTo(u -> u.id)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .apply((JoinFunction<U, U, String>) (first, second) -> first + " ==> " + second)
                .print();
        //流环境执行
        env.execute();
    }

    @Data
    @AllArgsConstructor
    public static class U {
        String id;
        Long timestamp;
    }
}

结果
Hello.U(id=a, timestamp=2000) ==> Hello.U(id=a, timestamp=3999)
Hello.U(id=b, timestamp=4000) ==> Hello.U(id=b, timestamp=5999)
(4000和3999不在同一个滚动窗口,4000和5999在同一个滚动窗口)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/39725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图论基础学习笔记

图论1.简单图2.简单图的补图3.图的同构4.完全图5.偶图6.完全偶图1.简单图 简单图&#xff1a;无环无平行边的图。下图&#xff1a;左环右平行边 平凡图&#xff1a;G(1,0)G(1,0)G(1,0) 零图&#xff1a;G(p,0)G(p,0)G(p,0) 2.简单图的补图 补图&#xff1a;对于 G(V,E)G(V,…

继电器电路分析-继电器放电时间、反向冲击电压

继电器的应用&#xff0c;相信大家都知道&#xff0c;在电路中只要给它供电、断电也就可以工作了。 然而&#xff0c;它的应用细节&#xff0c;不知道大家有没注意 。下面谈谈我的观点。 01 现在流行的接法 如下图&#xff1a; 图中&#xff0c;继电器的线圈经过Q1作为开关…

《丞相好梦中杀人,我喜梦中听课》(1)密码学入门

前言 &#x1f340;作者简介&#xff1a;被吉师散养、喜欢前端、学过后端、练过CTF、玩过DOS、不喜欢java的不知名学生。 &#x1f341;个人主页&#xff1a;红中 &#x1fad2;每日emo&#xff1a;等我把脸皮磨厚 &#x1f342;专栏地址&#xff1a;网安专栏 今天周日&#xf…

金融开放度指数-世界银行三位数字编码、ISO Alpha-3 Code等多指标数据

1、数据来源&#xff1a;http://web.pdx.edu/~ito/Chinn-Ito_website.htm 2、时间跨度&#xff1a;1970-2019 3、区域范围&#xff1a;全国 4、指标说明&#xff1a; Chinn-Ito指数(KAOPEN)是衡量一个国家资本账户开放程度的指标。 该指数最初是在Chinn和Ito (Journal of D…

并发基础(四):线程池

尺有所短&#xff0c;寸有所长&#xff1b;不忘初心&#xff0c;方得始终。 请关注公众号&#xff1a;星河之码 线程池技术是一种多线程处理形式&#xff0c;将任务添加到队列中&#xff0c;通过线程池中创建出来的现成执行这些任务&#xff0c;省去了创建线程和销毁线程的消耗…

let const var区别

文章目录写在前面1.var关键字1.1 没有块级作用域的概念&#xff0c;有全局作用域、函数作用域的概念1.2 存在变量提升1.3 全局作用域用var声明的变量会挂载到window对象上1.4 同一作用域中允许重复声明1.5 不初始化值默认为undefined2.let关键字2.1 有块级作用域的概念2.2 不存…

HTB靶机:RainyDay

目录介绍主机信息探测网站探测子域名爆破(BurpSuite)目录爆破爆破参数值分析 & 破解hash登录系统反弹shell端口转发内网穿透【很坑】配置socks代理内网扫描换用windows做内网渗透子域名信息收集爆破密钥位置爆破密钥内容JWT攻击进程监控Flag1 & 获取SSH私钥提权python沙…

UE4 通过按键切换不同的HUD

咱们在玩游戏的时候&#xff0c;通常会和界面进行各种各样的互动&#xff0c;而且会动都是在不同的界面上&#xff0c;所以需要在不同的界面上进行切换或者多个HUD重叠显示在一起。 首先创建两个HUD 2.重新定义一个PlayerController类 // Fill out your copyright notice…

被裁后半月面试8家公司无果,凭借这份Java面试指南成功入职阿里

前言 上个月班上的好好的突然被通知"毕业了"&#xff0c;现在工作也确实不好找。之前近一个月面了很多大大小小的公司降薪太严重都没考虑去&#xff0c;最后没办法本来都打算随便去一家了却偶然得到一个阿里的面试机会&#xff0c;足足面了七面&#xff08;我太难了…

前有刘德华,后有腾格尔和光头李进,明星为何都热衷于线上演唱会

2022年11月19日&#xff0c;有着“草原歌神”之称的腾格尔&#xff0c;开启了自己的线上演唱会&#xff0c;并且取得了圆满的成功。其实在此之前&#xff0c;天王刘德华已经在某音平台&#xff0c;两次开启自己的线上演唱会&#xff0c;都受到了非常好的效果。 为什么音乐领域的…

手把手教你定位线上MySQL锁超时问题,包教包会

昨晚我正在床上睡得着着的&#xff0c;突然来了一条短信。 ​什么&#xff1f;线上的订单无法取消&#xff01; 我赶紧登录线上系统&#xff0c;查看业务日志。 ​发现有MySQL锁超时的错误日志。 不用想&#xff0c;肯定有另一个事务正在修改这条订单&#xff0c;持有这条订单的…

【边缘注意:深度多尺度特征】

Learning a Deep Multi-Scale Feature Ensemble and an Edge-Attention Guidance for Image Fusion &#xff08;学习深度多尺度特征集成和图像融合的边缘注意指南&#xff09; 在本文中&#xff0c;我们提出了一种用于红外和可见光图像融合的深度网络&#xff0c;该网络将具…

带你了解什么是Java虚拟机运行时数据区

一、前言 程序都是运行在内存里的&#xff0c;所以对于一门开发语言来说&#xff0c;对于内存的管理都是重中之重的&#xff0c;前有C、C需要开发者管理内存&#xff0c;后有Java的自动内存管理&#xff0c;到如今的内存安全的Rust。 二、运行时数据区概览 Java虚拟机在运行…

PyQt5可视化编程-菜单和工具栏

一、简介 PyQt5 是Digia的一套Qt5与python绑定的应用框架&#xff0c;同时支持2.x和3.x。本教程使用的是3.x。Qt库由Riverbank Computing开发&#xff0c; 是最强大的GUI库之一 &#xff0c;官方网站&#xff1a;www.riverbankcomputing.co.uk/news。 PyQt5是由一系列Python模块…

Allegro上如何让飞线以方框形式显示

Allegro上如何让飞线以方框形式显示 Allegro可以让飞线以方框形式显示,让走线评估更简单,尤其是电源和地,如下图 选择Edit-Property Find选择Nets 选择需要改成方框显示的网络,左边选择Voltage,Value里面输入任意一个数字,比如0或者1,apply 可以看到网络已经显示成…

八.调试的技巧

目录 一.调试 1.何为调试&#xff1f; 2.调试的基本步骤 二.debug和release的介绍 三.Windows环境调试介绍 1.调试环境准备 2.学会快捷键 &#xff08;1&#xff09;F5 &#xff08;2&#xff09;F9 &#xff08;3&#xff09;F10 &#xff08;4&#xff09;F11 &am…

【Java学习】JavaWeb ---- JDBC

文章目录JDBC 快速入门ResultSet数据连接池JDBC 快速入门 下载jar包&#xff08;百度&#xff09;->add as library 代码 package com.ith.jdbc;import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement;public class demo1 {public stati…

HTTPS一定安全吗

https是一种通过计算机网络进行安全通信的传输协议&#xff0c;主要目的是提供对网站服务器的身份认证&#xff0c;保护交换数据的隐私与完整性&#xff0c;但不能说使用htttps就一定绝对的安全。 有一点需要了解的是&#xff0c;使用HTTPS 在内容传输的加密上使用的是对称加密…

使用dreamweaver制作采用DIV+CSS进行布局——美食甜品店铺加盟企业HTML静态网页 ——学生美食网页设计作品静态HTML网页模板源码

&#x1f468;‍&#x1f393;静态网站的编写主要是用HTML DIVCSS JS等来完成页面的排版设计&#x1f469;‍&#x1f393;,常用的网页设计软件有Dreamweaver、EditPlus、HBuilderX、VScode 、Webstorm、Animate等等&#xff0c;用的最多的还是DW&#xff0c;当然不同软件写出的…

C++标准库分析总结(九)——<适配器>

目录 1 适配器简介 2 适配器使用分类 2.1 容器适配器 2.2 函数适配器 2.2.1 常见的函数适配器 2.2.2 bind2nd 2.2.3 not1 2.2.4 bind用法 2.3 迭代器适配器 2.4 X适配器 1.6.1 ostream_iterator 1.6.2 istream_iterator 1 适配器简介 把一个原本存在的东西&#xf…