pyspark实践

news2025/6/4 21:04:00

1。pyspark是什么

PySpark 是 Apache Spark 的官方 Python 接口,它使得 Python 开发者能够访问 Spark 的核心功能,如:

  • Spark SQL:用于执行 SQL 查询以及读取数据的库,支持多种数据格式和存储系统。py.qizhen.xyz

  • DataFrame API:提供了一个分布式数据集合,使得数据处理和分析更加直观和高效。py.qizhen.xyz+1CSDN 博客+1

  • MLlib:用于进行机器学习的库。py.qizhen.xyz+1维基百科+1

  • GraphX:用于图形处理的库(在 PySpark 中通过第三方库如 GraphFrames 访问)。py.qizhen.xyz

  • Spark Streaming:用于实时数据流处理的库。py.qizhen.xyz

通过 PySpark,Python 开发者可以方便地进行大规模数据分析和数据挖掘工作,而无需深入了解分布式计算的复杂性。

2.实战

创建和管理 Spark 会话所需的类

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
#从 pyspark.sql.functions 模块中导入所有函数,并将其简写为 F
from pyspark.sql.types import *
#从 pyspark.sql.types 模块中导入所有的数据类型类。
from pyspark.sql.functions import udf
#使用 PySpark 创建和注册用户定义函数的第一步,允许将自定义的 Python 函数应用于 Spark 的数据处理流程中
spark = SparkSession.builder.appName('data_processing').getOrCreate()
#创建或获取一个 SparkSession 实例
schema = StructType().add('user_id',"string").add("country","string").add("browser","string").add("OS","string").add("age","integer")
#创建的 DataFrame 将具有预定义的列名和数据类型,有助于确保数据一致性和便于后续的数据处理操作
df_custom = spark.createDataFrame([("A203",'India',"Chrome","WIN",33),("A201",'China',"Safari","MacOS",35),("A205",'UK',"Mozilla","Linux",25)],schema = schema)
#这行代码使用 PySpark 创建了一个带有指定模式(schema)的 DataFrame。
df_custom.printSchema()
#用于以可读的层次结构格式展示 DataFrame 的结构信息。

df_custom.show()
#将 DataFrame 的前几行数据显示在控制台上

df_na=spark.createDataFrame([("A203",None,"Chrome","WIN",33),("A201",'China',None,"MacOS",35),("A205",'UK',"Mozilla","Linux",25)],schema=schema)
#使用指定的模式(schema)创建一个包含部分缺失值(None)的 DataFrame。

df_na.fillna('0').show()
#将 DataFrame df_na 中所有列的缺失值(null 或 None)替换为字符串 '0',然后以表格形式在控制台上显示前 20 行数据

df_na.fillna({'country':'USA','browser':'Google Chrome'}).show()
#使用 fillna() 方法,将 DataFrame df_na 中 country 列的缺失值替换为 'USA',browser 列的缺失值替换为 'Google Chrome'

df_na.na.drop().show()
#删除包含缺失值的行:df_na.na.drop() 会从 DataFrame df_na 中删除任何包含 null(或 None)值的行。默认情况下,drop() 方法会移除任何列中存在缺失值的行。
df_na.na.drop(subset='country').show()
#删除特定列中包含缺失值的行:df_na.na.drop(subset='country') 会从 DataFrame df_na 中删除 country 列中包含 null 或 None 值的行。

 对csv文件进行处理

df = spark.read.csv("customer_data.csv",header = True, inferSchema=True)
#读取名为 customer_data.csv 的 CSV 文件,并将其加载为 DataFrame
df.count()
查看数量

len(df.columns)
#查看列数
df.columns
#查看列名
df.filter(df['Avg_Salary']>500000).filter(df['Number_of_houses']>2).show()
#筛选

df.where((df['Avg_Salary']>500000)&(df['Number_of_houses']>2)).show()
#where() 是 PySpark DataFrame 的方法,用于根据指定的条件筛选行。它是 filter() 方法的别名,两者功能相同
df.groupBy('Customer_subtype').count().show()
#按客户子类型分组并统计每组数量的常用方法,有助于了解不同客户子类型的分布情况。
for col in df.columns:
    if col!='Avg_Salary':
        print(f" Aggregation for {col}")
        df.groupBy(col).count().orderBy('count',ascending=False).show(truncate=False)
#对 DataFrame df 中除 'Avg_Salary' 列以外的每一列进行分组计数,并按计数降序显示结果。

