Introduzione
In questa quinta parte proseguiamo la descrizione della soluzione di monitoraggio real time presentando i dettagli di implementazione del secondo modulo applicativo, quello di Data Ingestion & Computing, realizzato da una libreria Java, eseguita da Spark ad ogni iterazione del batch.
Componente di Data Ingestion & Computing
Questo componente applicativo implementa, per mezzo di Spark Streaming, la logica di ingestion dei dati pubblicati sul topic Kafka. Si tratta di una libreria Java distribuita nel runtime di Spark, che realizzerà la logica di Data ingestion & computing che soddisfa i requisiti di monitoraggio stabiliti.
Di seguito lo skeleton del progetto Eclipse.
Requisiti di monitoraggio
Il sistema di monitoraggio dovrà soddisfare i seguenti requisiti.
Anzitutto, la visualizzazione di dettaglio dei veicoli in prossimità delle coordinate di un Point of Interest stabilito, come da tabella riportata di seguito.
Oltre a questo, la visualizzazione della distribuzione del numero di veicoli raggruppati per tipo e per direttrice stradale di rilevazione, come da tabella riportata di seguito.
Sulla base di queste evidenze sarà codificata la libreria Java che, dopo aver impostato i parametri di esecuzione di Spark, applicherà ai messaggi di ogni batch di ingestion la sequenza di trasformazioni necessarie a ottenere il DStream finale. Su quest’ultimo verranno applicate le funzioni di calcolo necessarie a soddisfare i requisiti di monitoraggio richiesti. Nei paragrafi successivi verrà descritto il dettaglio di ciascuna delle trasformazioni eseguite da Spark.
Configurazione di runtime per Spark
Riportiamo di seguito i fragment salienti del codice della classe Java che realizza l’esecuzione e la gestione del processo di streaming di Spark. Nella prima parte di questa classe viene codificato quanto necessario alla configurazione di esecuzione di Spark e alla risoluzione dei servizi delle istanze di Kafka e Cassandra. La valorizzazione di questi parametri viene disaccoppiata dalla codifica Java attraverso un file di properties. Tra i parametri di esecuzione di Kafka, possiamo notare le classi che saranno utilizzate per serializzare e deserializzare i messaggi JSon pubblicati sul topic di Kafka.
Per attualizzare a run time quanto codificato, vengono riportate di seguito le coppie chiave-valore dei parametri di esecuzione di Spark definite nel file di properties:
Una volta che le configurazioni di Spark sono state dichiarate, viene creato uno streaming context dal topic di Kafka con batch eseguiti a intervalli di 5 secondi.
La classe org.apache.spark.streaming.api.java.JavaStreamingContext rappresenta l’entry point principale di tutte le operazioni di streaming di Spark.
Qualsiasi processo di Spark Streaming deve partire dalla creazione di una istanza dell’oggetto di StreamingContext. Sarà poi attraverso il metodo statico createDirectStream della classe KafkaUtils che verrà definito il contesto di streaming di dettaglio, in questo caso verso Kafka.
Ingestion di Spark dei messaggi Kafka
Ad ogni iterazione del batch, Spark esegue il trasferimento in una classe Java di tipo JavaInputDStream dei messaggi presenti nel topic di Kafka non ancora processati.
La variabile messagesTypeOfJAvaDStream è un JavaInputDstream di tipo ConsumerRecord<K,V>. ConsumerRecord<K,V>. Questo tipo rappresenta una collection di coppie chiave-valore ricevute dal topic di Kafka definito nel javaStreamSparkCntxt, il contesto Spark di streaming di tipo Java.
Nell’implementazione presentata la collection ConsumerRecord<K,V> viene attualizzata con tipi Stringa sia per la chiave che per il valore. Quest’ultimo conterrà il messaggio in formato JSON che verrà poi serializzato nella classe Java di tipo IOTData. La classe Java ConsumerRecord<String, String> contiene inoltre informazioni circa il nome del topic, il numero della partizione dalle quali il record viene ricevuto, un offset che punta al record nella partizione Kafka, e un timestamp come indicato dal corrispondente ProducerRecord.
JavaInputDStream è una interfaccia Java-frendly della classe InputDStream che a sua volta estende Dstream, il Discretized Stream che, come già descritto, rappresenta un’astrazione di base di Spark Streaming. Il Discretized Stream è costituito da una sequenza continua di Resilient Distributed Dataset (RDD) dello stesso tipo.
Nella firma del metodo statico createDirectStream troviamo i parametri LocationStrategies e ConsumerStrategies.
LocationStrategies
Le API consumer di Kafka attuano la bufferizzazione dei messaggi. Pertanto, per motivi prestazionali, è importante che l’integrazione con Spark sia tale che i client vengano mantenuti memorizzati nella cache degli esecutori, più che vengano ricreati ad ogni batch.
Nella maggior parte dei casi, la strategia PreferConsistent è quella più adeguata, in quanto distribuisce le partizioni in modo uniforme tra tutti gli esecutori disponibili.
Nel caso in cui gli esecutori si trovino sugli stessi host dei broker Kafka, la strategia PreferBrokers attuerà la pianificazione delle partizioni sul leader Kafka.
Infine, nel caso in cui sia significativo il disallineamento di carico tra le partizioni, la strategia PreferFixed consente di stabilire un mapping esplicito delle partizioni verso gli host.
ConsumerStrategies
La API per i consumer di Kafka può specificare i topic in diversi modi. ConsumerStrategies fornisce un’astrazione che consente a Spark di riottenere correttamente gli utenti configurati anche dopo il riavvio dal checkpoint.
ConsumerStrategies.Subscribe permette la sottoscrizione a una collezione definita di topic.
Attraverso il parametro SubscribePattern è possibile utilizzare una regular expression per specificare i topic di interesse. Il parametro Assign consente di specificare una collezione fissa di partizioni.
Tutte le strategie di spongono di costruttori di overload che consentono di specificare l’offset iniziale di una specifica partizione.
Inoltre ConsumerStrategy è una classe pubblica che si può estendere implementando quanto necessario per esigenze specifiche di configurazione.
Serializzazione dei messaggi Kafka
I Discretized Stream possono essere creati sia da dati di live ingestion da un topic Kafka utilizzando uno streamingContext, sia da trasformazioni di Discretized Stream esistenti utilizzando le operazioni di:
- map;
- window;
- filter;
- reduceByKeyAndWindow
Come descritto al paragrafo precedente, ad ogni iterazione del batch Spark la variabile messagesTypeOfJAvaDStream, di tipo JavaInputDStream<ConsumerRecord<String, String>> contiene i messaggi trasferiti dal topic Kafka. L’operazione successiva del batch sarà quindi quella di applicare a quest’ultimo JavaInputDStream il metodo map per ottenere un nuovo JavaDStream di tipo IoTData, la classe che serializza in un oggetto Java ogni messaggio pubblicato su Kafka in formato JSon.
Di seguito la funzione che trasferisce la variabile di tipo ConsumerRecord<String,String> restituita da Kafka in un JavaDStream di tipo IOTData, utilizzando per la trasformazione la classe GsonBuilder .
Internamente un DStream è caratterizzato da poche proprietà di base; una lista di altri DStream da cui dipende, l’intervallo di tempo nel quale il DStream genera un RDD, e una funzione che viene usate per generare un RDD ad ogni intervallo di tempo.
Individuazione dei veicoli distinti nei batch dei messaggi
La variabile nonFilteredIotDataStream di tipo JavaDStream, ottenuta al passo precedente, conterrà i tutti i messaggi dei veicoli circolanti sotto forma di classe di serializzazione di tipo IoTData.
Obiettivo di questo step è quello di ottenere, da questo Dstream, un DStream ridotto ai soli veicoli distinti per calcolarne l’effettiva numerosità sulle direttrici stradali. Sarà quindi applicando in sequenza i metodi .mapToPair e .reduceByKey alla varaibile nonFilteredIotDataStream che si otterrà un nuovo DStream di tipo JavaPairDStream.
Un JavaPairDStream è un’interfaccia al DStream di tipo coppia di chiave e valore.
In questo caso il metodo mapToPair riduce il DStream riversato da Kafka nella operazione di Spark streaming ai soli veicoli distinti. La chiave consiste nell’identificativo univoco del veicolo e il valore la classe IOTDati che serializza il messaggio Kafka ricevuto.
Assegnazione del flag di permanenza su una direttrice
Per calcolare il numero dei veicoli che percorrono una determinata strada, è necessario verificare se i veicoli siano già stati processati in un batch antecedente. In questo step quindi il DStream ottenuto al passo precedente viene esteso in uno nuovo e arricchito da un flag di stato che marca, per un intervallo di tempo definito, la permanenza dei veicoli nella direttrice stradale di rilevazione.
Viene quindi creato un nuovo DStream di tipo JavaMapWithStateDStream a partire dal precedente applicando il metodo mapWithState, con una scadenza di questo stato impostata a 60 minuti. Di seguito il fragment di codice che implementa questa funzionalità.
Riduzione del DStream ai soli veicoli già processati
Una volta valorizzato il flag di permanenza su una direttrice ai veicoli considerati in tutti i batch, dal DStream del passo precedente viene creato un nuovo JavaDStream attraverso l’applicazione di un filtro che mantiene i soli veicoli processati. Questo si ottiene applicando in sequenza i metodi .map e .filter con regola Boolean.FALSE.
Di seguito il fragment di implementazione.
Definizione del DStream finale con i messaggi in formato IOTData
L’ultima operazione consiste nel trasformare il DStream del passo precedente in uno nuovo di tipo JavaDStream contenente i soli oggetti IOTData che serializzano i messaggi inviati a Kafka in formato JSon.
Questo DStream viene quindi persistito nella cache, e sarà utilizzato per le operazioni di calcolo del traffico totale della finestra temporale che verranno illustrate nella prossima parte.
Conclusioni
In questa quinta parte sono stati presentati i requisiti di monitoraggio e i dettagli di implementazione del componente applicativo di Data ingestion & computing, relativi all’avvio del batch di Spark Streaming e degli step di preparazione del DStream finale. Sarà su quest’ultimo che saranno applicate le funzioni di trasformazione e persistenza su Cassandra, che nella prossima parte completeranno la descrizione dei dettagli di implementazione di questo componente applicativo.
Riferimenti
[1] Socks
socks-client
https://github.com/sockjs/sockjs-client
[2] Kafka
https://kafka.apache.org/quickstart
https://www.slideshare.net/mumrah/kafka-talk-tri-hug?next_slideshow=1
https://www.youtube.com/watch?v=U4y2R3v9tlY
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
[3] Spark
https://spark.apache.org/docs/latest/index.html
https://data-flair.training/blogs/spark-rdd-tutorial/
https://data-flair.training/blogs/apache-spark-rdd-features/
[4] Spark e Cassandra data connector
https://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java
[5] Zookeeper
https://www.youtube.com/watch?v=gifeThkqHjg
https://www.slideshare.net/sauravhaloi/introduction-to-apache-zookeeper
[6] Kafka e Zookeeper
https://www.youtube.com/watch?v=SxHsnNYxcww
[7] Price Waterhouse and Cooper, The Bright Future of Connected Cars
https://carrealtime.com/all/pwc-the-bright-future-of-connected-cars/
[8] Hadoop
https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html
[9] Spark
[10] RDD
https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
[11] WebSocket
http://jmesnil.net/stomp-websocket/doc/
[12] STOMP
https://stomp-js.github.io/guide/stompjs/rx-stomp/ng2-stompjs/using-stomp-with-sockjs.html