【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

news2025/7/15 19:26:51

需要源码请点赞关注收藏后评论区留言私信~~~

Flume、Kafka区别和侧重点

1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消费的话,使用kafka;如果数据有多个生产者场景,或者有写入Hbase、HDFS操作,使用Flume。

2)Flume可以使用拦截器实时处理数据。而Kafka需要外部的流处理系统才能做到。

3)Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

Spark Streaming与Flume、Kafka整合与开发

此开发示例的功能是商品实时交易数据统计分析,通过Flume实时收集交易订单,将数据分发Kafka,Kafka将数据传输到Spark Streaming,Spark Streaming统计商品的销售量。实现主要有以下几个步骤:

1)通过LOG日志模拟产生实时交易数据

2)Flume收集模拟产生实时交易数据

3)Flume将数据发送给Kafka消息队列

4)Spark Streaming接收Kafka消息队列的消息,每5秒进行数据统计

具体实现如下:

1)新建MAVEN项目,名称为RealtimeAnalysis,新建过程请见第9章。在POM.XML文件中加入依赖包

2)在工程的resource目录下新建log4j.properties文件,其中注意的是log4j.appender.flume.Hostname的配置,要配置成你安装flume的服务器

3)在工程的test目录下新建java类LoggerGenerator,此类用于不断模拟产生订单交易数据,在此类中每6秒调用一次PaymentInfo交易的实体类的random方法是模拟产生订单交易数据方法,数据以JSON格式返回。其中PaymentInfo是交易的实体类,用三个成员变量,分别是订单编号、商品编号、商品价格,LoggerGenerator为模拟日志生成类 

4)在安装Flume服务器的conf目录下新建文件log4j_flume.properties,其中注意的是sinks.kafka_sink.brokerList配置的是连接Kafka集群的地址和端口号

5)启动flume,命令如下:

./kafka-server-start.sh /hadoop/kafka_2.11-2.4.1/config/server.properties &

6)新建topic,名称为 logtoflume,命令如下:

kafka-topics.sh
  --zookeeper 172.16.106.69:2181,172.16.106.70:2181,172.16.106.71:2181
 --topic logtoflume --replication-factor 1 --partitions 1  --create

7)新建scala类KafkaConsumerMsg,接收kafka下的topic队列,名称为logtoflume的数据,并做统计

8)启动LoggerGenerator不断模拟产生订单交易数据,运行效果如下:

 9)启动KafkaConsumerMsg接收kafka下的topic队列的数据,并做统计,运行效果如下:

部分代码如下

import com.alibaba.fastjson.JSONObject;

