Elastic Stack 安装与介绍
Elastic Stack 简介
ELK 是三款软件的简称,分别是 Elasticsearch、Logstash、Kibana,在发展的过程中,又有新成员 Beats 的加入,所以就形成了 Elastic Stack。 全系的 Elastic Stack 技术栈包括:

Elasticsearch 基于 java,是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful 风格接口,多数据源,自动搜索负载等。
Logstash 基于 java,是一个开源的用于收集,分析和存储数据的工具。
Kibana 基于 nodejs,也是一个开源和免费的工具,Kibana 可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以汇总、分析和搜索重要数据日志。
Beats
Beats 是 elastic 公司开源的一款采集系统监控数据的代理 agent,可以直接把数据发送给 Elasticsearch 或者通过 Logstash 发送给 Elasticsearch。Beats 由如下组成:
- Packetbeat:是一个网络数据包分析器,用于监控、收集网络流量信息,支持 ICMP (v4 and v6)、DNS、HTTP、Mysql、PostgreSQL、Redis、MongoDB、Memcache 等协议;
- Filebeat:用于监控、收集服务器日志文件,其已取代 logstash forwarder;
- Metricbeat:可定期获取外部系统的监控指标信息,其可以监控、收集 Apache、HAProxy、MongoDB MySQL、Nginx、PostgreSQL、Redis、System、Zookeeper 等服务;
Beats 和 Logstash 其实都可以进行数据的采集,但是目前主流的是使用 Beats 进行数据采集,然后使用 Logstash 进行数据的分割处理等,早期没有 Beats 的时候,使用的就是 Logstash 进行数据的采集。
ELK
ELK 由 Elasticsearch、 Logstash、Kibana 组成,可整合 SpringBoot 应用实现日志收集,部署起来比 Elastic Stack 要简单,因为无需配置 Beats,适用于小中型项目,下面介绍 ELK 的搭建和整合 SpringBoot 程序:
安装 ELK
使用 Docker Compose 可一键部署 ELK,非常方便,需要用到的配置如下:
input {
tcp {
port => 5044
codec => json_lines
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
user => "your_username"
password => "your_elastic_password"
index => "%{service}-log-%{+YYYY.MM.dd}"
}
}services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
privileged: true
networks:
- es-net
environment:
- TZ=Asia/Shanghai
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
- ELASTIC_USERNAME=your_username
- ELASTIC_PASSWORD=your_elastic_password
# 注意,若开启安全策略,用户名只能手动配置(通过命令行),ELASTICSEARCH_USERNAME 配置将失效(建议生产开启)
- xpack.security.enabled=false
- xpack.security.transport.ssl.enabled=false
ports:
# Api端口
- "9200:9200"
# 节点集群端口
- "9300:9300"
volumes:
- ./es/es-data:/usr/share/elasticsearch/data
- ./es/es-plugins:/usr/share/elasticsearch/plugins
kibana:
image: docker.elastic.co/kibana/kibana:8.17.0
environment:
- TZ=Asia/Shanghai
- ELASTICSEARCH_URL=http://elasticsearch:9200
# 注意,较新版本的 kibana 不允许指定用户名为 elastic(因为他是ES默认的管理员用户名)
- ELASTICSEARCH_USERNAME=your_username
- ELASTICSEARCH_PASSWORD=your_elastic_password
ports:
- "5601:5601"
depends_on:
- elasticsearch
networks:
- es-net
logstash:
image: docker.elastic.co/logstash/logstash:8.17.0
environment:
- TZ=Asia/Shanghai
container_name: logstash
hostname: logstash
privileged: true
ports:
- "9600:9600"
- "5044:5044"
volumes:
- ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
networks:
- es-net
depends_on:
- elasticsearch
networks:
es-net:
driver: bridgelogstash.conf 中,index 用于创建 ES 索引库,%{service}是动态参数,整合 SpringBoot 中会介绍。
参考 docker-compose.yml,需要提前准备好要挂载的 3 个目录,并添加读写权限,如下:
mkdir -p ./es/es-data ./es/es-plugins ./logstash
# 新增配置文件,内容见上文
vim ./logstash/logstash.conf
chmod -R a+rw ./es ./logstash提示
为 ES 安装中文分词器、汉化 Kibana 请参考使用 Docker 安装 ES
至此,logstash 对外暴露了一个 5044 端口,logstash.conf 中也可以暴露多个端口;SpringBoot 应用可以通过这个端口主动发送日志给 logstash。
整合 SpringBoot
<!-- SpringBoot 默认集成了 logback,因此无需添加额外依赖 -->
<!-- 此依赖支持将日志输出到 logstash 中 -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.0</version> <!-- 请根据需要选择适当的版本 -->
</dependency>spring:
application:
name: my-app
profiles:
active: dev<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<springProperty scope = "context" name = "applicationName" source = "spring.application.name" />
<springProperty scope = "context" name = "env" source = "spring.profiles.active" />
<!-- 为logstash输出的JSON格式的Appender -->
<appender name="my-logstash-appender" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>172.0.0.1:5044</destination> <!-- logstash 地址 -->
<!-- 日志输出编码 -->
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
<pattern>yyyy-MM-dd HH:mm:ss.SSS</pattern>
</timestamp>
<pattern>
<!-- 注意日志格式为 Json -->
<pattern>
{
"severity": "%level",
"service": "${applicationName:-}",
"trace": "%X{traceId:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"message": "%message",
"env": "${env}",
"stack_trace": "%exception"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<root level="info">
<!--ELK输出-->
<appender-ref ref="my-logstash-appender" />
</root>
</configuration>观察 logback.xml 中日志格式的 service 项,为当前项目的应用名,在 logstash 的配置文件中,index 索引库动态接收这个参数(%{service}),这样,logstash 会为不同的 SpringBoot 程序,在 ES 中创建不同的索引。
一切准备就绪,就可以将 SpringBoot 程序日志对应的索引库显示到 Kibana 中啦。启动 SpringBoot 程序后,访问 Kibana 地址,点击 Management 菜单项(点击不是展开哦),找到 Kibana 菜单,点击 Data Views 创建数据视图。

可以看到 ES 中的索引库和 logstash 配置文件所指定的索引库格式是一致的,最右侧是 ES 中存在的索引库,Index pattern 可以正则匹配索引,这里的作用是将每天的日志都放到同一个数据视图中。
有了数据视图,回到 Kibana 主页面,在 Analytics 菜单中点击 Discover,并选中创建的数据视图,自定义一下数据显示的格式,系统的所有日志数据都能在此轻松查阅,至此就大功告成啦!

同步 Mysql 数据到 ELK
有时系统的操作日志被直接保存到数据库中,查库的话,需要在海量数据中筛选,十分缓慢。可使用 logstash 将数据库日志数据同步到 ES 中,利用 ES 的特性以提高数据查询速度。
第一步:新建两个 docker compose 的配置文件(一个太长了看着不爽):
# docker-compose-logstash.yml
services:
logstash:
image: docker.elastic.co/logstash/logstash:8.17.0
environment:
- TZ=Asia/Shanghai
container_name: logstash
hostname: logstash
privileged: true
ports:
- "9600:9600"
- "5044:5044"
volumes:
- ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
- ./logstash/integrate-mysql:/usr/local/logstash/integrate-mysql
networks:
- es-net
networks:
es-net:
driver: bridge# docker-compose-es-kibana.yml
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
privileged: true
networks:
- es-net
environment:
- TZ=Asia/Shanghai
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
- ELASTIC_USERNAME=your_username
- ELASTIC_PASSWORD=your_elastic_password
# 注意,若开启安全策略,用户名只能手动配置(通过命令行),ELASTICSEARCH_USERNAME 配置将失效(建议生产开启)
- xpack.security.enabled=false
- xpack.security.transport.ssl.enabled=false
ports:
# Api端口
- "9200:9200"
# 节点集群端口
- "9300:9300"
volumes:
- ./es/es-data:/usr/share/elasticsearch/data
- ./es/es-plugins:/usr/share/elasticsearch/plugins
kibana:
image: docker.elastic.co/kibana/kibana:8.17.0
environment:
- TZ=Asia/Shanghai
- ELASTICSEARCH_URL=http://elasticsearch:9200
# 注意,较新版本的 kibana 不允许指定用户名为 elastic(因为他是ES默认的管理员用户名)
- ELASTICSEARCH_USERNAME=your_username
- ELASTICSEARCH_PASSWORD=your_elastic_password
ports:
- "5601:5601"
depends_on:
- elasticsearch
networks:
- es-ne
networks:
es-net:
driver: bridge第二步:准备挂载文件
由于 logstash.conf 配置项有点长,文档不太美观,将其按内容分为了 3 个标签。
# 创建三个文件夹
mkdir -p ./es/es-data ./es/es-plugins ./logstash/integrate-mysql
# 创建配置文件,内容点上面另外三个标签查看(实际操作请将他们都写在 logstash.conf 中)
vim ./logstash/logstash.conf
# 修改目录权限
chmod -R a+rw ./es ./logstashinput {
tcp {
port => 5044
codec => json_lines
}
jdbc {
# JDBC 连接字符串
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/my_database"
jdbc_user => "xxx"
jdbc_password => "xxx"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_driver_library => "/usr/local/logstash/integrate-mysql/mysql-connector-j-8.3.0.jar"
# 定义 SQL 查询语句(:sql_last_value 是占位符,其值由 tracking_column 决定)
statement => "SELECT * FROM sys_oper_log WHERE create_time > :sql_last_value order by create_time asc"
# 这里将最后一条记录的 create_time 作为下次查询的起点,每次查询都会取最新数据
use_column_value => true
tracking_column => "create_time"
# 字段类型为时间戳( 可选 numeric 和 timestamp;numeric 表示字段是数值类型,如自增 )
tracking_column_type => "timestamp"
# 上次处理的时间戳保存位置( logstash 会自动创建这个配置,第一次启动不用管他;后面若想再次扫描旧数据,需修改这个时间 )
last_run_metadata_path => "/usr/local/logstash/integrate-mysql/last_run_metadata.yml"
# 轮询频率(每隔2分钟从数据库查询数据)
schedule => "*/2 * * * *"
# 添加自定义字段,可用于动态创建索引库
add_field => {
"source_type" => "jdbc"
"database_name" => "my_database_name"
}
}
}filter {
mutate {
# 将表中某个字段的类型转换为指定类型
convert => { "order_type" => "integer"}
# 将 create_time 字段映射到 @timestamp; 相当于给字段重命名
rename => {
"create_time" => "@timestamp"
"name" => "user_name"
}
}
# 时间过滤器; 如果 create_time 不是时间类型,而是字符串,需要转换后再映射到 @timestamp
# date {
# match => ["create_time", "yyyy-MM-dd HH:mm:ss"]
# target => "@timestamp"
# timezone => "Asia/Shanghai"
# }
}output {
# 为不同的输入源创建不同的索引库(index)
if [source_type] == "tcp" {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
user => "your_username"
password => "your_elastic_password"
index => "%{service}-log-%{+YYYY.MM.dd}"
}
} else if [source_type] == "jdbc" {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "%{database_name}-log-%{+YYYY.MM.dd}"
user => "your_username"
password => "your_elastic_password"
}
}
}第三步:提供 MySQL 连接驱动
将 mysql-connector-j-8.3.0.jar 上传到 ./logstashintegrate-mysql 目录中。
# 上传成功后,以防权限不足,修改下权限
chmod -R a+rw ./logstash第四步:一切准备就绪,运行容器即可
容器运行成功,在 Data Views 菜单中新建数据视图即可,数据同步可能需要点时间,查看 logstash 启动日志可观察 SQL 执行状态。
# 启动容器
docker compose -f docker-compose-es-kibana.yml -f docker-compose-logstash.yml up -d
# 停止容器
docker compose -f docker-compose-es-kibana.yml -f docker-compose-logstash.yml down
