La serie sul pattern architetturale Command Query Responsibility Segregation (CQRS) continua con l’approfondimento dei macro-blocchi che compongono il framework Axon, implementazione Java di tale pattern. In questo articolo è la volta dell’Event Processing.
Questo articolo conclude l’analisi in dettaglio dei macro blocchi che costituiscono l’architettura del framework Axon [1]. Qualora non lo aveste già fatto, per una migliore comprensione di quello corrente, suggerisco di leggere preliminarmente anche i due articoli sul pattern CQRS [2] [3] e i precedenti tre sul framework Axon [4] [5] [6].
Per coerenza di codice, la versione di Axon a cui si fa riferimento è sempre la 1.4 anche se, durante la stesura degli ultimi articoli, è stata rilasciata la nuova release stabile 2.0, cui faremo riferimento a partire dal prossimo articolo (vedi sotto, “Conclusioni”).
Gestione degli eventi
Gli eventi generati da una applicazione Axon based devono essere inoltrati ai componenti che ne hanno bisogno, e cioè agli Event Listeners. L’Event Bus ha la responsabilità di tale compito. Axon fornisce un Event Bus (che è stato già introdotto nel terzo articolo di questa serie [4]) e alcune classi per l’implementazione degli Event Listener.
Event Bus
Il concetto di Event Bus in Axon non differisce da quello implementato in tanti altri framework. Anche in questo caso è una implementazione del pattern omonimo. È il meccanismo che invia eventi ai listener che sono subscriber di determinati tipi di eventi. Uno dei più grossi vantaggi di questo pattern è la riduzione dell’accoppiamento dei componenti di un sistema: tramite l’adozione di un’architettura di tipo publish/subscribe esso aiuta a realizzare un disaccoppiamento di componenti che altrimenti risulterebbero fortemente accoppiati. Tramite un Event Bus la comunicazione di eventi tra componenti multipli risulta quindi notevolmente semplificata.
Le implementazioni
In Axon si hanno due diverse implementazioni di Event Bus
- org.axonframework.eventhandling.SimpleEventBus
- org.axonframework.eventhandling.ClusteringEventBus
Entrambe le implementazioni sono in grado di gestire subscribed Event Listeners e inoltrano tutti gli eventi in arrivo a tutti i subscribed listeners. Questo significa che gli Event Listeners, per poter ricevere gli eventi devono essere esplicitamente registrati nell’Event Bus. Il processo di registrazione è thread-safe. I listeners possono registrarsi e deregistrarsi dal ricevere un certo tipo di evento in qualsiasi momento a runtime.
SimpleEventBus, come suggerisce il nome, è l’implementazione più basilare dell’interfaccia org.axonframework.eventhandling.EventBus. Essa si occupa semplicemente di inoltrare gli Event in arrivo a ciascuno dei subscribed Event Listener in maniera sequenziale. Nel caso in cui un Event Listener lanci una eccezione, l’azione di inoltro dell’evento viene interrotta e l’eccezione viene propagata fino al componente che ha pubblicato l’evento. SimpleEventBus va benissimo nei casi in cui il dispatch degli eventi avviene localmente, in una singola JVM.
Qualora l’applicazione richieda che gli eventi vengano pubblicati attraverso più JVM, allora bisogna usare ClusteringEventBus. ClusteringEventBus contiene due meccanismi: il ClusterSelector, che seleziona un’istanza di Cluster per ogni Event Listener registrato, e l’EventBusTerminal , che è responsabile del dispatch di eventi a ciascuno dei cluster (sia locali che remoti) pertinenti all’Event Bus.
La responsabilità primaria del ClusterSelector è, come suggerisce il nome, selezionare un cluster per ogni Event Listener che sottoscrive l’Event Bus. Per impostazione predefinita, tutti gli Event Listener sono posti in una singola istanza di cluster, la quale invia eventi per i suoi membri in maniera sequenziale (similmente al modo di operare di SimpleEventBus). Tramite un’implementazione personalizzata, è possibile organizzare gli Event Listener in istanze di Cluster differenti, in modo da soddisfare le esigenze specifiche dell’architettura del sistema.
In Axon vi è una sola implementazione dell’interfaccia Cluster: org.axonframework.eventhandling.SimpleCluster. Tale implementazione invoca ciascun Event Listener sequenzialmente nel thread chiamante. Aggiungendo informazioni ai metadati di un cluster, il selettore è in grado di fornire suggerimenti al Terminal circa il comportamento previsto.
L’EventBusTerminal costituisce il ponte tra gli eventi che devono essere inviati e i cluster all’interno dell’Event Bus. Il terminal è a conoscenza della tecnologia di Remoting utilizzata. L’implementazione predefinita inoltra gli eventi pubblicati a ciascuno dei cluster locali utilizzando il thread che li pubblica. Questo significa che con il terminal di default e il default ClusterSelector , il comportamento del ClusteringEventBus è esattamente lo stesso di quello del SimpleEventBus.
Event Listeners
Gli Event Listeners sono i componenti che entrano in azione al verificarsi di un evento. In Axon essi devono implementare sempre l’interfaccia
org.axonframework.eventhandling.EventListener.
Necessitano di essere registrati all’Event Bus per poter ricevere notifica degli eventi.
L’implementazione base fornita da Axon è
org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter.
Questa classe è in grado, tramite annotations, di trasformare in Event Listener qualsiasi bean avente un EventHandler. Si fa carico di invocare il metodo dell’EventHandler più adatto per la situazione. I metodi dell’EventHandler devono essere annotati tramite l’annotation @EventHandler. Per l’invocazione dei metodi degli EventHandler valgono le stesse regole descritte nell’articolo [5] per gli AbstractAnnotatedAggregateRoot.
Il costruttore di AnnotationEventListenerAdapter si aspetta due argomenti: l’annotated bean e l’EventBus a cui il listener deve sottoscrivere. È possibile iscrivere e cancellare l’Event Listener tramite i metodi subscribe() e unsubscribe() dell’adapter.
Come operano gli Event Listeners
Per default, gli Event Listeners processano gli eventi nel thread che li inoltra. Questo significa che il thread che esegue il comando dovrà aspettare fino al termine dell’event handling. Per alcuni tipi di Event Listeners questo non è il modo ottimale di operare. L’event processing asincrono migliora la scalabilità dell’applicazione, ma reca lo svantaggio di aggiungere complessità quando si deve tener conto della eventual consistency. Axon consente di convertire facilmente qualsiasi Event Handler da sincrono ad asincrono tramite il wrapper
org.axonframework.eventhandling.AsynchronousEventHandlerWrapper
oppure tramite l’annotation
org.axonframework.eventhandling.annotation.AsynchronousEventListener.
AsynchronousEventHandlerWrapper ha bisogno di qualche configurazione in più per rendere un Event Handler asincrono, da passare, oltre all’Event Listener, come argomenti al suo costruttore: un’istanza di java.util.concurrent.Executor, una di org.axonframework.eventhandling.SequencingPolicy, cioè la definizione di quali eventi possono essere processati in parallelo e quali in modo sequenziale, e un’istanza di org.axonframework.eventhandling.TransactionManager (quest’ultimo argomento facoltativo), che consente l’event processing all’interno di una transazione.
L’Executor è responsabile dell’esecuzione dell’event processing.
Con quale sequenza si trattano gli eventi?
Il SequencingPolicy definisce se gli eventi devono essere trattati in maniera sequenziale, in parallelo o una combinazione di entrambi. Le policies restituiscono un sequence identifier per un dato evento. Se due eventi hanno lo stesso sequence identifier significa che devono essere gestiti sequenzialmente dall’Event Handler. Un sequence identifier null significa che un evento può essere gestito in parallelo con qualsiasi altro.
Le policies consentite da Axon sono le seguenti:
- org.axonframework.eventhandling.FullConcurrencyPolicy: l’Event Handler può gestire tutti gli eventi contemporaneamente. Questo significa che non vi è alcuna relazione tra gli eventi che richiedono di essere trattati in un particolare ordine.
- org.axonframework.eventhandling.SequentialPolicy: tutti gli eventi devono essere processati in modo sequenziale. La gestione di un evento inizierà solo quando la gestione di quello precedente è terminata. Per gli annotated Event Handler questa è la policy predefinita.
- org.axonframework.eventhandling.SequentialPerAggregatePolicy: forza i domain events che sono stati sollevati dallo stesso aggregate ad essere gestiti in maniera sequenziale. Gli eventi sollevati da aggregate diversi possono essere invece processati in maniera concorrente.
Oltre a queste policies previste dal framework è possibile comunque definirne altre specifiche per una applicazione. Esse devono implementare l’interfaccia org.axonframework.eventhandling.SequencingPolicy.
Questa interfaccia definisce un unico metodo, getSequenceIdentifierFor, il quale restituisce il sequence identifier di un dato evento. Eventi per i quali viene restituito lo stesso sequence identifier devono essere trattati in maniera sequenziale. Eventi per i quali viene restituito un sequence identifier diverso possono essere trattati in maniera concorrente. Per motivi di performance, le implementazioni di policies dovrebbero restituire null se un evento può essere processato in parallelo a qualsiasi altro. In questo modo si guadagna in velocità perche’ Axon non ha bisogno di verificare la presenza di eventuali restrizioni nell’event processing.
Un TransactionManager può essere assegnato a un AsynchronousEventHandlerWrapper per poter eseguire event processing transazionale. Per ottimizzare le performance, gli eventi possono essere processati in piccoli batch all’interno di una transazione. Il TransactionManager ha la capacità di influenzare la dimensione di questi batch e può decidere se eseguire il commit, lo skip o il retry dell’event processing sulla base del risultato di un batch.
Gestione delle transazioni in caso di event handling asincrono
In alcuni casi gli event handlers devono memorizzare i dati in sistemi che utilizzano le transazioni. Far partire ed eseguire il commit di una transazione per ogni singolo evento ha un grande impatto sulle prestazioni. In Axon gli eventi vengono processati in batch. La dimensione di un batch dipende dal numero di eventi che devono essere processati e dalle impostazioni fornite dall’event handler. Come default, la dimensione di un batch viene impostata in base al numero di eventi disponibili nella processing queue al momento in cui il batch inizia. Nella maggior parte dei casi l’event handling viene effettuato utilizzando un thread pool executor o scheduler. Questo schedulerà batch di event processing appena ci sarà un evento disponibile. Quando un batch è completato, lo scheduler rischedulerà il processing del batch successivo, finche’ ci saranno altri eventi disponibili. Più piccolo è un batch, più equa sarà la distribuzione di event handler processing, ma maggiore lo scheduling overhead da creare.
Quando un Event Listener è inglobato in un AsynchronousEventHandlerWrapper, è possibile configurare un TransactionManager per gestirne le transazioni. Quest’ultimo può, in base alle informazioni contenute in un oggetto di tipo org.axonframework.eventhandling.TransactionStatus, decidere di avviare, eseguire il commit o il rollback di una transazione in un sistema esterno.
Il metodo
void beforeTransaction(TransactionStatus transactionStatus)
viene invocato prima che Axon inizi la gestione di un batch di eventi. È possibile utilizzare TransactionStatus per configurare un batch prima che questo inizi. Ad esempio, è possibile modificare il numero massimo di eventi che possono essere eseguiti nel batch.
Il metodo
void afterTransaction(TransactionStatus transactionStatus)
viene invocato dopo che un batch è stato elaborato, ma prima che lo scheduler scheduli il successivo.
In base al valore restituito dal metodo
public boolean isSuccessful()
di TransactionStatus, si può decidere se eseguire il commit o il rollback della transazione sottostante.
Le impostazioni di Transaction Status
Nel TransactionStatus c’è tutta una serie di impostazioni che è possibile utilizzare. È possibile configurare una yielding policy che fornisce allo scheduler un’indicazione su cosa fare quando un batch è terminato, ma altri eventi sono disponibili per il processing.
I valori possibili sono due:
- DO_NOT_YIELD, se si desidera che lo scheduler continui il processing immediatamente finchè sono disponibili nuovi eventi
- YIELD_AFTER_TRANSACTION, che indica allo scheduler di dare la precedenza ad altri thread (qualora disponibili) al termine della transazione corrente.
La prima policy assicura che gli eventi vengano processati il più presto possibile, mentre la seconda garantisce una esecuzione più equa di eventi, in quanto fornisce ai thread in attesa una possibilità di iniziare il processing. È possibile impostare il numero massimo di eventi da gestire all’interno di una transazione utilizzando il metodo
public void setMaxTransactionSize(int maxTransactionSize)
Il valore di default è il numero di eventi pronti per il processing al momento in cui una transazione inizia.
Transazioni non riuscite
Quando un event handler genera un’eccezione, la transazione viene contrassegnata come non riuscita. In tal caso il metodo isSuccessful() restituirà false e il metodo getException() (sempre della classe TransactionStatus) restituirà l’eccezione che lo scheduler ha catturato. È responsabilità dell’Event Listener eseguire il rollback o il commit di ciascuna transazione attiva sottostante, sulla base delle informazioni fornite dai due metodi citati al punto precedente. L’event handler, tramite il metodo
public void setRetryPolicy(RetryPolicy retryPolicy)
di TransactionStatus, è in grado di indicare allo scheduler cosa fare in questo caso. Ci sono tre possibili criteri, ciascuno per uno scenario specifico:
- RETRY_TRANSACTION: indica allo scheduler che l’intera transazione deve essere ripetuta. Quest’ultimo schedulerà nuovamente tutti gli eventi nella transazione corrente per il processing. Questa policy è adatta quando un Event Listener processa eventi verso un data source transazionale che esegue il rollback di un’intera transazione.
- RETRY_LAST_EVENT: indica allo scheduler di ritentare solo l’ultimo evento nella transazione. Questa opzione è adatta se il data source sottostante non supporta le transazioni o se è stato eseguito il commit della transazione senza l’ultimo evento.
- SKIP_FAILED_EVENT: indica allo scheduler di ignorare l’eccezione e continuare il processing dell’evento successivo. L’Event Listener può ancora tentare il commit della transazione sottostante per persistere ogni cambiamento avvenuto durante il processing di altri eventi in questa transazione. Questo è il valore di default.
Quando il criterio scelto impone un nuovo tentativo di processing di un evento, questa azione è ritardata per un numero di millisecondi definiti nella proprietà RetryInterval di TransactionStatus. Il valore predefinito è 5 secondi.
Annotazioni
Come per molte delle altre funzionalità previste da Axon, vi è il supporto delle annotations anche per la gestione delle transazioni. Sono disponibili diverse opzioni. La prima consiste nell’annotare i metodi dell’Event Listener con @BeforeTransaction e @AfterTransaction . Questi metodi verranno invocati rispettivamente prima e dopo l’esecuzione di un batch transazionale. I metodi annotati possono accettare un unico parametro di tipo TransactionStatus che fornisce l’accesso ai dettagli delle transazioni, come lo stato corrente e la configurazione. In alternativa è possibile utilizzare un transaction manager esterno, che viene assegnato a un field dell’Event Listener. Se si annota tale field con @TransactionManager, Axon lo rileverà automaticamente e lo utilizzerà come transaction manager per quel listener. Il transaction manager esterno può essere sia uno che implementa l’interfaccia TransactionManager, o di qualsiasi altro tipo che utilizza le annotations suddette.
Conclusioni
Con questo articolo abbiamo concluso l’analisi dettagliata di tutti i macroblocchi che compongono il framework Axon. Una volta che si hanno ben chiari i concetti di base di questi blocchi, è possibile disegnare e implementare applicazioni web basate su CQRS in maniera piuttosto meccanica, potendosi così focalizzare quasi esclusivamente sulla business logic.
In questa trattazione, per motivi di coerenza e di comprensione del testo, abbiamo fatto sempre riferimento alla release 1.4 di Axon, anche se da qualche mese è stata rilasciata la nuova release stabile 2.0. I concetti base, però, sono sempre gli stessi: tra la 1.4 e la 2.0 ci sono delle differenze soprattutto implementative e volte a facilitare ancora di più il lavoro degli sviluppatori.
Nel prossimo, e ultimo, articolo della serie andremo ad analizzare il codice di una applicazione Axon based, facendo riferimento alla versione più recente.
Riferimenti
[1] Sito ufficiale di Axon framework
[2] Guglielmo Iozzia, “Command Query Responsibility Segregation pattern – I parte: Breve panoramica su CQRS”, MokaByte 177, ottobre 2012
https://www.mokabyte.it/cms/article.run?articleId=7TK-XI4-MRI-VMN_7f000001_13046033_40989f48
[3] Guglielmo Iozzia, “Command Query Responsibility Segregation pattern – II parte: Quando utilizzarlo?”, MokaByte 178, novembre 2012
https://www.mokabyte.it/cms/article.run?articleId=AKD-QEF-VCL-OUP_7f000001_13046033_f3c5af0b
[4] Guglielmo Iozzia, “Command Query Responsibility Segregation pattern – III parte: Introduzione ad Axon Framework”, MokaByte 179, dicembre 2012
https://www.mokabyte.it/cms/article.run?permalink=mb179_CQRS-3
[5] Guglielmo Iozzia, “Command Query Responsibility Segregation pattern – IV parte: Domain Modeling in Axon framework”, MokaByte 181, febbraio 2013
https://www.mokabyte.it/cms/article.run?articleId=F79-K2B-PYN-G2N_7f000001_26089272_9f04d42c
[6] Guglielmo Iozzia, “Command Query Responsibility Segregation pattern – V parte: Repositories ed Event Stores in Axon”, MokaByte 182, marzo 2013
https://www.mokabyte.it/cms/article.run?articleId=8NU-7JY-S5U-KM4_7f000001_26089272_3efb3042