Devoxx 2017 : Retour sur « Reactor 3 et la programmation réactive sur la JVM »

J’ai eu la chance de participer cette année à la 6ème édition du Devoxx France. Je vous propose dans cet article de revenir sur la conférence "Reactor 3 et la programmation réactive sur la JVM" présentée par Simon Baslé qui travaille sur le projet Reactor. J’ai apprécié cette présentation qui plus est, elle est sur un thème dans l'air du temps : la programme réactive.

La programmation réactive

Simon Baslé est dans un premier temps revenu sur les bases de la programmation réactive et sur la raison de l'utilisation de ce paradigme. Le choix de ce type de programmation est de ne pas bloquer le traitement de notre application que ça soit de manière synchrone ou asynchrone. La programmation réactive consiste donc en l'assemblement d’événements asynchrones en utilisant des opérateurs non-bloquants sans dégrader la lisibilité et la maintenabilité du code.

Au niveau de la JVM aujourd'hui, il n'y a que des API qui reposent sur des callbacks ou des futures. Le problème est que les callbacks peuvent amener du code à devenir, très compliqué à maintenir et les futures peuvent facilement être bloquées; par exemple un simple appel à la méthode get est bloquant. Avec la programmation réactive, on ne va pas demander les données mais être notifié de la disponibilité des données. Il prend ensuite pour exemple la comparaison entre les couples Iterable/Iterator et Publisher/Subscriber où le programmeur doit dans un cas faire appel à iterator.next() pour avoir la donnée et dans le cas du subscriber, il sera notifié de l'arrivée de la donnée.

Publisher/Subscriber

Ces interfaces font partie de la spécification Reactive Streams qui sera intégrée au JDK9.

L'interface Subscriber contient trois méthodes : onNext(T), onComplete() et onError(Throwable). La méthode onNext est appelée par le publisher pour notifier l'arrivée d'un nouvel élément, la méthode onComplete est appelée en fin de flux et onError est appelée en cas d'erreur. Les méthodes onComplete et onError sont des méthodes terminales.

Un mécanisme appelé backpressure peut être utilisé par le subscriber pour dire au publisher de ralentir ou accélérer l'envoi de données par exemple si le subscriber reçoit un volume de données trop conséquent par rapport à sa capacité à les traiter.

Reactor 3 va de son côté fournir une API qui implémente l'interface Publisher.

Reactor 3

Reactor 3 est une librairie Spring qui implémente la spécification Reactive Streams.

Les types qui implémentent Publisher sont :

  • Flux<T> : Séquence asynchrone de 0 à n éléments
  • Mono<T> : Séquence avec au plus 1 élément

Une chaîne d'opérateurs peut être associée à un publisher afin d'effectuer différents traitements sur ce flux. Cependant, ce n'est que lorsqu'un subscriber sera inscrit que le publisher commencera à émettre. Pour chaque opérateur, un sous-état du subscriber sera créé et c'est à l'aide de cette chaîne de subscriber que la donnée va se propager.

Exemple :

Si on a un flux de données représenté par la suite 5, 6, 7 et les trois opérateurs suivants :

  • map : qui va ajouter 3 à chaque valeur
  • filter : qui ne va garder que les valeurs paires
  • buffer : qui va regrouper les valeurs par 3
Flux.range(5, 3)
    .map(i -> i + 3)
    .filter(i -> i % 2 == 0)
    .buffer(3) 

Le résultat du flux après le premier opérateur sera 8, 9 et 10; après le second, il sera 8, 10 et enfin en fin de chaîne, le flux sera [8,10].

Schedulers

Au niveau threading, Reactor 3 fournit des schedulers qu'on peut associer à des pools de threads.

Différents schedulers sont fournis par Reactor 3 :

  • Elastic : Pool de threads qui grossit dynamiquement en fonction de la demande
  • Parallel : Plusieurs threads en parallèle pour maximiser l'utilisation du CPU
  • Single : Un seul et unique thread

L'opérateur publishOn bascule le reste des événements du flux sur le scheduler choisi.

L'opérateur subscribeOn bascule les événements qui vont être émis par le flux après l'abonnement à celui-ci.

Exemple d'utilisation des ces opérateurs : Si vous souhaitez isoler un traitement, vous pouvez créer un scheduler dédié et basculer sur ce scheduler à l'aide de l'opérateur publishOn et ensuite rebasculer sur un traitement général avec un autre publishOn.

Tester et débugger dans un monde asynchrone

La difficulté du test et du debug dans ce paradigme de programmation réactive est le fait que tout soit asynchrone avec différents opérateurs et des contextes d'éxécution qui peuvent changer.

Le stepVerifier de Reactor 3 permet de tester un publisher en lui fournissant le flux de données attendues. Le stepVerifier souscrit au publisher testé et compare le flux de données envoyé par celui-ci et celui attendu. Un mécanisme de temps virtuel permet de tester unitairement des publishers qui par exemple envoient des données toutes les heures. Une instruction va, dans ce cas, dire d'attendre une heure, ce qui va avancer l'horloge virtuelle d'une heure.

Le testPublisher permet de simuler une source et va permettre par exemple de tester la réaction de votre chaîne si vous envoyez deux signaux "complete".

Le plus gros problème pour débugger dans ce type de programmation est la difficulté à déchiffrer la stacktrace qui en général indique l'endroit de la souscription comme origine de l'erreur et non ce qui est arrivé dans la suite du traitement. La méthode checkpoint() va permettre de tracer l'ensemble des opérations afin de trouver l'origine d'un problème cependant cette opération reste très coûteuse.

Conclusion

Cette présentation effectuée par Simon Baslé était accessible à tous même sans connaître la programmation réactive. La plus connue des librairies qui implémentent Reactive Streams est RxJava mais il est intéressant de découvrir ce qu'a fait Spring de son côté.

La programmation réactive est un sujet à la mode et très intéressant, je vous invite donc à aller voir les différentes présentations qu'il y a eu durant le Devoxx cette année !

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Captcha *