Luca Ceppelli è nato a Bolzano 13/09/1977. Nel luglio 2002 ha conseguito la laurea in Ingegneria Elettronica (specializzazione microelettronica) presso il Politecnico di Milano.
Dal luglio del 2004 è dipendente di Iona Technologies come Senior Consultant. Ha maturato esperienze nazionali e internazionali nel campo dell‘integrazione software e delle architetture distribuite presso aziende finanziarie, TelCo e nel settore della pubblica amministrazione.
Paolo Palazzini è nell'organico di IONA Technologies da gennaio 2006 con l'obiettivo di supportare i clienti nella progettazione di soluzioni in ambito Service Oriented Architecture. Dopo aver conseguito la laurea in Ingegneria Elettronica presso l'Università “La Sapienza” di Roma, ha lavorato in diverse aziende operanti come System Integrator, Information Security Consulting e Application Service Provider. Nel corso della carriera professionale si è occupato del settore ICT del mercato finanziario, delle telecomunicazioni, della pubblica amministrazione e della difesa, e ha collaborato a stretto contatto con i principali Vendor di tecnologia. Lavorando sia in Italia che negli Stati Uniti, si è occupato di progetti mission-critical per clienti Enterprise.
Web Service, Enterprise Integration Patterns e Web Application con FUSE
Nel precedente articolo è stato mostrato come implementare alcuni Web Service utilizzando FUSE Services Framework (open.iona.com/products/enterprise-cxf), versione supportata e certificata di Apache CXF, e come utilizzare meccanismi di comunicazione sincroni e asincroni per invocare tali servizi. Nel presente articolo vedremo come utilizzare FUSE Mediation Router per implementare l‘agente di prestito descritto nel corso del primo articolo di questa serie.
Nel corso di questo articolo mostreremo come, a partire dai service provider dell’agenzia di credito e delle banche realizzati nell’articolo precedente, sia possibile implementare un service mediator in grado di esporre un Web Service verso l’utente e di interagire con i service provider per realizzare il seguente workflow:
L’agente di prestito (AdP) riceve la richiesta di quotazione del rateo da parte dell’utente (U);
AdP riceve dall’agenzia di credito (AdC) il punteggio associato ad U;
AdP invia una richiesta ad ogni banca (B);
AdP raccoglie le risposte da tutte le B;
AdP stabilisce qual è la migliore quotazione;
AdP fornisce il risultato ad U.
Nel corso dell’articolo sarà descritto il seguente scenario:
il service mediator (AdP) espone verso l’utente (U) un’interfaccia SOAP/HTTP;
il service provider dell’agenzia di credito (AdC) espone verso AdP un’interfaccia SOAP/HTTP;
i service provider delle banche (B) espongono un’interfaccia SOAP/JMS;
l’invocazione di B da parte di AdP avviene in maniera parallela.
Anche in questo caso, per facilitare la possibilità di sperimentare sulla propria macchina gli esempi proposti, è stato reso disponibile il file articolo_2.zip all’URL www.iona.com/FUSE_EIP/articolo_2.zip, contenente i sorgenti Java e il file POM (Project Object Model) da utilizzare con Maven. All’interno del file articolo_2.zip sono state incluse anche le altre combinazioni, che prevedono l’esposizione da parte di AdC di un’interfaccia SOAP/JMS, l’esposizione da parte di B di interfacce SOAP/HTTP e l’invocazione di B da parte di AdP in maniera sequenziale.
Inoltre, all’URL http://www.iona.com/FUSE_EIP/maven_guide.htm è disponibile una breve guida per l’installazione di Maven, utilizzato anche per il codice distribuito con il presente articolo.
Come anticipato, utilizzeremo FUSE Mediation Router per implementare AdP. FUSE Mediation Router, versione certificata di Apache Camel distribuita da IONA Technologies all’URL open.iona.com/products/enterprise-camel, è un engine per il routing e per il process mediation che semplifica l’implementazione di pattern di integrazione utilizzando codice Java POJO. FUSE Mediation Router fa uso di generic, annotation e Uniform Resource Identifier (URI) per avvalersi di qualsiasi tipo di trasporto e modello di messaging. Ciò che rende particolarmente interessante FUSE Mediation Router sta nel fatto che questa libreria implementa i pattern di integrazione descritti nel libro “Enterprise Integration Patterns” di Gregor Hohpe e Bobby Woolf, considerato uno dei principali manuali di riferimento delle architetture basata su integrazione message-oriented e che descrive in maniera esaustiva i design pattern per l’integrazione di applicazioni enterprise.
FUSE Mediation Router utilizza un Domain Specific Language (DSL) Java per configurare le regole di routing e di mediation: in tal modo è possibile realizzare rapidamente regole di routing in qualsiasi IDE utilizzando codice Java dover necessariamente far uso di configurazioni XML complesse.
Per quanto riguarda la sintassi, una regola di routing inizia sempre con il metodo from(“EndpointURL”) che specifica la sorgente del messaggio e al quale si può aggiungere una catena lunga a piacere di processor per la regola di routing, come ad esempio filter(). La definizione della regola di routing termina con il metodo to(“EndpointURL”) che specifica il destinatario del messaggio processato, ma non è obbligatorio terminare la regola di routing con to(). Come URL degli endpoint (EndpointURL) può essere utilizzato uno qualsiasi dei componenti configurati in fase di deployment, per esempio si può utilizzare un file (file:MyMessageDirectory), un endpoint basato su FUSE Services Framework (cxf:MyServiceName) o un endpoint basato su FUSE Message Broker (activemq:queue:MyQName).
Un esempio della sintassi di base per la definizione di regole di routing è illustrata nella seguente figura.
Figura 1 – Esempio di regola di routing
È anche possibile definire regole di routing globali iniziando l’istruzione con processor type special, come ad esempio intercept(),exception(),errorHandler(). L’elenco completo di componenti è disponibile all’URL http://activemq.apache.org/camel/components.html.
Un processor è un oggetto che può modificare il flusso dei messaggi che passano attraverso l’elaborazione di una regola; se un messaggio appartiene a una remote procedure call (InOut call) il processor può agire sul flusso del messaggio in entrambe le direzioni: sui messaggi di richiesta che vanno dal sorgente al destinatario e sui messaggi risposta che vanno dal destinatario al sorgente. Inoltre, i processor possono avere come argomenti le espressioni e i predicati che modificano il comportamento del metodo, ad esempio una regola può includere un processor filter() che utilizza un predicato xpath() come argomento.
Le espressioni, per l’analisi dei tipi di dato, e i predicati, per l’analisi dei true e dei false, sono spesso utilizzati come argomenti dei processor e non è necessario preoccuparsi di quale tipo passare all’argomento di un’espressione perche’ questi sono automaticamente convertiti.
Esempi di pattern di integrazione disponibili con FUSE Mediation Router
Di seguito sono riportati alcuni processor disponibili con FUSE Mediation Router e la corrispondente icona definita da Gregor Hohpe e Bobby Woolf. L’elenco completo dei processor implementati da FUSE Mediation Router è disponibile all’URL http://activemq.apache.org/camel/enterprise-integration-patterns.html.
Message Filter
Il processor filter() può essere utilizzato per evitare che messaggi non necessari raggiungano l’edpoint destinatario. La sintassi prevede l’impiego di un argomento predicate: se questo è true, il messaggio exchange è inviato al destinatario; se il predicate è false, il messaggio exchange viene bloccato.
Figura 2 – Message Filter
Ad esempio, come illustrato nel seguente esempio, il message filter blocca i messaggi exchange, a meno che questi non contengano un header “foo” il cui valore è pari a “bar”.
Se il messaggio ricevuto da un endpoint sorgente (SourceURL) deve essere ricevuto da più di un destinatario, come illustrato nella figura seguente,
Figura 3 – Recipient List
ci sono due approcci alternativi che possono essere utilizzati. Il primo consiste nell’invocare il metodo to() con endpoint destinatari multipli (recipent list statica), come ad esempio il seguente codice:
L’approccio alternativo è quello di invocare il processor recipientList(), che ha come argomento una lista di destinatari (recipent list dinamica). Il vantaggio derivante dall’utilizzo del processor recipientList() è che la lista di destinatari può essere valorizzata a runtime: ad esempio, la regola seguente genera una lista di destinatari leggendo il valore di recipientListHeader dal messaggio ricevuto:
Il processor aggregator() è utilizzato per aggregare messaggi correlati in un singolo messaggio, come illustrato nella figura 4.
Figura 4 – Recipient List
Allo scopo di distinguere i messaggi che devono essere aggregati insieme, deve essere definita una correlation key per l’aggregator(): i messaggi che hanno la stessa correlation key devono essere aggregati. Solitamente, la correlation key deriva da uno dei campi del messaggio, ad esempio un campo dell’header. È comunque possibile definire un algoritmo di aggregazione per il processor aggregator(): l’algoritmo di default è di scegliere l’ultimo messaggio con un dato valore per la correlation key e di scartare i messaggi più vecchi con quel valore di correlation key. Ad esempio, nel caso si stia monitorando uno stream di dati che fornisce i prezzi delle azioni in tempo reale, potremmo essere interessati solo all’ultimo prezzo per ogni azione. In questo caso è possibile configurare un aggregator() che trasmetta solo l’ultimo prezzo di un’azione e scarti quelli più vecchi. Questa funzionalità è implementata dalla seguente regola, dove la correlation key è presa dall’header stockSymbol e viene utilizzato l’algoritmo di default:
Allo scopo di combinare tutti i messaggi di exchange, relativi ad una specifica correlation key, in un singolo messaggio exchange, si utilizza il processor AggregationStrategy().
Multicast
Il processor multicast() consente di eseguire il routing dello stesso messaggio verso diversi endpoint e di processarli in maniera differente. Il seguente esempio mostra come prendere una richiesta dall’endpoint direct:a e di eseguire il multicast verso direct:x, direct:y e direct:z.
Implementazione del service provider dell’agente di prestito
Come anticipato, l’esempio descritto in questo articolo prevede l’esposizione di interfacce SOAP/HTTP da parte dei service provider dell’agente di prestito e dell’agenzia di credito e di interfacce SOAP/JMS da parte dei service provider delle banche .Il sequence diagram UML corrispondente è illustrato nella seguente figura.
Figura 5 – Sequence diagram UML
Dal punto di vista progettuale, si tratta di implementare seguente workflow illustrato nella seguente figura, realizzato con le componenti descritte da Gregor Hohpe e Bobby Woolf.
Figura 6 – Broker
Per prima cosa definiamo l’interfaccia che il service provider dell’agente di prestito esporrà. L’operazione da esporre è getLoanQuote e restituisce una String contenente le informazioni sul valore del rateo minore e sulla banca che lo applica in base ai seguenti parametri: Social Security Number dell’utente (String ssn), quantità del prestito richiesto (double loanAmount), durata del prestito (int loanDuration). Si tratta degli stessi parametri richiesti dall’operazione getQuote esposta dalle banche, a meno dei parametri (int creditHistory e int creditScore) forniti dall’agenzia di credito e la cui valorizzazione è quindi inclusa nel workflow dell’agente di prestito e derivante dall’invocazione dell’agenzia di credito. L’interfaccia del service provider dell’agente di prestito pertanto sarà:
Prima di vedere come implementare il codice con FUSE Mediation Router, è importante sapere che org.apache.camel.builder.RouteBuilder è la classe base per la definizione della regola di routing. Questa classe definisce il metodo astratto configure() che deve essere specializzato nella classe derivata. Inoltre, RouteBuilder definisce anche metodi utilizzati per iniziare le regole di routing, come from(), intercept(), and exception(). È possibile definire quante classi RouteBuilder sono necessarie, l’importante è instanziare ognuna di queste con l’oggetto CamelContext. La classe LoanBroker che implementa l’agente di prestito sarà quindi derivata da RouteBuilder:
public class LoanBroker extends RouteBuilder {
}
all’interno del main sarà instanziato l’oggetto CamelContext
Analizziamo i singoli processor che compongono la regola di routing. Il processo from(), dopo aver inizializzato l’endpoint, intercetta il messaggio inviato dal client all’endpoint FUSE Services Framework definito dall’URI
Il secondo componente della regola di routing è process() che crea un’istanza del processor CreditScoreProcessor() prendendo come argomento l’URI
http://localhost:9006/creditAgency
che identifica il service provider dell’agenzia di credito che fornisce i dati aggiuntivi necessari all’invocazione delle 3 banche.
Il processor CreditScoreProcessor() si occupa dell’enrichment dei dati forniti dall’utente estraendo il valore ssn dall’exchange in input e utilizzandolo per invocare le operazioni getCreditScore() e getCreditHistoryLength() esposte dall’agenzia di credito, come descritto di seguito:
public void process(Exchange exchange) throws Exception {
System.out.println("
[CreditScoreProcessor] process has been invoked!!!");
Message requestMessage = exchange.getIn();
List
I valori ottenuti vengono poi inseriti come property direttamente all’interno dell’exchange. Il successivo componente della regole di routing è il processor intercept() che crea un’istanza del processor Transform() il quale interviene due volte durante il dispatching di una singola richiesta. Nel primo caso è responsabile di adattare richiesta contenuta nell’exchange proveniente da CreditScoreProcessor() per inviarlo ai service provider delle banche, mentre nel secondo caso di adattare la risposta dei service provider delle banche in quella da restituire all’utente. Il codice chiave di questo processor è riportato di seguito:
Da notare come i due interventi di trasformazione del messaggio verso e da i service provider delle banche siano separati dall’istruzione processNext(exchange).
Il successivo componente della regole di routing è il processor multicast() necessario al processamento dei risultati derivanti dall’invocazione dei service provider delle banche. il processor multicast() crea un’istanza della classe BankResponseAggregationStrategy(), definita come estensione del processor AggregationStrategy() e che, come visto, serve a combinare tutti i messaggi di exchange relativi ad una specifica correlation key, in un singolo messaggio exchange.
La specializzazione del metodo aggregate() viene richiamato da FUSE Mediation Router a partire dalla seconda risposta di una delle banche ricevuta dallo stadio multicast. Gli argomenti che vengono passati al metodo sono due exchange (oldExchange, newExchange) contenenti le risposte dei service provider delle prime due banche che rispondono. Ricordiamo a questo proposito che, nel caso in cui il secondo argomento del processor() multi cast sia true, la comunicazione con i service provider delle banche è asincrona, pertanto l’ordine di ricezione delle risposte è indipendente dall’ordine di invocazione.
Come si nota dal codice mostrato di seguito, viene fatta una prima comparazione tra i valori di rateo forniti dalle banche invocate e viene restituito l’exchange contenente il rateo più basso. A questo punto, quando il processor multicast() riceve la risposta dell’ultima banca, viene richiamato nuovamente il metodo aggregate passando l’exchange calcolato precedentemente (oldExchange) e l’exchange con la risposta appena ricevuta (newExchange).
Object[] oldResult = (Object[])oldExchange.getOut().getBody();
BankQuote oldQuote = (BankQuote) oldResult[0];
Object[] newResult = (Object[]) newExchange.getOut().getBody();
BankQuote newQuote = (BankQuote) newResult[0];
Exchange result = null;
if (newQuote.getRate() >= oldQuote.getRate()) {
result = oldExchange;
} else {
result = newExchange;
}
return result;
L’ultimo processor della regola di routing è to() che specifica i destinatari del messaggio, in questo caso gli URI:
Di seguito viene descritta l’esecuzione dell’intero processo utilizzando il codice disponibile nel file articolo_2.zip e distribuito all’URL www.iona.com/FUSE_EIP/articolo_2.zip.
Dopo aver eseguito l’installazione di Maven, è utile preparare uno script per inizializzare le variabili d’ambiente. Per gli utenti Windows le seguenti istruzioni del file maven_env.bat contenuto nel file articolo_1.zip dovranno essere modificate in base alle directory di installazione del JDK e di Maven:
set JAVA_HOME=
set M2_HOME=
set PATH=%M2_HOME% in;%JAVA_HOME% in;%PATH%
Per gli utenti Linux le seguenti istruzioni del file maven_env.sh contenuto nel file articolo_1.zip dovranno essere modificate in base alle directory di installazione del JDK e di Maven:
Per modificare i permessi di esecuzione del file dovrà essere eseguita la seguente istruzione:
chmod u+x maven_env.sh
e per inizializzare le variabili d’ambiente dovrà essere eseguita la seguente istruzione:
. ./maven_env.sh
Prima di lanciare i servizi, è comunque sempre consigliabile ricompilare tutto con il comando:
mvn compile
Per lanciare il broker JMS è sufficiente posizionarsi sulla directory articolo2, aprire la shell con il file batch maven_env.bat ed eseguire il comando:
Per lanciare il service provider dell’agenzia di credito è sufficiente posizionarsi sulla directory articolo2, aprire la shell con il file batch maven_env.bat ed eseguire il comando:
Per lanciare i service provider delle banche è sufficiente posizionarsi sulla directory articolo2, aprire la shell con il file batch maven_env.bat ed eseguire il comando:
Per lanciare il service provider dell’agente di prestito è sufficiente posizionarsi sulla directory articolo2, aprire la shell con il file batch maven_env.bat ed eseguire il comando:
A questo punto è possibile lanciare il client del service provider dell’agente di prestito posizionandosi sulla directory articolo2, aprendo la shell con il file batch maven_env.bat ed eseguendo il comando:
Il primo messaggio visualizzato sulla finestra del service provider dell’agente di prestito sarà la notifica di avvenuta invocazione del processor CreditScoreProcessor() e dei valori di getCreditScore e getCreditHistoryLength forniti dal service provider dell’agenzia di credito e sarà del tipo:
[CreditScoreProcessor] process has been invoked!!!
Credit Agency getCreditHistoryLength Response for ssn:#1003 -> historyLength:616
Credit Agency getCreditScore Response for ssn:#1003 -> score:10
In maniera corrispondente, sulla finestra del service provider dell’agenzia di credito sarà visualizzato un messaggio del tipo:
=== CreditAgency ===
getCreditHistoryLength() has been invoked!!!!
ssn:#1003
=== CreditAgency ===
getCreditScore() has been invoked!!!!
ssn:#1003
Il risultato dell’esecuzione della prima trasformazione del messaggio visualizzato sulla finestra del service provider dell’agente di prestito sarà del tipo
[Transform] process [REQUEST] has been invoked!!!
Request Transformation
ssn #1003
historyLength: 616
score: 10
amount 100000.0
loanDuriation: 12
Il risultato dell’invocazione dei service provider delle banche è visualizzato sulla finestra corrispondente.
=== Bank: JMS Bank 1 ===
getQuote() has been invoked!!!!
ssn:#1003, loanAmount:100000.0,
loanDuration:12, creditScore:10
=== Bank: JMS Bank 2 ===
getQuote() has been invoked!!!!
ssn:#1003, loanAmount:100000.0,
loanDuration:12, creditScore:10
=== Bank: JMS Bank 3 ===
getQuote() has been invoked!!!!
ssn:#1003, loanAmount:100000.0,
loanDuration:12, creditScore:10
Calculated:[ ssn:#1003 bank:JMS Bank 1 rate:4.1952066778022585 ]
Calculated:[ ssn:#1003 bank:JMS Bank 2 rate:3.002175767602914 ]
Calculated:[ ssn:#1003 bank:JMS Bank 3 rate:3.185705131816837 ]
Successivamente è possibile vedere il risultato dell’invocazione del processor BankResponseAggregationStrategy() sulla finestra del service provider dell’agente di prestito.
[BankResponseAggregationStrategy] aggregate has been invoked!!!
Compare: JMS Bank 1 rate:4.1952066778022585
with: JMS Bank 2 rate:3.002175767602914
[BankResponseAggregationStrategy] aggregate has been invoked!!!
Compare: JMS Bank 2 rate:3.002175767602914
with: JMS Bank 3 rate:3.185705131816837
Il risultato dell’esecuzione della seconda trasformazione del messaggio visualizzato sulla finestra del service provider dell’agente di prestito sarà del tipo
[Transform] process [RESPONSE] has been invoked!!!
Response Transformation
bank: JMS Bank 2
rate: 3.002175767602914
Infine, nella finestra relative al client del service provider dell’agente di prestito viene mostrata la String di ritorno, contenente il nime della banca con rateo minore e il valore di quest’ultimo.
|=== PARALLEL_LOANBROKER_WS_BANK_JMS_CREDITAGENCY_WS_ADDRESS ===
|======================================================
| It takes 2625 milliseconds
|------------------------------------------------------
| The best loan quote is: [ssn:#1003, Bank:JMS Bank 2, Rate:3.002175767602914]
|======================================================
Come configurare il trasporto JMS in un endpoint di CXF
A differenza della configurazione degli endpoint HTTP, che richiedono solamente la definizione dell’URL, la configurazione di un endpoint JMS ha bisogno di un maggior numero d’informazioni, come la definizione dei valori dei parametri jndiConnectionFactoryName, jndiDestinationName, connectionUserName, connectionPassword, etc.
Queste informazioni vengono inserite all’interno del file di configurazione di CXF. Per poter specificare tre configurazioni distinte, una per ogni banca, è necessario, rispetto al codice distribuito con il primo articolo di questa serie, modificare il file di configurazione di spring bank-server-ws-spring.xml, utilizzato per inizializzare, con un approccio code-first, i servizi che rappresentano le 3 banche. Come si può notare è stato inserito per ogni server l’attributo endpointName che ha la forma di un QName e corrisponde al portName nel WSDL. I tre valori sono da inserire nel file bank-server-ws-spring.xml sono:
All’interno del file di configurazione cxf.xml sono state inserite le tre differenti configurazione per gli endpoind JMS delle 3 banche. Come si può notare endpointName viene utilizzato all’interno dell’attributo name dell’elemento jms:conduit in modo da poter configurare i 3 endpoint in maniera indipendente.
Da notare che l’attributo jndiDestinationName ha come valore il suffisso dynamicQueues che indica al broker JMS di inizializzare una coda nel caso in cui non fosse già stata creata.
Conclusioni
In questo secondo articolo abbiamo visto come implementare un service provider in grado di esporre un servizio SOAP/HTTP, di implementare un workflow mediante pattern di integrazione e di invocare service provider in maniera sincrona e asincrona utilizzando le librerie disponibili con FUSE Mediation Router. Nel prossimo articolo mostreremo come eseguire il deployment dei servizi su un container OSGi e come implementare il front-end Web-based per l’invocazione dei service provider.
Luca Ceppelli è nato a Bolzano 13/09/1977. Nel luglio 2002 ha conseguito la laurea in Ingegneria Elettronica (specializzazione microelettronica) presso il Politecnico di Milano.
Dal luglio del 2004 è dipendente di Iona Technologies come Senior Consultant. Ha maturato esperienze nazionali e internazionali nel campo dell‘integrazione software e delle architetture distribuite presso aziende finanziarie, TelCo e nel settore della pubblica amministrazione.
Paolo Palazzini è nell'organico di IONA Technologies da gennaio 2006 con l'obiettivo di supportare i clienti nella progettazione di soluzioni in ambito Service Oriented Architecture. Dopo aver conseguito la laurea in Ingegneria Elettronica presso l'Università “La Sapienza” di Roma, ha lavorato in diverse aziende operanti come System Integrator, Information Security Consulting e Application Service Provider. Nel corso della carriera professionale si è occupato del settore ICT del mercato finanziario, delle telecomunicazioni, della pubblica amministrazione e della difesa, e ha collaborato a stretto contatto con i principali Vendor di tecnologia. Lavorando sia in Italia che negli Stati Uniti, si è occupato di progetti mission-critical per clienti Enterprise.