L'evoluzione di Java: verso Java 8

IV parte: Java SE 7, fork et imperadi

Architetture multi-core/multi-processore hanno ormai invaso il mercato di largo consumo. Relegate inizialmente al mondo dei server, dominano oramai mercato dei desktop e laptop, e ultimamente anche quello degli smartphone e dei tablet. Diviene sempre più pressante avere software in grado di sfruttare al massimo le potenzialità di queste architetture. Nel mondo Java ciò si traduce in implementare software che la VM, in collaborazione con il sistema operativo, può facilmente distribuire sui vari core/processori disponibili. Vediamo come il framework fork/join, introdotto con Java SE7, vada in questa direzione, permettendo di implementare in maniera veloce ed elegante algoritmi 'divide and conquer'.

Premessa: aiutiamo l'Emilia!

Questo articolo, come gli altri, è prodotto con un grande investimento personale da parte degli autori. Il tempo necessario per la scrittura è spesso sottratto agli affetti familiari e non prevede alcuna remunerazione. Gli articoli sono redatti con il grande supporto di MokaByte che da sempre lavora per la libera circolazione della conoscenza. Ma, in un'ottica "donation-ware", se questo articolo è di vostro gradimento e vi è risultato utile, stavolta vi chiediamo di eseguire un versamento a una delle iniziative che supportano la ricostruzione dell'Emilia, il cui tessuto produttivo è stato duramente danneggiato dal recente terremoto.

Introduzione

Nell'articolo precedente (cfr. [1]) abbiamo presentato le innovazioni introdotte al linguaggio Java attraverso il progetto Coin. Come si è avuto modo di vedere, non tutte sono sembrate migliorative. Una feature che invece introduce un ulteriore miglioramento alla programmazione multi-threading (MT) è il framework fork/join (FFJ). In questo caso si tratta di una feature veramente interessante che permette di implementare in maniera veloce ed elegante algoritmi di tipo "divide and conquer" (che poi sarebbe la traduzione inglese dell'antico motto latino "divide et impera", "dividi e conquista"). Il FFJ, come lecito attendersi, si innesta nella struttura del package java.util.concurrent, che a sua volta, presenta altre interessanti aggiornamenti: l'introduzione della classe Phaser, TransferQueue, CuncurrentLinkedDeque (utilizzata dal FFJ) e ThreadLocalRandom. Per comprendere pienamente le varie feature, il presente articolo inizia con una breve ricapitolazione sia delle componenti del package della concorrenza, sia, più in generale del MT in Java.

I prossimi paragrafi dedicati a una breve panoramica del MT in Java possono essere saltati da chi conosce bene l'argomento: chi è interessato esclusivamente al Framework Fork/Join (FFJ) può passare direttamente al paragrafo "Il framework fork/join".

Breve storia del MT in Java

Tutti coloro che hanno seguito l'evoluzione del linguaggio Java fin dalla sua genesi, ricordano la straordinaria (ma poco ottimale) enfasi conferita alla programmazione MT. Ciò si evince da una serie di elementi nativi del linguaggio, quali la presenza di specifiche classi e interfacce disegnate appositamente per supportare il MT e la disponibilità di determinate parole chiave e metodi dedicati alla programmazione MT. Per esempio, affinchè i metodi (delle istanze) di una classe siano eseguiti all'interno di un apposito thread dedicato, è sufficiente far sì che la classe implementi l'interfaccia Runnable (e quindi definisca l'implementazione del metodo run()), oppure, in alternativa, è possibile estendere direttamente la classe java.lang.Thread. Ciò rende possibile definire una serie di thread, la cui esecuzione compete con quella del metodo principale del programma. La JVM, naturalmente, funziona sempre in modalità MT: oltre all'applicazione utente esistono altri thread (daemon), come per esempio il famoso garbage collector, il gestore della coda degli eventi grafici, etc.

