Kafka connects, l’autoroute des messages

Kafka est un système open-source de messagerie développé chez LinkedIn en 2009 et maintenu depuis 2012 par la fondation Apache. Kafka permet de gérer des flux de messages entre des producteurs de messages et les consommateurs sur des gros volumes de données avec une faible latence. Apparu avec la version 0.9 de Kafka, Kafka Connect est un framework permettant de lire et d'écrire des données depuis/vers une source de données externe (exemple : HDFS, FileSystem, JDBC ...).

Ce framework est directement inclus dans Kafka. Dans le cas d'un Kafka Connect de type "Source" ce dernier va lire une source de données pour ensuite écrire les données dans un topic Kafka. Pour un Kafka Connect de type "Sink" il va lire les données d'un ou plusieurs topics, suivant l'implémentation du connecteur, pour ensuite écrire dans le source de données (exemple : FileSystem, Elasticsearch, JDBC...). De base Kafka propose le connecteur FileSystem qui est à la fois "source" et "sink".

Installation et démarrage.

En pré requis de l'installation il est nécessaire d'avoir un JDK 1.8 d'installé. Kafka peut s'installer via la page de téléchargement d'Apache .

Une fois que vous aurez récupéré l'archive et décompressée cette dernière, pour démarrer Kafka il vous suffit de lancer deux commandes :

  • une pour exécuter Zookeeper
  • une pour Kafka.

Pour information Zookeeper est un logiciel opensource de gestion de configuration pour systèmes distribués.

#pour les environnements Linux
./bin/zookeeper-server-start ./config/zookeeper.properties

./bin/zookeeper-server-start ./config/zookeeper.properties

#pour les environnements Windows
cd bin\windows
zookeeper-server-start.bat ..\..\config/zookeeper.properties

kafka-server-start.bat ..\..\config\server.properties

A noter que zookeeper et kafka sont à démarrer dans des consoles différentes.

Pour vérifier que notre Kafka fonctionne correctement vous allez devoir ouvrir deux autres consoles :

  • une qui va envoyer un message dans un "topic" Kafka
  • une console qui va lire les messages du topic.
#création du topic de "test"
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

#envoyer des messages au topc "test" il suffit de saisir du texte dans la console, à chaque nouvelle ligne le message est envoyé
kafka-console-producer --broker-list localhost:9092 --topic test
Hello topic "test"
Console 1 : envoi des message dans le topic
#lecture du topic "test", à chaque nouveau message envoyé le résultat s'affiche dans cette console
kafka-console-consumer --bootstrap-server localhost:9092 --topic test
Console 1 : envoi des message dans le topic

Les connecteurs

Connecteur de type Sink : Elasticsearch

Plus haut j'ai précisé que le framework Kafka connect est directement inclus dans Kafka, mais pour les besoins de cet article on va récupérer les sources du connecteur pour Elasticsearch via le github du projet (j'ai fait pointer le lien vers la version release 3.2.1). Pour avoir une liste assez étoffée de connecteurs Kafka vous pouvez consulter cette page sur le site de Confluent, qui fourni une plateforme open source incluant Kafka et un certain nombre de connecteurs.

Une fois les sources du connecteur récupérées, il vous suffit de les packager avec Maven

#je désactive les tests qui nécessitent que vous ayez un serveur Elasticssearch qui tourne en local
mvn package -DskipTests

Une fois le packaging du connecteur Kafka pour Elasticsearch terminé vous devez copier tous les jars du dossier "kafka-connect-elasticsearch-3.2.1\target\kafka-connect-elasticsearch-3.2.1-package\share\java\kafka-connect-elasticsearch" dans le dossier des librairies de votre serveur Kafka qui doit se nommer "libs".

On va également créer un fichier de configuration minimum pour le connecteur qui indiquera quel topic écouter et vers quel index Elasticsearch écrire.

name=elasticsearch-connect-test
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=els-topic
key.ignore=true
connection.url=http://localhost:9200
type.name=els-connect
schema.ignore=true

Pour détailler un peu la configuration :

  • name : le nom de notre connecteur
  • connector.class : la classe principale du connecteur
  • tasks.max : Le nombre maximum de tâches que le connecteur pourra créer en parallèles pour traiter les topics
  • topics : noms des topics que le connecteurs devra écouter, les noms doivent être séparés par une virgule
  • key.ignore : quand il est à vrai, le connecteur génère automatiquement une clé composée de cette forme NOM_TOPIC+ID_TACHE+OFFSET_MESSAGE, si vous passez la valeur à false, vos messages dans les topics devront avoir une clé renseignée.
  • connection.url : url de connexion vers le serveur Elastisearch
  • type.name : nom du type Elasticsearch lors de l'indexation
  • schema.ignore : avec la valeur à "true"on demande à Elasticsearch de déduire le mapping de l'index en fonction des messages envoyés

