dolphinscheduler-数据质量-源码分析

news2025/8/16 18:38:33

数据质量工作流程

数据质量运行流程分为2个部分:在web端进行数据质量检测的流程定义,通过dolphinscheduer进行调度,提交到spark计算引擎;spark端负责解析数据质量模型的参数,通过读取数据、执行转换、输出三个步骤,完成数据质量检测任务,工作流程如下图所示。
在这里插入图片描述

在web端进行定义

数据质量定义如下图所示,这里只定义了一个节点。
在这里插入图片描述以一个空值检测的输入参数为例,这个json文件会以字符串形式提交给spark端

{
    "name": "$t(null_check)",
    "env": {
        "type": "batch",
        "config": null
    },
    "readers": [
        {
            "type": "JDBC",
            "config": {
                "database": "ops",
                "password": "***",
                "driver": "com.mysql.cj.jdbc.Driver",
                "user": "root",
                "output_table": "ops_ms_alarm",
                "table": "ms_alarm",
                "url": "jdbc:mysql://192.168.3.211:3306/ops?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
            }
        }
    ],
    "transformers": [
        {
            "type": "sql",
            "config": {
                "index": 1,
                "output_table": "total_count",
                "sql": "SELECT COUNT(*) AS total FROM ops_ms_alarm"
            }
        },
        {
            "type": "sql",
            "config": {
                "index": 2,
                "output_table": "null_items",
                "sql": "SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time = '') "
            }
        },
        {
            "type": "sql",
            "config": {
                "index": 3,
                "output_table": "null_count",
                "sql": "SELECT COUNT(*) AS nulls FROM null_items"
            }
        }
    ],
    "writers": [
        {
            "type": "JDBC",
            "config": {
                "database": "dolphinscheduler3",
                "password": "***",
                "driver": "com.mysql.cj.jdbc.Driver",
                "user": "root",
                "table": "t_ds_dq_execute_result",
                "url": "jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncoding=utf-8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                "sql": "select 0 as rule_type,'$t(null_check)' as rule_name,0 as process_definition_id,25 as process_instance_id,26 as task_instance_id,null_count.nulls AS statistics_value,total_count.total AS comparison_value,7 AS comparison_type,3 as check_type,0.95 as threshold,3 as operator,1 as failure_strategy,'hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测' as error_output_path,'2022-11-16 03:40:32' as create_time,'2022-11-16 03:40:32' as update_time from null_count full join total_count"
            }
        },
        {
            "type": "JDBC",
            "config": {
                "database": "dolphinscheduler3",
                "password": "***",
                "driver": "com.mysql.cj.jdbc.Driver",
                "user": "root",
                "table": "t_ds_dq_task_statistics_value",
                "url": "jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncoding=utf-8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                "sql": "select 0 as process_definition_id,26 as task_instance_id,1 as rule_id,'ZKTZKDBTRFDKXKQUDNZJVKNX8OIAEVLQ91VT2EXZD3U=' as unique_code,'null_count.nulls'AS statistics_name,null_count.nulls AS statistics_value,'2022-11-16 03:40:32' as data_time,'2022-11-16 03:40:32' as create_time,'2022-11-16 03:40:32' as update_time from null_count"
            }
        },
        {
            "type": "hdfs_file",
            "config": {
                "path": "hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测",
                "input_table": "null_items"
            }
        }
    ]
}

spark端源码分析

DataQualityApplication程序入口

DataQualityApplication#main

public static void main(String[] args) throws Exception {
    //...
//从命令行获取参数
    String dataQualityParameter = args[0];
//   将json参数转为DataQualityConfiguration对象
    DataQualityConfiguration dataQualityConfiguration = JsonUtils.fromJson(dataQualityParameter,DataQualityConfiguration.class);
    //...
//构建 SparkRuntimeEnvironment的参数Config对象
    EnvConfig envConfig = dataQualityConfiguration.getEnvConfig();
    Config config = new Config(envConfig.getConfig());
    config.put("type",envConfig.getType());
    if (Strings.isNullOrEmpty(config.getString(SPARK_APP_NAME))) {
        config.put(SPARK_APP_NAME,dataQualityConfiguration.getName());
    }

    SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config);
