Flume 简介及基本使用

news2025/9/17 2:44:55

1.Flume简介

Apache Flume 是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本,NG 在 OG 的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以 NG 为基础。

2. Flume架构和基本概念

下图为 Flume 的基本架构图:

2.1 基本架构

外部数据源以特定格式向 Flume 发送 `events` (事件),当 `source` 接收到 `events` 时,它将其存储到一个或多个 `channel`,`channe` 会一直保存 `events` 直到它被 `sink` 所消费。`sink` 的主要功能从 `channel` 中读取 `events`,并将其存入外部存储系统或转发到下一个 `source`,成功后再从 `channel` 中移除 `events`。

2.2 基本概念

1. Event

`Event` 是 Flume NG 数据传输的基本单元。类似于 JMS 和消息系统中的消息。一个 `Event` 由标题和正文组成:前者是键/值映射,后者是任意字节数组。

2. Source 

数据收集组件,从外部数据源收集数据,并存储到 Channel 中。

3. Channel

`Channel` 是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:

+ `Memory Channel` : 使用内存,优点是速度快,但数据可能会丢失 (如突然宕机);

+ `File Channel` : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。

4. Sink

`Sink` 的主要功能从 `Channel` 中读取 `Event`,并将其存入外部存储系统或将其转发到下一个 `Source`,成功后再从 `Channel` 中移除 `Event`。

5. Agent

是一个独立的 (JVM) 进程,包含 `Source`、 `Channel`、 `Sink` 等组件。

2.3 组件种类

Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:

- Source 类型 :内置了几十种类型,如 `Avro Source`,`Thrift Source`,`Kafka Source`,`JMS Source`;

- Sink 类型 :`HDFS Sink`,`Hive Sink`,`HBaseSinks`,`Avro Sink` 等;

- Channel 类型 :`Memory Channel`,`JDBC Channel`,`Kafka Channel`,`File Channel` 等。

对于 Flume 的使用,除非有特别的需求,否则通过组合内置的各种类型的 Source,Sink 和 Channel 就能满足大多数的需求。

3. Flume架构模式

Flume 支持多种架构模式,分别介绍如下

3.1 multi-agent flow

Flume 支持跨越多个 Agent 的数据传递,这要求前一个 Agent 的 Sink 和下一个 Agent 的 Source 都必须是 `Avro` 类型,Sink 指向 Source 所在主机名 (或 IP 地址) 和端口(详细配置见下文案例三)。

3.2 Consolidation

日志收集中常常存在大量的客户端(比如分布式 web 服务),Flume 支持使用多个 Agent 分别收集日志,然后通过一个或者多个 Agent 聚合后再存储到文件系统中。

3.3 Multiplexing the flow

Flume 支持从一个 Source 向多个 Channel,也就是向多个 Sink 传递事件,这个操作称之为 `Fan Out`(扇出)。默认情况下 `Fan Out` 是向所有的 Channel 复制 `Event`,即所有 Channel 收到的数据都是相同的。同时 Flume 也支持在 `Source` 上自定义一个复用选择器 (multiplexing selector) 来实现自定义的路由规则。

4.Flume配置格式

Flume 配置通常需要以下两个步骤:

1. 分别定义好 Agent 的 Sources,Sinks,Channels,然后将 Sources 和 Sinks 与通道进行绑定。需要注意的是一个 Source 可以配置多个 Channel,但一个 Sink 只能配置一个 Channel。基本格式如下:

<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

2. 分别定义 Source,Sink,Channel 的具体属性。基本格式如下:

<Agent>.sources.<Source>.<someProperty> = <someValue>

properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>

properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

5. Flume使用案例

介绍几个 Flume 的使用案例:

+ 案例一:使用 Flume 监听文件内容变动,将新增加的内容输出到控制台。

+ 案例二:使用 Flume 监听指定目录,将目录下新增加的文件存储到 HDFS。

+ 案例三:使用 Avro 将本服务器收集到的日志数据发送到另外一台服务器。

5.1 案例一

需求: 监听文件内容变动,将新增加的内容输出到控制台。

实现: 主要使用 `Exec Source` 配合 `tail` 命令实现。

1. 配置

新建配置文件 `exec-memory-logger.properties`,其内容如下:

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
   
#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c

#将sourceschannels进行绑定
a1.sources.s1.channels = c1
   
#配置sink 
a1.sinks.k1.type = logger

#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1  
   
#配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \
--name a1 \
-Dflume.root.logger=INFO,console

3. 测试

向文件中追加数据:

控制台的显示:

5.2 案例二

需求: 监听指定目录,将目录下新增加的文件存储到 HDFS。

实现:使用 `Spooling Directory Source` 和 `HDFS Sink`。

1. 配置

#指定agentsources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
   
