ZeroMQ
Introduction
Il y a quelques temps je me suis trouvé face à un projet où l’équipe avait développé de zéro un Framework de communication inter-processus à base mémoire partagée, un Framework avec envoie de messages, de process qui s’abonnent à des types de messages, le tout avec de fortes contraintes temporelles.
Ceci ressemblait furieusement à quelques chose dont j’avais déjà entendu parlé, une bibliothèque de Message Queuing ( [https://en.wikipedia.org/wiki/Message_queue] ). He oui, encore une équipe qui réinvente la roue …
Bref, voici une bonne excuse pour s’intéresser à cette technologie et expérimenter un peu avec. Comme j’avais déjà été un peu exposé à la bibliothèque ZeroMQ, que j’avais entendu beaucoup de bien du guide http://zguide.zeromq.org/page:all et que j’ai réentendu parlé de ZeroMQ via T-Rex (un générateur de trafic réseau) c’est via cette bibliothèque que je vais faire mon chemin.
D’après la page Wikipedia ( https://fr.wikipedia.org/wiki/ZeroMQ):
ZeroMQ (également écrit ØMQ, 0MQ ou ZMQ) est une bibliothèque de messagerie asynchrone haute performance, destinée à être utilisée dans des applications distribuées ou concurrentes. Il fournit une file d’attente de messages, mais contrairement au Message-oriented middleware, un système ZeroMQ peut fonctionner sans message broker. L’API de la bibliothèque est conçue pour ressembler à celle des sockets BSD.
Voyons voir ça.
Installation
Sur mon Ubuntu l’installation est simple et directe, j’installe libzmq et le bindings pour Python.
# apt-get install libzmq3-dev
# apt-get install python3-zmq
Le message queuing
Le but d’une bibliothèque de messages est de pouvoir envoyer des messages entre programmes (ou entre thread). La bibliothèque mets en place des files d’attente de messages et permet au programme de déclencher un code particulier pour traiter les messages reçus.
Mais pourquoi n’utilise t-on pas juste une communication ad-hoc (genre mémoire partagée) ?
Voyons voir ce qu’il faudrait.
Il faut un moyen d’envoyer un message d’un coté. Puis un moyen de le recevoir de l’autre. Soit le receveur attend l’arrivée de message soit il est prévenu quand un message arrive. Il faut un moyen de gérer le fait qu’il puisse y avoir plusieurs messages avant que le receveur ne les voient / ne puisse les traiter. Il peut y avoir d’autre besoins, veut on que l’émetteur soit prévenu de la bonne réception, veut on pouvoir gérer l’arrêt/redémarrage d’un des processus. Et si l’on veut envoyer le même message à plusieurs processus ?
Une réponse à toutes (ou la plupart de) ces problématiques) est la bibliothèque de Message Queuing.
La bibliothèque permet donc au processus de ne pas avoir à traiter explicitement le stockage des messages, les moyens de transmissions et de répartition des messages. Les programmes doivent juste s’enregistrer et déclarer une action à effectuer à la réception d’un message.
L’API, socket like de ZMQ
ZMQ permet d’utiliser une interface de programmation similaire à celle des sockets.
On ne manipule pas directement les sockets. Une couche intermédiaire se charge de manipuler réellement les sockets. Cette couche peut d’ailleurs utiliser autre chose que des sockets réseau pour la communication de messages.
ZMQ permet de faire communiquer des éléments :
- par le réseau (comme de vraies sockets réseau)
- entre processus sur une même machine
- entre Thread d’une même machine.
Et le plus beau c’est que, dans la plus part des cas, tout le code de communication entre deux entités reste le même, il suffit de changer le mode de transport:
- url:/
- ipc:/
- inproc:/
Prenons l’exemple de 2 entités, A envoie une Requête et attend une Réponse de B.
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
# Wait for next request from client
message = socket.recv()
print("Received request: %s" % message)
# Do some 'work'
time.sleep(1)
# Send reply back to client
socket.send(b"World")
import zmq
context = zmq.Context()
# Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
# Do 10 requests, waiting each time for a response
for request in range(10):
print("Sending request %s …" % request)
socket.send(b"Hello")
# Get the reply.
message = socket.recv()
print("Received reply %s [ %s ]" % (request, message))
Si on remplace “tcp://localhost:5555” et “tcp://*:5555” par :
- “tcp://localhost:5555”/ “tcp://*:5555” on a 2 programmes communiquant sur l’internet
- “ipc://tmp/hello”, on a 2 programmes communiquant sur la même machine
- “inproc://hello”, on a 2 thread du même programme qui communiquent
# Les Patterns de communication
ZMQ présente le message queuing sous forme de Patttern :
- Exclusive PAIR: Pair à Pair
- REQ/REP : Request / Reply
- PUB/SUB : Publish / Subscribe
- PUSH/PULL : Pipeline
Ces Patterns permettent de raisonner par rapport à des concepts familiers et cachent la complexité de la gestion des connexions.
## Exclusive PAIR
Ce pattern ne sert apparement que pour connecter 2 Thread (choses dont on est sur qu’ils ne tomberont pas, à moins d’un échec de tout le process).
## REQ/REP
En REQ/REP, le Client envoie une Requete et bloque jusqu’au retour d’une Reponse émise par un Serveur qui attendait la Requete.
Il est à noter qu’il est possible d’avoir plusieurs Serveurs pour un Client. Dans ce cas ZMQ dispatchera les requêtes vers les Serveurs (une sorte de Load Balancing).
Les pattern ROUTER et DEALER sont également de type request-reply. Contrairement à REQ/REP ils sont asynchrone.
Les Request-Reply sont utilisés dans un cadre de distribution de taches.
## PUB/SUB
Des entités qui publient des informations, sans se soucier si des personnes écoutent et des entité qui souscrivent à des sujets.
Les Publisher PUB, envoie un message avec dans le corps un “Topic”, un “sujet” quoi.
Pour s’abonner à un “Sujet” particulier le Subscriber SUB, utilise setsockopt()
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
Le cas d’utilisation est la distribution de données, d’un producteur vers plusieurs souscripteurs.
## PUSH/PULL (Pipeline)
Ce pattern est destiné au suite de traitement de données, des A produisent vers des B qui traitent qui envoient le résultats vers des C qui eux même envoie vers le ou les collecteurs finaux D.
Une application
Pour bien comprendre les choses rien ne vaut l’action. Je vais essayer d’utiliser ZMQ pour programmer une communication entre plusieurs processus sur une même machine.
Spécifications
Une succincte explication de ce que je voudrais réaliser : Une programme Launcher chargé de :
- lancer d’autres pgms,
- de surveiller qu’ils tournent toujours (les relancer si besoin),
- d’arrêter proprement les programmes et que les pgms lancés communiquent entre eux.
C’est une spécification légère, mais c’est suffisant pour une première prise en main de ZMQ.
Ci-dessous un découpage en quelques tâches :
- Faire un Pgm Launcher qui lance 2 autres pgms.
- Faire un heartbeat par ZMQ par proc:// avec réception du message dans le Launcher
- Faire que les process soient indépendants quand le Launcher meurt, les autres survivent.
- Faire que les pgms soient relancés par le Launcher quand 1 des pgms meurt
- Faire que le Launcher prenne de façon interactive une commande pour arrêter proprement les process. Donc envoi de message du Launcher aux autre pour un arrêt Au bout d’un moment,si pas de réponse des pgms, arrêt brutal.
- Faire communiquer entre eux 2 les pgms lancés.
## Journal de programmation
Toutes les sources et les différents commits peuvent être retrouvés à l’adresse https://github.com/bellefab/zmq_test_launcher
Tâche 1 - Le lancement de plusieurs programmes avec heartBeat
Le programme launcher.py lance les scripts Python client1.py et client2.py et se met en attente de messages pendant 10 secondes et affiche les messages reçus.
import subprocess
import zmq
import sys
import os
import time
pgm_list = (u"./client1.py", u"./client2.py")
processes = []
def killAll_process():
for idx in range(len(processes)):
processes[idx].kill()
if __name__ == '__main__':
# zmq init
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("ipc:///tmp/hearbeat")
socket.setsockopt_string(zmq.SUBSCRIBE, u'')
# loop to launch processes
for p in pgm_list:
p = subprocess.Popen(['/usr/bin/python3', p], start_new_session=True)
processes.append(p)
# wait for heartbeat signals / stop after 10 seconds
t_start = time.time()
while True:
str = socket.recv_string()
print("Msg received : ", str)
if time.time() > (t_start + 10):
break
print("END !!!")
killAll_process()
La validation de cette première version est réalisée par un lancement du programme et la constatation visuelle que l’on reçoit les messages de client1 et client2 pendant 10 secondes puis que tout s’arrête. Une commande ps permettra de s’assurer qu’aucun processus ne reste.
L’option start_new_session=True de subprocess.Popen() permet de détacher le nouveau programme de son parent et de s’assurer que si celui çi meurt, il continuera à vivre. On s’en assurera en faisant un Ctrl-C sur le launcher.py. Il faudra alors utiliser la commande kill pour arrêter client1 et client2.
La partie ZMQ du programme est assez limitée. Dans cette exemple j’ai choisi le pattern PUB/SUB de ZMQ. Les Publisher seront les programmes lancés et le Subscriber le Launcher. La ligne
socket.setsockopt_string(zmq.SUBSCRIBE, u'')
permet d’indiquer que Launcher s’abonne à tout les messages.
Le moyen de transport à utiliser est ici IPC, chose que l’on indique par le ipc de ipc:///tmp/hearbeat. Le Endpoint /tmp/heartbeat est une chaîne arbitraire identifiant un chemin de fichier.
Les programmes client et client2 utilisent la méthode connect() et Launcher la méthode bind(). En effet, dans le cas précis d’une communication IPC, il n’est possible de faire un seul bind() et plusieurs connect() et pas l’inverse. Si on se trompe, que l’on fit le bind() dans le client et le connect() dans le Launcher c’est le dernier bind() qui gagnera et le Launcher ne verra jamais que les messages d’un seul programme.
### Tâche 2 - Les messages HeartBeat
Dans Launcher nous utilisons un Poller. Cet élément permet :
- de ne pas être bloquant sur une attente de message. Il permet donc de définir un TIMEOUT.
- de définir un ensemble de socket que l’on va observer (dans notre exemple nous n’avons qu’une seule socket)
Launcher va donc attendre au maximum TIMEOUT millisecond. Si notre socket n’a reçu aucun message durant cet intervalle alors nous vérifions si aucun pgms lancé n’est considéré comme perdu, et si un message est reçu alors nous mettons à jour les signes de vie des pgms.
Un pgm sera considéré comme perdu si son dernier signe de vie dépasse un certain délai, LOST.
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
t_start = time.time()
while True:
# wait for heartbeat signals / stop after MAX_TIME seconds
if time.time() > (t_start + MAX_TIME):
break
socks = dict(poller.poll(REQUEST_TIMEOUT))
if socks.get(socket) == zmq.POLLIN:
str = socket.recv_string()
update_last_heart_beat(str)
else:
print("\n\n\n --------TIMEOUT----------\n\n\n")
check_and_restart()
Tâche 3 - Un peu d’interactivité
Le programme est utilisable mais bon … il serait un peu mieux avec la possibilité de donner quelques commandes.
Nous allons donc ajouter un thread qui va lancer une boucle attendant les commandes de l’utilisateur. le nouveau thread va communiquer avec le thread principal via ZMQ, plus précisément le transport inter-thread, inproc, avec le pattern REQ/REP.
Dans un premier temps les commandes disponible seront :
help/h : this help message.\
list/l : list all processes planned to be launched.\
quit/q : quit the program after killing all launched processes.\
killall : kill all launched processes.\
kill : kill one particular process.\
Coté ZMQ, tout est assez simple, une nouvelle socket ZMQ, un ajout au Poller
# socket for shell_loop
socket_shell = context.socket(zmq.REP)
socket_shell.bind(shell_url)
# start command loop
shell_thread = Shell()
shell_thread.start()
...
poller = zmq.Poller()
...
poller.register(socket_shell, zmq.POLLIN)
...
while True:
# wait for heartbeat signals / stop after MAX_TIME seconds
if time.time() > (t_start + MAX_TIME):
break
socks = dict(poller.poll(REQUEST_TIMEOUT))
if socks.get(socket) == zmq.POLLIN:
....
elif socks.get(socket_shell) == zmq.POLLIN:
msg_shell = socket_shell.recv_string()
print("Received from shell ", msg_shell)
msg_shell_answer, cont_stop = treat_shell_cmd(msg_shell)
socket_shell.send_string(msg_shell_answer)
if cont_stop == "stop":
break
...
Liens
https://www.linuxembedded.fr/2015/06/une-introduction-a-zeromq/ intéressant pour voir un SocketMonitor ce dernier permet de voir les
https://blog.octo.com/delegation-de-taches-avec-zeromq/
https://www.aosabook.org/en/zeromq.html
Laisser un commentaire