import java.util.Random;
import java.util.UUID;
public class PaymentInfo {
    private static final long serialVersionUID = 1L;
    private String orderId;//订单编号
    private String productId;//商品编号
    private long productPrice;//商品价格
    public PaymentInfo() {
    }
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }
    public String getOrderId() {
        return orderId;
    }
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public String getProductId() {
        return productId;
    }
    public void setProductId(String productId) {
        this.productId = productId;
    }
    public long getProductPrice() {
        return productPrice;
    }
    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }
    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", productId='" + productId + '\'' +
                ", productPrice=" + productPrice +
                '}';
    }
    //模拟订单数据
    public String random() {
        Random r = new Random();
        this.orderId = UUID.randomUUID().toString().replaceAll( "-", "" );
        this.productPrice = r.nextInt( 1000 );
        this.productId = r.nextInt( 10 ) + "";
        JSONObject obj = new JSONObject();
        String jsonString = obj.toJSONString( this );
        return jsonString;
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/105558.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2022年我国江蓠行业现状:养殖面积、产量不断增长 进口量仍大于出口

根据观研报告网发布的《中国江蓠市场现状深度研究与发展前景预测报告(2022-2029年)》显示,江蓠属于“海藻”产业,为暖水性藻类,我国俗称 “龙须菜”、 “海菜”、 “蚝菜”。藻体紫褐色或紫黄色、绿色。 江蓠在热带、 …

Opencv(C++)笔记--霍夫变换检测直线、霍夫变换检测圆

目录 1--原理 2--Opencv API 3--实例代码 4--霍夫变换检测圆 1--原理 具体原理可参考 博客1 和 视频讲解1; 霍夫变换检测直线的核心思想是:在笛卡尔坐标系下,一条直线(两个点(x1, y1)和(x2,…

行业权威来揭秘,商用PC为什么首选12代酷睿

第12代酷睿处理器可以提供更卓越的性能,凭借架构先进性让商用台式机和笔记本电脑为用户带来更好的体验,帮助企业和员工效率倍增。 作者|九月 来源| PConline 想要让办公效率进一步提升,一台强大的PC设备是必不可少的生产力和内容创作工…

有什么适合零基础的人做的副业兼职

互联网上有很多套路。这是不可预防的。只要你敢贪婪,你就会陷入别人设计的陷阱。在业余时间做兼职应该是很多人的梦想,因为他们可以在有限的时间内赚更多的钱。很多人不知道的是,其实我们赚钱的渠道很多:比如网上发文章.短视频直播.我们媒体、…

基于SpringBoot+Mybatis框架的私人影院预约系统(附源码,包含数据库文件)

基于SpringBootMybatis框架的私人影院预约系统,附源码,包含数据库文件。 非常完整的一个项目,希望能对大家有帮助哈。 本系统的完整源码以及数据库文件都在文章结尾处,大家自行获取即可。 项目简介 该项目设计了基于SpringBoo…

Spring MVC—Spring MVC概述

文章目录Java web的发展历史一.Model I和Model II1.Model I开发模式2.Model II开发模式二. MVC模式SpringMVC 的工作原理和流程springmvc 的拦截器Spring和SpringMVC的区别————————————————————————————————Java web的发展历史 一.Model I和M…

VS Code debug调试时无法查看变量内容【已解决】

问题场景:新换成的vscode编译软件,但是在debug调试时发现与QtCreator不同,无法直接查看变量,显示的都是地址或其他。 比如:QString或QStringList无法查看具体的内容,正常是这样显示的,反正我不…

Linux神器——vim

目录 一、vim基本概念 二、vim基本操作 三、vim正常模式命令集 四、vim末行模式命令集 五、vim操作总结 六、vim界面配置 vi/vim的区别简单点来说,它们都是多模式编辑器,不同的是vim是vi的升级版本,它不仅兼容vi的所有指令,而…

上班15年后,普通程序员能实现财富自由吗?

对于职业生涯还没有开挂的普通程序员来说,有可能实现财务自由吗? 先来说下财务自由的最低标准 北上广深:身价3000万,含房产1000万、现金2000万。 杭州、南京、成都等二线城市:身价1500万,含500万房产、现…

集成底座双K8S集群扩展升级方案

集成底座方案是应用于企业信息化建设的集成整合阶段,通过建立统一、标准、柔性、可复用、可扩展的IT架构,解决企业信息化建设过程中缺乏整体规划、集成整合难度大、安全管控不到位等问题,强化企业信息化的架构建设、集成整合、数据治理、安全…

某鱼兼职并不是那么好做,钱也不是漫天要价

文章目录一、背景二、雇主的期望2.1、jinja2代码三、题主的期望3.1、删除功能3.2、前端体现3.3、留言列表实现降序3.4、效果显示四、总结一、背景 上周某鱼推送过来的单子多到题主应接不暇,不得已拒绝了几单,但是接下来的单子呢又不那么顺利,…

提速3.7倍!何恺明团队再发新作,更快更高效的FLIP模型:通过Masking扩展语言-图像预训练(附论文原文下载)

原创/文 BFT机器人 研究论文地址:https://arxiv.org/abs/2212.00794 计算机视觉和深度学习领域大神何恺明携团队再发新作!论文围绕近来火热的CLIP(Contrastive Language-Image Pre-Training)模型展开研究,并提出了一种…

Python怎么进行时区的转换

pytz 是一个用于处理时区的 Python 库,它为 Python 提供了对时区的支持。 它提供了大量的时区信息,包括时区名称、偏移量、是否使用夏令时等。你可以使用 pytz 库来处理本地时间、UTC 时间和其他时区之间的转换。 它提供了许多函数来帮助你处理时区相关的信息。 …

C++之多态(中篇)(最全总结)

这里接上面C之多态(上篇) 本篇目录4.多态的原理4.2 多态的原理4.3 C 11 override和final4.4 重载、重写(覆盖)、隐藏(重定义)的对比 (函数之间的关系)5.抽象类5.1概念5.2接口继承和实…

三、基于kubeadm安装kubernetes1.25集群第二篇

在上一篇中我们已经安装kubernetes要求做了服务器初始化,看这篇之前,建议先看下上篇:https://blog.csdn.net/u011837804/article/details/128350651 那我们正式开始kubernetes1.26集群安装 1、每台机器安装docker20.10.22 docker的安装细节…

数据结构训练营4

开启蓝桥杯备战计划,每日练习算法一题!!坚持下去,想必下一年的蓝桥杯将会有你!!笔者是在力扣上面进行的刷题!!由于是第一次刷题!找到的题目也不咋样!所以&…

itop-imx8m开发板gstreamer日志级别设置

gst 的日志等级分为 none(0)error(1) warning(2) info(3) debug(4) log(5)。默认 gst 的日志等级为 1,即 error 打印,出错时会打印。 1)全局日志级别设置 如果需要更高级别打印,修改环境变量 GST_DEBUG 即可。如需要 warning 级别…

基于python的transform行人车辆识别

Transformer是一种神经网络体系结构,由于它能够有效地处理顺序数据中的长期依赖性,因此在自然语言处理(NLP)任务中受到欢迎。它还被应用于各种其他任务,包括图像分类、对象检测和语音识别。 在车辆和行人识别方面,transformer可用…

浅谈安科瑞电能预付费系统在大电力客户中的设计及应用分析

摘 要 随着我国供电企业的不断发展,而用电模式也在不断改革,预付费技术在气、电等部门得到普遍的使用,本文主要针对预付费系统在大电力客户中的使用情况进行分析,提高用电用户的缴费率,有效的避免了客户恶意偷窃电行…

【小5聊】Winform从指定服务器下载文件的方式

在一些实际项目中,我们往往需要上传一些excel、word等办公文件,甚至是mp3、mp4等音频视频文件。 当然,大多数小型网站会放到自己服务器,如果文件量不大的话 如果文件数量会很多,那么就需要考虑使用第三方来保管存储 不…