MokaByte 69 - Dicembre 2002 
Java Message Service
IV parte: le destinazioni temporanee e i MessageDrivenBean
di
Stefano Rossini
Si conclude con questa puntata la mini serie di articoli dedicati a JMS. Questa quarta e ultima parte è dedicata alla presentazione di un'applicazione che utilizza congiuntamente i modelli PTP e Publisher/Subscriber, alla spiegazione delle destinazioni temporanee per concludere con una breve panoramica sugli EJB Message Driven. intro

Per riepilogare i concetti visti nelle puntate precedenti([JMS1],[JMS2],[JMS3]),si presenta un'applicazione JMS che ha lo scopo di gestire la vendita di biglietti di partite di football americano e le relative applicazioni client potenziali acquirenti.
L'applicazione è costituita da tre classi:

  • TicketMessage: è la classe che incapsula il biglietto della partita da vendere/acquistare
  • TicketSeller: è la classe responsabile di vendere i biglietti delle partite
  • TicketBuyer: è la classe in grado di comprare i biglietti delle partite d'interesse


Figura 1 - l'applicazione JMS che vende in biglietti e i relativi client acquirenti
(clicca per ingrandire)


La classe TicketMessage incapsula la descrizione del biglietto e per ogni sua proprietà mette a disposizione i relativi metodi get e set:

