Nouvelles Chroniques d'Amethyste

Penser au Sens, pas au Verbe

Azure Service Bus Queue: vision développeur

Poster un commentaire

Azure Service Bus (ASB) est une suite de 5 services dont j’ai présenté l’architecture ici:

https://amethyste16.wordpress.com/2016/04/06/focus-sur-azure-service-bus/

Je propose maintenant une vision développeur de l’un d’entre eux: la file d’attente.

 

Important: regardez bien la date de publication de cet article.

Vous trouverez pas mal de code sur Internet très différent de celui que je propose et qui ne compile d’ailleurs même pas. Le sujet a l’air d’être encore très actif chez Microsoft car ce n’est pas de petites modification du modèle de programmation. Rien ne dit aussi que le code donné en exemple fonctionne encore au moment où vous le lirez.

La masse de changement observés m’a assez surpris concernant un service en place dans Azure depuis des années. C’est d’ailleurs pour clarifier les choses suite à un précédent projet que j’ai décidé de faire cet article.

Au programme:

  • Création d’un espace de noms
  • Création d’une QUEUE
  • Les bases
  • Messages empoisonnés ou en fin de vie
  • Messages différés
  • Messages dupliqués
  • Session
  • Jeton SAS

 

Espace de noms

On peut créer un espace de noms de 2 façons:

  1. Portail
  2. PowerShell

 

Portail

Le point d’entrée aux fonctions proposées par ASB est la création d’un namespace qui joue un peu le rôle de conteneurs des services de Storage.

On construit un espace de nom depuis le portail classique, ce n’est pas encore possible depuis le nouveau portail.

2016-05-15_16-46-44

Dans l’onglet Service Bus on clique sur CREATE.

2016-05-15_16-50-30

On choisit le nom de l’espace de noms et un type. Si vous n’utilisez que Queue, Topic ou Relay, choisissez MESSAGING. Choisissez EVENTHUB si vous avez besoin en plus de EVENTHUB. On va rester sur MESSAGING. Ce découpage peut sembler bizarre, il a été introduit pour des raisons d’optimisation des performances.

 

On sélectionne un niveau de facturation parmi les 3 proposés. On trouvera un comparatif ici:

https://azure.microsoft.com/fr-fr/pricing/details/service-bus/

Le choix dépend des fonctions dont on a besoin, par exemple RELAY n’est disponible qu’en mode STANDARD. Nous allons choisir STANDARD.

On fait OK ensuite et Azure construit l’espace de noms.

2016-05-15_17-00-54

 

Si on clique sur le nom de l’espace de noms:

2016-05-15_17-37-15

On est prêt!

PowerShell

La commande suivante construit le même espace de noms que précédemment:

New-AzureSbNamespace -name amethysteSB -location ‘West Europe’ -CreateACSNamespace $false -NamespaceType Messaging

2016-05-15_17-33-21

La commande affiche le endpoint et la chaîne de connexion.

Sécuriser l’espace de noms

Si on examine la chaîne de connexion, on trouve ceci:

Endpoint=sb://amethystesb.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=vWnGTiRXfrqwdvDNAeVicKCSTVxveN3SdAkZ/Zn8oY=

 

On voit que l’espace de noms est sécurisé par défaut avec un jeton SAS. Une configuration, appelée SharedAccessKeyName, a été créée.

2016-05-15_18-06-18

On passe donc dans la chaîne de connexion le nom de la police de sécurité qui déclare les permissions accordées sur le service ainsi que la clef de sécurité.

 

La police par défaut apporte des permissions étendues, peut être trop, mais on peut en créer de nouvelles mieux adaptées:

2016-05-15_18-08-48

Création d’une QUEUE

On créée une file d’attente de 2 façons:

  1. Portail
  2. PowerShell

Portail

On se rend sur l’onglet QUEUE:

2016-05-15_18-33-52

Un clic sur la commande ouvre ce menu:

2016-05-15_18-36-49

Prenons CUSTOM CREATE:

2016-05-15_18-38-33

On donne un nom à la QUEUE, amethysteQueue, puis on fait OK.

2016-05-15_18-39-56

On va conserver les valeurs par défaut comme ci-dessus. On fait OK.

PowerShell

Il n’y a apparemment pas de commandes directe pour créer ou supprimer une Queue, mais j’ai tout de même trouvé ce script:

https://blogs.msdn.microsoft.com/paolos/2014/12/02/how-to-create-service-bus-queues-topics-and-subscriptions-using-a-powershell-script/

 

