Spark对正常日志文件清洗并分析

news2025/6/17 4:06:44

目录

日志文件准备:

一.日志数据清洗: 

第一步:数据清洗需求分析:

二.代码实现 

2.1 代码和其详解

2.2创建jdbcUtils来连接Mysql数据库

2.3 运行后结果展示:

三、留存用户分析 

3.1需求概览

3.2.代码实现

3.3 运行后结果展示: 

四、活跃用户分析 

4.1需求概览

4.2代码实现


日志文件准备:

链接:https://pan.baidu.com/s/1dWjIGMttVJALhniJyS6R4A?pwd=h4ec 
提取码:h4ec 
--来自百度网盘超级会员V5的分享

一.日志数据清洗: 

第一步:数据清洗需求分析:

1.读入日志文件并转化为Row类型

  • 按照Tab切割数据
  • 过滤掉字段数量少于8个的

2.对数据进行清洗

  • 按照第一列和第二列对数据进行去重
  • 过滤掉状态码非200
  • 过滤掉event_time为空的数据
  • 将url按照”&”以及”=”切割

3.保存数据

  • 将数据写入mysql表中
  • 将其分成多个字段

二.代码实现 

2.1 代码和其详解

 def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("detlDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    //TODO 加载日志文件数据,按照\t分组,过滤出长度小于8的数据,将数据封装到 Row对象中,创建DF

    //创建Row对象
    val rowRdd: RDD[Row] = sc.textFile("in/test.log")
      .map(x => x.split("\t"))
      .filter(x => x.length >= 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
    
    //创建Schema
    val schema: StructType = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    //创建DataFrame
    val logDF: DataFrame = spark.createDataFrame(rowRdd, schema)
    logDF.printSchema()
    logDF.show(3)

    //TODO 删除重复数据,过滤掉状态码非200

    val filterLogs: Dataset[Row] = logDF.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))
    //单独处理url,并转为Row对象
    val full_logs_rdd: RDD[Row] = filterLogs.map(
      line => {
        val str: String = line.getAs[String]("url")
        val paramsArray: Array[String] = str.split("\\?")
        var paramsMap: Map[String, String] = null
        if (paramsArray.length == 2) {
          val tuples: Array[(String, String)] = paramsArray(1).split("&")
            .map(x => x.split("="))
            .filter(x => x.length == 2)
            .map(x => (x(0), x(1)))
          paramsMap = tuples.toMap
        }
        (
          line.getAs[String]("event_time"),
          paramsMap.getOrElse[String]("userUID", ""),
          paramsMap.getOrElse[String]("userSID", ""),
          paramsMap.getOrElse[String]("actionBegin", ""),
          paramsMap.getOrElse[String]("actionEnd", ""),
          paramsMap.getOrElse[String]("actionType", ""),
          paramsMap.getOrElse[String]("actionName", ""),
          paramsMap.getOrElse[String]("actionValue", ""),
          paramsMap.getOrElse[String]("actionTest", ""),
          paramsMap.getOrElse[String]("ifEquipment", ""),
          line.getAs[String]("method"),
          line.getAs[String]("status"),
          line.getAs[String]("sip"),
          line.getAs[String]("user_uip"),
          line.getAs[String]("action_prepend"),
          line.getAs[String]("action_client")
        )
      }
    ).toDF().rdd

    //   frame.withColumnRenamed("_1","event_time").printSchema()
    
    //再次创建Schema
    val full_logs_schema: StructType = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("userUID", StringType),
        StructField("userSID", StringType),
        StructField("actionBegin", StringType),
        StructField("actionEnd", StringType),
        StructField("actionType", StringType),
        StructField("actionName", StringType),
        StructField("actionValue", StringType),
        StructField("actionTest", StringType),
        StructField("ifEquipment", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType),
      )
    )
    //再次创建DataFrame
    val full_logDF: DataFrame = spark.createDataFrame(full_logs_rdd, full_logs_schema)
    full_logDF.printSchema()
    full_logDF.show(2, true)

    //    filterLogs.write
    //    jdbcUtils.dataFrameToMysql(filterLogs, jdbcUtils.table_access_logs, 1)

    jdbcUtils.dataFrameToMysql( full_logDF, jdbcUtils.table_full_access_logs, 1)

    spark.close()
  }

2.2创建jdbcUtils来连接Mysql数据库

object jdbcUtils {
  val url = "jdbc:mysql://192.168.61.141:3306/jsondemo?createDatabaseIfNotExist=true"
  val driver = "com.mysql.cj.jdbc.Driver"
  val user = "root"
  val password = "root"

