目录
一、背景
二、设计
三、具体实现
Filebeat配置
K8S SideCar yaml
Logstash配置
一、背景
将容器中服务的trace日志和应用日志收集到KAFKA,需要注意的是 trace 日志和app 日志需要存放在同一个KAFKA两个不同的topic中。分别为APP_TOPIC和TRACE_TOPIC
二、设计
流程图如下:
 
 
  说明:
APP_TOPIC:主要存放服务的应用日志
TRACE_TOPIC:存放程序输出的trace日志,用于排查某一个请求的链路
文字说明:
filebeat 采集容器中的日志(这里需要定义一些规范,我们定义的容器日志路径如下),filebeat会采集两个不同目录下的日志,然后输出到对应的topic中,之后对kafka 的topic进行消费、存储。最终展示出来
/home/service/
└── logs
    ├── app
    │   └── pass
    │       ├── 10.246.84.58-paas-biz-784c68f79f-cxczf.log
    │       ├── 1.log
    │       ├── 2.log
    │       ├── 3.log
    │       ├── 4.log
    │       └── 5.log
    └── trace
        ├── 1.log
        ├── 2.log
        ├── 3.log
        ├── 4.log
        ├── 5.log
        └── trace.log
4 directories, 13 files三、具体实现
上干货~
Filebeat配置
配置说明:
其中我将filebeat的一些配置设置成了变量,在接下来的k8s yaml文件中需要定义变量和设置变量的value。
需要特别说明的是我这里是使用了 tags: ["trace-log"]结合when.contains来匹配,实现将对应intput中的日志输出到对应kafka的topic中
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/service/logs/trace/*.log
  fields_under_root: true
  fields:
    topic: "${TRACE_TOPIC}"
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: message
  scan_frequency: 10s
  max_bytes: 10485760
  harvester_buffer_size: 1638400
  ignore_older: 24h
  close_inactive: 1h
  tags: ["trace-log"]
  processors:
    - decode_json_fields:
        fields: ["message"]
        process_array: false
        max_depth: 1
        target: ""
        overwrite_keys: true
- type: log
  enabled: true
  paths:
    - /home/service/logs/app/*/*.log
  fields:
    topic: "${APP_TOPIC}"
  scan_frequency: 10s
  max_bytes: 10485760
  harvester_buffer_size: 1638400
  close_inactive: 1h
  tags: ["app-log"]
output.kafka:
  enabled: true
  codec.json:
    pretty: true  # 是否格式化json数据,默认false
  compression: gzip
  hosts: "${KAFKA_HOST}"
  topics:
    - topic: "${TRACE_TOPIC}"
      bulk_max_duration: 2s
      bulk_max_size: 2048
      required_acks: 1
      max_message_bytes: 10485760
      when.contains:
        tags: "trace-log"
    - topic: "${APP_TOPIC}"
      bulk_flush_frequency: 0
      bulk_max_size: 2048
      compression: gzip
      compression_level: 4
      group_id: "k8s_filebeat"
      grouping_enabled: true
      max_message_bytes: 10485760
      partition.round_robin:
        reachable_only: true
      required_acks: 1
      workers: 2
      when.contains:
        tags: "app-log"K8S SideCar yaml
配置说明:
该yaml中定一个两个容器,容器1为nginx(示例)容器2为filebeat容器。定义了一个名称为logs的emptryDir类型的卷,将logs卷同时挂载在了容器1和容器2的/home/service/logs目录
接下来又在filebeat容器中自定义了三个环境变量,这样我们就可以通过修改yaml的方式很灵活的来配置filebeat
TRACE_TOPIC: Trace日志的topic
APP_TOPIC:App日志的topic
KAFKA_HOST:KAFKA地址
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: nginx
  name: nginx
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      imagePullSecrets:
      - name: uhub-registry
      containers:
      - image: uhub.service.ucloud.cn/sre-paas/nginx:v1
        imagePullPolicy: IfNotPresent
        name: nginx
        ports:
        - name: nginx
          containerPort: 80
        - mountPath: /home/service/logs
          name: logs
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /home/service/logs
          name: logs
      - env:
        - name: TRACE_TOPIC
          value: pro_platform_monitor_log
        - name: APP_TOPIC
          value: platform_logs
        - name: KAFKA_HOST
          value: '["xxx.xxx.xxx.xxx:9092","xx.xxx.xxx.xxx:9092","xx.xxx.xxx.xxx:9092"]'
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        image: xxx.xxx.xxx.cn/sre-paas/filebeat-v2:8.11.2
        imagePullPolicy: Always
        name: filebeat
        resources:
          limits:
            cpu: 150m
            memory: 200Mi
          requests:
            cpu: 50m
            memory: 100Mi
        securityContext:
          privileged: true
          runAsUser: 0
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /home/service/logs
          name: logs
      dnsPolicy: ClusterFirst
      imagePullSecrets:
      - name: xxx-registry
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}
        name: logs                                                                                                                                                                              Logstash配置
input {
  kafka {
    type => "platform_logs"
    bootstrap_servers => "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092"
    topics => ["platform_logs"]
    group_id => 'platform_logs'
    client_id => 'open-platform-logstash-logs'
  }
  kafka {
    type => "platform_pre_log"
    bootstrap_servers => "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092"
    topics => ["pre_platform_logs"]
    group_id => 'pre_platform_logs'
    client_id => 'open-platform-logstash-pre'
  }
  kafka {
    type => "platform_nginx_log"
    bootstrap_servers => "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092"
    topics => ["platform_nginx_log"]
    group_id => 'platform_nginx_log'
    client_id => 'open-platform-logstash-nginx'
  }
}
filter {
  if [type] == "platform_pre_log" {
    grok {
      match => { "message" => "\[%{IP}-(?<service>[a-zA-Z-]+)-%{DATA}\]" }
    }
  }
  if [type] == "platform_logs" {
    grok {
      match => { "message" => "\[%{IP}-(?<service>[a-zA-Z-]+)-%{DATA}\]" }
    }
  }
}
output {
  if [type] == "platform_logs" {
    elasticsearch {
      id => "platform_logs"
      hosts => ["http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200"]
      index => "log-xxx-prod-%{service}-%{+yyyy.MM.dd}"
      user => "logstash_transformer"
      password => "xxxxxxx"
      template_name => "log-xxx-prod"
      manage_template => "true"
      template_overwrite => "true"
    }
  }
  if [type] == "platform_pre_log" {
    elasticsearch {
      id => "platform_pre_logs"
      hosts => ["http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200"]
      index => "log-xxx-pre-%{service}-%{+yyyy.MM.dd}"
      user => "logstash_transformer"
      password => "xxxxxxx"
      template_name => "log-xxx-pre"
      manage_template => "true"
      template_overwrite => "true"
    }
  }
  if [type] == "platform_nginx_log" {
    elasticsearch {
      id => "platform_nginx_log"
      hosts => ["http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200"]
      index => "log-platform-nginx-%{+yyyy.MM.dd}"
      user => "logstash_transformer"
      password => "xxxxxxx"
      template_name => "log-platform-nginx"
      manage_template => "true"
      template_overwrite => "true"
    }
  }
}如果有帮助到你麻烦给个赞或者收藏一下~,有问题可以随时私聊我或者在评论区评论,我看到会第一时间回复












![[文本挖掘和知识发现] 03.基于大连理工情感词典的情感分析和情绪计算](https://img-blog.csdnimg.cn/20200809125022536.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0Vhc3Rtb3VudA==,size_16,color_FFFFFF,t_70#pic_center)






