这篇主要讲的是搭建日志收集分析系统,关于如何使用docker部署ELK,可以参考我上篇
日志收集系统
简介
与SpringBoot整合应用,再延伸到ELK技术栈实现日志输出(Log4j2)、数据抓取(Filebeat)、数据转储(Kafka Broker);再到Logstash消费,然后Sink到Elasticsearch平台,通过Kibana进行展示
ELK技术架构图
ELK实战流程图
使用docker搭建filebeat
拉取镜像
docker pull elastic/filebeat:7.13.2
复制代码
filebeat与Elasticsearch最好版本一致。如何查看es版本,进入Kibana页面查看或者进入es bin目录下使用
/opt/elasticsearch/bin/elasticsearch --version
创建容器
docker run -d \
--name=filebeat \
--user=root \
--volume="/usr/local/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro" \
--volume="/usr/local/logs:/usr/local/logs" \
elastic/filebeat:7.13.2 filebeat -e -strict.perms=false
复制代码
创建filebeat容器的时候,首先需要创建filebeat.yml,将宿主机的logs挂载到filebeat容器中以方便收集日志
--volume="/usr/local/logs:/usr/local/logs"
filebeat.yml文件
###################### Filebeat Configuration Example #########################
filebeat.prospectors:
- input_type: log
paths:
## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
- /usr/local/logs/app-collector.log
#定义写入 ES 时的 _type 值
document_type: "app-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: app-log-collector ## 按服务划分用作kafka topic
evn: dev
- input_type: log
paths:
- /usr/local/logs/error-collector.log
document_type: "error-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
negate: true # 是否匹配到
match: after # 合并到上一行的末尾
max_lines: 2000 # 最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
fields:
logbiz: collector
logtopic: error-log-collector ## 按服务划分用作kafka topic
evn: dev
output.kafka:
enabled: true
#替换成你自己的es地址
hosts: ["localhost:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
复制代码
filebeat使用到了kafka,需要自己搭建kafka环境,后面会提供如何使用kafka,先将kafka环境和项目的日志配置环境搭建起来
docker-compose构建filebeat
docker-filebeat.yml
version: '2'
services:
filebeat:
image: "elastic/filebeat:7.13.2"
container_name: filebeat
volumes:
- "/usr/local/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro"
- "/usr/local/logs:/usr/local/logs"
environment:
strict.perms: false
复制代码
创建容器
docker-compose apply -f docker-filebeat.yml
复制代码
filebeat与logstash基础语法使用
filebeat使用
## file插件
input {
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
## 其他参数:
discover_interval ## 表示每隔多久检测一下文件,默认15秒
exclude ## 表示排除那些文件
close_older ## 文件超过多长时间没有更新,就关闭监听 默认3600s
ignore_older ## 每次检查文件列表 如果有一个文件 最后修改时间超过这个值 那么就忽略文件 86400s
sincedb_path ## sincedb保存文件的位置,默认存在home下(/dev/null)
sincedb_write_interval ## 每隔多久去记录一次 默认15秒
stat_interval ## 每隔多久查询一次文件状态 默认1秒
start_position ## 从头开始读取或者从结尾开始读取
复制代码
logstash使用
- Logstash设计了自己的DSL包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断字段引用。
- Logstash用{}来定义区域。区域内可以包括插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。
- 格式、语法、使用方式:
## conf下配置文件说明:
# logstash配置文件:/config/logstash.yml
# JVM参数文件:/config/jvm.options
# 日志格式配置文件:log4j2.properties
# 制作Linux服务参数:/config/startup.options
## 配置文件说明:
vim /usr/local/logstash-6.6.0/config/logstash.yml
--path.config 或 –f :logstash启动时使用的配置文件
--configtest 或 –t:测试 Logstash 读取到的配置文件语法是否能正常解析
--log或-l:日志输出存储位置
--pipeline.workers 或 –w:运行 filter 和 output 的 pipeline 线程数量。默认是 CPU 核数。
--pipeline.batch.size 或 –b:每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。
--pipeline.batch.delay 或 –u:每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。
--verbose:输出调试日志
--debug:输出更多的调试日志
## 虚拟机配置
vim /usr/local/logstash-6.6.0/config/jvm.options
## 启动配置 比如启动时的java位置、LS的home等
vim /usr/local/logstash-6.6.0/config/startup.options
## 数据收集目录:/usr/local/logstash-6.6.0/data
## 插件目录:/usr/local/logstash-6.6.0/vendor/bundle/jruby/1.9/gems
## 查看插件命令:
/usr/local/logstash-6.6.0/bin/logstash-plugin list
## 更新插件命令:
/usr/local/logstash-6.6.0/bin/logstash-plugin update logstash-xxxx-xxxxx
## 安装插件命令:
/usr/local/logstash-6.6.0/bin/logstash-plugin install logstash-xxxx-xxxxx
## 插件地址: https://github.com/logstash-plugins
复制代码
logstash.conf配置文件
## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {
kafka {
## app-log-服务名称
topics_pattern => "app-log-.*"
bootstrap_servers => "localhost:9092"
codec => json
consumer_threads => 1 ## 增加consumer的并行消费线程数
decorate_events => true
#auto_offset_rest => "latest"
group_id => "app-log-group"
}
kafka {
## error-log-服务名称
topics_pattern => "error-log-.*"
bootstrap_servers => "localhost:9092"
codec => json
consumer_threads => 1
decorate_events => true
#auto_offset_rest => "latest"
group_id => "error-log-group"
}
}
filter {
## 时区转换
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
if "app-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok {
## 表达式
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
## 测试输出到控制台:
output {
stdout { codec => rubydebug }
}
## elasticsearch:
output {
if "app-log" in [fields][logtopic]{
## es插件
elasticsearch {
# es服务地址
hosts => ["localhost:9200"]
# 用户名密码
user => "elastic"
password => "123456"
## 索引名,+ 号开头的,就会自动认为后面是时间格式:
## javalog-app-service-2019.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
# 是否嗅探集群ip:一般设置true;http://localhost:9200/_nodes/http?pretty
# 通过嗅探机制进行es集群负载均衡发日志消息
sniffing => true
# logstash默认自带一个mapping模板,进行模板覆盖
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["localhost:9200"]
user => "elastic"
password => "123456"
index => "error-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
}
复制代码
logstash使用到了kafka,需要自己搭建kafka环境,后面会提供如何使用kafka,先将kafka环境和项目的日志配置环境搭建起来
使用docker构建kafka环境
拉取镜像
docker pull hlebalbau/kafka-manager:stable
复制代码
创建容器
docker run -d --name=kafka 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=localhost -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181
复制代码
KAFKA_ADVERTISED_HOST_NAME
外网访问配置,云服务器填写公网ip
KAFKA_ZOOKEEPER_CONNECT
zookeeper连接地址
docker run -d --name=zookeeper -p 2181:2181 zookeeper
docker创建zookeeper
使用docker-compose构建kafka环境
version: '1'
services:
zookeeper:
image: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
复制代码
springboot项目日志文件配置
log4j2.xml 配置
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
<Properties>
<Property name="LOG_HOME">logs</Property>
<property name="FILE_NAME">collector</property>
<property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
</Properties>
<Appenders>
<Console name="CONSOLE" target="SYSTEM_OUT">
<PatternLayout pattern="${patternLayout}"/>
</Console>
<RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
<PatternLayout pattern="${patternLayout}" />
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
<RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
<PatternLayout pattern="${patternLayout}" />
<Filters>
<ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<SizeBasedTriggeringPolicy size="500MB"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<!-- 业务相关 异步logger -->
<AsyncLogger name="com.qiangge.*" level="info" includeLocation="true">
<AppenderRef ref="appAppender"/>
</AsyncLogger>
<AsyncLogger name="com.qiangge.*" level="info" includeLocation="true">
<AppenderRef ref="errorAppender"/>
</AsyncLogger>
<Root level="info">
<Appender-Ref ref="CONSOLE"/>
<Appender-Ref ref="appAppender"/>
<AppenderRef ref="errorAppender"/>
</Root>
</Loggers>
</Configuration>
复制代码
最后
将logstash.conf
复制到elk容器/etc/logstash/conf.d/
目录中,重启elk,filebeat将日志文件信息发送kakfa队列中,logstash消费kafka队列中的消息,然后收集中es中,Kibana进行展示。如果有什么不明白可以评论留言。
近期评论