04-240606Spark笔记
1.行动算子-2
-
save相关算子:
格式:
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
例子:
val rdd = sc.makeRDD(List(
("a",1),("a",2),("a",3)
))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")
输出结果:

-
foreach
格式:
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
例子:
val rdd = sc.makeRDD(List(1,2,3,4))
//foreach 其实是Driver端内存集合的循环遍历方法
rdd.collect().foreach(println) //Driver
println("***************")
// foreach 其实是Executor端内存数据打印
rdd.foreach(println) // Executor
// 算子 : Operator(操作)
// RDD的方法和Scala集合对象的方法不一样
// 集合对象的方法都是在同一个节点的内存中完成的。
// RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
// 为了区分不同的处理效果,所以将RDD的方法称之为算子。
// RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。
输出结果:

2. 序列化
2.1 闭包检测
-
闭包检测
因为Driver需要给两个Executor共享User方法,共享就需要序列化
案例:
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List[Int]())
val user = new User()
// SparkException: Task not serializable
// NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
// RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
// 闭包检测
rdd.foreach(
num => {
println("age = " + (user.age + num))
}
)
sc.stop()
}
//class User extends Serializable {
// 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
//case class User() {
class User {
var age : Int = 30
}
-
RDD 的分区器
自己来写分区器:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(
("nba", "xxxxxxxxx"),
("cba", "xxxxxxxxx"),
("wnba", "xxxxxxxxx"),
("nba", "xxxxxxxxx"),
),3)
val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner )
partRDD.saveAsTextFile("output")
sc.stop()
}
自定义的分区器:
class MyPartitioner extends Partitioner{
// 分区数量
override def numPartitions: Int = 3
// 根据数据的key值返回数据所在的分区索引(从0开始)
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "wnba" => 1
case _ => 2
}
}
}
* 自定义分区器 * 1. 继承Partitioner * 2. 重写方法
输出结果:


-
RDD 文件读取与保存
案例1:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd = sc.textFile("output1")
println(rdd.collect().mkString(","))
val rdd1 = sc.objectFile[(String, Int)]("output2")
println(rdd1.collect().mkString(","))
val rdd2 = sc.sequenceFile[String, Int]("output3")
println(rdd2.collect().mkString(","))
sc.stop()
}
输出结果:

案例2:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(
List(
("a", 1),
("b", 2),
("c", 3)
)
)
rdd.saveAsTextFile("output1")
rdd.saveAsObjectFile("output2")
rdd.saveAsSequenceFile("output3")
sc.stop()
}
输出结果:

1. 数据结构:

-
累加器
累加器用来把 Executor 端变量信息聚合到 Driver 端。

Acc,累加器可以把Excutor端的数据返回到Driver中去:

案例:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// reduce : 分区内计算,分区间计算
//val i: Int = rdd.reduce(_+_)
//println(i)
var sum = 0
rdd.foreach(
num => {
sum += num
}
)
println("sum = " + sum)
sc.stop()
}
-
系统累加器
案例:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
//sc.doubleAccumulator
//sc.collectionAccumulator
rdd.foreach(
num => {
// 使用累加器
sumAcc.add(num)
}
)
// 获取累加器的值
println(sumAcc.value)
sc.stop()
}
累加器的一些特殊情况:
少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行 一般情况下,累加器会放置在行动算子进
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
//sc.doubleAccumulator
//sc.collectionAccumulator
val mapRDD = rdd.map(
num => {
// 使用累加器
sumAcc.add(num)
num
}
)
// 获取累加器的值
// 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
// 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
// 一般情况下,累加器会放置在行动算子进行操作
mapRDD.collect()
mapRDD.collect()
println(sumAcc.value)
sc.stop()
}
-
自定义累加器
分布式共享只写变量
案例:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List("hello", "spark", "hello"))
// 累加器 : WordCount
// 创建累加器对象
val wcAcc = new MyAccumulator()
// 向Spark进行注册
sc.register(wcAcc, "wordCountAcc")
rdd.foreach(
word => {
// 数据的累加(使用累加器)
wcAcc.add(word)
}
)
// 获取累加器累加的结果
println(wcAcc.value)
sc.stop()
}
/*
自定义数据累加器:WordCount
1. 继承AccumulatorV2, 定义泛型
IN : 累加器输入的数据类型 String
OUT : 累加器返回的数据类型 mutable.Map[String, Long]
2. 重写方法(6)
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
private var wcMap = mutable.Map[String, Long]()
// 判断是否初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
override def reset(): Unit = {
wcMap.clear()
}
// 获取累加器需要计算的值
override def add(word: String): Unit = {
val newCnt = wcMap.getOrElse(word, 0L) + 1
wcMap.update(word, newCnt)
}
// Driver合并多个累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value
map2.foreach{
case ( word, count ) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
}
// 累加器结果
override def value: mutable.Map[String, Long] = {
wcMap
}
}
-
广播变量
实现原理:
广播变量用来高效分发较大的对象。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务
分别发送。
案例:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd1 = sc.makeRDD(List(
("a", 1),("b", 2),("c", 3)
))
// val rdd2 = sc.makeRDD(List(
// ("a", 4),("b", 5),("c", 6)
// ))
val map = mutable.Map(("a", 4),("b", 5),("c", 6))
// join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
//joinRDD.collect().foreach(println)
// (a, 1), (b, 2), (c, 3)
// (a, (1,4)),(b, (2,5)),(c, (3,6))
rdd1.map {
case (w, c) => {
val l: Int = map.getOrElse(w, 0)
(w, (c, l))
}
}.collect().foreach(println)
sc.stop()
}
join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用

Spark 中的广播变量就可以将闭包的数据保存到Executor的内存中
Spark 中的广播变量不能更改 : 分布式共享只读变量

封装广播变量1
案例:
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd1 = sc.makeRDD(List(
("a", 1),("b", 2),("c", 3)
))
val map = mutable.Map(("a", 4),("b", 5),("c", 6))
// 封装广播变量
val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
rdd1.map {
case (w, c) => {
// 方法广播变量
val l: Int = bc.value.getOrElse(w, 0)
(w, (c, l))
}
}.collect().foreach(println)
sc.stop()
}



