#配置sources属性
a1.sources.s1.type =spooldir  
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName 
#将sourceschannels进行绑定  
a1.sources.s1.channels =c1 

   
#配置sink 
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream  
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#将sinkschannels进行绑定  
a1.sinks.k1.channel = c1
   
#配置channel类型
a1.channels.c1.type = memory

2. 启动

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console

3. 测试

拷贝任意文件到监听目录下,可以从日志看到文件上传到 HDFS 的路径:

cp log.txt logs/

查看上传到 HDFS 上的文件内容与本地是否一致:

hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801

5.3 案例三

需求: 将本服务器收集到的数据发送到另外一台服务器。

实现:使用 `avro sources` 和 `avro Sink` 实现。

1. 配置日志收集Flume

新建配置 `netcat-memory-avro.properties`,监听文件内容变化,然后将新的文件内容通过 `avro sink` 发送到 hadoop001 这台服务器的 8888 端口:

#指定agentsources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. 配置日志聚合Flume

使用 `avro source` 监听 hadoop001 服务器的 8888 端口,将获取到内容输出到控制台:

#指定agentsources,sinks,channels
a2.sources = s2
a2.sinks = k2
a2.channels = c2

#配置sources属性
a2.sources.s2.type = avro
a2.sources.s2.bind = hadoop001
a2.sources.s2.port = 8888

#将sourceschannels进行绑定
a2.sources.s2.channels = c2

#配置sink
a2.sinks.k2.type = logger

#将sinkschannels进行绑定
a2.sinks.k2.channel = c2

#配置channel类型
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

3. 启动

启动日志聚集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \
--name a2 -Dflume.root.logger=INFO,console

在启动日志收集 Flume:

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
--name a1 -Dflume.root.logger=INFO,console

这里建议按以上顺序启动,原因是 `avro.source` 会先与端口进行绑定,这样 `avro sink` 连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,`sink` 会一直重试,直至建立好连接。

4.测试

向文件 `tmp/log.txt` 中追加内容:

可以看到已经从 8888 端口监听到内容,并成功输出到控制台:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1095516.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

雷电模拟器上使用第一个frida(四)第一个HOOK

经过上述三篇&#xff0c;已经可以使用python3.8.10编写代码&#xff0c;利用frida14.2.18和雷电模拟器9.0.60(9)&#xff0c;Android 9交互。 雷电模拟器上使用第一个frida&#xff08;一&#xff09;之安装-CSDN博客 雷电模拟器上使用第一个frida&#xff08;二&#xff09…

网络类型与数据链路层协议

目录 整体大纲图 一、网络类型 二、数据链路层协议 1、MA网络 2、P2P网络 1&#xff09;HDLC协议 2&#xff09;PPP协议 a、特点及其数据帧封装结构 b、组成及其工作过程 c、ppp会话流程及ppp验证 d、ppp配置命令 f、ppp mp 整体大纲图 一、网络类型 二、数据链路层…

【机器学习】sklearn降维算法PCA

文章目录 降维PCAsklearn中的PCA代码实践 PCA对手写数字数据集的降维 降维 如何实现降维&#xff1f;【即减少特征的数量&#xff0c;又保留大部分有效信息】 将那些带有重复信息的特征合并&#xff0c;并删除那些带无效信息的特征等等&#xff0c;逐渐创造出能够代表原特征矩…

计算机毕业设计 基于协同过滤算法的白酒销售系统的设计与实现 Javaweb项目 Java实战项目 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

线性表的插入、删除和查询操作

线性表的插入、删除和查询操作 1、定义线性表 定义一个线性结构&#xff0c;有列表默认长度设置为50&#xff0c;列表数量 #include <stdio.h> #define MaxSize 50typedef int Element; typedef struct{Element data[MaxSize];int length; }SqList;2、顺序表插入 插入…

【算法训练-排序算法 一】【手撕排序】快速排序、堆排序、归并排序

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【手撕排序系列】&#xff0c;使用【数组】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为&…

PCL点云处理之配准中的匹配对连线可视化显示 Correspondences(二百一十九)

PCL点云处理之配准中的匹配对连线可视化显示 Correspondences(二百一十九) 一、算法介绍二、算法实现1.可视化代码2.完整代码(特征匹配+可视化)最终效果一、算法介绍 关于点云配准中的匹配对,如果能够可视化将极大提高实验的准确性,还好PCL提供了这样的可视化工具,做法…

【Java零基础入门到就业】第一天:java简介和cmd窗口的一些常见命令

1、java简介 Java是一种基于类的、面向对象的编程语言&#xff0c;它被设计成具有尽可能少的实现依赖。它旨在让应用程序开发人员编写一次&#xff0c;并在任何地方运行(WORA)&#xff0c;这意味着编译后的Java代码可以在所有支持Java的平台上运行&#xff0c;而无需重新编译。…

