[ELasticSearch]-Logstash的使用

news2025/7/19 1:22:56

[ELasticSearch]-Logstash的使用

森格 | 2023年2月

介绍:Logstash在Elastic Stack中担当着ELK的工作,在本文主要阐述Logstash的处理流程以及一些应用案例。


文章目录

  • [ELasticSearch]-Logstash的使用
  • 一、Logstash介绍
    • 1.1 What is Logstash?
    • 1.2 版本兼容性
    • 1.3 处理流程
  • 二、Logstash应用
    • 2.1 ElasticSearch索引迁移
      • 2.1.1 logstash-input-elasticsearch
      • 2.1.2 logstash-filter-mutate
      • 2.1.3 logstash-output-elasticsearch
    • 2.2 同步MySQL数据
      • 2.2.1 logstash-input-jdbc
      • 2.2.2 logstash-filter-mutate
      • 2.2.3 logstash-output-elasticsearch
    • 2.3 实战
  • 三、性能调优
    • 3.1 标准输出stdout
    • 3.2 内存 JVM
    • 3.3 管道线程


一、Logstash介绍

1.1 What is Logstash?

Logstash是一款强大的ETL(Extract、Transform、Load)工具。我们可以把它想象成具有数据传输能力的管道,数据信息从输入端(Input)传输到输出端(Output),用户可根据自己的需要在管道中间加上过滤网(Filter)。

1.2 版本兼容性

通常来说,对于Elastic的相关产品,版本最好一一对应。

在个人使用Logstash做es的索引迁移过程中,跳板机使用版本为7.4.1,es版本为6.8.x、7.4.x、7.7.x,迁移过程未见问题。

1.3 处理流程

可以把Logstash实例看成一个正在运行的Logstash进程,因为Logstash利用了jvm,建议在单独的机器上运行。而对于其中的管道可以看成插件的合集,一个Logstash可以运行多个管道,且多个管道之间彼此独立。

请添加图片描述

对于Logstash我们主要需要配置三个插件:

Input Plugins:提取或接收数据。

Filter Plugins:应用转换并丰富数据。

Output Plugins:将已处理的数据加载给目标。

除此之外还有Codec plugins插件,作用于input和output plugin ,负责将数据在原始与LogstashEvent之间转换。

想要了解更多插件支持参考官网:Input Plugins、Filter Plugins、Output Plugins、Codec plugins。

# 查看已安装的插件
cd /logstash
./bin/logstash-plugin list

二、Logstash应用

2.1 ElasticSearch索引迁移

2.1.1 logstash-input-elasticsearch

input {
 elasticsearch {
    hosts => [
            "xxx.xxx.xxx.xxx:9200"
        ]
    user => "xxx"
    password => "xxx"
    index => "xxx"
    query => '{
            "query": {
                "match_all": {}
            }
        }'
    size => 500
    scroll => "5m"
    docinfo => true
    codec => json
    }
}

  • index:要匹配的索引,多个索引用, 隔开。
  • size:每次滚动返回的最大记录数。默认值1000。
  • scroll:两次滚动的间隔时间。默认值1m,单位为秒。
  • docinfo:在事件中会包含es文档的信息,例如索引、类型、id等。默认值flase。
  • codec:在数据输入之前对其进行解码。默认为json。

2.1.2 logstash-filter-mutate

filter {
 mutate {
     remove_field => [
            "@timestamp",
            "@version"
        ]
    }
}

Logstash的输出数据会加上版本和时间戳信息。所以我们要过滤掉这些信息。

过滤前:

请添加图片描述

过滤后:

在这里插入图片描述

2.1.3 logstash-output-elasticsearch

output {
     elasticsearch {
         hosts => [
            "xxx.xxx.xxx.xxx:9200"
        ]
        user => "xxx"
        password => "xxx"
        action => "update"       //有则更新,无则插入
        doc_as_upsert => true
        document_type => "_doc"	 //文档类型
        document_id => "%{[@metadata][_id]}"	//文档id,取元数据docinfo信息中的id
        index => "%{[@metadata][_index]}"		//文档索引名,取元数据docinfo信息中的索引名
    }
}

2.2 同步MySQL数据

2.2.1 logstash-input-jdbc

  1. 下载jar包(mysql-connector)
# 下载
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.31.tar.gz
tar -zxf mysql-connector-j-8.0.31.tar.gz
# 复制jar包到/xx/logstash/logstash-core/lib/jars/目录
cp /mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar /xx/logstash/logstash-core/lib/jars/

这里值得提的是,将连接的jar包放置于Logstash的logstash-core/lib/jars/目录下,就不用再配置中设置jdbc_driver_library参数。

  1. input 配置