Protocole de connexion

Par défaut 3 protocoles de communications sont proposés:

  1. SBMP
  2. HTTP
  3. AMQP

Si on n’a pas de problèmes de firewall, le protocole AMQP est recommandé.

https://azure.microsoft.com/fr-fr/documentation/articles/service-bus-amqp-overview/

On peut sélectionner le protocole en ajoutant le paramètre TransportType à la chaîne de connexion suivit du type de protocole.

Emission/réception d’un message

Note: l’ensemble des démos sera faite en mode synchrone, mais les méthodes utilisées existent toutes en version asynchrone.

 

Premier essai

Nous allons enfin faire un peu de code et en particulier un exemple peu élaboré, mais démonstratif des principes de base.

Pour cela on a besoin d’un projet Console que l’on complète en appliquant le package Nuget qui va bien:

install-package WindowsAzure.ServiceBus

Je vais construire deux projets:

  1. SenderApplication
  2. ReceiverApplication

 

On commence par le premier:

string queueName = "amethystequeue";
string connexion = "Endpoint=sb://amethystesb.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=vWnGTiRXfrqwdv7DNAeVicKCSTVxveN3SdAkZ/Zn8oY=";
 
MessagingFactory messagingFactory = MessagingFactory.CreateFromConnectionString(connexion);
QueueClient client = messagingFactory.CreateQueueClient(queueName);
 
string message = "Comment allez vous?";
BrokeredMessage brokeredMessage = new BrokeredMessage(message);
client.Send(brokeredMessage);

 

Ce code va émettre un message en direction de la QUEUE ‘amethystequeue’ que nous avons créée.

La première étape est de créer une fabrique de messagerie avec MessagingFactory. La fabrique attend la chaîne de connexion vers  l’espace de noms dans sa méthode CreateFromConnectionString(). Comme alternative nous avons également la méthode Create() et sa dizaine de surcharges.

 

MessagingFactory est le point d’entrée vers les différents services proposés par Service Bus. Nous on s’intéresse à une file d’attente. On obtient un client de communication avec la QUEUE en invoquant CreateClientQueue().

 

Lancez le, rien de très visuel se produit, mais un message est belle et bien présent dans la QUEUE. On peut d’ailleurs confirmer depuis le portail que quelque chose se trouve dans la file d’attente:

2016-05-15_23-48-58

Il est possible de compléter le message avec des données supplémentaires injectées dans sa propriété Properties.

 

Le message envoyé est une String, mais n’importe quel objet sérialisable fait l’affaire du moment qu’il soit décoré avec les attributs en DataContractAttribute. Ainsi la classe:


[DataContract]
class Personne
{
   [DataMember]
   public string Nom { get; set; }
   [DataMember]
   public int Age { get; set; }
}

Pourra être poussée dans la file ainsi:


personne = new Personne() { Age = 20, Nom = "Alice" };
BrokeredMessage brokeredMessage = new BrokeredMessage(personne);
client.Send(brokeredMessage);

 

 

Nous allons écrire un récepteur pour vérifier que le message a été expédié:

MessagingFactory messagingFactory = MessagingFactory.CreateFromConnectionString(connexion);
QueueClient client = messagingFactory.CreateQueueClient("amethystequeue");
 
while (true)
{
   BrokeredMessage message = client.Receive();
   if (message != null)
   {
      Console.WriteLine(message.GetBody<string>());
 
      message.Complete();
   }
}

Comme tout à l’heure on construit un client de messagerie et on interroge la QUEUE avec la méthode Receive().

Il s’affiche:

2016-05-15_21-13-07

Cela fonctionne.

 

On peut ne pas être emballé par un while(true), il existe aussi un modèle événementiel. Par exemple:

client.OnMessage(message =>
{
   if (message != null)
   {
      try
      {
         Console.WriteLine(message.GetBody<string>());
 
         message.Complete();
      }
      catch
      {
         message.Abandon();
      }
   }
});
 
Console.WriteLine("Attente d'un message");
Console.ReadLine();

Et si on teste:

2016-05-16_07-53-12

Quelques options peuvent être passée en injectant une instance de OnMessageOptions.

 

Note: l’événement n’est déclenché qu’à l’arrivé d’un message, pas sur les messages déjà existant dans la file.

 

Receive, Complete, Abandon

On dispose de 3 méthodes pour manipuler les messages dans la queue:

  1. Receive
  2. Peek
  3. Abandon

Une fois lu avec Receive, le message est verrouillé selon une durée paramétrable:

