SparkStreaming与Kafka整合

news2025/9/20 19:47:01

1.3 SparkStreaming与Kafka整合

1.3.1 整合简述
kafka是做消息的缓存,数据和业务隔离操作的消息队列,而sparkstreaming是一款准实时流式计算框架,所以二者的整合,是大势所趋。
​
二者的整合,有主要的两大版本。

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,在实际开发中Spark Streaming经常会结合Kafka来处理实时数据。Spark Streaming 与 kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10。jar包分支选择原则:

  • 0.10.0>kafka版本>=0.8.2.1,选择 08 接口

  • kafka版本>=0.10.0,选择 010 接口

sparkStreaming和Kafka整合一般两种方式:Receiver方式和Direct方式

Receiver方式(介绍)

Receiver方式基于kafka的高级消费者API实现(高级优点:高级API写起来简单;不需要去自行去管理offset,系统通过zookeeper自行管理;不需要管理分区,副本等情况,系统自动管理;消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据;高级缺点:不能自行控制 offset;不能细化控制如分区、副本、zk 等)。Receiver从kafka接收数据,存储在Executor中,Spark Streaming 定时生成任务来处理数据。

默认配置的情况,Receiver失败时有可能丢失数据。如果要保证数据的可靠性,需要开启预写式日志,简称WAL(Write Ahead Logs,Spark1.2引入),只有接收到的数据被持久化之后才会去更新Kafka中的消费位移。接收到的数据和WAL存储位置信息被可靠地存储,如果期间出现故障,这些信息被用来从错误中恢复,并继续处理数据。

还有几个需要注意的点:

  • 在Receiver的方式中,Spark中的 partition 和 kafka 中的 partition 并不是相关的,如果加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度;

  • 对于不同的 Group 和 Topic 可以使用多个 Receiver 创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream;

  • 如果启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是:KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

  • WAL将接收的数据备份到HDFS上,保证了数据的安全性。但写HDFS比较消耗性能,另外要在备份完数据之后还要写相关的元数据信息,这样总体上增加job的执行时间,增加了任务执行时间;

  • 总体上看 Receiver 方式,不适于生产环境;

1.3.2  Direct的方式
Direct方式从Spark1.3开始引入的,通过 KafkaUtils.createDirectStream 方法创建一个DStream对象,Direct方式的结构如下图所示。

Direct 方式特点如下:

  • 对应Kafka的版本 0.8.2.1+

  • Direct 方式

  • Offset 可自定义

  • 使用kafka低阶API

  • 底层实现为KafkaRDD

该方式中Kafka的一个分区与Spark RDD对应,通过定期扫描所订阅Kafka每个主题的每个分区的最新偏移量以确定当前批处理数据偏移范围。与Receiver方式相比,Direct方式不需要维护一份WAL数据,由Spark Streaming程序自己控制位移的处理,通常通过检查点机制处理消费位移,这样可以保证Kafka中的数据只会被Spark拉取一次

  • 引入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
  • 模拟kafka生产数据

package com.qianfeng.sparkstreaming
​
import java.util.{Properties, Random}
​
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
​
/**
 * 向kafka中test主题模拟生产数据;;;也可以使用命令行生产:kafka-console-producer.sh --broker-list qianfeng01:9092,hadoop02:9092,hadoop03:9092 -topic test
 */
object Demo02_DataLoad2Kafka {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    //提供Kafka服务器信息
    prop.put("bootstrap.servers","qianfeng01:9092")
    //指定响应的方式
    prop.put("acks","all")
    //请求失败重试的次数
    prop.put("retries","3")
    //指定key的序列化方式,key是用于存放数据对应的offset
    prop.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    //指定value的序列化方式
    prop.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    //创建producer对象
    val producer = new KafkaProducer[String,String](prop)
    //提供一个数组,数组中数据
    val arr = Array(
      "hello tom",
      "hello jerry",
      "hello dabao",
      "hello zhangsan",
      "hello lisi",
      "hello wangwu",
    )
    //提供一个随机数,随机获取数组中数据向kafka中进行发送存储
    val r = new Random()
    while(true){
      val message = arr(r.nextInt(arr.length))
      producer.send(new ProducerRecord[String,String]("test",message))
      Thread.sleep(r.nextInt(1000))   //休眠1s以内
    }
  }
}
  • 实时消费kafka数据

package com.qianfeng.sparkstreaming
​
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
​
/**
 * sparkStreaming消费Kafka中的数据
 */
