FlinkSQL中【FULL OUTER JOIN】使用实例分析(坑)

news2025/5/24 1:07:38

Flink版本:flink1.14
最近有【FULL OUTER JOIN】场景的实时数据开发需求,想要的结果是,左右表来了数据都下发数据;左表存在的数据,右表进来可以关联下发(同样,右表存在的数据,左表进来也可以关联下发)。但在实际应用中遇到一些问题。

FlinkSQL demo


CREATE TABLE waybill_extend_kafka (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     _proc  as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

CREATE TABLE package_state_kafka (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     _proc  as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 't2',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
  'format' = 'json'
);

CREATE TABLE es_dim(
    id                STRING,
    ts                STRING,
    waybill_code      STRING,
    pin               STRING,
    operater_ts       STRING,
    operater_type     STRING,
    is_enable         STRING,
    batch_no          STRING,
    activity_key      STRING,
    p_type            STRING,
    p_name            STRING,
    version            STRING,
    update_time            STRING
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_dim',
    'document-type' = 'es_dim',
    'hosts' = 'http://xxx:9200',
    'format' = 'json'
);

CREATE TABLE es_sink(
     waybill_code      STRING
    ,first_order       STRING -- 新客1,非新客0
    ,extend_update_time            STRING
    ,state       STRING -- 妥投150
    ,package_update_time            STRING
    ,pin              STRING
    ,coupon_use_time      STRING
    ,operater_type    STRING
    ,is_enable        STRING
    ,batch_no         STRING
    ,update_time         STRING
    ,PRIMARY KEY (waybill_code) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE TABLE kafka_sink (
     waybill_code      STRING
    ,first_order       STRING 
    ,extend_update_time            STRING
    ,state       STRING -- 妥投150
    ,package_update_time            STRING
    ,pin              STRING
    ,coupon_use_time      STRING
    ,operater_type    STRING
    ,is_enable        STRING
    ,batch_no         STRING
    ,update_time         STRING
    ,PRIMARY KEY (waybill_code) NOT ENFORCED --注意 确保在 DDL 中定义主键。
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 't3',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

--新客
CREATE view  waybill_extend_temp as
select
    IF(cur['waybill_code'] IS NOT NULL , cur['waybill_code'], src ['waybill_code'])  AS waybill_code,
    IF(cur['data_key'] IS NOT NULL , cur['data_key'], src ['data_key'])  AS data_key,
    IF(cur['create_time'] IS NOT NULL , cur['create_time'], src ['create_time'])  AS create_time,
    opt,
    _proc
FROM waybill_extend_kafka
where UPPER(opt) = 'DELETE' OR UPPER(opt) = 'INSERT';

CREATE view  waybill_extend_temp_handle as
SELECT
    waybill_code,
    case when UPPER(opt) = 'INSERT'  then '1'
         when UPPER(opt) = 'DELETE'  then '0'
            end as first_order,
    create_time,
    _proc
from waybill_extend_temp
where data_key = 'firstOrder';

--妥投
CREATE view package_state_temp as
select
    IF(cur['WAYBILL_CODE'] IS NOT NULL , cur['WAYBILL_CODE'], src ['WAYBILL_CODE'])  AS waybill_code,
    IF(cur['STATE'] IS NOT NULL , cur['STATE'], src ['STATE'])  AS state,
    IF(cur['CREATE_TIME'] IS NOT NULL , cur['CREATE_TIME'], src ['CREATE_TIME'])  AS create_time,
    opt,
    _proc
FROM package_state_kafka
where UPPER(opt) = 'INSERT';

CREATE view package_state_temp_handle as
SELECT
    waybill_code,
    max(state) as state,
    min(create_time) as package_update_time,
    proctime() as _proc
from package_state_temp
where state = '150'
group by waybill_code;

--full join
-- flink1.14 注意:flinksql里面的FULL OUTER JOIN 只是分别下发左右数据,中间状态不关联下发,在流处理场景下相当于union all
CREATE view waybill_extend_package_state  as
SELECT
    COALESCE(a.waybill_code, b.waybill_code) as waybill_code,
    a.first_order,
    a.create_time as extend_update_time,
    b.state,
    b.package_update_time,
    COALESCE(a._proc, b._proc) as _proc
from waybill_extend_temp_handle as a
FULL OUTER JOIN package_state_temp_handle b
on a.waybill_code=b.waybill_code;

--result
CREATE VIEW res_view AS
SELECT
     a.waybill_code
    ,a.first_order
    ,a.extend_update_time
    ,a.state
    ,a.package_update_time
    ,b.pin
    ,b.operater_ts
    ,b.operater_type
    ,b.is_enable
    ,b.batch_no
    ,CAST(CAST(a._proc AS TIMESTAMP(3)) AS STRING) as update_time
    ,row_number() over(partition by a.waybill_code order by b.operater_ts desc) as rn 
from waybill_extend_package_state as a
JOIN es_dim FOR SYSTEM_TIME AS OF a._proc as b
on a.waybill_code=b.waybill_code;

INSERT INTO es_sink
SELECT
     waybill_code
    ,first_order
    ,extend_update_time
    ,state
    ,package_update_time
    ,pin
    ,operater_ts
    ,operater_type
    ,is_enable
    ,batch_no
    ,update_time
FROM res_view
where rn =1;

INSERT INTO kafka_sink
SELECT
     waybill_code
    ,first_order
    ,extend_update_time
    ,state
    ,package_update_time
    ,pin
    ,operater_ts 
    ,operater_type
    ,is_enable
    ,batch_no
    ,update_time
FROM res_view
where rn =1;

es_sink mapping:

POST es_sink/es_sink/_mapping
{
    "es_sink": {
        "properties": {
            "waybill_code": {
                "type": "keyword"
            },
            "pin": {
                "type": "keyword"
            },
            "operater_ts": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "operater_type": {
                "type": "keyword"
            },
            "is_enable": {
                "type": "keyword"
            },
            "batch_no": {
                "type": "keyword"
            },
            "first_order": {
                "type": "keyword"
            },
            "extend_update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "state": {
                "type": "keyword"
            },
            "package_update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

结果分析

从sink_es和sink_kafka获取数据都是同样的结果,部分结果如下:
在这里插入图片描述
但从结果中可以看出,FlinkSQL里面的【FULL OUTER JOIN】 只是分别下发左右数据,中间状态(从FlinkUI中可以看到【FULL OUTER JOIN】状态也做了保存)不关联下发,在流处理场景下相当于【UNION ALL】,不知是否是FlinkSQL的bug。
【FULL OUTER JOIN】状态数据,如下:
在这里插入图片描述
此次用例分析只是针对于Flink1.14,对于其他版本尚不清楚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1355916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring见解2基于注解的IOC配置

3.基于注解的IOC配置 学习基于注解的IOC配置&#xff0c;大家脑海里首先得有一个认知&#xff0c;即注解配置和xml配置要实现的功能都是一样的&#xff0c;都是要降低程序间的耦合。只是配置的形式不一样。 3.1.创建工程 3.1.1.pom.xml <?xml version"1.0" en…

C++八股学习心得.4

1.C 类 & 对象 C 在 C 语言的基础上增加了面向对象编程&#xff0c;C 支持面向对象程序设计。类是 C 的核心特性&#xff0c;通常被称为用户定义的类型。 类用于指定对象的形式&#xff0c;它包含了数据表示法和用于处理数据的方法。类中的数据和方法称为类的成员。函数在…

图像融合论文阅读:MURF: Mutually Reinforcing Multi-Modal Image Registration and Fusion

article{xu2023murf, title{MURF: Mutually Reinforcing Multi-modal Image Registration and Fusion}, author{Xu, Han and Yuan, Jiteng and Ma, Jiayi}, journal{IEEE Transactions on Pattern Analysis and Machine Intelligence}, year{2023}, publisher{IEEE} } 论文级别…

Java进阶 1-2 枚举

目录 常量特定方法 职责链模式的枚举实现 状态机模式的枚举实现 多路分发 1、使用枚举类型实现分发 2、使用常量特定方法实现分发 3、使用EnumMap实现分发 4、使用二维数组实现分发 本笔记参考自&#xff1a; 《On Java 中文版》 常量特定方法 在Java中&#xff0c;我们…

数字孪生技术详解

在线工具推荐&#xff1a;3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 - 3D模型语义搜索引擎 数字孪生技术正在迅速彻底改变企业的运营方式。借助数字孪生技术&#xff0c…

洗地机什么牌子最好?家用洗地机推荐指南

随着人们对健康和卫生的关注日益增长&#xff0c;洗地机成为了现代家庭清洁的必备工具。然而&#xff0c;在市场上琳琅满目的洗地机品牌中&#xff0c;洗地机哪个品牌最好最实用呢?这是消费者最为关心的问题。现本文将为您介绍几个备受推崇的洗地机品牌&#xff0c;帮助您在众…

拖拽式工作流好用吗?有何特点?

大家都知道&#xff0c;随着行业的进步和发展&#xff0c;低代码技术平台也迎来了蓬勃发展期。很多企业喜欢使用低代码实现提质增效的办公效果&#xff0c;拖拽式工作流是其中一个功能&#xff0c;是助力企业实现流程化办公的得力助手。那么&#xff0c;拖拽式工作流好用吗&…

数字信号处理期末复习——计算大题(一)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

【RocketMQ每日一问】RocketMQ中raft的应用?

1.rocketmq中raft算法实现方式 RocketMQ 中实现 Raft 算法的模块是 DLedger&#xff0c;它是一种基于 Raft 协议的分布式日志存储模式&#xff0c;用于提供高可用性和数据一致性的保证&#xff0c;保证消息的可靠性和持久化存储。 在 DLedger 中&#xff0c;每个节点都维护着…

vue3顶部内容固定定位,下面内容可以向上滚动

功能要求&#xff1a;一个div里有两个模块儿&#xff0c;顶部按钮模块儿和下面的内容区域模块儿&#xff0c;顶部按钮模块儿固定在顶部不随滚动条滚动&#xff0c;下面内容区域可以滚动 如图&#xff1a; 思路是&#xff1a; 1、顶部按钮固定定位&#xff0c;会脱离文档流&…

【无标题】一本好书

(https://img-blog.csdnimg.cn/9e3c2302242149e4ac7dbc834bd5e027.jpg)(https://img-blog.csdnimg.cn/3427ed8648ff46bbb496ed512e0aa9cd.jpg1

109-Gradle构建工具的学习

Gradle构建工具的学习 Gradle 简介&#xff1a; Gradle 是一款Google 推出的基于 JVM、通用灵活的项目构建工具&#xff0c;支持 Maven&#xff0c;JCenter 多种第三方仓库&#xff0c;支持传递性依赖管理、废弃了繁杂的xml 文件&#xff0c;转而使用简洁的、支持多种语言&am…

懒加载的el-tree中没有了子节点之后还是有前面icon箭头的展示,如何取消没有子节点之后的箭头显示

没有特别多的数据 <template><el-tree:props"props":load"loadNode"lazyshow-checkbox></el-tree></template><script>export default {data() {return {props: {label: name,children: zones,isLeaf:"leaf",//关…

nginx下upstream模块详解

目录 一&#xff1a;介绍 二&#xff1a;特性介绍 一&#xff1a;介绍 Nginx的upstream模块用于定义后端服务器组&#xff0c;以及与这些服务器进行通信的方式。它是Nginx负载均衡功能的核心部分&#xff0c;允许将请求转发到多个后端服务器&#xff0c;并平衡负载。 在upst…

如潮好评!优秀选手视角下的第二届粤港澳大湾区(黄埔)国际算法算例大赛

为发挥国家实验室作用、推动地区大数据与人工智能算法的生态体系建设&#xff0c;琶洲实验室&#xff08;黄埔&#xff09;受广州市黄埔区政府委托&#xff0c;于 2022 年创办粤港澳大湾区&#xff08;黄埔&#xff09;国际算法算例大赛&#xff0c;推动原始创新、赋能社会经济…

以 Serverfull 方式运行无服务器服务

当前 IT 架构中最流行的用例是从 Serverfull 转向 Serverless 设计。在某些情况下&#xff0c;我们可能需要以 Serverfull 方式设计服务或迁移到 Serverfull 作为运营成本的一部分。 在本文中&#xff0c;我们将展示如何将 Kumologica flow 作为 Docker 容器运行。通常&#x…

力扣322. 零钱兑换(java语言实现 完全背包问题)

Problem: 322. 零钱兑换 文章目录 题目描述思路解题方法复杂度Code 题目描述 思路 该题目可以归纳为完全背包问题&#xff0c;最少需要多少物品能填满背包。该类问题大体思路如下 状态&#xff1a; int dp[ n n n][ w 1 w 1 w1] (其中 n n n表示有 n n n个物品&#xff0c; …

Python常用模块之hashlib

常用模块 - hashlib模块 一、简介 Python的hashlib提供了常见的摘要算法&#xff0c;如MD5、SHA1、SHA224、SHA256、SHA384、SHA512等算法。 什么是摘要算法呢&#xff1f;摘要算法又称哈希算法、散列算法。它通过一个函数&#xff0c;把任意长度的数据转换为一个长度固定的…

14.用户管理

目录 1、权限表 1、user表 1.用户列 2.权限列 3.安全列 4.资源控制列 2、db表和host 表 1.用户列 2.权限列 3. tables_priv 表和 columns _priv 表 4.procs_priv 表 2、账户管理 1. 登录和退出MySQL服务器 2、创建普通用户&#xff1a; 1.使用CREATE USER语创建…