一、基础优化配置
1. 资源配置优化
# 提交Spark作业时的资源配置示例
spark-submit \
--master yarn \
--executor-memory 8G \
--executor-cores 4 \
--num-executors 10 \
--conf spark.sql.shuffle.partitions=200 \
your_spark_app.py
参数说明:
-
executor-memory
: 每个Executor的内存 -
executor-cores
: 每个Executor的CPU核心数 -
num-executors
: Executor数量 -
spark.sql.shuffle.partitions
: Shuffle操作的分区数(通常设为集群核心数的2-3倍)
2. 内存管理优化
// 在SparkSession初始化时设置
val spark = SparkSession.builder()
.appName("OptimizedSparkSQL")
.config("spark.memory.fraction", "0.8") // 执行和存储内存占总内存的比例
.config("spark.memory.storageFraction", "0.3") // 存储内存占内存比例
.getOrCreate()
二、SQL查询优化技巧
1. 分区裁剪(Partition Pruning)
-- 原始查询(全表扫描)
SELECT * FROM sales WHERE dt = '2023-01-01';
-- 优化后(确保表按dt分区)
SELECT * FROM sales WHERE dt = '2023-01-01'; -- 自动分区裁剪
2. 谓词下推(Predicate Pushdown)
-- 原始查询(先JOIN后过滤)
SELECT a.*, b.name
FROM transactions a
JOIN users b ON a.user_id = b.id
WHERE a.dt = '2023-01-01' AND b.age > 18;
-- 优化后(过滤条件下推)
SELECT /*+ MAPJOIN(b) */ a.*, b.name
FROM (SELECT * FROM transactions WHERE dt = '2023-01-01') a
JOIN (SELECT id, name FROM users WHERE age > 18) b
ON a.user_id = b.id;
3. 广播小表(Broadcast Join)
// 方式1: 通过配置自动广播
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10MB
// 方式2: 手动指定广播
val smallDF = spark.table("small_table")
val largeDF = spark.table("large_table")
largeDF.join(broadcast(smallDF), "key")
三、数据存储优化
1. 文件格式选择
// 写入Parquet格式(列式存储,适合分析)
df.write.parquet("/path/to/parquet")
// 写入Delta Lake(支持ACID)
df.write.format("delta").save("/path/to/delta")
// 写入ORC(高度压缩)
df.write.orc("/path/to/orc")
2. 分区与分桶
// 按日期分区
df.write.partitionBy("dt").parquet("/path/to/partitioned")
// 分桶(适合大表JOIN)
df.write.bucketBy(50, "user_id").sortBy("user_id").saveAsTable("bucketed_table")
四、执行计划分析与优化
1. 查看执行计划
val df = spark.sql("SELECT * FROM sales WHERE amount > 100")
df.explain(true) // 显示逻辑和物理计划
// 更详细的执行计划
spark.sql("EXPLAIN EXTENDED SELECT * FROM sales WHERE amount > 100").show(false)
2. 常见执行计划问题识别
-
数据倾斜:某个task执行时间远长于其他task
-
全表扫描:执行计划中出现
Scan
操作没有过滤条件 -
非广播Join:出现
SortMergeJoin
而不是BroadcastHashJoin
-
数据重复计算:同一子查询被多次执行
3. 解决数据倾斜
// 方法1: 加盐处理
import org.apache.spark.sql.functions._
val skewedKey = "user_id"
// 为倾斜键添加随机前缀
val saltedDF = df.withColumn("salted_key",
concat(col(skewedKey), lit("_"), floor(rand() * 10)))
// 方法2: 单独处理倾斜键
val commonDF = df.filter($"user_id" =!= "skewed_value")
val skewedDF = df.filter($"user_id" === "skewed_value")
// 分别处理后union
val result = commonDF.union(skewedDF)
五、缓存策略优化
1. 缓存热数据
val hotDF = spark.sql("SELECT * FROM hot_table")
hotDF.persist(StorageLevel.MEMORY_AND_DISK) // 内存不足时溢写到磁盘
// 检查缓存状态
spark.catalog.cacheTable("hot_table")
spark.catalog.isCached("hot_table")
2. 缓存策略选择
存储级别 | 描述 | 适用场景 |
---|---|---|
MEMORY_ONLY | 仅内存 | 小数据集,频繁访问 |
MEMORY_AND_DISK | 内存+磁盘 | 中等数据集 |
MEMORY_ONLY_SER | 序列化存储 | 内存有限,减少内存占用 |
DISK_ONLY | 仅磁盘 | 很少访问的大数据集 |
六、高级优化技巧
1. 动态资源分配
spark-submit \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=5 \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=20 \
your_app.py
2. 自适应查询执行(AQE)
// Spark 3.0+ 启用AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
3. 代码生成优化
// 启用全阶段代码生成(默认已启用)
spark.conf.set("spark.sql.codegen.wholeStage", "true")
// 对于复杂表达式,可调优
spark.conf.set("spark.sql.codegen.maxFields", "100")
七、监控与调优
1. Spark UI分析
-
Jobs页面:识别长任务
-
Stages页面:查看数据倾斜
-
Storage页面:检查缓存效率
-
SQL页面:分析查询执行计划
2. 日志分析
# 查看Executor日志中的GC情况
grep "GC" spark-executor-*.log
# 检查是否有OOM错误
grep "OutOfMemory" spark-executor-*.log
八、实战优化案例
案例:优化慢速JOIN查询
原始查询:
SELECT a.*, b.*
FROM large_table a
JOIN small_table b ON a.key = b.key
WHERE a.dt BETWEEN '2023-01-01' AND '2023-01-31'
优化步骤:
-
确认执行计划:发现是SortMergeJoin
-
检查表大小:small_table < 10MB
-
应用广播Join:
SELECT /*+ BROADCAST(b) */ a.*, b.* FROM large_table a JOIN small_table b ON a.key = b.key WHERE a.dt BETWEEN '2023-01-01' AND '2023-01-31'
-
添加分区过滤:确保large_table按dt分区
-
调整shuffle分区:
spark.conf.set("spark.sql.shuffle.partitions", "200")
通过以上优化,该查询性能提升了15倍。