//委托给 DataQualityContext执行
    DataQualityContext dataQualityContext = new DataQualityContext(sparkRuntimeEnvironment,dataQualityConfiguration);
    dataQualityContext.execute();
}

数据质量配置类

public class DataQualityConfiguration implements IConfig {
    @JsonProperty("name")
    private String name; // 名称
    @JsonProperty("env")
    private EnvConfig envConfig; // 环境配置
    @JsonProperty("readers")
    private List<ReaderConfig> readerConfigs; // reader配置
    @JsonProperty("transformers")
    private List<TransformerConfig> transformerConfigs;  // transformer配置
    @JsonProperty("writers")
    private List<WriterConfig> writerConfigs; // writer配置
//...
}

DataQualityContext#execute

从dataQualityConfiguration类中获取 readers、transformers、writers, 委托给SparkBatchExecution

public void execute() throws DataQualityException {
// 将List<ReaderConfig>转为List<BatchReader>
    List<BatchReader> readers = ReaderFactory
            .getInstance()
            .getReaders(this.sparkRuntimeEnvironment,dataQualityConfiguration.getReaderConfigs());
// 将List<TransformerConfig>转为List<BatchTransformer>
    List<BatchTransformer> transformers = TransformerFactory
            .getInstance()
            .getTransformer(this.sparkRuntimeEnvironment,dataQualityConfiguration.getTransformerConfigs());
// 将List<WriterConfig>转为List<BatchWriter>
    List<BatchWriter> writers = WriterFactory
            .getInstance()
            .getWriters(this.sparkRuntimeEnvironment,dataQualityConfiguration.getWriterConfigs());
// spark 运行环境
    if (sparkRuntimeEnvironment.isBatch()) {
// 批模式
        sparkRuntimeEnvironment.getBatchExecution().execute(readers,transformers,writers);
    } else {
// 流模式, 暂不支持
        throw new DataQualityException("stream mode is not supported now");
    }
}

ReaderFactory 类采用了单例和工厂方法的设计模式, 目前支持JDBC和HIVE 的数据源的读取, 对应Reader类HiveReader、JdbcReader
WriterFactory 类采用了单例和工厂方法的设计模式, 目前支持JDBC、HDFS、LOCAL_FILE 的数据源的输出, 对应Writer类 HdfsFileWriter LocalFileWriter JdbcWriter
TransformerFactory 类采用了单例和工厂方法的设计模式,目前仅支持TransformerType.SQL的转换器类型

结合json 可以看出 一个空值检测的reader、tranformer、 writer情况
1个reader : 读取源表数据

3个tranformer: total_count 行总数 、null_items 空值项(行数据) 、null_count (空值数),计算sql 如下
– SELECT COUNT() AS total FROM ops_ms_alarm
– SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time = ‘’)
– SELECT COUNT(
) AS nulls FROM null_items

3个writer:
第一个是jdbc writer, 将比较值、统计值 输出t_ds_dq_execute_result 数据质量执行结果表,

SELECT
   //...
    null_count.nulls AS statistics_value,
    total_count.total AS comparison_value,
   //...
    'hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测' AS error_output_path,
   //...
FROM
    null_count
    FULL JOIN total_count

第二个是jdbc writer,将statistics_value写入到表 t_ds_dq_task_statistics_value

SELECT
    //...
    //...
    'null_count.nulls' AS statistics_name,
    null_count.nulls AS statistics_value,
    //...
FROM
    null_count

第3个是hdfs writer,将空值项写入到hdfs 文件目录

{
    "type": "hdfs_file",
    "config": {
        "path": "hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测",
        "input_table": "null_items"
    }
}

目前 DolphinScheduler占不支持实时数据的质量检测。

SparkBatchExecution#execute

