当前位置:首页 » 《关注互联网》 » 正文

运维实操——日志分析系统ELK(中)之logstash采集数据、伪装rsyslog、多行过滤、grok切片_谁主沉浮lyb的博客

21 人参与  2021年08月17日 14:43  分类 : 《关注互联网》  评论

点击全文阅读


日志分析系统ELK(中)之logstash

  • 1、什么是logstash?
  • 2、Logstash安装
  • 3、logstash简单命令行测试
  • 4、logstash文件测试
    • (1)命令行输入,输出到文件
    • (2)命令行输入,输出到elasticsearch
    • (3)文件输入,输出到elasticsearch
  • 5、logstash可以伪装成日志服务器,直接接受远程日志
  • 6、多行过滤插件
    • (1)命令行多行输入,文件输出
    • (2)文件多行输入,输出到elasticsearch
  • 7、grok切片过滤插件
    • (1)命令行输入,过滤,命令行输出
    • (2)apache日志输入,切片,es输出

接上篇,server3、server4、server5,是Elasticsearch集群。本文介绍logstash

1、什么是logstash?

Logstash是一个开源的服务器端数据处理管道,聚合器。logstash拥有200多个插件,能够同时从多个来源采集数据,转换数据,过滤数据,然后将数据发送到您最喜欢的 “存储库” 中。(大多都是 Elasticsearch。)

Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器。如下图
在这里插入图片描述
(1)输入:采集各种样式、大小和来源的数据
Logstash 支持各种输入选择 ,同时从众多常用来源捕捉事件。
能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
在这里插入图片描述
(2)过滤器:实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

  • 利用 Grok 从非结构化数据中派生出结构
  • 从 IP 地址破译出地理坐标
  • 将 PII 数据匿名化,完全排除敏感字段
  • 简化整体处理,不受数据源、格式或架构的影响

(3)输出:选择您的存储库,导出您的数据
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
在这里插入图片描述

2、Logstash安装

官网https://elasticsearch.cn/download/下载Logstash软件包,并准备java的jdk包
准备新的虚拟机serever6(172.25.77.6),分配内存1G

server6安装jdk和logstash
在这里插入图片描述

3、logstash简单命令行测试

找到logstash命令的路径执行,标准输入到标准输出,即命令行输入,命令行输出
在这里插入图片描述
输入hello word,标准输出hello word;输入test,标准输出test
在这里插入图片描述

4、logstash文件测试

(1)命令行输入,输出到文件

/etc/logstash/conf.d/目录下.conf结尾的文件都可以读到,
在这里插入图片描述
编辑test.conf文件如下

[root@server6 conf.d]# cat test.conf 
input {
	stdin {}		%输入来自命令行标准输入
}

output {
 file {				%输出到/tmp/testfile文件中,格式为custom format: {输入内容}
   path => "/tmp/testfile"
   codec => line { format => "custom format: %{message}"}
 }
}

在这里插入图片描述
执行test.conf文件
在这里插入图片描述
命令行输入heiheihei,输出到了/tmp/testfile文件中
在这里插入图片描述
上面的方法无法直接看到结果,不太舒服,修改test.conf文件

[root@server6 conf.d]# cat test.conf 
input {
	stdin {}		
}

output {
	stdout{}		%一份标准输出到命令行
			
 file {				%一份输出到/tmp/testfile文件中
   path => "/tmp/testfile"
   codec => line { format => "custom format: %{message}"}
 }
}

在这里插入图片描述
执行test.conf文件
在这里插入图片描述
输入linux,可以看到标准输出,也输出一份到/tmp/testfile文件中
在这里插入图片描述

(2)命令行输入,输出到elasticsearch

编辑es.conf 文件

[root@server6 conf.d]# cat es.conf 
input {
	stdin {}
}

output {
	stdout {}			%标准输出一份

	elasticsearch {		%给elasticsearch输出一份
		hosts => ["172.25.77.3:9200"]	%目标elasticsearch主机ip
		index => "logstash-%{+yyyy.MM.dd}"	%索引格式为logstash-年月日
	}
}

在这里插入图片描述
执行es.conf 文件
在这里插入图片描述
输入linux haha,标准输出一份
在这里插入图片描述
数据浏览->指定索引logstash-2021.08.14,可以看到elasticsearch输出一份
在这里插入图片描述

(3)文件输入,输出到elasticsearch

现在我们想把日志文件作为输入,首先要把权限改为644,因为logstash读取时是logstash身份,所以必须开放读的权力。
在这里插入图片描述
修改es.conf 文件