df.groupBy('Customer_main_type').agg(F.mean('Avg_Salary')).show()
#对 DataFrame df 按照 Customer_main_type 列进行分组,并计算每个主类型的平均薪资:
df.groupBy('Customer_main_type').agg(F.max('Avg_Salary')).orderBy('max(Avg_Salary)',ascending=False).show()
#用于对 DataFrame df 按照 Customer_main_type 列进行分组,并计算每个主类型的最高平均薪资,然后按降序排列结果
df.groupBy('Customer_subtype').agg(F.max('Avg_Salary').alias('max_salary')).orderBy('max_salary',ascending=False).show()
#在 PySpark 中,以下代码用于对 DataFrame df 按照 Customer_subtype 列进行分组,并计算每个子类型的最高平均薪资,然后按降序排列结果
df.groupBy("Customer_subtype").agg(F.collect_set("Number_of_houses")).show() 
#是在 PySpark 中用于按客户子类型分组并收集每组房屋数量的唯一值的常用方法,有助于了解不同客户子类型的房屋数量分布情况。
1. 初始化与数据创建
  • 使用 SparkSession.builder.getOrCreate() 初始化 Spark 环境。

  • 使用 StructType 明确定义 schema。

  • spark.createDataFrame() 构造了两个 DataFrame(一个有缺失值)。

2. DataFrame 基本操作
  • .printSchema():打印 schema 信息。

  • .show():展示数据内容。

  • .fillna():填充缺失值。

  • .na.drop() / .na.drop(subset=...):删除缺失值行。

  • .replace():替换字段值。

  • .drop():删除某一列。

3. CSV 文件读取
  • spark.read.csv(..., header=True, inferSchema=True) 读取并自动推断数据类型。

4. 基础探索与过滤
  • .count() / len(df.columns) / df.columns:了解数据结构。

  • .summary().show():生成描述性统计。

  • .filter() / .where():条件筛选数据。

  • .select():选择列。

5. 分组与聚合操作
  • .groupBy(...).count():分组计数。

  • .groupBy(...).agg(F.mean(...)):分组平均值。

  • .groupBy(...).agg(F.max(...)):分组最大值。

  • .agg(F.collect_set(...)):收集唯一值列表。

  • .orderBy(...):排序显示。

6. 列操作
  • .withColumn("new_col", F.lit(...)):添加常量列。

  • .withColumn("new_col", udf(col)):使用自定义 UDF 添加新列。

7. UDF / Pandas UDF
  • 使用标准 udf() 创建 age 分类函数。

  • 使用 pandas_udf() 实现归一化薪资计算(注释掉了实际调用)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2397053.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【深度学习新浪潮】多模态模型如何处理任意分辨率输入?

多模态模型处理任意分辨率输入的能力主要依赖于架构设计的灵活性和预处理技术的结合。以下是核心方法及技术细节: 一、图像模态的分辨率处理 1. 基于Transformer的可变补丁划分(ViT架构) 补丁化(Patch Embedding): 将图像分割为固定大小的补丁(如1616或3232像素),不…

K-匿名模型

K-匿名模型是隐私保护领域的一项基础技术,防止通过链接攻击从公开数据中重新识别特定个体。其核心思想是让每个个体在发布的数据中“隐匿于人群”,确保任意一条记录至少与其他K-1条记录在准标识符(Quasi-Identifiers, QIDs)上不可…

UE5蓝图暴露变量,在游戏运行时修改变量实时变化、看向目标跟随目标Find Look at Rotation、修改玩家自身弹簧臂

UE5蓝图中暴露变量,类似Unity中public一个变量,在游戏运行时修改变量实时变化 1,添加变量 2,设置变量的值 3,点开小眼睛,此变量显示在编辑器中,可以运行时修改 看向目标跟随目标Find Look at R…

Python-matplotlib中的Pyplot API和面向对象 API

matplotlib中的Pyplot API和面向对象 API Pyplot API(状态机模式)面向对象 API 详解二者差别核心区别方法命名差异注意事项差别举例 🍅 Pyplot API(状态机模式)和面向对象 API 是两种不同的编程接口.🍅 它们…

FastAPI安全认证:从密码到令牌的魔法之旅

title: FastAPI安全认证:从密码到令牌的魔法之旅 date: 2025/06/02 13:24:43 updated: 2025/06/02 13:24:43 author: cmdragon excerpt: 在FastAPI中实现OAuth2密码流程的认证机制。通过创建令牌端点,用户可以使用用户名和密码获取JWT访问令牌。代码示例展示了如何使用Cry…

java对接bacnet ip协议(跨网段方式)

1、环境准备 #maven环境<repositories><repository><id>ias-releases</id><url>https://maven.mangoautomation.net/repository/ias-release/</url></repository></repositories><dependencies><dependency><…

LabVIEW超宽带紧凑场测量系统

采用 LabVIEW 开发超宽带紧凑场测量系统&#xff0c;实现天线方向图、目标雷达散射截面&#xff08;RCS&#xff09;及天线增益的自动化测量。通过品牌硬件设备&#xff0c;优化系统架构&#xff0c;解决传统测量系统在兼容性、数据处理效率及操作便捷性等方面的问题&#xff0…

编译rustdesk,使用flutter、hwcodec硬件编解码、支持Windows 7系统

目录 安装相应的环境安装visual studio安装vpkg安装rust开发环境安装llvm和clang编译源码下载源码使用Sciter作为UI的(已弃用)使用flutter作为UI的(主流)下载flutter sdk桥接静默安装支持Windows 7系统最近某desk免费的限制越来越多,实在没办法,平时远程控制用的比较多,…