L'ensemble des options de configuration est détaillé dans la documentation du connecteur.

On va nommer ce fichier "elasticsearch-connect.properties" que l'on va sauvegarder dans le dossier "config" de notre serveur Kafka. Comme nous allons envoyer des messages directement depuis le producer console, il est nécessaire de modifier le fichier de configuration du serveur Kafka "connect-syandalone.properties" dans le dossier "config" comme ceci :

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

Ici on désactive la conversion des clés et des messages des topics pour ne pas avoir de problèmes de conversion au moment où nos messages seront traités par le connecteur.

Puis redémarrer le serveur Kafka pour que les librairies et les fichiers de configuration soient pris en compte.

Pour tester notre connecteur Elastisearch, il vous faut un serveur Elasticsearch démarré. Pour ma part, j'ai fait le test avec une version 2.4.4. Nous allons créer un nouveau topic pour nos tests :

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic els-topic
Console 1 : envoi des message dans le topic

Dans une console nous allons démarrer notre connecteur en se positionnant dans le dossier "bin" du serveur Kafka

#lecture du topic "test", à chaque nouveau message envoyé le résultat s'affiche dans cette console
connect-standalone ..\..\config\connect-standalone.properties ..\..\config\elasticsearch-connect.properties
Console 1 : envoi des message dans le topic

Pour vérifier que votre connecteur est bien démarré vous pouvez faire appel à l'API Rest du framework connect (je parlerai de l'API Rest un peu plus loin) via cette url http://localhost:8083/connectors/elasticsearch-connect-test/status et vous allez avoir une réponse JSON similaire à cela :

{
	"name": "elasticsearch-connect-test",
	"connector": {
		"state": "RUNNING",
		"worker_id": "127.0.0.1:8083"
	},
	"tasks": [{
		"state": "RUNNING",
		"id": 0,
		"worker_id": "127.0.0.1:8083"
	}]
}

Maintenant nous allons pouvoir envoyer des messages dans notre topic et constater qu'ils sont incrits dans notre index Elasticsearch :

kafka-console-producer --broker-list localhost:9092 --topic els-topic
{"message":"Test log", "severity": "INFO"}
{"message":"Error service order", "severity": "ERROR"}

Et quand vous faites appel à cette url http://localhost:9200/els-topic/_search?pretty vous obtenez cette réponse :

{
  "took" : 15,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "els-topic",
      "_type" : "els-connect",
      "_id" : "els-topic+0+1",
      "_score" : 1.0,
      "_source" : {
        "severity" : "ERROR",
        "message" : "Error service order"
      }
    }, {
      "_index" : "els-topic",
      "_type" : "els-connect",
      "_id" : "els-topic+0+0",
      "_score" : 1.0,
      "_source" : {
        "severity" : "INFO",
        "message" : "Test log"
      }
    } ]
  }
}

Voilà c'est assez simple à mettre en place et cela vous permet d'alimenter un index Elasticsearch avec les messages qui transitent par Kafka.

Connecteur de type Source : FileStream

Deuxième exemple de connecteur Kafka : FileStream. Il a la particularité d'exister dans les deux versions "source" et "skin", pour la lecture et l'écriture dans un fichier. Pour pouvoir tester ce connecteur vous allez devoir arrêter le connecteur Elasticsearch. Et là je vous entends déjà vous exclamer "c'est quoi ce framework en carton qui ne permet de lancer qu'un seul connecteur à la fois!" Pas de panique, c'est uniquement parce que nous démarrons les connecteurs en mode standalone, donc si vous exécutez 2 connecteurs en standalone avec la configuration par défaut, les deux vont démarrer sur le port 8083 et boom! On va voir dans le chapitre sur l'API Rest comment démarrer nos connecteurs en mode "distribué" pour ne plus avoir de problème.

Commençons par créer un topic de test :

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic file-topic
Console 1 : envoi des message dans le topic

Puis à lancer un consommateur qui va écouter ce topic :

kafka-console-consumer --bootstrap-server localhost:9092 --topic file-topic
Console 1 : envoi des message dans le topic

Puis dans le dossier "config" de votre serveur Kafka on ajoute un nouveau fichier de configuration que l'on nomme "file-source-connect.properties" dans le quel vous donnez le configuration du connecteur