public class SparkBatchExecution implements Execution<BatchReader, BatchTransformer, BatchWriter> {
    private final SparkRuntimeEnvironment environment;
    public SparkBatchExecution(SparkRuntimeEnvironment environment) throws ConfigRuntimeException {
        this.environment = environment;
    }
    
    @Override
    public void execute(List<BatchReader> readers, List<BatchTransformer> transformers, List<BatchWriter> writers) {
// 为每一个reader注册输入临时表
        readers.forEach(reader -> registerInputTempView(reader, environment));

        if (!readers.isEmpty()) {
// 取readers列表的第一个reader读取数据集合, reader的实现类有HiveReader、JdbcReader
            Dataset<Row> ds = readers.get(0).read(environment);
            for (BatchTransformer tf:transformers) {
// 执行转换
                ds = executeTransformer(environment, tf, ds);
// 将转换后结果写到临时表
                registerTransformTempView(tf, ds);
            }

            for (BatchWriter sink: writers) {
// 执行将转换结果由writer输出, writer的实现类有JdbcWriter、LocalFileWriter、HdfsFileWriter
                executeWriter(environment, sink, ds);
            }
        }
// 结束
        environment.sparkSession().stop();
    }
}

SparkBatchExecution#registerInputTempView

//注册输入临时表, 临时表表名为OUTPUT_TABLE的名字
    private void registerInputTempView(BatchReader reader, SparkRuntimeEnvironment environment) {
        Config conf = reader.getConfig();
        if (Boolean.TRUE.equals(conf.has(OUTPUT_TABLE))) {// ops_ms_alarm
            String tableName = conf.getString(OUTPUT_TABLE);        
            registerTempView(tableName, reader.read(environment));
        } else {
            throw new ConfigRuntimeException(
                "[" + reader.getClass().getName() + "] must be registered as dataset, please set \"output_table\" config");
        }
    }

调用 Dataset.createOrReplaceTempView方法

private void registerTempView(String tableName, Dataset<Row> ds) {
    if (ds != null) {
        ds.createOrReplaceTempView(tableName);
    } else {
        throw new ConfigRuntimeException("dataset is null, can not createOrReplaceTempView");
    }
}

执行转换executeTransformer

private Dataset<Row> executeTransformer(SparkRuntimeEnvironment environment, BatchTransformer transformer, Dataset<Row> dataset) {
    Config config = transformer.getConfig();
    Dataset<Row> inputDataset;
    Dataset<Row> outputDataset = null;
    if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) {
// 从INPUT_TABLE获取表名
        String[] tableNames = config.getString(INPUT_TABLE).split(",");

// outputDataset合并了inputDataset数据集合
        for (String sourceTableName: tableNames) {
            inputDataset = environment.sparkSession().read().table(sourceTableName);
            if (outputDataset == null) {
                outputDataset = inputDataset;
            } else {
                outputDataset = outputDataset.union(inputDataset);
            }
        }
    } else {
//  配置文件无INPUT_TABLE
        outputDataset = dataset;
    }
// 如果配置文件中配置了TMP_TABLE, 将outputDataset 注册到TempView
    if (Boolean.TRUE.equals(config.has(TMP_TABLE))) {
        if (outputDataset == null) {
            outputDataset = dataset;
        }
        String tableName = config.getString(TMP_TABLE);
        registerTempView(tableName, outputDataset);
    }
//  转换器进行转换
    return transformer.transform(outputDataset, environment);
}

SqlTransformer#transform 最终是使用spark-sql进行处理, 所以核心还是这个sql语句,sql 需要在web端生成好,参加前面的json文件。

public class SqlTransformer implements BatchTransformer {
    private final Config config;
    public SqlTransformer(Config config) {
        this.config = config;
    }
//...
    @Override
    public Dataset<Row> transform(Dataset<Row> data, SparkRuntimeEnvironment env) {
        return env.sparkSession().sql(config.getString(SQL));
    }
}

将数据输出到指定的位置executeWriter