public class TicketMessage implements java.io.Serializable{
  // valore dello stato del biglietto in vendita
  public static final String INSALE = "TICKET IN SALE...";
  // valore dello stato del biglietto comprato
  public static final String SOLD = "! SOLD !";
  // ID del biglietto
  private int id;
  // Stadio
  private String stadium;
  // Nome della squadra che gioca in casa
  private String home;
  // Nome della suadra ospite
  private String visitors;
  // Data della partita
  private String date;
  // Stato del biglietto : INSALE / SOLD
  private String status;
  // Data di messa in vendita
  private String insaleDate;
  // Data di effettiva vendita del biglietto
  private String soldDate;
  // ID del compratore
  private String userId;
  ...


La vendita dei biglietti
La situazione di vendita è un tipico caso di modellazione Publish/Subscriber; il venditore, modelllato mediante la classe TicketSeller, pubblica presso un Topic, presso un topic (MokaTopicSell) e chiunque si dichiari subscriber di quel topic è in grado di ricevere gli annunci di vendita.
I biglietti da vendere sono contenuti in un array di oggetti di classe TicketMessage:

private static final TicketMessage arrayTickets[] = {
  new TicketMessage(0,"Soldier Field",
                      "Pittsburgh","Chicago","01/10/02"),
  new TicketMessage(1,"Coliseum","Oakland","Chicago","03/11/02"),
  new TicketMessage(2,"Heinz Field",
                      "Pittsburgh","Cincinnati","10/11/02"),
  ...
}

La classe TicketSeller provvede a metterli in vendita pubblicandoli, a istanti temporali definiti e se non sono stati nel frattempo già venduti, presso il MokaTopicSell:

public class TicketSeller extends JFrame
                          implements MessageListener {

  private void publishTicketsToSell(TopicSession topicSession) {
    this.message = topicSession.createObjectMessage();
    ...
    for(int i=0; i<arrayTickets.length; ++i) {
      if(arrayTickets[i].getStatus().equals(TicketMessage.INSALE)){         // se il biglietto è in vendita
        try{
          Thread.sleep(5000); // aspetto…
        }catch(InterruptedException ie){. . .}
      // preparo il messaggio da vendere
      arrayTickets[i].setInsaleDate(
                      new java.util.Date().toString());
      message.setObject(arrayTickets[i]);
      message.setJMSReplyTo(tempOrderTopic);
      // lo pubblico presso il MokaTopicSell
      topicPublisher.publish(message);
      
...

I compratori sono le applicazioni JMS che si dichiarano subscriber presso il MokaTopicSell.

public class TicketBuyer extends JFrame implements MessageListener {
  private void setup() {
    ...
    TopicSubscriber     topicSubscriber=topicSession.createSubscriber(topic,null,true);
    topicSubscriber.setMessageListener(this);
    ...

Fino ad ora, niente di nuovo rispetto a quello già visto nelle puntate precedenti; passiamo a vedere come modellare e gestire la parte relativa all'acquisto dei biglietti.

 

L'acquisto dei biglietti
Tutti i client JMS in ascolto sul topic MokaTopicSell sono potenziali acquirenti. Quando un client JMS decide di comprare un biglietto deve comunicare con il solo venditore inoltrando la richiesta d'acquisto.

 

Utilizzo delle Temporary Destination
Fino ad adesso si sono utilizzate destinazioni gestite come Administered Object, cioè create/configurate/rimosse dal Provider JMS mediante o l'opportuno tool di amministrazione o lo specifico file di configurazione.
Le API JMS permettono di creare destinazioni dette temporanee (TemporaryQueue e TemporaryTopic) la cui durata è strettamente legata a quella della connessione tramite la quale sono state create.
Queste destinazioni sono create dinamicamente dall'applicazione utilizzando i metodi createTemporaryQueue() della classe QueueSession e createTemporaryTopic() della classe TopicSession.
Le destinazioni temporanee sono ad uso privato dell'applicazione JMS che le ha create e quindi gli altri client JMS non ne hanno visibilità.
Se si chiude la connessione da cui si è creata la destinazione temporanea, anch'essa viene chiusa ed il suo contenuto viene perso.
Nella applicazione in esame il TicketSeller crea quindi un temporary topic sul quale si predisporrà a ricevere le richieste di acquisto

tempOrderTopic = topicSession.createTemporaryTopic();
topicSubscriber = topicSession.createSubscriber(tempOrderTopic);
topicSubscriber.setMessageListener(this);

Gli acquirenti quindi potranno inviare le richieste di acquisto al topic temporaneo in modo tale che esse siano visibili al solo venditore e non quindi agli altri acquirenti.
Resta da capire come è possibile indicare al consumatore la destinazione del produttore.

 

Il campo JMSReplyTo
Il campo JMSReplyTo, appartiene all'header del messaggio JMS, permette al consumatore di un messaggio di ricavarsi la destinazione a cui rispondere.
Nel nostro esempio, l'acquirente ricava dalla lettura di tale campo la destinazione del venditore verso il quale inoltrare la richiesta d'acquisto.
Per attuare questo semplice meccanismo di request/reply, il client Seller, prima di inviare il messaggio, valorizza il campo JMSReplyTo con il valore del temporary topic precedentemente creato

message.setJMSReplyTo(tempOrderTopic);
topicPublisher.publish(message);

Il ricevente legge il campo JMSReplyTo

Destination dest = message.getJMSReplyTo();

ed effettua l'opportuno downcast alla specifica tipologia della destinazione

Topic buytopic = (Topic)dest;

per poi provvedere all'invio del messaggio

// creo il publisher
publisher = this.topicSession.createPublisher(buytopic);
// invio il messaggio d'acquisto
publisher.publish(msg);

Il campo JMSReplyTo permetta di fatto un "routing applicativo" di messaggi che può essere particolarmente utile in tipici contesti di workflow in cui è necessario specificare le destinazioni da utilizzare a secondo dello stato e del punto particolare del processo.
Il metodo decideToBuy() conterrà la logica responsabile dell'acquisto del biglietto e verrà invocato ogni qualvolta si riceve un annuncio di vendita.

public void onMessage(javax.jms.Message message){
  decideToBuy(message);
}

Tale metodo effettua un semplice confronto tra la squadra che gioca in casa indicata nel biglietto da comprare (ticket.getHome()) e la squadra preferita dell'utente (this.teamName)

private void decideToBuy(Message message){
  ObjectMessage msg = null;
  try {
    if (message instanceof ObjectMessage) {
      msg = (ObjectMessage) message;
      TicketMessage ticket = (TicketMessage)msg.getObject();
      if(ticket.getHome().equalsIgnoreCase(this.teamName)){

Se i nomi delle squadre sono uguali, si prepara il messaggio da inviare (il ticket d'interesse)

msg.setObject(ticket);

si ricava la destinazione a cui inoltrare la richiesta d'acquisto

Topic buytopic =(javax.jms.Topic)message.getJMSReplyTo();

e si invia il messaggio

publisher = this.topicSession.createPublisher(buytopic);
publisher.publish(msg);
. . .



Figura 2
- Acquisto dei biglietti: utilizzo del topic temporaneo.

 


Utilizzo delle code in modalità Point to point
Mentre la modellazione di vendita dei biglietti è un tipico caso di relazione 1 a molti (1 venditore - N acquirenti), l'acquisto dei biglietti è di fatto una relazione 1 a 1 tra acquirente e venditore e quindi un tipico scenario Point-to-Point.
Il TicketSeller effettua quindi una lookup sulla coda MokaQueueBuy e si pone in ricezione di eventuali messaggi di richiesta d'acquisto dichiarandosi Listener della destinazione.

this.queue = << lookup Queue "MokaQueueBuy" >>
QueueReceiver queueReceiver = queueSession.createReceiver(this.queue);
queueReceiver.setMessageListener(this);

L'utilizzo del campo JMSReplyTo dell'header del messaggio è analogo al caso precedente.
Il TicketSeller specifica in fase di publishing la coda alla quale inviare la risposta

message.setJMSReplyTo(this.queue);

mentre il TicketBuyer, una volta ricevuto il messaggio, legge il campo JMSReplyTo e provvede ad inviare il messaggio di richiesta d'acquisto
Queue buyQueue = (Queue)dest;
// creo il QueueSender
QueueSender queueSender = this.queueSession.createSender(buyQueue);
// invio il messaggio d'acquisto
queueSender.send(msg);

 


Figura 3
- Acquisto dei biglietti: utilizzo della Queue

 

Identificazione dell'acquirente
Una volta implementata la parte relativa all'acquisto dei biglietti, resta da risolvere il problema dell'identificazione dell'acquirente tra gli N subscriber attivi.
Per fare questo è possibile utilizzare il campo JMSCorrelationID, che fa parte dell'header del messaggio JMS e può essere utilizzato per vari scopi applicativi: nella nostra applicazioni nel JMSCorrelationID verrà inserito l'username del cliente prima di inviare il messaggio d'acquisto.

// Setto la username come correlationID
msg.setJMSCorrelationID(this.userName);
// invio il messaggio d'acquisto
queueSender.send(msg);

In alternativa si sarebbe potuto decidere di specificare l'identità dell'acquirente con un message property

msg.setStringProperty("ACQUIRENTE",userName);
queueSender.send(msg);

o nel corpo del messaggio stesso

ticket.setBuyerId(username);
msg.setObject(ticket);
queueSender.send(msg);


Infine si noti come, inserendo nel JMSCorrelationID del messaggio di risposta lo stesso valore del messaggio ricevuto, tale campo può essere utilizzato anche come correlatore di request e response.

 

JMS & J2EE
Gli EJB 1.1 (Session e Entity) sono componenti prettamente sincroni che si basano su architettura ORB. Con l'avvento delle specifiche EJB 2.0 hanno fatto la comparsa i Message Driven Beans (MDB), EJB asincroni basati su JMS.
I MDB sono una variante degli EJB Session Stateless in grado di processare messaggi JMS; analogamente agli EJB Session Stateless, i MDB non sono in grado di mantenere uno stato conversazionale per uno specifico client e sono gestiti in pool.
La caratteristica principale dei MDB è che permettono quindi di consumare in modo asincrono e concorrente messaggi da destinazioni JMS attraverso l'infrastuttura J2EE dell'Application Server.
Dato che un MDB non è un componente invocabile da remoto direttamente da un client, non possiede le interfacce Home e Remote, ma implementa invece le interfacce javax.ejb.MessageDrivenBean e javax.jms.MessageListener e deve quindi definire i seguenti metodi:

  • ejbCreate(): viene invocato quando l'EJB viene creato
  • ejbRemove(): viene invocato prima che il container rimuova l'EJB
  • setMessageDrivenContext(MessageDrivenContext ctx): il context è passato all'EJB dopo che è stato istanziato e prima della ejbCreate(). Il context permette di gestire transazioni con i metodi getUserTransaction(), setRollbackOnly(), getRollbackOnly() e per la security con i metodi getCallerPrincipal(), isCallerInRole().
  • OnMessage(Message msg): tale metodo è sempre invocato in una propria e separata transazione e viene invocato esattamente come avviene per un MessageListener di un normale client JMS.

Il ciclo di vita di un EJB Message Driven è il seguente:

 


Figura 4
- Ciclo di vita di in Message Driven Bean

 

Differenze tra un'applicazione JMS ed un EJB MD
Dopo questa breve introduzione sui MDB si può passare ad analizzare brevemente le differenze di sviluppo tra un MDB e tra un'applicazione JMS standalone.
Nel caso del MDB le operazioni di creazione del message consumer (QueueReceiver/TopicSubscriber), di registrazione del message listener (setMessageListener) e della modalità di acknowledge del messaggio vengono effettuate dall'EJB Container e non sono più a carico dell'applicativo. L'associazione tra il MDB, il ConnectionFactory e la destinazione avvengono a deploy time mediante il deployment descriptor file.

 

Un semplice EJB Message Driven

Vediamo ora un semplice MDB che stampa a video il contenuto dei messaggi che riceve da una coda; esso è costituito da una sola classe di nome SimpleMDB :

public class SimpleMDB implements MessageDrivenBean,MessageListener{
  private transient MessageDrivenContext mdc = null;
  private Context context;
  public SimpleMDB() {}
  public void setMessageDrivenContext(MessageDrivenContext mdc){
    this.mdc = mdc;
  }

  public void ejbCreate() {}
    public void onMessage(Message message) {
    TextMessage msg = null;
    try {
      if (message instanceof TextMessage) {
        msg = (TextMessage) message;
      } else {
        System.out.println("msg of wrong type: ");
      }

     } catch (JMSException e) {
      e.printStackTrace();
      mdc.setRollbackOnly();
    }
  }
  public void ejbRemove() {}
}

Il legame con la destinazione a cui è collegato il MDB non viene definito all'interno del codice ma verrà descritto nei file DD ejb.jar.xml ed il DD vendor-dependent.
Nel file standard di deploy ejb-jar.xml si riporta il nome della classe del MDB (<ejb-class>), il tipo di transazione(<transaction-type>), la modalità di acknowledge(<acknowledge-mode>) e la tipologia della destinazione(<destination-type>).

<enterprise-beans>
<message-driven>
<ejb-name>SimpleMDB_Queue</ejb-name>
<ejb-class>it.mokabyte.jms.ejb.SimpleMDB</ejb-class>
<transaction-type>Container</transaction-type>
<acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
<message-driven-destination>
<destination-type>javax.jms.Queue</destination-type>
</message-driven-destination>
</message-driven>
</enterprise-beans>
</ejb-jar>

Inoltre, come da specifica EJB, ogni riferimento alle risorse fisiche (per esempio nome della coda ) è inserita nel Deployment Descriptor dello specifico Application Server J2EE; nel caso dell'Application Server JBoss è nel file jboss.xml che si associa al MDB la destinazione fisica a cui è collegato.

<?xml version="1.0"?>
<jboss>
<enterprise-beans>
<message-driven>
<ejb-name>SimpleMDB_Queue</ejb-name>
<destination-jndi-name>queue/MokaQueueMDB</destination-jndi-name>
</message-driven>
</enterprise-beans>
</jboss>

Nell'esempio riportato il MDB è listener di una destinazione Queue di nome JNDI queue/MokaQueueMDB.
Per completare il processo di creazione del bean è necessario effettuare il deploy del file contenente il SimpleMDB.class e i due deployment descriptor; sempre nel caso si utilizzi JBoss il deploy avviene in modo molto semplice copiando il jar ottenuto nella directory <JBOSS_HOME>\deploy.
Per provare l'EJB è sufficiente scrivere un client JMS che crei dei messaggi e li invii alla coda indicata dal nome JNDI queue/MokaQueueMDB.
Per complicare leggermente l'esempio si può pensare di sviluppare un MDB che effettua l'eco dei messaggi ricevuti dalla coda JMS.
Rispetto al caso precedente, dove il MDB necessitava solo di ricevere messaggi, adesso è necessario anche inviare ciò che si riceve al mittente del messaggio.

public class QueueEchoMDB implements MessageDrivenBean,                                      MessageListener{

  private MessageDrivenContext ctx = null;
  private QueueConnection conn;
  private QueueSession session;
  public QueueEchoMDB(){}
  public void setMessageDrivenContext(MessageDrivenContext ctx){
    this.ctx = ctx;
  }

  public void ejbCreate()throws EJBException {
    try{
      InitialContext iniCtx = new InitialContext();
      Object tmp = iniCtx.lookup("java:comp/env/jms/QCF");
      QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
      conn = qcf.createQueueConnection();
      session = conn.createQueueSession(false,
                     QueueSession.AUTO_ACKNOWLEDGE);
      conn.start();
    }
    catch(NamingException ne){
      throw new EJBException(. . .);
    }
    catch(Exception ex){
      ...
    }
  }

  public void ejbRemove(){
    System.out.println("QueueEchoMDB.ejbRemove : ...");
    ctx = null;
    try{
      if( session != null ){session.close();}
      if( conn != null ) { conn.close();}
    }
    catch(JMSException ex){
      ex.printStackTrace();
      throw new EJBException(. . .);
    }
  }

  public void onMessage(Message msg){
    try{
      TextMessage tm = (TextMessage) msg;
      String text = tm.getText();
      Queue dest = (Queue) msg.getJMSReplyTo();
      text = "Hello From MDB ! You've sent : " + text;
      sendReply(text, dest);
    }
    catch(Exception ex){
      ex.printStackTrace();
    }
  }


  // Eco del messaggio ricevuto
  private void sendReply(String text, Queue dest)
                         throws JMSException {
    QueueSender sender = session.createSender(dest);
    TextMessage tm = session.createTextMessage(text);
    sender.send(tm);
    sender.close();
  }
}

Il codice è molto semplice: in fase di creazione (metodo ejbCreate()) il MDB effettua la lookup del ConnectionFactory e crea una sessione di lavoro

InitialContext iniCtx = new InitialContext();
Object tmp = iniCtx.lookup("java:comp/env/jms/QCF");
QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
conn = qcf.createQueueConnection();
session = conn.createQueueSession(false,
                                  QueueSession.AUTO_ACKNOWLEDGE);
conn.start();

Una volta ricevuto il messaggio il MDB (metodo onMessage()) si ricava la destinazione a cui inoltrare il messaggio leggendo il campo JMSReplyTo

Queue dest = (Queue) msg.getJMSReplyTo();

per poi provvedere al relativo invio

QueueSender sender = session.createSender(dest);
TextMessage tm = session.createTextMessage(text);
sender.send(tm);
sender.close();

Rispetto all'esempio precedente bisogna configurare il ConnectionFactory mediante il quale il MDB sarà in grado di inviare il messaggio di risposta-

<message-driven>
<ejb-name>QueueEchoMDB</ejb-name>
<ejb-class>it.mokabyte.jms.ejb.QueueEchoMDB</ejb-class>
<transaction-type>Container</transaction-type>
<acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
<message-driven-destination>
<destination-type>javax.jms.Queue</destination-type>
</message-driven-destination>
<resource-ref>
<res-ref-name>jms/QCF</res-ref-name>
<res-type>javax.jms.QueueConnectionFactory</res-type>
<res-auth>Container</res-auth>
</resource-ref>
</message-driven>

Nel deployment descriptor si deve valorizzare il tag <resource-ref> con la dichiarazione del tipo e del nome della risorsa esterna utilizzata dall'EJB mediante gli elementi <res-ref-name>, <res-type>, e <res-auth>.
Nel file jboss.xml si deve specificare, oltre al nome della destinazione da cui ricevere i messaggi, anche il nome JNDI del Queue Connection Factory.

<message-driven>
<ejb-name>QueueEchoMDB</ejb-name>
<destination-jndi-name>queue/MokaQueueMDB_A</destination-jndi-name>
<resource-ref>
<res-ref-name>jms/QCF</res-ref-name>
<jndi-name>MokaQueueConnectionFactoryMDB</jndi-name>
</resource-ref>
</message-driven>

Conclusioni
Con questo esempio si conclude la mini-serie dedicata a JMS: lo scopo (spero raggiunto) era di dare una panoramica la più possibile completa di JMS spaziando dai modello PTP al Publisher/Subscriber, descrivendo la modalità store and forward, message selector e destinazioni temporane fino a concludere con una breve panaramica degli EJB MDB.

 

Bibliografia
[JMS1]S.Rossini:"JMS-La gestione dei messaggi : la teoria", Mokabyte N.60, Febbraio 2002
[JMS2]S.Rossini:"JMS-La gestione dei messaggi: la pratica", Mokabyte N.61, Marzo 2002
[JMS3]S.Rossini:"JMS-La gestione dei messaggi: terza parte",Mokabyte N.68, 9mbre 2002
[ORJMS] D.A.Chappel,R.M.Haefel - Java Message Service,O'Reilly 2001
[JMSS] JMS Specification (versione 1.0.2b) - http://java.sun.com/products/jms/docs.html
[JMST] JMS tutorial - http://java.sun.com/products/jms/tutorial/index.html
[EEDOC] J2SDKEE API Documentation
[BWLS] P. Gomez, P. Zadrozny - "Java 2 Enterprise Edition with BEA Weblogic Server", Wrox 2001

MokaByte® è un marchio registrato da MokaByte s.r.l. 
Java®, Jini® e tutti i nomi derivati sono marchi registrati da Sun Microsystems.
Tutti i diritti riservati. E' vietata la riproduzione anche parziale.
Per comunicazioni inviare una mail a info@mokabyte.it