目录
单机部署spark本地模式部署
Anaconda部署Python(3台机器都需要)
Spark本地模式部署
Spark Python Shell
Spark的Standalone集群部署
Standalone集群架构
Standalone集群部署
Standalone集群启动
Standalone集群测试
Spark on YARN的实现
Spark on YARN:资源设计
Spark on YARN:配置测试
安装spark的前提是已经在完成了Hadoop的部署
单机部署spark本地模式部署
部署:Anaconda部署Python(3台机器都需要)
-  目标:实现Linux机器上使用Anaconda部署Python 
-  上传: 
cd /export/server/- 安装
# 添加执行权限
chmod u+x Anaconda3-2020.07-Linux-x86_64.sh
# 执行
sh ./Anaconda3-2020.07-Linux-x86_64.sh
# 过程
#第一次:【直接回车,然后按q】
    Please, press ENTER to continue
    >>> 
#第二次:【输入yes】
    Do you accept the license terms? [yes|no]
    [no] >>> yes
    
#第三次:【输入解压路径:/export/server/anaconda3】
    [/root/anaconda3] >>> /export/server/anaconda3
    
#第四次:【输入yes,是否在用户的.bashrc文件中初始化Anaconda3的相关内容】
    Do you wish the installer to initialize Anaconda3
    by running conda init? [yes|no]
    [no] >>> yes
- 激活
# 刷新环境变量
source /root/.bashrc
# 激活虚拟环境,如果需要关闭就使用:conda deactivate
conda activate- 验证
python3
- 配置
# 编辑环境变量
vim /etc/profile
# 添加以下内容
# Anaconda Home
export ANACONDA_HOME=/export/server/anaconda3
export PATH=$PATH:$ANACONDA_HOME/bin
# 刷新环境变量
source /etc/profile
# 创建软连接
ln -s /export/server/anaconda3/bin/python3 /usr/bin/python3
# 验证
echo $ANACONDA_HOME部署:Spark本地模式部署
-  目标:实现Spark本地模式的单机部署 
-  上传 
cd /export/server/- 安装
# 解压安装
tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /export/server
# 重命名
cd /export/server
mv spark-3.1.2-bin-hadoop3.2 spark-local
# 创建软连接
ln -s spark-local spark
单机部署:Spark Python Shell
-  目标:掌握Spark Shell的基本使用 
-  实施 -  功能:提供一个交互式的命令行,用于测试开发Spark的程序代码 -  Spark的客户端bin目录下:提供了多个测试工具客户端 
-  pyspark:Python命令行 
-  spark-submit:用于提交我们开发的Spark程序的 
-  beeline:SQL命令行 
-  spark-sql:SQL命令行 
 
-  
- 启动
 
-  
# --master指定运行的模式,local代表本地模式,[N]代表这个程序运行给定几核CPU
/export/server/spark/bin/pyspark --master local[2]
-  核心 -  Spark context Web UI available at http://node1.itcast.cn:4040 -  Spark为每个程序都提供了一个Web监控界面,端口从4040开始,如果被占用就不断+1 
-  方便我们对每个程序的运行状况做监控用的 
 
-  
-  Spark context available as 'sc' (master = local[2], app id = local-1637509567232). -  SparkContext是Spark程序中的一个类,这个类是所有Spark程序都必须有的,负责读取数据,调度Task等 
-  当前的程序中默认构建了一个SparkContext类的对象叫做sc 
 
-  
-  SparkSession available as 'spark' -  SparkSession也是Spark程序中的一个类,功能类似于SparkContext,Spark2.0以后推出的,主要用于SparkSQL 
-  当前的程序中默认构建了一个SparkSession类的对象叫做spark 
 
-  
 
-  
Spark的Standalone集群部署
Standalone集群架构
-  目标:理解Standalone集群架构 
-  对比 
| 概念 | MR+YARN | Spark Standalone | 
| 主节点 | ResourceManager | Master | 
| 从节点 | NodeManager | Woker | 
| 计算进程 | MapTask/ReduceTask | Executor | 
-  分布式主从架构:整体的功能及架构高度类似于YARN【ResourceManager、NodeManager】 -  分布式架构 
-  普通分布式主从架构:HDFS、YARN、Spark、Flink、Hbase => 主节点单点故障问题 -  解决主节点单点故障问题:HA高可用架构来解决 
 