2016-05-15_22-08-49

Il est de 30 secondes par défaut.

Vérifions ce que cela implique avec ce code après avoir envoyé un message dans la pile (il en faut un seul):


BrokeredMessage message = client.Receive();
Console.WriteLine("Message: {0}", message.MessageId);
DateTime start = DateTime.Now;
BrokeredMessage message2 = client2.Receive();
DateTime stop = DateTime.Now;
Console.WriteLine("Message: {0}", message.MessageId);
Console.WriteLine("Attente: {0}", stop-start);

L’exécution donne ceci:

2016-05-15_22-11-45

Ce test montre 3 choses intéressantes:

  1. Confirmation que Receive pose un verrou sur la queue pour une durée de 30 secondes
  2. Tant que le message n’est pas traité les appels suivants à Receive sont bloqués pendant 30 secondes.
    Notez qu’une surcharge de Receive permet de passer un TimeSan afin d’augmenter ou diminuer cette valeur
  3. Si rien n’est fait ou si le client plante, alors le message est remis dans la queue au bout de 30 seconde ce qui débloque les Receive suivants

Ce comportement est appelé sémantique au moins une fois. C’est à dire que l’on peut s’assurer qu’un message pourra être proposé au moins une fois.

 

Note: On peut en cas de besoin renouveler programmatiquement le verrou avec RenewMessageLock().

 

Comment fait t’on pour débloquer la queue plus rapidement? Il y a deux méthodes:

  1. On remet le message dans la pile avec Abandon
  2. On le supprime de la pile avec Complete

On a déjà vu l’utilisation de Complete, un usage typique de Abandon est celui-ci:

if (message != null)
{
   try
   {
      Console.WriteLine(message.GetBody<string>());
 
      message.Complete();
   }
   catch
   {
      message.Abandon();
   }
}

Il est également possible de lire un message sans le sortir de la pile ou le verrouiller avec la méthode Peek. Si on remplace Receive par Peek dans l’exemple qui précède:

2016-05-15_22-23-29

Note: les méthodes Abandon et Complete ne sont pas disponibles sur le message ainsi récupéré.

 

Messages empoisonnés ou en fin de vie

Un message a une durée de vie limitée dans la queue:

2016-05-15_22-31-02

On peut également le recevoir un nombre maximum de fois:

2016-05-15_22-29-19

 

Note: on obtient le nombre de fois où un message a été livré en interrogeant sa propriété DeliveryCount

 

Cela permet à la queue de se débloquer automatiquement si on tombe sur un message empoisonné qui ne peut être traité correctement par le client.

Voyons un peu comment on gère cette situation, mais pour se faciliter la vie on va poser une durée de vie plus courte, 1 minute suffira. Voici un premier test. Un message est posé dans la queue, on attend un peu plus d’1 minute et on le reteste:

 


BrokeredMessage message = client.Peek();
Console.WriteLine("Message: {0}", message.MessageId);
Thread.Sleep(1000 * 70);
message = client.Peek();
Console.WriteLine("Message: {0}", message == null);

 

La console affiche:

2016-05-15_23-41-19

On confirme bien la durée de vie du message dans la queue. Par défaut le message disparaît complètement:

2016-05-15_23-51-06

 

On peut également le pousser dans une autre queue appelée file d’attente lettre morte. Il s’agit d’une pile identique à la pile précédente (même paramétrage), dans laquelle les messages non traités peuvent être poussés soit automatiquement en cochant:

2016-05-15_23-43-00

Soit en appelant la méthode DeadLetter() du message.

 

Note: Il est important de comprendre qu’une fois dans cette pile, un message y reste éternellement, sauf si vous le supprimez manuellement. C’est la raison pour laquelle certains préfèrent ne pas utiliser cette pile en raison du risque de fuite mémoire que cela peut entraîner.

 

Activons la file de lettre morte afin de tester son fonctionnement et recommençons le test précédent. Le comportement du code est bien entendu le même, mais cette fois:

2016-05-15_23-58-56

Le tableau indique qu’un message est présent dans la file d’attente, la file de lettre morte en l’occurrence. Comment lire le contenu de cette pile?

Le nom de la pile de lettre morte est construit sur le modèle de celui de la pile principale en ajoutant /$DeadLetterQueue au nom de la file.


QueueClient clientDead = messagingFactory.CreateQueueClient("amethystequeue/$DeadLetterQueue");
var msg = clientDead.Receive();
Console.WriteLine("Message: {0}", msg.MessageId);
msg.Complete();

