We have two kinds of JSON logs to ship to Elasticsearch:

  • Download-failure log

```json
{
  "Topic": "download_failed",
  "Source": "192.168.1.158",
  "FailCode": "0500",
  "Title": "多米尼克大战",
  "Space": 0,
  "Time": 1494023823,
  "Speed": "84.82267"
}
```
  • Download-success log

```json
{
  "Topic": "download_success",
  "Source": "192.168.1.158",
  "Title": "爱送书",
  "Space": 1028,
  "Time": 1494090811,
  "Speed": "120.82"
}
```
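Logstash's `json` codec expects strictly valid JSON (no trailing commas), so it can help to sanity-check a sample document before feeding it in. A minimal check in Python, using one of the samples above:

```python
import json

# The download_success sample from above. Note that Speed is
# intentionally a string here; we fix its type later in the filter.
sample = ('{"Topic": "download_success", "Source": "192.168.1.158", '
          '"Title": "爱送书", "Space": 1028, "Time": 1494090811, '
          '"Speed": "120.82"}')

doc = json.loads(sample)  # raises json.JSONDecodeError on invalid JSON
print(doc["Topic"])       # download_success
```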

Filter configuration

Set up the Logstash pipeline:

```ruby
input {
  stdin {
    codec => json
  }
}

filter {
  # Set @timestamp from the Unix epoch seconds in the incoming "Time" field
  date {
    match => ["Time", "UNIX"]
  }

  # Resolve the source IP address to geo coordinates
  geoip {
    source => "Source"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logstash-android-%{+YYYY.MM.dd}"
    document_type => "%{Topic}"
  }
}
```
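The `date` filter above parses the epoch-seconds `Time` field into `@timestamp`. As a sanity check, the equivalent conversion in Python, using the `Time` value from the failure log:

```python
from datetime import datetime, timezone

# 1494023823 is the "Time" value from the download_failed sample;
# the Logstash date filter with match => ["Time", "UNIX"] performs
# the same epoch-seconds -> UTC timestamp conversion.
ts = datetime.fromtimestamp(1494023823, tz=timezone.utc)
print(ts.isoformat())  # 2017-05-05T22:37:03+00:00
```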

1. The index name above uses the `logstash-` prefix so that the latitude/longitude produced by the `geoip` filter are mapped as the `geo_point` type (the default `logstash-*` index template takes care of this). Reference: https://github.com/elastic/logstash/issues/3137

2. When these two logs are delivered to Elasticsearch through the pipeline, a mapping is created automatically. You can inspect it in Kibana > Dev Tools:

```
GET logstash-android-*/_mapping
```
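The response is shaped roughly like the excerpt below (abridged and illustrative only; exact fields and types depend on your Logstash and Elasticsearch versions). The things to check are that `geoip.location` came out as `geo_point`, and what type each of your own fields received:

```json
{
  "logstash-android-2017.05.05": {
    "mappings": {
      "download_failed": {
        "properties": {
          "Speed": { "type": "text" },
          "geoip": {
            "properties": {
              "location": { "type": "geo_point" }
            }
          }
        }
      }
    }
  }
}
```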

If a field's mapped type is not what you expect, adjust the filter. For example, the raw `Speed` field is a string, but you want it indexed as a float:

```ruby
mutate {
  convert => {
    "Speed" => "float"
  }
}
```
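Note that changing the filter only affects documents indexed afterwards; Elasticsearch will not change an existing field's mapping in place. During development you can delete the test index and re-ingest (destructive, do not run this against production data):

```
DELETE logstash-android-*
```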

Template configuration

  • TODO