-  
-  公平分布式主从架构:Zookeeper -  不存在所讲的单点故障问题,Zookeeper负责帮别人解决单点故障问题 
-  整个大数据平台中ZK的场景:1-辅助实现HA,解决单点故障问题。2-存储实时工具元数据 
 
-  
 
-  
-  功能:提供分布式资源管理和任务调度 
-  主:Master:管理节点,类比于YARN中的RM -  接受客户端请求:所有程序的提交,都是提交给主节点 
-  管理从节点:通过心跳机制检测所有的从节点的健康状态 
-  资源管理和任务调度:将所有从节点的资源在逻辑上合并为一个整体,将任务分配给不同的从节点 
 
-  
-  从:Worker:计算节点,类比于YARN中NM -  使用自己所在节点的资源运行计算进程Executor:给每个计算进程分配一定的资源 
-  所有Task线程计算任务就运行在Executor进程中 
-  假设每台机器机器:32Core - 64GB 
-  那么Worker的资源由配置决定,例如16Core - 32GB 
-  表示Worker最多能使用这台机器的16Core32GB的资源用于计算 
 
-  
-  注意:Executor类似于MapTask或者ReduceTask进程,每个程序的Executor只启动一次 
Standalone集群部署
-  目标:实现Spark Standalone集群的部署 
实施
# 三台机器都执行
cd /export/server
ln -s jdk1.8.0_241 jdk
ln -s hadoop-3.3.0 hadoop
# 只在node1上执行
ln -s hive-3.1.2/ hive-  安装Spark:第一台机器 # 解压安装 cd /export/server/ tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /export/server # 重命名 mv spark-3.1.2-bin-hadoop3.2 spark-standalone # 重新构建软连接 rm -rf spark ln -s spark-standalone spark 
-  配置集群节点【注意不要从PDF复制,要从md文件复制,进入vim以后要按i或者o】 -  spark-env.sh:Spark环境配置 cd /export/server/spark/conf mv spark-env.sh.template spark-env.sh vim spark-env.sh# 22行:申明JVM环境路径以及Hadoop的配置文件路径 export JAVA_HOME=/export/server/jdk export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop # 60行左右 SPARK_MASTER_HOST=node1.itcast.cn SPARK_MASTER_PORT=7077 SPARK_MASTER_WEBUI_PORT=8080 SPARK_WORKER_CORES=1 SPARK_WORKER_MEMORY=1g SPARK_WORKER_PORT=7078 SPARK_WORKER_WEBUI_PORT=8081 SPARK_DAEMON_MEMORY=1g SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true" 
 
-  
# 注释:不用放到配置文件中
SPARK_MASTER_HOST=node1.itcast.cn   #用于指定Master主节点的地址
SPARK_MASTER_PORT=7077              #用于指定Master任务提交端口
SPARK_MASTER_WEBUI_PORT=8080        #用于指定Master Web界面的端口,类似于YARN中的8088
SPARK_WORKER_CORES=1                #用于指定每个Worker能用机器多少核CPU
SPARK_WORKER_MEMORY=1g              #用于指定每个Worker能用机器多少内存   
SPARK_WORKER_PORT=7078              #用于指定从节点的通信端口
SPARK_WORKER_WEBUI_PORT=8081        #用于指定从节点的Web端口
SPARK_DAEMON_MEMORY=1g              #运行进程使用的资源
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true"  # 配置Spark的HistoryServer进程日志存储配置-  在HDFS上创建程序日志存储目录 # 第一台机器启动HDFS start-dfs.sh # 创建程序运行日志的存储目录 hdfs dfs -mkdir -p /spark/eventLogs/ # 在浏览器中访问 http://192.168.88.100:9870/ 
-  spark-defaults.conf:Spark属性配置文件 
cd /export/server/spark/conf
mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf# 末尾
spark.eventLog.enabled     true
spark.eventLog.dir         hdfs://node1.itcast.cn:8020/spark/eventLogs
spark.eventLog.compress    true-  workers:从节点地址配置文件 
mv workers.template workers
vim workers# 删掉localhost,添加以下内容
node1.itcast.cn
node2.itcast.cn
node3.itcast.cn-  log4j.properties:日志配置文件 
mv log4j.properties.template log4j.properties
vim log4j.properties# 19行:修改日志级别为WARN
log4j.rootCategory=WARN, console-  分发同步 - 分发
 