private void executeWriter(SparkRuntimeEnvironment environment, BatchWriter writer, Dataset<Row> ds) {
    Config config = writer.getConfig();
    Dataset<Row> inputDataSet = ds;
    if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) {
        String sourceTableName = config.getString(INPUT_TABLE);
        inputDataSet = environment.sparkSession().read().table(sourceTableName);
    }

    writer.write(inputDataSet, environment);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/33138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java -- 每日一问:谈谈常用的分布式ID的设计方案?Snowflake是否受冬令时切换影响?

典型回答 首先&#xff0c;我们需要明确通常的分布式 ID 定义&#xff0c;基本的要求包括&#xff1a; 全局唯一&#xff0c;区别于单点系统的唯一&#xff0c;全局是要求分布式系统内唯一。 有序性&#xff0c;通常都需要保证生成的 ID 是有序递增的。例如&#xff0c;在数据…

小体积单输入单输出单电源供电光电隔离转换器0-75mV/0-5V/0-200mV/0-20mA/4-20mA

概述&#xff1a; 导轨安装DIN11 IPO OC系列模拟信号隔离放大器是一种将输入信号隔离放大、转换成按比例输出的直流信号混合集成厚模电路。产品广泛应用在电力、远程监控、仪器仪表、医疗设备、工业自控等需要直流信号隔离测控的行业。此系列产品内部采用了线性光电隔离技术相…

Unity | 以附加模式加载场景,实现多场景叠加及注意事项

1 多场景叠加 Unity 允许多场景叠加&#xff0c;这种叠加包括编辑模式及运行模式 新建两个简单的场景&#xff0c;SampleScene 和 AdditiveScene&#xff0c;设置不同的天空盒及平行光源颜色 SampleScene AdditiveScene 2 编辑模式 2.1 添加场景 在编辑器中的场景资源右键选…

vue draggable怎么用?怎么写一个拖拽的看板?

项目中需要像看板一样的可以拖动的任务队列 如果自己要手写的话会很麻烦&#xff0c;大佬当我没说。市面上目前有很多成熟的组件&#xff0c;本次介绍的这个就是一个轻量级的应用。 sortable.js中文文档 - itxst.com 常规的vue文件使用&#xff0c;可以参考上面的网站。 本次…

C语言习题练习10--指针

1.代码结果 #include <stdio.h> int main() {int arr[] {1,2,3,4,5};short *p (short*)arr;int i 0;for(i0; i<4; i){*(pi) 0;}for(i0; i<5; i){printf("%d ", arr[i]);}return 0; } 正常&#xff1a;0001--00 02--00 03--00 04--00 05 数组内部是倒…

[附源码]java毕业设计医院管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

【转】数据库索引详细介绍

原文链接&#xff1a;https://blog.csdn.net/weixin_41948075/article/details/100009848 索引的定义 索引相当于一本书的目录&#xff0c;通过目录我们可以迅速定位书中要找的内容。MySQL中的索引也是一样&#xff0c;它是一种帮助MySQL高效获取数据的数据结构&#xff08;树…

Docker中php安装redis扩展

第一步&#xff1a;下载redis扩展压缩包 点击下载redis-5.3.7 &#xff0c;其他版本请访问&#xff1a;https://pecl.php.net/package/redis 第二步&#xff1a;加压压缩包 linux下解压&#xff1a;tar -zxvf redis-5.3.4.tgz windows下解压&#xff1a;自己学习 第三步&#…

暴雪网易事件大讨论:Web3游戏未来发展趋势

最近很多小伙伴们询问进群方式&#xff0c;希望能和NFT玩家&#xff0c;链游行家和数字艺术家们交流和学习&#xff0c;直接戳我【ID:cdf0822】就好&#xff01;文末也有联系方式&#x1f618; 11月17日下午&#xff0c;暴雪中国官方直接绕过现任代理商网易直接发布公告称&…

学生HTML个人网页作业作品 使用HTML+CSS+JavaScript个人介绍博客网站 web前端课程设计 web前端课程设计代码 web课程设计

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

