SparkSQL基本操作

news2025/12/17 8:53:51

以下是 Spark SQL 的基本操作总结,涵盖数据读取、转换、查询、写入等核心功能:

一、初始化 SparkSession

scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()

  .appName("Spark SQL Demo")

  .master("local[*]") // 本地模式(集群用 `spark://host:port`)

  .getOrCreate()

// 导入隐式转换(用于 DataFrame 与 RDD 互转)

import spark.implicits._

 

二、数据读取

1. 读取文件(CSV/JSON/Parquet等)

scala

// 读取 CSV(带表头)

val csvDF = spark.read

  .option("header", "true")

  .option("inferSchema", "true") // 自动推断数据类型

  .csv("路径/文件.csv")

// 读取 JSON

val jsonDF = spark.read.json("路径/文件.json")

// 读取 Parquet(Spark 原生格式,高效)

val parquetDF = spark.read.parquet("路径/文件.parquet")

 

2. 读取数据库(如 MySQL)

scala

val jdbcDF = spark.read.format("jdbc")

  .option("url", "jdbc:mysql://host:port/db")

  .option("dbtable", "表名")

  .option("user", "用户名")

  .option("password", "密码")

  .load()

 

3. 从 RDD 创建 DataFrame

scala

// 示例:RDD 转 DataFrame(通过 case class 推断 Schema)

case class Person(id: Int, name: String, age: Int)

val peopleRDD = spark.sparkContext.parallelize(Seq(Person(1, "Alice", 25), Person(2, "Bob", 30)))

val peopleDF = peopleRDD.toDF() // 自动使用 case class 字段作为列名

 

三、基本数据操作

1. 查看数据

scala

df.show() // 打印前20行(默认)

df.show(false) // 不截断长字符串

df.printSchema() // 查看表结构

df.describe().show() // 统计摘要(均值、计数等)

 

2. 列操作

scala

// 选择列

df.select("name", "age").show()

// 新增列(表达式计算)

import org.apache.spark.sql.functions._

val dfWithNewColumn = df.withColumn("age_plus_1", col("age") + 1)

// 重命名列

val renamedDF = df.withColumnRenamed("old_name", "new_name")

// 删除列

val filteredDF = df.drop("column_to_drop")

 

3. 行过滤与排序

scala

// 过滤行(where/filter 等价)

df.filter(col("age") > 18).show()

df.where("age > 18 AND name LIKE 'A%'").show()

// 排序(asc/desc)

df.orderBy(col("age").desc, "name").show() // 按年龄降序、姓名升序

 

4. 分组与聚合

scala

import org.apache.spark.sql.functions._

// 分组统计(如计算每个年龄段的人数)

df.groupBy("age")

  .agg(

    count("*").alias("count"), // 计数

    avg("score").alias("avg_score") // 平均值

  ).show()

// 窗口函数(如按年龄分区排序)

import org.apache.spark.sql.window.Window

val windowSpec = Window.partitionBy("age").orderBy(col("score").desc)

df.withColumn("rank", rank().over(windowSpec)).show()

 

四、Spark SQL 查询(SQL 语法)

1. 注册临时视图

scala

df.createOrReplaceTempView("people") // 注册为临时视图(会话级)

 

2. 执行 SQL 查询

scala

val sqlResult = spark.sql("""

  SELECT name, age

  FROM people

  WHERE age > 25

  ORDER BY age DESC

""")

sqlResult.show()

 

3. 全局临时视图(跨会话)

scala

df.createGlobalTempView("global_people") // 全局视图,需用 `global_temp.表名` 访问

spark.sql("SELECT * FROM global_temp.global_people").show()

 

五、数据写入

1. 保存为文件

scala

// 保存为 CSV(覆盖模式)

df.write.mode("overwrite") // 模式:overwrite/append/ignore/replace

  .option("header", "true")

  .csv("路径/输出.csv")

// 保存为 Parquet(压缩高效)

df.write.parquet("路径/输出.parquet")

 

2. 写入数据库(如 MySQL)

scala

df.write.format("jdbc")

  .option("url", "jdbc:mysql://host:port/db")

  .option("dbtable", "表名")

  .option("user", "用户名")

  .option("password", "密码")

  .mode("append") // 追加数据

  .save()

 

3. 保存为 Hive 表

scala

df.write.saveAsTable("hive_table") // 需提前启用 Hive 支持(spark.sql.catalogImplementation = hive)

 