cd /export/server/
ln -s spark-standalone spark-  第二台和第三台创建软链接 
cd /export/server/
ln -s spark-standalone sparkStandalone集群启动
-  目标:掌握Spark Standalone集群的启动管理 
-  实施 -  启动 -  第一台机器执行命令 
-  启动 hdfs 
 
-  
 
-  
# 第一台机器启动HDFS
start-dfs.sh- 启动Master
cd /export/server/spark
sbin/start-master.sh
- 启动Worker
sbin/start-workers.sh
-  启动HistoryServer 
sbin/start-history-server.sh
-  监控:启动以后才能访问 -  Master监控服务:相当于YARN中的8080 
 
-  
http://node1:8080/
HistoryServer历史监控服务:相当于MR中的19888
http://node1:18080/
-  端口不要记混了 -  Master提交程序端口:7077 
-  Master Web界面的端口:8080 
-  HistoryServer Web界面端口:18080 
 
-  
-  关闭 
# 关闭Master
sbin/stop-master.sh
# 关闭Worker
sbin/stop-workers.sh
# 关闭HistoryServer
sbin/stop-history-server.shStandalone集群测试
-  目标:实现Spark程序在Standalone集群上的测试 
-  实施 -  圆周率测试 
 
-  
# 提交程序脚本:bin/spark-submit
# --master:指定运行模式,本地模式:local, Standalone集群:spark://主节点地址:7077
/export/server/spark/bin/spark-submit \
--master spark://node1.itcast.cn:7077 \
--conf "spark.pyspark.driver.python=/export/server/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/export/server/anaconda3/bin/python3" \
/export/server/spark/examples/src/main/python/pi.py \
200-  提交程序提交给主节点 
-  主节点是Master,Master用于接受客户端请求的7077 

Spark on YARN的实现
Spark on YARN:资源设计
-  目标:掌握Spark on YARN的设计 
-  实施 -  问题: 为什么要将Spark的程序运行在YARN上,不运行在自带的Standalone集群上?
-  实现 -  统一化资源管理 -  工作中的计算集群大多数情况下只有1套集群 
-  如果Hadoop生态的程序,例如MR、Hive、Sqoop、Oozie等使用YARN来计算 
-  而Spark的程序单独用Standalone集群来计算 
-  就导致了一套硬件资源被两套资源管理平台所管理,使用时会导致资源竞争冲突等问题 
-  不能充分的发挥硬件资源的性能且管理麻烦 
 
-  
-  自由开发模式 -  使用YARN统一化管理整个硬件集群的所有计算资源:公共分布式资源平台 
-  YARN支持多种类型程序的运行:MR、Tez、Spark、Flink等 
 
-  
-  成熟的资源调度机制 -  支持多队列、多种调度器可以实现不同场景下的计算资源隔离和任务调度 
-  YARN中Capacity、Fair调度器 
 
-  
 
-  
-  关闭原来的Standalone集群 
 
-  
cd /export/server/spark
sbin/stop-master.sh 
sbin/stop-workers.sh 
sbin/stop-history-server.sh Spark on YARN:配置测试
-  目标:实现Spark on YARN的配置及测试 
-  实施 -  搭建 -  准备工作(三台机器都需要做) 
 
-  
 
-  
cd /export/server
ln -s jdk1.8.0_241 jdk
ln -s hadoop-3.3.0 hadoop-  解压:第一台机器 
cd /export/server
tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /export/server
mv spark-3.1.2-bin-hadoop3.2 spark-yarn
rm -rf /export/server/spark
ln -s /export/server/spark-yarn /export/server/spark-  修改配置 -  spark-env.sh 
 
