Il y a quelques mois, nous vous présentions NATS, un système de messaging open source. Son écosystème avait été parcouru dans les grandes lignes et notamment NATS Streaming, une surcouche de persistance et de re-jeu d’envoi de messages. Cette dernière solution, bien que très pratique a tout de mêmes quelques limites. Nous vous proposons dans cet article de découvrir une alternative à NATS Streaming, et cette fois-ci au travers d’un cas d’utilisation pratique et complet.

Un autre moteur de persistance pour NATS ?

Mais avant de soulever le capot et de mettre les mains dans le cambouis, il est toujours intéressant d’étudier les outils que nous avons à disposition. L’article précédent présentait NATS et la surcouche NATS Streaming, dont le but était d’apporter un certain nombre de mécanismes visant à permettre de persister les messages, mais aussi de les rejouer en cas de besoin. Mais alors pourquoi ne pas s’appuyer sur NATS Streaming pour cet article ? Tout simplement parce que Nats Streaming a quelques limites. Limites balayées par JetStream ! En effet, cette dernière solution comble certaines lacunes de NATS Streaming et notamment :

  • la scalabilité horizontale
  • l’ajout d’un mode “pull” pour les clients. Avec NATS Streaming les messages sont uniquement “pushed” (Jetstream supporte les deux modes)
  • le support des wildcards lors de la création de “subjects” et de la souscription aux channels (chose impossible avec NATS Streaming)
  • le support des “NACK” (messages non acquittés)
  • la gestion fine des restrictions d’accès des clients et de leur abonnement aux “subjects”
  • la simplification de mise en œuvre de clusters (l’intégration avec les dernières versions de NATS est relativement complexe avec NATS Streaming, notamment lors des phases de sécurisation)

Autant de raisons de passer directement à JetStream ! Voyons maintenant la mise en oeuvre.

Les deux mains dans le cambouis

L’objectif de cet article (et du prochain) est donc d’illustrer le fonctionnement de NATS et JetStream. Pour cela, nous prendrons un cas pratique consistant à collecter, agréger et stocker des news issues de différentes sources (API Rest de Twitter, flux RSS du site developpez.com …).

En outre voici un schéma global du projet :

La publication, le traitement de messages en Golang, le déploiement d’un cluster, la sécurisation, le monitoring… nous allons balayer large 😉

En outre :

  • Les composants ProgrammezExporter et TwitterExporter interrogeront les flux RSS du site www.programmez.com et l’API de Twitter.
  • Les informations obtenues seront poussées dans deux subjects (SOURCE.programmez et SOURCE.twitter), subjects liés à deux consumers via le stream SOURCE.
  • Deux consumers (TwitterConsumer et ProgrammezConsumer) écoutent les messages et “nettoient” les données avant de les republier sur un subject NEWS.received (exposé via un second stream NEWS).
  • Enfin le consumer NewsCreator pull les messages de NEWS.received afin de les persister en base, puis de les republier sur NEWS.processed (les messages seront ici “push” à NewsMonitor). Ce dernier composant s’appuie sur NATS Surveyor qui exposera des metrics à Prometheus (et qui pourront donc être exploitées dans un board Grafana).

Création du cluster

Quelques noeuds avec Docker Compose

La mise en œuvre d’un cluster est on ne peut plus simple avec Docker Compose. En effet, Synadia nous met à disposition une image Docker embarquant NATS et tout ce dont nous avons besoin. Le fichier docker-compose.yml suivant décrit comment démarrer un cluster constitué de trois nœuds :

version: '3'
services:
  n1:
    container_name: n1
    image: synadia/jsm:nightly
    entrypoint: /nats-server
    command: "--config /config/jetstream.conf --server_name S1"
    networks:
      - nats
    ports:
      - 4222:4222
    volumes:
      - ./config:/config
      - ./persistent-data/server-n1/:/data/nats-server/jetstream

  n2:
    container_name: n2
    image: synadia/jsm:nightly
    entrypoint: /nats-server
    command: "--config /config/jetstream.conf --server_name S2"
    networks:
      - nats
    ports:
      - 4223:4222
    volumes:
      - ./config:/config
      - ./persistent-data/server-n2/:/data/nats-server/jetstream

  n3:
    container_name: n3
    image: synadia/jsm:nightly
    entrypoint: /nats-server
    command: "--config /config/jetstream.conf --server_name S3"
    networks:
      - nats
    ports:
      - 4224:4222
    volumes:
      - ./config:/config
      - ./persistent-data/server-n3/:/data/nats-server/jetstream

