Dans la première partie, nous vous avions présenté Quarkus, framework dont le fer de lance est la réduction de l’empreinte mémoire et du temps de démarrage des applications. L’objectif étant de vous faire découvrir Quarkus par la pratique, entrons directement dans le vif du sujet ! A la fin de cet article vous saurez récupérer dynamiquement des configurations depuis Consul, et traiter les flux provenant d’un bus MQTT.

FICHE TECHNIQUE

Partie matérielle

Un ESP 32 par module de mesure

DS18B20 pour les capteurs de température

DSD011 pour les capteurs de particules fines

Fils et plaque de prototypage pour les phases de conceptions

Partie logicielle

Quarkus 0.28.1

Arduino 1.8.9

Quelques prérequis avant de commencer…

Déploiement de Consul, Mosquitto et MongoDB

Afin de faciliter le référencement des capteurs, leurs caractéristiques seront externalisées dans une instance Consul que notre projet Quarkus pourra contacter.

L’application devant persister les mesures relevées par les capteurs, d’autres briques techniques que Consul doivent également être déployées. L’objectif de l’article est de détailler les subtilités de Quarkus et non de présenter l’installation du trio Consul / Mosquitto / MongoDB. Aussi nous vous suggérons d’utiliser le fichier docker-compose détaillé ci – après :

version: '3'
services:
  mosquitto:
    image: "eclipse-mosquitto"
    container_name: "poc-quarkus-mosquitto"
    ports:
      - "1883:1883"
      - "9001:9001"
  consul:
    image: "consul"
    container_name: "poc-quarkus-consul"
    ports:
      - "8500:8500"
    entrypoint:
      - consul
      - agent
      - -dev
      - -client=0.0.0.0
      - -bind=0.0.0.0
  mongodb:
    image: mongo
    container_name: "poc-quarkus-mongodb"
    environment:
      MONGO_INITDB_ROOT_USERNAME: "admin-user"
      MONGO_INITDB_ROOT_PASSWORD: "admin-password"
      MONGO_INITDB_DATABASE: "poc-quarkus-db"
    ports:
      - 27017:27017
    volumes:
      - ./db/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
volumes:
  db:

 

Ce dernier s’appuie sur un autre fichier permettant d’initialiser la base de données Mongo (et créer les collections utilisées pour la persistances des données) : 

db.auth('admin-user', 'admin-password')

db = db.getSiblingDB('poc-quarkus-db')

db.createUser({
    user: 'quarkus-user',
    pwd: 'quarkus-password',
    roles: [
        {
            role: 'root',
            db: 'admin',
        },
    ],
});

db.createCollection("poc-quarkus-metrics")

Pour notre cas de figure nous avons opté pour un stockage des mesures en base Mongo. S’appuyer sur une solution optimisée pour le stockage de données timeseries aurait été certes plus judicieux, mais Mongo répond à notre besoin. De plus l’un des objectifs principaux de l’article est d’illustrer de quelle façon Mongo et Quarkus peuvent être associés.

Création des capteurs et publication des données

Maintenant que la stack est en place, il nous faut publier des données. Dans cette section nous vous présentons l’assemblage des capteurs et l’envoi des relevés sur Mosquitto. Chaque capteur sera composé d’une carte ESP32 et d’une sonde (de température ou de mesure de particules fines). En règle général ce type de matériel est peu onéreux (comptez une quinzaine d’euros pour un capteur de température, et environ quarante euros pour un dispositif permettant les relevés de particules fines).
Cependant pour ceux d’entre vous qui souhaiteraient simplement tester le projet sans investir dans ce type de matériel, nous mettons a disposition un script Python qui simulera le comportement d’un de ces capteurs en envoyant sur Mosquitto des valeurs aléatoires.
Pour le lancer :

$ python mosquitto-sender.py {SENSOR_NAME} {INTERVAL}

On remplacera simplement {SENSOR_NAME} par le nom du capteur, et {INTERVAL} par la fréquence de publication (en secondes).

