【Spark分析HBase数据】Spark读取并分析HBase数据

news2025/5/16 6:24:46

Spark读取并分析HBase数据

  • 一、摘要
  • 二、实现过程
  • 三、小结

一、摘要

Apache Spark 是一个快速、通用的大数据处理引擎,提供了丰富的 API 用于数据处理和分析。HBase 是一个分布式、可扩展的 NoSQL 数据库,适合存储海量结构化和半结构化数据。Spark 与 HBase 的结合可以充分发挥两者的优势,实现高效的数据处理和分析。
Spark 可以通过 HBase 的 Java API 或者专用的连接器来读取 HBase 中的数据。在读取数据时,Spark 可以将 HBase 表中的数据转换为 RDD(弹性分布式数据集)或者 DataFrame,然后利用 Spark 的各种操作进行数据处理和分析。
本文以Spark2.3.2读取HBase1.4.8中的hbase_emp_table表数据进行简单分析,用户实现相关的业务逻辑。

二、实现过程

  1. 在IDEA创建工程SparkReadHBaseData
  2. 在pom.xml文件中添加依赖
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.3</spark.version>
        <hbase.version>1.4.8</hbase.version>
    </properties>
    
    <dependencies>
        <!-- Spark 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <!-- HBase 依赖 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>
    
        <!-- Hadoop 依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
            <scope>provided</scope>
        </dependency>
    
        <!-- 处理依赖冲突 -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>12.0.1</version>
        </dependency>
    
        <!-- 使用scala2.11.8进行编译和打包 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    
    </dependencies>
    
    <build>
        <!-- 指定scala源代码所在的目录 -->
        <sourceDirectory>src/main/scala</sourceDirectory>
    
        <plugins>
    
            <!-- Scala 编译插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
    
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <archive>
                        <!-- 项目中有多个主类时,采用不指定主类规避pom中只能配置一个主类的问题 -->
                        <manifest/>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
  3. 新建com.lpssfxy的package
    在这里插入图片描述
  4. 在该package下新建名为SparkReadHBaseData的Object,编写程序实现业务逻辑:
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.{SparkSession}
    
    /**
     * Employee样例类
     *
     * @param empNo
     * @param eName
     * @param job
     * @param mgr
     * @param hireDate
     * @param salary
     * @param comm
     * @param deptNo
     */
    case class Employee(empNo: Int, eName: String, job: String, mgr: Int, hireDate: String, salary: Double, comm: Double, deptNo: Int)
    
    
    object SparkReadHBaseData {
    
      private val TABLE_NAME = "hbase_emp_table"
      private val INFO_CF = "info"
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SparkHBaseIntegration")
          .master("local[*]")
          .getOrCreate()
    
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "s1,s2,s3")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
    
        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf(TABLE_NAME))
    
        val scan = new Scan()
        scan.addFamily(Bytes.toBytes(INFO_CF))
    
        // 扫描 HBase 表并转换为 RDD
        val results = table.getScanner(scan)
        val data = Iterator.continually(results.next()).takeWhile(_ != null).map { result =>
          val rowKey = Bytes.toString(result.getRow())
          val eName = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("ename")))
          val job = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("job")))
          val mgrString = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("mgr")))
          var mgr: Int = 0
          if (!"".equals(mgrString) && null != mgrString) {
            mgr = mgrString.toInt
          }
          val hireDate = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("hiredate")))
          val salary = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("sal")))
          val commString = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("comm")))
          var comm: Double = 0
          if (!"".equals(commString) && null != commString) {
            comm = commString.toDouble
          }
          val deptNo = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("deptno")))
          (rowKey.toInt, eName, job, mgr, hireDate, salary.toDouble, comm, deptNo.toInt)
        }.toList
    
        // 转换为 DataFrame
        import spark.implicits._
        val df = spark.sparkContext.parallelize(data).map(item => {
          Employee(item._1, item._2, item._3, item._4, item._5, item._6, item._7, item._8)
        }).toDF()
    
        // 将df注册成临时表
        df.createOrReplaceTempView("emp")
        // 需求1:统计各个部门总支出
        val totalExpense = spark.sql("select deptNo,sum(salary) as total from emp group by deptNo order by total desc")
        totalExpense.show()
        // 需求2: 统计各个部门总的支出(包括工资和奖金),并按照总支出升序排
    	val totalExpense2 = spark.sql("select deptNo,sum(salary + comm) as total from emp group by deptNo order by total")
    	totalExpense2.show()
    	// TODO:需求3-结合dept部门表来实现多表关联查询,请同学自行实现
    
        // 关闭连接
        connection.close()
        // 停止spark,释放资源
        spark.stop()
      }
    }
    
  5. 为了没有大量无关日志输出,在resources目录下新建log4j.properties,添加如下内容:
    log4j.rootLogger=ERROR,stdout
    # write to stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
    
  6. 启动虚拟机中的hdfs、zookeeper和hbase
    start-dfs.sh
    zkServer.sh start
    start-hbase.sh
    
  7. 运行代码,查看执行结果
    在这里插入图片描述