[root@server6 conf.d]# cat es.conf 
input {
	file {				%从文件/var/log/messages输入,从头开始输入
		path => "/var/log/messages"
		start_position => "beginning"
	}
}

output {
	stdout {}			%标准输出

	elasticsearch {		%输出elasticsearch
		hosts => ["172.25.77.3:9200"]	
		index => "logstash-%{+yyyy.MM.dd}"	
	}
}

在这里插入图片描述
执行es.conf 文件
在这里插入图片描述
可以看到输入了很多/var/log/messages
在这里插入图片描述
elasticsearch也可以看到很多数据
在这里插入图片描述
假如我们把刚才创建的索引删除了,再次创建可以恢复吗?
在这里插入图片描述
删除索引
在这里插入图片描述
再次执行es.conf 文件,会发现没有数据输入
在这里插入图片描述
由于终端被占用了,再开启一个窗口,输入一条日志
在这里插入图片描述
现在再返回去看,有输入了,一个是远程登录,一个是刚创建的日志,都是新的日志,那旧的日志呢?我写的从头开始输入阿
在这里插入图片描述
数据输入到elasticsearch了
在这里插入图片描述
再次删除索引
在这里插入图片描述
/usr/share/logstash/data/plugins/inputs/file/目录下,有一个.sincedb的文件,它负责记录数据偏移量,之前看到那个位置了,不会重复输入,所以删除他,就可以重新全部输入了
在这里插入图片描述
这里补充一下,sincedb文件一共6个字段,分别表示inode编号、文件系统的主要设备号、文件系统的次要设备号、文件中的当前字节偏移量、最后一个活动时间戳(浮点数)、与此记录匹配的最后一个已知路径
在这里插入图片描述
再次执行es.conf 文件
在这里插入图片描述
/var/log/messages的数据又全部输入了一遍
在这里插入图片描述
数据输入到elasticsearch了
在这里插入图片描述

5、logstash可以伪装成日志服务器,直接接受远程日志

如果按照前面的方法收集日志信息,需要每台服务器上都部署logstash,这样太累了,那能不能让logstash伪装成日志服务器,每个节点服务器远程发送日志给logstash呢?

编辑es.conf 文件

[root@server6 conf.d]# cat es.conf 
input {
	#file {
	#	path => "/var/log/messages"
	#	start_position => "beginning"
	#}
	
	syslog {		%伪装syslog,开放端口514
		port => 514
	}
}

output {
	stdout {}

	elasticsearch {
		hosts => ["172.25.77.3:9200"]
		index => "syslog-%{+yyyy.MM.dd}"	%索引为syslog-年月日
	}
}

在这里插入图片描述
执行es.conf 文件
在这里插入图片描述
新开一个窗口查看端口514已开放
在这里插入图片描述
远程主机server3编辑/etc/rsyslog.conf文件
打开514端口
在这里插入图片描述
所有的日志发送给172.25.77.6:514一份
在这里插入图片描述
server3重启rsyslog服务
在这里插入图片描述
server6的窗口有输入了
在这里插入图片描述

现在elasticsearch就可以看到server3的日志了
在这里插入图片描述
同理,修改远程主机server4的/etc/rsyslog.conf文件
在这里插入图片描述
在这里插入图片描述
server4重启rsyslog服务
在这里插入图片描述
server6的窗口又有输入了
在这里插入图片描述
elasticsearch可以看到server4的日志了
在这里插入图片描述
这时server6查看514端口有三个,第一个是自己的接受端口,第二个是server3的发送端口,第三个是server4的发送端口
在这里插入图片描述

6、多行过滤插件

错误日志一般都有很多行,如果按照前面的做法会分成很多条,分开读,单独看根本不知道什么意思,多行过滤可以把多行日志记录合并为一行输出。

(1)命令行多行输入,文件输出

编辑test.conf文件

[root@server6 conf.d]# cat test.conf 
input {
      stdin {
          codec => multiline {	%多行输入
            pattern => "EOF"	%结束标志词为EOF
            negate => "true"
            what => "previous"
          }
        }
}

output {
	stdout {}

 file {				
   path => "/tmp/testfile"		%输出到文件/tmp/testfile
   codec => line { format => "custom format: %{message}"}	%格式为custom format:{数据}
 }
}

在这里插入图片描述
执行test.conf文件
在这里插入图片描述
测试:多行输入,以EOF结束输入,可以看到标准输出是一条
在这里插入图片描述
/tmp/testfile文件中也是一条
在这里插入图片描述

