Un'architettura ELK stack per i Big Data

II parte: Il filtro Elapseddi e

Negli ultimi anni si parla sempre più spesso di Big Data, Centralized Log Management e Analytics alimentando l'ecosistema delle soluzioni commerciali e open source. In questo secondo articolo, gli autori descrivono il funzionamento del plug-in Elapsed per Logstash, nato per risolvere delle esigenze di un caso reale, occorso nelle attività del Gruppo Imola.

Introduzione

Nello scorso numero abbiamo visto le ragioni che ci hanno portato alla realizazione del filtro Elapsed e che qui riassumiamo brevemnte: quanche anno fa, a seguito della richiesta di un cliente, abbiamo iniziato a studiare le architetture di monitoring basate sull'analisi dei log esistenti, esaminando sia prodotti open source che prodotti proprietari. La scelta è caduta su un gruppo di prodotti open source che sono stati opportunamente assemblati in una architettura resiliente simile a quella che oggi è conosciuta come ELK stack.

In questo articolo approfondiremo il filtro elapsed. Si tratta un componente creato per risolvere delle esigenze di progetto che in seguito è stato rilasciato ed è ora a disposizione della community di Logstash.

Filtri Logstash

Riprendendo il dettaglio architetturale descritto nell'articolo precedente, esporremo il funzionamento della catena di filtri allinterno di Logstash (figura 1).

I filtri sono componenti applicativi pluggabili, estendibili e configurabili, la cui combinazione permette di definire una pipeline di elaborazione per ogni singolo event log proveniente dai componenti  di input. In particolare i filtri sono applicati a cascata e ognuno applica una o più regole (p.e.: espressioni regolari) per individuare informazioni di interesse. Le informazioni ricavate sono inserite all'interno di un JSON  di risposta che si arricchisce e si struttura man mano che i filtri sono eseguiti.

Durante il filtering è possibile quindi classificare, taggare, aggiungere o eliminare informazioni, scartare, strutturare, correlare, eliminare, normalizzare gli event log ed effetuare il parsing su di essi.

 

 

Figura 1 - "Esploso" dell'architettura di progetto sulla parte Logstah e la sua catena di filtri.

 

È possibile utilizzare statement condizionali, all'interno della pipeline, per eseguire azioni al verificarsi di determinati eventi; abbiamo aggiunto, per esempio, un controllo nella pipeline di filtering per "taggare" alcuni eventi come "alert" da inviare via e-mail all'amministratore del sistema come da richieste funzionali di progetto.

Riportiamo di seguito alcuni filtri utilizzati nel progetto e in particolare il filtro custom "elapsed" progettato e sviluppato dal Gruppo Imola [3].

Filtro "grok"

Il filtro grok [8] effettua il parsing del messaggio di ogni singolo log event (per sua natura non strutturato) e cerca di trovare una corrispondenza con gli insiemi di pattern grok predefiniti o customizzati. Un pattern grok base incapsula un'espressione regolare più o meno complessa. Ogni insieme di pattern grok definisce un aggregato strutturato d'informazioni.

La possibilità di "comporre" pattern di pattern è decisamente utile sia in termini di potenza espressiva sia in termini di facilità di utilizzo. Nell'installazione di Logstash sono presenti svariati pattern "preconfezionati" per la maggior parte delle tipologie di log conosciute (per esempio, syslog, Java, Apache log, database log, etc.).

Esistono strumenti on-line come il grok debugger [9] che permettono di verificare sintatticamente e semanticamente la correttezza dei pattern.

A titolo di esempio riportiamo di seguito il modo in cui abbiamo utilizzato il filtro grok nel progetto.

Event log da elaborare

[2015_03_28-22_55_03] [CONSOLIDAMENTO] ; $# Nome della Sequenza: Seq_AI_1379503146 $# 
          Identificativo del territorio: 9904 $# Zona del territorio: AI;

Porzione del file di configurazione logstash