三、小结

  1. 本实验仅仅演示Spark读取HBase表数据并简单分析的过程,可以作为复杂的业务逻辑分析的基础。
  2. Spark 读取并分析 HBase 数据具有高性能、丰富的数据分析功能、可扩展性、灵活性和实时性等优势。然而,也存在数据一致性、复杂的配置和管理、资源消耗和兼容性等不足。在实际应用中,需要根据具体的需求和场景来选择是否使用 Spark 和 HBase 的组合,并注意解决可能出现的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2376629.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode2934. 最大化数组末位元素的最少操作次数-medium

1 题目&#xff1a;最大化数组末位元素的最少操作次数 官方标定难度&#xff1a;中 给你两个下标从 0 开始的整数数组 nums1 和 nums2 &#xff0c;这两个数组的长度都是 n 。 你可以执行一系列 操作&#xff08;可能不执行&#xff09;。 在每次操作中&#xff0c;你可以选…

环境配置与MySQL简介

目录 1 环境配置 2 MySQL简介 1 环境配置 本专栏使用CentOS7进行讲解。首先我们查看系统中是否已经安装了MySQL&#xff0c;可以使用rpm -qa 命令查看系统安装包/压缩包 列表 这只是看我们是否下载过对应安装包&#xff0c;不一定就安装了。如果我们需要重新下载&#xff0c;…

适用于 iOS 的 开源Ultralytics YOLO:应用程序和 Swift 软件包,用于在您自己的 iOS 应用程序中运行 YOLO

​一、软件介绍 文末提供程序和源码下载 该项目利用 Ultralytics 最先进的 YOLO11 模型将您的 iOS 设备转变为用于对象检测的强大实时推理工具。直接从 App Store 下载该应用程序&#xff0c;或浏览我们的指南&#xff0c;将 YOLO 功能集成到您自己的 Swift 应用程序中。 二、…

Java零基础学习Day12——集合ArrayList

一、基本使用 1. 集合与数组 集合只存引用数据类型&#xff1b;长度可变 数组可存基本数据类型、引用数据类型&#xff1b;长度固定 2. 基本格式 ArrayList<String> list new ArrayList<>(); 3. 方法 增、删 import java.util.ArrayList; public class St…

[论文阅读]Formalizing and Benchmarking Prompt Injection Attacks and Defenses

Formalizing and Benchmarking Prompt Injection Attacks and Defenses Formalizing and Benchmarking Prompt Injection Attacks and Defenses | USENIX 33rd USENIX Security Symposium (USENIX Security 24) 提出了一个框架来形式化提示注入攻击&#xff0c;对提示注入攻击…

JavaScript性能优化实战,从理论到落地的全面指南

在前端开发领域&#xff0c;JavaScript的性能优化是提升用户体验的核心环节。随着Web应用复杂度的提升&#xff0c;开发者面临的性能瓶颈也日益多样化。本文将从理论分析、代码实践和工具使用三个维度&#xff0c;系统性地讲解JavaScript性能优化的实战技巧&#xff0c;并通过大…