name=local-file-connect-test
connector.class=FileStreamSource
tasks.max=1
file=/tmp/log.txt
topic=file-topic

Au passage vous remarquerez que je n'ai donné que le nom de la classe du connecteur, ce connecteur est natif à Kafka.

On crée un fichier vide "log.txt" dans le dossier "tmp" de votre environnement et on lance notre connecteur :

connect-standalone ..\..\config\connect-standalone.properties ..\..\config\file-source-connect.properties
Console 1 : envoi des message dans le topic

Maintenant à chaque fois que vous allez modifier votre fichier "log.txt" le connecteur va écrire de nouveaux messages dans le topic.

Prochaine étape : réussir à faire parler tout ce petit monde.

L'API Rest

Je vous ai parlé tout à l'heure de l'API Rest des connecteurs, qui permet entre autre de savoir si un connecteur est démarré ou non. Cette API ne se limite pas à cela (exemple : CRUD sur les connecteurs, mise en pause). Dans la documentation de Confluent vous avez la liste complète des ressources ainsi que des exemples d'utilisation de cette API. Nous allons voir comment, grâce au mode "distribué", configurer des connecteurs via cette API.

En premier lieu, stoppez votre connecteur "FileStreamSource" pour ne pas avoir de conflit de port. Puis exécuter le service "connect" en mode "distribué" comme ceci :

connect-distributed ../../config/connect-distributed.properties
Console 1 : envoi des message dans le topic

Pour vérifier que le service est bien démarré, vous pouvez appeler cette url http://localhost:8083/ vous allez avoir une réponse comme celle-ci :

{"version":"0.10.2.1","commit":"e89bffd6b2eff799"}

Pour le moment nous n'avons aucun connecteur de défini, vous pouvez le vérifier avec cette url http://localhost:8083/connectors qui va vous retourner un tableau Javascript vide.

Pour créer notre connecteur FileStreamSource vous devez faire cet appel en CURL ou via Postman par exemple :

curl -X POST \
  http://localhost:8083/connectors \
  -H 'content-type: application/json' \
  -d '{
    "name": "local-file-connect-test",                                                                   
    "config": {
        "connector.class":"FileStreamSource",  
        "tasks.max":"1",                                                                    
        "topic":"els-topic",                                                                    
        "file":"/tmp/log.txt"                                                                  
    }
}'

Et maintenant vous pouvez revoir la configuration de votre connecteur via cette url http://localhost:8083/connectors/local-file-connect-test  et voir s'il est bien démarré avec cette url http://localhost:8083/connectors/local-file-connect-test/status . Notre connecteur va donc lire le contenu du fichier "/tmp/log.txt" et écrire les messages dans le topic "els-topic".

Pour terminer, on crée notre connecteur Elasticsearch qui ira lire le contenu du topic "els-topic" pour aller écrire dans un index Elasticsearch via cet appel à l'API :

curl -X POST \
  http://localhost:8083/connectors \
  -H 'content-type: application/json' \
  -d '{
    "name": "elasticsearch-connect-test",                                                                   
    "config": {
        "connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",  
        "tasks.max":"1",                                                                    
        "topics":"els-topic",                                                                          
        "connection.url":"http://localhost:9200",
        "type.name":"els-connect",
        "key.ignore":"true",
        "schema.ignore":"true"                                                                  
    }
}'

Nous avons nos deux connecteurs qui s'exécutent en même temps et un flux complet de lecteur de fichier dont le contenu est déversé dans Kafka pour ensuite être envoyer vers un index Elasticsearch.

Avec l'API vous pouvez créer autant de connecteur que vous le souhaitez à partir du moment où les librairies des connecteurs sont chargées dans le contexte de votre serveur Kakfka.

Conclusion

Les connecteurs dans Kafka sont une fonctionnalité très intéressante notamment dans le cadre d'une plateforme de centralisation de données. D'une part parce qu'il est possible d'adresser un grand nombre de sources de données, même si le connecteur de votre source cible n'existe pas, il est très simple de le créer. Dans la documentation de Kafka mais également dans celle de Confluent, vous retrouverez tout une partie sur la structure des connecteurs et comment créer vos propres connecteurs.

D'autre part Kafka et ses connecteurs sont destinés à traiter de grands volumes de données. Vous pouvez même allez un peu plus  loin en ajoutant à votre pipeline de traitement de données des streams "Kafka streams" vous pourrez alors avoir des logiques d'agrégations dans vos flux.

N'hésitez surtout pas à apporter des précisions ou des corrections à cet article, elles seront bien sûr les bienvenues.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Captcha *