Introduction à Spring Integration

Cet article a pour objectif d’introduire Spring Integration. Dans un premier temps, j’y décrirai les différents concepts inhérents à Spring Integration, puis nous verrons un exemple basique d’intégration dans une application.

Spring integration est, comme son nom l’indique, un projet du framework Spring. Il respecte donc tous les principes de Spring dont voici les plus importants : la séparation des préoccupations (separation of concerns), l’injection de dépendances (dependency injection) ou le couplage lâche (loose coupling). En revanche, Spring Integration permet de pousser ces principes encore plus loin en permettant de faire communiquer facilement des beans Spring de manière asynchrone et indépendante via un système de messaging, sans qu’ils aient besoin de se connaître mutuellement.

De plus, Spring Integration fournit également des outils pour communiquer avec des systèmes externes (JMS, RabbitMQ, etc).

Spring Integration est une implémentation des Enterprise Integrations Patterns. Il est construit autour du modèle pipes-and-filters. Les pipes sont n’importe quel composant capable de transporter les messages alors que les filters sont ceux capables de produire ou consommer des messages.

Les composants principaux

Message

Un Message n’est autre qu’un wrapper pour n’importe quel POJO (le payload) associé à des metadata (les headers). Les headers sont par exemple l’id, le timestamp, l’id de corrélation, et une adresse de retour. Mais ils peuvent également être utilisés pour passer des informations entre les différents composants.

Message Channel

Un Message Channel permet de transporter les messages, il représente donc le « pipe » du modèle « pipes-and-filters ». Les producteurs de messages envoient leurs messages dans le channel alors que les consommateurs vont les lire. Un channel peut être « point à point » ou de type « publish / subscribe ». Avec un Channel « point à point », un seul consommateur recevra un message envoyé sur le Channel alors que tous les consommateurs recevront le même message si le Channel est de type « publish / subscribe ».

Message Endpoint

Les Messages Endpoint permettent de connecter notre application au système de messaging. Ils représentent les « filters » du modèle « pipes-and-filters ». Ils permettent à notre application d’envoyer des messages sans qu’elle ait connaissance du système de messaging sur lequel elle s’appuie. Voici les principaux types de Message Endpoint supportés nativement :

  • Transformer : permet de transformer un message
  • Filter : permet de filtrer des messages entrants et renvoyer sur un channel de sortie uniquement certains messages
  • Router : permet de rediriger des messages vers un channel ou un autre suivant certains critères
  • Splitter : permet de découper un message en plusieurs autres
  • Aggregator : permet de regrouper plusieurs message en un seul
  • Service Activator : permet d'exécuter une méthode d’un service à la réception d’un message, et éventuellement d’envoyer une réponse sur un channel de réponse
  • Channel Adapter : permet de connecter un channel à un autre système. Il peut s’agir d’une connexion entrante (Inbound Channel Adapter) ou sortante (Outbound Channel Adapter)

Exemple par le code

Passons maintenant à un exemple concret pour mieux comprendre son utilisation. Imaginons donc une application Spring Boot qui recevrait des métriques de la part d’objets connectés. Je ne détaillerai pas ce qui touche à Spring Boot dans les exemples suivants.

L’application de test est basique, elle expose un endpoint POST sur /metric pour recevoir des métriques, qui se composent pour commencer simplement d’une date et d’une valeur. Les métriques ne sont pas persistées, elles sont juste envoyées sur un Channel Spring Integration afin de pouvoir leur appliquer quelques traitements.

 

public class Metric {
    public Date datetime;
    public int value;

    [… getters and setters ...]
}
Metric.java

 

@RestController
public class MetricsController {
    @Autowired
    MetricSenderService metricService;
    
       @PostMapping(path = "/metric")
    void metric(@RequestBody Metric metric) {
        metricService.send(metric);
    }
}
MetricsController.java
@SpringBootApplication
@Configuration
@ImportResource({"classpath*:*-context.xml"})
public class Application {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
}
Application.java