// Estrapolazione dal file di configurazione Logstash.conf (Indexer)
          {
            input {
              ...
                      }
            filter {
              ...
              
              grok {
                patterns_dir => ["/etc/logstash/pattern/customPatterns "]
                match => ["message", "%{LOG_TIMESTAMP}"]
              }
              
              grok  {
                patterns_dir => ["/etc/logstash/pattern/customPatterns"]
                match => ["message", "%{LOG_START_SING_SEQ_CONS}"]
                add_field => [ "Tipo", "StartConsSingSeq" ]
                add_tag => [ "StartConsSingSeq", "Consolidamento"]  
              }
              ...
          
            }
            output {
              ...
            }
          }

il parametro match determina o meno l'attivazione del filtro qualora il messaggio contenuto nel log event, o parte di esso, soddisfi l'espressione regolare contenuta nel pattern. In caso il pattern sia riconosciuto, le informazioni ricavate sono aggiunte al JSON di risposta secondo le regole stesse del pattern. Ovvero un pattern può essere visto come una coppia di informazioni: la prima è un'espressione regolare e la seconda è la regola di inserimento nel JSON. Di seguito un piccolo esempio chiarirà meglio il funzionamento.

Porzione del file "customPatterns"

.......
          CONS_TIMESTAMP [%{YEAR}_%{MONTHNUM}_%{MONTHDAY}-
          %{HOUR}_%{MINUTE}_%{SECOND}]
          # Esempio CONS_TIMESTAMP: [2013_09_22-03_50_02]
          
          ZONA ([a-zA-Z]{2})
          # Esempio ZONA: AI
          
          SEQ_AZZ_ID Seq_%{ZONA}_%{NUMBER}
          # Esempio Sequenza Zona: Seq_AI_1372337060
          
          AZZ_TERR Zona%{SPACE}del%{SPACE}territorio:%{SPACE}%{ZONA:Zona}
          # Esempio AZZ_TERR:  Zona del territorio: AI
          
          LOG_START_SING_SEQ_CONS 
          O]%{SPACE};%{SPACE}$#%{SPACE}Nome%{SPACE}della%{GREEDYDATA}%{SEQ_AZZ_ID:
          Nome_Sequenza}%{GREEDYDATA}%{TERRITORIO}%{GREEDYDATA}%{AZZ_TERR};
------

I pattern YEAR, MONTHNUM, MONTHDAY, HOUR, MINUTE, SECOND, SPACE, NUMBER e GREEDYDATA sono tutti componenti base presenti in Logstash.

Risultato post filtering dell'event log con strutturazione delle informazioni nel documento JSON

{
            "message" => "[2015_03_28-22_55_03] [CONSOLIDAMENTO] ; 
            $# Nome della Sequenza: Seq_AI_1379503146 $# 
            Identificativo del territorio: 9904 $# 
            Zona del territorio: AI;",
              "@timestamp" => "2015-03-28T22:55:03.000Z",
                "@version" => "1",
                    "type" => "cons",
                    "host" => "UFFICIO_N",
                 "logdate" => "[2015_03_28-22_55_03]",
                    "tags" => [
               [0] "Dated",
               [1] "StartConsSingSeq",
               [2] "Consolidamento"
            ],
            "Nome_Sequenza" => "Seq_AI_1379503146",
            "Id_Territorio" => "9904",
              "Zona" => "AI",
                 "Tipo" => "StartConsSingSeq"
}

Filtro "elapsed"

Durante la fase di analisi funzionale era emersa la necessità di utilizzare le informazioni dei log event raccolti per tracciare la durata di ogni singolo processo, determinare la durata di tutti i processi appartenenti a una certa categoria applicativa e, soprattutto, notificare ai responsabili dei sistemi qualsiasi problema di non terminazione o durata oltre un tempo di time-out fissato.

Avevamo quindi la necessità di identificare il log event di start e stop di ogni singolo processo e determinarne la durata calcolando la differenza tra i timestamp contenuti nei log.

Non avendo trovato un filtro Logstash che risolvesse il problema di correlazione descritto abbiamo deciso di progettare e implementare un filtro custom elapsed da proporre alla community del progetto Logstash.

La sintassi del filtro