Nelle tipiche implementazioni MT, i diversi thread competono per aggiudicarsi le risorse di cui hanno bisogno, incluse aree di memoria (heap) condivise e quindi è necessario porre molta attenzione a problemi di sincronizzazione, di condivisione delle risorse, etc. Queste problematiche, che rappresentano una delle sfide principali nel disegno e implementazione di programmi MT, tradizionalmente, potevano essere risolte utilizzando apposite parole chiave (come: synchronized e volatile) e metodi (quali, per esempio, le primitive: wait(), notify(), notifyAll(), contenuti nella classe antenata (java.lang.Object) da cui ereditano tutte le altre.

La difficile pratica del multi-threading

Nonostante la presenza di meccanismi "nativi" per il supporto alla programmazione MT, la pratica insegna come la progettazione e realizzazione di applicazioni Java concorrenti, scalabili e ad alte prestazioni, sia un'attività tutt'altro che semplice, in cui, frequentemente, gli sviluppatori finiscono per insabbiarsi e non sempre solo a causa delle loro mancanze. La programmazione MT introduce nuove problematiche nel disegno delle applicazioni ed enfatizza quelle esistenti. Il disegno di avanzate applicazioni MT richiede uno studio attento a fattori quali accesso a risorse condivise, comunicazione tra diversi flussi di esecuzione, performance, throughtput, parsimonioso utilizzo della memoria e delle risorse in generale, minimizzazione degli oggetti utilizzati, e così via. Inoltre, la progettazione di applicazioni MT Java, basate esclusivamente sulle primitive succitate, non è per nulla immediata, anzi è spesso e volentieri macchinosa al punto di indurre gli sviluppatori a cadere in una serie di insidie. Il problema di fondo è che quando si progettano complesse applicazioni concorrenti, scalabili ad alte prestazioni, raramente si tende a pensare il sistema in termini di singoli thread e di metodi quali wait() e notify(), primitive probabilmente troppo a basso livello. Risulta più naturale ragionare in termini di pool di thread, semafori, mutex e barriere. Logica conseguenza è che, in passato, il disegno di avanzate applicazioni MT Java ha richiesto lo sviluppo preliminare e ad hoc di complicati framework, atti a fornire un elevato livello di astrazione. L'obiettivo consisteva nel realizzare blocchi base, più o meno complessi, per il supporto della programmazione concorrente, con i soliti problemi derivati dalla cronica carenza di tempo: framework mai completamente compiuti, non sempre flessibili e riutilizzabili, faticosamente scalabili, carenti di documentazione, e così via.

Un passo in avanti per il multi-threading

La situazione è drasticamente cambiata grazie all'avvento del J2SE 5.0 (30 settembre del 2004 con la JSR 176, cfr. [2]) ed in particolare con l'introduzione del package della concorrenza (java.util.concurrent). Il disegno di questo framework si deve essenzialmente a Doug Lea, professore universitario della State University of New York Oswego (lo stesso che ha dato luogo al framework delle collection), il quale, in tre anni di lavoro, è riuscito a mettere a punto un brillante framework Java per il supporto alla programmazione MT. Tale framework ha dimostrato livelli di performance e scalabilità così elevati, da meritare l'inclusione nella piattaforma Java 2 versione 5.0 ("Tiger"). Lo stesso Doug Lea, a cui si deve anche il framework delle collections Java, ha assolto il ruolo di specification leader della JSR 166.

Il package è costituito da una serie di classi che forniscono le fondamenta per la soluzione di un insieme di problemi appartenenti al particolare dominio delle applicazioni MT. Soluzioni concrete, ossia specifiche applicazioni MT, possono essere codificate implementando ben definite interfacce e/o estendendo specifiche classi e quindi combinando queste nuove classi con quelle esistenti. I principali meccanismi introdotti con il package della concorrenza sono:

  • variabili atomiche
  • schedulazione e gestione dell'esecuzione di task
  • strutture di dati "coda" e code bloccanti
  • nuove collezioni concorrenti (ottimizzate per il MT)
  • blocchi (locks)
  • sincronizzatori (semafori, barriere e variabili atomiche).

La libreria della concorrenza è organizzata in tre package: java.util.concurrent e i due sotto-package locks e atomic.

Gli elementi del package della concorrenza (java.util.concurrent)

Vediamo di ricapitolare le caratteristiche dei vari elementi del package della concorrenza, introdotti a partire dalla J2SE 5.0.

Variabili atomiche (atomic variables).

Sono un insieme di classi (AtomicBoolean, AtomicInteger, AtomicIntegerArray, etc.) che permettono il trattamento atomico (non interrompibile) e, preferibilmente, non bloccante (il cui conseguimento dipende dai servizi offerti dalla piattaforma di esecuzione) delle variabili incapsulate. Si tratta di un concetto simile alle classi Java "wrapper" che incapsulano le variabili primitive Java (Boolean, Integer, etc.). In questo caso tuttavia l'obiettivo è disporre di classi ottimizzate per l'utilizzo in ambienti MT, e quindi in grado di offrire performance superiori a quelle che si otterrebbero con il ricorso al costrutto bloccante synchronized, grazie all'implementazione basata sulle primitive "compare and swap". Queste nuove istruzioni del tipo

boolean compareAndSet(expectedValue, updateValue)

corrispondono all'implementazione, a basso livello, della strategia dell'optmistic locking: l'aggiornamento di un attributo avviene solo se il valore del parametro expectedValue coincide con quello reale. Qualora ciò si verifichi, il metodo ritorna il valore true, altrimenti false.

Locks e Conditions

Come visto in precedenza, tradizionalmente il linguaggio Java gestiva i lock in maniera del tutto trasparente. Con l'introduzione del package della concorrenza si è introdotta la possibilità di una gestione esplicita. Gli strumenti lock ("serrature di chiusura") forniscono avanzati meccanismi per controllare l'accesso a risorse condivise (logiche e/o fisiche) da parte di diversi thread. Si tratta di una sofisticata alternativa al costrutto Java synchronized, la cui gestione basata sul singolo monitor raramente risulta essere ottimale in ambienti MT sia per problemi di performance, sia di limitato controllo. Il meccanismo dei lock, oltre a presentare migliori prestazioni, fornisce una serie di utilissimi servizi aggiuntivi, quali:

  • la possibilità di interrompere un thread in attesa di uno specifico lock;
  • la dichiarazione del tempo massimo di attesa di un thread per uno specifico lock;
  • la generazione di insiemi di meccanismi "wait-notify" (chiamati condition) associati a uno stesso lock.

Le classi del package della concorrenza, utilizzano sistematicamente il meccanismo dei lock e in particolare la classe ReentrantLock, al posto del costrutto synchronized. L'unico effetto collaterale dovuto all'utilizzo di questo meccanismo consiste nel dover codificare esplicitamente l'acquisizione (myLock.lock()) e il rilascio (myLock.unlock()) dei lock che, ovviamente, richiede la presenza del costrutto try...finally onde evitare l'insorgere di situazioni di dead-lock in situazione di eccezione.

Sincronizzatori (semaphore, barrier, latch e Exchanger).

Tutti coloro che hanno studiato la programmazione concorrente ricorderanno il concetto dei semafori di Dijkstra. Questi sono stati (finalmente) introdotti con il package java.util.concurrent, attraverso la classe Semaphore. Si tratta di un ennesimo meccanismo utilizzato per limitare l'accesso a risorse condivise (tipicamente a un pool), logiche e/o fisiche che siano, da parte di un predefinito numero di thread. I semafori, detti anche semafori contatori, consistono in un insieme di permessi che possono essere ottenuti (acquire()) finchè c'è ne è uno disponibile (il contatore interno è maggiore di 0). La relativa restituzione (release()) genera l'incremento del contatore e, potenzialmente, l'uscita dallo stato di attesa da parte di un thread che precedentemente aveva richiesto di eseguire l'acquisizione di un permesso. Il concetto della barriera, introdotto per mezzo della classe CyclicBarrier, fornisce un meccanismo molto conveniente per la sincronizzazione di un insieme di thread. In particolare, come suggerisce il nome, offre la possibilità di specificare nel codice dei punti (myBarrier.await()) in cui i vari thread sono forzati ad attendere che i restanti raggiungano quel medesimo punto prima di poter proseguire oltre. Se, per qualche ragione, un thread in attesa lascia prematuramente la barriera stessa (per esempio scade il relativo tempo massimo di attesa, timeout), l'oggetto barriera transita in uno stato di rottura (broken) che forza i restanti a procedere oltre. Il nome ciclico (cyclic) deriva dal fatto che un medesimo oggetto barriera può essere utilizzato diverse volte tramite l'invocazione del metodo reset(). Anche il meccanismo delle barriere è utile per decomporre calcoli particolarmente complessi in un insieme di sottocalcoli di complessità inferiore (disegno "divide et impera").

Altro meccanismo simile alla barriera è costituito dalla classe CountDownLatch ("serratura a scatto a decremento"), la quale fa sì che un thread rimanga in attesa dei risultati forniti da opportune operazioni la cui completa esecuzione è assegnata ad altri thread. Istanze di questo tipo prevedono come parametro del metodo costruttore un valore di inizializzazione del contatore interno. Il corrispondente metodo di attesa (myLatch.await()) blocca il thread chiamante finchè il contatore interno raggiunge il valore 0. Il decremento avviene quando gli altri thread, ai quali va fornito il riferimento dell'istanza CountDownLatch (in questo caso myLatch), invocano esplicitamente l'operazione di decremento (myLatch.countdown()). L'ultima classe presente, appartenente a questa famiglia è Exchanger: si tratta di un potente meccanismo che consente di definire punti di incontro (rendezvous) tra coppie di thread, con esplicito scambio di dati. In particolare, la primitiva exchange, fa sì che il thread invocante attenda, sia bloccato nell'esecuzione del metodo, finchè un altro thread giunga al medesimo punto e quindi fornisca il riferimento all'oggetto di scambio dichiarato. Anche questo metodo prevede la possibilità di definire il tempo massimo di attesa di un thread.

Strutture dati (Collection) MT

Quantunque le collezioni standard Java siano thread-safe, nativamente quelle di Java 1 (Hashtable, Vector, etc.) o su richiesta quelle introdotte con Java 2 (Collections.synchronizedMap(), Collections.synchronizedList(), etc.), queste non sempre risultano particolarmente efficienti e scalabili per applicazioni MT. Il relativo utilizzo in questi ambienti finisce frequentemente per generare odiosi colli di bottiglia, dovuti principalmente alla strategia del singolo monitor. Il nuovo package della concorrenza risolve questa limitazione fornendo nuove classi che implementano strutture dati di tipo lista, set e hashtable, rispettivamente CopyOnWriteArrayList, CopyOnWriteArraySet e ConcurrentHashMap, disegnate specificatamente per ambienti MT. Quindi presentano comportamenti ottimizzati per scenari in cui diversi thread, "contemporaneamente", necessitano di leggere e/o scrivere su istanze di tali classi.

Altra variazione importante consiste nella definizione dell'interfaccia Queue, correttamente inserita nel package java.util, disegnata per definire il comportamento di oggetti code. L'interfaccia Queue, come lecito attendersi, è stata disegnata per un utilizzo generale e quindi non contiene metodi specifici per il supporto al MT. Questi, necessari per la soluzione di scenari del tipo produttore/consumatore, sono invece presenti nell'interfaccia BlockingQueue che presenta tutta una serie di implementazioni come ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue e SynchronousQueue inserite nel package java.util.concurrent.

Executor e lo scheduling ed esecuzione di task.

La stragrande maggioranza di applicazioni MT richiedono l'implementazione di pool di thread e la presenza di soluzioni a problematiche quali lo scheduling dei task e il controllo della relativa evoluzione. Il ricorso a pool di thread è un'ottima strategia sia per migliorare le performance delle applicazioni MT (disponendo di thread pronti ad essere utilizzati, si elimina la latenza richiesta dalla creazione di un nuovo thread), sia per controllarne la quantità, onde evitare che la presenza di un numero eccessivo di thread finisca per generare l'effetto contrario: la diminuzione delle performance. Il pool di thread, inoltre, tende a semplificare il problema dell'assegnazione delle risorse ai task: una strategia spesso utilizzata consiste nel pre-assegnare ai thread, all'atto della relativa creazione, le risorse necessarie per l'esecuzione dei task. Le interfacce e classi che implementano i meccanismi base di quest'area sono:

  • esecutori (Executor, ExecutorService, ScheduledExecutorService, etc.);
  • esecutori basati su pool di thread (ScheduledThreadPoolExecutor e ThreadPoolExecutor);
  • operazioni a termine (Future e FutureTask).

In particolare, il framework degli esecutori fornisce una serie di meccanismi decisamente flessibili per la gestione dei task. Questi sono rappresentati da classi che implementano l'interfaccia Runnable, che quindi definiscono il corpo del metodo run(), eseguibile da opportuni thread.

Il package dispone di meccanismi che permettono di effettuare l'esecuzione asincrona, oltre che ovviamente quella sincrona, la richiesta di informazioni, il controllo dell'esecuzione ed eventualmente la cancellazione di task, la cui gestione avviene in funzione di specifiche politiche, dichiarabili separatamente. Il package dispone poi di una serie di pool di thread assolutamente flessibili, i quali possono essere generati

  • direttamente tramite le classi ThreadPoolExecutor, ScheduledThreadPoolExectutor, la quale permette di gestire task i cui comandi possono essere eseguiti dopo un determinato intervallo di tempo e/o periodicamente;
  • utilizzando appositi meccanismi di factory (Executors.newCachedThreadPool(), Executors.newFixedThreadPool(int) e Executors.newSingleThreadExecutor()).

Le diverse implementazioni sono sempre isolate da opportune interfacce, le quali, tra l'altro permettono di definire meccanismi standard per la terminazione (shut-down) controllata degli esecutori e per la gestione della cancellazione di lavori non ancora eseguiti. La descrizione di quest'area del MT è però rinviata al prossimo articolo.

Le nuove classi concorrenti (Java SE 7)

Iniziamo dalla classe ThreadLocalRandom che da un punto di vista funzionale è assolutamente equivalente alla classe java.util.Radom (il che è abbastanza normale considerato che ThreadLocalRandom estende Radom). Pertanto permette di generare numeri (pseudo)randomici a partire dal seme (seed) inizializzato internamente all'atto della costruzione dell'oggetto. Da notare che sebbene ThreadLocalRandom disponga di un metodo per l'impostazione esplicita del seme (setSeed), la relativa invocazione scatena un'eccezione UnsupportedOperationException (il metodo è stato disegnato solo per l'inizializzazione gestita per altro dalla classe Random).

Per capire la ragione della nuova classe è sufficiente leggere la documentazione JavaDoc della classe Random (cfr. [4]): le istanze della classe Random sono threadsafe. Pertanto l'utilizzo della stessa istanza genera accessi contesi e quindi un degrado delle performance. Quindi ThreadLocalRandom è stata implementata per creare un generatore di numero randomici confinata al corrente thread in esecuzione. Ciò, per esempio, è particolarmente utile quando una serie di task hanno bisogno di utilizzare dei numeri randomici concorrentemente. In tale contesto, la nuova classe permette di eliminare il collo di bottiglia dovuto alla contesa e quindi di migliorare le performance. L'utilizzo di questa classe prevede il seguente codice:

ThreadLocalRandom.current().nextInt()

Da notare che è sufficiente invocare il metodo statico current() per poi poter richiedere il tipo di numero randomico richiesto: nextInt, nextLong e nextDouble. Questi metodi prevedono due forme: la prima che prevede come unico parametro il limite superiore e quindi i numeri sono generati da zero incluso al numero specificato escluso; e la seconda che invece richiede di specificare l'intervallo. In questo caso i numeri sono generati dal limite inferiore incluso a quello superiore escluso. Ecco di seguito un semplice esempio di utilizzo di alcuni metodi della class ThreadLocalRandom:

public class ThreadRandomSample {
       public static void main(String[] args) {
            
             System.out.println(ThreadLocalRandom.current().nextInt(100));
             System.out.println(ThreadLocalRandom.current().nextInt(20,40));
 
             System.out.println(ThreadLocalRandom.current().nextDouble(100.00));
             System.out.println(ThreadLocalRandom.current().nextDouble(20.00, 40.00));
 
             System.out.println(ThreadLocalRandom.current().nextLong(100));
             System.out.println(ThreadLocalRandom.current().nextLong(20,40));
 
            
             System.out.println(ThreadLocalRandom.current().nextGaussian());
       }
}

Nuova coda bloccante

Java SE 7 introduce una nuova coda bloccante denominata LinkedTransferQueue che implementa la nuova interfaccia TransferQueue che, a sua volta, estende la classe BlockingQueue. Si tratta di una nuova feature disegnata esplicitamente per fornire un validissimo supporto a problemi del tipo produttore/consumatore dove il produttore resta bloccato nell'operazione di inserimento di un nuovo elemento nella coda ("put") fino quando il consumatore effettivamente consuma l'elemento. Si tratta quindi di un ulteriore passo in avanti giacchè le implementazioni precedenti si limitavano a bloccare il produttore fino all'effettivo inserimento dell'elemento nella coda. Il nuovo comportamento è enfatizzato dal nome dei metodi della nuova interfaccia chiamati appunto "trasferimento": transfer(E e) e tryTransfer(E e), proprio a enfatizzare il concetto che il passaggio è completato solo quando un thread produttore passa l'oggetto a un thread consumatore e quindi avviene il trasferimento. Da notare che tryTransfer è la versione non bloccante del metodo transfer, dove il metodo è eseguito solo se il passaggio può essere eseguito immediatamente (ciò significa che esiste un thread consumatore pronto) o entro il timeout specificato.

Per essere precisi, questa coda non è totalmente differente da altre e in particolare dalla SynchronousQueue (Java 1.5), la cui nuova implementazione utilizza l'API transfer, che è una coda di dimensione 0. Interrogato circa le motivazione che hanno spinto a realizzare questa nuova feature visto che già esistono diverse implementazioni di code, Doug Lea ha risposto (cfr. [5]) che esiste una risposta molto semplice. In realtà avrebbero dovuto dar luogo a questo disegno direttamente nella versione 1.5 piuttosto che dar luogo alle altre che ora generano una certa confusione. Tuttavia a quei tempi non avevano ancora maturato la visione attuale che si è formata con il senno di poi. Comunque è il classico caso del "meglio tardi che mai". Le feature offerte da questa nuova coda sono un superset di quelle fornite da ConcurrentLinkedQueue, SynchronousQueue (modalità "fair"), e LinkedBlockingQueues. Si tratta di una migliore implementazione non soltanto perchè è possibile utilizzare le diverse feature contemporaneamente (put, poll, transfer, etc.), ma anche perchè il disegno è migliore e decisamente più efficiente. Un articolo pubblicato da William Scherer, Doug Lea e Michael Scott (cfr. [6]) mostra chiaramente che le performance offerte dalla nuova classe LinkedTransferQueue sono significatamente migliori di quelle offerte dalle precedenti implementazioni. Vediamo quindi subito un esempio di LinkedTransferQueue:.

public class LinkedTransferQueueSample {
       // ------------------------ INNER CLASSES ------------------------
      
       /**
        * Simple producer
        */
       public static class SimpleProducer implements Runnable {
            
             TransferQueue transfer = null;
            
             public SimpleProducer(TransferQueue transfer) {
                    this.transfer = transfer;
             }
            
             @Override
             public void run() {
                    for(int ind=0; ind<10; ind++) {
                           System.out.println("Thread"+Thread.currentThread().getId()+
                                               " Produced: Element_"+ind);
                           try {
                                  transfer.transfer("Element_"+ind); 
                                            //Wait for the consumer
                           } catch (InterruptedException e) {
                                  System.out.println("Ignore exception:"+e.getMessage());
                           }
                    }
                   
                    try {
                           Thread.sleep(100);
                    } catch (InterruptedException e) {
                           System.out.println("Ignore exception:"+e.getMessage());
                    }
                   
                    System.out.println("Thread"+Thread.currentThread().getId()+
                                                  "Try to transfer ‘Extra element'");
                    transfer.tryTransfer("Extra element"); 
                    System.out.println("Thread"+Thread.currentThread().getId()+
                                            "Transferered ‘Extra element'");
             }
            
       }
      
      
       /**
        * Simple consumer.
        */
       public static class SimpleConsumer implements Runnable {
            
             TransferQueue transfer = null;
            
             public SimpleConsumer(TransferQueue transfer) {
                    this.transfer = transfer;
             }
 
             @Override
             public void run() {
                    for(int ind=0; ind<11; ind++) {
                          
                           try {
                                  String element = transfer.take();
                                  System.out.println("Thread"
                                                         +Thread.currentThread().getId()
                                                         +" Consumed: "+element);
                           } catch (InterruptedException e) {
                                  System.out.println("Ignore exception:"+e.getMessage());
                           }
                    }
             }
            
       }
      
      
       // Main method
       public static void main(String[] args) throws InterruptedException {
             TransferQueue transfer = new LinkedTransferQueue();
             SimpleProducer aProducer = new SimpleProducer(transfer);
             Thread treadProducer = new Thread(aProducer);
             treadProducer.start();
            
             try {
                    Thread.sleep(1_000);  // wait for a second
             } catch (InterruptedException e) {
                    System.out.println("Ignore exception:"+e.getMessage());
             }
            
             SimpleConsumer aConsumer = new SimpleConsumer(transfer);
             Thread treadConsumer = new Thread(aConsumer);
             treadConsumer.start();
       }
} // --- end of class ---

Il listato sopra mostra una semplice implementazione del pattern producer/consumer con un solo thread che genera i dati da consumare e un solo thread che si occupa di acquisire tali dati. Dall'esecuzione del metodo main si osserva che il producer tenta di iniziare a produrre le informazioni, più precisamente di trasferirle (transfer.transfer("Element_"+ind);) ma viene immediatamente bloccato in quanto non ci sono ancora consumer. Il codice del main in effetti introduce un ritardo artificioso di 1 sec, prima di avviare i consumer, proprio per evidenziare la natura bloccante del metodo. Per finire, il producer, alla fine del suo ciclo di produzione dati, ne produce uno finale ("Extra element") il cui transferimento avviene per mezzo dell'istruzione non bloccante transfer.tryTransfer("Extra element");

Nuova strategia di collaborazione

Come vedremo tra un po', nella parte dedicata al FFJ, la più importante delle nuove feature introdotte con Java SE 7 nel package della concorrenza è probabilmente il framework fork/join. Questo introduce una nuova strategia di collaborazione tra thread presenti nel medesimo pool, nota con il nome di work stealing la cui implementazione è basta su una struttura dati denominata deque. Per la precisione l'interfaccia Deque è stata introdotta con Java SE 6 mentre ciò che è stato aggiunto in Java 7 è l'implementazione ConcurrentLinkedDeque. Come verrà descritto nei paragrafi successivi, la peculiarità di questa struttura dati è di far convivere un funzionamento FIFO (First In, First Out) tipico delle code, evidenziato dai metodi addLast(e), offerLast(e), removeFirst(), pollFirst(), getFirst(), peekFirst() con una strategia LIFO (Last In First Out) tipico delle pile, evidenziato dai metodi addFirst(e), removeFirst(), peekFirst(). Questo doppio funzionamento è utilizzato per far in modo che un Thread possa eseguire il push/pop dei propri task da eseguire mentre gli altri Thread possano "rubare" i lavori che da più tempo risiedono nella struttura dati. Maggiori informazioni sono riportate di seguito nel contesto del FFJ.

Barriere

La versione originale del package della concorrenza include due interessanti implementazioni del concetto di barriera: CyclicBarrier e CountDownLatch. Sebbene si tratti di due implementazioni molto interessanti, il loro utilizzo non sempre fornisce un elevato livello di flessibilità e una API altrettanto ricca e potente. Per risolvere queste limitazioni, Java SE 7 fornisce una nuova classe java.util.concurrent.Phaser ("fasatore", in italiano). Si tratta di un valido strumento per implementare complessi processi paralleli che richiedono ai vari thread coinvolti di sincronizzarsi in diversi punti. Le parti fondamentali di questa classe/framework sono riportate di seguito.

Registrazione

La registrazione rappresenta un primo importante livello di flessibilità. Infatti, a differenza delle altre implementazioni del concetto di barriera, le istanze Phaser rendono possibile variare nel tempo (dinamicamente) i soggetti interessati alla sincronizzazione. I partecipanti (questi sono tipicamente, ma non necessariamente, Thread) possono essere registrati in qualsiasi momento attraverso i metodi register(), bulkRegister(int), o attraverso le apposite varianti del metodo costruttore. In maniera simmetrica, i vari soggetti possono rimuovere la propria registrazione in qualsiasi momento dell'arrivo utilizzando il metodo arriveAndDeregister(). Da notare che oggetti Phaser gestiscono tre contatori: il numero di soggetti registrati, il numero di soggetti arrivati e quello dei soggetti attesi.

Sincronizzazione

Come anche nel caso della classe CyclicBarrier, anche un'istanza Phaser può essere utilizzata per gestire diverse sincronizzazioni. Il metodo arriveAndAwaitAdvance(), segnala l'arrivo al Phaser da parte di un soggeto e attende l'arrivo degli altri, ha lo stesso effetto del metodo CyclicBarrier.await. Ogni istanza Phaser (e ogni rigenerazione di uno stesso Phaser) ha associato un numero di fase che inizia da zero e viene incrementato ogni volta che i vari soggetti arrivano al Phaser (ricomincia poi da zero quando raggiuge Integer.MAX_VALUE). La sincronizzazione prevede due parti fondamentali:

  • Arrivo. I metodi arrive() e arriveAndDeregister() permettono ai soggetti di notificare il proprio arrivo. Si tratta di metodi non bloccanti che restituiscono il numero di arrivo della fase: il numero della fase del Phaser per cui l'arrivo è applicato. Quando l'ultimo soggetto per una determinata fase arriva, viene eseguita un'azione opzionale di pre-avanzamento fase. Tali azioni sono eseguite dal soggetto che innesca l'avanzamento di fase attraverso il metodo onAdvance(int, int), e, come descritto di seguito, controllano anche la terminazione. L'override di questo metodo è simile alla fornitura di un'azione-barriera ad un'istanza CyclicBarrier, con il vantaggio di essere molto più flessibile.
  • Attesa. Il metodo awaitAdvance(int) richiede un argomento che indica il numero arrivo fase (questo numero è tipicamente quello restituito dai metodi arrive o arriveAndDeregister), e termina quando il Phaser avanza o è già in una fase diversa. A differenza di soluzioni basate sulla classe CyclicBarrier, il metodo awaitAdvance continua ad attendere, anche qualora il thread in attesa venga interrotto. Questo metodo prevede anche le versioni interrompibili (awaitAdvanceInterruptibly(int phase)) e dotate di timeout (awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)), tuttavia eccezioni generate mentre i task si trovano in attesa non cambiano lo stato del Phaser. Se necessario, è possibile eseguire operazioni di recovery del blocco di codice di gestione dell'eccezione, spesso dopo aver richiesto il forceTermination. I Phaser possono essere utilizzati anche da task in esecuzione in un ForkJoinPool, allo scopo di assciurare un livello sufficiente di parallelismo per eseguire alcuni task mentre gli altri sono bloccati in attesa dell'avanzamento di una fase.

Termination

Un Phaser può entrare in uno stato di terminazione: in tal caso il metodo isTerminated() restituisce un valore true. Quando ciò avviene tutti metodi di sincronizzazione terminano immediatamente, sbloccando eventuali Thread in attesa. Per segnalare questa situazione, restituiscono un valore negativo. Ulteriori tentativi di registrazione, come è lecito attendersi, non generano alcun effetto. Il processo di terminazione è attivato quando una chiamata al metodo protetto onAdvance() restituisce il valore true. Questo metodo è invocato automaticamente quando il Phaser si accinge a lasciare la fase corrente. L'implementazione predefinita restituisce true se il numero dei soggetti registrati diviene zero. Questo metodo dovrebbe essere sovrascritto (override), nelle situazioni in cui il Phaser debba eseguire un numero fisso di iterazioni. Per finire, è sempre possibile invocare il metodo forceTermination() al fine di provocare una brusca terminazione del Phaser con immediato rilascio dei thread in attesa.

Tiering

Al fine di migliorare il throughtput riducendo le contese, i Phaser possono essere organizzati in strutture ad albero (in questo caso si dice che sono "tiered", stratificati). Questa organizzazione presenta importanti vantaggi nelle situazioni di Phaser con un gran numero di soggetti registrati che, inevitabilmente, finirebbero per risentire di notevoli riduzioni delle performance per via delle contese di sincronizzazione. In tal caso è opportuno organizzarne la struttura in gruppi di sub-Phaser che condividano un medesimo genitore. Questa strategia ha la potenzialità di aumentare notevolmente il throughput a spese di un maggiore overhead sulle singole operazioni. In un albero di Phaser stratificati, la registrazione e la cancellazione di un Phaser child con il rispettivo genitore avviene automaticamente. Ogni volta che il numero di soggetti iscritti ad un Phaser child diventa non-zero, il Phaser child viene automaticamente registrato con il relativo genitore. Analogamente, ogni volta che il numero di soggetti iscritti diventa pari a zero, come il risultato di una chiamata di arriveAndDeregister(), il Phaser child è cancellato dal suo genitore.

Monitoring

Mentre i metodi di sincronizzazione possono essere invocati soltanto dai soggetti registrati, tutti possono monitorare lo stato del Phaser utilizzando metodi quali getRegisteredParties() che permette di ottenere il numero dei soggetti registrati, getArrivedParties() che permette di ottenere il numero dei soggetti arrivati alla fase corrente (questo metodo è dotato del simmetrico getUnarrivedParties()), getPhase() restituisce il numero di fase corrente. Oltre a questi è sempre possibile ottenere una snapshot dello stato del Phaser per mezzo dell'intramontabile metodo toString(). Da notare che trattandosi di un sistema MT, i valori restituiti da questi metodi possono riflettere stati transitori.

Vantaggi della classe Phaser

Ricapitolando, i vantaggi della classe Phaser, rispetto a precedenti implementazioni, possono essere riassunti nei seguenti punti:

  • gestione dinamica dei soggetti registrati ad una fase;
  • feature di terminazione;
  • maggiore flessibilità nella (ri)definizione del metodo (onAdvance) da invocare prima dell'avanzamento della fase;
  • possibilità di organizzazioni oggetti Phaser in strutture ad albero;
  • può funzionare con il FFJ.

Ed ecco di seguito un semplice esempio di Phaser.

public class PhaserExample {
       // ------------------- CONSTANTS SECTION -----------------
       /** logger */
       private static final Logger LOGGER = Logger.getLogger(PhaserExample.class);
      
       // ------------------- ATTRIBUTES SECTION ----------------
      
       // ------------------- METHODS SECTION -------------------
      
       private static final int NUM_THREADS = 4;
      
       /**
        * Creates the required number of tasks.
        * Assign each of them to a new thread
        * and start the thread
        */
       public static void createAndRunTasks() {
            
             Phaser Phaser = new Phaser();
            
             for (int i = 0; i < NUM_THREADS; i++) {
                    Task aTask = new Task(Phaser); // create a new task
                    new Thread(aTask).start(); // new Thread to execute the task
             }
       }
      
       /**
        * Inner class that defines the runnable task
        */
       static class Task implements Runnable {
            
             private final Phaser aPhaser;
            
             public Task(Phaser givenPhaser) {
                    aPhaser = givenPhaser;
             }
            
             @Override
             public void run() {
            
                    aPhaser.register();
                   
                    execNextPhase("Phase A");
                   
                    execNextPhase("Phase B");
                   
                    // at some point the task de-register itself from the Phaser
                    int newArrivalPhase = aPhaser.arriveAndDeregister();
                    dumpStatus(aPhaser, "Task deregistered. Returned:"
                                            + newArrivalPhase);
             }
            
             /**
              * Pretend to execute the work required by the next phase
              * @param phaseName  phase logical name
              */
             private void execNextPhase(String phaseName) {
            
                    boolean last = false;
                    pretendToDoSomeWork();
                   
                    String txt = " Completed phase.....";
                    if (aPhaser.getUnarrivedParties() == 1) {
                            txt =" Last thread arrived.";
                            last = true;
                    }
                    dumpStatus(aPhaser, phaseName + txt);
                   
                    int arrivalPhase = aPhaser.arrive();
                   
                    // let's synch all thread
                    dumpStatus(aPhaser, phaseName + " Await to advance....");
                    aPhaser.awaitAdvance(arrivalPhase);
                    dumpStatus(aPhaser, phaseName + " Advance next phase.");
                   
                    if (last) {
                           LOGGER.info("-------------------------------------------");
                    }
             }
            
             /**
              * Print out the current thread the given text and few data
              * about the given Phaser status
               *
              * @param aPhaser
              * @param txt
              */
             private void dumpStatus(Phaser aPhaser, String txt) {
            
                    String threadId = Thread.currentThread().getId() + "";
                    if (Thread.currentThread().getId() < 10) {
                           threadId = "0" + threadId;
                    }
                   
                    LOGGER.info(
                                  txt+"	"+
                                  " Thread :" + threadId+
                                  " Phase: " + aPhaser.getPhase() + " reg.:"+
                                  aPhaser.getRegisteredParties() + " arrived:"+
                                  aPhaser.getArrivedParties() + " unarrived:"+
                                  aPhaser.getUnarrivedParties());
             }
            
             /**
              * Pretend to do some work... But it just waits for 1/2 sec
              */
             public void pretendToDoSomeWork() {
            
                    for (int ind = 0; ind < 100_000; ind++) {
                           String a = ind + "";
                          
                           if (a.equalsIgnoreCase("100000")) {
                                  a = null;
                           }
                    }
             }
       }
      
       /**
        * Poor way to test classes!
        *
        * @param args
        */
       public static void main(String[] args) {
             PhaserExample.createAndRunTasks();
       }
      
}  // --- end of class ---

L'esecuzione del codice precedente genera un output come quello riportato di seguito:

Phase A Completed phase.....       Thread :10 Phase: 0 reg.:4 arrived:0 unarrived:4
Phase A Await to advance....       Thread :10 Phase: 0 reg.:4 arrived:1 unarrived:3
Phase A Completed phase.....       Thread :08 Phase: 0 reg.:4 arrived:0 unarrived:4
Phase A Await to advance....       Thread :08 Phase: 0 reg.:4 arrived:2 unarrived:2
Phase A Completed phase.....       Thread :09 Phase: 0 reg.:4 arrived:2 unarrived:2
Phase A Await to advance....       Thread :09 Phase: 0 reg.:4 arrived:3 unarrived:1
Phase A Last thread arrived.       Thread :11 Phase: 0 reg.:4 arrived:3 unarrived:1
Phase A Await to advance....       Thread :11 Phase: 1 reg.:4 arrived:0 unarrived:4
Phase A Advance next phase.        Thread :09 Phase: 1 reg.:4 arrived:0 unarrived:4
Phase A Advance next phase.        Thread :11 Phase: 1 reg.:4 arrived:0 unarrived:4
-----------------------------------------------
Phase A Advance next phase.        Thread :10 Phase: 1 reg.:4 arrived:0 unarrived:4
Phase A Advance next phase.        Thread :08 Phase: 1 reg.:4 arrived:0 unarrived:4

Una rapida disamina

Vediamo di analizzare il comportamento. Prima però è necessario ricordare che l'ouput generato è il risultato di un programma MT, pertanto, non deterministico. Quindi, quando i Thread non sono in attesa di sincronizzazione, non è possibile sapere quale sarà in esecuzione, quale in attesa, etc. Sebbene si sia cercato di prendere delle contromisure (e.g. emettere l'output prima di sbloccare la barriera, evitare Thread.sleep, etc.) l'output spesso può presentare un interlacciamento con qualche differanza tra quanto atteso qualora il programma fosse rigidamente sequenziale.

