Les retards à la SNCF Version Longue
(Facultatif) TP longue version
Pour ceux qui souhaitent approfondir le processus d'ETL en utilisant logstash
Analyse des données de l'Open Data de la SNCF
La première partie de ce TP portera sur l'analyse des données open data de la sncf portant sur les retards des lignes TGV en France à l'aide de logstash
Question
Rendez-vous dans le dossier logstash/conf/conf.d
cd logstash/conf/conf.d
Modifier le fichier 01-input.conf
Ce fichier contiendra la configuration permettant de lire le fichier regularite-mensuelle-tgv-short.csv placé dans le dossier /etc/logstash/files
Le plugin à utiliser est de la forme suivante :
input {
file {
path => ["un/fichier"]
}
}
Il faut aussi ajouter les options suivantes pour s'assurer que logstash lise bien tout le fichier
start_position => 'beginning'
sincedb_path => "/dev/null"
# /home/api04XX/logstash/conf/conf.d/01-input.conf
input {
file{
path => [ "/etc/logstash/files/regularite-mensuelle-tgv-short.csv" ]
start_position => 'beginning'
sincedb_path => "/dev/null"
}
}
Question
Modifier le fichier 02-filter.conf
Ce fichier contiendra la configuration permettant d'analyser un fichier csv avec les colonnes suivantes :
"date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"
filter {
csv {
columns => ["columns"]
separator => ";"
}
}
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
filter {
csv {
columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
separator => ";"
}
}
Question
Question
Lancer logstash et analyser le résultat
start logstash
[api04XX@tpapi04 ~]$ start logstash
Starting api04XX_api04XX-logstash_1
Attaching to api04XX_api04XX-logstash_1
api04XX-logstash_1 | {
api04XX-logstash_1 | "message" => [
api04XX-logstash_1 | [0] "2011-09;Atlantique;ANGOULEME;PARIS MONTPARNASSE;358;358;0;23;93.6\r"
api04XX-logstash_1 | ],
api04XX-logstash_1 | "@version" => "1",
api04XX-logstash_1 | "@timestamp" => "2016-01-17T13:26:57.562Z",
api04XX-logstash_1 | "host" => "api04XX-logstash",
api04XX-logstash_1 | "path" => "/etc/logstash/files/regularite-mensuelle-tgv-short.csv",
api04XX-logstash_1 | "date" => "2011-09",
api04XX-logstash_1 | "axe" => "Atlantique",
api04XX-logstash_1 | "depart" => "ANGOULEME",
api04XX-logstash_1 | "arrivee" => "PARIS MONTPARNASSE",
api04XX-logstash_1 | "trains_programmes" => "358",
api04XX-logstash_1 | "trains_circules" => "358",
api04XX-logstash_1 | "trains_annules" => "0",
api04XX-logstash_1 | "trains_retards" => "23",
api04XX-logstash_1 | "regularite" => "93.6"
api04XX-logstash_1 | }
Problématique : On se rend compte que le fichier de la SNCF comporte une date qui n'est pas complète ce qui empêche logstash de l'analyser.
Question
Rajouter un filtre qui permettrait de compléter le champ date avec le premier jour du mois et de le convertir en timestamp afin de pouvoir par la suite l'analyser.
Le plugin mutate permet de modifier la valeur d'un champ.
On peut récupérer la valeur du champ avec %{champ}
mutate {
replace => { "champ" => "valeur" }
}
La conversion en timestamp s'effectue à l'aide du plugin date.
date {
match => [ "champ", "YYYY-MM-dd" ] #On sélectionnne le pattern du champ afin d'éviter les erreurs de cast
timezone => "UTC"
}
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
filter {
csv {
columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
separator => ";"
}
#Modification du champ date qui est de la forme yyyy-mm vers yyyy-mm-dd
mutate {
replace => { "date" => "%{date}-01" }
}
#cast du champ date que l'on vient de modifier vers un timestamp reconnu par elasticsearch.
date {
match => [ "date", "YYYY-MM-dd" ]
timezone => "UTC"
#Si la conversion a réussi, on supprime le champ date. Par défaut le résultat de la fonction date est envoyé au champ @timestamp
remove_field => ["date"]
}
}
[api04XX@tpapi04 conf.d]$ start logstash
Starting api04XX_api04XX-logstash_1
Attaching to api04XX_api04XX-logstash_1
api04XX-logstash_1 | {
api04XX-logstash_1 | "message" => [
api04XX-logstash_1 | [0] "2011-09;Atlantique;ANGOULEME;PARIS MONTPARNASSE;358;358;0;23;93.6\r"
api04XX-logstash_1 | ],
api04XX-logstash_1 | "@version" => "1",
api04XX-logstash_1 | "@timestamp" => "2011-09-01T00:00:00.000Z",
api04XX-logstash_1 | "host" => "api04XX-logstash",
api04XX-logstash_1 | "path" => "/etc/logstash/files/regularite-mensuelle-tgv-short.csv",
api04XX-logstash_1 | "axe" => "Atlantique",
api04XX-logstash_1 | "depart" => "ANGOULEME",
api04XX-logstash_1 | "arrivee" => "PARIS MONTPARNASSE",
api04XX-logstash_1 | "trains_programmes" => "358",
api04XX-logstash_1 | "trains_circules" => "358",
api04XX-logstash_1 | "trains_annules" => "0",
api04XX-logstash_1 | "trains_retards" => "23",
api04XX-logstash_1 | "regularite" => "93.6"
api04XX-logstash_1 | }
Question
Les valeurs numériques doivent aussi être castées par logstash pour être prise en compte par elasticsearch.
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
filter {
csv {
columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
separator => ";"
}
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
#Modification du champ date qui est de la forme yyyy-mm vers yyyy-mm-dd
mutate {
replace => { "date" => "%{date}-01" }
convert => { "trains_programmes" => "integer" }
convert => { "trains_circules" => "integer" }
convert => { "trains_annules" => "integer" }
convert => { "trains_retards" => "integer" }
convert => { "regularite" => "float" }
}
#cast du champ date que l'on vient de modifier vers un timestamp reconnu par elasticsearch.
date {
match => [ "date", "YYYY-MM-dd" ]
timezone => "UTC"
#Si la conversion a réussi, on supprime le champ date. Par défaut le résultat de la fonction date est envoyé au champ @timestamp
remove_field => ["date"]
}
}
Maintenant que les résultats correspondent à ce que l'on veut, on souhaite charger les données dans la base elasticsearch.
Question
Modifiez le fichier d'entrée pour utiliser le fichier regularite-mensuelle-tgv.csv
# /home/api04XX/logstash/conf/conf.d/01-input.conf
input {
file{
path => [ "/etc/logstash/files/regularite-mensuelle-tgv.csv" ]
start_position => 'beginning'
sincedb_path => "/dev/null"
}
}
Question
Modifiez le fichier de sortie pour utiliser une base elasticsearch
# /home/api04XX/logstash/conf/conf.d/03-output.conf
output {
elasticsearch {
host => "192.168.1.160"
cluster => "elastic.api04XX.docker"
port => "93XX"
index => "logstash-api04-%{+YYYY.MM.dd}"
protocol => "transport"
}
}
start elasticsearch
start logstash
Après l'ajout des données, il doit y avoir 100 shards actifs sur la base
curl localhost:92XX/_cluster/health?pretty=true
{
"cluster_name" : "elastic.api0402.docker",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 100,
"active_shards" : 100,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0
}
Lors du lancement de logstash, vous pouvez voir des messages de la forme
{:timestamp=>"2016-01-17T14:20:50.351000+0000", :message=>"retrying failed action with response code: 429", :level=>:warn}
Ce signifie que l'on charge "trop vite" l'elasticsearch et que logstash doit renvoyer les informations.
Question
Question subsidiaire : Dans le fichier de la SNCF, on voit qu'il y a le nom des gares de départ et d'arrivée.
Dans un second fichier opendata, la SNCF fournit le nom de ses points de vente ainsi que leurs coordonnées GPS. Elasticsearch est capable de stocker des points géo-point ce qui permet par la suite à Kibana d'afficher des cartes avec des points géographiques.
Il serait intéressant pour nous de pouvoir faire une sorte de jointure entre le fichier des localisations et le fichier des retards. Pour ça, il est possible d'écrire son propre plugin en ruby.
À partir du plugin situé dans le répertoire logstash/conf/conf.d/logstash/filter/localisation.rb ajoutez la configuration au fichier 02-filter.conf pour ajouter la localisation des villes dans les résultats de logstash.
require "logstash/filters/base"
require "logstash/namespace"
class LogStash::Filters::Localisation < LogStash::Filters::Base
# filter {
# localisation { ... }
# }
config_name "localisation" #Nom du plugin
#Définition des variables
config :database, :validate => :path # Le script va lire un fichier en entrée
config :source, :validate => :string, :required => true # Variable d'entrée avec une ville
config :target, :validate => :string, :default => 'location' # Variable de sortie pour stocker le tableau des positions GPS
config :locs
public
def register
if @database.nil?
@database = LogStash::Environment.vendor_path("/etc/logstash/files/localisation.csv")
if !File.exists?(@database)
raise "la ligne 'database => ...' est obligatoire"
end
end
@locs = Hash.new
CSV.foreach(@database) do |row|
# Lecture des infos du fichier csv des villes et convertion en float
# NB Elasticsearch analyse les géopoint stocké dans un tableau sous le forme longitude,latitude
@locs[row[0]] = [ row[2].to_f, row[1].to_f ] # { "lat" => row[1].to_f, "lon" => row[2].to_f }
end
end
public
def filter(event)
# Ne retourne rien si la fonction ne renvoie rien
return unless filter?(event)
if event[@source]
loc = event[@source]
if @locs[loc]
event[@target] = @locs[loc]
else
@logger.error("Localisation inconnue: ", {"loc" => loc})
end
end
# Les données récupérées sont renvoyées
filter_matched(event)
end
end # class LogStash::Filters::Foo
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
filter {
csv {
columns => ["date","axe","depart","arrivee","trains_programmes","trains_circules","trains_annules","trains_retards","regularite"]
separator => ";"
}
# /home/api04XX/logstash/conf/conf.d/02-filter.conf
#Modification du champ date qui est de la forme yyyy-mm vers yyyy-mm-dd
mutate {
replace => { "date" => "%{date}-01" }
convert => { "trains_programmes" => "integer" }
convert => { "trains_circules" => "integer" }
convert => { "trains_annules" => "integer" }
convert => { "trains_retards" => "integer" }
convert => { "regularite" => "float" }
}
#cast du champ date que l'on vient de modifier vers un timestamp reconnu par elasticsearch.
date {
match => [ "date", "YYYY-MM-dd" ]
timezone => "UTC"
#Si la conversion a réussi, on supprime le champ date. Par défaut le résultat de la fonction date est envoyé au champ @timestamp
remove_field => ["date"]
}
#Rajoute les coordonnées GPS des villes pour Kibana
localisation {
database => "/etc/logstash/files/localisation.csv"
source => "depart"
target => "departGPS"
}
localisation {
database => "/etc/logstash/files/localisation.csv"
source => "arrivee"
target => "arriveeGPS"
}
}
Lorsque l'on ajoute des geopoints, il faut spécifier un mapping spécifique pour caster les coordonnées.
resetES
curl -XPUT 'localhost:92XX/_template/logstash-api04' -d @/home/api04XX/logstash/conf/files/mappingGPS
start logstash