Stack ELK : Logstash et Kibana les outils d'Elasticsearch

Les retards à la SNCF Version Longue

(Facultatif) TP longue version

Pour ceux qui souhaitent approfondir le processus d'ETL en utilisant logstash

Analyse des données de l'Open Data de la SNCF

La première partie de ce TP portera sur l'analyse des données open data de la sncf portant sur les retards des lignes TGV en France à l'aide de logstash

Question

Rendez-vous dans le dossier logstash/conf/conf.d

1
cd logstash/conf/conf.d

Modifier le fichier 01-input.conf

Ce fichier contiendra la configuration permettant de lire le fichier regularite-mensuelle-tgv-short.csv placé dans le dossier /etc/logstash/files

Indice

Le plugin à utiliser est de la forme suivante :

1
input {
2
   file {
3
     path => ["un/fichier"]
4
   }
5
}
Indice

Il faut aussi ajouter les options suivantes pour s'assurer que logstash lise bien tout le fichier

1
start_position => 'beginning'
2
sincedb_path => "/dev/null"
Solution
1
# /home/api04XX/logstash/conf/conf.d/01-input.conf
2
input {
3
    file{
4
		  path => [ "/etc/logstash/files/regularite-mensuelle-tgv-short.csv" ]
5
    	start_position => 'beginning'
6
    	sincedb_path => "/dev/null"
7
    }
8
}
9

Question

Modifier le fichier 02-filter.conf

Ce fichier contiendra la configuration permettant d'analyser un fichier csv avec les colonnes suivantes :

1
"date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"
Indice
1
filter {
2
    csv {
3
    	columns => ["columns"]
4
	    separator => ";"
5
    }
6
}
Solution
1
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
2
filter {
3
    csv {
4
    	columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
5
	    separator => ";"
6
    }
7
}

Question

Modifier le fichier 03-output.conf

Ce fichier contiendra la configuration permettant de voir sur la console le résultat de l'analyse de logstash

Indice
1
output {
2
3
}
Solution
1
# /home/api04XX/logstash/conf/conf.d/03-output.conf
2
output {
3
	stdout { codec => rubydebug }
4
}

Question

Lancer logstash et analyser le résultat

1
start logstash
Solution
1
[api04XX@tpapi04 ~]$ start logstash
2
Starting api04XX_api04XX-logstash_1
3
Attaching to api04XX_api04XX-logstash_1
4
api04XX-logstash_1 | {
5
api04XX-logstash_1 |               "message" => [
6
api04XX-logstash_1 |         [0] "2011-09;Atlantique;ANGOULEME;PARIS MONTPARNASSE;358;358;0;23;93.6\r"
7
api04XX-logstash_1 |     ],
8
api04XX-logstash_1 |              "@version" => "1",
9
api04XX-logstash_1 |            "@timestamp" => "2016-01-17T13:26:57.562Z",
10
api04XX-logstash_1 |                  "host" => "api04XX-logstash",
11
api04XX-logstash_1 |                  "path" => "/etc/logstash/files/regularite-mensuelle-tgv-short.csv",
12
api04XX-logstash_1 |                  "date" => "2011-09",
13
api04XX-logstash_1 |                   "axe" => "Atlantique",
14
api04XX-logstash_1 |                "depart" => "ANGOULEME",
15
api04XX-logstash_1 |               "arrivee" => "PARIS MONTPARNASSE",
16
api04XX-logstash_1 |     "trains_programmes" => "358",
17
api04XX-logstash_1 |       "trains_circules" => "358",
18
api04XX-logstash_1 |        "trains_annules" => "0",
19
api04XX-logstash_1 |        "trains_retards" => "23",
20
api04XX-logstash_1 |            "regularite" => "93.6"
21
api04XX-logstash_1 | }

Problématique : On se rend compte que le fichier de la SNCF comporte une date qui n'est pas complète ce qui empêche logstash de l'analyser.

Question

Rajouter un filtre qui permettrait de compléter le champ date avec le premier jour du mois et de le convertir en timestamp afin de pouvoir par la suite l'analyser.

Indice

Le plugin mutate permet de modifier la valeur d'un champ.

On peut récupérer la valeur du champ avec %{champ}

1
mutate {
2
  replace => { "champ" => "valeur" }
3
}
Indice

La conversion en timestamp s'effectue à l'aide du plugin date.