Il main esegue il metodo createAndRunTasks, che appunto si occua di creare una serie di oggetti Task ai quale viene fornito come parametro del costruttore la reference al medesimo oggetto Phaser. Lo stesso metodo assegna ciascuna istanza Task a un Thread dedicato generato a tal scopo. Una volta avviato il Thread (invocazione del metodo start), questo esegue il metodo run del Task che esegue alcune semplici azioni, come registrarsi presso l'oggetto Phaser (register) e quindi eseguire due fasi: "Phase A" e "Phase B". Ciascun thread, per ogni fase esegue i seguenti passi:

  • pretende di compiere qualche lavoro (pretendToDoSomeWork)
  • segnala di essere arrivato alla fase (in arrivalPhase = aPhaser.arrive()).
  • si pone in stato di attesa per la determinata fase (aPhaser.awaitAdvance(arrivalPhase)). In altre parole attende l'arrivo dei restanti Thread registrati per la medesima fase.

Da notare che mentre aPhaser.arrive() è un'operazione non bloccante, aPhaser.awaitAdvance() lo è. Questo è evidente anche dall'output: i vari Thread segnalano di aver completato la fase e poi di porsi in stato di attesa (testo: "Completed phase..." e " Await to advance..."). Poi però attendono prima di poter procedere alla fase successiva ("Advance next phase").