On remarque que la classe Application est annotée @ImportResource afin de charger le fichier de configuration Spring qui définira l’ensemble de la configuration Spring Integration. J’ai préféré le choix de configuration XML pour cet exemple afin d’avoir un endroit centralisé pour montrer les possibilités offertes par Spring Integration. Il aurait été également possible de concevoir la même application de test via les annotations Spring Integration.

Messaging Gateway et Service Activator

Lors de la réception d’une requête POST avec une représentation JSON d’un objet Metric, un message est envoyé, l’objet Metric étant son payload. Pour cela, l’interface MetricSenderService est déclarée comme une Messaging Gateway.

 

<int:gateway service-interface="org.boudet.spring.integration.sample.service.MetricSenderService">
    <int:method name="send" request-channel="metrics.in"/>
</int:gateway>
integration-context.xml

Ainsi, tous les appels à la méthode send permettent de transformer l’objet Metric en un message envoyé sur le Channel metrics.in.

Maintenant que nous avons de quoi créer des messages sur un Channel, il nous reste à créer un consommateur. Ce consommateur n’aura pour seul but que de logguer la réception d’un message. Pour cela, il suffit de déclarer un bean ServiceActivator :

<int:service-activator input-channel="metrics.in" method="receive">
    <bean class="org.boudet.spring.integration.sample.service.SimpleMetricReceiverService"/>
</int:service-activator>
integration-context.xml

A chaque message reçu sur le Channel metrics.in, la méthode receive du bean SimpleMetricReceiverService sera exécutée, avec le message en paramètre.

@Service
public class SimpleMetricReceiverService implements MetricReceiverService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public void receive(Metric metric) {
        logger.debug("receiving metric : " + metric);
    }
}
SimpleMetricReceiverService.java

Nous avons maintenant un système simple de production et de consommation de messages. Nous pouvons le tester:

curl -H 'Content-Type:application/json' -XPOST http://localhost:8080/metric -d '{"datetime" : "2016-12-19T23:00:00", "value":"10"}'
2016-12-21 17:12:47.791 DEBUG 30111 --- [nio-8080-exec-2] o.b.s.i.s.s.SimpleMetricReceiverService  : receiving metric : Metric{datetime=Tue Dec 20 00:00:00 CET 2016, value=10}

Le code de cet exemple est disponible ici.

Logging Handler

Pour le moment, nous avons vu comment lire un message que nous logguons depuis le Service Activator.

Nous pouvons utiliser directement un Logging Handler afin de logguer l’intégralité dans messages produits sur un Channel. Pour cela nous allons enregistrer un Interceptor de type Wire-Tap sur notre Channel qui enverra à son tour les messages reçu au composant Channel Logging Adapter qui effectuera l’écriture dans le log.

<int:channel id="metrics.in">
    <int:interceptors>
        <int:wire-tap channel="logger"  />
    </int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG" />
integration-context.xml

Pour tester, le code se trouve ici.

Message Filter

Admettons maintenant que nous voulions filtrer certains messages pour éviter de les traiter. Par exemple, dans notre cas, les messages dans le futur qui correspondent forcément à des erreurs. Pour cela, nous allons déclarer un Filter. Le Filter permet de supprimer à la volée les messages qui ne respectent pas un critère et éventuellement de le déposer dans un autre Channel. Par défaut, les messages sont supprimés.

Un Filter étant une implémentation de MessageSelector, nous pouvons déclarer nos beans comme suit:

<int:filter input-channel="metrics.in" ref="messageFilter" output-channel="metrics.filtered"/>
<bean id="messageFilter" class="org.boudet.spring.integration.sample.filter.MessageFilterImpl"/>
integration-context.xml

Les messages arrivant sur le Channel metrics.in seront testés via la méthode accept(Message<?> message) du bean MessageFilterImpl. Si la méthode renvoit false, le message est supprimé. Dans notre exemple, les messages non filtrés sont déposés dans le Channel metrics.filtered. Il aurait été également possible de déposer les messages invalides dans un autre Channel en positionnant l’attribut discard-channel. Il est également possible de lever une exception lorsque un message est invalide, avec l’attribut throw-exception-on-rejection.