object Demo03_SparkStreamingWithKafka {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf对象
    val conf = new SparkConf()
      .setAppName("SparkStreamingToKafka")
      .setMaster("local[*]")
    //2.提供批次时间
    val time = Seconds(5)
    //3.提供StreamingContext对象
    val sc = new StreamingContext(conf, time)
    //4.提供Kafka配置参数
    val kafkaConfig = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "qianfeng01:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "qianfeng",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    )
    //5.读取Kafka中数据信息生成DStream
    val value = KafkaUtils.createDirectStream(sc,
      //本地化策略:将Kafka的分区数据均匀的分配到各个执行Executor中
      LocationStrategies.PreferConsistent,
      //表示要从使用kafka进行消费【offset谁来管理,从那个位置开始消费数据】
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaConfig)
    )
    //6.将每条消息kv获取出来
    val line: DStream[String] = value.map(record => record.value())
    //7.开始计算操作
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    //line.count().print()   //每隔5s的数据条数
    //8.开始任务
    sc.start()
    sc.awaitTermination()
  }
}
  • 说明

    1. 简化的并行性:不需要创建多个输入Kafka流并将其合并。 使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。

    2. 效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次:一次是Kafka,另一次是由预先写入日志(WriteAhead Log)复制。这个第二种方法消除了这个问题,因为没有接收器,因此不需要预先写入日志。只要Kafka数据保留时间足够长。

    3. 正好一次(Exactly-once)的语义:第一种方法使用Kafka的高级API来在Zookeeper中存储消耗的偏移量。传统上这是从Kafka消费数据的方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1346276.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ARM CCA机密计算软件架构之软件堆栈概述

