Collecting Clean Business Logs with Filebeat + Elasticsearch

1. Background

I have recently been working on a business gateway, and we wanted to collect the gateway's logs across several dimensions for statistics and analysis, including request logs, monitoring logs, and so on. Aiming to introduce as few external components as possible while keeping things reliable, we eventually settled on the Filebeat + Elasticsearch combination.
Note: the Filebeat and Elasticsearch versions used in this article are both 6.5.4. Version 7+ differs in some details, but the overall approach is unaffected.

2. Collection Result

First, a screenshot of the ES documents after all processing:

The screenshot shows JVM-related metrics collected from the microservice. After processing, every field left in the document is one the business actually needs; all business-irrelevant fields have been removed. Pretty clean, isn't it?

3. Implementation

Anyone who has used Filebeat knows that the collected content is written into a single message field. To parse that content you can pair Filebeat with Logstash and its Grok filter for a second round of processing, but to minimize external components we did not go that route. Since Filebeat can parse JSON-formatted file content by itself, we write every log we want to collect as JSON, so Filebeat can parse the log content directly.
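
For contrast, here is roughly what a plain-text log line looks like once Filebeat ships it (a hypothetical, trimmed event): everything lands inside the message field and still needs Grok or similar to be broken apart, whereas a JSON line parsed with the json.* options shown in section 3.2 arrives with each key as its own top-level field.

{"@timestamp":"2020-09-14T06:01:17.535Z","message":"14:01:17 INFO heap_total_size=107210624 cpu_usage=0.0","input":{"type":"log"},...}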

3.1 Logging on the Service Side

  • Java logging
public class MetricsGatherJob {
    
    private static final Logger LOG = LoggerFactory.getLogger(MetricsGatherJob.class);

    ...
    private void doGather() {
        try {
            LogMetricsInfo metrics = new LogMetricsInfo();
            ...
            // populate metrics with the values to record
            ...
            // write the entry to the log file as a JSON string
            LOG.info(jacksonToString(metrics));
        } catch (Exception ignore) {
            LOG.error(ignore.getMessage(), ignore);
        }
    }

    private String jacksonToString(LogMetricsInfo metrics) {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
        String response = null;
        try {
            response = objectMapper.writeValueAsString(metrics);
        } catch (JsonProcessingException ignore) {
            LOG.error(ignore.getMessage(), ignore);
        }
        return response;
    }
}
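
The LogMetricsInfo class is not shown above. As a point of reference, here is a minimal sketch of what it could look like, with field names and types inferred from the sample log output further down; the class shape, annotations, and omitted members are assumptions, not the original code.

import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical sketch only -- the real LogMetricsInfo is not shown in the article.
public class LogMetricsInfo {

    @JsonProperty("instance_id")
    private String instanceId;

    @JsonProperty("minor_gc_count")
    private long minorGcCount;

    @JsonProperty("minor_gc_costs")
    private double minorGcCosts;

    @JsonProperty("heap_total_size")
    private long heapTotalSize;

    @JsonProperty("cpu_usage")
    private double cpuUsage;

    @JsonProperty("gather_time")
    private long gatherTime;

    // ...the remaining gc/heap/thread fields follow the same pattern...

    // getters and setters omitted for brevity
}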
  • logback configuration
...
    <appender name="metricsRollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>/data1/metrics/logs/metrics.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>/data1/metrics/logs/metrics.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <!-- %msg%n writes only the raw JSON line, which is what Filebeat's JSON parsing expects -->
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <logger name="xxxpackage.MetricsGatherJob" level="INFO">
        <appender-ref ref="metricsRollingFile"/>
    </logger>
...
  • Sample log output
{"instance_id":"microservices-gateway-1987-169.254.51.37","minor_gc_count":4,"minor_gc_costs":0.043,"major_gc_count":1,"major_gc_costs":0.126,"heap_old_size":30225216,"heap_survivor_size":0,"heap_eden_size":76985408,"heap_total_size":107210624,"metaspace_size":61840000,"non_heap_committed_size":88760320,"non_heap_used_size":84556424,"non_heap_max_size":1325400063,"threads_new":0,"threads_blocked":0,"threads_runnable":12,"threads_terminated":0,"threads_timed_waiting":24,"threads_waiting":16,"cpu_usage":0.0,"gather_time":1600063277535}
{"instance_id":"microservices-gateway-1987-169.254.51.37","minor_gc_count":4,"minor_gc_costs":0.043,"major_gc_count":1,"major_gc_costs":0.126,"heap_old_size":30225216,"heap_survivor_size":0,"heap_eden_size":87406704,"heap_total_size":117631920,"metaspace_size":62022688,"non_heap_committed_size":89415680,"non_heap_used_size":85142112,"non_heap_max_size":1325400063,"threads_new":0,"threads_blocked":0,"threads_runnable":12,"threads_terminated":0,"threads_timed_waiting":20,"threads_waiting":17,"cpu_usage":0.6869372973126338,"gather_time":1600063338324}
{"instance_id":"microservices-gateway-1987-169.254.51.37","minor_gc_count":4,"minor_gc_costs":0.043,"major_gc_count":1,"major_gc_costs":0.126,"heap_old_size":30225216,"heap_survivor_size":0,"heap_eden_size":96924312,"heap_total_size":127149528,"metaspace_size":62038344,"non_heap_committed_size":90005504,"non_heap_used_size":85688504,"non_heap_max_size":1325400063,"threads_new":0,"threads_blocked":0,"threads_runnable":12,"threads_terminated":0,"threads_timed_waiting":20,"threads_waiting":17,"cpu_usage":0.7160772089562932,"gather_time":1600063398395}

3.2 Filebeat Configuration

  • Key filebeat.yml settings
#======= Filebeat inputs ========
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data1/metrics/logs/*metrics.log
  json.keys_under_root: true # if the log line is JSON, Filebeat parses it and places the keys at the root of the event
  json.add_error_key: true   # on a JSON parsing error, Filebeat adds "error.message" and "error.type" fields


#======= Template setup =========
# use an external, custom index template loaded from a JSON file
setup.ilm.enabled: false
setup.template.enabled: true
setup.template.overwrite: true
setup.template.json.enabled: true

# the ES index template; when Filebeat writes log records to ES, the index is created automatically
setup.template.name: "gateway_metrics_logs"
setup.template.pattern: "gateway_metrics_logs*"
setup.template.json.path: "/data1/index/template/gateway_metrics_logs_template.json"
setup.template.json.name: "gateway_metrics_logs_index_template"

#------- Elasticsearch output ---
# ship the collected data to ES
output.elasticsearch:
  hosts: ["127.0.0.1:9200"]
  index: "gateway_metrics_logs"
  pipeline: "gateway_metrics_logs_pipline"
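
With the configuration saved, Filebeat can be started in the foreground to watch template loading and output errors. A minimal invocation, assuming the standard tar.gz distribution with filebeat.yml in the working directory:

./filebeat -e -c filebeat.yml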
  • gateway_metrics_logs_template.json
{
  "index_patterns": [
    "gateway_metrics_logs*"
  ],
  "mappings": {
    "doc": {
      "dynamic_templates": [
        {
          "strings_as_keyword": {
            "mapping": {
              "type": "text",
              "analyzer": "standard",
              "fields": {
                "keyword": {
                  "type": "keyword"
                }
              }
            },
            "match_mapping_type": "string",
            "match": "*"
          }
        }
      ],
      "properties": {
        "instance_id": {
          "type": "text",
          "analyzer": "standard"
        },
        "minor_gc_count": {
          "type": "long"
        },
        "minor_gc_costs": {
          "type": "double"
        },
        "major_gc_count": {
          "type": "long"
        },
        "major_gc_costs": {
          "type": "double"
        },
        "heap_old_size": {
          "type": "long"
        },
        "heap_survivor_size": {
          "type": "long"
        },
        "heap_eden_size": {
          "type": "long"
        },
        "heap_total_size": {
          "type": "long"
        },
        "non_heap_committed_size": {
          "type": "long"
        },
        "non_heap_used_size": {
          "type": "long"
        },
        "non_heap_max_size": {
          "type": "long"
        },
        "threads_new": {
          "type": "long"
        },
        "threads_blocked": {
          "type": "long"
        },
        "threads_runnable": {
          "type": "long"
        },
        "threads_terminated": {
          "type": "long"
        },
        "threads_timed_waiting": {
          "type": "long"
        },
        "threads_waiting": {
          "type": "long"
        },
        "cpu_usage": {
          "type": "double"
        },
        "gather_time": {
          "type": "long"
        }
      }
    }
  }
}
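
When Filebeat starts with setup.template.json.path pointing at this file, it uploads the template to Elasticsearch on its own. As a quick sanity check (an optional step, not part of the original setup), the registered template can be inspected from Kibana Dev Tools or curl:

GET _template/gateway_metrics_logs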

3.3 Processing on the Elasticsearch Side

Filebeat adds quite a lot of metadata to every event it collects, such as the "input", "agent", "host", and "@timestamp" fields. For our analysis these fields carry no business meaning, so we use an ES ingest pipeline to remove them. Looking back at output.elasticsearch in filebeat.yml, the pipeline it references, gateway_metrics_logs_pipline, exists exactly to solve this annoyance.
The ingest pipeline must be defined on the Elasticsearch side ahead of time; gateway_metrics_logs_pipline is created with the following request:

PUT _ingest/pipeline/gateway_metrics_logs_pipline
{
    "description": "template",
    "processors": [
        {
            "remove": {
                "field": "input",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "host",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "offset",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "prospector",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "beat",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "@timestamp",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "source",
                "ignore_missing": true
            }
        },
        {
            "remove": {
              "field": "fields",
              "ignore_missing": true
            }
        },
        {
            "remove": {
              "field": "meta",
              "ignore_missing": true
            }
        }
    ]
}

Note: "ignore_missing": true means that when a field is absent from the data Filebeat sends, the processor does nothing and the document is still written to ES as-is; without it, a missing field raises an exception and the document is not indexed.
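
Before wiring the pipeline into Filebeat, it can be dry-run with the ingest simulate API. The document below is a hypothetical, trimmed sample assembled just to show the metadata fields being stripped while the business fields pass through untouched:

POST _ingest/pipeline/gateway_metrics_logs_pipline/_simulate
{
  "docs": [
    {
      "_source": {
        "instance_id": "microservices-gateway-1987-169.254.51.37",
        "cpu_usage": 0.0,
        "gather_time": 1600063277535,
        "input": { "type": "log" },
        "host": { "name": "demo-host" },
        "offset": 0,
        "@timestamp": "2020-09-14T06:01:17.535Z"
      }
    }
  ]
}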

4. Summary

This article walked through a complete log-handling approach covering logging, collection, parsing, and indexing, with the documents ultimately written to ES staying clean and tidy. With this setup we can easily collect monitoring logs, request logs, and business tracking logs into ES for later statistics and analysis.

If you found this useful, please follow and like. My knowledge is limited, so corrections are welcome!

End.