Il framework fork-join

Il framework fork-join è indubbiamente la feature più interessante introdotta nel package della concorrenza con Java SE 7 il cui core è la classe ForkJoinPool, che specializza la classe astratta AbstractExecutorService per l'esecuzione di task di tipo ForkJoinTask. Questi sono istanze della relativa classe astratta ForkJoinTask che implementa le interfacce Serializable e Future e rappresenta una sorta di "Thread" dal punto di vista della possibilità di eseguire task asincroni e paralleli. Tuttavia, le istanze sono molto più leggere dei thread in quanto un gran numero di task e sub-task possono essere eseguiti da un ridotto numero di thread presenti in un'istanza ForkJoinPool.

Un oggetto ForkJoinTask inizia la propria esecuzione dopo essere stato sottoposto a un ForkJoinPool (invocazione di uno dei metodi execute, invoke, submit, etc. come dettagliato nella tabella 1). Una volta avviato, il comportamento tipico prevede la creazione a sua volta di sotto attività (comportamento tipico degli algoritmi divide and conquer, figura 1). Come indicato dallo stesso nome di questa classe, molti programmi che utilizzano ForkJoinTask ricorrono all'utilizzo dei soli metodi di fork() e join() (o derivati come invokeAll). Questa classe fornisce anche un certo numero di altri metodi molto utili per utilizzi avanzati e richiesti per l'inserimento nella gerarchia dei Future (gli oggetti ForkJoinTask sono considerati versioni leggere di Future).

 

 

 