六、数据类型与转换

1. 常用数据类型

- 基础类型: IntegerType 、 StringType 、 DoubleType 、 TimestampType 

- 复杂类型: ArrayType 、 MapType 、 StructType (嵌套结构)

 

2. 类型转换

scala

import org.apache.spark.sql.functions._

// 字符串转整数

val castDF = df.withColumn("age_str", col("age").cast("string"))

// 时间格式转换

val timestampDF = df.withColumn("date", to_date(col("timestamp_col"), "yyyy-MM-dd"))

 

七、性能优化技巧

 

1. 使用 Parquet 格式:列式存储,压缩率高,查询更快。

2. 分区表:按日期/类别分区( partitionBy ),减少数据扫描范围。

3. 缓存数据: df.cache()  避免重复计算(适用于多次查询的数据集)。

4. 广播小表: spark.sql.autoBroadcastJoinThreshold  设置小表广播阈值(默认 10MB)。

 

八、停止 SparkSession

scala

spark.stop() // 释放资源

 

通过以上操作,可实现数据的读取、处理、分析和存储。实际应用中可结合业务需求灵活组合函数,或通过 Spark UI( http://localhost:4040 )监控作业执行情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2379477.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

20250515配置联想笔记本电脑IdeaPad总是使用独立显卡的步骤

20250515配置联想笔记本电脑IdeaPad总是使用独立显卡的步骤 2025/5/15 19:55 百度:intel 集成显卡 NVIDIA 配置成为 总是用独立显卡 百度为您找到以下结果 ?要将Intel集成显卡和NVIDIA独立显卡配置为总是使用独立显卡,可以通过以下步骤实现?&#xff…

sparkSQL读入csv文件写入mysql

思路 示例 (年龄>18改成>20) mysql的字符集问题 把user改成person “让字符集认识中文”

大涡模拟实战:从区域尺度到街区尺度的大气环境模拟

前言: 随着低空经济的蓬勃发展,无人机、空中出租车等新型交通工具正在重塑我们的城市空间。这场静默的革命不仅带来了经济机遇,更对城市大气环境提出了全新挑战。在距离地面200米以下的城市冠层中,建筑物与大气的复杂相互作用、人…

单目测距和双目测距 bev 3D车道线

单目视觉测距原理 单目视觉测距有两种方式。 第一种,是通过深度神经网络来预测深度,这需要大量的训练数据。训练后的单目视觉摄像头可以认识道路上最典型的参与者——人、汽车、卡车、摩托车,或是其他障碍物(雪糕桶之类&#xf…

Web开发-JavaEE应用SpringBoot栈SnakeYaml反序列化链JARWAR构建打包

知识点: 1、安全开发-JavaEE-WAR&JAR打包&反编译 2、安全开发-JavaEE-SnakeYaml反序列化&链 一、演示案例-WEB开发-JavaEE-项目-SnakeYaml序列化 常见的创建的序列化和反序列化协议 • (已讲)JAVA内置的writeObject()/readObje…

项目复习(2)

第四天 高并发优化 前端每隔15秒就发起一次请求,将播放记录写入数据库。 但问题是,提交播放记录的业务太复杂了,其中涉及到大量的数据库操作:在并发较高的情况下,会给数据库带来非常大的压力 使用Redis合并写请求 一…

UE 材质基础 第一天

课程:虚幻引擎【UE5】材质宝典【初学者材质基础入门系列】-北冥没有鱼啊_-稍后再看-哔哩哔哩视频 随便记录一些 黑色是0到负无穷,白色是1到无穷 各向异性 有点类似于高光,可以配合切线来使用,R G B 相当于 X Y Z轴,切…

学习FineBI

FineBI 第一章 FineBI 介绍 1.1. FineBI 概述 FineBI 是帆软软件有限公司推出的一款商业智能 (Business Intelligence) 产品 。 FineBI 是新一代大数据分析的 BI 工具 , 旨在帮助企业的业务人员充分了解和利用他们的数据 。FineBI 凭借强…

深入剖析某App视频详情逆向:聚焦sig3参数攻克

深入剖析某手App视频详情逆向:聚焦sig3参数攻克 一、引言 在当今互联网信息爆炸的时代,短视频平台如某手,已成为人们获取信息、娱乐消遣的重要渠道。对于技术爱好者和研究人员而言,深入探索其内部机制,特别是视频详情…

【Linux】Linux安装并配置MongoDB

目录 1.添加仓库 2.安装 MongoDB 包 3.启动 MongoDB 服务 4. 验证安装 5.配置 5.1.进入无认证模式 5.2.1创建用户 5.2.2.开启认证 5.2.3重启 5.2.4.登录 6.端口变更 7.卸载 7.1.停止 MongoDB 服务 7.2.禁用 MongoDB 开机自启动 7.3.卸载 MongoDB 包 7.4.删除数…

新电脑软件配置二:安装python,git, pycharm

安装python 地址 https://www.python.org/downloads/ 不是很懂为什么这么多版本 安装windows64位的 这里我是凭自己感觉装的了 然后cmd输入命令没有生效,先重启下? 重启之后再次验证 环境是成功的 之前是输入的python -version 命令输入错误 安装pyc…

数据仓库:企业数据管理的核心引擎

一、数据仓库的由来 数据仓库(Data Warehouse, DW)概念的诞生源于企业对数据价值的深度挖掘需求。在1980年代,随着OLTP(联机事务处理)系统在企业中的普及,传统关系型数据库在处理海量数据分析时显露出明显瓶…

MCU开发学习记录17* - RTC学习与实践(HAL库) - 日历、闹钟、RTC备份寄存器 -STM32CubeMX

名词解释: RTC:Real-Time Clock​ 统一文章结构(数字后加*): 第一部分: 阐述外设工作原理;第二部分:芯片参考手册对应外设的学习;第三部分:使用STM32CubeMX进…

C++中的四种强制转换

static_cast 原型&#xff1a;static_cast<type-id>(expression) type-id表示目标类型&#xff0c;expression表示要转换的表达式 static_cast用于非多态类型的转换&#xff08;静态转换&#xff09;&#xff0c;编译器隐式执行的任何类型转换都可用static_c…

YOLOv2目标检测算法:速度与精度的平衡之道

一、YOLOv2的核心改进&#xff1a;从V1到V2的蜕变 YOLOv2作为YOLO系列的第二代算法&#xff0c;在继承V1端到端、单阶段检测的基础上&#xff0c;针对V1存在的小目标检测弱、定位精度低等问题进行了全方位升级&#xff0c;成为目标检测领域的重要里程碑。 &#xff08;一&am…

利用腾讯云MCP提升跨平台协作效率的实践与探索

一、场景痛点 在当今这个数字化快速发展的时代&#xff0c;跨平台协作成为了许多企业和团队面临的一个重大挑战。随着企业业务的不断拓展&#xff0c;团队成员往往需要利用多种工具和平台进行沟通、协作和管理。这些平台包括但不限于电子邮件、即时通讯工具、项目管理软件、文…

【Vue篇】数据秘语:从watch源码看响应式宇宙的蝴蝶效应

目录 引言 一、watch侦听器&#xff08;监视器&#xff09; 1.作用&#xff1a; 2.语法&#xff1a; 3.侦听器代码准备 4. 配置项 5.总结 二、翻译案例-代码实现 1.需求 2.代码实现 三、综合案例——购物车案例 1. 需求 2. 代码 引言 &#x1f4ac; 欢迎讨论&#…

OGGMA 21c 微服务 (MySQL) 安装避坑指南

前言 这两天在写 100 天实战课程 的 OGG 微服务课程&#xff1a; 在 Oracle Linux 8.10 上安装 OGGMA 21c MySQL 遇到了一点问题&#xff0c;分享给大家一起避坑&#xff01; 环境信息 环境信息&#xff1a; 主机版本主机名实例名MySQL 版本IP 地址数据库字符集Goldengate …

Linux面试题集合(4)

现有压缩文件:a.tar.gz存在于etc目录&#xff0c;如何解压到data目录 tar -zxvf /etc/a.tar.gz -C /data 给admin.txt创建一个软链接 ln -s admin.txt adminl 查找etc目录下以vilinux开头的文件 find /etc -name vilinux* 查找admin目录下以test开头的文件 find admin -name te…

Android Studio 安装与配置完全指南

文章目录 第一部分&#xff1a;Android Studio 简介与安装准备1.1 Android Studio 概述1.2 系统要求Windows 系统&#xff1a;macOS 系统&#xff1a;Linux 系统&#xff1a; 1.3 下载 Android Studio 第二部分&#xff1a;安装 Android Studio2.1 Windows 系统安装步骤2.2 mac…