networks:
  nats: {}

Comme vous pourrez le voir, chaque conteneur démarrera en se basant sur un fichier jetstream.conf. Ce fichier contient l’ensemble des configurations couvrant les aspects de sécurité, mais permet également de gérer la création de Stream et les autorisations de souscription.

Créer les Stream et création des subjects

Le fichier jetstream.conf permet divers paramétrages et en particulier la gestion des autorisations et création des “subjects”. On commence donc par initialiser les routes du cluster et spécifier le répertoire dans lequel les données seront persistées :

debug: true
trace: false
port: 4222
# Definition of the persistance directory
jetstream = {
  store_dir: "/data/nats-server/"
}

cluster = {
  name: "JSC"
  listen: "0.0.0.0:4245"
  # Routes specification
  routes = [
    "nats://n1:4245"
    "nats://n2:4245"
    "nats://n3:4245"
  ]
   authorization = {
    user: "server@localhost"
  }
}

...

Vient ensuite la déclaration des autorisations. Dans notre cas, les clients ne pourront se connecter qu’a certains subjects, la liste étant spécifiée comme suit :

# Client authorization
authorization = {
  users = [
    {
      user: "client@localhost"
      permissions = {
        publish = [
          "$JS.API.INFO"
          "$JS.API.STREAM.NAMES"
          "$JS.API.CONSUMER.NAMES.*"

          "$JS.ACK.SOURCES.>"
          "$JS.API.STREAM.CREATE.SOURCES"
          "$JS.API.STREAM.INFO.SOURCES"
          "$JS.API.CONSUMER.DURABLE.CREATE.SOURCES.*"
          "$JS.API.CONSUMER.MSG.NEXT.SOURCES.*"
          "$JS.API.CONSUMER.INFO.SOURCES.*"

          "$JS.ACK.NEWS.>"
          "$JS.API.STREAM.CREATE.NEWS"
          "$JS.API.STREAM.INFO.NEWS"
          "$JS.API.CONSUMER.DURABLE.CREATE.NEWS.*"
          "$JS.API.CONSUMER.MSG.NEXT.NEWS.*"
          "$JS.API.CONSUMER.INFO.NEWS.*"

          "NEWS.received"
          "SOURCES.twitter"
          "SOURCES.programmez"
        ]
      }
    }
  ]
}

Ici les instructions $JS.API.STREAM.CREATE permettent de préciser que le client client@localhost peut créer les Streams SOURCES (associé aux subjects SOURCES.twitter et SOURCES.programmez) et NEWS (associé au subject NEWS.received). Chaque message transitant par ces Streams devra être acquitté ($JS.ACK.NEWS.> et $JS.ACK.SOURCES.>). Dans l’idéal, nous aurions pu déclarer deux clients différents : l’un pour le Stream SOURCES et l’autre pour NEWS.

Sachez enfin que les wildcards sont ici supportés : il est donc possible d’autoriser un client à se connecter à n’importe quel subject comme suit :

# Client authorization
authorization = {
  users = [
    {
      user: "client@localhost"
      permissions = {
        publish = [
          "$JS.API.INFO"
          "$JS.API.STREAM.CREATE.SOURCES"
           ...
          "SOURCES.*"
           ...
          "$JS.ACK.SOURCES.>"
        ]
      }
    }
  ]
}

Sécuriser les échanges

Il est tout à fait possible de monter un cluster simple à des fins expérimentales, et sans prêter attention aux clients souscrivant à tel ou tel subject. Néanmoins, cela est fortement déconseillé en production. Nous allons donc générer des certificats qui seront utilisés afin de contrôler qu’un client est bien autorisé à échanger avec le cluster.

Nous commençons donc par créer ces certificats (ils seront automatiquement créés dans un répertoire config, monté en tant que volume dans le fichier docker-compose.yml):

$ mkcert -client -cert-file config/client-cert.pem -key-file config/client-key.pem localhost ::1 client@localhost
$ mkcert -client -cert-file config/server-cert.pem -key-file config/server-key.pem n1 n2 n3 ::1 server@localhost
$ cp "$$(mkcert -CAROOT)"/rootCA.pem ./config/rootCA.pem

client-cert.pem et client-key.pem seront également réutilisés par les consumers et producers que nous allons écrire.

Afin de référencer les certificats auprès du cluster, nous allons enrichir le fichier jetstream.conf avec le contenu suivant :