Figura 1 - Comportamento tipico di un algoritmo fork/join.

 

L'efficienza delle istanze ForkJoinTasks è ottenuta per mezzo di una serie di restrizioni (spesso solo informali e quindi solo parzialmente applicabili staticamente) che ne riflettono la destinazione d'uso, come per esempio utilizzo per compiti computazionali di puro calcolo e esecuzione di specifici algoritmi su oggetti isolati (non condivisi). I meccanismi primari di coordinamento sono dati dal metodo fork(), che organizza l'esecuzione asincrona, e join(), che blocca l'esecuzione fino al termine dell'elaborazione del risultato dell'attività. Il comporamento richiesto prevede che i calcoli nei vari oggetti non necessitino di sincronizzazioni/blocchi e comunicazioni esplicite oltre quelle richieste dal FFJ. Inoltre i vari task non dovrebbero eseguire operazioni di I/O bloccanti, e dovrebbero accedere (idealmente) alle sole variabili che sono del tutto indipendenti da quelli accessibili da altri processi in esecuzione. Come è lecito attendersi, violazioni minori di queste restrizioni non sono un problema. Per esempio, non è grave far sì che diversi task accedano contemporaneamente in sola lettura ad una medesima struttura dati o che utilizzino stream in uscita condivisi. Tuttavia è importante tener presente che il FFJ non è stato pensato per utilizzi che richiedono continuo coordinamento dei vari Thread, accessi contesi, etc. che riducono le prestazioni e possono generare potenziali situazione di stallo a tempo indeterminato. Questa restrizione di utilizzo può essere solo parzialmente forzata. Una strategia utilizzata per forzare questa limitazione consiste nel non consentire la generazione di eccezioni checked come IOException. Tuttavia, i task possono sempre generare eccezioni run-time, come RejectedExecutionException che viene scatenata qualora si assista ad un esaurimento delle risorse interne, come per esempio il fallimento dell'allocazione delle code dei task interni.