1
date {
2
  match => [ "champ", "YYYY-MM-dd" ] #On sélectionnne le pattern du champ afin d'éviter les erreurs de cast
3
  timezone => "UTC"
4
}
5
Solution
1
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
2
filter {
3
    csv {
4
    	columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
5
	    separator => ";"
6
    }
7
8
		#Modification du champ date qui est de la forme yyyy-mm vers yyyy-mm-dd
9
		mutate {
10
			replace => { "date" => "%{date}-01" }
11
		}
12
		#cast du champ date que l'on vient de modifier vers un timestamp reconnu par elasticsearch.
13
		date {
14
			match => [ "date", "YYYY-MM-dd" ]
15
			timezone => "UTC"
16
			#Si la conversion a réussi, on supprime le champ date. Par défaut le résultat de la fonction date est envoyé au champ @timestamp
17
			remove_field => ["date"]
18
		}
19
}
1
[api04XX@tpapi04 conf.d]$ start logstash
2
Starting api04XX_api04XX-logstash_1
3
Attaching to api04XX_api04XX-logstash_1
4
api04XX-logstash_1 | {
5
api04XX-logstash_1 |               "message" => [
6
api04XX-logstash_1 |         [0] "2011-09;Atlantique;ANGOULEME;PARIS MONTPARNASSE;358;358;0;23;93.6\r"
7
api04XX-logstash_1 |     ],
8
api04XX-logstash_1 |              "@version" => "1",
9
api04XX-logstash_1 |            "@timestamp" => "2011-09-01T00:00:00.000Z",
10
api04XX-logstash_1 |                  "host" => "api04XX-logstash",
11
api04XX-logstash_1 |                  "path" => "/etc/logstash/files/regularite-mensuelle-tgv-short.csv",
12
api04XX-logstash_1 |                   "axe" => "Atlantique",
13
api04XX-logstash_1 |                "depart" => "ANGOULEME",
14
api04XX-logstash_1 |               "arrivee" => "PARIS MONTPARNASSE",
15
api04XX-logstash_1 |     "trains_programmes" => "358",
16
api04XX-logstash_1 |       "trains_circules" => "358",
17
api04XX-logstash_1 |        "trains_annules" => "0",
18
api04XX-logstash_1 |        "trains_retards" => "23",
19
api04XX-logstash_1 |            "regularite" => "93.6"
20
api04XX-logstash_1 | }

Question

Les valeurs numériques doivent aussi être castées par logstash pour être prise en compte par elasticsearch.

Solution
1
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
2
filter {
3
    csv {
4
        columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
5
            separator => ";"
6
    }
7
8
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
9
#Modification du champ date qui est de la forme yyyy-mm vers yyyy-mm-dd
10
mutate {
11
        replace => { "date" => "%{date}-01" }
12
        convert => { "trains_programmes" => "integer" }
13
        convert => { "trains_circules" => "integer" }
14
        convert => { "trains_annules" => "integer" }
15
        convert => { "trains_retards" => "integer" }
16
        convert => { "regularite" => "float" }
17
}
18
#cast du champ date que l'on vient de modifier vers un timestamp reconnu par elasticsearch.
19
date {
20
        match => [ "date", "YYYY-MM-dd" ]
21
        timezone => "UTC"
22
        #Si la conversion a réussi, on supprime le champ date. Par défaut le résultat de la fonction date est envoyé au champ @timestamp
23
        remove_field => ["date"]
24
}
25
26
}

Maintenant que les résultats correspondent à ce que l'on veut, on souhaite charger les données dans la base elasticsearch.

Question

Modifiez le fichier d'entrée pour utiliser le fichier regularite-mensuelle-tgv.csv

Solution
1
# /home/api04XX/logstash/conf/conf.d/01-input.conf
2
input {
3
    file{
4
        path => [ "/etc/logstash/files/regularite-mensuelle-tgv.csv" ]
5
        start_position => 'beginning'
6
        sincedb_path => "/dev/null"
7
    }
8
}

Question

Modifiez le fichier de sortie pour utiliser une base elasticsearch

Solution
1
# /home/api04XX/logstash/conf/conf.d/03-output.conf
2
output {
3
    elasticsearch {
4
        host => "192.168.1.160"
5
        cluster => "elastic.api04XX.docker"
6
        port => "93XX"
7
        index => "logstash-api04-%{+YYYY.MM.dd}"
8
        protocol => "transport"
9
    }
10
}
1
start elasticsearch
2
start logstash

Après l'ajout des données, il doit y avoir 100 shards actifs sur la base

1
curl localhost:92XX/_cluster/health?pretty=true
2
{
3
  "cluster_name" : "elastic.api0402.docker",
4
  "status" : "green",
5
  "timed_out" : false,
6
  "number_of_nodes" : 1,
7
  "number_of_data_nodes" : 1,
8
  "active_primary_shards" : 100,
9
  "active_shards" : 100,
10
  "relocating_shards" : 0,
11
  "initializing_shards" : 0,
12
  "unassigned_shards" : 0,
13
  "delayed_unassigned_shards" : 0,
14
  "number_of_pending_tasks" : 0,
15
  "number_of_in_flight_fetch" : 0
16
}

Lors du lancement de logstash, vous pouvez voir des messages de la forme

1
{:timestamp=>"2016-01-17T14:20:50.351000+0000", :message=>"retrying failed action with response code: 429", :level=>:warn}