cluster = {
...
  tls = {
    cert_file: "https://d3uyj2gj5wa63n.cloudfront.net/config/server-cert.pem"
    key_file: "https://d3uyj2gj5wa63n.cloudfront.net/config/server-key.pem"
    ca_file: "https://d3uyj2gj5wa63n.cloudfront.net/config/rootCA.pem"
    verify_and_map: true
  }
...
}

tls = {
 cert_file: "https://d3uyj2gj5wa63n.cloudfront.net/config/client-cert.pem"
 key_file: "https://d3uyj2gj5wa63n.cloudfront.net/config/client-key.pem"
 ca_file: "https://d3uyj2gj5wa63n.cloudfront.net/config/rootCA.pem"
 verify_and_map: true
}

Et voilà 🙂 un simple docker compose up -d dans un terminal et nous disposons à présent d’un cluster JetStream complet !

Les exporters

Pour illustrer concrètement la façon dont les messages sont publiés, voyons comment s’articulent les composants ProgrammezExporter et TwitterExporter.

ProgrammezExporter scrute régulièrement le flux RSS du site programmez.com. En outre il s’agit simplement d’un “cron” qui se lance a intervalle régulier pour récupérer ce flux RSS, en sortir une grappe d’objets exploitable et la pousser sur un subject NATS.

TwitterExporter est similaire puisqu’il a en charge la récupération des Tweets depuis l’API Twitter, si ce n’est qu’il s’appuie sur le stream mis à disposition par le réseau social : les Tweets arriveront donc en temps réel, et il n’y aura donc pas besoin de “scheduler” le traitement de ceux-ci (en outre TwitterExporter reste à l’écoute, et publie un message dès qu’un nouveau Tweet lui est poussé).

Une connexion sécurisée au cluster

Pour publier des messages sur un subject provisionné sur notre cluster, il sera nécessaire d’utiliser les certificats créés précédemment. On commence donc par se créer une petite fonction réutilisable, qui retourne une connexion au cluster :

const (
	certFile   = "https://d3uyj2gj5wa63n.cloudfront.net/path/to/config/client-cert.pem"
	keyFile    = "https://d3uyj2gj5wa63n.cloudfront.net/path/to/config/client-key.pem"
	rootCAFile = "https://d3uyj2gj5wa63n.cloudfront.net/path/to/config/rootCA.pem"
	servers    = "nats://localhost:4222, nats://localhost:4223, nats://localhost:4224"
)

// connect to a JetStream cluster using X509 certificates to authenticate securely.
func Connect() (*nats.Conn, error) {
	clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("error parsing client X509 certificate/key pair: %s", err)
	}

	config := &tls.Config{
		Certificates: []tls.Certificate{clientCert},
		MinVersion:   tls.VersionTLS12,
	}

	nc, err := nats.Connect(servers, nats.Secure(config), nats.RootCAs(rootCAFile), nats.UseOldRequestStyle())
	if err != nil {
		return nil, fmt.Errorf("Got an error on Connect with Secure Options: %s\n", err)
	}
	return nc, nil
}

Dans l’idéal cette fonction sera externalisée dans une librairie 😉

La connexion se fera ensuite tout simplement via :

nc, err := lib.Connect()
if err != nil {
  log.Fatal(err)
}
defer nc.Close()

Publication des messages

Viens ensuite la publication des données sur le subject NATS.

Pour cela commençons par instancier un JetStreamContext à partir de la connexion créée lors de l’étape précédente :

js, err := nc.JetStream()
if err != nil {
  log.Fatal(err)
}

Nous n’aborderons pas la récupération périodique des informations contenues dans le flux RSS, cet aspect étant “en dehors du scope de cet article”. Sachez simplement que gocron fait très bien le job 😉 Et pour les lecteurs intéressés, le code est disponible ici.

Les messages étant publiés sous forme de bytes, l’envoi de données pourra alors se faire comme suit côté ProgrammezExporter :

type Msg struct{
  Data string
  Timestamp string
}
// ...
var programmezSubjectName = "SOURCES.programmez"
s1 := gocron.NewScheduler(time.UTC)
s1.Every(3).Seconds().Do(func() {
    msg := Msg{}
    rssContent, _ := getRss("https://www.programmez.com/rss/rss_actu.php")
    msg.Data = rssContent
    msg.Timestamp = time.Now().String()
    msgBytesBuffer := new(bytes.Buffer)
    json.NewEncoder(msgBytesBuffer).Encode(msg)
    _, err = js.Publish(programmezSubjectName, msgBytesBuffer.Bytes())
   if err != nil {
     log.Fatal(err)
   }
})
s1.StartBlocking()