ROS机器人和NPU的往事和新知-250602

往事&#xff1a; 回顾一篇五年前的博客&#xff1a; ROS2机器人笔记20-12-04_ros2 移植到vxworks-CSDN博客 里面提及专用的机器人处理器&#xff0c;那时候只有那么1-2款专用机器人处理器。 无关&#xff1a; 01&#xff1a; 每代人的智商和注意力差异是如何出现的-250602-…

【从零开始学习QT】信号和槽

目录 一、信号和槽概述 信号的本质 槽的本质 二、信号和槽的使用 2.1 连接信号和槽 2.2 查看内置信号和槽 2.3 通过 Qt Creator 生成信号槽代码 自定义槽函数 自定义信号 自定义信号和槽 2.4 带参数的信号和槽 三、信号与槽的连接方式 3.1 一对一 &#xff08;1&…

MCP调研

什么是 MCP MCP&#xff08;Model Context Protocol&#xff0c;模型上下文协议&#xff09;&#xff0c;是由 Anthropic 在 2024 年 11 月底推出的开放标准协议&#xff0c;旨在统一大型语言模型&#xff08;LLM&#xff09;与外部数据源、工具的通信方式。MCP 的主要目的在于…

TDengine 运维——巡检工具(定期检查)

背景 TDengine 在运行一段时间后需要针对运行环境和 TDengine 本身的运行状态进行定期巡检&#xff0c;本文档旨在说明如何使用巡检工具对 TDengine 的运行环境进行自动化检查。 安装工具使用方法 工具支持通过 help 参数查看支持的语法 Usage: taosinspect [OPTIONS]Check…

qwen 2.5 并行计算机制:依靠 PyTorch 和 Transformers 库的分布式能力

qwen 2.5 并行计算机制:依靠 PyTorch 和 Transformers 库的分布式能力 完整可运行代码: import torch import torch.nn.functional as F from transformers

MSTNet:用于糖尿病视网膜病变分类的多尺度空间感知 Transformer 与多实例学习方法|文献速递-深度学习医疗AI最新文献

Title 题目 MSTNet: Multi-scale spatial-aware transformer with multi-instance learning for diabetic retinopathy classification MSTNet&#xff1a;用于糖尿病视网膜病变分类的多尺度空间感知 Transformer 与多实例学习方法 01 文献速递介绍 糖尿病视网膜病变&#…

docker运行程序Killed异常排查

问题描述 我最近开发了一个C 多线程程序&#xff0c;测试没有问题&#xff0c;封装docker测试也没有问题&#xff0c;然后提交给客户了&#xff0c;然后在他那边测试有问题&#xff0c;不定时、不定位置异常中断&#xff0c;以前一直认为只要封装了docker就万事大吉&#xff0…

Excel 批量下载PDF、批量下载考勤图片——仙盟创梦IDE

在办公场景中&#xff0c;借助应用软件实现 Excel 批量处理考勤图片、电子文档与 PDF&#xff0c;具有诸多显著优势。 从考勤图片处理来看&#xff0c;通过 Excel 批量操作&#xff0c;能快速提取图片中的考勤信息&#xff0c;如员工打卡时间、面部识别数据等&#xff0c;节省…

PCIe-Error Detection(一)

下表为PCIe协议中给出的错误&#xff1a; 一、可纠正错误&#xff08;Correctable Errors&#xff0c;8种&#xff09;​​ ​​检错机制​​ ​​错误名称​​​​检测层级​​​​触发条件​​​​Receiver Error​​Physical接收端均衡器&#xff08;EQ&#xff09;监测到…

向量空间的练习题目

1.考虑 中的向量x1 和x2 求每一向量的长度 令x3x1x2,求x3的长度&#xff0c;它的长度与x1和x2的和有什么关系&#xff1f; 2.重复练习1&#xff0c;取向量 3.令C为复数集合&#xff0c;定义C上的加法为 (abi)(cdi)(ac)(bd)i 并定义标量乘法为对所有实数a (abi) a bi 证明&…

Leetcode 2123. 使矩阵中的 1 互不相邻的最小操作数

1.题目基本信息 1.1.题目描述 给你一个 下标从 0 开始 的矩阵 grid。每次操作&#xff0c;你可以把 grid 中的 一个 1 变成 0 。 如果一个矩阵中&#xff0c;没有 1 与其它的 1 四连通&#xff08;也就是说所有 1 在上下左右四个方向上不能与其他 1 相邻&#xff09;&#x…

MySQL高可用集群

https://dev.mysql.com/doc/mysql-shell/8.4/en/mysql-innodb-cluster.html 1 什么是MySQL高可用集群 MySQL高可用集群&#xff1a;MySQL InnoDB ClusterInnoDB Cluster是MySQL官方实现高可用读写分离的架构方案&#xff0c;包含以下组件 MySQL Group Replication&#xff1a;简…