MySQL 8.0 OCP 英文题库解析(三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题16~25 试题16:…

Docker容器启动失败?无法启动?

Docker容器无法启动的疑难杂症解析与解决方案 一、问题现象 Docker容器无法启动是开发者在容器化部署中最常见的故障之一。尽管Docker提供了丰富的调试工具&#xff0c;但问题的根源往往隐藏在复杂的配置、环境依赖或资源限制中。本文将从环境变量配置错误这一细节问题入手&am…

MySQL 数据类型全面指南:从理论到实践

在数据库设计和开发中&#xff0c;数据类型的选择是构建高效、可靠系统的基石。MySQL作为最流行的关系型数据库之一&#xff0c;提供了丰富的数据类型以满足各种数据存储需求。本文将全面介绍MySQL的数据类型体系&#xff0c;通过理论讲解和实际示例&#xff0c;帮助开发者做出…

uniapp(微信小程序)>关于父子组件的样式传递问题(自定义组件样式穿透)

在父组件中给子组件添加类名,子组件的样式由父组件决定 由于"微信小程序"存在【样式隔离机制】&#xff0c;且默认设置为isolated(启用样式隔离)&#xff0c;因此这里给出以下两种解决方案&#xff1a; // 小程序编译机制 1. 当 <style scoped> 存在时&#…

【HCIA】BFD

前言 前面我们介绍了浮动路由以及出口路由器的默认路由配置&#xff0c;可如此配置会存在隐患&#xff0c;就是出口路由器直连的网络设备并不是运营商的路由器&#xff0c;而是交换机。此时我们就需要感知路由器的存活状态&#xff0c;这就需要用到 BFD&#xff08;Bidirectio…

计算机视觉最不卷的方向:三维重建学习路线梳理

提到计算机视觉&#xff08;CV&#xff09;&#xff0c;大多数人脑海中会立马浮现出一个字&#xff1a;“卷”。卷到什么程度呢&#xff1f;2022年秋招CV工程师岗位数下降了16%&#xff0c;但求职人数增加了23%&#xff0c;求职人数与招聘岗位的比例达到了恐怖的15:1&#xff0…

android抓包踩坑记录

​ 由于需要公司业务需求&#xff0c;需要抓取APP中摄像机插件的网络包&#xff0c;踩了两天坑&#xff0c;这里做个总结吧。 事先准备 android-studio emulatesdk 需要android模拟器和adb调试工具。如果已经有其他模拟器的话&#xff0c;可以只安装adb调试工具即可 mitmproxy…

Webpack其他插件

安装html打包插件 const path require(path); const HtmlWebpackPlugin require(html-webpack-plugin) module.exports {entry: path.resolve(__dirname,src/login/index.js),output: {path: path.resolve(__dirname, dist),filename: ./login/index.js,clean:true},Plugin:…

Python Matplotlib 库【绘图基础库】全面解析

让AI成为我们的得力助手&#xff1a;《用Cursor玩转AI辅助编程——不写代码也能做软件开发》 一、发展历程 Matplotlib 由 John D. Hunter 于 2003 年创建&#xff0c;灵感来源于 MATLAB 的绘图系统。作为 Python 生态中最早的可视化工具之一&#xff0c;它逐渐成为科学计算领…

C++ string数据查找、string数据替换、string子串获取

string查找示例见下&#xff0c;代码见下&#xff0c;以及对应运行结果见下&#xff1a; #include<iostream>using namespace std;int main() {// 1string s1 "hellooooworld";cout << s1.find("oooo") << endl;// 2cout << (in…

关于甲骨文(oracle cloud)丢失MFA的解决方案

前两年&#xff0c;申请了一个招商的多币种信用卡&#xff0c;然后就从网上撸了一个oracle的免费1h1g的服务器。 用了一段时间&#xff0c;人家要启用MFA验证。 啥叫MFA验证&#xff0c;类似与短信验证吧&#xff0c;就是绑定一个手机&#xff0c;然后下载一个app&#xff0c;每…

【网络编程】七、详解HTTP 搭建HTTP服务器

文章目录 Ⅰ. HTTP协议的由来 -- 万维网Ⅱ. 认识URL1、URL的格式协议方案名登录信息 -- 忽略服务器地址服务器端口号文件路径查询字符串片段标识符 2、URL的编码和解码 Ⅲ. HTTP的报文结构1、请求协议格式2、响应协议格式&#x1f38f; 写代码的时候&#xff0c;怎么保证请求和…

[Java实战]Spring Boot 快速配置 HTTPS 并实现 HTTP 自动跳转(八)

[Java实战]Spring Boot 快速配置 HTTPS 并实现 HTTP 自动跳转(八) 引言 在当今网络安全威胁日益严峻的背景下&#xff0c;为 Web 应用启用 HTTPS 已成为基本要求。Spring Boot 提供了简单高效的方式集成 HTTPS 支持&#xff0c;无论是开发环境测试还是生产环境部署&#xff0…

CVPR计算机视觉顶会论文解读:IPC-Dehaze 如何解决真实场景去雾难题

【CVPR 2025】迭代预测-评判编解码网络&#xff1a;突破真实场景去雾的极限 摘要 本文提出了一种名为IPC-Dehaze的创新去雾方法&#xff0c;通过迭代预测-评判框架和码本解码机制&#xff0c;有效解决了现有去雾算法在复杂场景下的性能瓶颈。该方法在多个基准测试中取得了SOT…