防止ES的的I/O的压力过大,使用redis/kafka进行缓冲。
对redis的要求
Redis input plugin | Logstash Reference [8.17] | Elastic
一般企业要求的架构


我实现的架构

filebeat把数据传给logstash

配置好filebeat把收集到的数据输入到redis

然后执行命令,filebeat就开始往redis中写数据
cd /etc/filebeat
/usr/share/filebeat/bin/filebeat -e -c filebeat.yml
logstash配置,从redis中读数据输出到elasticsearch

当我使用filebeat收集数据,传到redis,然后logstash从redis读取数据的时候有一个大问题就是
filebeat收集的是json格式的nginx访问日志,它被filebeat收集到redis的时候,会对这个收集的日志和与日志相关的元数据进行封装然后输出到redis。等logstash从redis里面取出来的时候,得到的数据是 收集的日志(message) 和 元数据字段的集合。
(就变成了一个字段message,其他都是没用的元数据字段。)
我收集的nginx日志格式

要解决的问题
1.json格式的日志数据message未正确解析
这是在filebeat直接向logstash传送数据的时候发送的错误,因为传送过去的数据没有设置正确的字符集,导致收集到的日志乱码

kibana显示

改正后基本上就没有乱码格式了,一个event是一条日志


2.删除无用的元数据字段
Filebeat直接连接logtsash的时候,filebeat通过output.logstash发送数据,默认不会添加完整元数据,输出到redis的时候,filebeat使用output.redis将日志包装成完整事件结构,包含所有的元数据。
Redis 作为缓冲队列:数据存储到 Redis 时,Filebeat 会保留完整的 Beat 事件结构(包括 @metadata、host、agent 等)。
所以当logstash从redis取日志数据的时候,会收集无用的元数据(只有message是收集的访问日志),我们要把无用的元数据过滤掉

然后会得到一整个大的message字段

3.过滤出日志里面的单个信息
所以删除无用的元数据之后如果我们不仅想得到访问日志,还想得到访问日志里面得具体数据,使用这个具体数据进行画图等等(比如我们想得到remote_addr字段,就需要对message进行解析)
input {
  redis {
    host       => "10.8.0.23"
    port       => 6379
    password   => "Pu@1uC2016"
    db         => 0
    data_type  => "list"
    key        => "nginx-accesslog"
    codec      => json  # 自动解析外层 JSON
  }
}
#nginx的原始的json格式的数据是这样的
#{"time_local":"2025-03-28T05:55:57+08:00","remote_addr":"45.90.163.37","remote_user":"","request":"PUT /v1/agent/service/register
#HTTP/1.1","status":"400","body_bytes_sent":"173","request_time":"0.263","http_referer":"","http_user_agent":"","http_x_forwarded_for":"",
#"upstream_addr":"","upstream_response_time":""}
#但是message里面的数据是这样的,我们要把message中的数据转化为json格式,然后json过滤才能把字段(比如ip)过滤出来,然后可以对数据做图形分析
# "message" => "{\"time_local\":\"2025-03-28T18:12:23+08:00\",\"remote_addr\":\"10.8.0.23\",\"request\":\"HEAD / HTTP/1.1\",\"status\":\"200\",
#\"body_bytes_sent\":\"0\",\"request_time\":\"0.000\",\"http_referer\":\"\",\"http_user_agent\":\"curl/7.61.1\",}"
#要把下面转化为上面的这样的json格式(上面是标准的json格式),filter里面的json工具才能把message这个大字段里面的小字段取出来
filter {
  # ========== 第一步:处理 message 字段中的内层 JSON ==========
  mutate {                                          
    gsub => [
        "message", "\\\"", "\""     # 将 \" 替换为普通引号
       ] 
     }
    mutate {
    # 删除末尾多余逗号(如 ",}" → "}")
    gsub => [
      "message", ",}", "}",
      "message", ",]", "]"
    ]
     }
  # 解析 message 字段中的业务日志   这个json过滤,它只作用于json格式的数据
  json {
    source => "message"
    target => "nginx_log"  # 解析结果存入 nginx_log 子对象
    remove_field => ["message"]
    tag_on_failure => ["_json_parse_failure"]  # 标记解析失败的日志
  }
  # ========== 第二步:提升业务字段到根层级 ==========
  mutate {
    rename => {
      "[nginx_log][time_local]"          => "time_local"
      "[nginx_log][remote_addr]"         => "remote_addr"
      "[nginx_log][request]"             => "request"
      "[nginx_log][status]"              => "status"
      "[nginx_log][body_bytes_sent]"     => "body_bytes_sent"
      "[nginx_log][http_referer]"        => "http_referer"
      "[nginx_log][http_user_agent]"     => "http_user_agent"
    }
  }
#把一些无用的元数据过滤掉不要
  # ========== 第三步:清理所有元数据字段 ==========
  mutate {
    remove_field => [
      "host", "agent", "ecs", "log", "input", "fields",
      "@version", "event", "nginx_log"
    ]
  }
  # ========== 第四步:修正时间戳 ==========
  date {
    match  => [ "time_local", "ISO8601" ]
    target => "@timestamp"  # 覆盖默认时间戳
  }
}
output {
  elasticsearch {
    hosts  => ["https://10.8.0.23:9200"]
    index  => "dami-logs-%{+YYYY.MM.dd}"
    user   => "elastic"
    password => "sxm@325468"
    ssl => true
    cacert => "/logstash/http_ca.crt"    
  }
  # 调试时开启 stdout,查看完整字段
  stdout {
    codec => rubydebug {
      metadata => true  # 显示元数据(确认字段结构)
    }
  }
}最后可以得到message里面的单个小字段信息,以前只能在message中进行查看,不能对数据进行分析和画图,现在可以了


filter过滤器的其他用法
Filter plugins | Logstash Reference [8.17] | Elastic
1.grok
也可以康康这个人的文章
logstash过滤器插件filter详解及实例 - 峰哥ge - 博客园
官方文档
Grok filter plugin | Logstash Reference [8.17] | Elastic
grok %{语法:语义}
#语法在配置文件里已经定义好了
#语义是自己定义的,表示要将获得的字段放在哪个key里,例如下面ip就是key值,取出的字段值是value
match => {"message" => "%{IPV4:ip}"} #message是收集到的每一条数据
/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-patterns-core-4.3.4/patterns/ecs-v1/grok-patterns #语法在配置文件里已经定义好了
match => {"message" => "%{IPV4:ip}"} #message是收集到的每一条数据
match => {"message" => "%{@HTTPDATE:time}"} #
match => {"message" => "%{LOGLEVEL:level}"} #


2.groip 通过ip定位物理位置
Geoip 过滤器插件 |Logstash 参考 [8.17] |弹性的
logstash上要安装这个数据库(要先注册才能看)
Download GeoIP Databases | MaxMind

我的geoip配置写错了但是我懒得改了,我的阿里云服务器要释放了。




logstash收集到的数据还可以存放到数据库中,然后可以自己公司开发一个前端工具连接到数据库进行数据分析




