pikachu靶场搭建及通关

一、靶场搭建 下载工具&#xff1a;phpstudy Pikachu靶机下载地址&#xff1a; https://github.com/zhuifengshaonianhanlu/pikachu 下载后解压缩并放入如下文件夹&#xff08;网站根目录&#xff09; 建议修改文件名称为 pikachu 修改配置文件&#xff08;mysql 用户名&…

ORA-00600: internal error code, arguments

通过rman将11g异机升级到19c时&#xff0c;应用归档时报错&#xff0c;报错如下 RMAN> recover database ; Starting recover at 2023-10-15 21:10:02 allocated channel: ORA_DISK_1 channel ORA_DISK_1: SID5776 device typeDISK starting media recovery media recove…

在vs code中创建一个名为 “django_env“ 的虚拟环境报错?!以下方法可以解决

# vs code 终端窗口中运行&#xff1a; mkvirtualenv django_env # 拓展&#xff1a; mkvirtualenv django_env 是一个命令&#xff0c;用于创建一个名为 "django_env" 的虚拟环境。虚拟环境是一种用于隔离不同Python项目所需依赖的工具。通过创建虚拟环境&#x…

在pycharm中运行js文件,附加node.js下载步骤

文章目录 一、前言二、node.js安装和配置(如果之前就安装好了可以直接跳过)1、进入官网下载安装包2、在本地安装node.js3、环境配置4、验证是否安装成功5、修改下载位置(默认是在c盘&#xff0c;这个根据个人需求)6、设置默认模块包7、测试一下是否修改成功(要进入管理员模式的…

YOLOv5网络结构图

网络结构图&#xff08;简易版和详细版&#xff09; 网络框架介绍 前言&#xff1a; YOLOv5是一种基于轻量级卷积神经网络&#xff08;CNN&#xff09;的目标检测算法&#xff0c;整体可以分为三个部分&#xff0c; backbone&#xff0c;neck&#xff0c;head。 如上图所示…

测试左移右移-理论篇

目录 前言一、浅解左移1.什么是测试左移&#xff1f;1.1对产品1.2对开发1.3对测试1.4对运维 二、浅解右移1.1对产品1.2对开发1.3对测试1.4对运维 三、总结 前言 测试左移右移&#xff0c;很多人说能让测试更拥有主动权&#xff0c;展示出测试岗位也是有很大的价值&#xff0c;…

分享一个制作AI视频的好工具

点击上方“Python爬虫与数据挖掘”&#xff0c;进行关注 回复“书籍”即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 同行十二年&#xff0c;不知木兰是女郎。 前几天在【AI破局俱乐部】看到了【元峰】老师的分享&#xff0c;干货满满&#xff0c;这个分享是关于数字人的…

动态规划算法(3)--0-1背包、石子合并、数字三角形

目录 一、0-1背包 1、概述 2、暴力枚举法 3、动态规划 二、石子合并问题 1、概述 2、动态规划 3、环形石子怎么办&#xff1f; 三、数字三角形问题 1、概述 2、递归 3、线性规划 四、租用游艇问题 一、0-1背包 1、概述 0-1背包&#xff1a;给定多种物品和一个固定…

OpenCV完美实现两张图片的全景拼接(详细教程)

目录 1&#xff0c;主要步骤 1.1 导入需要的包和模块&#xff0c;并读取两张待拼接的图片&#xff0c;这里我们假设它们为 left.jpg 和 right.jpg。 1.2 创建SIFT检测器 1.3 创建一个基于 FLANN 的匹配器 1.4 筛选过程删除掉一些不合适的匹配点&#xff0c;只保留最好的…

[Spring] SpringMVC 简介(三)

目录 九、SpringMVC 中的 AJAX 请求 1、简单示例 2、RequestBody&#xff08;重点关注“赋值形式”&#xff09; 3、ResponseBody&#xff08;经常用&#xff09; 4、为什么不用手动接收 JSON 字符串、转换 JSON 字符串 5、RestController 十、文件上传与下载 1、Respo…

03_51单片机点亮LED灯

51单片机是一种非常常见的单片机型号&#xff0c;广泛应用于各种嵌入式系统和电子设备中。LED灯是一种常见的输出设备&#xff0c;用于显示信息或指示状态。下面是关于51单片机控制LED灯的介绍&#xff1a; 1. 连接LED灯&#xff1a;将LED的正极连接到51单片机的一个I/O引脚&a…

英语——歌曲篇——All Out Of Love

All Out Of Love [Air Supply失落的爱] 歌词 I’m lying alone with my head on the phone Thinking of you till it hurts I know you hurt too but what else can we do Tormented and torn apart I wish I could carry your smile in my heart For times when my life se…