La classe ForkJoinTask raramente è utilizzata direttamente (eseguirne una specializzazione è pratica da ponderare accuratamente). Il caso decisamente più frequente e consigliato consiste nel realizzare un'apposita sottoclasse delle classi astratte:

  • RecursiveAction: da utilizzarsi per l'esecuzione di algoritmi che non restituiscono risultati;
  • RecursiveTask: da utilizzarsi per calcoli che generano un risultato (caso più frequente).

 

 


Tabella 1 - Tipi di invocazione ad un'istanza ForkJoinPool.

 

Gli oggetti di tipo ForkJoinTask supportano la creazione di sotto-task ed attendono il relativo completamento.

Il furto del lavoro

Il funzionamento del FFJ è una variante dello scheduler (basato sul principio del work stealing, "furto del lavoro") del linguaggio di programmazione Cilk ([7]): si tratta di un linguaggio di programmazione e di un ambiente run-time, studiato presso il MIT, disegnato appositamente per l'implementazione di algoritmi MT. La filosofia base del linguaggio è che un programmatore dovrebbe concentrarsi nello strutturare il proprio programma al fine di esporre il parallelismo e sfruttare località, lasciando al sistema runtime la responsabilità di schedulare il calcolo da eseguire in modo efficiente sulla piattaforma data. Il sistema Cilk runtime si prende cura dei dettagli come protocolli di bilanciamento del carico, la sincronizzazione e la comunicazione.

Come funziona FFJ

