Spark核心编程(Spark Core)
文章目录
- Spark核心编程(Spark Core)
- 1. 了解RDD
- 1.2 RDD五大特性
- 1.3 WordCount案例分析
- 2 RDD编程入门
- 2.1 RDD的创建
- 2.2 RDD算子
- 2.3 常用Transformation 算子
- 小案例(客户端,集群)
1. 了解RDD
1.1 RDD概念
背景
首先分布式计算
- 分区控制
- Shuffle控制
- 数据存储\序列化\发送
- 数据计算API
- 等一系列功能
这些功能, 不能简单的通过Python内置的本地集合对象(如 List\ 字典等)去完成. 我们在分布式框架中, 需要有一个统一的数据抽象对象, 来实现上述分布式计算所需功能. 这个抽象对象, 就是RDD(Resilient Distributed Databases)
RDD定义
RDD定义 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集。
是Spark中最基本的数据抽象,代表一个不可变、可 分区、里面的元素可并行计算的集合。
- Dataset:一个数据集合,用于存放数据的。 (这个概念 就是 类似于 List Array 之类的)
- Distributed:RDD中的数据是分布式存储的,可用于分布式计算。 (RDD的数据跨机器存储(跨进程))
- Resilient:RDD中的数据可以存储在内存中或者磁盘中
1.2 RDD五大特性
-
RDD有分区
-
计算方法作用到每一个分区
-
RDD之间相互依赖
-
KV型RDD可以有分区器(可选)
-
RDD分区数据的读取会尽量靠近数据所在地(可选)
-
RDD是有分区的
RDD的分区是RDD数据存储的最小单位(一份RDD数据本质上是分隔成多个分区)
-
RDD的方法会作用在其所有分区上
-
RDD之间是有依赖关系的
-
Key-Value 型的RDD可以有分区
KV型RDD数据:RDD内存储的数据是:二元元组
-
RDD的分区规划 会尽量靠近数据所在的服务器
1.3 WordCount案例分析
2 RDD编程入门
Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言)
只有构建出SparkContext, 基于它才能执行后续的API调用和计算
本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来
2.1 RDD的创建
两种方式:
- 并行化集合方式创建(本地化对象 转 分布式RDD)
- 读取外部数据源(读取文件)
并行化集合方式创建
# 0. 创建Spark执行环境
from pyspark import SparkConf,SparkContext
#配置对象
conf = SparkConf().setAppName("test").setMaster("local[*]")
#入口对象
sc = SparkContext(conf=conf)
# master种类
# local:loacal[N] N核CPU运行 *所有CPU核心
# standalone
# yarn
#sc对象的parallelize 方法:将本地集合转换成RDD返回
data = [1,2,3,4,5,6,7,8,9]
rdd = sc.parallelize(data,numSlices=3)# 设置分区
print("默认分区数:",rdd.getNumPartitions())
# collect : 将RDD(分布式对象)中每个分区的数据都发送到Driver 中 形成一个 Python List
# collect: 分布式 -> 本地集合
print(rdd.collect())
读取外部数据源
- textFile() 既可以读取本地文件也可HDFS文件
在这里插入图片描述
#coding:utf-8
from pyspark import SparkConf,SparkContext
from pathlib import Path
FILE = Path(__file__).resolve()# /tmp/pycharm_project_362/00_example/HelloWorld.py
ROOT = FILE.parents[1]# /tmp/pycharm_project_362
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
## 读取文本文件
filerdd1 = sc.textFile(f"file:///{ROOT}/data/input/words.txt")
print("默认分区:",filerdd1.getNumPartitions())
print("file_rdd1:",filerdd1.collect())
filerdd2 = sc.textFile(f"file:///{ROOT}/data/input/words.txt",minPartitions=3)
# 指定最小分区只是参考值 Spark 有自己的判断
filerdd3 = sc.textFile(f"file:///{ROOT}/data/input/words.txt", minPartitions=100)
print("最小分区:", filerdd2.getNumPartitions())
print("filerdd3分区:", filerdd3.getNumPartitions())
print("file_rdd2:", filerdd2.collect())
##读取HDFS
hdfs_rdd = sc.textFile('hdfs://hadoop102:8020/wcinput/word.txt')
print(hdfs_rdd.collect())
- wholeTextFile()
import os
from pyspark import SparkConf,SparkContext
from pathlib import Path
FILE = Path(__file__).resolve()# /tmp/pycharm_project_362/00_example/HelloWorld.py
ROOT = FILE.parents[1]# /tmp/pycharm_project_362
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 这样也可成功 但都需要绝对路径
# rdd = sc.wholeTextFiles("file:///"+os.path.abspath("../data/input/tiny_files"))
rdd = sc.wholeTextFiles(f"file:///{ROOT}/data/input/tiny_files")
print(rdd.collect())
# 元组(路径:文件内容)
# [('file:/tmp/pycharm_project_362/data/input/tiny_files/3.txt', 'hello spark\r\nhello hadoop\r\nhello flink'),
# ('file:/tmp/pycharm_project_362/data/input/tiny_files/1.txt', 'hello spark\r\nhello hadoop\r\nhello flink'),
# ('file:/tmp/pycharm_project_362/data/input/tiny_files/5.txt', 'hello spark\r\nhello hadoop\r\nhello flink'),
# ('file:/tmp/pycharm_project_362/data/input/tiny_files/2.txt', 'hello spark\r\nhello hadoop\r\nhello flink'),
# ('file:/tmp/pycharm_project_362/data/input/tiny_files/4.txt', 'hello spark\r\nhello hadoop\r\nhello flink')]
print(rdd.map(lambda x:x[1]).collect())
# ['hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink',
# 'hello spark\r\nhello hadoop\r\nhello flink', 'hello spark\r\nhello hadoop\r\nhello flink',
# 'hello spark\r\nhello hadoop\r\nhello flink']
2.2 RDD算子
算子:分布式集合对象上的API 称为 算子
方法\函数: 本地对象的API
算子分类:
-
Transformation:转换算子
定义: RDD的算子,返回值仍然是RDD
特性:这类算子
lazy 懒加载
如果没有action 算子, Transformation是不工作的 -
Action:动作(行动)算子
定义:返回值不是RDD就是Action 算子
对于这两类算子来说Transformation算子,相当于在构建执行计划,
action是一个指令让这个执行计划开始工作.
如果没有action,Transformation算子之间的迭代关系,就是一个没有通电的流水线只有action到来,这个数据处理的流水线才开始工作.
2.3 常用Transformation 算子
- map
- flatMap
- reduceByKey
- mapValues
- groupBy
- filter
- distinct
- union
- join
- intersection
- glom
- groupByKey
- sortBy
- sortByKey
- map
功能:功能: map算子,是将RDD的数据一条条处理( 处理的逻基于map算子中接收的处理函数 ,返回新的RDD
在这里插入图片描述
这个和python 都一样
- flatMap
对RDD执行map 然后进行解除嵌套操作(展平成一维)
# coding:utf8
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "hadoop flink spark"])
# 得到所有的单词, 组成RDD, flatMap的传入参数 和map一致, 就是给map逻辑用的, 解除嵌套无需逻辑(传参)
rdd2 = rdd.map(lambda line: line.split(" "))
rdd3 = rdd.flatMap(lambda line: line.split(" "))
print(rdd2.collect())
print(rdd3.collect())
# [['hadoop', 'spark', 'hadoop'], ['spark', 'hadoop', 'hadoop'], ['hadoop', 'flink', 'spark']]
# ['hadoop', 'spark', 'hadoop', 'spark', 'hadoop', 'hadoop', 'hadoop', 'flink', 'spark']
- reduceByKey
针对 KV型
RDD,自动按照key 分,然后根据你提供的局和逻辑完成组内数据(value)
的聚合操作
rdd.reduceByKey(func)
# func: (V,V) -> V
# 接受2 个传入参数(类型要一致),一个返回值 类型和传入一致
# 类比reduce
- mapValues
针对二元元组RDD 对其内部二元元组Value 执行map 操作
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])
print(rdd.reduceByKey(lambda a,b:a+b).collect())
# [('b', 2), ('a', 3)]
print(rdd.map(lambda x:(x[0],x[1]*10)).collect())
# [('a', 10), ('a', 10), ('b', 10), ('b', 10), ('a', 10)]
print(rdd.mapValues(lambda x:x*10).collect())
# [('a', 10), ('a', 10), ('b', 10), ('b', 10), ('a', 10)]
- groupBy
将RDD数据进行分组
rdd.groupBy(func)
# 函数 func:(T) -> K
# 传入一个参数 类型 返回值类型无所谓
# 拿到返回值后,将所有相同返回值放入一个组
# 分组完成后 每个组是一个二元组 所有同组数据放入一个迭代器对象中作为value
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])
res = rdd.groupBy(lambda t:t[0])
print(res.collect())
# [('b', <pyspark.resultiterable.ResultIterable object at 0x7f8581e06580>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7f8581e061f0>)]
# value 变成了迭代对象
print(res.mapValues(list).collect())
# [('b', [('b', 1), ('b', 1)]), ('a', [('a', 1), ('a', 1), ('a', 1)])]
# 我们发现这是把整条数据都保留了
# print(res.map(lambda x:(x[0],list(x[1]))).collect()) 同样的作用
rdd = sc.parallelize([1,2,3,4,5,6])
## 将奇数偶数分组
rdd2 = rdd.groupBy(lambda x:"evnen" if (x%2 == 0) else "odd")
print(rdd2.mapValues(list).collect())
# [('odd', [1, 3, 5]), ('evnen', [2, 4, 6])]
- filter
过滤想要的数据进行保留
rdd.filter(func)
# func(T) -> bool 返回值必须为True False
rdd = sc.parallelize([1,2,3,4,5,6])
## 过滤器 保留奇数
print(rdd.filter(lambda x:x%2==1).collect())
# [1, 3, 5]
- distinct
去重
rdd.distinct() 直接用
- union
2个RDD合并成1个RDD 并不会去重 不同类型可以合并
sc.union([RDD1,RDD2,…])
- intersection
两个RDD交集,返回新的RDD
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([4, 5,6])
rdd3 = sc.parallelize(['a','b','c'])
print(rdd1.union(rdd2).collect())
# [1, 2, 3, 4, 4, 5, 6]
print(rdd1.union(rdd3).collect())
# [1, 2, 3, 4, 'a', 'b', 'c']
print(sc.union([rdd1,rdd2,rdd3]).collect())
# [1, 2, 3, 4, 4, 5, 6, 'a', 'b', 'c']
## INTERSECTION
rdd1 = sc.parallelize([('a', 1), ('a', 3)])
rdd2 = sc.parallelize([('a', 1), ('b', 3)])
rdd3 = sc.parallelize([1,2,3,4])
rdd4 = sc.parallelize([2, 4, 5,6])
# 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
rdd5 = rdd1.intersection(rdd2)
rdd6 = rdd3.intersection(rdd4)
print(rdd5.collect())
# [('a', 1)]
print(rdd6.collect())
# [2, 4]
- join
对两个RDD执行JOIN 操作(实现SQL的内外连接)
注意: join 算子只能用于二元元组
rdd.join(rdd2)#内连接
rdd.leftOuterJoin(rdd2)# 左外
rdd.rightOuterJoin(rdd2)# 右外
rdd1 = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")])
rdd2 = sc.parallelize([(1001, "销售部"), (1002, "科技部")])
# 通过join算子来进行rdd之间的关联
# 对于join算子来说 关联条件 按照二元元组的key来进行关联
print(rdd1.join(rdd2).collect())
# [(1001, ('zhangsan', '销售部')), (1002, ('lisi', '科技部'))]
# 左外连接, 右外连接 可以更换一下rdd的顺序 或者调用rightOuterJoin即可
print(rdd1.leftOuterJoin(rdd2).collect())
# [(1001, ('zhangsan', '销售部')), (1002, ('lisi', '科技部')), (1003, ('wangwu', None)), (1004, ('zhaoliu', None))]
print(rdd2.leftOuterJoin(rdd1).collect())
# [(1001, ('销售部', 'zhangsan')), (1002, ('科技部', 'lisi'))]
print(rdd1.rightOuterJoin(rdd2).collect())
# [(1001, ('zhangsan', '销售部')), (1002, ('lisi', '科技部'))]
- glom
将RDD的数据加上嵌套 这个嵌套按照分区进行
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)
print(rdd.glom().collect())
# [[1, 2, 3, 4], [5, 6, 7, 8, 9]]
# 只解嵌套
print(rdd.glom().flatMap(lambda x: x).collect())
# [1, 2, 3, 4, 5, 6, 7, 8, 9]
- groupByKey
针对KV型数据RDD 自动按照key 分组
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
rdd2 = rdd.groupByKey()
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
print(rdd2.mapValues(list).collect())
# [('b', [1, 1, 1]), ('a', [1, 1])]
# [('b', [1, 1, 1]), ('a', [1, 1])]
- sortBy
基于指定排序依据 进行排序
rdd.sortBy(func,ascending=False,numPartitions=1)
# func: (T) -> U : 告诉RDD中按照那个数据排序
# ascending True 升序 False 降序
# 用多少分区排序
rdd = sc.parallelize([('c', 3), ('f', 1), ('b', 11), ('c', 3), ('a', 1), ('c', 5), ('e', 1), ('n', 9), ('a', 1)], 3)
# 使用sortBy对rdd执行排序
# 按照value 数字进行排序
# 参数1函数, 表示的是 , 告知Spark 按照数据的哪个列进行排序
# 参数2: True表示升序 False表示降序
# 参数3: 排序的分区数
"""注意: 如果要全局有序, 排序分区数请设置为1"""
print(rdd.sortBy(lambda x:x[0],ascending=True,numPartitions=1).collect())
# [('a', 1), ('a', 1), ('b', 11), ('c', 3), ('c', 3), ('c', 5), ('e', 1), ('f', 1), ('n', 9)]
print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1).collect())
# [('f', 1), ('a', 1), ('e', 1), ('a', 1), ('c', 3), ('c', 3), ('c', 5), ('n', 9), ('b', 11)]
- sortByKey(ascending=True,numPartitions=1,keyfunc=<>)
keyfunc: 在 排序前 对key 进行处理 不会改变排序后的输出
rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),
('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),
('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)
print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
# [('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 1), ('p', 1), ('u', 1), ('y', 1)]
# 不会改变
小案例(客户端,集群)
需求: 读取data 文件夹的order.text 文件夹 提取北京的数据 组合北京和商品类别进行输出同时对结果去重,得到北京商品的类别信息
## 读取文件
file_rdd = sc.textFile(f"file:///{ROOT}/data/input/order.text")
json_rdd = file_rdd.flatMap(lambda x:x.split("|"))
dict_rdd = json_rdd.map(lambda json_str:json.loads(json_str))
# print(dict_rdd.collect())
# 过滤数据
beijing_rdd = dict_rdd.filter(lambda d:d['areaName'] == "北京")
# print(beijing_rdd.collect())
# 组合北京 和 商品类型形成新的字符串
category_rdd = beijing_rdd.map(lambda x: x['areaName'] + "_" + x['category'])
res_rdd = category_rdd.distinct()
print(res_rdd.collect())
# ['北京_平板电脑', '北京_家具', '北京_书籍', '北京_食品', '北京_服饰', '北京_手机', '北京_家电', '北京_电脑']
上传到Yarn (使用pycharm)
这边注意依赖的注入
# coding:utf8
from pyspark import SparkConf, SparkContext
from defs_19 import city_with_category
import json
import os
os.environ['HADOOP_CONF_DIR'] = "/opt/module/hadoop-3.1.3/etc/hadoop"
if __name__ == '__main__':
# 提交 到yarn集群, master 设置为yarn
conf = SparkConf().setAppName("test-yarn-1").setMaster("yarn")
# 如果提交到集群运行, 除了主代码以外, 还依赖了其它的代码文件
# 需要设置一个参数, 来告知spark ,还有依赖文件要同步上传到集群中
# 参数叫做: spark.submit.pyFiles
# 参数的值可以是 单个.py文件, 也可以是.zip压缩包(有多个依赖文件的时候可以用zip压缩后上传)
conf.set("spark.submit.pyFiles", "defs_19.py")
sc = SparkContext(conf=conf)
# 在集群中运行, 我们需要用HDFS路径了. 不能用本地路径
file_rdd = sc.textFile("hdfs://hadoop102:8020/input/order.text")
# 进行rdd数据的split 按照|符号进行, 得到一个个的json数据
jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))
# 通过Python 内置的json库, 完成json字符串到字典对象的转换
dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))
# 过滤数据, 只保留北京的数据
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == "北京")
# 组合北京 和 商品类型形成新的字符串
category_rdd = beijing_rdd.map(city_with_category)
# 对结果集进行去重操作
result_rdd = category_rdd.distinct()
# 输出
print(result_rdd.collect())
# ['北京_平板电脑', '北京_家具', '北京_书籍', '北京_食品', '北京_服饰', '北京_手机', '北京_家电', '北京_电脑']
在这里插入图片描述
从linux 提交
(pyspark) [tao@hadoop102 sparkdemo1]$ /opt/module/spark/bin/spark-submit --master yarn --py-files ./defs.py ./main.py
['北京_书籍', '北京_食品', '北京_服饰', '北京_平板电脑', '北京_家具', '北京_手机', '北京_家电', '北京_电脑']
# --py-files 也是指定依赖 可以py 可以压缩文件
通过yarn 集群 运行