Buffer
a capacità limitata
I meccanismi di comunicazione tra thread visti fino
ad ora sono concettualmente piuttosto semplici: nella
barriera lo scambio di dati avviene solo quando i thread
hanno smesso di girare, nella pipeline lo scambio di
dati è continuo, ma ha il limite di poter collegare
solamente una coppia produttore-consumatore. Nel caso
più generale in cui si desidera mettere in comunicazione
più di produttori e/o più consumatori
è necessario creare un oggetto intermedio che
faccia da punto di incontro tra le esigenze dei produttori
e quelle dei consumatori: tale oggetto prende il nome
di "buffer a capacità limitata". Un
buffer è un oggetto dotato di una apposita memoria
con uno spazio limitato e di due metodi: put(), per
aggiungere un elemento e get() per prelevarlo.
Figura 1 - Una rappresentazione grafica del buffer
a capacita limitatà
Esistono
due tipi di problemi a cui questo tipo di oggetti può
andare incontro. Il primo si verifica quando un produttore
tenta di depositare un ulteriore elemento in un buffer
pieno, o un consumatore vuole prelevare un elemento
da un buffer vuoto. Ma esistono problemi peggiori a
cui può andare incontro un oggetto del genere.
Si osservi il seguente esempio di buffer non sincronizzato:
public
class BadBuffer {
private Object[] buffer = new Object[10];
private int index = 0;
private int max = 9;
public Object get() {
Object element = buffer[0];
System.arraycopy(buffer,1,buffer,0,max);
index--;
return element;
}
public void put(Object element) {
buffer[index] = element;
index++;
}
}
Questo
oggetto dispone di un array di 10 elementi e una di
variabile index che punta al primo posto libero. Il
metodo get() preleva il primo elemento presente nell'array,
copia tutti gli elementi indietro di una posizione e
decrementa l'indice di un'unità; infine restituisce
l'elemento. Il metodo put() deposita l'elemento passato
come parametro nella cella di memoria puntata da index,
e quindi incrementa di uno la variabile index stessa.
Per
capire cosa non va in questo frammento di codice, si
pensi di nuovo all'esempio della pizzeria: il pizzaiolo
è il produttore, i camerieri sono i consumatori
e il bancone su cui poggiano i piatti è il buffer
a capacità limitata. Mentre le pizze sono in
forno, il pizzaiolo deposita otto piatti sul bancone.
Quindi, non appena le pizze sono cotte, le preleva dal
forno e le deposita sui piatti. I camerieri, appena
vedono che un piatto è pronto per essere servito,
lo prendono e lo portano al tavolo. Si provi ad immaginare
cosa succederebbe se un cameriere distratto prendesse
un piatto vuoto un attimo prima che il pizzaiolo ci
depositi sopra una pizza: il risultato sarebbe un piatto
vuoto in mano al cameriere e una pizza squagliata sul
bancone.
Un
buffer non sincronizzato va incontro a problemi di questo
genere: dal momento che un thread può essere
interrotto in qualunque momento, non è escluso
che esso possa essere interrotto prima di aver completato
un compito complesso, come l'aggiornamento congiunto
di un gruppo di variabili. Si osservi la tavola 1:
Tavola 1 - Traccia di esecuzione concorrente non
sincronizzata
Il
produttore chiama la put() e deposita l'oggetto nella
cella di memoria specificata da index, che assume il
valore 3. In quell'istante lo scheduler mette a riposo
il produttore e passa il controllo al consumatore, che
chiama la get() sul buffer, preleva il primo elemento
presente nell'array, sposta l'array indietro di un valore
e decrementa l'indice, che ora vale 2; infine restituisce
l'elemento e prosegue nella propria esecuzione. Non
appena lo scheduler rimanda in esecuzione il produttore,
questo prosegue da dove aveva terminato, e incrementa
il valore della variabile index, che ora vale di nuovo
3. Peccato che nel frattempo l'elemento appena depositato
sia stato spostato nella cella numero 1, e che la cella
numero 2 sia vuota o, peggio ancora, contenga un dato
non valido: il buffer ora si trova in uno stato inconsistente,
come il bancone della pizzeria con sei piatti vuoti
e una pizza spiaccicata.
In
conclusione, per poter costruire un buffer funzionante
è necessario garantire due cose: la prima è
che le operazioni get() e set() possano essere eseguite
in un blocco indivisibile, atomico; la seconda che il
buffer sia in grado di comportarsi in modo corretto
quando l'array è completamente vuoto o al contrario
è pieno fino all'orlo. Queste problematiche possono
essere risolte con le direttive di sincronizzazione.
Sincronizzazione
attraverso i monitor
Il monitor è un'entità che sovrintende
dall'alto, affinché le cose si svolgano in un
determinato modo. Il monitor è basato sul concetto
di mutex (mutual exclusion), che corrisponde concettualmente
alla chiave del bagno in alcuni locali: quando un cliente
deve andare in bagno, chiede la chiave al gestore e
la restituisce quando ha finito; in questo modo il cliente
è sicuro di non essere disturbato mentre si trova
in bagno. In java un monitor denota un insieme di metodi
che condividono un unico mutex: quando un thread sta
eseguendo uno di questi metodi, nessun altro thread
può accedere ad essi.
Direttive di sincronizzazione
Il linguaggio Java prevede in modo nativo la sincronizzazione
attraverso i monitor. Per poter usare i monitor in modo
corretto è necessario studiare le direttive di
sincronizzazione su cui sono basati, e vedere come possono
essere usate in pratica in un buffer a capacità
limitata.
Il
modificatore synchronized
Il modificatore synchronized permette di marcare un
gruppo di metodi di una classe in modo che condividano
un unico mutex, in modo da garantire un accesso mutuamente
esclusivo a tali metodi: questo gruppo di metodi diviene
a tutti gli effetti un monitor. Questo significa che
solo un thread per volta potrà eseguire uno di
questi metodi: se durante l'esecuzione il thread che
possiede il mutex viene interrotto dallo scheduler in
favore di un altro thread che concorre allo stesso mutex,
quest'ultimo verrà messo in attesa fino a quando
il mutex non sarà stato rilasciato.
Il
modificatore synchronized permette di risolvere il primo
problema del buffer a capacità limitata, ossia
la garanzia di esecuzione atomica dei metodi get() e
put() da parte di un thread:
public
synchronized Object get() {}
public synchronized void put(Object element) {}
Nota:
la direttiva synchronized può essere usata anche
come istruzione, per sincronizzare un frammento di codice
sul mutex di un oggetto arbitrario:
synchronized(obj)
{
method1();
method2();
}
Questo
meccanismo di sincronizzazione è stato spesso
utilizzato in modo scorretto (cfr [1] e [2]) e per questa
ragione non verrà illustrato nella presente trattazione.
I
metodi wait(), notify() e notifyAll()
Per poter realizzare una versione funzionante del buffer
a capacità limitata è necessario risolvere
il problema del buffer pieno o di quello vuoto. Per
risolvere questo problema esistono due ulteriori direttive:
wait() e notifyAll(). Si osservi il seguente frammento
di codice:
public
synchronized void put(Object o) {
while(index == max) // buffer pieno
try {
wait();
}
catch(InterruptedException e) {}
buffer[index] = o;
index++;
notifyAll();
}
Si
immagini ora il seguente scenario: un produttore chiama
il metodo get(). Appena entrato, si accorge che il buffer
è pieno, a questo punto chiama un'apposita istruzione
wait() che mette il thread a riposo e rilascia il mutex,
in modo che un consumatore possa accedere al buffer
e liberare posto. Si noti che l'istruzione wait() è
posta all'interno di un ciclo while: se ad un certo
punto lo scheduler restituisce il controllo al produttore
senza che il buffer sia stato liberato, questo torna
in wait. In caso contrario prosegue nella sua esecuzione,
deposita l'elemento nell'array, incrementa l'indice
e chiama l'istruzione notifyAll(). Questa direttiva
invia un segnale a tutti i thread in attesa di quel
mutex. Lo scheduler li metterà nella coda di
ready e li manderà in esecuzione uno per volta.
Si osservi ora il codice del metodo get():
public
synchronized Object get() {
while(index == 0) // Buffer vuoto
try {
wait();
}
catch(InterruptedException e) {}
Object o = buffer[0];
System.arraycopy(buffer,1,buffer,0,max);
index--;
notifyAll();
return o;
}
Anche
in questo caso viene effettuato un controllo: se il
buffer è vuoto, il thread consumatore viene messo
in wait. Se invece il vettore non è vuoto, il
consumatore preleva il primo elemento dal vettore, copia
gli altri elementi indietro di una posizione, incrementa
l'indice e invia un segnale notifyAll() a tutti gli
altri thread in attesa di quel mutex.
Il
linguaggio Java prevede una ulteriore direttiva di sincronizzazione:
la notify(). Questo metodo assomiglia a notifyAll(),
ma invece di inviare un segnale a tutti i thread in
attesa ne sveglia uno solo. Il motivo per cui in questo
esempio non è stata utilizzata è che non
esiste alcuna regola su quale thread verrà risvegliato:
se al termine di una get() che ha svuotato il buffer
venisse svegliato un altro consumatore, questo verrebbe
immediatamente messo a riposo dalla wait(), e il sistema
rimarrebbe bloccato. La notify() in conclusione può
tornare utile solamente nel caso in cui esista una sola
coppia produttore-consumatore: in tutti gli altri casi
è preferibile inviare un segnale a tutti i thread
coinvolti nell'operazione, e lasciare che siano le direttive
di sincronizzazione a rimettere in equilibrio il sistema.
Buffer
grafico
Le nozioni illustrate nei precedenti paragrafi permettono
di sviluppare un'interessante variante sul buffer a
capacità limitata: il buffer grafico. Il buffer
grafico è un semplice buffer a capacità
limitata che mostra, attraverso le librerie grafiche
Swing, lo stato del proprio array:
import
javax.swing.*;
public
class GraphicBuffer {
private Object[] buffer;
private int index;
private int max;
private JProgressBar progressBar;
public GraphicBuffer(String name,int max) {
this.buffer = new Object[max];
this.max = max-1;
index = 0;
JFrame f = new JFrame(name);
progressBar = new JProgressBar(0,this.max);
f.getContentPane().add(progressBar);
f.setSize(300,80);
f.setVisible(true);
}
public synchronized Object get() {
while(index == 0)
try {
wait();
}
catch(InterruptedException e) {}
Object o = buffer[0];
System.arraycopy(buffer,1,buffer,0,max);
index--;
progressBar.setValue(index);
notifyAll();
return o;
}
public synchronized void put(Object o) {
while(index == max)
try {
wait();
}
catch(InterruptedException e) {}
buffer[index] = o;
index++;
progressBar.setValue(index);
notifyAll();
}
}
Il
costruttore richiede due parametri: un nome, che verrà
visualizzato come titolo dell'apposita finestra, e un
valore intero, che denota la dimensione del buffer.
Dopo aver inizializzato le variabili, il costruttore
crea una finestra di 300 * 80 pixel e mette al suo interno
una JProgressBar. I metodi get() e set() sono assolutamente
equivalenti a quanto già visto, con l'unica differenza
che prima di inviare la notifyAll() viene aggiornato
il valore della JProgressBar. Nel prossimo paragrafo
verrà illustrato un esempio che fa uso di questo
particolare buffer.
Un
esempio: la pizzeria
Si vuole creare una simulazione di una pizzeria, con
due pizzaioli e sette camerieri. La classe Pizzaiolo
svolge il ruolo del produttore:
class
Pizzaiolo implements Runnable {
private int id;
private GraphicBuffer buffer;
public Pizzaiolo(int id,GraphicBuffer buffer) {
this.id = id;
this.buffer = buffer;
}
public void run() {
int ind = 0;
while ( true ) {
buffer.put("Pizza Numero " + ind +" del
pizzaiolo " + id);
ind++;
try {
Thread.sleep((int)(Math.random() * 1000));
}
catch (InterruptedException e) {
}
}
}
}
Il
metodo run() della classe Pizzaiolo contiene un ciclo
infinito in cui vengono deposte nel buffer grafico delle
pizze (in realtà delle stringhe assai poco appetibili),
e quindi si pone in attesa per un tempo compreso tra
i 0 e i 1000 millisecondi. La classe cameriere svolge
in questo esempio il ruolo del consumatore:
class
Cameriere implements Runnable {
private int id;
private GraphicBuffer buffer;
public Cameriere(int id , GraphicBuffer buffer) {
this.id = id;
this.buffer = buffer;
}
public void run() {
while ( true ) {
String s = (String)buffer.get();
System.out.println("Cameriere " + id + "
consegna " + s);
try {
Thread.sleep((int)(1000 + Math.random() * 2000));
}
catch (InterruptedException e) {
}
}
}
}
Anche
in questo caso il metodo run() prevede un ciclo continuo
in cui il cameriere preleva una pizza e stampa a schermo
una stringa in cui si indica l'id del cameriere, quello
del pizzaiolo e il numero di pizze prodotte fino a quel
momento da quel particolare pizzaiolo. Il cameriere
viene messo in attesa per un tempo maggiore: un tempo
compreso tra 1 e 3 secondi. Questa differenza nei tempi
di riposo ha come conseguenza di creare un certo equilibrio
tra il lavoro dei due produttori e quello dei tre consumatori.
La classe PizzeriaExample crea ed avvia il sistema:
public
class PizzeriaExample {
public static void main(String argv[]) {
GraphicBuffer bancone = new GraphicBuffer("Bancone",10);
for(int i = 1;i<=2;i++) {
Runnable pizzaiolo = new Pizzaiolo(i,bancone);
Thread t = new Thread(pizzaiolo);
t.start();
}
for(int i = 1;i<=7;i++) {
Runnable cameriere = new Cameriere(i,bancone);
Thread t = new Thread(cameriere);
t.start();
}
}
}
Il
codice di questa classe è piuttosto semplice:
dapprima viene creato un GraphicBuffer di nome "Bancone"
a 10 posti, quindi, attraverso due cicli for, vengono
creati ed avviati i pizzaioli e i camerieri. Il sistema
è in equilibrio, come si può vedere osservando
l'andamento del buffer.
Figura 2 - Il programma PizzeriaExample in esecuzione
Si
invita il lettore a variare i parametri relativi al
numero di pizzaioli e camerieri nel sistema, i tempi
di riposo definiti nelle rispettive classi o il numero
di posizioni nel buffer: si osservi a questo punto se
il sistema rimane o meno in equilibrio con 20 pizzaioli
e 50 camerieri, o se il fatto di creare un bancone più
capiente costituisca o meno un vantaggio.
Buffer
a capacità limitata generico
Negli esempi precedenti non sono stati usati i tipi
generici per non creare confusione su un argomento già
di per sé molto complesso. Tuttavia, grazie a
questa nuova funzionalità di Java 5, è
possibile realizzare un buffer generico, che sia possibile
usare in qualunque circostanza.
import
java.util.*;
public
class GenericBuffer<T> {
private List<T> buffer;
private int index;
private int max;
public GenericBuffer(int max) {
this.buffer = new ArrayList<T>();
this.max = max-1;
index = 0;
}
public synchronized T get() {
while(index == 0)
try {
wait();
}
catch(InterruptedException e) {}
T element = buffer.get(0);
buffer.remove(0);
index--;
notifyAll();
return element;
}
public synchronized void put(T element) {
while(index == max)
try {
wait();
}
catch(InterruptedException e) {}
buffer.add(element);
index++;
notifyAll();
}
}
Come
oramai dovrebbe essere chiaro, il buffer ha ora un tipo
parametrico, che è necessario specificare in
fase di costruzione:
GenericBuffer<String>
b = new GenericBuffer<String>(10);
Conclusioni
Questo mese è stato illustrato il problema del
buffer a capacità limitata. Per risolverlo sono
state introdotte le direttive di sincronizzazione del
linguaggio Java. Grazie ad esse è stato possibile
costruire un buffer a capacità limitata funzionante,
e dimostrarne il funzionamento in un ambiente caratterizzato
da un numero di produttori e consumatori superiore ad
1.
Il mese prossimo verranno descritti i lock, un meccanismo
di sincronizzazione che permette interazioni più
complesse di quelle viste fino ad ora tra i thread e
un oggetto condiviso.
Bibliografia
[1] Double-checked locking: Clever, but broken
Do you know what synchronized really means?
Java World - February 2001
http://www.javaworld.com/javaworld/jw-02-2001/jw-0209-double.html
[2] Warning! Threading in a multiprocessor world
Find out why many tricks to avoid synchronization overhead
just don't work
Java World - February 2001
http://www.javaworld.com/javaworld/jw-02-2001/jw-0209-toolbox.html
Esempi
Scarica
qui gli esempi
|