Il principio di funzionamento del FFJ è basato sull'organizzazione descritta di seguito.

  • Ogni thread mantiene i propri task da eseguire in una coda di pianificazione lavori dedicata.
  • Le code sono di tipo double-ended (interfaccia Deque estendente Queue), chiamate così perchè, come descritto in precedenza, supportano sia una sematica tipica delle code FIFO (first in first out, primo dentro, primo fuori), sia degli stack LIFO (Last In First Out, l'ultimo dentro è il primo fuori). Pertanto hanno due vie di uscita che ne caratterizzano il nome.
  • Le sottoattività generate dalle attività gestite da un determinato thread sono, tipicamente, inserite nella code double-ended dello stesso thread.
  • Ogni thread esegue i task nella propria coda attraverso una strategia a pila: il task più giovane è eseguito prima (il thread preleva i lavori da eseguire invocando un'operazione pop sulla coda).
  • Quando un thread non ha più attività da eseguire nella propria coda va in cerca di attività da eseguire ("rubare") nella coda di un thread del pool scelto casualmente. In questo caso tenta di prelevare il task che da più tempo è nella coda (regola FIFO).
  • Quando un thread incontra un'operazione di join, continua a elaborare gli altri task (se disponibili), fino a quando il task di destinazione è stato completato (il relativo metodo isDone restituisce il valore true). A parte l'operazione di join, tutti i task proseguono fino al completamento senza blocchi.
  • Quando poi un thread ha esaurito tutti i task dalla propria coda e non riesce a rubare un task da un'altra coda, si defila (esegue un'operazione, yield, sleep, etc., che che lo forza a lasciare lo stato di esecuzione), per riprovare in un secondo momento. Ciò a meno che tutti i thread si trovino in un medesimo stato di "idle". In qual caso, tutti i thread vanno in uno stato di idle finchè un nuovo task è richiesto al livello più altro.

La figura 2 mostra un Thread che esegue i task presenti nella propria coda (Deque). L'immagine mostra che il Thread preleva un task dalla propria coda (pop) e quindi, eseguendo un tipico algoritmo D&C, genera nuovi task (sub-task) che vengono inseriti nella stessa coda (push).

 

 

Figura 2 - Singolo Thread con la relativa Deque.

 

La figura 3 mostra due Thread appartenenti allo stesso pool. In particolare, mentre Thread 2 ha molti Task da eseguire, il Thread 1 non ne ha. Quindi, grazie al supporto del framework, ruba il lavoro più anziano presente nella coda del Thread 2 per eseguirlo. A tal fine vengono utilizzate le feature tipiche della coda rispetto a quelle della pila. L'utillizzo di diverse strategie di gestione delle stessa "coda" permette di evitare "conflitti" con il Thread 2.

 

 

Figura 3 - Una rappresentazione grafica della strategia work-stealing.

 

La tecnica del work stealing permette di creare scheduler leggeri ed estremamente efficienti che grazie all'oculata strategia di allocazione e prelevamento dei task permette, costruttivamente, di minimizzare contese. A questo punto dovrebbe essere chiaro il perchè è opportuno realizzare task che non necessitano ulteriori sincronizzazioni a parte quelle proprie del FFJ, proprio per non collidere con la politica di gestione dello stesso framework.

Critiche al FFJ

Per questioni di completezza diciamo anche che in Internet si torvano alcuni articoli, non molti a dire il vero, che esprimono pareri molto critici rispetto al FFJ considerato un mero esercizio accademico con ridotte possibilità di utilizzo in ambienti professionali. L'accusa principale è relativa al fatto che l'implementazione di sistemi complessi richiede soluzioni MT e la parallelizzazione di processi che vanno ben oltre al pattern D&C e al coordinamento di singole classi/Thread. I vari articoli hanno sicuramente qualche spunto interessante: per esempio Edward Harned, cfr. [8], critica giustamente anche l'eccessivo utilizzo di sun.misc.Unsafe, ossia il collegamento a routine scritte in C dipendenti dalla piattaforma al fine di ottimizzare le performance. Ma sul resto, crediamo che si tratti più di una provocazione che di un'argomentazione solida e che le soluzioni vadano valutate nel rispettivo contesto. Per esempio, lo stesso Lea non ha mai posizionato il FFJ come backbone di una soluzione Enterprise. Non è mai stato venduto come un'alternativa agli application server, sebbene sia possibile criticare il fatto che l'utilizzo del FFJ nel contesto degli application server dovrebbe essere considerato accuratamente dal momento che è compito dell'EJB container di gestire Thread e le implementazione dei componenti non dovrebbero, direttamente o indirettamente, lanciare nuovi Thread. il FFJ è stato concepito come una soluzione efficiente ed elegante per supportare l'implementazione di algoritmi, come per esempio la ricerca del best-path, che possono trarre grandi benefici da una struttura D&C. Invece uno spunto interessante è quello di vedere quale sarà l'utilizzo del FFJ ed il relativo sviluppo quando verrà finalmente rilasciata la grammatica delle espressioni Lambda.

Algoritmi "Divide et Impera" (Divide and Conquer, D&C)

Il FFJ è ideale per l'implementazione di algoritmi D&C la cui struttura tipica prevede un pattern come quello descritto di seguito:

0.     Result solveProblem(Problem problem) {
1.           if (il problema è sufficientemente piccolo) {
2.                  risolvi il problema
3.           } else {
4.                  Decomponi il problema in sotto-problemi indipendenti
5.                  (fork) genera sotto-task per risolvere i sotto-problemi
6.                  (join) riunisci i sotto-task
7.                  componi il risultato a partire dai quelli parziali
                    generati dai sotto-task
8.           }
9.     }

Da una prima analisi del precedente listato si può notare che si tratta di una struttura ricorsiva nella quale è possibile individuare:

  • il passo base: (linee 1 e 2) che permette di terminare la ricorsione;
  • il passo ricorsivo: (linee da 4 a 7), che si occupano di decomporre il problema dato in problemi più semplici da risolvere indipendentemente per poi ricomporre il risultato.

Qualche nota storica

L'algoritmo D&C più popolare, che verosimilmente tutti i lettori avranno implementato almeno una volta, è la ricerca binaria. Una descrizione dettagliata si deve a John Mauchly (fisico americano che con J. Presper Eckert disegnò l'ENIAC) che nel 1946 dettagliò il modello di ricerca binaria. Tuttavia tracce di algoritmi simili risalgono addirittura al 200 a.C. L'idea alla base è di partire da un array di elementi ordinati e quindi eseguire la ricerca dividendo ogni volta l'array in due componenti: quelli degli elementi inferiori alla chiave da cercare e quelli degli elementi superiori alla chiave che come tale possono essere ignorati. Altro algoritmo D&C, che risale a diversi secoli prima di Cristo, è l'algoritmo di Euclide atto a calcolare il massimo comun divisore di due numeri la cui strategia consiste nel ridurre i numeri in equivalenti sottoproblemi sempre più piccoli. Inoltre è possibile menzionare la serie di Fibonacci che si presta egregiamente ad un'implementazione D&C.

Un primo esempio di algoritmo D&C con sottoproblemi multipli è l'algoritmo di Gauss coniato nel 1805 divenuto poi famoso con il nome di trasformata veloce di Fourier (FFT, Fast Fourier Transformer) di Cooley-Tukey che lo riscoprirono circa un secolo più tardi. Mentre, uno dei primi algoritmi D&C specificatamente sviluppato per i computer è l'algoritmo di ordinamento merge sort inventato da John von Neumann nel 1945 del quale riportiamo l'implementazione fork/join di seguito.

Altro celebre esempio di algoritmo D&C che, inizialmente, non prevedeva un aspetto di automatizzazione è il metodo dell'ufficio postale coniato da Knuth. Si tratta di una strategia utilizza per organizzare razionalmente l'instradazione della posta. La strategia prevede che le lettere siano ordinate in sacchi separati per le diverse aree geografiche. Ognuno di questi sacchi di lettere viene poi suddiviso in lotti per aree più piccole, e così via fino alla consegna. Si tratta di una strategia simile al radix-sort implemetato attraverso schede perforate nel 1929.

Merge sort

Il seguente listato riporta lo pseudo-code dell'algoritmo merge sort.

       function mergeSort(List aList)
             // se la lisa ha un solo elemento, è ordinata. Ritorna la lista
             if aList.size() <= 1
                    return aList
             //...altrimenti, suddividi la lista in due sotto liste
             List leftList
             List rightList
             int middle = aList.size() / 2
             for each element in aList before middle
                    leftList.add(element)
             for each element in aList after or equal middle
                    rightList.add(element)
             // invoca ricorsivamente mergeSort.
             // ciò comporta l'ulteriore suddivisione delle sotto liste
             // fino al passo base (lista unitaria)
             leftList = mergeSort(leftList)
             rightList = mergeSort(rightList)
             // esegui la fusione delle sottoliste restituite dalle precedenti
             // invocazioni mergeSort in una nuova lista restituita al chiamante
             return merge(left, right)

Ed ecco di seguito il listato conl'implementazione dell'algoritmo MergeSort eseguita con il FFJ:

/**
 * This class implements the Quick Sort algorithm
 * The complexity is mostly O(n Log n), in worst case scenarios can be O(n^2)
 * Algorithm:
 * 1. pick an element from the list called the pivot
 * 2. Partition the list:
 *      - all elements with value less than the pivot come before the pivot
 *      - all elements with value grater than the pivot come after the pivot
 *      - elements with the same value can go either way as long as it is consistent
 * 3. Recursively
 *      - sort sub-list with elements prior the pivot
 *      - sort sub-list with elements after the pivot
 * 4. Terminate the execution when the list has 1 or zero elements
 */
 
public class QuickSort> extends RecursiveAction {
 
       // --------------------- CONSTANS SECTION ---------------------
       /** default serial version ID */
       private static final long serialVersionUID = 1L;
 
       // --------------------- ATTRIBUTES SECTION -------------------
       /** data source to sort */
       private List dataSource;
       /** lower index where to start the sort. Used in recursive invocations */
       private int     lowerIndex;
       /** upper index where to stop the sort. Used in recursive invocations */
       private int     upperIndex;
       /** kind of unique task id */
       private long    taskId;
      
       // --------------------- METHODS SECTION ---------------------
      
       // ---------------------------------------------- constructors
       /**
        * Full constructor
        * @param dataSource
        */
       public QuickSort(List dataSource) {
             this.dataSource = dataSource;
             this.lowerIndex = 0;
             this.upperIndex = dataSource.size() - 1;
             this.taskId =  System.nanoTime();
            
             outputStat("New (first)  ");
       }
 
      
       /**
        * Full constructor
        * @param dataSource
        * @param lowerIndex
        * @param upperIndex
        */
       protected QuickSort(List dataSource, int lowerIndex,
                                     int upperIndex, long taskId) {
              
             this.lowerIndex = lowerIndex;
             this.upperIndex = upperIndex;
             this.dataSource = dataSource;
             this.taskId     = taskId;
            
             outputStat("New          ");
       }
      
       // ------------------------------------------------- specific methods
      