  val table_access_logs: String = "access_logs"
  val table_full_access_logs: String = "full_access_logs"
  val table_day_active:String="table_day_active"
  val table_retention:String="retention"

  val table_loading_json="loading_json"
  val table_ad_json="ad_json"
  val table_notification_json="notification_json"
  val table_active_background_json="active_background_json"
  val table_comment_json="comment_json"
  val table_praise_json="praise_json"

  val table_teacher_json="teacher_json"

  val properties = new Properties()
  properties.setProperty("user", jdbcUtils.user)
  properties.setProperty("password", jdbcUtils.password)
  properties.setProperty("driver", jdbcUtils.driver)

  def dataFrameToMysql(df: DataFrame, table: String, op: Int = 1): Unit = {
    if (op == 0) {
      df.write.mode(SaveMode.Append).jdbc(jdbcUtils.url, table, properties)
    } else {
      df.write.mode(SaveMode.Overwrite).jdbc(jdbcUtils.url, table, properties)
    }
  }

  def getDataFtameByTableName(spark:SparkSession,table:String):DataFrame={
    val frame: DataFrame = spark.read.jdbc(jdbcUtils.url, table, jdbcUtils.properties)
    frame
  }

}

2.3 运行后结果展示:

初次清理后的日志数据

 清理完url的数据

三、留存用户分析 

3.1需求概览

1.计算用户的次日留存率

  • 求当天新增用户总数n
  • 求当天新增的用户ID与次日登录的用户ID的交集,得出新增用户次日登录总数m (次日留存数)
  • m/n*100%

2.计算用户的次周留存率

3.2.代码实现

