Un sistema di monitoraggio del traffico veicolare “in tempo reale”

IV parte: L’invio dei dati al topic di Kafkadi

Introduzione

Dopo aver completato la parte descrittiva dei framework, e presentata la big picture del sistema, da questa puntata vengono illustrate le integrazioni dei framework e le implementazioni di dettaglio dei moduli del sistema di monitoraggio real-time del traffico veicolare. Questa parte avrà come argomento il modulo software che simula l’invio dei dati dai veicoli al topic di Kafka. Nella realtà ciascun veicolo sarà equipaggiato con i dispositivi e il software necessari a inviare al topic Kafka i dati oggetto di monitoraggio.

 

Componenti IOT Data Producer

 Di seguito viene riportato il dettaglio del progetto Java in Eclipse.

Figura 1 – Data Producer.

Figura 1 – Data Producer.

 

Inviare i dati oggetto del monitoraggio

I dati oggetto di monitoraggio come l’identificativo univoco del veicolo, il tipo di veicolo, la posizione, la velocità e il livello di carburante, nell’istante di rilevazione vengono inviati al topic di Kafka sotto forma di JSon per poi essere serializzati in una classe Java come rappresentato nel codice di figura 2.

Figura 2 – Il codice della classe di serializzazione dei dati inviati al topic di Kafka.

Figura 2 – Il codice della classe di serializzazione dei dati inviati al topic di Kafka.

 

Dichiarare i puntamenti e la lista di topic

Nello stesso progetto Eclipse viene quindi codificato un file di properties dove vengono dichiarati i puntamenti alle istanze Zookeeper e Kafka e la corrispondente lista dei topic di destinazione.

Figura 3 – Parametri di puntamento alle istanze Zookeeper e Kafka.

Figura 3 – Parametri di puntamento alle istanze Zookeeper e Kafka.

 

Una classe per inviare i messaggi

Allo stesso progetto appartiene la classe Java che implementa la generazione di messaggi e la pubblicazione di questi sul topic di Kafka (fig. 4).

Figura 4 – Classe che implementa la generazione e l’invio dei messaggi.

Figura 4 – Classe che implementa la generazione e l’invio dei messaggi.

Analizziamo il codice

Esaminando i commenti, possiamo facilmente individuare le linee di codice dove sono referenziati i parametri di risoluzione dell’istanza di Kafka e di quella di Zookeeper, la lista dei broker, la lista dei topic sottoscritti e, infine. la classe Java che verrà utilizzata per serializzare il messaggio pubblicato sul topic in formato JSon.

Nella stessa classe viene quindi codificata la generazione dei messaggi casuali e il loro invio al topic di Kafka. La classe kafka.javaapi.producer.Producer è una API Java di Kafka utilizzabile per creare un nuovo messaggio da inviare a uno specifico topic e, opzionalmente, a una specifica partition di Kafka.

La classe Producer è un Generic Java; pertanto sarà necessario specializzare il tipo dei due parametri. Il primo è la chiave di partizione, il secondo la classe Java di serializzazione del messaggio. In questa implementazione, essi sono rispettivamente un tipo String e la classe scelta di serializzazione del messaggio.

La classe Producer

Per l’istanza della classe Producer è necessario dichiarare le proprietà di runtime: il cluster di istanze Kafka, la classe che serializza i messaggi e l’eventuale routing verso una specifica partizione. Nel dettaglio:

  • broker.list definisce il puntamento della classe Producer verso uno o più broker per determinare il leader di ciascun topic;
  • class definisce quale classe di serializzazione da utilizzare in fase di preparazione del messaggio da inviare al broker;
  • i parametri serializer e value.serializer indicano come trasformare in byte la chiave e gli oggetti valore presenti nella classe ProducerRecord. È possibile utilizzare ByteArraySerializer o StringSerializer per tipi semplici stringa o byte.

Con il parametro partitioner.class è possibile definire la classe da usare per determinare a quale partizione del topic deve essere inviato il messaggio. Questo parametro non viene usato in questa implementazione.

L’ultima proprietà che viene presa in considerazione è request.required.acks. Questa informa Kafka che la classe Producer richiede dal Broker la conferma di ricezione del messaggio. Diversamente, si potrebbe implementare una politica fire and forget con migliori performance, ma a costo di possibili perdite di dati. Con questa configurazione viene eseguito il controllo di completezza delle richieste; il valore “all” determina il blocco fino alla full commit del record. Si tratta di una configurazione ovviamente più lenta ma più stabile. Se la richiesta fallisce il producer potrà automaticamente riprovare per un numero di tentativi definiti dal parametro retries.