Afin que le ServiceActivator ne consomme que les messages filtrés, il est nécessaire de changer son Channel d’écoute :

<int:service-activator input-channel="metrics.filtered" method="receive">
    <bean class="org.boudet.spring.integration.sample.service.SimpleMetricReceiverService"/>
</int:service-activator>
integration-context.xml

Pour ce filtre, l’implémentation de l’interface MessageSelector est très simple :

public class MessageFilterImpl implements MessageSelector {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public boolean accept(Message<?> message) {
        if (message.getPayload().getClass() != Metric.class) {
            return false;
        }
        Metric metric = (Metric) message.getPayload();
        return metric.getDatetime().before(new Date());
    }
}
MessageFilterImpl.java

Attention, ne pas confondre ici Filter qui permet, littéralement, de filtrer des messages avec le terme Filter dans Pipes-and-Filters qui désignent dans ce cas n’importe quel composant capable de produire ou consommer des messages.

Pour tester, le code se trouve ici.

Commençons avec une date passée :

Notre message est effectivement remonté jusqu’au ServiceActivator.

Alors qu’avec une date dans le futur, notre ServiceActivator n’a pas été exécuté, rien n'apparaît dans le log.

Pour des cas simples comme celui-ci, il est également possible de se passer de l’implémentation de MessageSelector et d’utiliser directement une expression SPeL pour effectuer le filtrage :

<int:filter input-channel="metrics.in" expression="payload.datetime.before(new java.util.Date())"
            output-channel="metrics.filtered"
            throw-exception-on-rejection="true"
/>
integration-context.xml

A tester avec le code ici.

Router

Pour les cas un peu plus complexes où les messages ne doivent pas être tous traités de la même manière, nous avons à disposition le Router dont le but est de dispatcher des messages sur différents Channels en fonction de critères avant d’être consommés.

Pour notre cas, imaginons que nous recevons désormais deux types de métriques, temperature et humidity, que nous voulons traiter différemment. Voici comment l’on pourrait procéder :

<int:service-activator input-channel="metrics.temperature" method="receive">
    <bean class="org.boudet.spring.integration.sample.service.TemperatureMetricReceiverService"/>
</int:service-activator>
<int:service-activator input-channel="metrics.humidity" method="receive">
    <bean class="org.boudet.spring.integration.sample.service.HumidityMetricReceiverService"/>
</int:service-activator>
<int:router input-channel="metrics.filtered" expression="payload.type">
    <int:mapping value="temperature" channel="metrics.temperature"/>
    <int:mapping value="humidity" channel="metrics.humidity"/>
</int:router>
integration-context.xml

Nous avons désormais deux ServiceActivator (un pour les températures et un pour l’humidité) qui consomment les messages depuis deux Channels distincts (metrics.humidity et metrics.temperature). C’est le Router qui s’occupe de dispatcher les messages arrivant sur le channel metrics.filtered vers les Channels temperature et humidity en fonction du nouvel attribut type de l’objet Metric.

Voici par exemple à quoi ressemble le Service Activator correspondant au type humidity.

@Service
public class HumidityMetricReceiverService implements MetricReceiverService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public void receive(Metric metric) {
        logger.debug("received a humidity metric");
    }
}
HumidityMetricReceiverService.java

Comme d’habitude, voici le code.

Et il suffit d’exécuter le test comme ceci :

Puis essayons avec un autre type :

AMQP Gateway

Pour ce dernier exemple, je vous propose de réaliser une passerelle vers une file de message RabbitMQ. Spring Integration offre le support natif d'AMQP, il suffit d'ajouter la dépendance spring-integration-amqp à notre projet pour en profiter :

 dependencies {
     compile 'org.springframework.boot:spring-boot-starter-web:1.4.2.RELEASE'
     compile 'org.springframework.integration:spring-integration-core:4.3.5.RELEASE'
     compile 'org.springframework.integration:spring-integration-amqp:4.3.5.RELEASE'
     testCompile group: 'junit', name: 'junit', version: '4.11'
 }