object Retention {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("retentionDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val logs: DataFrame = jdbcUtils.getDataFtameByTableName(spark, jdbcUtils.table_full_access_logs)
    logs.printSchema()
    logs.show(3, false)

    //    logs.createOrReplaceTempView("logs"):
    //    过滤出所有的事件为Registered的日志,并且修改事件时间(event_time)为注册时间(registered——time)
    //    找出注册用户id和注册时间
    val registered: DataFrame = logs.filter('actionName === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select("userUID", "register_time")
    registered.printSchema()
    registered.show(3, false)

    //    找出ActionName为Signin的日志数据
    val signin: DataFrame = logs.filter('actionName === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select("userUID", "signin_time")
    signin.printSchema()
    signin.show(3, false)

    //    两个DF关联(几种写法和可能出现的问题)
    val joined: DataFrame = registered.join(signin, Seq("userUID"), "left")
    //    registered.join(signin,$"userUID","left")   显示是模棱两可的得先使两表userUID相等
    //    val joined2: DataFrame = registered.as("r1").join(signin.as("s1"), $"r1.userUID" === $"s1.userUID", "left")
    //    joined2.printSchema()
    //    joined2.show(3,false)  //会显示相同的id,在后续的操作中会有两个userUID,再次使用很难使用
    //    joined.printSchema()
    //    joined.show(3, false)

    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val mydefDataformat: UserDefinedFunction = spark.udf.register("mydefDataformat", (event_time: String) => {
      if (StringUtils.isEmpty(event_time))
        0
      else
        dateFormat.parse(event_time).getTime
    })

    val joinedFrame: DataFrame = joined.withColumn("register_date", mydefDataformat($"register_time"))
      .withColumn("signin_date", mydefDataformat($"signin_time"))
    //      .drop("")
    joinedFrame.printSchema()
    joinedFrame.show(3, false)

    //    求出前一天注册,当天登录的用户数量,过滤注册时间加上86400000查询第二天登录的用户,filter操作==需要变成===
    val signinNumDF: DataFrame = joinedFrame.filter('register_date + 86400000 === 'signin_date)
      .groupBy($"register_date")
      .agg(countDistinct('userUID).as("signNum"))
    signinNumDF.printSchema()
    signinNumDF.show(3, false)

    //    求出当前注册用户的数量
    val registerNumDF: DataFrame = joinedFrame.groupBy('register_date)
      .agg(countDistinct("userUID").as("registerNum"))
    registerNumDF.printSchema()
    registerNumDF.show(3, false)

    //    求出留存率
    val joinRegisAndSigninDF: DataFrame = signinNumDF.join(registerNumDF, Seq("register_date"))

    joinRegisAndSigninDF.printSchema()
    joinRegisAndSigninDF.show(3, false)

    val resultRetention: DataFrame = joinRegisAndSigninDF.select('register_date, ('signNum / 'registerNum).as("percent"))
    resultRetention.show()

    jdbcUtils.dataFrameToMysql(resultRetention,jdbcUtils.table_retention,1)

    spark.close()
  }

}

3.3 运行后结果展示: 

 

四、活跃用户分析 

4.1需求概览

  1. 读取数据库,统计每天的活跃用户数
  2. 统计规则:有看课和买课行为的用户才属于活跃用户
  3. 对UID进行去重

4.2代码实现

object Active {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("activeDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._

//  读取清洗后的日志数据并过滤出活跃用户
    val logs: DataFrame = jdbcUtils.getDataFtameByTableName(spark, jdbcUtils.table_full_access_logs)

    val ds: Dataset[Row] = logs.filter($"actionName" === "BuyCourse" || $"actionName" === "StartLearn")
    ds.printSchema()
    ds.show(3,false)
//  修改DataSet=>二元组
    val ds2: Dataset[(String, String)] = ds.map(x =>
      (
        x.getAs[String]("userSID"),
        x.getAs[String]("event_time").substring(0, 10)
      )
    )
    ds2.show()

//    按天进行聚合,求出活跃用户数并去重
    val frame: DataFrame = ds2.withColumnRenamed("_2", "date")
      .withColumnRenamed("_1", "userid")
      .groupBy($"date")
      .agg(countDistinct("userid").as("activeNum"))

    frame.printSchema()
    frame.show(3,false)
//   JdbcUtils中新增活跃用户变量
    jdbcUtils.dataFrameToMysql(frame,jdbcUtils.table_day_active,1)
    println("操作结束")
    spark.close()
  }

}

4.3 运行后结果展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/412009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

港科夜闻|香港科大(广州)副校长吴宏伟教授与150余位高教界人士分享两会期间见闻及体会...

关注并星标每周阅读港科夜闻建立新视野 开启新思维1、香港科大(广州)副校长吴宏伟教授与150余位高教界人士分享两会期间见闻及体会。港专学院及香港高等教育评议会合办“港区高等教育界全国政协委员2023全国两会见闻分享”活动。吴宏伟教授在会上发言表示,全国两会令…

Apple Xcode 14.3 (14E222b) 正式版发布下载

Command Line Tools for Xcode 14, tvOS 16 & watchOS 9 Simulator Runtime 请访问原文链接:https://sysin.org/blog/apple-xcode-14/,查看最新版。原创作品,转载请保留出处。 作者主页:sysin.org Xcode 14 包含了在所有 Ap…

上海亚商投顾:沪指震荡反弹 游戏、传媒概念股再度大涨

上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。 市场情绪大小指数今日走势分化,沪指向上震荡反弹,创业板指一度跌近1%,黄白二线大幅背离。…

强大到让人无法想象的ChatGPT-5即将发布,上千名人士却紧急叫停

目录 【ChatGPT 5简介】 【ChatGPT 5的潜在应用】 【ChatGPT 5的潜在危险】 ChatGPT4还没有好好体验,比GPT4强大1000倍的ChatGPT5又即将发布!届时将彻底改变人工智能领域,并改变我们现有的世界 【ChatGPT 5简介】 OpenAI计划在2023年12月发…

接入丰桥,下单到打印面单到配送开发流程

顺丰开放平台地址:顺丰开放平台 一、了解官网 1.1、开发文档 开发接口之前可以先查看熟悉文档,尤其是API文档仔细阅读。进入之后是如下图,根据自己需要开发的接口,先查看文档,然后根据文档要求来请求并处理响应。 1.2…

在 Linux 上使用 Pigz 更快地压缩文件,真的快!

Pigz是一款快速压缩文件的工具,它能够使用多个CPU核心进行压缩,使得压缩速度得到了极大的提升。在本文中,我们将介绍如何在Linux上使用Pigz来更快地压缩文件。 安装Pigz 在开始使用Pigz之前,我们需要先安装它。在大多数Linux发行…

11. unity 物理系统和碰撞检测+射击游戏案例

1. 物理系统 也就是在游戏场景中添加日常的重力,碰撞等到物理属性 1.1 刚体组件(Rigidbody) 给模型添加刚体组件后,模型会具备一些物理属性,比如重力,速度,加速度等,在属性窗口中…

CSS基础知识点-01

【01】标准CSS盒模型和IE盒模型的区别 盒模型的区别在于设置width和height时,所对应的范围不同。标准盒模型宽高属性的范围只包含content,而IE盒模型的宽高属性范围包含了border、padding和content。 一般来说,我们可以通过修改元素的box-siz…

AcWing 245. 你能回答这些问题吗(线段树)

AcWing 245. 你能回答这些问题吗(线段树)一、题目二、分析1、节点定义2、函数分析(1)pushup函数(2)build函数(3)modify函数(4)query函数三、代码一、题目 Ac…

COM 对象析构函数是非常敏感的函数

如果你试图在 COM 对象的析构函数中做太多事情,你会发现自己有麻烦。 此话怎讲? 举个例子,如果析构函数将自身引用交给其他函数,则这些函数可能会决定调用 IUnknown::AddRef 和 IUnknown::Release 方法作为其内部操作的一部分。考察下面的…

解码行业新趋势:2023晶球益生菌与肠内营养健康高峰论坛圆满落幕

后疫情时代,国人自身健康管理意识日益提高,越来越多的人认识到到微生物组、营养吸收与免疫健康的密切联系,并逐渐认可微生态和肠内营养在临床应用过程中的积极作用,使得营养治疗研究成果进一步落地转化。消费升级新时代&#xff0…

项目6:实现数据字典的展示与缓存

项目6:实现数据字典的展示与缓存 1.数据字典如何展示? 2.前后端如何设计? 3.前端设计代码? 4.后端设计代码? 5.实现数据字典缓存到redis 项目6:实现数据字典的展示与缓存 1.数据字典如何展示&#xf…

WEB攻防-通用漏洞PHP反序列化POP链构造魔术方法原生类

目录 一、序列化和反序列化 二、为什么会出现反序列化漏洞 三、序列化和反序列化演示 <演示一> <演示二> <演示二> 四、漏洞出现演示 <演示一> <演示二> 四、ctfshow靶场真题实操 <真题一> <真题二> <真题三> &l…

C++入门到入土(一)

C语言中&#xff0c;我们这样定义&#xff0c;输出100。 因为局部作用域的访问权限大于全局作用域的。 当我们加入头文件#include <time.h>的时候&#xff0c;就会报错 看报错&#xff0c;我们也知道&#xff0c;time重定义&#xff0c;因为我们头文件time.h里面有time函…

验证码识别过程中切割图片的几种方案

目录 方案一&#xff1a;图片均分 方案二&#xff1a;寻找轮廓并截取 方案三&#xff1a;聚类算法 方案四&#xff1a;垂直投影法 源码下载 在用机器学习识别验证码的过程中&#xff0c;我们通常会选择把验证码中的各个字符切割出来然后单独识别&#xff0c;切割质量会直接…

JS Hook 基本使用

前言 Hook技术也叫钩子函数&#xff0c;功能是把网站的代码拉出来&#xff0c;改成我们自己想执行的代码片段&#xff0c;简单来说就是可以控制执行函数的入参和出参&#xff1b; 一、资源下载 编程猫插件&#xff1a;https://pan.baidu.com/s/1SP8xHoDpugssFRpu-nLxPw?pwdz…

ARM 编译器 Arm Compiler for Embedded 6 相关工具链简介

目录 1, Introduction to Arm Compiler 6 1.1 armclang 1.2 armasm 1.3 armlink 1.4 armar 1.5 fromelf 1.6 Arm C libraries 1.7 Arm C libraries 1,8 Application development &#xff0c;ARM程序开发流程 2&#xff0c;ARM 编译器 5和ARM 编译器 6的兼容性 3&…

Opencv项目实战:22 物体颜色识别并框选

目录 0、项目介绍 1、效果展示 2、项目搭建 3、项目代码展示与部分讲解 Color_trackbar.py bgr_detector.py test.py 4、项目资源 5、项目总结 0、项目介绍 本次项目要完成的是对物体颜色的识别并框选&#xff0c;有如下功能&#xff1a; &#xff08;1&#xff09;…

【权限提升】Linux Sudo权限提升漏洞(CVE-2023-22809)

文章目录前言一、Sudo介绍二、漏洞概述三、漏洞成因四、漏洞分析五、影响版本六、本地复现七、修复建议前言 Sudo存在权限提升漏洞,攻击者可过特定的payload获取服务器ROOT权限 一、Sudo介绍 sudo (su " do")允许系统管理员将权限委托给某些用户(或用户组),能够以…

网络安全与防御

1. 什么是IDS&#xff1f; IDS(入侵检测系统)&#xff1a;入侵检测是防火墙的合理补充&#xff0c;帮助系统对付网络攻击&#xff0c;扩展了系统管理员的安全管理能力&#xff0c;提高了信息安全基础结构的完整性。主要针对防火墙涉及不到的部分进行检测。 入侵检测主要面对的…