【Spark实战指南】RDD核心操作与数据分析实战(附完整代码)
1. RDD基础与实战环境搭建RDDResilient Distributed Dataset是Spark最核心的数据抽象你可以把它理解成一个分布式的数据集合但比普通集合更强大。想象你有一本超大的电话簿被撕成很多页分给不同的人保管——RDD就是那个能帮你协调所有人一起查找、统计的智能系统。我在实际项目中发现用PySpark操作RDD比Scala版本更友好。先来快速搭建环境# 安装PySpark需要提前配置Java8 pip install pyspark3.3.1测试一个简单的RDD创建from pyspark import SparkContext sc SparkContext(local, FirstApp) # 把Python列表转为RDD data [1,2,3,4,5] rdd sc.parallelize(data) print(rdd.count()) # 输出5注意本地模式虽然方便测试但处理大数据时一定要用集群模式。我曾用4台EC2机器处理20GB日志速度比单机快15倍。2. 学生成绩分析实战我们用某大学计算机系的成绩数据集演示数据格式姓名,课程,分数。先下载示例数据!wget https://example.com/chapter5-data1.txt2.1 基础统计操作统计学生总数的代码看似简单但藏着关键知识点lines sc.textFile(chapter5-data1.txt) students lines.map(lambda x: x.split(,)[0]).distinct() print(总学生数:, students.count())这里踩过的坑distinct()操作会导致数据重分发shuffle在大数据集上非常耗资源。有次我处理1TB数据时忘记这个细节作业跑了3小时...课程平均分计算演示了经典的MapReduce模式courses lines.map(lambda x: (x.split(,)[1], int(x.split(,)[2]))) # 魔法发生在reduceByKey avg_scores courses.mapValues(lambda v: (v, 1)) \ .reduceByKey(lambda a,b: (a[0]b[0], a[1]b[1])) \ .mapValues(lambda v: v[0]/v[1]) print(avg_scores.collect())2.2 高级分析技巧累加器的使用是面试常考点。比如统计DataBase课程的选修人数db_course lines.filter(lambda x: DataBase in x) accum sc.accumulator(0) db_course.foreach(lambda x: accum.add(1)) print(DataBase选修人数:, accum.value)我曾用这个技术实时统计异常日志量当accum值超过阈值时触发告警。3. 生产级RDD应用3.1 数据去重实战合并两个日志文件并去重是常见需求。完整代码示例def deduplicate_files(file1, file2, output_path): rdd1 sc.textFile(file1).map(lambda x: (x, None)) rdd2 sc.textFile(file2).map(lambda x: (x, None)) unified rdd1.union(rdd2).reduceByKey(lambda x,_: None).keys() unified.saveAsTextFile(output_path) # 调用示例 deduplicate_files(A.txt, B.txt, output/)性能提示给reduceByKey指定分区数能显著提升性能比如.reduceByKey(lambda x,_: None, 10)3.2 多数据集聚合分析计算学生多科平均分时我推荐使用combineByKey而不是groupByKeydef calculate_avg(grades_rdd): def create_combiner(v): return (v, 1) def merge_value(acc, v): return (acc[0] v, acc[1] 1) def merge_combiners(acc1, acc2): return (acc1[0] acc2[0], acc1[1] acc2[1]) return grades_rdd.combineByKey( create_combiner, merge_value, merge_combiners ).mapValues(lambda x: round(x[0]/x[1], 2)) # 使用示例 grades sc.parallelize([(Tom,88), (Tom,90), (Jim,75)]) print(calculate_avg(grades).collect())4. 性能优化与调试4.1 持久化策略选择RDD的持久化能大幅提升性能但用错级别会适得其反。这是我总结的决策树内存充足MEMORY_ONLYRDD太大MEMORY_AND_DISK需要快速恢复MEMORY_ONLY_SER序列化存储跨作业共享DISK_ONLYprocessed lines.map(complex_transformation).persist(storageLevelpyspark.StorageLevel.MEMORY_AND_DISK)4.2 数据倾斜处理遇到reduceByKey卡在99%八成是数据倾斜。我用过的解决方案加盐法给key添加随机前缀skewed_rdd.map(lambda x: (str(random.randint(0,9))_x[0], x[1])) \ .reduceByKey(lambda a,b: ab) \ .map(lambda x: (x[0].split(_)[1], x[1])) \ .reduceByKey(lambda a,b: ab)采样调优先采样找出热点key单独处理有次处理用户行为数据发现1%的用户产生了90%的记录用上述方法将作业时间从6小时降到40分钟。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2453135.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!