Et cette fois la pile est entièrement vidée:

2016-05-16_00-03-33

Messages différés

ASB permet d’extraire un message de la pile et le marquer comme différé. Cela signifie que le message restera dans la pile (le portail continue à l’afficher), mais il ne sera plus représenté lors d’appels successifs.

Ce mécanisme est proposé pour les situations où l’on a besoin de gérer au niveau du client l’ordre de traitement des messages. Typiquement une machine a états.

Faisons un premier essai pour clarifier les choses, on est dans la situation où l’on a envoyé deux messages:

2016-05-16_09-37-47

Lançons le code suivant:

var message = client.Receive();
message.Defer();
Console.WriteLine(message.MessageId);
 
var message2 = client.Receive();
Console.WriteLine(message2.MessageId);
message2.Complete();

Deux messages sont lus:

2016-05-16_09-40-56

Le premier est différé (Defer), le second est marqué comme traité (Complete). Côté portail on voit ceci:

2016-05-16_09-42-46

Le message différé reste dans la file.

Supposons qu’ensuite on relance le code précédent on constate qu’aucun message n’est lu. Le message différé est toujours comptabilisé dans la file, mais n’est plus proposé par Receive.

 

Si vous patientez un peu, vous constaterez que le message différé n’est jamais supprimé de la file. De plus il n’est pas possible, dans le code qui précède, de faire un Complete pour le marquer comme ayant été traité et d’écrire quelque chose comme:


message2.Complete();
message.Complete();

Un exception est levée:

The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue

 

Alors comment récupérer un message différé si on n’a plus l’instance de BrokeredMessage? Il faut avoir noté la valeur de sa propriété SequenceNumber, et dans ce cas:


var message = client.Receive(13792273858822145);
message.Complete();

A ma connaissance il n’y a pas d’autres moyens de supprimer un message différé. Cette méthode est clairement un problème dans un contexte de haute dispo, d’autant plus que le message ne sera jamais nettoyé par Azure à moins de recréer la file d’attente.

Pour moi l’architecture des messages différés est à revoir.

Messages dupliqués

La propriété MessageId d’un message est générée automatiquement par défaut, mais il est possible de la remplacer par la valeur de son choix. Il se pose alors la question du comportement de la pile au cas où une valeur est dupliquée.

 

Lançons le code de test suivant:

personne = new Personne() { Age = 20, Nom = "Alice" };
BrokeredMessage brokeredMessage = new BrokeredMessage(personne);
brokeredMessage.MessageId = "message1";
 
client.Send(brokeredMessage);</div>
 
var brokeredMessage2 = new BrokeredMessage(personne);
brokeredMessage2.MessageId = brokeredMessage.MessageId;
client.Send(brokeredMessage2);

 

Aucun message d’erreur particulier, le client de lecture reçoit deux messages avec le même MessageId sans problème.

Dans certains scénario cela peut poser problème. ASB propose un service de détection des messages dupliqués, mais on doit l’activer au moment de la création de la QUEUE:

2016-05-16_10-46-26

Si vous relancez le test qui précède alors cette fois:

 

2016-05-16_10-48-33

Un seul message dans la file et un seul message récupéré par le client. C’est déjà mieux.

Les sessions de messagerie

Une file peut être consommée par plusieurs clients. Dans un tel scénario on peut avoir besoin de router certains messages vers tel ou tel client. Un autre besoin peut être de regrouper certains messages selon une affinité.

Ces scénarios sont possibles avec les sessions de messagerie.

 

Les sessions ne sont pas actives par défaut, on doit le faire au moment de la création de la file:

 

2016-05-16_11-31-27

A partir de là, chaque message envoyé doit avoir un Id de session, autrement l’exception suivante est levée:

The SessionId was not set on a message, and it cannot be sent to the entity. Entities that have session support enabled can only receive messages that have the SessionId set to a valid value.

Alors comment fait t’on?

 

Le code de l’émetteur peut être modifié ainsi:

personne = new Personne() { Age = 20, Nom = "Alice" };
BrokeredMessage brokeredMessage = new BrokeredMessage(personne);
brokeredMessage.SessionId = "1";
 
client.Send(brokeredMessage);
personne = new Personne() { Age = 50, Nom = "Bob" };
var brokeredMessage2 = new BrokeredMessage(personne);
brokeredMessage2.SessionId = "2";
client.Send(brokeredMessage2);

 

