时间:2021-05-10 08:41:45 | 栏目:Nginx | 点击:次
所有ELK的安装包都可以去官网下载,虽然速度稍慢,但还可以接受,官网地址:https://www.elastic.co/
logstash
在Logstash1.5.1版本,pattern的目录已经发生改变,存储在/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-0.1.10/目录下,但是好在配置引用的时候是可以对patterns的目录进行配置的,所以本人在Logstash的根目录下新建了一个patterns目录。而配置目录在1.5.1版本中也不存在了,如果是rpm包安装的,可以在/etc/logstash/conf.d/下面进行配置,但个人测试多次,这样启动经常性的失败,目前还没有去分析原因(个人不推荐使用RPM包安装)。所以大家可以采用Nohup或者screen的方式进行启动
专属nginx的pattern配置:
由于是测试环境,我这里使用logstash读取nginx日志文件的方式来获取nginx的日志,并且仅读取了nginx的access log,对于error log没有关心。
使用的logstash版本为2.2.0,在log stash程序目录下创建conf文件夹,用于存放解析日志的配置文件,并在其中创建文件test.conf,文件内容如下:
input { file { path => ["/var/log/nginx/access.log"] } } filter { grok { match => { "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\"" } } } output { elasticsearch { hosts => ["10.103.17.4:9200"] index => "logstash-nginx-test-%{+YYYY.MM.dd}" workers => 1 flush_size => 1 idle_flush_time => 1 template_overwrite => true } stdout{codec => rubydebug} }
需要说明的是,filter字段中的grok部分,由于nginx的日志是格式化的,logstash解析日志的思路为通过正则表达式来匹配日志,并将字段保存到相应的变量中。logstash中使用grok插件来解析日志,grok中message部分为对应的grok语法,并不完全等价于正则表达式的语法,在其中增加了变量信息。
具体grok语法不作过多介绍,可以通过logstash的官方文档中来了解。但grok语法中的变量类型如IPORHOST并未找到具体的文档,只能通过在logstash的安装目录下通过grep -nr "IPORHOST" .来搜索具体的含义。
配置文件中的stdout部分用于打印grok解析结果的信息,在调试阶段一定要打开。
可以通过这里来验证grok表达式的语法是否正确,编写grok表达式的时候可以在这里编写和测试。
对于elasticsearch部分不做过多介绍,网上容易找到资料。
elk收集分析nginx access日志
使用redis的push和pop做队列,然后有个logstash_indexer来从队列中pop数据分析插入elasticsearch。这样做的好处是可扩展,logstash_agent只需要收集log进入队列即可,比较可能会有瓶颈的log分析使用logstash_indexer来做,而这个logstash_indexer又是可以水平扩展的,我可以在单独的机器上跑多个indexer来进行日志分析存储。
好了,现在进一步配置了。
nginx中的日志存储格式
nginx由于有get请求,也有post请求,get请求的参数是会直接显示在日志的url中的,但是post请求的参数呢,却不会在access日志中体现出来。那么我想要post的参数也进行存储纪录下来。就需要自己定义一个log格式了。
log_format logstash '$http_host $server_addr $remote_addr [$time_local] "$request" $request_body $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time';
这里的server_addr存放的是当前web机器的IP,存这个IP是为了分析日志的时候可以分析日志的原始来源。
下面是一个GET请求的例子:
api.yejianfeng.com 10.171.xx.xx 100.97.xx.xx [10/Jun/2015:10:53:24 +0800] "GET /api1.2/qa/getquestionlist/?limit=10&source=ios&token=12343425324&type=1&uid=304116&ver=1.2.379 HTTP/1.0" - 200 2950 "-" "TheMaster/1.2.379 (iPhone; iOS 8.3; Scale/2.00)" 0.656 0.654
api.yejianfeng.com 10.171.xx.xx 100.97.xx.xx [10/Jun/2015:10:53:24 +0800] "POST /api1.2/user/mechanicupdate/ HTTP/1.0" start_time=1276099200&lng=110.985723&source=android&uid=328910&lat=35.039471&city=140800 200 754 "-" "-" 0.161 0.159
listen 80; server_name api.yejianfeng.com; access_log /mnt/logs/api.yejianfeng.com.logstash.log logstash;
log_agent的配置
这个配置就是往redis队列中塞入日志就行。output的位置设置为redis就行。
input {
file {
type => "nginx_access"
path => ["/mnt/logs/api.yejianfeng.com.logstash.log"]
}
}
output {
redis {
host => "10.173.xx.xx"
port => 8001
password => pass
data_type => "list"
key => "logstash:redis"
}
}
log_indexer的配置
log_indexer的配置就比较麻烦了,需要配置的有三个部分
input: 负责从redis中获取日志数据
filter: 负责对日志数据进行分析和结构化
output: 负责将结构化的数据存储进入elasticsearch
input部分
input { redis { host => "10.173.xx.xx" port => 8001 password => pass data_type => "list" key => "logstash:redis" } }
其中的redis配置当然要和agent的一致了。
filter部分
解析文本可以使用grokgrok debug进行分析,参照着之前的log格式,需要一个个进行日志分析比对。这个grok语法写的还是比较复杂的,还好有在线grok比对工具可以使用。比对前面的GET和POST的日志格式,修改出来的grok语句如下:
%{IPORHOST:http_host} %{IPORHOST:server_ip} %{IPORHOST:client_ip} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_verb} (?:%{PATH:baseurl}\?%{NOTSPACE:params}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_http_request})\" (%{NOTSPACE:params})?|- %{NUMBER:http_status_code} (?:%{NUMBER:bytes_read}|-) %{QS:referrer} %{QS:agent} %{NUMBER:time_duration:float} %{NUMBER:time_backend_response:float}
好了,现在params中是请求的参数。比如source=ios&uid=123。但是呢,最后做统计的时候,我希望得出的是“所有source值为ios的调用”,那么就需要对参数进行结构化了。而且我们还希望如果接口中新加入了一个参数,不用修改logstash_indexer就可以直接使用,方法就是使用kv,kv能实现对一个字符串的结构进行k=v格式的拆分。其中的参数prefix可以为这个key在统计的时候增加一个前缀,include_keys可以设置有哪些key包含在其中,exclude_keys可以设置要排除哪些key。
kv { prefix => "params." field_split => "&" source => "params" }
好了,现在还有一个问题,如果请求中有中文,那么日志中的中文是被urlencode之后存储的。我们具体分析的时候,比如有个接口是/api/search?keyword=我们,需要统计的是keyword被查询的热门顺序,那么就需要解码了。logstash牛逼的也有urldecode命令,urldecode可以设置对某个字段,也可以设置对所有字段进行解码。
urldecode { all_fields => true }
看起来没事了,但是实际上在运行的时候,你会发现一个问题,就是存储到elasticsearch中的timestamp和请求日志中的请求时间不一样。原因是es中的请求日志使用的是日志结构存放进入es的时间,而不是timestamp的时间,这里想要吧es中的时间和请求日志中的时间统一怎么办呢?使用date命令。具体设置如下:
date { locale => "en" match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z"] }
具体的logstash_indexer中的全部配置如下:
filter { grok { match => [ "message", "%{IPORHOST:http_host} %{IPORHOST:server_ip} %{IPORHOST:client_ip} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_verb} (?:%{PATH:baseurl}\?%{NOTSPACE:params}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_http_request})\" (%{NOTSPACE:params})?|- %{NUMBER:http_status_code} (?:%{NUMBER:bytes_read}|-) %{QS:referrer} %{QS:agent} %{NUMBER:time_duration:float} %{NUMBER:time_backend_response:float}" ] } kv { prefix => "params." field_split => "&" source => "params" } urldecode { all_fields => true } date { locale => "en" match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z"] } }
output部分
这里就是很简单往es中发送数据
output { elasticsearch { embedded => false protocol => "http" host => "localhost" port => "9200" user => "yejianfeng" password => "yejianfeng" } }
这里有个user和password,其实elasticsearch加上shield就可以强制使用用户名密码登录了。这里的output就是配置这个使用的。
查询elasticsearch
比如上面的例子,我要查询某段时间的params.source(其实是source参数,但是前面的params是前缀)调用情况
$url = 'http://xx.xx.xx.xx:9200/logstash-*/_search'; $filter = ' { "query": { "range" : { "@timestamp" : { "gt" : 123213213213, "lt" : 123213213213 } } }, "aggs" : { "group_by_source" : {"terms" : {"field" : "params.source"}} }, "size": 0 }';