Kafka集成Flume/Spark/Flink(大数据)/SpringBoot

news2025/6/5 4:48:21

Kafka集成Flume

在这里插入图片描述

Flume生产者

在这里插入图片描述
③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Flume消费者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka集成Spark

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

object SparkKafkaProducer{
	
	def main(args:Array[String]):Unit = {
		
		//配置信息
		val properties  = new Properties()
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])
		
		//创建一个生产者
		var producer = new KafkaProducer[String,String](properties)

		//发送数据
		for(i <- 1 to 5){
			producer.send(new ProducerRecord[String,String]("first","atguigu"+i))
		}

		//关闭资源
		producer.close()
	}
}

在这里插入图片描述

消费者
在这里插入图片描述

Object SparkKafkaConsumer{
	
	def main(args:Array[String]):Unit = {
		
		//初始化上下文环境
		val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
		
		val ssc = new StreamingContext(conf,Seconds(3))

		//消费数据
		val kafkapara = Map[String,Object](
			ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
			ConsumerConfig.GROUP_ID_CONFIG->"test"
		)
		val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent
										,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))

		val valueDStream = kafkaDStream.map(record=>record.value())
		valueDStream.print()
		//执行代码,并阻塞
		ssc.start()
		ssc.awaitTermination()
	}
}

Kafka集成Flink

在这里插入图片描述

创建maven项目,导入以下依赖
在这里插入图片描述
resources里面添加log4j.properties文件,可以更改打印日志的级别为error
在这里插入图片描述

Flink生产者

public class FlinkafkaProducer1{
	
	public static void main(String[] args){
		
		//获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		env.setParallelism(3);

		//准备数据源
		ArrayList<String> wordList = new ArrayList<>();
		wordList.add("hello");
		wordList.add("atguigu");
		DataStreamSource<String> stream = env.fromCollection();

		//创建一个kafka生产者
		Properties properteis = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
		
		FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);

		//添加数据源Kafka生产者
		stream.addSink(kafkaProducer);

		//执行
		env.execute();
	}
}

在这里插入图片描述

Flink消费者

public class FlinkafkaConsumer1{
	
	public static void main(String[] args){
		
		//获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(3);
		
		//创建一个消费者
		Properties properties = new Properties();
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
		properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);

		//关联消费者和flink流
		env.addSource(kafkaConsumer).print();
		
		//执行
		env.execute();
	}
}

Kafka集成SpringBoot

在这里插入图片描述
在这里插入图片描述

生产者
在这里插入图片描述
在这里插入图片描述
通过浏览器发送
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2397009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Scratch节日 | 拯救屈原 | 端午节

端午节快乐&#xff01; 这款特别为端午节打造的Scratch游戏 《拯救屈原》&#xff0c;将带你走进古代中国&#xff0c;感受历史与文化的魅力&#xff01; &#x1f3ee; 游戏介绍 扮演勇敢的探险者&#xff0c;穿越时空回到古代&#xff0c;解锁谜题&#xff0c;完成任务&…

rabbitmq Direct交换机简介

在实际开发中&#xff0c;需求可能变得复杂&#xff0c;如消息的收发和处理。以支付系统为例&#xff0c;成功支付后需要改变订单状态并通知用户&#xff0c;而失败则不需要。为处理这种情况&#xff0c;提出了使用Direct交换机&#xff0c;它可以根据规则将消息路由到指定队列…

Git实战--基于已有分支克隆进行项目开发的完整流程

Git克隆项目开发流程 ✅ 一、完整流程概述✅ 二、详细操作步骤Step 1&#xff1a;克隆仓库&#xff08;如果尚未克隆&#xff09;Step 2&#xff1a;获取远程分支信息并切换到 feature/ 获取所有远程分支Step 3&#xff1a;创建并切换到你的新分支Step 4&#xff1a;开始开发新…

2_MCU开发环境搭建-配置MDK兼容Keil4和C51

MCU开发环境搭建-配置MDK兼容Keil4和C51 一、概述 本文以MDK-ARM V5.36版本基础介绍DMK-ARM工程兼容Keil4和C51的配置。 注:在阅读本文前,请先安装和配置完成MDK-ARM(Keil5)。 二、工具包下载 链接: https://pan.baidu.com/s/1Tu2tDD6zRra4xb_PuA1Wsw 提取码: 81pp 三、…

通过远程桌面连接Windows实例提示“出现身份验证错误,无法连接到本地安全机构”错误怎么办?

本文介绍通过远程桌面连接Windows实例提示“出现身份验证错误无法连接到本地安全机构”错误的解决方案。 问题现象 通过本地电脑内的远程桌面连接Windows实例提示“出现身份验证错误&#xff0c;无法连接到本地安全机构”错误。 问题原因 导致该问题的可能原因如下&#x…

百度golang研发一面面经

输入一个网址&#xff0c;到显示界面&#xff0c;中间的过程是怎样的 IP 报文段的结构是什么 Innodb 的底层结构 知道几种设计模式 工厂模式 简单工厂模式&#xff1a;根据传入类型参数判断创建哪种类型对象工厂方法模式&#xff1a;由子类决定实例化哪个类抽象工厂模式&#…