Riportiamo per comodità la sintassi del filtro, comunque ben documentato anche nel sito ufficiale di Logstash [4]:

filter {
            elapsed {
              start_tag => "start event tag"
              end_tag => "end event tag"
              unique_id_field => "id field name"
              timeout => seconds
              new_event_on_match => true/false
            }
}

 

Il funzionamento del filtro

Il filtro "cattura" tutti gli event log che contengono il valore impostato su start_tag uguale al valore start event tag, salvandoli temporaneamente in una struttura hash di attesa fino a quando nella pipeline di Logstash non entra un event log che contiene il valore end_tag settato su end event tag e con valore di correlazione  unique_id_field presente in entrambi.

La gestione di un event log di end è gestito con l'eliminazione del corrispondente start event log nella struttura hash e calcolando della differenza temporale in funzione dei timestamp presenti negli event log. Le informazioni calcolate vengono inserite nell'event log di uscita dalla pipeline.

Ad ogni event log inserito nella struttua hash di attesa è associato un timeout che ne stabilisce il tempo massimo di permanenza nella struttura stessa; qualora l'event log di end non arrivi nel tempo prestabilito, l'evento memorizzato nella struttura hash è scartato e si genere un nuovo evento di errore con tag elapsed.expired_error.

Un esempio

A titolo di esempio riportiamo le indicazione del filtro elapsed nella pipeline del componente Logstash Indexer:

// Porzione del file di configurazione logstash:
          // Estrapolazione dal file di configurazione Logstash.conf (Indexer)
          {
            input   {
              redis {
                  host => "logging-server"
                  type => "redis-input"
                  data_type => "list"
                  key => "logstash"
              }
            }  
              filter {
                .....
                 
                //Utilizzo dell' espressione regolare "LOG_START_ALLINEAMENTO" 
                //per identificare l'inizio processo "Allineamento".
                grok {
                   patterns_dir => ["/etc/logstash/pattern/customPatterns"]
                   //Condizione di attivazione del filtro
                   match => ["message", "%{LOG_START_ALLINEAMENTO}"]
                   add_field => [ "Tipo", "StartAllineamento" ]
                   add_tag => [ "StartAllineamento", 
                                "ConsolidamentoElapsed", "catch" ]  
                }
                
                // Utilizzo dell' espressione regolare "LOG_STOP_ALLINEAMENTO" 
                // per identificare la fine del processo "Allineamento".
                grok {
                  patterns_dir => ["/etc/logstash/pattern/customPatterns"]
                  match => ["message", "%{LOG_STOP_ALLINEAMENTO}"]
                  add_field => [ "Tipo", "StopAllineamento" ]
                  add_tag => [ "StopAllineamento",  
                              "ConsolidamentoElapsed", "catch" ]  
                }
                
                //Utilizzo del filtro sviluppato dal Gruppo Imola
                elapsed {
                  start_tag => "StartAllineamento"
                  end_tag => "StopAllineamento"
                  unique_id_field => "IdAllineamento"
                  timeout => 15 (secondi)
                  new_event_on_match => false
                }
                .....
              }
              output {
                
                //In caso si verifichi un errore di non terminazione 
                  di un processo invio una mail
                if ("elapsed.expired_error" in [tags]) {
                
                  email {
                    body => "Il processo di allineameto con Id %{IdAllineamento} 
                    NON terminato entro il tempo previsto di %{elapsed.time} secondi.
                    Inizio del processo alle %{elapsed.timestamp_start}."
                      subject => "Timeout Scaduto Secondi %{elapsed.time}"
                      from => "Consolidamento@host.xyz"
                      to => "amministratore@host.xyz "
                      options => [ "smtpIporHost", " host.xyz", "port", "25"]
                  }
                 //Inoltra tutti gli event log sul cluster ElasticSearch
                 elasticsearch {
                   cluster => "elastic_cliente"
                 }
              }
          }

Tutti i plug-in (input, filtri e output) sono eseguiti nell'ordine con cui vengono inseriti nel file di configurazione.

Come accennato in precedenza, la possibilità di utilizzare statement condizionali all'interno del blocco di out nel file di configurazione ci ha permesso di gestire le notifiche via e-mail nel caso di "non terminazione" dei processi tracciati.

Esempio di tracciabilità per "Processo terminato"

Ipotizziamo che nella coda degli event log di Logstash arrivi il seguente evento

[2015_03_23-02_33_55] [CONSOLIDAMENTO] ;
          [ALLINEAMENTO] Inizio l'allineamento dei dati validati dal consolidamento 
          con l'altro DB - id allineamento: 1382488437340;

La pipeline scompone l'evento usando il primo filtro grok riportato nella configurazione d'esempio:

{
              "message" => "[2015_03_23-02_33_55] [CONSOLIDAMENTO] ;
              [ALLINEAMENTO] Inizio l'allineamento dei dati validati dal consolidamento 
              con l'altro DB - id allineamento: 1382488437340;",
                  "@timestamp" => "2015-03-23T02:33:55.000Z",
                    "@version" => "1",
                        "type" => "cons",
                        "host" => "UFFICIO_N",
                     "logdate" => "[2015_03_23-02_33_55]",
                        "tags" => [
                  [0] "Dated",
                  [1] "StartAllineamento"        
              ],
              "IdAllineamento" => "1382488437340",
                        "Tipo" => "StartAllineamento"
          }

Il filtro elapsed  rileva [start_tag => "StartAllineamento"] e "sospende" il log event.

Di seguito riportiamo il caso in cui arriva l'evento di stop di processo prima del timeout settato a 15 secondi:

[2013_03_23-02_33_59] [CONSOLIDAMENTO] ;[
          ALLINEAMENTO] Fine dell'allineamento - id allineamento: 1382488437340;

Qui, la pipeline scompone l'evento usando il secondo filtro grok riportato nella configurazione d'esempio:

{
              "message" => "[2013_03_23-02_33_59] [CONSOLIDAMENTO] ;
          [ALLINEAMENTO] Fine dell'allineamento - id allineamento: 1382488437340;",
                           "@timestamp" => "2013-03-23T02:33:59.000Z",
                              "@version" => "1",
                            "type" => "cons",
                            "host" => "UFFICIO_N",
                              "logdate" => "[2013_03_23-02_33_59]",
                                 "tags" => [
                  [0] "Dated",
                  [1] "StopAllineamento",
                  [2] "elapsed",
                  [3] "elapsed.match"
              ],
                       "IdAllineamento" => "1382488437340",
                                 "Tipo" => "StopAllineamento",
                         "elapsed.time" => 4.0,
              "elapsed.timestamp_start" => "2015-03-23T02:33:55.000Z",
                               "Durata" => 4.0
}

Il filtro elapsed  elimina il log event sospeso e "arricchisce" quello di stop con durata e informazioni di "elapsed".

Esempio di tracciabilità per "Processo non terminato"

Ipotiziamo che nella coda degli Event log di Logstash arrivi il seguente evento:

[2015_03_23-02_33_55] [CONSOLIDAMENTO] ;
          [ALLINEAMENTO] Inizio allineamento dei dati validati dal consolidamento 
          con l'altro DB - id allineamento: 1382488437340;

La pipeline scompone l'evento usando il primo filtro grok riportato nella configurazione d'esempio:

{
              "message" => "[2015_03_23-02_33_55] [CONSOLIDAMENTO] ;
              [ALLINEAMENTO] Inizio allineamento dei dati validati dal consolidamento 
              con l'altro DB - id allineamento: 1382488437340;",
                  "@timestamp" => "2015-03-23T02:33:55.000Z",
                     "@version" => "1",
                     "type" => "cons",
                     "host" => "UFFICIO_N",
                     "logdate" => "[2015_03_23-02_33_55]",
                        "tags" => [
                  [0] "Dated",
                  [1] "StartAllineamento",
              ],
              "IdAllineamento" => "1382488437340",
                        "Tipo" => "StartAllineamento"
}

Il filtro elapsed  rileva [start_tag => "StartAllineamento"] e "sospende" il log event.

Se nel tempo di time-out non arriva l'evento di STOP ALLINEAMENTO, il filtro elapsed elimina il log event sospeso e ne genera uno nuovo di errore:

{
    "@timestamp" => "2015-03-23T02:34:10.162Z",
    "@version" => "1",
    "tags" => [
          [0] "elapsed",
          [1] "elapsed.expired_error"
              ],
    "host" => "UFFICIO_N",
    "IdAllineamento" => "1382488437340",
    "elapsed.time" => 15,
    "elapsed.timestamp_start" => "2015-03-23T02:33:55.000Z"
}

Il tag elapsed.expired_error governa la notifica della mail di alert (figura 1).

Conclusioni

Abbiamo cercato di descrivere il funzionamento del plug-in "elapsed" in un caso di uso reale di utilizzo dello stack ELK, illustrando come questo filtro sia in grado di risolvere il problema delle operazioni non completate.

Nonostante nel tempo abbiamo consolidato come Gruppo Imola esperienze e competenze anche sugli strumenti commerciali per la gestione e analisi del logging, come Splunk, riteniamo importante questa esperienza perch� abbiamo realizzato un plugin utilizzato dalla community da più di due anni, contribuendo così all'evoluzione dello strumento Logstash e dello Stack ELK.

Riferimenti

[1] Splunk

http://www.splunk.com

 

[2] ELK Stack by Elasticsearch company

http://www.elastic.co/products

 

[3] Gruppo Imola costituito da Imola Informatica S.P.A., Antreem Srl e MokaByte Srl

http://www.imolinfo.it

http://www.antreem.com

http://www.mokabyte.it

 

[4] Filtro Logstash "elapsed" realizzato da Cristian Faraoni (Imola Informatica S.P.A.) e Andrea Forni (Antreem Srl)

http://logstash.net/docs/1.4.2/filters/elapsed

 

[5] Redis

http://redis.io/

 

[6] Logstash

http://logstash.net

 

[7] Elasticsearch

https://www.elastic.co/

 

[8] Filtro Logstash "grok"

http://logstash.net/docs/1.4.2/filters/grok

 

[9] Grok Debbuger: verifica la correttezza sintattica e semantica dei pattern grok

https://grokdebug.herokuapp.com/

 

[10] "DB-Engines Ranking of Search Engines" (aprile 2015)

http://db-engines.com/en/ranking/search+engine

 

[11] Interssante articolo sull'evoluzione di Elasticsearch nell'universo Big Data

http://www.hostingtalk.it/big-data-cosa-serve-elasticsearch-come-evolve/

 

[12] Elastic HQ: console di management per elasticsearch

http://www.elastichq.org/

 

[13] Kibana

https://www.elastic.co/products/kibana

 

[14] Intervista a Claudio Bergamini (CEO di Imola Informatica S.P.A.) durante il "CMWL 2015. Big Data e Reti Neurali nel Complexity  Management"

http://www.complexityinstitute.it/?p=7514

 

[15] Intervista a Matteo Busanelli (Senior IT Consultant & Semantic Web Specialist di Imola Informatica S.P.A.) durante il "CMWL 2015. Big Data e Reti Neurali nel Complexity  Management"

http://www.complexityinstitute.it/?p=7563

 

 

 

 

 

 

 

Condividi

Pubblicato nel numero
207 giugno 2015
Cristian Faraoni è nato a Forlì (FC) nel 1970 ed è laureato in Scienze dell‘Informazione all‘Università degli studi di Bologna. A livello professionale, si occupa di sviluppo, analisi e progettazione del software dal 1997. Attualmente lavora per Imola Informatica S.P.A. svolgendo principalmente attività di consulenza.
Laureato con lode in ingegneria informatica a Bologna, ha iniziato da subito a lavorare presso Imola Informatica, dove ha ricoperto tutti i ruoli legati al ciclo di vita di un prodotto software. Attualmente svolge attività di consulenza su architetture web multicanale, architetture di integrazione e middleware semantici.
Articoli nella stessa serie
Ti potrebbe interessare anche