build.gradle

Il suffit ensuite d'adapter un peu notre fichier de configuration de Spring Integration afin de définir un Outbound Channel Adapter pour écrire les messages dans une file et un Inbound Channel Adapter pour lire depuis une file.

Pour cet exemple, il y a également quelques adaptations :

  • définition d'un nouveau Channel metrics.from-rabbit qui recueille les messages en provenance de RabbitMQ
  • changement de l'input-channel du Router pour utiliser le nouveau Channel metrics.from-rabbit
  • définition d'une RabbitConnectionFactory pour configurer les paramètres de connexion à RabbitMQ
  • définition d'une Queue RabbitMQ nommée metrics.queue. Celle-ci sera créée automatiquement.
  • définition d'un Exchange RabbitMQ pour effectuer le routage de nos messages vers la file correspondante au routing-key définie par l'Outbound Channel Adapter
  • définition d'un template AMQP
  • Et enfin, la classe Metric (la classe de nos messages) doit être modifiée pour implémenter Serializable
    <int:channel id="metrics.from-rabbit">
        <int:interceptors>
            <int:wire-tap channel="logger"/>
        </int:interceptors>
    </int:channel>

    <int:router input-channel="metrics.from-rabbit" expression="payload.type">
        <int:mapping value="temperature" channel="metrics.temperature"/>
        <int:mapping value="humidity" channel="metrics.humidity"/>
    </int:router>


    <int-amqp:outbound-channel-adapter
            channel="metrics.filtered"
            exchange-name="metrics.exchange"
            routing-key="create-metric"
            amqp-template="amqpTemplate"
    />

    <int-amqp:inbound-channel-adapter
            channel="metrics.from-rabbit"
            queue-names="metrics.queue"
            connection-factory="connectionFactory"
            concurrent-consumers="10"
    />

    <rabbit:connection-factory id="connectionFactory" host="localhost" />

    <rabbit:template
            id="amqpTemplate"
            connection-factory="connectionFactory"
    />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="metrics.queue" />

    <rabbit:direct-exchange name="metrics.exchange">
        <rabbit:bindings>
            <rabbit:binding queue="metrics.queue" key="create-metric"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
integration-context.xml

Lorsque notre application est démarrée, on peut remarquer que la file RabbitMQ a été créée avec la commande suivante :

rabbitmqctl list_queues

J'ai positionné un Thread.sleep() dans la classe HumidityMetricReceiverService qui traite les messages de type humidity afin d'avoir le temps de visualiser les messages rentrer et sortir de la file RabbitMQ. Il suffit donc de faire des appels en boucle et de ré-exécuter la commande précédente pour voir que les messages arrivent bien dans la file.

while true;  do curl -H 'Content-Type:application/json' -XPOST http://localhost:8080/metric -d '{"type":"humidity", "datetime" : "2016-12-20T23:00:00", "value":"10"}'; done

Vous aurez peut-être remarqué que j'ai placé l'option concurrent-consumers="10" sur l'Inbound Channel Adapter. Elle permet tout simplement de lire 10 messages à la fois.

Conclusion

Comme nous avons pu le voir, il est très simple d'intégrer un système de messaging à une application Spring, et ceci avec presque uniquement que de la configuration ! Il est très pratique d'utiliser ce genre de mécanisme pour créer des traitements asynchrones, d'autant que l'intégration de systèmes externes comme RabbitMQ permettent de se passer de gérer la persistence des messages.

Chaque exemple de cet article montre les traitements de base de Spring Integration, la référence permet de découvrir toutes les possibilités de chaque composant, ainsi que tous les composants qui n'ont pas été (encore) couvert !

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Captcha *