Rien de bien compliqué ici, on définit une structure qui encapsulera les données utiles d’un message (dans notre cas le flux RSS brut contenant un ensemble de news, et un timestamp).
L’objet obtenu est transformé en tableau de bytes et publié via js.Publish, qui prend en paramètre le subject sur lequel le message doit être publié (SOURCES.programmez), ainsi que le message encodé en bytes[].

Dès lors, à chaque lancement de ce cronjob, le flux RSS de programmez.com est transformé en message et publié sur un subject. Voyons maintenant comment consommer les messages de ce subject.

En ce qui concerne TwitterExporter, on reste en écoute et on publie a chaque nouveau Tweet. Le code est donc sensiblement le même :

req, err := http.NewRequest("GET", "https://api.twitter.com/2/tweets/search/stream?media.fields=url&tweet.fields=created_at", nil)
req.Header.Add("Authorization", `Bearer ` + token )
resp, err := http.DefaultClient.Do(req)
reader := bufio.NewReader(resp.Body)
for {
	line, _ := reader.ReadBytes('\n')
        // Publication ...
}

Voilà pour la publication de messages, place à la consommation !

Les consumers

Constituant la seconde couche de notre projet, ProgrammezConsumer et TwitterConsumer ont en charge le nettoyage des données. Tout à l’heure, nous avons exporté périodiquement un flux RSS brut dans un seul message. Pour résumer un même message comportera une liste news. ProgrammezConsumer va donc récupérer ce message pour en extraire la liste de news, avant de republier chacune d’entre elles sur le subject NEWS.received.

TwitterConsumer quant à lui sera beaucoup plus simple et ne fera que mapper les messages issus de SOURCE.twitter vers un objet news, avant de le republier sur NEWS.received.


Un mot sur les modes de consommation

Jetstream supporte deux types de consumers qui définissent de quelle façon les messages sont consommés :

  • En mode pull, c’est le consumer lui même qui demande au système le prochain message à consommer. Cela se traduira par une lecture en boucle des messages côté consumer, qui sollicitera à chaque fois le serveur pour obtenir le prochain message à traiter. La librairie jsm.go fournie quelques facilitants :
consumer, _ := mgr.NewConsumer("ConsumerName", jsm.FilterStreamBySubject("STREAM.subject"))
msg, err := consumer.NextMsg()
// Récupération des metadata du message (comme le subject ou le stream sur lequel le message a été émis)
meta, _ := jsm.ParseJSMsgMetadata(msg)

En outre, en mode pull, il est possible de facilement scaler en augmentant le nombre d’instances d’un même consumer. Les messages seront alors répartis entre ces instances en fonction de la disponibilité de chacune.

  • En mode push, c’est le système qui envoie le message au consumer et ce dernier réagit à la réception de celui-ci. Côté code, l’approche “classique” offerte par la librairie NATS officielle sera utilisée comme suit :
sub, _ := nc.Subscribe(ib, func(m *nats.Msg){ 
    log.Print("Msg Processing"); 
    //...
})

Notons qu’en réalité nous n’écrirons pas de consumer, mais plutôt des composants qui eux utiliseront des consumers créés sur JetStream pour récupérer les messages de tel ou tel subject.

Consommation des messages

La consommation des messages publiés sur SOURCE.programmez et SOURCE.twitter se fera en mode pull. Nous l’avons vu un peu plus haut, la fonction NextMsg() permet de “réclamer” des messages au serveur. Mais pour pouvoir l’utiliser, il faut créer un consumer (ou charger un consumer existant) à partir d’un stream :

import (
   "../../lib"
    ...
   "github.com/nats-io/jsm.go"
   "log"
)
//...
nc, _ := lib.Connect()
defer nc.Close()
js, _ := nc.JetStream()
mgr, _ := jsm.New(nc)
stream, _ := mgr.LoadOrNewStream("SOURCES", jsm.Subjects("SOURCES.programmez"), jsm.LimitsRetention(), jsm.FileStorage(), jsm.InterestRetention())
consumer, err := stream.LoadOrNewConsumer("ProgrammezConsumer", jsm.FilterStreamBySubject(programmezSubjectName), jsm.DurableName("ProgrammezConsumer"))

La fonction LoadOrNewConsumer permet de rechercher un consumer et le charger. Si ce dernier n’existe pas alors il sera créé.
Si nous avions simplement utilisé NewConsumer, nous aurions pu obtenir une erreur à chaque redémarrage de notre composant puisque ce dernier aurait ordonné la création d’un consumer existant (créé lors du premier démarrage du dit composant).

