该文章主要为完成实训任务,详细实现过程及结果见【http://t.csdn.cn/Twpwe】
文章目录
- 一、任务目标
 - 二、准备工作
 - 2.1 在本地创建用户文件
 - 2.2 将用户文件上传到HDFS指定位置
 
- 三、完成任务
 - 3.1 在Spark Shell里完成任务
 - 3.1.1 读取文件,得到RDD
 - 3.1.2 倒排,互换RDD中元组的元素顺序
 - 3.1.3 倒排后的RDD按键分组
 - 3.1.4 取分组后的日期集合最小值,计数为1
 - 3.1.5 按键计数,得到每日新增用户数
 - 3.1.6 让输出结果按日期升序
 
- 3.2 在IntelliJ IDEA里完成任务
 - 3.2.1 打开RDD项目
 - 3.2.2 创建统计新增用户对象
 - 3.2.3 运行程序,查看结果
 
一、任务目标
- 已知有以下用户访问历史数据,第一列为用户访问网站的日期,第二列为用户名。
 
2023-05-01,mike
2023-05-01,alice
2023-05-01,brown
2023-05-02,mike
2023-05-02,alice
2023-05-02,green
2023-05-03,alice
2023-05-03,smith
2023-05-03,brian
 

- 现需要根据上述数据统计每日新增的用户数量,期望统计结果。
 
2023-05-01新增用户数:3
2023-05-02新增用户数:1
2023-05-03新增用户数:2
 
- 即2023-05-01新增了3个用户(分别为mike、alice、brown),2023-05-02新增了1个用户(green),2023-05-03新增了两个用户(分别为smith、brian)。
 
二、准备工作
2.1 在本地创建用户文件
- 在
/home目录里创建users.txt文件

 
2.2 将用户文件上传到HDFS指定位置
- 先创建
/newusers/input目录,再将用户文件上传到该目录

 
三、完成任务
3.1 在Spark Shell里完成任务
3.1.1 读取文件,得到RDD
- 执行命令:
val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")

 
3.1.2 倒排,互换RDD中元组的元素顺序
val rdd2 = rdd1.map(
   line => {
       val fields = line.split(",")
       (fields(1), fields(0))
   }
)
rdd2.collect.foreach(println)
 

3.1.3 倒排后的RDD按键分组
- 执行命令:
val rdd3 = rdd2.groupByKey()

 
3.1.4 取分组后的日期集合最小值,计数为1
- 执行命令:
val rdd4 = rdd3.map(line => (line._2.min, 1))

 
3.1.5 按键计数,得到每日新增用户数
- 执行命令:
val result = rdd4.countByKey()

 - 执行命令:
result.keys.foreach(key => println(key + "新增用户:" + result(key)))

 
3.1.6 让输出结果按日期升序
- 执行命令:
val keys = result.keys.toList.sorted,让键集升序排列

 - 按日期降序

 
3.2 在IntelliJ IDEA里完成任务
3.2.1 打开RDD项目

3.2.2 创建统计新增用户对象
- 在
cn.kox.day07包里创建CountNewUsers对象

 
package cn.kox.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @ClassName: CountNewUsers
 * @Author: Kox
 * @Data: 2023/6/15
 * @Sketch:
 */
object CountNewUsers {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("CountNewUsers") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 读取文件,得到RDD
    val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
    // 倒排,互换RDD中元组的元素顺序
    val rdd2 = rdd1.map(
      line => {
        val fields = line.split(",")
        (fields(1), fields(0))
      }
    )
    // 倒排后的RDD按键分组
    val rdd3 = rdd2.groupByKey()
    // 取分组后的日期集合最小值,计数为1
    val rdd4 = rdd3.map(line => (line._2.min, 1))
    // 按键计数,得到每日新增用户数
    val result = rdd4.countByKey()
    // 让统计结果按日期升序
    val keys = result.keys.toList.sorted
    keys.foreach(key => println(key + "新增用户:" + result(key)))
    // 停止Spark容器
    sc.stop()
  }
}
 
3.2.3 运行程序,查看结果



















