Pyspark基础入门5_RDD的持久化方法

news2025/7/23 16:34:42

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark基础入门5
#博学谷IT学习技术支持


`

文章目录

  • Pyspark
  • 前言
  • 一、RDD的缓存
  • 二、使用步骤
    • 1.演示缓存的使用操作
  • 三、RDD的checkpoint检查点
  • 四、缓存和检查点区别
  • 总结


前言

今天和大家分享的是Spark RDD的持久化方法。


一、RDD的缓存

缓存:
一般当一个RDD的计算非常的耗时|昂贵(计算规则比较复杂),或者说这个RDD需要被重复(多方)使用,此时可以将这个RDD计算完的结果缓存起来, 便于后续的使用, 从而提升效率
通过缓存也可以提升RDD的容错能力, 当后续计算失败后, 尽量不让RDD进行回溯所有的依赖链条, 从而减少重新计算时间

注意:
缓存仅仅是一种临时的存储, 缓存数据可以保存到内存(executor内存空间),也可以保存到磁盘中, 甚至支持将缓存数据保存到堆外内存中(executor以外的系统内容)
由于临时存储, 可能会存在数据丢失, 所以缓存操作, 并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存失效后, 可以基于原有依赖关系重新计算

缓存的API都是LAZY的, 如果需要触发缓存操作, 必须后续跟上一个action算子, 一般建议使用count

如果不添加action算子, 只有当后续遇到第一个action算子后, 才会触发缓存

二、使用步骤

设置缓存的API:
rdd.cache(): 执行缓存操作 仅能将数据缓存到内存中
rdd.persist(缓存的级别(位置)): 执行缓存操作, 默认将数据缓存到内存中, 当然也可以自定义缓存位置

手动清理缓存的API:
rdd.unpersist()

默认情况下, 当整个Spark应用程序执行完成后, 缓存也会自动失效的, 自动删除

常用的缓存级别:
MEMORY_ONLY : 仅缓存到内存中
DISK_ONLY: 仅缓存到磁盘
MEMORY_AND_DISK: 内存 + 磁盘 优先缓存到内存中, 当内存不足的时候, 剩余数据缓存到磁盘中
OFF_HEAP: 缓存到堆外内存

最为常用的: MEMORY_AND_DISK

1.演示缓存的使用操作

在这里插入图片描述

import time

import jieba
from pyspark import SparkContext, SparkConf, StorageLevel
import os

# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
"""
    清洗需求: 
	    需要先对数据进行清洗转换处理操作, 清洗掉为空的数据, 
	    以及数据字段个数不足6个的数据, 并且将每一行的数据放置到一个元组中, 
	    元组中每一个元素就是一个字段的数据
"""


def xuqiu1():
    # 需求一:  统计每个关键词出现了多少次, 获取前10个
    res = rdd_map \
        .flatMap(lambda field_tuple: jieba.cut(field_tuple[2])) \
        .map(lambda keyWord: (keyWord, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .sortBy(lambda res_tup: res_tup[1], ascending=False).take(10)
    print(res)


def xuqiu2():
    res = rdd_map \
        .map(lambda field_tuple: ((field_tuple[1], field_tuple[2]), 1)) \
        .reduceByKey(lambda agg, curr: agg + curr) \
        .top(10, lambda res_tup: res_tup[1])
    print(res)


if __name__ == '__main__':
    print("Spark的Python模板")

    # 1. 创建SparkContext核心对象
    conf = SparkConf().setAppName('sougou').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 2. 读取外部文件数据
    rdd = sc.textFile(name='file:///export/data/workspace/ky06_pyspark/_02_SparkCore/data/SogouQ.sample')

    # 3. 执行相关的操作:
    # 3.1 执行清洗操作
    rdd_filter = rdd.filter(lambda line: line.strip() != '' and len(line.split()) == 6)

    rdd_map = rdd_filter.map(lambda line: (
        line.split()[0],
        line.split()[1],
        line.split()[2][1:-1],
        line.split()[3],
        line.split()[4],
        line.split()[5]
    ))

    # 由于 rdd_map 被多方使用了, 此时可以将其设置为缓存
    rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()

    # 3.2 : 实现需求
    # 需求一:  统计每个关键词出现了多少次, 获取前10个
    # 快速抽取函数:  ctrl + alt + M
    xuqiu1()
    
    # 当需求1执行完成, 让缓存失效
    rdd_map.unpersist().count()

    # 需求二:统计每个用户每个搜索词点击的次数
    xuqiu2()

    time.sleep(100)

三、RDD的checkpoint检查点

checkpoint比较类似于缓存操作, 只不过缓存是将数据保存到内存 或者 磁盘上, 而checkpoint是将数据保存到磁盘或者HDFS(主要)上
checkpoint提供了更加安全可靠的持久化的方案, 确保RDD的数据不会发生丢失, 一旦构建checkpoint操作后, 会将RDD之间的依赖关系(血缘关系)进行截断,后续计算出来了问题, 可以直接从检查点的位置恢复数据

主要作用: 容错 也可以在一定程度上提升效率(性能) (不如缓存)
	在后续计算失败后, 从检查点直接恢复数据, 不需要重新计算

相关的API:
第一步: 设置检查点保存数据位置
sc.setCheckpointDir(‘路径地址’)

第二步: 在对应RDD开启检查点
	rdd.checkpoint()
	rdd.count()

注意: 
	如果运行在集群模式中, checkpoint的保存的路径地址必须是HDFS, 如果是local模式 可以支持在本地路径
	checkpoint数据不会自动删除, 必须同时手动方式将其删除掉
import time

from pyspark import SparkContext, SparkConf
import os

# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("演示checkpoint相关的操作")

    # 1- 创建SparkContext对象
    conf = SparkConf().setAppName('sougou').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 开启检查点, 设置检查点的路径
    sc.setCheckpointDir('/spark/chk') # 默认的地址为HDFS
    # 2- 获取数据集
    rdd = sc.parallelize(['张三 李四 王五 赵六', '田七 周八 李九 老张 老王 老李'])

    # 3- 执行相关的操作:  以下操作仅仅是为了让依赖链条更长, 并没有太多的实际意义
    rdd1 = rdd.flatMap(lambda line: line.split())

    rdd2 = rdd1.map(lambda name: (name, 1))

    rdd3 = rdd2.map(lambda name_tuple: (f'{name_tuple[0]}_itcast', name_tuple[1]))

    rdd3 = rdd3.repartition(3)

    rdd4 = rdd3.map(lambda name_tuple: name_tuple[0])

    # RDD4设置检查点:
    rdd4.checkpoint()
    rdd4.count()


    rdd5 = rdd4.flatMap(lambda name: name.split('_'))
    rdd5 = rdd5.repartition(4)

    rdd6 = rdd5.map(lambda name: (name, 1))

    rdd_res = rdd6.reduceByKey(lambda agg, curr: agg + curr)

    print(rdd_res.collect())

    time.sleep(1000)

四、缓存和检查点区别

1- 存储位置不同:
缓存: 存储在内存或者磁盘 或者 堆外内存中
检查点: 可以将数据存储在磁盘 或者 HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 血缘关系:
缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能会失效, 当失效后, 需要重新回溯计算操作
检查点: 会截断RDD的之间的血缘关系, 因为检查点将数据保存到更加安全可靠的位置, 认为数据不会发生丢失问题, 当执行失败的时候, 也不需要重新回溯计算

3- 生命周期:
缓存: 当程序执行完成后, 或者手动调度unpersist 缓存都会被删除
检查点: 即使程序退出后, 检查点的数据依然是存在的, 不会删除, 需要手动删除的

一般建议将两种持久化的方案一同作用于项目环境中, 先设置缓存 然后再设置检查点, 最后统一触发执行(底层: 会将数据先缓存好, 然后将缓存好的数据, 保存到checkpoint对应的路径中, 后续在使用的时候, 优先从缓存中读取, 如果缓存中没有, 会从checkpoint中获取, 同时再把读取数据放置到缓存中)

总结

今天和大家分享了RDD的两种持久化方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/394910.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回收站清空恢复,4种方法任你选!

案例:不小心清空了回收站怎么恢复? “回收站刚刚清空的文件怎么恢复呀?辛苦收集的材料不小心删除了,请大神来帮我下。” 回收站是我们电脑上的一个非常实用的功能,可以让我们暂时存放不需要的文件或者是可以被删除的…

启动u盘还原成普通u盘(Windows Diskpart)

使用windows系统的diskpart 命令解决系统盘恢复成普通U盘的问题:1. 按Windows R键打开运行窗口。在搜索框中输入“ Diskpart ”,然后按 Enter 键。2. 现在输入“ list disk ”并回车。3. 然后输入“ select disk X ”(将 X 替换为可启动U盘的…

围棋高手郭广昌的“假眼”棋局

(图片来源于网络,侵删)文丨熔财经作者|易不二2022年,在复星深陷债务压顶和变卖资产漩涡的而立之年,“消失”已久的郭广昌,在质疑与非议声中回国稳定军心,强调复星将在未来的五到十年迎来一个全新…

安卓反编译入门03-dex2jar反编译apk得到Java源代码

1.获取工具dex2jar下载地址:http://sourceforge.net/projects/dex2jar/files/直接下载下载完成后,解压得到文件夹dex2jar-2.0jd-gui下载地址:http://jd.benow.ca/ 直接下载(官网版本无法复制中文,可以用这个&#xff0…

K8s pod 动态弹性扩缩容 HPA

一、概述Horizontal Pod Autoscaler(HPA,Pod水平自动伸缩),根据平均 CPU 利用率、平均内存利用率或你指定的任何其他自定义指标自动调整 Deployment 、ReplicaSet 或 StatefulSet 或其他类似资源,实现部署的自动扩展和…

算法练习-二分查找(二)

算法练习-二分查找(二) 文章目录算法练习-二分查找(二)1 二分查找1.1 题目1.2 题解2 猜数字大小2.1 题目2.2 题解3 寻找比目标字母大的最小字母3.1 题目3.2题解4 搜索插入位置4.1 题目4.2 题解5 在排序数组中查找元素的第一个和最后一个位置5.1 题目5.2 …

34- PyTorch数据增强和迁移学习 (PyTorch系列) (深度学习)

知识要点 对vgg 模型进行迁移学习定义数据路径: train_dir os.path.join(base_dir, train) # base_dir ./dataset 定义转换格式: transform transforms.Compose([transforms.Resize((96, 96)), # 统一缩放transforms.ToTensor(), # 转换为tensortransforms.No…

如何查找你的IP地址?通过IP地址能直接定位到你家!

我们ip地址分为A、B、C、D、E共5类,每一类地址范围不同,从A到Eip地址范围依次递减,其中哦,D和E是保留地址,我们用不了。A、B、C3类地址很多都被美国这样的西方国家分走了,而留给我们的就剩有限的地址了&…

记一次接口远程调用异常排查链路 Remote peer closed connection before all data could be read

前言: 异常信息: java.io.IOException: UT000128: Remote peer closed connection before all data could be read 在九月份-十月初一直都被这个问题困扰~ 排查链路 第一次、二次、三次排查该问题: 当时看到”Remote peer c…

支持在局域网使用的项目管理系统有哪些?5款软件对比

一、选择私有部署的原因以及该方案的优点有很多可能的原因导致人们更倾向于使用私有部署的企业管理软件,其中一些原因可能包括:1.数据安全性要求:一些企业管理软件包含敏感的商业数据和隐私信息,为了保护这些信息不被未经授权的第…

MyBatisPlus中的条件构造器Wrapper

引言为什么要了解Wrapper?Wrapper解决的了什么问题?一、Wrapper:条件构造抽象类,用来解决单表操作出现的一些复杂问题,例如排序,和模糊查询等等结构图文字解释AbstractWrapper : 用于查询条件封装&#xff…

java设计模式学习

一、设计模式7大原则1.单一职责原则 (Single Responsibility Principle) 在类级别和方法级别进行职责规划,专人专事2.开放-关闭原则 (Open-Closed Principle) 增加接口功能时,尽可能不要修改原有代码3.里氏替换原则 (Liskov Substitution Principle) 子类…

华为HCIE学习之Openstack keystone组件

文章目录一、keystone对象模型二、使用token的好处Token的实现模式1、UUID Token,每次验证需要访问keystone服务端2、PKI Token 验证,在客户端即可完成 发放公钥 私钥解密3、RBAC三、policy.json权限实验一、keystone对象模型 二、使用token的好处 1、token缓存在客…

什么是数字化?企业如何实现数字化?

随着社会的发展与时代的进步,以生产为核心的企业也在进行不断的创新,而新一代信息技术的应用深化,制造业迎来了数字化转型新机遇。数字化转型近些年更多的被提及,越来越多的企业想通过数字化的转型,降低企业运营成本&a…

《Ansible变量篇:ansible中事实变量facts》

一、简介 facts组件是ansible用于采集被管理机器设备信息的一个功能, 采集的机器设备信息主要包含IP地址,操作系统,以太网设备,mac地址,时间/日期相关数据,硬件信息等。 ansible有一个模块叫setup,用于获取远程主机的相关信息,并可以将这些信息作为变量在playbook里进行调用,而…

【python】剑指offer代码大集合

剑指 Offer(第 2 版) https://leetcode.cn/problem-list/xb9nqhhg/ 剑指 Offer 03. 数组中重复的数字 https://leetcode.cn/problems/shu-zu-zhong-zhong-fu-de-shu-zi-lcof/ # 法1:哈希表(Set) class Solution:def findRepeatNumber(self, nums: [

CentOS8基础篇12:使用RPM管理telnet-server软件包

一、RPM包管理工具简介 RedHat软件包管理工具(RedHat Package Manager,RPM) RPM软件包工具常用于软件包的安装、查询、更新升级、校验、卸载以及生成.rpm格式的软件包等操作。 RPM软件包工具只能管理后缀是.rpm的软件包。软件包的命名格式: 软件名称…

java 实现pgsql数据库免密备份 Windows版本

阐述下背景: 最近公司项目中有个功能模块,需要使用java实现pgsql数据库整库的数据备份,没有pgsql使用经验的我,在网上寻找了好多大神的帖子,也没发现适合自己当前场景的解决方案,但是我把大神们的帖子按照自…

某开源远程办公软件安装配置说明

******是开源软件,只需要简单的配置,就可以在企业搭建SSL VPN,满足日常远程办公的需求。本篇文档为在windows服务器安装搭建openvpn记录,仅供参考。 拓扑说明 使用虚拟机进行测试,主机操作系统Windows 10和m0n0。模拟…

【Java|golang】1247. 交换字符使得字符串相同

有两个长度相同的字符串 s1 和 s2,且它们其中 只含有 字符 “x” 和 “y”,你需要通过「交换字符」的方式使这两个字符串相同。 每次「交换字符」的时候,你都可以在两个字符串中各选一个字符进行交换。 交换只能发生在两个不同的字符串之间…