TC3xx学习笔记-启动过程详解(一)

文章目录 前言Firmware启动过程BMHD Check流程ABM启动Internal Flash启动Bootloader ModeProcessing in case no valid BMHD foundProcessing in case no Boot Mode configured by SSW 总结 前言 之前介绍过UCB BMHD的使用&#xff0c;它在启动过程中起着重要的作用&#xff0…

Scratch节日 | 六一儿童节抓糖果

六一儿童节怎么能没有糖果&#xff1f;这款 六一儿童节抓糖果 小游戏&#xff0c;让你变身小猫&#xff0c;开启一场甜蜜大作战&#xff01; &#x1f3ae; 游戏玩法 帮助小猫收集所有丢失的糖果&#xff0c;收集越多分数越高&#xff01; 小心虫子一样的“坏糖果”&#xff…

通信算法之280:无人机侦测模块知识框架思维导图

1. 无人机侦测模块知识框架思维导图, 见文末章节。 2. OFDM参数估计,基于循环自相关特性。 3. 无人机其它参数估计

【Doris基础】Apache Doris中的Coordinator节点作用详解

目录 1 Doris架构概述 2 Coordinator节点的核心作用 2.1 查询协调与调度 2.2 执行计划生成与优化 2.3 资源管理与负载均衡 2.4 容错与故障恢复 3 Coordinator节点的关键实现机制 3.1 两阶段执行模型 3.2 流水线执行引擎 3.3 分布式事务管理 4 Coordinator节点的高可…

【Kubernetes-1.30】--containerd部署

文章目录 一、环境准备1.1 三台服务器1.2 基础配置&#xff08;三台机通用&#xff09;1.3 关闭 Swap&#xff08;必须&#xff09;1.4 关闭防火墙&#xff08;可选&#xff09;1.5 加载必要模块 & 配置内核参数 二、安装容器运行时&#xff08;containerd 推荐&#xff09…

相机--相机标定

教程 内外参公式及讲解 相机标定分类 相机标定分为内参标定和外参标定。 相机成像原理 相机成像畸变 链接 四个坐标系的转变 内参标定 内参 相机内参通常用一个 33 矩阵&#xff08;内参矩阵&#xff0c;KK&#xff09;表示&#xff0c;形式如下&#xff1a; (1)焦距&…

MongoDB(七) - MongoDB副本集安装与配置

文章目录 前言一、下载MongoDB1. 下载MongoDB2. 上传安装包3. 创建相关目录 二、安装配置MongoDB1. 解压MongoDB安装包2. 重命名MongoDB文件夹名称3. 修改配置文件4. 分发MongoDB文件夹5. 配置环境变量6. 启动副本集7. 进入MongoDB客户端8. 初始化副本集8.1 初始化副本集8.2 添…

131. 分割回文串-两种回溯思路

我们可以将字符串分割成若干回文子串&#xff0c;返回所有可能的方案。如果将问题分解&#xff0c;可以表示为分割长度为n-1的子字符串&#xff0c;这与原问题性质相同&#xff0c;因此可以采用递归方法解决。 为什么回溯与递归存在联系&#xff1f;在解决这个问题时&#xff0…

[Java恶补day13] 53. 最大子数组和

休息了一天&#xff0c;开始补上&#xff01; 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组是数组中的一个连续部分。 示例 1&#xff1a; 输入&#xff1a;nums …

摩尔投票算法原理实现一文剖析

摩尔投票算法原理&实现一文剖析 一、算法原理1.1 基本思想1.2 数学原理 二、算法实现2.1 Python实现2.2 Java实现2.3 C实现 三、复杂度分析四、应用场景4.1 多数元素问题4.2 扩展应用&#xff1a;寻找出现次数超过n/3的元素 五、算法优势与注意事项5.1 优势5.2 注意事项 总…

MyBatis操作数据库(2)

1.#{}和${}使用 Interger类型的参数可以看到这里显示的语句是:select username,password,age,gender,phone from userinfo where id? 输入的参数并没有在后面进行拼接,,id的值是使用?进行占位,这种sql称之为"预编译sql".这里,把#{}改成${}观察情况:这里可以看到…

C++面向对象(二)

面向对象基础内容参考&#xff1a; C面向对象&#xff08;一&#xff09;-CSDN博客 友元函数 类的友元函数是定义在类外部&#xff0c;但有权访问类的所有私有&#xff08;private&#xff09;成员和保护&#xff08;protected&#xff09;成员。尽管友元函数的原型有在类的定…

【C语言入门级教学】冒泡排序和指针数组

文章目录 1.冒泡排序2.⼆级指针3.指针数组4.指针数组模拟⼆维数组 1.冒泡排序 冒泡排序的核⼼思想&#xff1a;两两相邻的元素进⾏⽐较。 //⽅法1 void bubble_sort(int arr[], int sz)//参数接收数组元素个数 { int i 0;for(i0; i-1; i) { int j 0; for(j0; j-1; j) { …