On envoi deux messages, de la même façon que ce qui a été vu jusqu’à présent, mais avec une différence importante: on fournit une valeur dans la propriété SessionId. On pourrait avoir autant de message qu’on le souhaite dans chaque session.

 

Côté récepteur que voyons nous?


var sessions = client.GetMessageSessions();
Console.WriteLine("Trouvé {0} sessions", sessions.Count());

Qui affiche:

2016-05-16_12-11-36

On retrouve donc bien les deux sessions créées par l’émetteur.

Si nous essayons de lire les messages comme on l’a fait jusqu’à présent l’exception suivante est levée:

It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

 

C’est normal, on a besoin d’indiquer au client dans quelle session il va travailler. La lecture n’est plus effectuée avec une instance de QueueClient, mais de MessageSession que l’on construit ainsi:

var sessions = client.GetMessageSessions();
foreach (var browser in sessions)
{
   Console.WriteLine(string.Format("Session Id = {0}", browser.SessionId));
   MessageSession session = client.AcceptMessageSession(browser.SessionId);
 
   // lecture des messages de la session
}

 

Dans une vraie application je pense que l’on implémenterai une logique d’analyse de SessionId qui décide quelle session sera traitée par le client, les autres étant laissées aux autres clients qui consomment la QUEUE. On peut également utiliser cette logique pour filtrer les messages par affinité en fonction de leur Id de session. On devine là un prémisse des topiques, mais ceux-ci proposent des outils de filtrage plus sophistiqués.

Une autre utilisation des sessions est la mise en place de QUEUE avec des priorités différentes.

 

La lecture proprement dite est faite avec les méthodes que nous connaissons déjà:

while (true)
{
   var message = session.Receive();
 
   if (message != null)
   {
      Console.WriteLine("Message Id = {0}", message.MessageId);
      message.Complete();
   }
   else
   {
      // plus de message dans la file d'attente
      break;
   }
}

Rien de nouveau donc une fois que l’on sait comment obtenir une session.

 

La question importante pour la viabilité des sessions est de savoir si la file est bloquée à chaque Receive sur une session. Le plus simple est de le vérifier avec toujours nos deux messages:

var sessions = client.GetMessageSessions().ToList();
MessageSession session1 = client.AcceptMessageSession(sessions[0].SessionId);
MessageSession session2 = client.AcceptMessageSession(sessions[1].SessionId);
 
var message1 = session1.Receive();
Console.WriteLine(message1.MessageId);
var message2 = session2.Receive();
Console.WriteLine(message2.MessageId);
 
message2.Complete();
message1.Complete();

 

Nous savons qu’il y a deux sessions. Les messages sont d’abord reçus (Receive) et seulement ensuite marqués comme traités, dans l’ordre inverse où on les a reçu d’ailleurs. L’affichage donne ceci:

2016-05-16_12-30-19

Chaque session se comporte donc comme un flux, c’est à dire une pile indépendante.

 

Sécuriser l’accès à la QUEUE

Lorsque l’on créé un espace de nom on créée un jeton SAS qui donne accès à toutes les ressources de l’espace de noms sans restriction. Il est préférable d’être plus fin. On peut créer des permissions au niveau de la QUEUE.

 

Si on se rend dans la configuration de la QUEUE:

2016-05-15_18-58-43

On observe qu’aucune police d’accès n’est créée par défaut. La QUEUE n’est pas sécurisée.

On va typiquement créer une police pour le rôle de l’émetteur et du récepteur:

2016-05-15_19-02-27

Comme on le voit les permissions sont bien cloisonnées. Si on fait défiler l’écran, on tombe sur la zone qui permet de récupérer le jeton de chaque police:

2016-05-15_19-04-44

La chaîne de connexion est conforme à la syntaxe que nous avons rencontré dans le chapitre qui précède. On peut aussi récupérer la chaîne de connexion depuis le Dashboard.

2016-05-15_19-10-54

 

Il ne reste plus qu’à utiliser la police avec le jeton qui va bien dans votre chaîne de connexion.

Bibliographie

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Publicités

Laisser un commentaire

Entrez vos coordonnées ci-dessous ou cliquez sur une icône pour vous connecter:

Logo WordPress.com

Vous commentez à l'aide de votre compte WordPress.com. Déconnexion / Changer )

Image Twitter

Vous commentez à l'aide de votre compte Twitter. Déconnexion / Changer )

Photo Facebook

Vous commentez à l'aide de votre compte Facebook. Déconnexion / Changer )

Photo Google+

Vous commentez à l'aide de votre compte Google+. Déconnexion / Changer )

Connexion à %s