Attraverso Terracotta e i costrutti Java dedicati al multithreading è possibile costruire delle vere e proprie applicazioni distribuite, nelle quali thread eseguiti da diverse JVM, potenzialmente su diverse macchine, possono condividere uno stato comune, cooperando e sincronizzandosi nel modo familiare ai programmatori Java.
Una questione di “concorrenza”
Semi-ignorata dalla maggior parte dei programmatori, sempre più concentrati, per scelta o per necessità, a imparare l’ennesima versione dell’ennesimo framework miracoloso, la programmazione concorrente rimane uno degli aspetti più affascinanti (e complessi) dell’informatica. Nell’anno del Signore 2011, poi, stiamo assistendo a un vero boom degli strumenti dedicati al multithreading.
Da un lato, l’inclusione diretta in Java del package Java.util.concurrent ha fornito al linguaggio la maggior parte dei costrutti storicamente dedicati alla programmazione concorrente, inclusi i Monitor con condition esplicite, il cui uso consente un’espressività molto maggiore rispetto agli evergreen wait/notify.
Dal lato diametralmente opposto, la sempre maggiore diffusione di Scala ha finalmente portato alla luce un approccio completamente diverso alla programmazione concorrente, privo di stato condiviso e basato su scambio di messaggi anziche’ su sincronizzazione e cooperazione di thread, il modello actor-based. Si noti comunque che l’infrastruttura software su cui gli attori sono costruiti, per quanto invisibile al programmatore, è basata su sincronizzazione e stato condiviso.
Il nostro interesse in questo articolo sarà rivolto al primo modello, basato su stato condiviso. Nello scenario classico di un’applicazione Java (ma anche .NET), differenti thread possono trovarsi ad accedere contemporaneamente a una stessa risorsa, ad esempio la stessa istanza sullo heap di una qualche classe. Simili accessi contemporanei, se non propriamente gestiti, possono portare situazioni di errore molto difficili da gestire (e prevenire), le famigerate race conditions. In Java i costrutti synchronized, wait, notify e le più recenti e versatili features aggiunte in java.util.concurrent consentono di gestire propriamente queste situazioni. Quello che manca nativamente a Java è la possibilità di estendere questo modello a thread che condividono uno stato in memoria girando su JVM differenti! Per nostra fortuna tale lacuna viene colmata in maniera pressoche’ trasparente dall’entrata in scena di Terracotta.
Che cosa è Terracotta
Senza entrare nei dettagli di cosa Terracotta [3] sia, limitiamoci a definirlo per i nostri scopi un Servizio di clustering trasparente. Trasparente è riferito al linguaggio Java, in cui il clustering è ottenuto senza alcun modello di programmazione o senza API specifiche. Clustering indica la capacità di consentire a macchine e processi Java multipli di lavorare insieme, sugli stessi dati, e di poter comunicare tra di loro utilizzando semplicemente oggetti in memoria e threading logic.
Terracotta consente alle applicazioni Java di girare su quante macchine sia necessario, senza alcuna modifica specifica. Non viene imposto alcun modello specifico di programmazione come cluster-caching, EJB, o anche Hibernate o Spring. Terracotta può prendere un’applicazione esistente e scalarla in modo trasparente su molte macchine.
DSO, Root Objects e Distributed Locks
“Caspita!”, starete pensando dopo aver letto le ultime frasi, “questo Terracotta non scherza affatto, quasi quasi lo aggiungo ai framework da studiare!” Vediamo allora se ne vale la pena, analizzandone il cuore, il Distributed Shared Object (DSO). Tale definizione identifica una zona di memoria condivisa a tutte le JVM controllate dal server Terracotta, una sorta di super-heap.
Affinche’ Terracotta possa effettivamente operare sul nostro codice (bytecode in realtà, non lo dimentichiamo), occorre indicare nella configurazione quali classi devono essere “ispezionate e controllate”. Questo avviene tramite il file di configurazione generale di Terracotta, nella seguente sezione:
it.jugmilano.terracotta.example1..*
In questo esempio stiamo indicando a Terracotta di “gestire” tutte le classi presenti nel package indicato nell’elemento . La sintassi per specificare l’insieme delle classi instrumented è essenzialmente ripresa da AOP e può quindi consentirci una grande precisione nell’individuare esattamente solo le classi necessarie ai nostri scopi. Ma quali sono tali classi, ossia quali classi devono essere instrumented? Non ci sono particolari vincoli, qualunque classe può essere scelta, ma per ragioni di performance è bene limitare l’insieme a quello delle classi che usano root objects.
Oggetti root
Un oggetto definito come root è l’elemento radice di un grafo di oggetti clusterizzati. Qualunque variabile di una classe può essere dichiarata come root. La prima volta che nel codice Java viene assegnato un valore ad un oggetto root, Terracotta trasforma lo stesso in un clustered object, allocandolo sullo heap condiviso. In questo processo, Terracotta naviga l’intero grafo degli oggetti raggiungibili dalla root, ed essi vengono a loro volta allocati sullo heap condiviso. Le root sono dichiarate nel file di configurazione di Terracotta con la seguente sintassi:
my.package.MyClass.myRootField
Tutti i field dichiarati come root assumono una speciale semantica:
- La prima volta che un field root viene assegnato da una JVM, la root è creata nel cluster Terracotta.
- Una volta assegnato, il valore di un field root non può essere cambiato. Tutti i successivi assegnamenti, di qualunque JVM, vengono ignorati.
- L’oggetto top level di un grafo root, non viene mai reclamato dal garbage collector distribuito di Terracotta.
- Le root hanno lo stesso ciclo di vita del clustered heap, e non sono legate alla singola JVM.
Ma come fa terracotta a trasformare in maniera trasparente una variabile d’istanza o statica in un oggetto condiviso? Terracotta opera esclusivamente a livello di bytecode. Guidato da quanto indicato nella sezione instrumented-classes, esso opera una ricerca sulla presenza di oggetti root, e ne trasforma il bytecode. In altre parole, Terracotta si inserisce tra la logica applicativa e la memoria della singola JVM, ispezionando tutte le chiamate allo heap che coinvolgono root objects.
In particolare, tutte le letture/scritture sullo heap che coinvolgono root objects vengono sostituite da letture/scritture sul clustered heap.
HEAPREAD() → CLUSTEREDHEAPREAD() HEAPWRITE() → CLUSTEREDHEAPWRITE()
Quando diversi thread accedono concorrentemente agli stessi dati sullo heap, speciali precauzioni devono essere prese per evitare interferenze distruttive tra di essi.
Distributed locks
Naturalmente la cosa è ancora valida quando diversi thread su diverse JVM accedono contemporaneamente lo stesso oggetto sul clustered-heap. Come detto più volte, su singola JVM, Java sin dagli albori offre meccanismi integrati di mutua esclusione e collaborazione, nella forma del costrutto synchronized e dei metodi di Object wait e notify.
Cosa deve cambiare nel codice quando mutua esclusione e collaborazione assumono una valenza a livello di cluster? Assolutamente nulla! Terracotta fornisce le stesse garanzie di serializzazione degli accessi, coordinamento e visibilità a thread in differenti JVM rispetto a quanto fa la singola JVM sui suoi thread/heap. I meccanismi che Terracotta utilizza per ottemperare a queste semantiche sono i locks.
Terracotta amplia la semantica dei lock standard di Java in modo da dargli un effetto a livello di cluster. Il lock clusterizzato è iniettato nel bytecode in base a quanto specificato nella sezione lock del file di configurazione, con la seguente sintassi:
write void HelloWorld.method(..)
Questa configurazione istruisce Terracotta di ispezionare tutti i metodi che corrispondono alla method-expression dentro le instrumented-classes, di verificare se al loro interno ci sono accessi synchronized a field root e, nel caso, di sostituire il lock JVM con un lock clustered. A differenza di Java, i lock di Terracotta hanno diversi livelli.
- Write locks sono i classici lock di mutua esclusione; essi garantiscono che solo un thread nell’intero cluster possa ottenere l’accesso all’oggetto protetto.
- Synchronous write locks aggiungono la garanzia che il lock non sia rilasciato fin quando le modifiche effettuate dal thread corrente non siano state effettivamente propagate al cluster.
- Read locks consentono a diversi thread di acquisire il lock sull’oggetto, a patto di non effettuare nessuna operazione di scrittura (a pena di eccezione runtime). Nessun thread può acquisire un lock a livello write se altri thread detengono un lock a livello read. Inoltre a nessun thread è consentito di ottenere un lock read se un altro thread detiene un lock a livello write.
Forti di tutte queste definizioni (e anche un po’ confusi), cerchiamo di metterle in pratica con alcuni semplici esempi.
Clustering in pratica 1: lock clusterizzati
Immaginiamo di avere un insieme di thread il cui compito consista unicamente nell’estrarre dei Job da una coda condivisa, elaborarli e cambiarne lo stato una volta terminato. Tali thread non devono necessariamente essere eseguiti sulla stessa JVM. La situazione è schematizzata in figura 1.
Figura 1 – I thread devono estrarre dei job da una coda condivisa, per elaborarli e cambiarne lo stato.
La coda dei job è evidentemente un oggetto condiviso, quindi va protetta dall’accesso concorrente da parte di thread differenti (che siano sulla stessa o su diverse JVM). Una prima, molto semplificata, soluzione è la seguente:
public class SimpleJobQueue implements JobQueue { private List jobs = new ArrayList(); public SimpleJobQueue(int size){ for (int i=0;i<size;i++){ jobs.add(new SimpleJob()); } } public Job getJob(Job job) { return null; } public Job getJobAt(int index) { return jobs.get(index); } }
Nel codice appena visto, Job è un’interfaccia che potremmo implementare come nella classe SimpleJob di seguito riportata:
public interface Job { public static final int NOT_ASSIGNED = 0; public static final int ASSIGNED = 1; public static final int COMPLETED = 2; public void changeStatus(int status); public int getStatus(); } public class SimpleJob implements Job { private int jobStatus; public SimpleJob(){ jobStatus = NOT_ASSIGNED; } public void changeStatus(int status) { this.jobStatus = status; } public int getStatus() { return jobStatus; } }
Ed infine ecco il thread che fa il “lavoro sporco”, prelevando i Job dalla coda.
public class SimpleWorker implements Runnable { private String name; private JobQueue queue; private int index; public SimpleWorker(String name, JobQueue queue, int index) { this.name = name; this.queue = queue; this.index = index; } public void run() { int status; Job job; synchronized (queue) { job = queue.getJobAt(index); status = job.getStatus(); log("Extracted Job " + index + " in status " + status); if (status == Job.NOT_ASSIGNED) { job.changeStatus(Job.ASSIGNED); log("Assigned Job " + index); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } job.changeStatus(Job.COMPLETED); log("Completed Job " + index); } else { log("Job " + index + " Allready Assigned - Nothing TO DO"); } } } private void log(String message) { System.out.println("Thread " + name + " ----- " + message); } }
Il codice del thread non è nulla di trascendentale. Dopo aver acquisito in un blocco synchronized l’accesso esclusivo alla coda, estrae da essa l’elemento in posizione index e, se necessario, ne cambia lo stato. Per ipotesi semplificativa di lavoro, ciascun thread conosce a priori l’elemento della coda su cui operare. Quello che è interessante, è riuscire a lanciare istanze di tali thread su JVM diverse, facendo in modo che condividano la stessa coda in memoria, ossia che accedano alla stessa istanza di JobQueue.
Per fare questo occorre mettere in pratica quanto imparato sul DSO di Terracotta. La prima cosa da fare è scrivere un metodo main, da eseguire poi in tre differenti shell lanciando tre volte l’interprete Java. Proviamo a scriverlo.
public class SimpleMain { JobQueue queue = new SimpleJobQueue (10); CyclicBarrier barrier = new CyclicBarrier (3); public void launch ( int base ) { launchThreads( queue, base); try { barrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("-----------------------------------"); System.out.println("All threads completed their job. " + queue); } private void launchThreads(JobQueue queue, int blockNumber){ for (int i=0; i < 10; i++) { new Thread( new SimpleWorker("worker" + (i + blockNumber) , queue, i,)).start(); } } public static void main(String[] s) { int base = 0; if ( s.length==1 ) { base = Integer.parseInt(s[0]); } new SimpleMain().launch ( base ); } }
Chiaramente, senza uno strumento come Terracotta, eseguire tre volte tale codice in tre diverse JVM non sortirebbe l’effetto desiderato. Infatti si avrebbero tre diverse istanze della coda su ciascuno degli heap locali. Al contrario, noi vogliamo che la coda sia condivisa dai 30 threads totali, 10 per ciascuna JVM.
La barriera serve semplicemente a far sì che le due JVM più veloci aspettino la più lenta prima di terminare. Evidentemente, al pari della coda, anche l’oggetto barrier deve essere condiviso tra le tre JVM in esecuzione. Per ottenere questo risultato, le variabili barrier e queue devono essere dichiarate come root.
Ma cosa cambia in pratica dichiarandole come root? Prendiamo in esame le prime linee significative di codice, eseguite dalle tre istanze di SimpleMain in esecuzione.
JobQueue queue = new SimpleJobQueue (10); CyclicBarrier barrier = new CyclicBarrier (3);
Essendo la variabile queue dichiarata come root, tutte le operazioni su di essa, incluso l’assegnamento, hanno effetto sul clustered heap e non sullo heap locale. In particolare, alla prima esecuzione dell’istruzione new, Terracotta verifica che, sullo heap condiviso, la variabile queue non sia ancora mai stata assegnata e procede alla sua istanziazione sul super heap. Quando il secondo ed il terzo main eseguono la stessa operazione, Terracotta accede al clustered heap, verifica che la variabile root queue è già stata assegnata, e ignora l’istruzione. Tutte le successive operazioni sulla variabile queue (e sul grafo di oggetti ad essa collegati) avvengono sul clustered heap e non sul local heap, come illustrato in figura2. Le stesse identiche considerazioni valgono per l’oggetto barrier.
Figura 2 – Le operazioni di Terracotta avvengono sullo heap clustered e non su quello locale.
Essendo queue un oggetto root, inoltre, l’istruzione synchronized(queue) cessa di operare sullo heap locale e viene automaticamente propagata da Terracotta a livello di cluster. Tutti i thread su tutte le JVM in esecuzione (controllate da Terracotta) condividono lo stesso lock sul super heap e sono quindi in competizione per aggiudicarselo.
Il file di configurazione di Terracotta
Per ottenere questo mirabile risultato, dobbiamo anzitutto scrivere un opportuno file di configurazione per Terracotta, la cui parte saliente è riportata di seguito (troverete il listato completo nel progetto allegato, “projectTerracotta.zip”, che potete scaricare a partire dal menu in alto a destra).
*..* * *..*(..) write it.jugmilano.terracotta.example1 .main.SimpleMain.queue it.jugmilano.terracotta.example1 .main.SimpleMain.barrier
Test!
Infine, testiamo il tutto. Per prima cosa bisogna mandare in esecuzione, con l’apposito script fornito nella cartella di installazione, il server Terracotta. Una volta che Terracotta è in esecuzione, occorre istruire le varie JVM della sua esistenza. Questo comporta semplicemente passare gli opportuni jvm arguments all’interprete Java o, più semplicemente, lanciare lo script dso-java fornito con Terracotta, che lo fa in automatico. Ad esempio, relativamente ai path che troverete negli esempi allegati (scaricabili dal menu in alto a destra):
dso-Java -Dtc.config=../tc-config-simple-worker.xml it.jugmilano.terracotta.example1.main.SimpleMain %base%
dove tc-config-simple-worker.xml è il file di configurazione di Terracotta descritto in precedenza.
Clustering in pratica 2: miglioramento dei lock clusterizzati
Il nostro primo esempio è servito a comprendere molte cose, ma ha diversi difetti. In particolare la gestione della sincronizzazione è un evidente collo di bottiglia.
synchronized (queue) { job = queue.getJobAt(index); status = job.getStatus(); log("Extracted Job " + index + " in status " + status); }
Questo codice guadagna un lock esclusivo sull’oggetto queue, impedendo ad ogni altro thread di accedervi (altri eventuali thread si bloccano nell’entry set). Essendo tuttavia queue definito come root, tutti i thread di tutte le JVM in esecuzione nel cluster devono onorare tale lock ! Proviamo a fare di meglio, riducendo le contese.
Poiche’ nel nostro scenario semplificato ciascun thread sa a priori a quale job della coda deve accedere, potremmo dividere la politica di locking a livello del singolo job e non più dell’intera coda. Ciò comporta diverse modifiche. Anzitutto occorre modificare la stessa coda, ad esempio nel modo seguente.
public class ConcurrentJobQueue implements JobQueue { private List jobs = new ArrayList(); private List locks = new ArrayList(); public ConcurrentJobQueue(int size){ for (int i=0;i<size;i++){ jobs.add(new SimpleJob()); locks.add(new ReentrantLock()); } } public Job getJobAt(int index) { return jobs.get(index); } public Lock getLockAt(int index) { return locks.get(index); }
La lista di Lock ha lo scopo di ridurre le contese di accesso alla lista condivisa. Ciascun thread adesso compete solo con gli altri due thread aventi lo stesso valore di index, come illustrato dal listato seguente (come nel caso precedente, ipotizziamo di operare lanciando tre diverse JVM, ciascuna con i propri 10 thread)
public void run() { int status; Job job; queue.getLockAt(index).lock(); job = queue.getJobAt(index); status = job.getStatus(); log("Extracted Job " + index + " in status " + status); if (status == Job.NOT_ASSIGNED) { job.changeStatus(Job.ASSIGNED); log("Assigned Job " + index); queue.getLockAt(index).unlock(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } queue.getLockAt(index).lock(); job.changeStatus(Job.COMPLETED); log("Completed Job " + index); queue.getLockAt(index).unlock(); } else { log ("Nothing TO DO"); queue.getLockAt(index).unlock(); } }
E infine il nuovo metodo main per testare la nuova versione.
public class ConcurrentMain { JobQueue queue = new ConcurrentJobQueue(10); CyclicBarrier barrier = new CyclicBarrier(3); public void launch(int base){ launchThreads(queue, base); try { barrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("-----------------------------------"); System.out.println("All threads completed their job. " + queue); } private void launchThreads(JobQueue queue, int blockNumber){ for (int i=0; i < 10; i++){ new Thread(new ConcurrentWorker("worker_" + (i + blockNumber) , queue, i)).start(); } } public static void main(String[] s){ int base = 0; if (s.length==1){ base = Integer.parseInt(s[0]); } new ConcurrentMain().launch(base); } }
Si noti infine che il file di configurazione di Terracotta rispetto all’esempio precedente non necessita di cambiamenti. La lista dei Lock è infatti all’interno del grafo di oggetti di un root-object ed assume quindi significato a livello cluster senza nessuna altra necessità esplicita di configurazione.
Clustering in pratica 3: collaborazione tra thread distribuiti
Complichiamo ulteriormente lo scenario: finora i nostri thread lavorano in esclusione; il primo che guadagna l’accesso al job (entry della coda condivisa) effettua tutto il lavoro, gli altri aspettano passivamente. In uno scenario più realistico, diversi thread potrebbero lavorare in collaborazione tra loro. Ad esempio, il primo che guadagna l’accesso esegue un primo cambio di stato, il secondo un altro e così via fino ad arrivare nello stato finale ove il job è effettivamente stato completato.
Per rendere le cose più interessanti, scriviamo un worker generico in grado di riconoscere lo stato del job corrente e di comportarsi di conseguenza. Questo comporta la necessità di ripensare leggermente i diversi attori. Per prima cosa il job cambia definizione:
public interface CollaborativeJob { public static final int INITIAL_STEP = 0; public static final int STEP_1 = 1; public static final int STEP_2 = 2; public static final int FINAL_STEP = 3; public void changeStatus(int status); public int getStatus(); } public class SimpleCollaborativeJob implements CollaborativeJob { private int jobStatus; public SimpleCollaborativeJob(){ jobStatus = INITIAL_STEP; } public void changeStatus(int status) { this.jobStatus = status; } public int getStatus() { return jobStatus; } }
Anche la coda necessita di essere modificata, aggiungendo le strutture necessarie ad implementare i meccanismi di collaborazione.
public class CollaborativeJobQueue implements JobQueue{ private List jobs = new ArrayList(); private List locks = new ArrayList(); private List waitSets = new ArrayList(); public CollaborativeJobQueue(int size){ for (int i=0;i<size;i++){ jobs.add(new SimpleCollaborativeJob()); Lock lock = new ReentrantLock(); locks.add(lock); waitSets.add(lock.newCondition()); } } public CollaborativeJob getJobAt(int index) { return jobs.get(index); } public Lock getLockAt(int index) { return locks.get(index); } public Condition getWaitSetAt(int index) { return waitSets.get(index); } }
Ed infine il nuovo thread worker
public void run() { CollaborativeJob job; queue.getLockAt(index).lock(); job = queue.getJobAt ( index ); try { if ( job.getStatus() != CollaborativeJob.FINAL_STEP) { while (desiredStatus - job.getStatus() > 1) { log("Extracted Job " + index + " in status " + job.getStatus() + " waiting desiredstatus= " + desiredStatus); queue.getWaitSetAt(index).await(); } Thread.sleep(1000); job.changeStatus(desiredStatus); log("Job "+index + "status= " + job.getStatus() + " desired= " + desiredStatus); log("Job " + index + " completed "); queue.getWaitSetAt(index).signalAll(); queue.getLockAt(index).unlock(); } else { log("Nothing TO DO status= " + job.getStatus() + " desiredstatus= " + desiredStatus); queue.getLockAt(index).unlock(); } } catch (InterruptedException e) { queue.getLockAt(index).unlock(); e.printStackTrace(); } }
L’utilizzo di una lista di oggetti Condition, derivati dai rispettivi ReentrantLock consente che ciascun thread si possa sospendere volontariamente in attesa che la propria condizione di esecuzione venga a verificarsi.
La configurazione
Come cambia la configurazione di Terracotta rispetto all’esempio precedente? In nessun modo! Terracotta onora la semantica di java.util.concurrent: non occorre nessuna configurazione particolare. L’unico compito richiesto allo sviluppatore è la correttezza del comportamento runtime e l’assenza di race-conditions ed errori time depending, esattamente come se si programmasse in un ambiente concorrente puro.
Quest’ultimo esempio illustra perfettamente la totale trasparenza di Terracotta nel fornire allo sviluppatore un ambiente clusterizzato e un reale parallelismo tra i thread Java. Si pensi per contro a quanto sarebbe stato più complicato e invasivo realizzare questa stessa soluzione utilizzando altre soluzioni del mondo Java (database, JMS, RMI…).
Struttura degli esempi allegati
In figura 3 è riportata la struttura del progetto fornito a corredo. Per testare gli esempi così come sono è sufficiente lanciare diverse istanze dei file shell DOS forniti a corredo. Ciascun file batch corrisponde a uno dei tre esempi descritti in precedenza. Naturalmente occorre che il server Terracotta sia in esecuzione affinche’ tutto possa funzionare.
Figura 3 – La struttura del progetto allegato “projectTerracotta.zip” (scaricabile dal menu in alto sulla destra).
Conclusioni
Terracotta è uno strumento estremamente complesso e potente, ma allo stesso tempo semplice da utilizzare avendo esperienza di programmazione concorrente. I concetti illustrati sono alla base di soluzioni di grid computing, realizzabili con basso costo e ridotta difficoltà implementativa.
Ad esempio, tramite Terracotta e i concetti di root, lock e instrumentedclasses è abbastanza semplice realizzare un effettivo pattern master-worker, utilizzabile per sistemi di produzione ad alta affidabilità e in grado di rispondere a elevate esigenze prestazionali. Altre soluzioni di grid o cloud computing possono essere realizzate secondo gli stessi principi di base.
Riferimenti
[1] Terracotta Inc, “The definitive Guide to Terracotta”, aPress, 2008
[2] Guido Anselmi, “Clustering di applicazioni Java SE con Terracotta” JUG Milano: Meeting # 42, gennaio 2011
http://www.ustream.tv/recorded/12282794
[3] Il sito ufficiale di Terracotta