精简数据管道:如何使用 PySpark 和 WhyLogs 进行高效的数据分析和验证

news2026/5/5 14:37:55
原文towardsdatascience.com/streamline-data-pipelines-how-to-use-whylogs-with-pyspark-for-data-profiling-and-validation-544efa36c5ad?sourcecollection_archive---------3-----------------------#2024-01-07https://medium.com/sarbahi.sarthak?sourcepost_page---byline--544efa36c5ad--------------------------------https://towardsdatascience.com/?sourcepost_page---byline--544efa36c5ad-------------------------------- Sarthak Sarbahi·发表于Towards Data Science ·阅读时间9 分钟·2024 年 1 月 7 日–https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/30d8f37b895178790b25f83e1ab4662b.png图片来自Evan Dennis提供的Unsplash数据管道由数据工程师或机器学习工程师创建不仅仅是为了准备报告数据或训练模型。确保数据的质量同样至关重要。如果数据随着时间变化你可能会得到意料之外的结果这样是不好的。为了避免这种情况我们常常使用数据分析和数据验证技术。数据分析为我们提供关于数据集中不同列的统计信息。数据验证检查是否存在错误将实际数据与预期数据进行比较。一个很棒的工具是whylogs。它可以让你记录各种数据。记录后你可以创建whylogs 配置文件。这些配置文件帮助你跟踪数据的变化设置规则以确保数据的正确性并以简便的方式展示汇总统计数据。在这篇博客中你将学习如何将 whylogs 与 PySpark 配合使用。我们将通过一个实践指南来讲解如何进行数据分析和验证。让我们开始吧目录whylogs 的组件环境设置理解数据集开始使用 PySpark使用 whylogs 进行数据分析使用 whylogs 进行数据验证whylogs 的组件让我们首先理解 whylogs 的重要特性。数据记录whylogs 的核心是其记录数据的功能。可以把它想象成记录数据特征的详细日记。它会记录数据的各个方面比如有多少行、每列的值范围以及其他统计细节。Whylogs 简介一旦数据被记录whylogs 会创建“简介”。这些简介就像是数据的快照概括了数据的情况。它们包括诸如平均值、计数和分布等统计信息。这对于快速理解数据并追踪数据随时间的变化非常有用。数据追踪使用 whylogs你可以追踪数据随时间的变化。这一点非常重要因为数据通常会发生变化上个月的情况可能今天就不再适用。追踪有助于你捕捉到这些变化并理解它们的影响。数据验证Whylogs 允许你设置规则或约束以确保数据符合预期。例如如果你知道某一列应该只有正数你可以为此设置规则。如果某些数据不符合你的规则你将能够发现可能存在的问题。数据可视化通过可视化方式理解数据更加容易。Whylogs 可以创建图形和图表帮助你更清楚地看到数据的动态特别是对于那些不是数据专家的人来说这使得数据更加易于访问。集成Whylogs 支持与多种工具、框架和语言的集成——如 Spark、Kafka、Pandas、MLFlow、GitHub actions、RAPIDS、Java、Docker、AWS S3 等。这些就是我们需要了解的关于 whylogs 的所有信息。如果你想了解更多我鼓励你查看文档。接下来让我们开始为教程设置环境。环境设置我们将在本教程中使用 Jupyter notebook。为了让我们的代码在任何地方都能运行我们将在 Docker 中使用 JupyterLab。这个设置会安装所有所需的库并准备好示例数据。如果你是 Docker 新手并想学习如何设置 Docker请查看这个链接。[## GitHub - sarthak-sarbahi/whylogs-pyspark通过在 GitHub 上创建账户贡献于 sarthak-sarbahi/whylogs-pyspark 的开发。github.com](https://github.com/sarthak-sarbahi/whylogs-pyspark/tree/main?sourcepost_page-----544efa36c5ad--------------------------------)从这里下载示例数据CSV。这些数据将用于数据简介和验证。创建一个data文件夹在项目的根目录下并将 CSV 文件保存到该文件夹中。接下来在相同的根目录下创建一个Dockerfile。本教程的 Dockerfile图片来自作者这个 Dockerfile 是一组创建特定环境的指令用于本教程。我们来逐步解析它第一行FROM quay.io/jupyter/pyspark-notebook告诉 Docker 使用一个现有镜像作为起点。这个镜像是一个已经配置了 PySpark 的 Jupyter notebook。RUN pip install whylogs whylogs[viz] whylogs[spark]这一行是为了向环境中添加必要的库。它使用pip安装whylogs以及其附加功能用于可视化的 (viz) 和用于处理 Spark 的 (spark)。最后一行COPY data/patient_data.csv /home/patient_data.csv是将数据文件移入该环境。它将项目目录中data文件夹下的 CSV 文件patient_data.csv复制到 Docker 环境中的/home/目录下。到目前为止你的项目目录应该是这样的。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/d46865880372332c87ffd2efd2e6fc44.png在 VS Code 中的项目目录图源作者太棒了现在让我们构建一个 Docker 镜像。为此请在终端中输入以下命令确保你位于项目的根文件夹中。docker build-t pyspark-whylogs.这个命令创建了一个名为pyspark-whylogs的 Docker 镜像。你可以在Docker Desktop应用的“镜像”标签中看到它。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/b227c143a7d68aa6f704308ea8bb051f.png构建的 Docker 镜像图源作者下一步让我们运行这个镜像来启动 JupyterLab。请在终端中输入另一个命令。docker run-p8888:8888pyspark-whylogs这个命令从pyspark-whylogs镜像启动一个容器。它确保你可以通过计算机的 8888 端口访问 JupyterLab。运行这个命令后你会在日志中看到一个类似于这样的 URLhttp://127.0.0.1:8888/lab?tokenyour_token。点击该链接以打开 JupyterLab Web 界面。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/a6b6590371d0150b205222e6f1e10cf7.pngDocker 容器日志图源作者太棒了一切已经为使用 whylogs 设置好了。现在让我们了解一下我们将要处理的数据集。理解数据集我们将使用一个关于医院患者的数据集。该文件名为patient_data.csv包含 100k 行数据列包括patient_id每个患者的唯一 ID。记住数据集中可能会出现相同的患者 ID 多次。patient_name患者的姓名。不同的患者可能有相同的名字。height患者的身高单位为厘米。每次就诊时患者的身高都是相同的。weight患者的体重单位为千克。体重总是大于零。visit_date患者就诊的日期格式为YYYY-MM-DD。关于这个数据集的来源别担心。它是由 ChatGPT 创建的。接下来让我们开始编写一些代码。开始使用 PySpark首先在 JupyterLab 中打开一个新的 notebook。记得在开始工作之前保存它。[## whylogs-pyspark/whylogs_pyspark.ipynb at main · sarthak-sarbahi/whylogs-pyspark通过在 GitHub 上创建账户为 sarthak-sarbahi/whylogs-pyspark 项目的开发做出贡献。github.com我们将首先导入所需的库。# Import librariesfromtypingimportAnyimportpysparkfrompyspark.sqlimportSparkSessionimportpyspark.sql.functionsasFfromwhylogs.api.pyspark.experimentalimportcollect_column_profile_viewsfromwhylogs.api.pyspark.experimentalimportcollect_dataset_profile_viewfromwhylogs.core.metrics.condition_count_metricimportConditionfromwhylogs.core.relationsimportPredicatefromwhylogs.core.schemaimportDeclarativeSchemafromwhylogs.core.resolversimportSTANDARD_RESOLVERfromwhylogs.core.specialized_resolversimportConditionCountMetricSpecfromwhylogs.core.constraints.factoriesimportcondition_meetsfromwhylogs.core.constraintsimportConstraintsBuilderfromwhylogs.core.constraints.factoriesimportno_missing_valuesfromwhylogs.core.constraints.factoriesimportgreater_than_numberfromwhylogs.vizimportNotebookProfileVisualizerimportpandasaspdimportdatetime然后我们将设置一个 SparkSession。这让我们能够运行 PySpark 代码。# Initialize a SparkSessionsparkSparkSession.builder.appName(whylogs).getOrCreate()spark.conf.set(spark.sql.execution.arrow.pyspark.enabled,true)之后我们将通过读取 CSV 文件来创建一个 Spark 数据框。我们还会检查它的架构。# Create a dataframe from CSV filedfspark.read.option(header,True).option(inferSchema,True).csv(/home/patient_data.csv)df.printSchema()接下来让我们先看一下数据。我们将查看数据框中的第一行。# First row from dataframedf.show(n1,verticalTrue)既然我们已经查看了数据现在是时候开始使用 whylogs 进行数据分析了。使用 whylogs 进行数据分析为了对数据进行分析我们将使用两个函数。首先是collect_column_profile_views。这个函数为数据框中的每一列收集详细的分析配置。这些配置为我们提供统计信息比如计数、分布等具体取决于我们如何设置 whylogs。# Profile the data with whylogsdf_profilecollect_column_profile_views(df)print(df_profile)数据集中的每一列都会在字典中获取一个ColumnProfileView对象。我们可以检查每列的各种指标比如它们的均值。whylogs 会查看每个数据点并通过统计方式决定该数据点是否与最终计算相关。例如让我们看看height的平均值。df_profile[height].get_metric(distribution).mean.value接下来我们还将直接从数据框中计算均值以进行对比。# Compare with mean from dataframedf.select(F.mean(F.col(height))).show()然而仅仅逐列进行数据分析并不总是足够的。因此我们使用另一个函数collect_dataset_profile_view。这个函数对整个数据集进行分析而不仅仅是单列。我们可以将其与 Pandas 结合分析所有分析指标。# Putting everything togetherdf_profile_viewcollect_dataset_profile_view(input_dfdf)df_profile_view.to_pandas().head()我们还可以将这个分析结果保存为 CSV 文件以备后用。# Persist profile as a filedf_profile_view.to_pandas().reset_index().to_csv(/home/jovyan/patint_profile.csv,headerTrue,indexFalse)/home/jovyan文件夹位于我们的 Docker 容器中来自Jupyter 的 Docker 镜像堆栈包含 Jupyter 应用程序的现成 Docker 镜像。在这些 Docker 设置中‘jovyan’ 是运行 Jupyter 的默认用户。/home/jovyan文件夹是 Jupyter 笔记本通常启动的位置也是您应将文件放置在其中以便在 Jupyter 中访问的地方。就这样我们使用 whylogs 对数据进行分析。接下来我们将探索数据验证。使用 whylogs 进行数据验证对于我们的数据验证我们将执行以下检查patient_id确保没有缺失值。weight确保每个值都大于零。visit_date检查日期是否采用YYYY-MM-DD格式。现在让我们开始吧。whylogs 中的数据验证从数据分析开始。我们可以使用collect_dataset_profile_view函数来创建分析配置就像我们之前看到的那样。然而这个函数通常会创建一个带有标准指标的分析配置比如均值和计数。但如果我们需要检查列中的单个值而不是对比其他约束条件这时就可以使用条件计数指标。它就像是向我们的分析配置中添加了一个自定义的指标。让我们为visit_date列创建一个检查验证每一行。defcheck_date_format(date_value:Any)-bool:date_format%Y-%m-%dtry:datetime.datetime.strptime(date_value,date_format)returnTrueexceptValueError:returnFalsevisit_date_condition{is_date_format:Condition(Predicate().is_(check_date_format))}一旦我们有了条件就将其添加到分析配置中。我们使用标准架构并添加自定义检查。# Create condition count metricschemaDeclarativeSchema(STANDARD_RESOLVER)schema.add_resolver_spec(column_namevisit_date,metrics[ConditionCountMetricSpec(visit_date_condition)])然后我们使用标准指标和我们为visit_date列创建的自定义新指标重新创建了配置文件。# Use the schema to pass to logger with collect_dataset_profile_view# This creates profile with standard metrics as well as condition count metricsdf_profile_view_v2collect_dataset_profile_view(input_dfdf,schemaschema)在我们的配置文件准备好之后我们可以为每一列设置验证检查。builderConstraintsBuilder(dataset_profile_viewdf_profile_view_v2)builder.add_constraint(no_missing_values(column_namepatient_id))builder.add_constraint(condition_meets(column_namevisit_date,condition_nameis_date_format))builder.add_constraint(greater_than_number(column_nameweight,number0))constraintsbuilder.build()constraints.generate_constraints_report()我们还可以使用 whylogs 生成这些检查的报告。# Visualize constraints report using Notebook Profile VisualizervisualizationNotebookProfileVisualizer()visualization.constraints_report(constraints,cell_height300)它将生成一个 HTML 报告显示哪些检查通过哪些失败。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/92e9b55579654c4f34af3e2310e9ad42.pngwhylogs 约束报告图片来源作者这是我们发现的内容patient_id列没有缺失值。很好一些visit_date值不符合YYYY-MM-DD格式。一些weight值为零。让我们再次检查数据框中的这些发现。首先我们用 PySpark 代码检查visit_date格式。# Validate visit_date columndf \.withColumn(check_visit_date,F.to_date(F.col(visit_date),yyyy-MM-dd))\.withColumn(null_check,F.when(F.col(check_visit_date).isNull(),null).otherwise(not_null))\.groupBy(null_check)\.count()\.show(truncateFalse)---------------|null_check|count|---------------|not_null|98977||null|1023|---------------它显示100,000 行中有 1023 行不符合我们的日期格式。接下来是weight列。# Validate weight columndf \.select(weight)\.groupBy(weight)\.count()\.orderBy(F.col(weight))\.limit(1)\.show(truncateFalse)-----------|weight|count|-----------|0|2039|-----------再次我们的发现与 whylogs 一致。几乎有 2,000 行的权重为零。这也结束了我们的教程。你可以在这里找到本教程的笔记本。结论在本教程中我们介绍了如何在 PySpark 中使用 whylogs。我们首先使用 Docker 准备了环境然后对我们的数据集进行了数据分析和验证。记住这只是开始。Whylogs 提供了更多功能从机器学习中的数据变化数据漂移追踪到实时流中的数据质量检查。我真诚地希望这篇指南对你有所帮助。如果你有任何问题请随时在下面的评论中提出。参考文献本教程的 GitHub 仓库:github.com/sarthak-sarbahi/whylogs-pyspark/tree/mainWhylogs 文档:docs.whylabs.ai/docs/whylogs-overview/GitHub for whylogs:github.com/whylabs/whylogs/tree/mainlinePySpark 中的数据分析:github.com/whylabs/whylogs/blob/mainline/python/examples/integrations/Pyspark_Profiling.ipynbWhylogs 约束在 PySpark 中的使用:github.com/whylabs/whylogs/blob/mainline/python/examples/tutorials/Pyspark_and_Constraints.ipynb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2585267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…