生活中缓存容量受成本和体积限制(比如 CPU 缓存只有几 MB 到几十 MB),但会通过算法(如 “最近最少使用” 原则)智能决定存什么,确保存的是 “最可能被用到的数据”。
1. 为什么需要缓存?
-
惰性执行机制:Spark 的转换操作(如
map
,filter
,join
)是惰性的,只有在触发行动操作(如count
,collect
)时才会真正执行。如果多次使用同一个 RDD/DataFrame,每次行动操作都会重新计算整个血缘(Lineage),导致性能浪费。 -
缓存的作用:将重复使用的数据持久化到内存或磁盘,避免重复计算。
2. 如何缓存数据?
Spark 提供两种方法缓存数据:
-
persist(storageLevel)
:指定存储级别(如内存、磁盘等)。 -
cache()
:等价于persist(StorageLevel.MEMORY_ONLY)
,默认将数据存储在内存中。
3.RDD缓存
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存多个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的行动算子时,该RDD将会被缓存在计算节点的内存中,并供以后重用。
不用缓存的例子
代码展示:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Cache {
def main(args: Array[String]): Unit = {
// 配置Spark
val conf = new SparkConf().setAppName("CacheExample").setMaster("local[*]")
val sc = new SparkContext(conf)
// conf.set("spark.local.dir", "_cache")
sc.setLogLevel("WARN")
// 创建一个包含大量随机数的 RDD
val largeRDD = sc.parallelize(1 to 1000*1000*10).map(_ => scala.util.Random.nextInt(1000))
// 定义一个复杂的转换函数
def complexTransformation(num: Int): Int = {
var result = num
for (_ <- 1 to 1000) {
result = result * 2 % 1000
}
result
}
// 不使用 cache 的情况
val nonCachedRDD = largeRDD.map(complexTransformation)
// 第一次触发行动算子,计算并统计时间
val startTime1 = System.currentTimeMillis()
val result1 = nonCachedRDD.collect()
val endTime1 = System.currentTimeMillis()
println(s"不使用 cache 第一次计算耗时: ${endTime1 - startTime1} 毫秒")
// 第二次触发行动算子,计算并统计时间
val startTime2 = System.currentTimeMillis()
val result2 = nonCachedRDD.collect()
val endTime2 = System.currentTimeMillis()
println(s"不使用 cache 第二次计算耗时: ${endTime2 - startTime2} 毫秒")
sc.stop()
}
}
核心代码说明:
1.map算子是转换算子,并不会导致真正的计算
2.第一次调用collect和第二调用collect花的时间基本一致。这就是没有缓存的效果。
带缓存的例子:
代码展示:
// 使用 cache 的情况
val cachedRDD = largeRDD.map(complexTransformation).cache()
// 第一次触发行动算子,计算并统计时间
val startTime3 = System.currentTimeMillis()
val result3 = cachedRDD.collect()
val endTime3 = System.currentTimeMillis()
println(s"使用 cache 第一次计算耗时: ${endTime3 - startTime3} 毫秒")
// 第二次触发行动算子,计算并统计时间
val startTime4 = System.currentTimeMillis()
val result4 = cachedRDD.collect()
val endTime4 = System.currentTimeMillis()
println(s"使用 cache 第二次计算耗时: ${endTime4 - startTime4} 毫秒")
println(s"spark.local.dir 的值: ${conf.get("spark.local.dir")}")
sc.stop()
核心代码说明:
第一次调用collect时,程序需要对RDD中的每个元素执行fibonacci函数进行计算,这涉及到递归运算,比较耗时。
第二次调用collect时,因为之前已经调用了cache方法,并且结果已被缓存,所以不需要再次执行计算,直接从缓存中读取数据。通过对比两次计算的耗时,可以明显发现第二次计算耗时会远小于第一次(在数据量较大或计算复杂时效果更显著),这就体现了cache方法缓存计算结果、避免重复计算、提升后续操作速度的作用 。