-  
cd /export/server/spark-yarn/conf
mv spark-env.sh.template spark-env.sh
vim /export/server/spark-yarn/conf/spark-env.sh## 22行左右设置JAVA安装目录、HADOOP和YARN配置文件目录
export JAVA_HOME=/export/server/jdk
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
export YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
## 历史日志服务器
SPARK_DAEMON_MEMORY=1g
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true"-  spark-defaults.conf 
cd /export/server/spark-yarn/conf
mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf## 添加内容:
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs://node1.itcast.cn:8020/spark/eventLogs
spark.eventLog.compress           true
spark.yarn.historyServer.address  node1.itcast.cn:18080
spark.yarn.jars    hdfs://node1.itcast.cn:8020/spark/jars/*-  上传spark jar 
#因为YARN中运行Spark,需要用到Spark的一些类和方法
#如果不上传到HDFS,每次运行YARN都要上传一次,比较慢
#所以自己手动上传一次,以后每次YARN直接读取即可
hdfs dfs -mkdir -p /spark/jars/
hdfs dfs -put /export/server/spark-yarn/jars/* /spark/jars/
hdfs dfs -mkdir -p /spark/eventLogs-  修改yarn-site.xml【不用做】 
vim /export/server/hadoop/etc/hadoop/yarn-site.xml## 添加如下内容
<!-- 开启日志聚合功能 -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<!-- 设置聚合日志在hdfs上的保存时间 -->
<property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
</property>
<!-- 设置yarn历史服务器地址 -->
<property>
    <name>yarn.log.server.url</name>
    <value>http://node1.itcast.cn:19888/jobhistory/logs</value>
</property>
<!-- 关闭yarn内存检查 -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>-  分发yarn-site.xml 
cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml root@node2.itcast.cn:$PWD
scp -r yarn-site.xml root@node3.itcast.cn:$PWD-  分发:将第一台机器的spark-yarn分发到第二台和第三台 (可以省略) 
-  分发:第一台机器操作 
cd /export/server/
scp -r spark-yarn node2:$PWD
scp -r spark-yarn node3:$PWD-  修改第二台和第三台软连接 
rm -rf /export/server/spark
ln -s /export/server/spark-yarn /export/server/spark-  启动:第一台机器 
# 启动hdfs
start-dfs.sh
# 启动yarn
start-yarn.sh
# 启动MR的JobHistoryServer:19888
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/export/server/spark/sbin/start-history-server.sh -  测试 -  官方圆周率计算 
 
-  
/export/server/spark/bin/spark-submit \
--master yarn \
/export/server/spark-yarn/examples/src/main/python/pi.py \
10-  自己开发WorkCount 
# 指定运行资源
/export/server/spark/bin/spark-submit \
--master yarn \
--driver-memory 512M \
--driver-cores 1  \
--supervise \
--executor-memory 1G \
--executor-cores 1 \
--num-executors 2 \
hdfs://node1:8020/spark/app/py/pyspark_core_word_args.py \
/spark/wordcount/input \
/spark/wordcount/output9-  pyspark_core_word_args.py 内容 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
from pyspark import SparkContext, SparkConf
import os
import re
import sys
if __name__ == '__main__':
    # todo:0-设置系统环境变量:全部换成Linux地址
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'
    # todo:1-构建SparkContext
    # conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
    conf = SparkConf().setAppName("Remote Test APP")
    sc = SparkContext(conf=conf)
    # todo:2-数据处理:读取、转换、保存
    # 2.1 读
    # input_rdd = sc.textFile("hdfs://node1:8020/spark/wordcount/input/b_word_re.txt")
    input_rdd = sc.textFile(sys.argv[1])
    # print(input_rdd.take(10))
    # 2.2 处理
    rs_rdd = (
        input_rdd
            .filter(lambda line: len(line.strip())>0)
            # .flatMap(lambda line: line.strip().split(" "))
            .flatMap(lambda line: re.split('\\s+', line))
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b)
            .sortBy(lambda x: x[1])
    )
    rs_rdd.foreach(lambda x: print(x))
    # 2.3 写
    # rs_rdd.saveAsTextFile("hdfs://node1:8020/spark/wordcount/output_8")
    rs_rdd.saveAsTextFile(sys.argv[2])
    # todo:3-关闭SparkContext
    sc.stop()


![P3197 [HNOI2008] 越狱](https://i-blog.csdnimg.cn/direct/221276d91f9a425f9c445f1415aabcda.png)