La récupération du stream se fait de la même façon avec LoadOrNewStream, qui prend en paramètre le nom du stream, le ou les subjects de ce stream, et le type de rétention des messages (ici ceux ci seront persistés sous forme de fichiers).

Par ailleurs un consumer pourrait être préalablement créé avec le CLI de NATS comme suit :

$ nats con add SOURCES NEW --filter SOURCES.programmez --ack explicit --pull --deliver all

Pour la consommation à proprement parlé nous procéderons alors de la façon suivante (ici pour ProgrammezConsumer):

var sourcesStreamName = "SOURCES"
var programmezSubjectName = "SOURCES.programmez"
var newsSubjectName = "NEWS.received"
for {
	msg, err := consumer.NextMsg()
	msg.Ack()
	if err == nil && len(msg.Data) > 0 {
		msgContent := MsgModel.Msg{}
		msgBytesBuffer := new(bytes.Buffer)
		json.NewDecoder(msgBytesBuffer).Decode(msg.Data)
		if err := json.Unmarshal(msg.Data, &msgContent); err != nil {
			panic(err)
		}
		rss := RssModel.Rss{}
		if err := xml.Unmarshal([]byte(msgContent.Data), &rss); err != nil {
			panic(err)
		}
		for _, newsElm := range rss.Items {
			news := NewsModel.News {
				Title: newsElm.Title,
				Description: newsElm.Description,
				Date: newsElm.PubDate,
				Link: newsElm.Link,
			}
			msgBytesBuffer := new(bytes.Buffer)
			json.NewEncoder(msgBytesBuffer).Encode(news)

			_, err = js.Publish(newsSubjectName, msgBytesBuffer.Bytes())
			if err != nil {
				log.Fatal(err)
			}
		}
	}
}

On répète ici à l’infini la réclamation de messages via NextMsg(). Chaque message obtenu est acquitté et désérialisé. Comme précisé un peu plus haut, les messages destinés à ProgrammezConsumer comporte une liste de News. Cette liste est donc parcourue et chaque élément est transformé en un objet News, puis republié sur le subject NEWS.received.

Persistance des News en base

Dernier maillon de la chaine : l’intégration des News postées sur NEWS.received en base de données. Ce subject contient a la fois les messages envoyés par ProgrammezConsumer et TwitterConsumer (messages ayant tous la même forme à savoir une structure composée d’un titre, une description, une date et un lien) :

type News struct {
	Title       string
	Description string
	Date		string
	Link		string
}

Le composant NewsCreator consomme les messages de NEWS.received exactement comme le ferait TwitterConsumer et ProgrammezConsumer, et se contente d’enregistrer les données en base MongoDB :

for {
	msg, err := consumer.NextMsg()
	msg.Ack()
	if err == nil && len(msg.Data) > 0 {
		news := NewsModel.News{}
		msgBytesBuffer := new(bytes.Buffer)
		json.NewDecoder(msgBytesBuffer).Decode(msg.Data)
		//...

		newsBson := bson.D {
		     {"title" , news.Title},
		     {"date" , news.Date},
		     {"description" , news.Description},
		     {"link" , news.Link},
		}

		db := mongoClient.Database("ilab-news-aggregator-db")
		collection := db.Collection("news")
		filter := bson.D {{"title", news.Title },}
		existingNews := NewsModel.News{}
		cursor := collection.FindOne(ctx, filter)
		err := cursor.Decode(&existingNews)
		if err != nil {
			log.Print(news.Title)
			collection.InsertOne(ctx, newsBson)
		}
	}
}

Et voilà ! Une chaine complète d’intégration de données, de la captation jusqu’à leur persistance, le tout avec NATS !

En attendant la suite …

Vous avez vu dans cet article quels étaient les avantages de JetStream et en particulier les modes de consommation de messages. Cependant, deux points n’ont pas été abordés :

  • La surveillance de notre chaîne d’intégration de News. J’évoquai au début de l’article un monitoring assuré par Prometheus et permettant de contrôler les messages publiés sur NEWS.processed (l’occasion d’aborder par la pratique le mode de consommation “Push”).
  • Le contrôle du format des messages transitant sur nos subjects. Dans le cas présent nous pouvons publier à peu près tout et n’importe quoi. Cependant, tout comme pour Kafka, un mécanisme intéressant existe et permet de valider les messages : les schémas. Et le plus beau, JetStream permet de les centraliser dans un Schema Registry !

Un peu de patience, ces deux sujets seront au programme du prochain article 😉

D’ici là un peu de lecture pour patienter :