目录
- 1. 转换算子(Transformation)
- 1.1 常用转换算子
- 2. 行动算子(Action)
- 2.1 常用行动算子
- 3. 转换算子与行动算子的区别
- 4. 示例代码
- 5. 总结
在Spark中,算子(Operator)是对数据集(RDD、DataFrame、Dataset)进行操作的核心功能,分为两大类: 转换算子(Transformation)和行动算子(Action)。以下是详细的分类和常用算子的介绍。
1. 转换算子(Transformation)
- 定义:转换算子是惰性求值的,调用后不会立即执行,而是生成一个新的RDD、DataFrame或Dataset,并记录依赖关系,直到遇到Action算子时才触发计算。
- 特点:
- 返回一个新的RDD、DataFrame或Dataset。
- 是惰性求值的(Lazy Evaluation)。
1.1 常用转换算子
以下是常用的转换算子及其功能:
算子 | 功能 | 示例 |
---|---|---|
map | 对每个元素进行映射,返回一个新的RDD | rdd.map(x => x * 2) |
flatMap | 类似map ,但可以返回多个元素,通常用于拆分操作 | rdd.flatMap(x => x.split(" ")) |
filter | 过滤数据,返回满足条件的元素 | rdd.filter(x => x > 10) |
distinct | 去重操作,返回不重复的元素 | rdd.distinct() |
union | 合并两个RDD,返回并集 | rdd1.union(rdd2) |
intersection | 返回两个RDD的交集 | rdd1.intersection(rdd2) |
subtract | 返回两个RDD的差集 | rdd1.subtract(rdd2) |
cartesian | 返回两个RDD的笛卡尔积 | rdd1.cartesian(rdd2) |
groupByKey | 按Key分组,返回一个(Key, Iterable[Value]) 的RDD | rdd.groupByKey() |
reduceByKey | 按Key聚合,返回一个(Key, Value) 的RDD | rdd.reduceByKey((x, y) => x + y) |
sortByKey | 按Key排序 | rdd.sortByKey(ascending = true) |
join | 对两个RDD进行内连接,返回(Key, (Value1, Value2)) | rdd1.join(rdd2) |
cogroup | 对两个RDD按Key分组,返回(Key, (Iterable[Value1], Iterable[Value2])) | rdd1.cogroup(rdd2) |
mapValues | 对(Key, Value) 中的Value进行映射 | rdd.mapValues(value => value * 2) |
flatMapValues | 对(Key, Value) 中的Value进行映射,返回多个Value | rdd.flatMapValues(value => value.split(",")) |
partitionBy | 对RDD重新分区 | rdd.partitionBy(new HashPartitioner(4)) |
coalesce | 减少分区数,用于优化性能 | rdd.coalesce(2) |
repartition | 增加或减少分区数,类似coalesce ,但会触发shuffle | rdd.repartition(4) |
pipe | 调用外部脚本处理RDD中的数据 | rdd.pipe("script.sh") |
2. 行动算子(Action)
- 定义:行动算子会触发Spark的计算,将数据从Executor返回到Driver,或者将结果输出到存储系统。
- 特点:
- 会触发实际的计算。
- 返回非RDD类型的结果(如值、集合),或者将数据输出到外部存储。
2.1 常用行动算子
以下是常用的行动算子及其功能:
算子 | 功能 | 示例 |
---|---|---|
collect | 将RDD中的所有数据收集到Driver端,返回一个数组 | rdd.collect() |
count | 返回RDD中元素的总数 | rdd.count() |
first | 返回RDD中的第一个元素 | rdd.first() |
take | 返回RDD中的前N个元素 | rdd.take(5) |
takeSample | 随机采样返回RDD中的N个元素 | rdd.takeSample(withReplacement = false, num = 5) |
takeOrdered | 返回RDD中排序后的前N个元素 | rdd.takeOrdered(5) |
reduce | 对RDD中的元素进行归约操作 | rdd.reduce((x, y) => x + y) |
fold | 类似reduce ,但需要提供初始值 | rdd.fold(0)((x, y) => x + y) |
aggregate | 对RDD中的元素进行聚合操作,支持分区内和分区间的聚合逻辑 | rdd.aggregate(0)(_ + _, _ + _) |
countByKey | 对(Key, Value) 形式的RDD按Key计数 | rdd.countByKey() |
foreach | 对RDD中的每个元素执行指定操作(通常用于副作用,如打印日志) | rdd.foreach(println) |
saveAsTextFile | 将RDD保存为文本文件 | rdd.saveAsTextFile("output_path") |
saveAsSequenceFile | 将RDD保存为Hadoop的SequenceFile格式 | rdd.saveAsSequenceFile("output_path") |
saveAsObjectFile | 将RDD保存为对象文件 | rdd.saveAsObjectFile("output_path") |
3. 转换算子与行动算子的区别
维度 | 转换算子(Transformation) | 行动算子(Action) |
---|---|---|
定义 | 对RDD进行转换,生成新的RDD | 触发实际计算,返回非RDD结果或输出数据 |
执行时机 | 惰性求值,调用时不会立即执行 | 调用时立即触发计算 |
返回值 | 返回RDD | 返回非RDD类型的结果或无返回值 |
示例 | map 、filter 、reduceByKey | collect 、count 、saveAsTextFile |
4. 示例代码
以下是一个完整的示例,结合转换算子和行动算子:
val rdd = sc.textFile("data.txt") // 读取文本文件
val words = rdd.flatMap(_.split(" ")) // 转换算子:拆分单词
val filteredWords = words.filter(word => word.length > 3) // 转换算子:过滤长度大于3的单词
val wordPairs = filteredWords.map(word => (word, 1)) // 转换算子:转换为键值对
val wordCounts = wordPairs.reduceByKey(_ + _) // 转换算子:按Key聚合
wordCounts.foreach(println) // 行动算子:打印结果
5. 总结
-
转换算子(Transformation):
- 用于定义数据的处理逻辑,生成新的RDD。
- 是惰性求值的,只有在调用行动算子时才会执行。
- 常见的有:
map
、filter
、reduceByKey
、join
等。
-
行动算子(Action):
- 用于触发计算,将结果返回到Driver端或输出到存储。
- 常见的有:
collect
、count
、saveAsTextFile
、foreach
等。