Dans la partie suivante nous donnons à titre indicatif la procédure permettant de créer un capteur de température.

Assembler un capteur de température

Les relevés de températures sont assurés par une sonde DS18B20. Le câblage est assez sommaire puisqu’il suffit de raccorder le GND sur la borne ‘-‘ de la sonde, le 5V sur la borne ‘+’ et un des GPIOs (dans notre cas le G27) sur la borne centrale (on pourra optionellement ajouter une résistance 4K7 pour supprimer les parasites).

Un petit schéma pour illustrer mes propos…

… et le rendu réel sur une plaque de prototypage :

Côté code, nous procéderons en trois temps :

  • La connexion au WIFI
void setupWifi() {
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.println("Connecting to WiFi..");
  }
  Serial.println("Connected to the WiFi network");
}
  • La connexion à Mosquitto
WiFiClient espClient;
PubSubClient client(espClient);
...
client.setServer(mqtt_server_address, 1883);
  • Le relevé et l’envoi des données
OneWire oneWire(27);
DallasTemperature tempSensor(&oneWire);
...
tempSensor.requestTemperaturesByIndex(0); float value = tempSensor.getTempCByIndex(0); ... snprintf (msg, 20, "%lf", value); client.publish(topicName, msg);

Le code complet vous est donné ici.

Accéder aux configurations

Nous souhaitons que seules les données issues des capteurs connus soient analysées. Pour cela nous allons utiliser la fonction de stockage de clés/valeurs de Consul pour référencer une structure représentant les topics MQTT “autorisés”.

Un topic MQTT désigne une chaîne UTF-8 utilisée par le broker pour filtrer les messages de chaque client connecté (en quelque sorte une adresse de publication). Dans notre cas, chaque capteur publiera sur un topic qui lui sera dédié.

Format des configurations

MQTT permet de hiérarchiser les topics sur plusieurs niveaux, offrant la possibilité de regrouper les capteurs comme suit :

EMPLACEMENT / TYPE / NOM

Où EMPLACEMENT représente la pièce dans laquelle se trouve le capteur, TYPE le type de capteur (température, particules, …) et NOM le nom du capteur (ex : ilab/thermo/sensor1).

La configuration centralisée dans Consul aura pour clé “topics” et une valeur du type : 

{ 
    "ilab" : { 
      "thermal" : ["sensor1", "sensor2", "sensor4"],
      "particule" : []
    },
    "cds" : {}
}

Connexion à Consul

Pour que notre projet puisse accéder à Consul, nous ajoutons la dépendance suivante :

<dependency>
      <groupId>org.microprofile-ext.config-ext</groupId>
      <artifactId>configsource-consul</artifactId>
      <version>1.0.9</version>
      <scope>runtime</scope>
</dependency>

Puis nous définissons les paramètres de connexions dans application.properties :

configsource.consul.host=localhost:8500
configsource.consul.validity=30

Outre l’adresse et le port de Consul, on précise combien de temps les valeurs seront conservées en cache (en secondes).

Et côté Java ça donne quoi ?

A ce stade, la majeure partie du travail est déjà réalisée  : on ajoutera simplement deux lignes de code aux classes qui devront avoir accès aux configurations stockées dans Consul.

@ConfigProperty(name = "topics", defaultValue = "Config not set")
Provider<String> authorizedTopics;

La première ligne est une annotation permettant de spécifier la clé permettant de retrouver notre configuration (rappelez vous, nous l’avions défini sous le nom de “topics” dans Consul).  Si aucune valeur n’est trouvée, alors la valeur par défaut sera utilisée.

A la seconde ligne, l’utilisation d’un Provider plutôt que d’un simple String comme type pour authorizedTopics permet de bénéficier d’un rafraichissement automatique de la valeur récupérée : configsource.consul.validity ayant été positionné à 30 dans application.properties, le rafraîchissement va s’opérer toutes les 30 secondes. De cette façon, plus besoin de redémarrer l’application en cas de changement dans Consul. 

Collecter les données