Ce signifie que l'on charge "trop vite" l'elasticsearch et que logstash doit renvoyer les informations.

Question

Question subsidiaire : Dans le fichier de la SNCF, on voit qu'il y a le nom des gares de départ et d'arrivée.

Dans un second fichier opendata, la SNCF fournit le nom de ses points de vente ainsi que leurs coordonnées GPS. Elasticsearch est capable de stocker des points géo-point ce qui permet par la suite à Kibana d'afficher des cartes avec des points géographiques.

Il serait intéressant pour nous de pouvoir faire une sorte de jointure entre le fichier des localisations et le fichier des retards. Pour ça, il est possible d'écrire son propre plugin en ruby.

À partir du plugin situé dans le répertoire logstash/conf/conf.d/logstash/filter/localisation.rb ajoutez la configuration au fichier 02-filter.conf pour ajouter la localisation des villes dans les résultats de logstash.

Indice
1
require "logstash/filters/base"
2
require "logstash/namespace"
3
4
class LogStash::Filters::Localisation < LogStash::Filters::Base
5
6
7
  # filter {
8
  #   localisation { ... }
9
  # }
10
  config_name "localisation" #Nom du plugin
11
12
  #Définition des variables
13
  config :database, :validate => :path # Le script va lire un fichier en entrée
14
  config :source, :validate => :string, :required => true # Variable d'entrée avec une ville
15
  config :target, :validate => :string, :default => 'location' # Variable de sortie pour stocker le tableau des positions GPS
16
  config :locs 
17
18
  public
19
  def register
20
    if @database.nil?
21
      @database = LogStash::Environment.vendor_path("/etc/logstash/files/localisation.csv")
22
      if !File.exists?(@database)
23
        raise "la ligne  'database => ...' est obligatoire"
24
      end
25
    end
26
    
27
    @locs = Hash.new
28
    CSV.foreach(@database) do |row|
29
      # Lecture des infos du fichier csv des villes et convertion en float
30
      # NB Elasticsearch analyse les géopoint stocké dans un tableau sous le forme longitude,latitude 
31
      @locs[row[0]] = [ row[2].to_f, row[1].to_f ] # { "lat" => row[1].to_f, "lon" => row[2].to_f }
32
    end
33
  end
34
  
35
36
  public
37
  def filter(event)
38
    # Ne retourne rien si la fonction ne renvoie rien 
39
    return unless filter?(event)
40
41
    if event[@source]
42
      loc = event[@source]
43
      if @locs[loc]
44
        event[@target] = @locs[loc]
45
        else
46
        @logger.error("Localisation inconnue: ", {"loc" => loc})
47
      end
48
    end
49
    # Les données récupérées sont renvoyées 
50
    filter_matched(event)
51
  end
52
end # class LogStash::Filters::Foo
53
Solution
1
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
2
filter {
3
    csv {
4
        columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
5
            separator => ";"
6
    }
7
8
	# /home/api04XX/logstash/conf/conf.d/02-filter.conf
9
	#Modification du champ date qui est de la forme yyyy-mm vers yyyy-mm-dd
10
	mutate {
11
	        replace => { "date" => "%{date}-01" }
12
	        convert => { "trains_programmes" => "integer" }
13
	        convert => { "trains_circules" => "integer" }
14
	        convert => { "trains_annules" => "integer" }
15
	        convert => { "trains_retards" => "integer" }
16
	        convert => { "regularite" => "float" }
17
	}
18
	#cast du champ date que l'on vient de modifier vers un timestamp reconnu par elasticsearch.
19
	date {
20
	        match => [ "date", "YYYY-MM-dd" ]
21
	        timezone => "UTC"
22
	        #Si la conversion a réussi, on supprime le champ date. Par défaut le résultat de la fonction date est envoyé au champ @timestamp
23
	        remove_field => ["date"]
24
	}
25
	
26
	#Rajoute les coordonnées GPS des villes pour Kibana
27
	localisation {
28
		database => "/etc/logstash/files/localisation.csv"
29
	  	source => "depart"
30
	  	target => "departGPS"
31
	}
32
	localisation {
33
	    database => "/etc/logstash/files/localisation.csv"
34
		source => "arrivee"
35
		target => "arriveeGPS"
36
	}
37
}

Lorsque l'on ajoute des geopoints, il faut spécifier un mapping spécifique pour caster les coordonnées.

1
resetES
2
3
curl -XPUT 'localhost:92XX/_template/logstash-api04' -d @/home/api04XX/logstash/conf/files/mappingGPS
4
5
start logstash
PrécédentPrécédentFin
AccueilAccueilImprimerImprimer Antoine Barbare, 2015 (Contributions : Stéphane Crozat, les étudiants de l'UTC) Paternité - Partage des Conditions Initiales à l'IdentiqueRéalisé avec Scenari (nouvelle fenêtre)