Arm CCA平台通过硬件添加和固件组件的混合方式实现,例如在处理元素(PEs)中的RME以及特定的固件组件,特别是监视器和领域管理监视器。本节介绍Arm CCA平台的软件堆栈。 软件堆栈概述 领域VM的执行旨在与Normal world(正常世界)隔离,领域VM由Normal world Host(正常世界…

javaweb基础2.0 (持续更新中)

Day 10 : Responbody 将响应的实体类转为json发送给前端。RequestBody将request的json转为实体类给后端 修改未成功的可能是因为根据id 查询的时候&#xff0c;只查询了name字段&#xff0c;因为后面需要根据id 修改&#xff0c;所以还需查询id &#xff0c;不然前端不知道id也…

【网络安全】有趣的基础知识

背景 逐条记录网络安全学习中有趣的内容和知识。 内容 CNNIC&#xff08;中国互联网络信息中心&#xff09;是中国国家域名.cn的管理组织。中国互联网络信息中心于1997年6月3日组建&#xff0c;现为工业和信息化部 直属事业单位 &#xff0c;行使国家互联网络信息中心职责。…

lvs+keepalived+nginx实现四层负载+七层负载

目录 一、lvs配置 二、nginx配置 三、测试 3.1 keepalived负载均衡 3.2 lvskeepalived高可用 3.3 nginx高可用 主机IPlvs01-33 11.0.1.33 lvs02-3411.0.1.34nginx0111.0.1.31nginx0211.0.1.32VIP11.0.1.30 4台主机主机添加host [rootnginx01 sbin]# cat /etc/hosts 127.0.0.…

校园外卖趋势洞察与未来预测系统

校园外卖趋势洞察与未来预测系统 项目概述数据集技术栈功能特点创新点实施步骤 项目概述 本项目旨在打造一套基于Flask和Echarts的校园外卖趋势洞察与未来预测系统。通过深度分析模拟校园外卖数据集&#xff0c;结合机器学习算法&#xff0c;我们致力于提供对学生外卖点餐规律…

[LitCTF 2023]Vim yyds

[LitCTF 2023]Vim yyds wp 题目页面如下&#xff1a; 搜索一番&#xff0c;没有发现任何信息。题目描述中说到了源码泄露&#xff0c;那么先进行目录扫描。 dirsearch 目录扫描 命令&#xff1a; dirsearch -u "http://node4.anna.nssctf.cn:28588/"返回结果&…

微服务系列之分布式事务理论

概述 事务是由一组操作构成的可靠的独立的工作单元&#xff0c;事务具备ACID的特性&#xff0c;即原子性、一致性、隔离性和持久性。 分类 大多数情况下&#xff0c;分类是没有意义的一件事。但是分类可以一定程度上&#xff0c;加深理解。 实现 从实现角度来看&#xff0…

前端Vue中自定义Popup弹框、按钮及内容的设计与实践

标题&#xff1a;前端Vue中自定义Popup弹框、按钮及内容的设计与实践 一、引言 在Web前端开发中&#xff0c;弹框&#xff08;Popup&#xff09;是一种常见的用户界面元素&#xff0c;用于向用户显示额外的信息或提供额外的功能。然而&#xff0c;标准的弹框往往不能满足所有…

迅软科技助力高科技防泄密:从华为事件中汲取经验教训

近期&#xff0c;涉及华为芯片技术被窃一事引起广泛关注。据报道&#xff0c;华为海思的两个高管张某、刘某离职后成立尊湃通讯&#xff0c;然后以支付高薪、股权支付等方式&#xff0c;诱导多名海思研发人员跳槽其公司&#xff0c;并指使这些人员在离职前通过摘抄、截屏等方式…

跨域请求:Go语言下的“通天大道”

开场白&#xff1a;嘿&#xff0c;各位Go语言的爱好者们&#xff0c;你们是否曾经遇到过这样的困扰&#xff1a;当你的Go应用试图与另一个域的API进行交流时&#xff0c;突然跳出一个“未允许的跨域请求”的警告&#xff1f;别担心&#xff0c;今天&#xff0c;我们将一起在这条…

ROS学习记录:用C++实现对wpr_simulation软件包中机器人的运动控制

一、在工作空间下输入catkin_make进行编译 二、在工作空间中输入source ./devel/setup.bash后回车 三、机器人的运动控制在wpr_simulation中有一个例子程序&#xff0c;在工作空间中输入&#xff1a; roslaunch wpr_simulation wpb_simple.launch后回车 四、就会启动一个仿真环…

【数学建模美赛M奖速成系列】Matplotlib绘图技巧(三)

Matplotlib绘图技巧&#xff08;三&#xff09; 写在前面7. 雷达图7.1 圆形雷达图7.2 多边形雷达图 8. 极坐标图 subplot9. 折线图 plot10. 灰度图 meshgrid11. 热力图11.1 自定义colormap 12. 箱线图 boxplot 写在前面 终于更新完Matplotlib绘图技巧的全部内容&#xff0c;有…

如何用Python批量计算Word中的算式

一、问题的提出 到了期末&#xff0c;大家都在忙着写总结、改试卷、算工作量&#xff0c;写总结可以借助于ChatGPT&#xff0c;改试卷可以用星火的自动批阅功能&#xff0c;算工作量就是一项比较棘手的问题&#xff0c;因为它涉及很多算式&#xff0c;有时需要老师用计算器算来…

我在Vscode学OpenCV 图像处理四(轮廓查找 cv2.findContours() cv2.drawContours())-- 待补充

图像处理四&#xff08;轮廓查找&#xff09; 一、前言1.1 边缘检测和轮廓查找的区别是什么1.1.1 边缘检测&#xff1a;1.1.2 轮廓查找&#xff1a; 1.2 边缘检测和轮廓查找在图像处理中的关系和流程 二、查找并绘制轮廓2.1 cv2.findContours()&#xff1a;2.1.1 详细介绍&…

x-cmd pkg | openssl - 密码学开源工具集

目录 简介首次用户技术特点竞品分析进一步阅读 简介 OpenSSL 是一个开源的密码库和 SSL/TLS 协议实现&#xff0c;它提供了一组密码学工具和加密功能&#xff0c;用于保护数据通信的安全性。项目发展历史可以追溯到 1998 年&#xff0c;源自 Eric A. Young 和 Tim J. Hudson 开…

计算机毕业设计------ssm茶叶溯源系统

项目介绍 茶叶溯源系统&#xff0c;分为前台与后台。普通用户可在前台通过18位的编码查询茶叶的出售历史。 后台分为两种角色&#xff0c;管理员与经销商&#xff1b; 管理员主要功能包括&#xff1a; 主界面&#xff1b; 管理员管理&#xff1a;管理员列表、添加管理员&am…

软件测试/测试开发丨Python 内置装饰器 学习笔记

内置类装饰器 不用实例化、直接调用提升代码的可读性 内置装饰器含义classmethod类方法staticmethod静态方法 普通方法 定义&#xff1a; 第一个参数为self&#xff0c;代表 实例本身 调用&#xff1a; 要有实例化的过程&#xff0c;通过 实例对象.方法名 调用 # 1. 定义 c…

自定义docker镜像,ubuntu安装命令并导出

文章目录 问题现象解决步骤相关命令详细介绍docker save 与 docker loaddocker import 与 docker exportdocker commit 问题现象 我们的通讯服务&#xff0c;需要监测前端设备的在线情况&#xff08;是否在线、丢包率、延迟等&#xff09;&#xff0c;使用ping命令去实现此功能…

unity学习笔记----游戏练习03

一、修复植物种植的问题 1.当手上存在植物时&#xff0c;再次点击卡片上的植物就会在手上添加新的植物&#xff0c;需要修改成只有手上没有植物时才能再次获取到植物。需要修改AddPlant方法。 public bool AddPlant(PlantType plantType) { //防止手上出现多个植…

Python pandas 操作 excel 详解

文章目录 1 概述1.1 pandas 和 openpyxl 区别1.2 Series 和 DataFrame 2 常用操作2.1 创建 Excel&#xff1a;to_excel()2.2 读取 Excel&#xff1a;read_excel()2.2.1 header&#xff1a;标题的行索引2.2.2 index_col&#xff1a;索引列2.2.3 dtype&#xff1a;数据类型2.2.4 …