Spark流水线+Gravitino+Marquez数据血缘采集

news2025/6/9 6:17:15

1.Openlinage和Marquez简介

1.1 OpenLineage

概述
  • OpenLineage 是一个开放标准框架,用于跨工具、平台和系统捕获数据血缘信息。
  • 它定义了通用的数据血缘模型和API,允许不同的数据处理工具(如ETL、调度器、数据仓库)以标准化格式生成血缘元数据。
  • 由Linux基金会托管,社区驱动,支持广泛的集成。
核心功能
  • 标准化元数据收集:通过统一的规范(基于JSON Schema)描述数据血缘,包括作业(Job)、数据集(Dataset)和运行(Run)等实体。
  • 跨工具集成:支持与Airflow、Spark、dbt、Great Expectations等流行数据工具的集成。
  • 可扩展性:允许用户自定义提取器(Extractors)或适配器来兼容其他工具。
典型应用场景
  • 数据治理(如合规性审计)。
  • 故障排查(追踪数据错误来源)。
  • 影响分析(评估上游变更对下游的影响)。

1.2. Marquez

概述
  • Marquez 是OpenLineage的参考实现,是一个开源元数据服务,专为数据血缘和元数据管理设计。
  • 由WeWork团队最初开发,现由社区维护,与OpenLineage深度集成。
  • 提供Web UI和API,用于存储、查询和可视化血缘信息。
核心功能
  • 元数据存储:持久化存储OpenLineage格式的血缘数据(使用PostgreSQL或兼容的数据库)。
  • 血缘可视化:通过Web界面展示数据集、作业和依赖关系的图谱。
  • API支持:提供REST API供其他系统访问或写入元数据。
  • 与OpenLineage生态集成:自动接收来自支持OpenLineage的工具(如Airflow)的血缘事件。
架构组成
  • API服务:处理血缘事件的摄入和查询。
  • Web UI:交互式查看血缘关系。
  • 后端数据库:存储元数据。

如果需要进一步了解部署或集成细节,可以参考它们的官方文档:

  • OpenLineage官网
  • Marquez GitHub

2.Gravitino血缘配置

Gravitino血缘事件采集后,默认是输出到日志,如果需要处理,可以实现org.apache.gravitino.lineage.sink.LineageSink进行扩展。

本文便实现此接口,通过http接口将血缘事件发送到Marquez,进行血缘的存储和展示。

