以下是使用 Spark SQL(Scala)读取 CSV 文件的完整代码示例:
scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
object CSVReadExample {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder
.appName("CSVReadExample")
.config("spark.master", "local[*]")
.getOrCreate()
try {
// 方法1:自动推断模式
val df1 = spark.read
.option("header", "true") // 第一行是否包含列名
.option("inferSchema", "true") // 自动推断数据类型
.csv("path/to/your/file.csv")
println("方法1:自动推断模式")
df1.printSchema()
df1.show()
// 方法2:指定自定义模式
val customSchema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("salary", DoubleType, nullable = true)
))
val df2 = spark.read
.option("header", "true")
.schema(customSchema) // 使用自定义模式
.csv("path/to/your/file.csv")
println("方法2:指定自定义模式")
df2.printSchema()
df2.show()
// 方法3:读取多文件
val df3 = spark.read
.option("header", "true")
.csv("path/to/your/files/*.csv") // 读取目录下所有CSV文件
println("方法3:读取多文件")
df3.count()
df3.show()
// 执行SQL查询示例
df2.createOrReplaceTempView("people")
val result = spark.sql("SELECT name, age FROM people WHERE age > 30")
result.show()
} catch {
case e: Exception =>
println(s"读取CSV失败: ${e.getMessage}")
e.printStackTrace()
} finally {
// 关闭SparkSession
spark.stop()
}
}
}
常用 CSV 读取选项:
scala
spark.read
.option("header", "true") // 是否有表头
.option("delimiter", ",") // 分隔符,默认为逗号
.option("quote", "\"") // 引号字符,默认为双引号
.option("escape", "\\") // 转义字符
.option("nullValue", "null") // 指定空值表示
.option("dateFormat", "yyyy-MM-dd") // 日期格式
.option("inferSchema", "true") // 是否自动推断模式
.csv("path/to/file.csv")
处理特殊情况:
-
处理引号包含的分隔符:
scala
.option("quote", "\"") .option("escape", "\"")
-
处理包含换行符的字段:
scala
.option("multiline", "true")
-
处理不同编码的文件:
scala
.option("charset", "UTF-8")
执行步骤:
-
准备示例 CSV 文件
people.csv
:csv
id,name,age,salary 1,Alice,25,5000.0 2,Bob,30,6000.0 3,Charlie,35,7500.0
-
运行 Spark 应用:
bash
spark-submit --class CSVReadExample \ --master local[*] \ your-application.jar
-
也可以在 Spark Shell 中交互式运行:
bash
spark-shell
然后粘贴代码片段执行
性能优化建议:
-
禁用自动推断模式(如果已知模式):
scala
.schema(customSchema) .option("inferSchema", "false") // 提高性能
-
分区并行读取:
scala
// 增加分区数提高并行度 val df = spark.read.csv("path/to/file.csv").repartition(10)
-
使用列剪枝:
scala
// 只选择需要的列 df.select("name", "age")
-
过滤数据:
scala
// 尽早过滤数据减少内存占用 df.filter($"age" > 30)
错误处理:
-
处理格式错误:
scala
.option("mode", "DROPMALFORMED") // 丢弃格式错误的记录 .option("mode", "PERMISSIVE") // 将错误字段设为null .option("mode", "FAILFAST") // 遇到错误立即失败
-
自定义错误处理:
scala
import org.apache.spark.sql.Row val df = spark.read.csv("path/to/file.csv") val validRows = df.rdd.filter { row => try { // 自定义验证逻辑 row.getString(1).length > 0 } catch { case e: Exception => false } }