Comme abordé dans la partie précédente, les données relevées par les capteurs seront publiées sur un bus MQTT. Nous avions vu dans le premier article que Quarkus reposait sur des briques existantes (Vertx, Hibernate, RestEasy, …) pour proposer un grand nombre d’extensions. Certaines permettent de créer des endpoints Rest, là où d’autres assurent la persistance des données en base. Dans notre cas, nous avons besoins de lire les messages émis sur Mosquitto, et ce de façon réactive et non bloquante. Alors quelle extension utiliser ? Quel module permet de répondre à notre besoin sans que le développement ne devienne un casse tête ?

SmallRye à la rescousse !

SmallRye est une implémentation de MicroProfile, cette initiative qui vise à uniformiser le développement de microservices en s’appuyant sur un sous-ensemble des APIs de JAVA EE (citons CDI pour l’injection de dépendances ou JsonB pour le binding Json).

Quarkus propose bon nombre de modules de SmallRye sous la forme d’extensions et notamment SmallRye Reactive Messaging. Comme son nom l’indique, ce dernier est une implémentation de MicroProfile Reactive Messaging et permet de construire des applications réactives de streaming de données. Une des forces de SmallRye Reactive Messaging est sans doute la facilité avec laquelle il est possible de créer des Consumers et Producers de messages : l’accès aux messages et le traitement de ceux ci se fait au moyen de classes Java annotées via @ApplicationScoped (afin que la classe soit reconnue comme composant à charger au démarrage de l’application).

Les méthodes de ces classes prendront chacune une entrée spécifiée avec l’annotation @Incoming, effectueront un traitement sur la donnée, et fourniront éventuellement une sortie au moyen de @Outgoing.

Source : Documentation officielle de SmallRye

Chaque méthode correspond à une étape du traitement de l’information, le traitement des flux de données revient donc simplement à définir une suite d’entrées / sorties.

Source : Documentation officielle de SmallRye

Du code, du code, du code !

Connexion au broker de messages

La première étape pour écouter les messages nous parvenant de Mosquitto est de spécifier le broker de message à utiliser, et comment s’y connecter.
On commence donc par ajouter les dépendances nécessaires au pom.xml :

<dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-streams-operators</artifactId>
      <version>${quarkus.version}</version>
</dependency>
<dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging</artifactId>
      <version>${quarkus.version}</version>
</dependency>
<dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging-mqtt</artifactId>
      <version>${quarkus.version}</version>
</dependency>

On ajoutera ensuite quelques paramètres au fichier application.properties se trouvant dans le répertoire resources du projet.

mp.messaging.incoming.topic-thermal.connector=smallrye-mqtt
mp.messaging.incoming.topic-thermal.topic=+/thermal/+
mp.messaging.incoming.topic-thermal.host=localhost
mp.messaging.incoming.topic-thermal.port=1883

La ligne 1 permet de préciser le type de broker à utiliser. Dans le cas où nous souhaiterions passer à un autre broker, par exemple Kafka, il suffirait de remplacer smallrye-mqtt par smallrye-kafka (quand nous vous disions que le passage d’une implémentation à l’autre était très simple ;)).

Le mot-clé incoming permet ici de préciser que la configuration s’applique aux flux entrant, là où outgoing s’appliquerait aux flux sortant. Vous en déduirez rapidement qu’il est donc tout à fait envisageable de prendre en entrée les flux issus d’un broker, et d’exposer les données traitées sur un autre broker en sortie. La chaîne de caractères topic-thermal est libre et nous permettra de faire le lien entre notre code java et les messages réceptionnés (nous y reviendrons un peu plus tard).

La ligne 2 spécifie le topic à écouter, et dans notre cas il s’agit d’une wildcard. En outre, tous les messages ayant pour adresse de publication une chaîne contenant “/thermal/” seront pris en compte. Par exemple :

  • ilab/thermal/sensor1 sera écoutée
  • ilab/thermal/sensor2 sera écoutée
  • cds/thermal/sensor3 sera écoutée
  • ilab/co/sensor1 ne sera pas écoutée

