RDD API 学习
RDD vs DataFrame 对比特性RDDDataFrameAPI 风格函数式Scala/Java声明式SQL性能较慢更快Catalyst 优化类型安全编译时运行时内存管理手动JVM自动Tungsten适用场景复杂 ETL/算法结构化查询学习目标理解 Spark 底层原理处理复杂数据转换。 RDD 基础操作1.1 从 DataFrame 转换为 RDDscala// 将 DataFrame 转为 RDD val rdd df.rdd // 查看 RDD 类型 println(rdd.getClass) // 查看分区数 println(sRDD 分区数: ${rdd.getNumPartitions}) // 查看第一个元素 rdd.first()1.2 创建 RDD 的几种方式scala// 方式1从集合创建 val dataRDD spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9,10)) dataRDD.collect().foreach(print)// 方式2从文件创建 val textRDD spark.sparkContext.textFile(/opt/data/user_behavior.csv) println(s文件 RDD 行数: ${textRDD.count()})1.3 RDD 基本操作Transformationscala// map逐行转换 val userRDD textRDD.map(line { val arr line.split(,) (arr(0).toLong, arr(3)) // (user_id, behavior_type) }) userRDD.take(10).foreach(println)// filter过滤 val pvRDD userRDD.filter(_._2 pv) println(sPV 数量: ${pvRDD.count()})// distinct去重 val distinctUsers userRDD.map(_._1).distinct() println(s去重用户数: ${distinctUsers.count()}) RDD 聚合操作2.1 reduceByKey按 Key 聚合scala// 统计每个用户的行为次数 val userBehaviorCount userRDD.map(x (x._1, 1)) .reduceByKey(_ _) .sortBy(_._2, false) println(行为次数 Top 10 用户:) userBehaviorCount.take(10).foreach(println)2.2 groupByKey vs reduceByKeyscala// groupByKey不推荐性能差 val grouped userRDD.groupByKey() println(groupByKey 结果示例:) grouped.mapValues(_.size).take(5).foreach(println) // reduceByKey推荐预聚合 val reduced userRDD.mapValues(_ 1).reduceByKey(_ _) println(reduceByKey 结果示例:) reduced.take(5).foreach(println)1. groupByKey先把所有数据 shuffle 拉过去再分组、再计算数据全部在网络传输中间不做任何合并容易OOM、卡死、倾斜2. reduceByKey先在本地预聚合Map 端聚合再 shuffle网络传输数据量大大减少速度快、不占内存、不倾斜2.3 aggregateByKey自定义聚合scala// 统计每个用户的行为类型分布 val behaviorAgg userRDD.map(x (x._1, Seq[String](x._2))) .aggregateByKey(Seq.empty[String])( (seq, value) seq Seq(value), // seqOp: 分区内合并 (seq1, seq2) seq1 seq2 // combOp: 分区间合并 ) behaviorAgg.take(5).foreach(println) RDD 高级操作3.1 join关联两个 RDDscala// 创建用户维度 RDD val userDimRDD spark.sparkContext.parallelize(Seq( (3987L, 活跃用户), (8527L, 新用户), (124L, 高活用户), (7450L, 付费用户) ))// 行为 RDD val behaviorRDD userRDD.take(100).map(x (x._1, x._2))// 执行 Join val joinedRDD behaviorRDD.join(userDimRDD) joinedRDD.foreach(println)// 遇到的报错val joinedRDD behaviorRDD.join(userDimRDD)console:24: error: value join is not a member of Array[(Long, String)]val joinedRDD behaviorRDD.join(userDimRDD)之前val behaviorRDD userRDD.take(100).map(x (x._1, x._2))take操作使得behaviorRDD变成普通数组普通数组没有join方法更改val behaviorRDD spark.sparkContext.parallelize( userRDD.take(100) )先take取数据再转成rdd3.2 mapPartitions分区级别操作scala// 每个分区内计算 val partitionCounts rdd.mapPartitions(iter { var count 0 while (iter.hasNext) { count 1 iter.next() } Iterator(count) }).collect() println(s每个分区的数据量: ${partitionCounts.mkString(, )})3.3 持久化 RDDscala// 缓存 RDD val cachedRDD userRDD.cache() println(s缓存后触发计算: ${cachedRDD.count()}) // 查看存储级别 println(cachedRDD.getStorageLevel) // 查看缓存状态 spark.sparkContext.getExecutorMemoryStatus.foreach(println) RDD 操作分类总结类型操作说明Transformationmap,filter,flatMap懒执行返回新 RDD聚合reduceByKey,groupByKey按 Key 聚合排序sortBy,sortByKey全局排序关联join,leftOuterJoin两个 RDD 关联分区repartition,coalesce调整分区数Actioncount,collect,take触发计算
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2584229.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!