package org.apache.gravitino.lineage.sink;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.openlineage.server.OpenLineage.RunEvent;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.server.web.ObjectMapperProvider;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineageHttpSink implements LineageSink {

  private static final Logger LOG = LoggerFactory.getLogger(LineageHttpSink.class);

  private String url;
  private String endpoint;
  private String method;
  private Map<String, String> headers;
  private CloseableHttpClient httpClient;
  private ObjectMapper objectMapper;
  private int retryCount;
  private long retryDelayMs;

  @Override
  public void initialize(Map<String, String> configs) {
    this.url = configs.get("http.url");
    this.endpoint = configs.getOrDefault("http.endpoint", "/api/v1/lineage");
    this.method = configs.getOrDefault("http.method", "POST");
    this.retryCount = Integer.parseInt(configs.getOrDefault("http.retry.count", "3"));
    this.retryDelayMs = Long.parseLong(configs.getOrDefault("http.retry.delay", "1000"));

    this.headers = parseHeaders(configs.getOrDefault("http.headers", ""));

    this.httpClient = HttpClients.createDefault();
    this.objectMapper = ObjectMapperProvider.objectMapper();

    LOG.info("Initialized HTTP sink with URL: {}{}", url, endpoint);
  }

  @Override
  @SuppressWarnings("deprecation")
  public void sink(RunEvent event) {
    String fullUrl = url + endpoint;

    for (int attempt = 0; attempt <= retryCount; attempt++) {
      try {
        String jsonPayload = objectMapper.writeValueAsString(event);

        HttpUriRequestBase request = createHttpRequest(fullUrl, jsonPayload);
        headers.forEach(request::setHeader);

        try (CloseableHttpResponse response = httpClient.execute(request)) {
          int statusCode = response.getCode();

          if (isSuccessResponse(statusCode)) {
            LOG.debug("Successfully sent lineage event to {}", fullUrl);
            return;
          } else {
            LOG.warn("HTTP request failed with status {}", statusCode);
          }
        }

      } catch (Exception e) {
        LOG.warn(
            "Attempt {} failed to send lineage event to {}: {}",
            attempt + 1,
            fullUrl,
            e.getMessage());

        if (attempt == retryCount) {
          LOG.error("Failed to send lineage event after {} attempts", retryCount + 1, e);
          return;
        }
      }

      if (attempt < retryCount) {
        try {
          Thread.sleep(retryDelayMs * (attempt + 1));
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  private HttpUriRequestBase createHttpRequest(String url, String jsonPayload) {
    HttpUriRequestBase request;

    switch (method.toUpperCase()) {
      case "POST":
        request = new HttpPost(url);
        break;
      case "PUT":
        request = new HttpPut(url);
        break;
      default:
        throw new IllegalArgumentException("Unsupported HTTP method: " + method);
    }

    StringEntity entity = new StringEntity(jsonPayload, ContentType.APPLICATION_JSON);
    request.setEntity(entity);

    return request;
  }

  private Map<String, String> parseHeaders(String headersString) {
    Map<String, String> headerMap = new HashMap<>();
    if (StringUtils.isNotBlank(headersString)) {
      String[] pairs = headersString.split(",");
      for (String pair : pairs) {
        String[] keyValue = pair.split(":", 2);
        if (keyValue.length == 2) {
          headerMap.put(keyValue[0].trim(), keyValue[1].trim());
        }
      }
    }
    return headerMap;
  }

  private boolean isSuccessResponse(int statusCode) {
    return statusCode >= 200 && statusCode < 300;
  }

  @Override
  public void close() {
    if (httpClient != null) {
      try {
        httpClient.close();
      } catch (IOException e) {
        LOG.warn("Error closing HTTP client", e);
      }
    }
    LOG.info("HTTP sink closed");
  }
}

gravitino.conf中添加以下配置

gravitino.lineage.source=http
gravitino.lineage.sinks=log,openlineage  
gravitino.lineage.openlineage.sinkClass=org.apache.gravitino.lineage.sink.LineageHttpSink  
gravitino.lineage.openlineage.http.url=http://127.0.0.1:5000
gravitino.lineage.openlineage.http.endpoint=/api/v1/lineage  
gravitino.lineage.openlineage.http.method=POST  
gravitino.lineage.openlineage.http.headers=Content-Type:application/json
gravitino.lineage.openlineage.http.retry.count=3  
gravitino.lineage.openlineage.http.retry.delay=1000

其中gravitino.lineage.openlineage.http.url填写的是Marquez地址

gravitino.lineage.openlineage.http.endpoint填写的是Marquez接收血缘事件的接口。

3. 集成演示

如需开启血缘采集功能,首先需要下载 Gravitino OpenLineage 插件 jar 并将其放置到 Spark 的类路径中。

(gravitino-openlineage-plugins/spark-plugin at main · datastrato/gravitino-openlineage-plugins)

3.1 访问系统登录页面,输入账号密码完成身份验证。

3.2 创建任务

  • 入口:通过顶部菜单栏选择 任务开发,或通过快捷入口 快速创建任务

  • 任务类型:选择 SparkPipeline

    3.3 配置任务

    点击任务名称,进入任务详情页。任务节点如下

  • Gravatino节点:配置Gravatino连接信息,并设置enableLinagetrue,开起血缘采集

    SQLQuery节点:执行sql查询语句。跨catalog实现联邦查询

      SELECT 
      a.id, 
      a.user_name, 
      b.description 
    FROM local_data_service.dolphinscheduler.t_ds_user a
    left join docker_data_service.dolphinscheduler.t_ds_tenant b
    on a.tenant_id = b.id
    

  • PostgresqlWrite节点:将查询结果写入到Postgres

3.4 运行任务

  • 点击 运行 按钮启动任务。

3.5 查看血缘

3.6 数据查询

🔗 平台体验地址:DataStudio (http://1.94.182.15:8090)

参考链接:

[1] https://github.com/datastrato/gravitino/

[2] https://datastrato.ai/blog/gravitino-unified-metadata-lake/

.6 数据查询

[外链图片转存中…(img-pQb0YwgS-1749396844654)]

[外链图片转存中…(img-hqrYlduK-1749396844654)]

🔗 平台体验地址:DataStudio (http://1.94.182.15:8090)

参考链接:

[1] https://github.com/datastrato/gravitino/

[2] https://datastrato.ai/blog/gravitino-unified-metadata-lake/

[3] Apache Gravitino Spark connector | Apache Gravitino

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2405044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于微信小程序的车位共享平台的设计与实现源码数据库文档

摘 要 近年来&#xff0c;随着国民经济的飞速发展&#xff0c;城镇化进程的步伐加快&#xff0c;城市人口急剧增长&#xff0c;人们的生活水平持续改善&#xff0c;特别是大中型城市&#xff0c;城市的交通规模日益增大&#xff0c;汽车的保有量不断提高&#xff0c;然而城市的…

多模态大语言模型arxiv论文略读(111)

SEA: Supervised Embedding Alignment for Token-Level Visual-Textual Integration in MLLMs ➡️ 论文标题&#xff1a;SEA: Supervised Embedding Alignment for Token-Level Visual-Textual Integration in MLLMs ➡️ 论文作者&#xff1a;Yuanyang Yin, Yaqi Zhao, Yaji…

怎么让自己ip显示外省?一文说清操作

在互联网时代&#xff0c;IP地址不仅关联网络连接&#xff0c;还可能影响IP属地显示。那么&#xff0c;手机和电脑用户怎么让自己IP显示外省&#xff1f;一文说清操作要点。 ‌ 二、4种主流方法详解 要让自己的IP显示为外省地址&#xff0c;主要有以下几种方法&#xff1a; …

【Docker】容器安全之非root用户运行

【Docker】容器安全之非root用户运行 1. 场景2. 原 Dockerfile 内容3. 整改结果4. 非 root 用户带来的潜在问题4.1 文件夹读写权限异常4.2 验证文件夹权限 1. 场景 最近有个项目要交付&#xff0c;第三方测试对项目源码扫描后发现一个问题&#xff0c;服务的 Dockerfile 都未指…

汽车车载软件平台化项目规模颗粒度选择的一些探讨

汽车进入 SDV 时代后&#xff0c;车载软件研发呈现出开源生态构建、电子架构升级、基础软件标准化、本土供应链崛起、AI 原生架构普及、云边协同开发等趋势&#xff0c;这些趋势促使车载软件研发面临新挑战&#xff0c;如何构建适应这些变化的平台化架构成为车企与 Tier 1 的战…

【八股消消乐】构建微服务架构体系—服务注册与发现

&#x1f60a;你好&#xff0c;我是小航&#xff0c;一个正在变秃、变强的文艺倾年。 &#x1f514;本专栏《八股消消乐》旨在记录个人所背的八股文&#xff0c;包括Java/Go开发、Vue开发、系统架构、大模型开发、具身智能、机器学习、深度学习、力扣算法等相关知识点&#xff…

掌握Git核心:版本控制、分支管理与远程操作

前言 无论热爱技术的阅读者你是希望掌握Git的企业级应用&#xff0c;能够深刻理解Git操作过程及操作原理&#xff0c;理解工作区暂存区、版本库的含义&#xff1b;还是想要掌握Git的版本、分支管理&#xff0c;自由的进行版本回退、撤销、修改等Git操作方式与背后原理和通过分…

c#,Powershell,mmsys.cpl,使用Win32 API展示音频设备属性对话框

常识&#xff08;基础&#xff09; 众所周知&#xff0c;mmsys.cpl使管理音频设备的控制面板小工具&#xff0c; 其能产生一个对话框&#xff08;属性表&#xff09;让我们查看和修改各设备的详细属性&#xff1a; 在音量合成器中单击音频输出设备的小图标也能实现这个效果&a…

STM标准库-TIM旋转编码器

文章目录 一、编码器接口1.1简介1.2正交编码器1.3编码器接口基本结构**1. 模块与 STM32 配置的映射关系****2. 设计实现步骤&#xff08;核心流程&#xff09;****① 硬件规划****② 时钟使能****③ GPIO 配置&#xff08;对应架构图 “GPIO” 模块&#xff09;****④ 时基单元…

【原创】基于视觉模型+FFmpeg+MoviePy实现短视频自动化二次编辑+多赛道

AI视频处理系统功能总览 &#x1f3af; 系统概述 这是一个智能短视频自动化处理系统&#xff0c;专门用于视频搬运和二次创作。系统支持多赛道配置&#xff0c;可以根据不同的内容类型&#xff08;如"外国人少系列"等&#xff09;应用不同的处理策略。 &#x1f3d…

C++----剖析list

前面学习了vector和string&#xff0c;接下来剖析stl中的list&#xff0c;在数据库中学习过&#xff0c;list逻辑上是连续的&#xff0c;但是存储中是分散的&#xff0c;这是与vector这种数组类型不同的地方。所以list中的元素设置为一个结构体&#xff0c;将list设计成双向的&…

纳米AI搜索与百度AI搜、豆包的核心差异解析

一、技术定位与设计目标 1、纳米AI搜索&#xff1a;轻量化边缘计算导向
专注于实时数据处理与资源受限环境下的高效响应&#xff0c;通过算法优化和模型压缩技术&#xff0c;实现在物联网设备、智能终端等低功耗场景的本地化部署。其核心优势在于减少云端依赖&#xff0c;保障…

不到 2 个月,OpenAI 火速用 Rust 重写 AI 编程工具。尤雨溪也觉得 Rust 香!

一、OpenAI 用 Rust 重写 Codex CLI OpenAI 已用 Rust 语言重写了其 AI 命令行编程工具 Codex CLI&#xff0c;理由是此举能提升性能和安全性&#xff0c;同时避免对 Node.js 的依赖。他们认为 Node.js “可能让部分用户感到沮丧或成为使用障碍”。 Codex 是一款实验性编程代理…

Python60日基础学习打卡Day46

一、 什么是注意力 注意力机制的由来本质是从onehot-elmo-selfattention-encoder-bert这就是一条不断提取特征的路。各有各的特点&#xff0c;也可以说由弱到强。 其中注意力机制是一种让模型学会「选择性关注重要信息」的特征提取器&#xff0c;就像人类视觉会自动忽略背景&…

WEB3全栈开发——面试专业技能点P1Node.js / Web3.js / Ethers.js

一、Node.js 事件循环 Node.js 的事件循环&#xff08;Event Loop&#xff09;是其异步编程的核心机制&#xff0c;它使得 Node.js 可以在单线程中实现非阻塞 I/O 操作。 &#x1f501; 简要原理 Node.js 是基于 libuv 实现的&#xff0c;它使用事件循环来处理非阻塞操作。事件…

Vscode下Go语言环境配置

前言 本文介绍了vscode下Go语言开发环境的快速配置&#xff0c;为新手小白快速上手Go语言提供帮助。 1.下载官方Vscode 这步比较基础&#xff0c;已经安装好的同学可以直接快进到第二步 官方安装包地址&#xff1a;https://code.visualstudio.com/ 双击一直点击下一步即可,记…

Go语言--语法基础5--基本数据类型--输入输出(1)

I : input 输入操作 格式化输入 scanf O &#xff1a; output 输出操作 格式化输出 printf 标准输入 》键盘设备 》 Stdin 标准输出 》显示器终端 》 Stdout 异常输出 》显示器终端 》 Stderr 1 、输入语句 Go 语言的标准输出流在打印到屏幕时有些参数跟别的语言…

永磁同步电机无速度算法--自适应龙贝格观测器

一、原理介绍 传统龙伯格观测器&#xff0c;在设计观测器反馈增益矩阵K时&#xff0c;为简化分析与设计&#xff0c;根据静止两相坐标系下的对称关系&#xff0c;只引入了K、K,两个常系数&#xff0c;且在实际应用时&#xff0c;大多是通过试凑找到一组合适的反馈增益系数缺乏…

LangChain工具集成实战:构建智能问答系统完整指南

导读&#xff1a;在人工智能快速发展的今天&#xff0c;如何构建一个既能理解自然语言又能调用外部工具的智能问答系统&#xff0c;成为许多开发者面临的核心挑战。本文将为您提供一套完整的解决方案&#xff0c;从LangChain内置工具包的基础架构到复杂系统的工程实践。 文章深…

【razor】x264 在 的intra-refresh和IDR插帧

你提到的是这样一个情况: 使用 DirectShow 采集,帧率稳定(如回调了20帧)使用 x264 的 total intra refresh 模式(intra-refresh=1) 进行编码但编码过程中「隔几十秒才有一帧intra(关键帧)」这不正常,具体分析如下: 🎯 一、问题核心 x264 的 intra refresh 模式(特…