(2)文件多行输入,输出到elasticsearch

接下来用文件输入来测试,使用es集群的server3的日志(之前有一些报错日志),把my-es.log发给server6的/var/log
在这里插入图片描述
先看一下正确的日志都是以时间开头的,并且被中括号[ ]括起来的一行,而错误日志有很多行,比如下图的at org开头的这些,他们合起来应该是一条错误日志。
在这里插入图片描述
修改test.conf文件,先不加多行输入模块,看效果

[root@server6 conf.d]# cat test.conf 
input {
      file {		
        path => "/var/log/my-es.log"	%文件/var/log/my-es.log作为输入
        start_position => "beginning"	%从头开始输入
         # codec => multiline {
         #   pattern => "EOF"
         #   negate => "true"
         #   what => "previous"
         # }
      }
}

output {
	stdout {}

	elasticsearch {			%输出到es
		hosts => ["172.25.77.3:9200"]
		index => "eslog-%{+yyyy.MM.dd}"
	}
}

在这里插入图片描述
执行test.conf文件
在这里插入图片描述
显示输入了很多数据
在这里插入图片描述
在es中查看eslog索引,搜索at org,可以看到他们分成了一条一条的单独的数据
在这里插入图片描述
现在删除该索引
在这里插入图片描述
并删除相关的sincedb
在这里插入图片描述
修改test.conf文件

[root@server6 conf.d]# cat test.conf 
input {
      file {		
        path => "/var/log/my-es.log"	
        start_position => "beginning"	
          codec => multiline {
            pattern => "^\["	%[开头的为结束词
            negate => "true"
            what => "previous"
          }
      }
}

output {
	stdout {}

	elasticsearch {			%输出到es
		hosts => ["172.25.77.3:9200"]
		index => "eslog-%{+yyyy.MM.dd}"
	}
}

在这里插入图片描述
再次执行test.conf文件
在这里插入图片描述
看到这个是一条数据
在这里插入图片描述
当然也可以在es中查看,是一条数据
在这里插入图片描述

7、grok切片过滤插件

我们平时查看日志,比如查看apache的日志,可以发现很有规律,如下图,先是访问的ip,时间等等,那么能不能只看其中一组数据,比如只想要ip这一列。现在就需要logstash的切片这个功能了
在这里插入图片描述
我们可以根据日志的特征自定义grok的书写,得到想要的切片

(1)命令行输入,过滤,命令行输出

编辑grok.conf文件

[root@server6 conf.d]# cat grok.conf 
input {
	stdin {}
}

filter {
	grok {						%把输入切片成五块,分别对应
	match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
	}
}

output {
	stdout {}
}

在这里插入图片描述
执行grok.conf文件
在这里插入图片描述
输入一串数据,根据设定的切片方法,一一对应
在这里插入图片描述

(2)apache日志输入,切片,es输出

server6安装apache
在这里插入图片描述
开启apache,写入默认发布目录
在这里插入图片描述
真机访问172.25.77.6
在这里插入图片描述
server6查看apache的日志
在这里插入图片描述
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns目录下,有很多软件的日志的输出形式
在这里插入图片描述
看httpd的规定,如何写日志已经提前用变量的方法定义了,所以我们只需要按照这个规定切片就好了
在这里插入图片描述
把apache的日志作为grok的输入,日志文件需要给读的权限,日志文件的目录需要给读和执行的权限,读的时候是logstash的身份
在这里插入图片描述
修改grok.conf文件

[root@server6 conf.d]# cat grok.conf 
input {
	file {
		path => "/var/log/httpd/access_log"		%/var/log/httpd/access_log文件作为输入
		start_position => "beginning"			%从头开始
	}

}

filter {
	grok {
	match => { "message" => "%{HTTPD_COMBINEDLOG}" }	%按照默认的HTTPD_COMBINEDLOG方式切片
	}
}

output {
	stdout {}

	elasticsearch {
		hosts => ["172.25.77.3:9200"]
		index => "apachelog-%{+yyyy.MM.dd}"		%索引名字叫apachelog
	}
	
}

在这里插入图片描述
执行grok.conf文件
在这里插入图片描述
按照默认定义好的模式切片
在这里插入图片描述
es查看,成功切片
在这里插入图片描述


点击全文阅读


本文链接:http://zhangshiyu.com/post/25605.html

输入  文件  输出  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1