Il Producer mantiene, per ciascuna partizione, un buffer con i record non inviati. Questo buffer è della grandezza specificata dalla configurazione del parametro buffer.size. Chiaramente, al crescere di questo valore si ottengono buffer di dimensioni maggiori che possono gestire batch più grandi, determinando una maggiore occupazione di memoria.

Il parametro buffer.memory controlla la quantità di memoria messa a disposizione al Producer per la bufferizzazione dei messaggi. Se i record sono inviati più velocemente di quanto possano essere accettati dal server, allora questa area di bufferizzazione verrà esaurita. Con la saturazione del buffer l’invio di chiamate aggiuntive verrà bloccato. La soglia di tempo di blocco è determinata dal parametro max.bloc.ms dopo la quale viene sollevata una eccezione di timeout (TimeoutException).

Inviare il messaggio

Di seguito la codifica Java che implementa l’invio del messaggio al topic di Kafka.

Figura 5 – Codice che implementa l'invio del messaggio al topic di Kafka.

Figura 5 – Codice che implementa l'invio del messaggio al topic di Kafka.

 

Dal codice Java possiamo facilmente individuare la classe che serializza le informazioni IOT e la variabile KeyedMessage necessaria al metodo send() della classe Producer. Quest’ultimo è un metodo asincrono che, quando invocato, aggiunge il record da inviare a un buffer contenente i record in sospeso, ne processa quindi l’invio concludendo così l’elaborazione. Questo permette alla classe Producer di poter raggruppare insieme singoli record per inviarli poi in modo più efficiente.

La classe Producer è di tipo Thread Safe: la condivisione di una singola istanza tra molti thread è più performante di un modello di sviluppo a istanze multiple. La classe Producer consiste in un pool di buffer di memoria dove vengono conservati i record non ancora inviati al server e di un thread in background. Quest’ultimo è responsabile di trasformare questi record in richieste e di trasmetterli al cluster di Kafka. Il mancato rilascio della variabile di istanza di questa classe determinerebbe dei memori leak di queste risorse. 

Dalla versione 0.11 di Kafka la classe Producer fornisce due implementazioni aggiuntive: il Producer idempotente e quello transazionale. Il Producer idempotente rafforza la semantica di delivery da “almeno una” a “esattamente una” volta: gli eventuali tentativi aggiuntivi del Producer non introdurranno più duplicati. Il Producer transazionale permette a una applicazione di inviare messaggi a partizioni e topic multipli in modo atomico.

 

Conclusioni

In questa quarta parte è stata presentata l’implementazione del componente applicativo che simula l’invio dei dati oggetto di monitoraggio dai veicoli al topic di Kafka. Nella prossima puntata verrà descritto il secondo componente applicativo, quello che realizza la logica Spark di ingestion ed elaborazione dei dati presenti sul topic di Kafka.

 

Riferimenti

[1] Socks

socks-client

https://github.com/sockjs/sockjs-client

 

[2] Kafka

https://kafka.apache.org/quickstart

https://www.slideshare.net/mumrah/kafka-talk-tri-hug?next_slideshow=1

https://www.youtube.com/watch?v=U4y2R3v9tlY

https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

 

[3] Spark

https://spark.apache.org/docs/latest/index.html

https://data-flair.training/blogs/spark-rdd-tutorial/

https://data-flair.training/blogs/apache-spark-rdd-features/

 

[4] Spark e Cassandra data connector

https://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java

 

[5] Zookeeper

https://www.youtube.com/watch?v=gifeThkqHjg

https://www.slideshare.net/sauravhaloi/introduction-to-apache-zookeeper

 

[6] Kafka e Zookeeper

https://www.youtube.com/watch?v=SxHsnNYxcww

 

[7] Price Waterhouse and Cooper,  The Bright Future of Connected Cars
https://carrealtime.com/all/pwc-the-bright-future-of-connected-cars/

 

[8] Hadoop

https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html

 

[9] Srini Penchikala, Big Data Processing with Apache Spark – Part 1: Introduction. InfoQ, 30/01/2015

https://t.ly/FRcDu

 

[10] RDD

https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf

 

[11] WebSocket

http://jmesnil.net/stomp-websocket/doc/

 

[12] STOMP

https://stomp-js.github.io/guide/stompjs/rx-stomp/ng2-stompjs/using-stomp-with-sockjs.html

 

Condividi

Pubblicato nel numero
283 maggio 2022
')">
Luigi Bennardis si è laureato in Scienze Statistiche ed Economiche all’Università di Roma. Si occupa di informatica da un po’ di tempo ed è attualmente impegnato su tematiche DevOps. Nel tempo libero ancora gioca a basket, nuota ed è appassionato di elettronica vintage.
Ti potrebbe interessare anche