Logstash
- Horizontally scalable
Pipeline:
Input, filter, Output
To Send Pipeline Configuration Directly to Logstash:
bin/logstash -e "input { stdin {} } output { stdout {} }"
stdin -> input plugin (nothing extra to install for it)
stdout -> also a plugin
The above command starts Logstash with the inline pipeline we passed, so it will wait for events on stdin. Just type something in the terminal.
It is not very useful, but it is handy for testing.
Pipeline.conf
The file can have any name, but it needs to follow the format below.
input{
stdin{
}
}
output{
stdout{
}
}
To start Logstash with a pipeline configuration from a file:
bin/logstash -f config/pipelines/pipeline.conf
Wait for it to start.
Enter "Hello World"
Notice that Logstash automatically adds a couple of fields, such as the host name and a timestamp.
Pipeline with the rubydebug codec for structured output:
input{
stdin{
}
}
output{
stdout{
codec => rubydebug
}
}
We need to restart Logstash every time we make changes, but we can override that later. After the restart we will see output from the rubydebug codec.
$ bin/logstash -f config/pipelines/pipeline.conf
After the restart, if we input "Hello World" the event is printed with named fields, which we would not get without the codec, and it is easier to read.
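With the rubydebug codec, the event will look roughly like this (a hedged example; the exact host and timestamp will of course differ):
{
      "@version" => "1",
    "@timestamp" => 2017-09-09T12:00:00.000Z,
          "host" => "localhost",
       "message" => "Hello World"
}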
JSON input:
Passing the JSON input below:
{ "amount":10, "quantity":2 }
The output puts the whole JSON document into the message field as one line, even though the input is JSON. To fix that we need another codec. Codecs can be used for both input and output.
Codec to read json input:
input{
stdin{
codec => json
}
}
output{
stdout{
codec => rubydebug
}
}
Restart Logstash.
Input:
{ "amount":10, "quantity":2 }
Output: We now get separate fields named amount and quantity.
Trying invalid JSON:
Input:
{ invalid json }
Output: an error is logged, but the event is not discarded; the raw contents are simply placed in the message field:
message: "{ invalid json }"
Since we are using stdin, only a single line can be processed at a time, so multi-line JSON gives the same error.
Multiple outputs - stdout and file:
input{
stdin{
codec => json
}
}
output{
stdout{
codec => rubydebug
}
file{
path => "output.txt"
}
}
Restart Logstash; it will now write the output both to stdout (rubydebug) and to the file.
Input:
{ "amount":10, "quantity":2 }
Output: appears both on stdout and in the file.
HTTP input
input{
stdin{
codec => json
}
http {
host => "127.0.0.1"
port => 8080
}
}
output{
stdout{
codec => rubydebug
}
file{
path => "output.txt"
}
}
HTTP requests can be sent using Postman or curl.
In Postman:
HTTP PUT: 127.0.0.1:8080 (note: in the lecture he used the Logstash URL)
In the Body tab choose raw and select JSON:
{
"amount":10,
"quantity":2
}
With curl:
curl -XPUT -H "Content-Type: application/json" -d '{ "amount":7, "quantity":3 }' http://localhost:8080
Filtering events - mutate:
The mutate filter can be used to convert a field's data type. For example, in our environment "RunId" is a number but we use it as a string, so we can use the mutate plugin to make sure RunId is a string.
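As a minimal sketch of that idea (RunId is specific to our environment, not something from the lecture), the filter could look like this:
filter {
  mutate {
    # force RunId to be treated as a string rather than a number
    convert => { "RunId" => "string" }
  }
}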
input{
stdin{
codec => json
}
http {
host => "127.0.0.1"
port => 8080
}
}
filter {
mutate{
convert => { "quantity" => "integer" }
}
}
output{
stdout{
codec => rubydebug
}
file{
path => "output.txt"
}
}
Supported target data types: integer, float, string, boolean
Other filter plugin common options:
Option | Purpose
add_field | Adds one or more fields to the event
remove_field | Removes one or more fields
add_tag | Adds one or more tags
remove_tag | Removes one or more tags
Example: remove a field:
input{
stdin{
codec => json
}
http {
host => "127.0.0.1"
port => 8080
}
}
filter {
mutate{
remove_field => [ "host" ]
}
}
output{
stdout{
codec => rubydebug
}
file{
path => "output.txt"
}
}
These common options can be used with any filter plugin; here we used remove_field with mutate.
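A small sketch of the other common options (the field, tag and value names here are made up for illustration):
filter {
  mutate {
    add_field  => { "environment" => "test" }   # add a field to every event
    add_tag    => [ "processed" ]               # add a tag
    remove_tag => [ "beta" ]                    # remove a tag if present
  }
}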
Logstash Execution Model:
We have a number of inputs listening for events. Each input runs in its own thread so that inputs do not block each other, which means multiple inputs are handled concurrently. Inputs use codecs to decode incoming data, for example json. Events are then placed on the work queue, from where pipeline workers do the remaining processing - filters and outputs, along with any output codecs. Workers also run in their own threads, so batches of events can be processed simultaneously.
Pipeline batch size - the maximum number of events an individual worker collects from the work queue before running filters and outputs.
Pipeline batch delay - how long to wait before processing an undersized batch of events.
Example: if the maximum batch size is set to 50 and the batch delay is 100 ms, a batch of events is processed either when there are 50 unprocessed events in the work queue or when 100 milliseconds have elapsed.
The reason a batch is processed even if the maximum batch size has not been reached is to avoid delaying processing for too long.
The default number of pipeline workers is based on the number of CPU cores.
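These settings can be tuned when starting Logstash, either in config/logstash.yml or on the command line (a sketch; the values below are arbitrary examples):
bin/logstash -f config/pipelines/pipeline.conf \
  --pipeline.workers 4 \
  --pipeline.batch.size 50 \
  --pipeline.batch.delay 100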
Enabling automatic pipeline reload:
The sample log files are stored under the event-data directory.
To reload the pipeline automatically, start Logstash with the option below:
bin/logstash -f config/pipelines/pipeline.conf --config.reload.automatic
input{
file{
path => "<path of log>"
start_position => "beginning"
}
}
output{
stdout{
codec => rubydebug
}
}
Note: the stdin plugin does not support automatic reload. In the video he first demonstrated this with stdin, then switched to the file input above and restarted with those settings.
Sincedb files:
Sincedb files track which files have been read and how far.
Go to data/plugins/inputs/file/
The sincedb file is hidden; if you cat it you will see the following four numeric fields, the fourth being the offset that records up to which byte the file has been processed:
- inode
- major device number
- minor device number
- byte offset
If we delete the sincedb file, Logstash will start pushing events from the beginning again. The state is also kept in memory, so first stop Logstash, then delete the file, then start it again.
When using Filebeat, delete this directory -> /logstash-6.8.1/data/queue
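A rough example of inspecting and resetting this state from the Logstash directory (the exact sincedb file name is generated, so the glob is an assumption):
cd data/plugins/inputs/file/
ls -a            # the sincedb file is hidden (name starts with a dot)
cat .sincedb_*   # prints: inode, major device, minor device, byte offset
# stop Logstash first, then:
rm .sincedb_*    # the next start will re-read the files from the beginning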
Adding the http input plugin:
input{
file{
path => "<path of log>"
start_position => "beginning"
}
http{
}
}
output{
stdout{
codec => rubydebug
}
}
Logstash checks for pipeline changes every 3 seconds by default.
GROK
%{SYNTAX:SEMANTIC}
For more patterns check this -> https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
Basic patterns:
John -> %{WORD:first_name}
Doe -> %{WORD:last_name}
john@doe.com -> %{EMAILADDRESS:email}
32 -> %{INT:age}
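As a small illustration of these basic patterns (assuming a made-up message such as "John Doe john@doe.com 32"), a grok filter could look like this:
filter {
  grok {
    match => { "message" => "%{WORD:first_name} %{WORD:last_name} %{EMAILADDRESS:email} %{INT:age}" }
  }
}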
input{
file{
path => "<path of log>"
start_position => "beginning"
}
http{
}
}
filter{
grok{
match => {"message" => "%{IP:ip_address} %{USER:identity} %{USER:auth}
\[%{HTTPDATE:req_ts}\] \"{%WORD:http_verb} %{URIPATHPARAM:req_path}\"
%{INT:http_status:int} %{INT:num_bytes:int} " }
}
}
output{
stdout{
codec => rubydebug
}
}
GROK Log:
84.252.109.232 - Joe [20/Sep/2017:13:22:22 +0200] GET
/products/view/123 200 12798
Taking the grok pattern from the GitHub patterns repo:
input{
file{
path => "<path of log>"
start_position => "beginning"
}
http{
}
}
filter{
grok{
match => {"message" =>
"%{HTTPD_COMBINEDLOG}" }
}
mutate{
gsub => [
        "agent", '"', "",
        "referrer", '"', ""
]
}
}
output{
stdout{
codec => rubydebug
}
}
Filter plugins are applied in the order they appear, so use grok first and then mutate.
gsub -> substitute. First give the field name, then the value we want to replace, then what to replace it with. Here we replace the double quotes with an empty string.
To avoid escaping the double quote, use single quotes around it.
Accessing field values:
This is useful when using conditionals.
A field is like a variable name. Let's say we have this JSON:
{
        "request": "/products/view/123",
        "headers": {
                "request_path": "/"
        }
}
To reference a top-level field simply use:
request, or [request]; using square brackets is the explicit form.
For a nested field:
[headers][request_path]
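For example, a nested reference works the same way inside a conditional (a sketch; the tag name is made up):
filter {
  if [headers][request_path] == "/" {
    mutate {
      add_tag => [ "front_page" ]
    }
  }
}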
Reusing the same pipeline used for the Apache log, with a small modification in the filter (I skipped that part of the video). This covers the output part.
The type option is mainly used to conditionally apply filters later. He used the keyword "string expansion" for the %{type} reference in the output. The code below handles the access log; later he shows the error log.
input{
file{
path => "<path of log>"
start_position => "beginning"
type => "access"
}
http{
type => "access"
}
}
filter{
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
}
output{
stdout{
codec => rubydebug
}
file {
path => "%{type}.log"
}
}
With the above pipeline, the event gets type=access and a file named access.log is created.
Formatting dates:
Syntax -> %{+DATE_FORMAT}
It must be in Java Date format
Example: %{+yyyy-MM-dd}
input{
file{
path => "<path of log>"
start_position => "beginning"
type => "access"
}
http{
type => "access"
}
}
filter{
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
}
output{
stdout{
codec => rubydebug
}
file {
path => "%{type}_%{+yyyy_MM_dd}.log"
}
}
Above, we are formatting the @timestamp field into the file path.
Setting the time of the event:
Logstash already sets the time of the event in @timestamp, but that is the time the event was processed. Suppose we receive an order: we want the exact time the order was received, not the time it was processed.
We use the date plugin in the filter. @timestamp is the field generated by Logstash; the timestamp field (without the @) comes from the grok pattern. In the date pattern, Z refers to the UTC offset.
input{
file{
path => "<path of log>"
start_position => "beginning"
type => "access"
}
http{
type => "access"
}
}
filter{
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
}
}
output{
stdout{
codec => rubydebug
}
file {
path => "%{type}_%{+yyyy_MM_dd}.log"
}
}
Output: "timestamp" => 20/Aug/2017:13:22:22 +0200.
Note: This is not the same timestamp generated by logstash. If
parsing of date fails the filter plugin will add a tag name _dateparsefailure. Now the timestamp and
@timestamp will be same, so removing timestamp.
To remove the timestamp field, use the same pipeline as above, but with these changes to the date plugin:
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
Introduction to conditional statement:
EXPR can be any valid logstash expression.
if EXPR {
} else if EXPR {
} else {
}
Example:
if [type] == "access" {
elasticsearch { … }
} else if [type] == "error" {
file { … }
} else {
}
If we want only error logs to be sent to a file, we put the file output inside the else if branch.
GROUP | Operators | Example
Equality | ==, !=, <, >, <=, >= | if [headers][content_length] >= 1000 { }
Regexp | =~ (checks match), !~ (checks no match) | if [some_field] =~ /[0-9]+/ { }
Inclusion | in, not in | if [some_field] in ["one", "two", "three"] { }
Boolean | and, or |
Unary | ! | Used to negate an expression, like ! -d (not a directory) in bash
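A sketch combining a few of these operators (field names match the access-log events used later):
filter {
  if [response] >= 400 and [request] !~ /^\/admin\// {
    mutate {
      add_tag => [ "error_response" ]   # tag name made up for illustration
    }
  }
}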
Preparing the pipeline to handle both access and error logs using conditionals - HTTP plugin:
According to the video, it is cleaner to put the conditional statement at the top of the filter block.
input{
file{
path =>
"/users/andy/desktop/logstash/event-data/apache_access.log"
start_position => "beginning"
}
http{
}
}
filter{
if [headers][request_uri] =~ "error" {
mutate{
replace => { type => "error" }
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
}
}
output{
stdout{
codec => rubydebug
}
file {
path => "%{type}_%{+yyyy_MM_dd}.log"
}
}
He then sent some events via Postman and they evaluated to the access log type.
He then appended "error" to the end of the URL in Postman and those events were identified as errors.
My observation: he can do this because he uses the HTTP plugin as an input, so he gets some key/value pairs with each request; he is using [headers][request_uri], which is provided by the HTTP plugin. In our case we use the file input, so we need to think about an alternative.
Preparing pipeline to handle both access and error log
using conditional - File Plugin:
We can match the file path using glob patterns such as:
/path/to/*.log
/path/to/**/*.log
/path/to/{nginx,apache}/*.log
The tags field stores parsing failures, such as those from grok and date. He uses it to drop events where parsing failed.
input{
file{
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
start_position => "beginning"
}
http{
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~ "errors" {
mutate{
replace => { type => "error" }
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
}
}
output{
stdout{
codec => rubydebug
}
file {
path => "%{type}_%{+yyyy_MM_dd}.log"
}
}
Geographical Data Enrichment:
The geoip filter plugin enriches events using the GeoLite2 database.
input{
file{
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
start_position => "beginning"
}
http{
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~ "errors" {
mutate{
replace => { type => "error" }
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
}
}
output{
stdout{
codec => rubydebug
}
file {
path => "%{type}_%{+yyyy_MM_dd}.log"
}
}
We get geographical fields based on the IP, such as longitude and latitude. It is not as accurate as the commercial database. On failure it adds a _geoip_lookup_failure tag.
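The enriched event then contains a geoip object roughly like the following (a hedged example; the exact fields and values depend on the IP and the database):
"geoip" => {
    "country_name" => "Germany",
       "city_name" => "Berlin",
        "latitude" => 52.52,
       "longitude" => 13.40,
        "location" => {
        "lat" => 52.52,
        "lon" => 13.40
    }
}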
Parsing the user agent:
The grok pattern just extracts the raw agent string; it does not tell us which browser the user used. The useragent filter does that.
input{
file{
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
start_position => "beginning"
}
http{
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~ "errors" {
mutate{
replace => { type => "error" }
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
useragent {
source => "agent"
target => "ua"
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
}
}
output{
stdout{
codec => rubydebug
}
file {
path => "%{type}_%{+yyyy_MM_dd}.log"
}
}
Output:
source is the field that already exists in our event; target is the field this plugin creates. "ua" is arbitrary, you can name it anything.
"ua" => {
        "os" => "iOS",
        "major" => "11",
        …
        "device" => "iPhone"
}
Finishing the pipeline:
In the code below we add a mutate after geoip to remove some fields.
- Drop requests for admin pages (the conditional after useragent{}).
- Drop crawler traffic using [ua][device]; since this uses the useragent result, it must come after the useragent plugin.
- Drop static files like js, css, robots.txt and favicon.ico using if [request].
He also modified the output to use elasticsearch.
input{
file{
start_position => "beginning"
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
}
http{
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~ "errors" {
mutate{
replace => { type => "error" }
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
useragent {
source => "agent"
target => "ua"
}
#Admin pages
if [request] =~ /^\/admin\// {
drop { }
}
#Crawlers
if [ua][device] == "Spider" {
        drop { }
}
#Static files
if [request] =~ /^\/js\//
or [request] =~ /^\/css\//
or [request] in [ "/robots.txt", "/favicon.ico" ] {
        drop { }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
}
mutate{
remove_field => [ "headers", "@version",
"host" ]
}
}
output{
elasticsearch {
hosts => [" localhost:2000" ]
#index => "%{type}-%{+YYYY.MM.dd}"
document_type => "default"
http_compression => true
}
}
If you change the default index name, you will lose some of the default mappings, such as the geo_point mapping used for longitude and latitude.
Multiline Option:
The multiline option is useful for Java stack traces. Stack trace lines are indented with a tab, and some lines begin with "Caused by", so we need a regular expression that matches both.
Java Stack Trace:
Sep 9, 2017 6:14:2 AM ERROR
com.codingexland.blog.service.PostServiceImple:
at: com.amazonawsauth
Caused by:
com.amazonaws.AmazonClientException: Unable to calculate a request
We write a regex only for the indented lines and the "Caused by" lines. The codec automatically checks whether the next line of the event matches the regex.
Regex for lines beginning with whitespace/tab or "Caused by" -> pattern => "^(\s+|\t)|(Caused by:)"
\s+ -> starts with one or more whitespace characters
\t -> starts with a tab
input{
file{
start_position => "beginning"
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
}
file{
path =>
"/Users/andy/Desktop/logstash/ecent-data/java-errors.log"
start_position => "beginning"
codec => multiline {
pattern => "^(\s+|\t)|(Caused by:)"
what => "previous"
auto_flush_interval => 5
}
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~
"errors" {
mutate{
replace => { type => "error" }
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
useragent {
source => "agent"
target => "ua"
}
#Admin pages
if [request] =~ /^\/admin\// {
drop { }
}
#Crawlers
if [ua][device] == "Spider" {
        drop { }
}
#Static files
if [request] =~ /^\/js\//
or [request] =~ /^\/css\//
or [request] in [ "/robots.txt", "/favicon.ico" ] {
        drop { }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
}
mutate{
remove_field => [ "headers", "@version",
"host" ]
}
}
output{
if [type] == "access" {
elasticsearch {
hosts => [" localhost:2000" ]
#index => "%{type}-%{+YYYY.MM.dd}"
document_type => "default"
http_compression => true
}
}else{
stdout {
codec => rubydebug
}
}
}
In the video, the first time he restarted there was no output, but after copying the stack trace into the file again he was able to see the event in stdout.
If there is no output after the restart, it does not necessarily mean there is a mistake in the pipeline. The multiline codec receives one line at a time and has no way of knowing when it will receive the next line, or whether there are more lines to process at all (the codec is not aware of which plugin is using it). The codec waits until it receives a line that does not match the pattern; when that happens it groups all of the preceding lines into one event, as defined by the "what" option.
In this case there was no sincedb entry because it was a new file. When he restarted the first time, the codec was waiting for the end of the pattern. The file contained only one stack trace, and even its last line matched the pattern, so the codec kept waiting to see whether the next line would match too. When he pasted the stack trace again, the first new line (starting with "Sep ") did not match the pattern, so the previous event got processed.
In the real world we would always be one event behind. To overcome this we can add the auto_flush_interval option. He also removed the sincedb file because he was reprocessing the same file between restarts.
Multiline Option - easy way:
In the Logstash patterns repo on GitHub we can find many Java-specific grok patterns. For Tomcat-style timestamps, use CATALINA_DATESTAMP.
input{
file{
start_position => "beginning"
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
}
file{
path =>
"/Users/andy/Desktop/logstash/ecent-data/java-errors.log"
start_position => "beginning"
codec => multiline {
pattern => "^%{CATALINA_DATESTAMP}"
negate => true
what => "previous"
auto_flush_interval => 5
}
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~
"errors" {
mutate{
replace => { type => "error" }
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
useragent {
source => "agent"
target => "ua"
}
#Admin pages
if [request] =~ /^\/admin\// {
drop { }
}
#Crawlers
if [ua][device] == "Spider" {
        drop { }
}
#Static files
if [request] =~ /^\/js\//
or [request] =~ /^\/css\//
or [request] in [ "/robots.txt", "/favicon.ico" ] {
        drop { }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
}
mutate{
remove_field => [ "headers", "@version",
"host" ]
}
}
output{
if [type] == "access" {
elasticsearch {
hosts => [" localhost:2000" ]
#index => "%{type}-%{+YYYY.MM.dd}"
document_type => "default"
http_compression => true
}
}else{
stdout {
codec => rubydebug
}
}
}
Parsing Stack Trace with GROK - complicated:
Things are complicated because we are using multiline. We can mix grok patterns with plain regular expressions, which use a slightly different syntax. Below is the syntax and the regular expression:
(?<msg>.+?(?=\n)) -> (?<field_name>regex_pattern)
.+? -> matches any character multiple times; the ? makes it non-greedy
(?=\n) -> a lookahead, so matching stops before the next newline
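A minimal sketch of the custom-capture syntax on its own (the pattern here is simplified and assumes a multiline event, since the lookahead needs a newline to stop at):
filter {
  grok {
    # capture everything up to (but not including) the first newline into "msg"
    match => { "message" => "%{LOGLEVEL:level} %{JAVACLASS:class}: (?<msg>.+?(?=\n))" }
  }
}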
input{
file{
start_position => "beginning"
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
}
file{
path =>
"/Users/andy/Desktop/logstash/ecent-data/java-errors.log"
start_position => "beginning"
codec => multiline {
pattern => "^%{CATALINA_DATESTAMP}"
negate => true
what => "previous"
auto_flush_interval => 5
}
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~
"errors" {
mutate{
replace => { type => "error" }
}
grok {
match => { "message" =>
"%{CATALINA_DATESTAMP:timestamp} %{LOGLEVEL:level} %{JAVACLASS:class}:
(?<msg>.+?(?=\n))"
}
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
useragent {
source => "agent"
target => "ua"
}
#Admin pages
if [request] =~ /^\/admin\// {
drop { }
}
#Crawlers
if [ua][device] == "Spider" {
        drop { }
}
#Static files
if [request] =~ /^\/js\//
or [request] =~ /^\/css\//
or [request] in [ "/robots.txt", "/favicon.ico" ] {
        drop { }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
}
mutate{
remove_field => [ "headers", "@version",
"host" ]
}
}
output{
if [type] == "access" {
elasticsearch {
hosts => [" localhost:2000" ]
#index => "%{type}-%{+YYYY.MM.dd}"
document_type => "default"
http_compression => true
}
}else{
stdout {
codec => rubydebug
}
}
}
We can even handle both Windows and Linux newlines by rewriting (?<msg>.+?(?=\n)) as (?<msg>.+?(?=(\r\n|\r|\n))).
@metadata
Values under @metadata travel with the event but are not included in the output. This is similar to adding and then removing a field, except we do not have to remove anything ourselves.
- Modify the timestamp grok capture inside the filter to write to [@metadata][timestamp].
- Then add a date filter below it:
match => [ "[@metadata][timestamp]", "MMM dd, yyyy HH:mm:ss a" ], where a refers to AM/PM.
- Usually @metadata does not appear in output events, but if you want to debug you can specify metadata => true on the rubydebug codec.
- In the output, @timestamp is 2017-09-09 while message still contains the original time, Sep 9, 2017. If we had used the date plugin with a regular field, we would have had to remove that field ourselves.
input{
file{
start_position => "beginning"
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
}
file{
path =>
"/Users/andy/Desktop/logstash/ecent-data/java-errors.log"
start_position => "beginning"
codec => multiline {
pattern => "^%{CATALINA_DATESTAMP}"
negate => true
what => "previous"
auto_flush_interval => 5
}
}
}
filter{
if [headers][request_uri] =~ "error" or [path] =~
"errors" {
mutate{
replace => { type => "error" }
}
grok {
match => { "message" =>
"%{CATALINA_DATESTAMP:[@metadata][timestamp]} %{LOGLEVEL:level}
%{JAVACLASS:class}: (?<msg>.+?(?=\n))"
}
}
date {
        match => [ "[@metadata][timestamp]", "MMM dd, yyyy HH:mm:ss a" ]
}
} else {
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
useragent {
source => "agent"
target => "ua"
}
#Admin pages
if [request] =~ /^\/admin\// {
drop { }
}
#Crawlers
if [ua][device] == "Spider" {
        drop { }
}
#Static files
if [request] =~ /^\/js\//
or [request] =~ /^\/css\//
or [request] in [ "/robots.txt", "/favicon.ico" ] {
        drop { }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
}
mutate{
remove_field => [ "headers", "@version",
"host" ]
}
}
output{
if [type] == "access" {
elasticsearch {
hosts => [" localhost:2000" ]
#index => "%{type}-%{+YYYY.MM.dd}"
document_type => "default"
http_compression => true
}
}else{
stdout {
codec => rubydebug {
        metadata => true
}
}
}
}
Running Multiple Pipelines:
Configure in -> /path/to/logstash/config/pipelines.yml
Example:
- pipeline.id: user_searched
pipeline.batch.size: 50
path.config:
"/path/to/logstash/config/pipelines/searched.conf"
- pipeline.id: user_clicked_search_result
pipeline.batch.size: 10
config.string: "input { http{ } } output { stdout { } }"
- Go to the pipelines.yml file and add the entries below. He commented out pipeline.batch.size to explain it.
- pipeline.id: access_logs
#pipeline.batch.size: 125
path.config:
"/Users/andy/desktop/logstash/config/pipelines/access.conf"
- pipeline.id: error_logs
#pipeline.batch.size: 1
path.config:
"/Users/andy/desktop/logstash/config/pipelines/errors.conf"
- In the config/pipelines/access.conf file, create the entry below. It is the same pipeline as above, just trimmed down to handle only the access log.
input{
file{
start_position => "beginning"
path =>
"/users/andy/desktop/logstash/event-data/apache_*.log"
}
}
filter{
mutate{
replace => { type => "access" }
}
grok{
match => {"message" =>
'"%{HTTPD_COMMONLOG}" "%{GREEDYDATA:referrer}"
"%{GREEDYDATA:agent}"' }
}
if "_grokparsefailure" in [tags] {
drop{}
}
useragent {
source => "agent"
target => "ua"
}
#Admin pages
if [request] =~ /^\/admin\// {
drop { }
}
#Crawlers
if [ua][device] == "Spider" {
        drop { }
}
#Static files
if [request] =~ /^\/js\//
or [request] =~ /^\/css\//
or [request] in [ "/robots.txt", "/favicon.ico" ] {
        drop { }
}
mutate{
convert => {
        "response" => "integer"
        "bytes" => "integer"
}
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
geoip {
source => "clientip"
}
mutate{
remove_field => [ "headers", "@version",
"host" ]
}
}
output{
#elasticsearch {
#hosts => [" localhost:2000" ]
#index => "%{type}-%{+YYYY.MM.dd}"
#document_type => "default"
#http_compression => true
#}
stdout {
codec => rubydebug {
        metadata => true
}
}
}
- Create errors.conf, copy the old code, and adjust it so it only handles the error log:
input{
file{
path =>
"/Users/andy/Desktop/logstash/ecent-data/java-errors.log"
start_position => "beginning"
codec => multiline {
pattern => "^%{CATALINA_DATESTAMP}"
negate => true
what => "previous"
auto_flush_interval => 5
}
}
}
filter{
mutate{
replace => { type => "error" }
}
grok {
match => { "message" =>
"%{CATALINA_DATESTAMP:[@metadata][timestamp]} %{LOGLEVEL:level}
%{JAVACLASS:class}: (?<msg>.+?(?=\n))"
}
}
date {
        match => [ "[@metadata][timestamp]", "MMM dd, yyyy HH:mm:ss a" ]
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss
Z" ]
remove_field => [ "timestamp" ]
}
mutate{
remove_field => [ "headers", "@version",
"host" ]
}
}
output{
stdout {
codec => rubydebug {
metadata => true
}
}
}
- Restart Logstash with the new pipelines. We do not have to add any extra arguments because Logstash reads pipelines.yml by default. Also delete the sincedb files.
bin/logstash --config.reload.automatic