目录
- 循循渐进理解
 - 使用Cache或者Persist
 - CheckPoint
 - 缓存和CheckPoint的区别
 
循循渐进理解
wc.txt数据
hello java
spark hadoop flume kafka
hbase kafka flume hadoop
 
看下面代码会打印多少条-------------------------(RDD2)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Cache {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    val rdd1: RDD[String] = sc.textFile("src/main/resources/wc.txt")
    val rdd2: RDD[String] = rdd1.flatMap(x => {
      println("-------------------------")
      x.split(" ")
    })
    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
    val rdd4: RDD[Int] = rdd2.map(x => x.size)
    rdd3.collect()
    rdd4.collect()
    
    Thread.sleep(10000000)
  }
}
 
正确答案是6条(解释一下wc.txt里面有三行数据,所以flatmap执行一次,会打印三条),因为执行了两个collect()行动算子(action)
 大致流程就是这样,因为rdd2没有缓存,所以要执行两次
 
 
上述的问题
 1.一个RDD在多个job中重复使用
- 问题:每个job执行的时候,该RDD之前处理布置也会宠物中
 - 使用持久化的好处:可以将该RDD数据持久化后,后续job在执行在执行的时候可以直接获取数据计算,不用重读RDD之前数据处理
 
2.如果一个job依赖链条长
- 问题:依赖链条太长的时候,如果数据丢失需要重新计算浪费大量的空间
 - 使用持久化的好处:可以直接持久化数据拿来计算,不用重头计算,节省时间
 
使用Cache或者Persist
看下面代码会打印多少条-------------------------(RDD2) 使用了Cache
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Cache {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    val rdd1: RDD[String] = sc.textFile("src/main/resources/wc.txt")
    val rdd2: RDD[String] = rdd1.flatMap(x => {
      println("-------------------------")
      x.split(" ")
    })
    rdd2.cache()
    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
    val rdd4: RDD[Int] = rdd2.map(x => x.size)
    rdd3.collect()
    rdd4.collect()
    Thread.sleep(10000000)
  }
}
 
正确答案是3条
 
发现有个绿色点
 
 发现cache存到memory里面
 
 RDD的持久化分为
 缓存
-  
数据保存位置: task所在主机内存/本地磁盘中
 -  
数据保存时机: 在缓存所在第一个Job执行过程中进行数据保存
 -  
使用: rdd.cache()/rdd.persist()/rdd.persist(StorageLevel.XXXX)
 -  
cache与persist的区别
-  
cache是只将数据保存在内存中(cache的底层就是persisit())

 -  
persist是可以指定将数据保存在内存/磁盘中

 
 -  
 -  
常用的存储级别:
- StorageLevel.MEMORY_ONLY:只将数据保存在内存中,一般用于小数据量场景
 - StorageLevel.MEMORY_AND_DISK:只将数据保存在内存+磁盘中,一般用于大数据量场景
 
 
CheckPoint
看下面代码会打印多少条-------------------------(RDD2) 使用了CheckPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
import org.apache.spark.{SparkConf, SparkContext}
object Cache {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    sc.setCheckpointDir("hdfs://hadoop102:8020/sparkss")
    val rdd1: RDD[String] = sc.textFile("src/main/resources/wc.txt")
    val rdd2: RDD[String] = rdd1.flatMap(x => {
      println("-------------------------")
      x.split(" ")
    })
    rdd2.checkpoint()
    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
    val rdd4: RDD[Int] = rdd2.map(x => x.size)
    rdd3.collect()
    rdd4.collect()
    rdd4.collect()
    Thread.sleep(10000000)
  }
}
 
正确答案是6条,无论你有多少个行动算子,他都是6条,因为在checkpoint rdd所在第一个job执行完成之后,会单独触发一个job计算得到rdd数据之后保存。
为什么要用CheckPoint的原因
 缓存是将数据保存在主机磁盘/内存中,如果服务器宕机数据丢失,需要重新根据依赖关系计算得到数据,需要花费大量时间,所以需要将数据保存在可靠的存储介质HDFS中,避免后续数据丢失重新计算。
- 数据保存位置: HDFS
 - 数据保存时机: 在checkpoint rdd所在第一个job执行完成之后,
会单独触发一个job计算得到rdd数据之后保存。 - 使用 
  
- 1、设置保存数据的目录: sc.setCheckpointDir(path)
 - 2、保存数据: rdd.checkpoint
 
 
checkpoint会单独触发一个job执行得到数据之后保存,所以导致数据重复计算,此时可以搭配缓存使用: rdd.cache() + rdd.checkpoint(这样只会产生3条)
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
import org.apache.spark.{SparkConf, SparkContext}
object Cache {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    sc.setCheckpointDir("hdfs://hadoop102:8020/sparkss")
    val rdd1: RDD[String] = sc.textFile("src/main/resources/wc.txt")
    val rdd2: RDD[String] = rdd1.flatMap(x => {
      println("-------------------------")
      x.split(" ")
    })
    rdd2.cache()
    rdd2.checkpoint()
    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
    val rdd4: RDD[Int] = rdd2.map(x => x.size)
    rdd3.collect()
    rdd4.collect()
    rdd4.collect()
    Thread.sleep(10000000)
  }
}
 
缓存和CheckPoint的区别
1.数据保存位置不一样
- 缓存是将数据保存在task所在主机磁盘/内存中
 - checkpoint是将数据保存到HDFS
 
2、数据保存时机不一样
- 缓存是rdd所在第一个Job执行过程中进行数据保存
 - checkpoint是rdd所在第一个job执行完成之后保存
 
3、依赖关系是否保留不一样
- 缓存是将数据保存在task所在主机磁盘/内存中,所以服务器宕机数据丢失,需要根据依赖关系重新计算得到数据,所以rdd的依赖不能切除。
 - checkpoint是将数据保存到HDFS,数据不会丢失,所以rdd的依赖后续就用不到了,会切除。
 


