input {
	jdbc {
       jdbc_connection_string => "jdbc:mysql://数据库地址:端口/数据库名?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false"
       jdbc_user => "xxxxxx"
       jdbc_password => "xxxxxx"
       jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
       jdbc_validate_connection => true
       jdbc_paging_enabled => true
       jdbc_page_size => 50000
       lowercase_column_names => false
       statement => "SELECT * FROM xxx"
       schedule => "*/1 * * * *"
    }    
}
  • jdbc_connection_string:指定编码格式,禁用SSL协议,设定自动重连。
  • jdbc_validate_connection:连接前做验证。
  • statement:要执行的查询语句。
  • lowercase_column_names:将列名转换为小写。
  • schedule:定时执行,类似于crontab。

2.2.2 logstash-filter-mutate

同2.1.2,这里不做赘述。

2.2.3 logstash-output-elasticsearch

output {
     elasticsearch {
         hosts => [
            "xxx.xxx.xxx.xxx:9200"
        ]
        user => "xxx"
        password => "xxx"
        action => "update"       //有则更新,无则插入
        doc_as_upsert => true
        document_type => "_doc"	 //文档类型
		index => "mysql_data"	 //索引名,需要自己填写
        document_id => "%{id}"	 //文档id,可以根据mysql表中主键id匹配
}
	//标准输出,实际运行中可以取消
     stdout{
        codec => json_lines
        }
}

2.3 实战

TODO:迁移mysql某张表的数据到es实例1,再将es实例1的mysql索引迁移至es实例2。

MySQL数据:

请添加图片描述

将要生成的es索引名为mysql_data_otter

es实例1迁移前索引列表:

请添加图片描述

es实例2迁移前索引列表:

请添加图片描述

logstash_mysql.conf、logstahs_es.conf 配置文件按照上文内容修改即可。

执行命令:

./bin/logstash -f ./conf/xxx.conf

运行成功信息如下:

[2023-02-21T09:32:18,661][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
[2023-02-21T09:32:20,326][INFO ][logstash.runner          ] Logstash shut down.

es实例1迁移后索引列表:

请添加图片描述

es实例2迁移后索引列表:

请添加图片描述

三、性能调优

3.1 标准输出stdout

在output中我们除了自己做调试使用它,在实际运行过程中避免使用,会影响使用性能。

3.2 内存 JVM

在conf/jvm.options文件中,最好设置为物理内存的50%以上,如果堆大小太低,CPU利用率可能会不必要地增加,从而导致JVM不断进行垃圾回收。且Xms和Xmx堆分配大小设置为相同的值,以防止在运行时调整堆大小。

3.3 管道线程

pipeline.workers:并行执行筛选器和管道输出阶段的工作人员数量,官方建议为CPU核数。

pipeline.batch.size:单个工作线程在尝试执行其筛选器和输出之前从输入中收集的最大事件数,在内存允许的情况下,可以合理增加批处理大小。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/362698.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分享5款小众良心软件,好用到让人惊艳

目前win7渐渐退出视野,大部分人都开始使用win10了,笔者在日常的工作和使用中,为了能够让效率的大提升,下载了不少软件,以下的软件都是个人认为装机必备,而且都是可以免费下载,且没有插件的。 1…

Node-RED 3.0升级,新增特性介绍

前言 最近给我的树莓派上的Node-RED(以下简称NR)做了一下升级,从2.x升级到得了3.0。这是一个比较大的版本升级,在用户体验方面,NR有了有很大的提升。下面让我们一起来看一如何升级以及,3.0新增了那些特性 升级3.0 由于之前的NR是直接使用npm来进行安装的,所以此处升级…

库存管理系统-课后程序(JAVA基础案例教程-黑马程序员编著-第六章-课后作业)

【案例6-1】 库存管理系统 【案例介绍】 1.任务描述 像商城和超市这样的地方,都需要有自己的库房,并且库房商品的库存变化有专人记录,这样才能保证商城和超市正常运转。 本例要求编写一个程序,模拟库存管理系统。该系统主要包…

Mybatis持久层框架 | Mapper加载方式、目录结构解析

💗wei_shuo的个人主页 💫wei_shuo的学习社区 🌐Hello World ! Mapper(resource、class、package)加载方式 resource方式加载 通过resource或url加载单个mapper,接口文件与映射文件不在同一路径下,只能用re…

Java使用MD5加盐对密码进行加密处理,附注册和登录加密解密处理

前言 在开发的时候,有一些敏感信息是不能直接通过明白直接保存到数据库的。最经典的就是密码了。如果直接把密码以明文的形式入库,不仅会泄露用户的隐私,对系统也是极其的不厉,这样做是非常危险的。 那么我们就需要对这些铭文进…

【C++】内存管理知识

💙作者:阿润菜菜 📖专栏:C 本文目录 C/C内存区域分布 对比C语言内存管理的方式 C内存管理的方式 operator new与operator delete函数(new和delete实现的底层调用) new和delete的实现原理 malloc/free…

行为型设计模式之中介者模式

中介者模式 中介者模式又称为调解者模式或调停者模式,属于行为型模式。它用一个中介对象封装系列的对象交互,中介者使各对象不需要显示地相互作用,从而使其耦合松散,而且可以独立地改变它们之间的交互。 中介者模式包装了一系列对…

「TCG 规范解读」第11章 TPM工作组 TCG算法注册表

可信计算组织(Ttrusted Computing Group,TCG)是一个非盈利的工业标准组织,它的宗旨是加强在相异计算机平台上的计算环境的安全性。TCG于2003年春成立,并采纳了由可信计算平台联盟(the Trusted Computing Platform Alli…

allure简介

allure介绍allure是一个轻量级,灵活的,支持多语言的测试报告工具多平台的,奢华的report框架可以为dev/qa提供详尽的测试报告、测试步骤、log也可以为管理层提供high level统计报告java语言开发的,支持pytest,javaScript,PHP等可以…

springcloud入门+组件使用

代码gitee地址:https://gitee.com/bing520/springcloud 集群 cluster: 同一中软件服务的多个服务节点共同为系统提供服务过程,称之为软件服务集群。 分布式 distribute: 不同软件集群共同为一个系统提供服务,这个系统…

图解LeetCode——剑指 Offer 48. 最长不含重复字符的子字符串

一、题目 请从字符串中找出一个最长的不包含重复字符的子字符串,计算该最长子字符串的长度。 二、示例 2.1> 示例 1: 【输入】 "abcabcbb" 【输出】 3 【解释】 因为无重复字符的最长子串是 "abc",所以其长度为 3。 2.2> 示…

深入浅出深度学习Pytroch

本文将以通俗易懂的方式,深入浅出地为您揭开深度学习模型构建与训练的面纱: 深度学习数据data模型model损失函数loss优化optimizer可视化visualizer深度学习 数据data 模型model 损失函数loss 优化optimizer 可视化visualizer深度学习数据data模型m…

IP-GUARD离线的客户端如何更新策略?

在控制台上对指定客户端设置好策略后,在计算机树中找到该客户端右键-“策略导出”,导出相应的ipz格式的文件拿到离线的客户端上,客户端运行PolicyImportTool.exe工具,把ipz策略文件导入即可。

软件测试学习什么?好就业么

软件测试需要学习测试环境、网络环境、windows环境、数据库管理、编程技巧(java编程设计,脚本语言,设计工具,XML编程、软件测试技术,测试理论,方法,流程,文档写作,测试工…

Blazor入门100天 : 身份验证和授权 (5) - 本地化资源

目录 建立默认带身份验证 Blazor 程序角色/组件/特性/过程逻辑DB 改 Sqlite将自定义字段添加到用户表脚手架拉取IDS文件,本地化资源freesql 生成实体类,freesql 管理ids数据表初始化 Roles,freesql 外键 > 导航属性完善 freesql 和 bb 特性 本节源码 https://github.com/…

FreeRTOS入门(01):基础说明与使用演示

文章目录目的基础说明系统移植基础使用演示数据类型和命名风格总结碎碎念目的 FreeRTOS是一个现在非常流行的实时操作系统(Real Time Operating System)。本文将介绍FreeRTOS入门使用相关内容,这篇是第一篇,主要介绍基础背景方面…

追梦之旅【数据结构篇】——详解C语言动态实现带头结点的双向循环链表结构

详解C语言动态实现带头结点的双向循环链表结构~😎前言🙌预备小知识💞链表的概念及结构🙌预备小知识💞链表的概念及结构🙌带头结点的双向循环链表结构🙌整体实现内容分析💞1.头文件编…

openpose在win下环境配置

1.下载OpenPose库 以下二选一进行下载源码 (1)git进行下载 打开GitHub Desktop或者Powershell git clone https://github.com/CMU-Perceptual-Computing-Lab/openpose cd openpose/ git submodule update --init --recursive --remote(2)在github上手动下载 由于下载环境问…

SpringCloud Alibaba 之Nacos集群部署-高可用保证

文章目录Nacos集群部署Linux部署docker部署(参考待验证)Nacos 集群的工作原理Nacos 集群中 Leader 节点是如何产生的Nacos 节点间的数据同步过程官方推荐用户把所有服务列表放到一个vip下面,然后挂到一个域名下面。http://nacos.com:port/ope…

go进阶(1) -深入理解goroutine并发运行机制

并发指的是同时进行多个任务的程序,Web处理请求,读写处理操作,I/O操作都可以充分利用并发增长处理速度,随着网络的普及,并发操作逐渐不可或缺 一、goroutine简述 在Golang中一个goroutines就是一个执行单元&#xff…