Nous aurions pu spécifier un topic bien précis (sans wildcard), mais dans notre cas on souhaite lier l’ensemble des données issues des capteurs de températures à une classe (Java) de traitement unique.

Les lignes 3 et 4 quant à elles détiennent l’adresse et le port à utiliser pour contacter Mosquitto.

Ecouter et traiter les flux

Nous partirons du principe que chaque type de mesure sera traité de façon unique, et disposerons donc d’une classe de processing dédiée.

Ainsi pour les relevés de température nous créerons la classe ThermalProcessor, annotée avec @ApplicationScoped (précisant que cette classe est un composant lié au contexte de l’application).

ThermalProcessor disposera d’une série de méthodes décrivant chacune une étape du traitement de la donnée. L’une d’entre elle devra faire office de point d’entrée en convertissant le message reçu en JsonObject :

@Incoming("topic-thermal")
@Outgoing("converted-message")
public JsonObject convertMessage(MqttMessage<byte[]> message) {
   JsonObject msg = new JsonObject()
        .put("topic", message.getTopic())
        .put("value", Double.valueOf(new String(message.getPayload())));
   message.ack();
   return msg;
}

La première ligne spécifie l’entrée que la méthode s’attend a recevoir. Ici topic-thermal renvoie à la configuration définie à l’étape précédente. La méthode convertMessageToJsonObject traitera donc les messages provenant des topics du type +/thermal/+, et va ensuite émettre les résultats sur une sortie converted-message.

La méthode suivante prendra en entrée ce qui est émis sur converted-message et filtrera les messages en fonction de leur provenance : si un message émis sur +/thermal/+ provient d’une zone non référencée dans Consul, alors il est simplement retiré de la fil et non traité. En revanche les messages envoyés par depuis des zones “autorisées” sont redirigés vers filtered-message-by-area.

@Incoming("converted-message")
@Outgoing("filtered-message")
@Broadcast public PublisherBuilder<JsonObject> filterByArea(JsonObject message) { String[] splittedMessage = message.getString("topic").split("/"); String areaName = splittedMessage[0]; return ReactiveStreams.of(message).filter(s -> Optional.ofNullable(consulAuthorizedTopics.get()) .map(str -> new JsonObject(str)) .filter(config -> config.containsKey(areaName)) .isPresent()); }

On retrouve donc bien l’enchaînement d’étapes dont nous parlions un peu plus haut.

Vous noterez cependant une petite différence avec la première méthode : ici nous ajoutons l’annotation @Broadcast dont le rôle est d’assurer la diffusion des messages à l’ensemble des méthodes annotées avec @Incoming(“filtered-message-by-area”). Ceci est très être utile dans le cas où nous souhaiterions disposer, par exemple, d’une méthode servant les données à un front, et une autre assurant l’enregistrement des messages en base après avoir effectuée quelques post-traitements.

@Incoming("filtered-message")
@Outgoing("thermal-stream")
public String publishValue(JsonObject message) {
    //...
    return new JsonObject()
          .put("area", areaName)
          .put("kind", kindName)
          .put("sensor", sensorName)
          .put("value", temp)
          .toString();
}
//...
@Incoming("filtered-message")
public void saveRoundedValue(JsonObject message) {
    //...
    dbService.create(new JsonObject()
           .put("area", areaName)
           .put("kind", kindName)
           .put("sensor", sensorName)
           .put("value", temp));
}

Ici la méthode publishValue expose les données sur “thermal-stream”, données qui seront par la suite poussées au front grâce aux server-sent events…mais nous verrons cela dans un prochain épisode 😉

Coming soon

Vous savez à présent comment Quarkus gère l’accès aux configurations et utiliser les reactive stream avec SmallRye. Dans le prochain épisode vous découvrirez comment enregistrer en base les valeurs mesurées par les capteurs et réceptionnées par notre application, l’occasion de manipuler le client MongoDB mis à disposition par votre prochain framework préféré !