       @Override
       protected void compute() {
            
             if (lowerIndex < upperIndex) {
                    outputStat("Compute Start");
                    int pivotIndex = lowerIndex + ((upperIndex - lowerIndex) / 2); 
                      // middle point
                   
                    pivotIndex = partition(pivotIndex);
                   
                    invokeAll(
                           new QuickSort<>(dataSource, lowerIndex,
                                       pivotIndex-1, System.nanoTime()),
                                       new QuickSort<>(dataSource, pivotIndex+1, 
                                       upperIndex, System.nanoTime())
                    );
                    outputStat("Compute End  ");
             }
       }
       /**
        * This is a simple output used to show the stealing strategy
        * @param operation   operation to print
        */
       private void outputStat(String operation) {
             String threadId = Thread.currentThread().getId()+"";
             if (Thread.currentThread().getId() < 10) { 
                    threadId = "0"+threadId;
             }
            
             System.out.println(" task id:"+this.taskId+" thread:"
                                            +threadId+" op:"+operation);
       }
      
       /**
        * partition the data set
        */
       private int partition(int pivotIndex) {
             T pivotValue = dataSource.get(pivotIndex);   // middle element
             swap(pivotIndex, upperIndex);
            
             int storeIndex = lowerIndex;
             for (int i=lowerIndex; i < upperIndex; i++) { // go through all elements
                   
                    if (dataSource.get(i).compareTo(pivotValue) < 0) {
                           swap(i, storeIndex);
                           storeIndex++;
                    }
             }
            
             swap(storeIndex, upperIndex);
            
             return storeIndex;
       }
      
       /**
        * Swap the elements in the given positions
        */
       private void swap(int i, int j) {
             if (i != j) {
                    T iValue = dataSource.get(i);
                   
                    dataSource.set(i, dataSource.get(j));
                    dataSource.set(j, iValue);
             }
       }
      
       // -------------- MAIN METHOD: a poor way to test classes!!!
       public static void main(String[] args) {
             int MAX_SIZE = 10_000;
            
             // ------------------------- generate a random list
             List sourceData = new ArrayList<>(MAX_SIZE);
            
             for (int i=0; i<MAX_SIZE; i++) {
                    int randomValue = (int) (Math.random() * 100_000);
                    sourceData.add(randomValue);
             }
            
             // ------------------------- execute the sort
             QuickSort quickSort = new QuickSort<>(sourceData);
            
             ForkJoinPool pool = new ForkJoinPool(6);
             pool.invoke(quickSort);
            
       }
}  // --- end of class --

L'esecuzione di questo breve programma permette di evidenziare alcune intessanti caratteristiche.

  • In primo luogo la miriade di task generati durante l'esecuzione sono eseguiti dai thread gestiti dal ForkJoinPool. In particolare, l'implementazione richiede un pool di 6 thread (new ForkJoinPool(6)). Questo fa comprendere perchè i ForkJoinTask siano considerati Thread "leggeri": il fork di un nuovo task non genera assolutamente la creazione di un nuovo thread e tanto meno quello di un nuovo processo.
  • I nuovi task possono venire allocati alla coda di un thread diverso da quello che lo ha creato.
  • I "thread" rubano spesso il lavoro dalla coda degli altri thread.

In merito all'ultimo punto, è possibile notare sequenze come le due riportate di seguito:

 task id:4781408082098 thread:08 op:New
 task id:4781408082098 thread:09 op:Compute End
 task id:4781408082098 thread:09 op:Compute Start
 
 task id:5762273738806 thread:08 op:New
 task id:5762273738806 thread:11 op:Compute End
 task id:5762273738806 thread:11 op:Compute Start

Nel primo caso il Thread 8 genera un nuovo task che poi viene acquisto dal Thread 9. La stessa suquenza accade successivamente, in questo caso il Thread "ladro" è il numero 11.

 

Sempre dall'analisi del codice è possibile notare che il disegno tipico di una specializzazione della classe astratta ForkJoinTask richiede i campi che compongono i suoi parametri nel costruttore, e poi definisce un metodo di calcolo che utilizza in qualche modo i metodi di controllo forniti da questa classe base: fork e join, oppure un metodo a più alto livello di astrazione, come invokeAll dell'esempio, che esegue il fork automatico di tutti i task definiti per poi eseguirne il join.

Per finire è possibile notare che nel codice si sono utilizzate alcune feature implementate dal progetto Coin, Java SE 7 descritte nell'articolo precedente, come, per esempio, l'utilizzo dell'underscore per aumentare il grado di leggibilità di alcuce costanti:

int MAX_SIZE = 10_000; e (int) (Math.random() * 100_000);

o il ricorso alla notazione contratta del diamante

= new QuickSort<>(sourceData);

e

QuickSort<>(dataSource, lowerIndex, pivotIndex-1, System.nanoTime()),

Conclusione

Questo articolo è stato dedicato alle nuove feature del package della concorrenza introdotte con Java SE 7. In questo caso il giudizio non può che essere assolutamente positivo… come del resto succede per la stragrande maggioranza dei lavori di Doug Lea. Le nuove feature continuano il percorso di revisione dell'iniziale strategia Java per il supporto al MT iniziato con J2SE 5.0. Sebbene tale supporto fu inserito nativamente nel linguaggio Java e, almeno inizialmente sembrava veramente un'ottima idea in discontinuità con i linguaggi del passato, ben presto tutte le sue limitazioni sono divenute evidenti: eccessiva rigidità (singolo monitor), generazione di colli di bottiglia (i meccanismi di controllo della concorrenza erano eccessivamente coarse-grained), feature troppo a basso livello (Thread, synchronised, wait e notify), ridotto controllo, etc. Ora, con il senno di poi è possibile affermare che tale supporto fu più una limitazione che un reale vantaggio.

Tra le varie feature presentate in questo articolo, il framework Fork/Join è decisamente quella più interessante/importante. In particolare, introduce una semplice ed efficace tecnica di disegno per ottenere elevate prestazioni da algoritmi in grado di essere scomposti in sotto-parti in gradi di funzionare in parallelo (algoritmi Divide and Conquer). La sincronizzazione tra i vari Thread è construttivamente ridotta al minimo grazie alla strategia utilizzata per far funzionare l'assegnazione dei vari lavori: ogni Thread utilizza la propria coda come una pila e quindi inserisce e preleva i lavori da eseguire sempre "sopra", mentre il framework può rimuovere i task da un Thread per assegnarli ad altri dal fondo: preleva i più anziani secondo i dettami propri delle code. Il framework fork-join, se utilizzato secondo le direttive (non si creano task che richiedono ulteriori sincronizzazioni, continui accessi I/O, etc.) non blocca mai i vari Thread se non per attendere che tutte le varie sottoattività siano pronte quando è richiesto di esegurie il join. L'overhead e la sincronizzazioni sono quindi ridotto al minimo. Il framework fork/join mostra in maniera chiara come le migliori soluzioni frequentemente partono proprio dalla semplicità e pulizia del relativo disegno.

Riferimenti

[1] Luca Vetti Tagliati, "L'evoluzione di Java: verso Java 8 - III parte: Java SE 7 e la "moneta" delle nuove feature", MokaByte 173, Maggio 2012

http://www2.mokabyte.it/cms/article.run?articleId=ORJ-HZ8-Q45-F8N_7f000001_3893887_2d8c237e

 

[2] JSR 166: Concurrency Utilities

http://www.jcp.org/en/jsr/detail?id=166

 

[3] Doug Lea, "A Java Fork/Join Framework", State University of New York at Oswego

http://gee.cs.oswego.edu/dl/papers/fj.pdf

 

[4] java.util.Random codice sorgente

http://www.docjar.com/html/api/java/util/Random.java.html

 

[5] Doug Lea, "TransferQueue motivation?"

http://cs.oswego.edu/pipermail/concurrency-interest/2009-February/005888.html

 

[6] William N. Scherer III, Doug Lea, Michael L. Scott, "Scalable Synchronous Queues", java.util.Random codice sorgente

http://www.docjar.com/html/api/java/util/Random.java.html

 

[7] MIT, "The Cilk project"

http://supertech.csail.mit.edu/cilk/

 

[8] Edward Harned, " A Java™ Fork-Join Calamity Parallel processing with multi-core Java™ applications", maggio 2012

http://coopsoft.com/ar/CalamityArticle.html

 

 

 

Condividi

Pubblicato nel numero
174 giugno 2012
Luca Vetti Tagliati ha conseguito il PhD presso il Birkbeck College (University of London) e, negli anni, ha applicato la progettazione/programmazione OO, Component Based e SOA in diversi settori che variano dal mondo delle smart card a quello del trading bancario, prevalentemente in ambienti Java EE. Dopo aver lavorato a…
Articoli nella stessa serie
Ti potrebbe interessare anche