日志分析系统ELK(中)之logstash
- 1、什么是logstash?
- 2、Logstash安装
- 3、logstash简单命令行测试
- 4、logstash文件测试
- (1)命令行输入,输出到文件
- (2)命令行输入,输出到elasticsearch
- (3)文件输入,输出到elasticsearch
- 5、logstash可以伪装成日志服务器,直接接受远程日志
- 6、多行过滤插件
- (1)命令行多行输入,文件输出
- (2)文件多行输入,输出到elasticsearch
- 7、grok切片过滤插件
- (1)命令行输入,过滤,命令行输出
- (2)apache日志输入,切片,es输出
接上篇,server3、server4、server5,是Elasticsearch集群。本文介绍logstash
1、什么是logstash?
Logstash是一个开源的服务器端数据处理管道,聚合器。logstash拥有200多个插件,能够同时从多个来源采集数据,转换数据,过滤数据,然后将数据发送到您最喜欢的 “存储库” 中。(大多都是 Elasticsearch。)
Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器。如下图
(1)输入:采集各种样式、大小和来源的数据
Logstash 支持各种输入选择 ,同时从众多常用来源捕捉事件。
能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
(2)过滤器:实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
- 利用 Grok 从非结构化数据中派生出结构
- 从 IP 地址破译出地理坐标
- 将 PII 数据匿名化,完全排除敏感字段
- 简化整体处理,不受数据源、格式或架构的影响
(3)输出:选择您的存储库,导出您的数据
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
2、Logstash安装
官网https://elasticsearch.cn/download/下载Logstash软件包,并准备java的jdk包
准备新的虚拟机serever6(172.25.77.6),分配内存1G
server6安装jdk和logstash
3、logstash简单命令行测试
找到logstash命令的路径执行,标准输入到标准输出,即命令行输入,命令行输出
输入hello word,标准输出hello word;输入test,标准输出test
4、logstash文件测试
(1)命令行输入,输出到文件
在/etc/logstash/conf.d/
目录下.conf结尾的文件都可以读到,
编辑test.conf文件如下
[root@server6 conf.d]# cat test.conf
input {
stdin {} %输入来自命令行标准输入
}
output {
file { %输出到/tmp/testfile文件中,格式为custom format: {输入内容}
path => "/tmp/testfile"
codec => line { format => "custom format: %{message}"}
}
}
执行test.conf文件
命令行输入heiheihei,输出到了/tmp/testfile文件中
上面的方法无法直接看到结果,不太舒服,修改test.conf文件
[root@server6 conf.d]# cat test.conf
input {
stdin {}
}
output {
stdout{} %一份标准输出到命令行
file { %一份输出到/tmp/testfile文件中
path => "/tmp/testfile"
codec => line { format => "custom format: %{message}"}
}
}
执行test.conf文件
输入linux,可以看到标准输出,也输出一份到/tmp/testfile文件中
(2)命令行输入,输出到elasticsearch
编辑es.conf 文件
[root@server6 conf.d]# cat es.conf
input {
stdin {}
}
output {
stdout {} %标准输出一份
elasticsearch { %给elasticsearch输出一份
hosts => ["172.25.77.3:9200"] %目标elasticsearch主机ip
index => "logstash-%{+yyyy.MM.dd}" %索引格式为logstash-年月日
}
}
执行es.conf 文件
输入linux haha,标准输出一份
数据浏览->指定索引logstash-2021.08.14,可以看到elasticsearch输出一份
(3)文件输入,输出到elasticsearch
现在我们想把日志文件作为输入,首先要把权限改为644,因为logstash读取时是logstash身份,所以必须开放读的权力。
修改es.conf 文件
[root@server6 conf.d]# cat es.conf
input {
file { %从文件/var/log/messages输入,从头开始输入
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
stdout {} %标准输出
elasticsearch { %输出elasticsearch
hosts => ["172.25.77.3:9200"]
index => "logstash-%{+yyyy.MM.dd}"
}
}
执行es.conf 文件
可以看到输入了很多/var/log/messages
elasticsearch也可以看到很多数据
假如我们把刚才创建的索引删除了,再次创建可以恢复吗?
删除索引
再次执行es.conf 文件,会发现没有数据输入
由于终端被占用了,再开启一个窗口,输入一条日志
现在再返回去看,有输入了,一个是远程登录,一个是刚创建的日志,都是新的日志,那旧的日志呢?我写的从头开始输入阿
数据输入到elasticsearch了
再次删除索引
在/usr/share/logstash/data/plugins/inputs/file/
目录下,有一个.sincedb
的文件,它负责记录数据偏移量,之前看到那个位置了,不会重复输入,所以删除他,就可以重新全部输入了
这里补充一下,sincedb文件一共6个字段,分别表示inode编号、文件系统的主要设备号、文件系统的次要设备号、文件中的当前字节偏移量、最后一个活动时间戳(浮点数)、与此记录匹配的最后一个已知路径
再次执行es.conf 文件
/var/log/messages的数据又全部输入了一遍
数据输入到elasticsearch了
5、logstash可以伪装成日志服务器,直接接受远程日志
如果按照前面的方法收集日志信息,需要每台服务器上都部署logstash,这样太累了,那能不能让logstash伪装成日志服务器,每个节点服务器远程发送日志给logstash呢?
编辑es.conf 文件
[root@server6 conf.d]# cat es.conf
input {
#file {
# path => "/var/log/messages"
# start_position => "beginning"
#}
syslog { %伪装syslog,开放端口514
port => 514
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.25.77.3:9200"]
index => "syslog-%{+yyyy.MM.dd}" %索引为syslog-年月日
}
}
执行es.conf 文件
新开一个窗口查看端口514已开放
远程主机server3编辑/etc/rsyslog.conf文件
打开514端口
所有的日志发送给172.25.77.6:514一份
server3重启rsyslog服务
server6的窗口有输入了
现在elasticsearch就可以看到server3的日志了
同理,修改远程主机server4的/etc/rsyslog.conf文件
server4重启rsyslog服务
server6的窗口又有输入了
elasticsearch可以看到server4的日志了
这时server6查看514端口有三个,第一个是自己的接受端口,第二个是server3的发送端口,第三个是server4的发送端口
6、多行过滤插件
错误日志一般都有很多行,如果按照前面的做法会分成很多条,分开读,单独看根本不知道什么意思,多行过滤可以把多行日志记录合并为一行输出。
(1)命令行多行输入,文件输出
编辑test.conf文件
[root@server6 conf.d]# cat test.conf
input {
stdin {
codec => multiline { %多行输入
pattern => "EOF" %结束标志词为EOF
negate => "true"
what => "previous"
}
}
}
output {
stdout {}
file {
path => "/tmp/testfile" %输出到文件/tmp/testfile
codec => line { format => "custom format: %{message}"} %格式为custom format:{数据}
}
}
执行test.conf文件
测试:多行输入,以EOF结束输入,可以看到标准输出是一条
/tmp/testfile文件中也是一条
(2)文件多行输入,输出到elasticsearch
接下来用文件输入来测试,使用es集群的server3的日志(之前有一些报错日志),把my-es.log发给server6的/var/log
先看一下正确的日志都是以时间开头的,并且被中括号[ ]括起来的一行,而错误日志有很多行,比如下图的at org开头的这些,他们合起来应该是一条错误日志。
修改test.conf文件,先不加多行输入模块,看效果
[root@server6 conf.d]# cat test.conf
input {
file {
path => "/var/log/my-es.log" %文件/var/log/my-es.log作为输入
start_position => "beginning" %从头开始输入
# codec => multiline {
# pattern => "EOF"
# negate => "true"
# what => "previous"
# }
}
}
output {
stdout {}
elasticsearch { %输出到es
hosts => ["172.25.77.3:9200"]
index => "eslog-%{+yyyy.MM.dd}"
}
}
执行test.conf文件
显示输入了很多数据
在es中查看eslog索引,搜索at org,可以看到他们分成了一条一条的单独的数据
现在删除该索引
并删除相关的sincedb
修改test.conf文件
[root@server6 conf.d]# cat test.conf
input {
file {
path => "/var/log/my-es.log"
start_position => "beginning"
codec => multiline {
pattern => "^\[" %以[开头的为结束词
negate => "true"
what => "previous"
}
}
}
output {
stdout {}
elasticsearch { %输出到es
hosts => ["172.25.77.3:9200"]
index => "eslog-%{+yyyy.MM.dd}"
}
}
再次执行test.conf文件
看到这个是一条数据
当然也可以在es中查看,是一条数据
7、grok切片过滤插件
我们平时查看日志,比如查看apache的日志,可以发现很有规律,如下图,先是访问的ip,时间等等,那么能不能只看其中一组数据,比如只想要ip这一列。现在就需要logstash的切片这个功能了
我们可以根据日志的特征自定义grok的书写,得到想要的切片
(1)命令行输入,过滤,命令行输出
编辑grok.conf文件
[root@server6 conf.d]# cat grok.conf
input {
stdin {}
}
filter {
grok { %把输入切片成五块,分别对应
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output {
stdout {}
}
执行grok.conf文件
输入一串数据,根据设定的切片方法,一一对应
(2)apache日志输入,切片,es输出
server6安装apache
开启apache,写入默认发布目录
真机访问172.25.77.6
server6查看apache的日志
在/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
目录下,有很多软件的日志的输出形式
看httpd的规定,如何写日志已经提前用变量的方法定义了,所以我们只需要按照这个规定切片就好了
把apache的日志作为grok的输入,日志文件需要给读的权限,日志文件的目录需要给读和执行的权限,读的时候是logstash的身份
修改grok.conf文件
[root@server6 conf.d]# cat grok.conf
input {
file {
path => "/var/log/httpd/access_log" %/var/log/httpd/access_log文件作为输入
start_position => "beginning" %从头开始
}
}
filter {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" } %按照默认的HTTPD_COMBINEDLOG方式切片
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.25.77.3:9200"]
index => "apachelog-%{+yyyy.MM.dd}" %索引名字叫apachelog
}
}
执行grok.conf文件
按照默认定义好的模式切片
es查看,成功切片