【考研数学】概率论与数理统计

这个知识点比较零碎。 文章目录1. 基础知识&#xff1a;连续型总体的最大似然估计法2. 解题技巧&#xff1a;随机变量函数的分布3. 解题技巧&#xff1a;几何数列求和4. 基础知识&#xff1a;切比雪夫不等式5. 基础知识&#xff1a;卡方分布&#xff0c;t分布&#xff0c;F分布…

如何深刻理解RPA?相关知识点看这里!

最近RPA大热&#xff0c;但对于RPA的理解大家都是模模糊糊的&#xff0c;那如何深刻理解呢&#xff1f;需要了解哪些知识点呢&#xff1f;这里我们简单来看看吧&#xff01; 一、RPA定义 RPA简单来说就是借助一些能够自动执行的脚本&#xff0c;以软件自动化方式实现一系列原来…

昔年邢台稻田不下万顷 国稻种芯·中国水稻节:河北谷子收获

昔年邢台稻田不下万顷 国稻种芯中国水稻节&#xff1a;河北谷子收获 新华社记者 骆学峰 摄 河北新闻网讯&#xff08;河北日报记者邢云 通讯员王聚芬&#xff09; 新闻中国采编网 中国新闻采编网 谋定研究中国智库网 中国农民丰收节国际贸易促进会 国稻种芯中国水稻节 中国三…

限制用户上传文件类型

在上传文件时&#xff0c;在表单元素中设置accept属性&#xff0c;这个属性只能与<input type"file">一起使用才生效。 accept的属性值是MIME值&#xff0c;MIME值对应的文件类型可以看这个链接 菜鸟编程-MIME类型l 例如&#xff0c;我只想要用户上传的文件是p…

【雷达干扰】基于matlab速度聚类欺骗式干扰仿真

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

微信预约课程小程序开发_分享微信预约课程小程序的好处

不管是素质拓展还是知识培训&#xff0c;教育机构的人一多&#xff0c;只靠人工的话&#xff0c;容易发生学生约课难&#xff0c;老师排课乱等问题。这时候&#xff0c;就要借助专业的约课系统小程序的帮助啦&#xff01;小程序需自带的营销功能还能帮忙拓客&#xff0c;这下招…

APIMapper 源码解析

git 地址&#xff1a;https://gitee.com/shaokang123/api-mapper 1、ApiMapper 是什么&#xff1f; 将API请求映射到接口上&#xff0c;返回封装的JavaBean数据。所以ApiMapper包括两部分功能&#xff0c; API请求接口映射 JSON数据转JavaBean对象 2、ApiMapper 中使用的设计…

IP-Guard管控应用程序运行有哪几种方式?

有五种方式可以管控应用程序运行&#xff1a; 1、通过进程名称来禁止 管理员直接添加应用程序的名称&#xff0c;如thunder.exe&#xff0c;此时策略是通过字符串匹配的&#xff0c;如果客户端修改了应用程序名称改为thunder123.exe&#xff0c;则策略就无法生效&#xff1b;要…

跟艾文学编程 《零基础入门学Python》Jupyter Notebook安装和使用

作者&#xff1a; 艾文&#xff0c;计算机硕士学位&#xff0c;企业内训讲师和金牌面试官&#xff0c;公司资深算法专家&#xff0c;现就职BAT一线大厂。 邮箱&#xff1a; 1121025745qq.com 内容&#xff1a;跟艾文学编程《零基础入门学Python​​​​​​​》 本节内容 ● Ju…

Kotlin基础入门 - 创建、兼容一个属于自己的Kotlin项目

这应该是我年前就想记录的一个基础入门&#xff0c;但是因为一直比较忙&#xff0c;当时只是做了一个备忘草稿&#xff0c;正文就拖到了现在&#xff0c;趁着有时间&#xff0c;赶紧来帮助一下新入行的朋友… 关于为何我把这篇Blog叫做 创建、兼容一个属于自己的Kotlin项目? 主…