Vlad Riabchenko
Architecte technique chez Webnet.
Le composant Messenger permet d’envoyer et de recevoir facilement des messages entre les applications ou differents composants d’une seule application. Je dis facilement parce que Messenger fournit une abstraction ingénieuse pour faciliter l’envoi et la réception de messages. Une ligne de code suffit pour envoyer le message :
$bus->dispatch($message);
Ce message peut être envoyé en synchrone vers un handler dans un autre bout de votre application ou bien via un transport asynchrone (AMQP broker, Redis ou encore une API quelconque).
De l’autre côté le bus de messages peut recevoir un message depuis l’extérieur de votre application et orienter celui-ci vers un handler interne. Un handler est un bout de code que vous ajoutez pour traiter ces messages spécifiques.
Le troisième scénario rappelle «l’event dispatcher». Il se peut que le message soit traité en synchrone et ne quitte pas l’application :
Le message peut être n’importe quel objet de n’importe quelle classe. Message
est un objet ne contenant que l’information utile sans regard du message bus. Il
s’avère très utile d’attacher une information complémentaire par rapport à son
traitement par message bus. Le message est emballé dans une enveloppe (instance
de classe Envelope
) à laquelle on peut attacher des cachets (stamps) (instances de
classe Stamp
). Par exemple ReceivedStamp
marque le message reçu par le
receveur extérieur. Les enveloppes contenant ReceivedStamp
seront ignorées par
les senders afin que le message issu de l’extérieur ne soit pas renvoyé à
l’extérieur. Au lieu de cela, le message bus cherchera un handler dans
l’application.
MessageBus
est un pivot de Messenger. Elle accepte tous les messages et les
oriente vers les bons destinataires. Malgré sa grande importance c’est une
petite classe qui ne comporte qu’une propriété et une méthode :
Toute la logique complexe pour faire tourner la machinerie de réception et d’envoi des messages réside dans les multiples middlewares. La seule vraie responsabilité du message bus est de sélectionner le premier middleware et de lui passer un message et un itérateur sur tous les middlewares enregistrés.
Il convient de noter qu’il peut y avoir plusieurs bus de messages.
Les middlewares implémentent MiddlewareInterface
. Un middleware reçoit le message,
le traite puis optionnellement le passe au middleware suivant. Et ainsi de
suite. L’ordre de middlewares est très important. Chaque bus possède sa propre
pile de middlewares. Quand la configuration du bus est confiée au FrameworkBundle
il ajoute les middlewares suivants :
add_bus_name_stamp_middleware
ajoute BusNameStamp
contenant le nom de bus
qui dispatche le message.reject_redelivered_message_middleware
n’est utilisé qu’avec AMQP lors de la
réception du message de la part du broker AMQP. La chaîne de middlewares sera
interrompue et un AMQP broker recevra un negative ack quand le message se
présentera la seconde fois. Cela arrive quand AMQP broker livre le message à
l’application et ne reçoit aucun avis de réception (ack), il essaye de
relivrer le message. Ce middleware existe pour empêcher la boucle infinie de
relivraison des messages problématiques.dispatch_after_current_bus
retient le traitement des nouveaux messages
envoyés par un handler (qui est déjà un train de traiter son message).
Plus d’info.failed_message_processing_middleware
. Il peut arriver qu’une exception soit
levée lors du traitement du message. Dans ce cas le message sera redirigé vers
une file «failed» avec le stamp SentToFailureTransportStamp
. La commande
messenger:failed:retry
re-dispatchera alors ces messages comme s’ils
provenaient du receveur normal à l’aide de ce middleware.TraceableMiddleware
, ValidationMiddleware
. Vos middlewares peuvent changer
le message, ajouter ou supprimer les stamps, arrêter la file de middlewares.send_message
passe un message à un sender et arrête la pile de middlewares
s’il s’agit du message fabriqué par l’application. S’il n’existe pas de sender
pour le message, celui-ci sera passé au prochain middleware. Si l’enveloppe
porte ReceivedStamp
send_message
middleware ne fait rien pour éviter le
renvoi à l’extérieur du message reçu de l’extérieur.handle_message
choisit le handler pour le message et le lui passe.Les messages sont soit envoyés vers l’extérieur par un sender ou traités par des handlers.
Les handlers déclarent les classes de messages qu’il veulent traiter (ou leurs
classes mères ou l’interface qu’ils implémentent). Le message sera passé à tous
les handlers intéressés. Handler ne reçoit que le message lui-même sans aucun
stamp. Quand le message est traité en synchrone il est possible de se servir de
HandledStamp
pour récupérer le résultat du handler :
$envelope = $bus->dispatch(new Message());
$handledStamp = $envelope->last(HandledStamp::class);
$result = $handledStamp->getResult();
Comment ça se passe ? HandledStamp
est ajouté seulement par
HandleMessageMiddleware
quand plusieurs conditions sont réunies :
namespace Symfony\Component\Messenger\Middleware;
class HandleMessageMiddleware implements MiddlewareInterface
{
private $handlersLocator;
/**
* {@inheritdoc}
*/
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$handler = null;
$message = $envelope->getMessage();
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
$handler = $handlerDescriptor->getHandler();
// L’execution de handler.
// Le résultat renvoyé par le handler est sauvegardé dans HandledStamp.
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
// Attacher le stamp à l’enveloppe
$envelope = $envelope->with($handledStamp);
}
return $stack->next()->handle($envelope, $stack);
}
}
Quand le message est envoyé par le sender dans SendMessageMiddleware
ce
dernier arrête l’exécution de la chaîne de middlewares et
HandleMessageMiddleware
ne s’exécute pas :
namespace Symfony\Component\Messenger\Middleware;
class SendMessageMiddleware implements MiddlewareInterface
{
/**
* @var SendersLocatorInterface
*/
private $sendersLocator;
/**
* {@inheritdoc}
*/
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$sender = null;
if ($envelope->all(ReceivedStamp::class)) {
// it’s a received message, do not send it back
} else {
foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) {
$envelope = $envelope->with(new SentStamp(/** … */));
$envelope = $sender->send($envelope);
}
}
// Passer l’enveloppe à prochain middleware (HandleMessageMiddleware)
// si l’un des deux est vrai :
// - aucun sender ne convient pas pour ce message
// - le message a été reçu de l’extérieur (continent ReceivedStamp)
if (null === $sender) {
return $stack->next()->handle($envelope, $stack);
}
// le message envoyé ne doit pas être traité par le prochain middleware
return $envelope;
}
/**
* @return iterable|SenderInterface[]
*/
private function getSenders(Envelope $envelope, ?RedeliveryStamp $redeliveryStamp): iterable
{
return $this->sendersLocator->getSenders($envelope);
}
}
La fonctionnalité proposée dans le PR est assez spécifique et il est possible que cette idée ne soit pas retenue par la communauté. En revanche il est intéressant de réaliser une preuve de concept de cette idée et tester la flexibilité du composant Messenger. Dans cette article comme dans la PR je vais me concentrer seulement sur AMQP transport.
Quand le message est traité par un handler en synchrone, il ne sort pas de l’application. Le client peut attendre tranquillement l’exécution du handler et obtenir son résultat. Dans le cas où le message doit être envoyé vers l’extérieur le traitement est souvent asynchrone. Le sender se contente d’envoyer le message sans attendre que le message soit traité. Par conséquent le handler extérieur n’a pas la possibilité de fournir le résultat au client.
En revanche il est possible d’utiliser le patron de conception Remote Procedure Call ou RPC:
La file messages est ordinaire: plusieurs client peuvent y envoyer les requêtes, plusieurs handlers peuvent consommer et traiter ces requêtes. En envoyant une requête dans une file messages le client crée une file temporaire exclusive. C’est une file de callback dessinée exclusivement à ce client, c’est-à-dire il ne peut exister qu’un seul consommateur connecté à cette file. Le nom d’une file de callback est attaché à la requête comme la propriété reply_to. Après l’envoi d’une requête le client peut continuer son exécution normalement et à un moment donné il peut s’arrêter et attendre la réponse dans une file de callback.
Le handler envoie sa réponse dans une file indiquée dans la propriété reply_to de la requête :
La réponse est distribuée au client. Ayant terminé de traiter cette réponse le client détruit la connection à la file temporaire exclusive. Par conséquent AMQP broker supprime cette file automatiquement.
Tout d’abord il faut donner au client la possibilité d’indiquer qu’il attend la
réponse de la part de handler traitant son message. Le candidat parfait pour
cela est un nouveau stamp ReplyStamp
:
// Attacher ReplyStamp a l’enveloppe pour indiquer que le client attend la réponse.
$envelope = $messageBus->dispatch($message, [new ReplyStamp()]);
ReplyStamp
servira aussi à récupérer la réponse :
namespace Symfony\Component\Messenger\Stamp;
/**
* Stamp used to identify that client wants a response.
* Client gets a response from a handler via this stamp.
*/
class ReplyStamp implements StampInterface
{
/**
* @var mixed
*/
private $response;
/**
* @param mixed $response
*/
public function setResponse($response): void
{
$this->response = $response;
}
/**
* @return mixed
*/
public function getResponse()
{
return $this->response;
}
}
Le code du client qui attend la réponse traitée par un handler en synchrone (sans envoi à l’extérieur):
$envelope = $messageBus->dispatch($message, [new ReplyStamp()]);
$replyStamp = $envelope->last(ReplyStamp::class);
$result = $replyStamp->getResponse();
Le code du client qui attend la réponse pour sa requête traitée en asynchrone (envoyée au AMQP broker):
// Indiquer que le client attend la réponse
$envelope = $messageBus->dispatch($message, [new ReplyStamp()]);
// s’occuper d’autres choses lorsque la requête est en train d’être envoyée et traitée
$replyStamp = $envelope->last(AmqpReplyStamp::class);
// Attendre la réponse, la methode getResponse est bloquante.
$result = $replyStamp->getResponse();
A mon avis il est important que le client soit au courant que la réponse sera
renvoyée en asynchrone. Le code qui est explicite à cet égard permettra de
prévoir les problèmes potentiels et facilitera le debugging. Pour cette raison
il vaut mieux utiliser un stamp spécifique AmqpReplyStamp
plutôt que
réutiliser ReplyStamp
pour cacher RPC.
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* Stamp ajouté par @see AmqpSender quand le client attends une réponse de la
* part de handler.
* Le client récupère la réponse d’une file temporaire exclusive.
* Ce stamp implémente NonSendableStampInterface parce qu’elle ne doit pas être
* envoyé vers handler via AMQP brocker. C’est seulement le client qui est
* intéressé par ce stamp pour pouvoir recuperer le resultat.
*/
class AmqpReplyStamp implements NonSendableStampInterface
{
/**
* File de callback
*
* @var \AMQPQueue
*/
private $replyQueue;
/**
* @param \AMQPQueue $replyQueue
*/
public function __construct(\AMQPQueue $replyQueue)
{
$this->replyQueue = $replyQueue;
}
/**
* @return mixed
*/
public function getResponse()
{
$response = null;
// Consommer les messages d’une file de callback. C’est un appel bloquant.
$this->replyQueue->consume(function(\AMQPEnvelope $envelope) use (&$response) {
$response = $envelope->getBody();
// Arrêter la consommation de messages apres avoir recu la reponse.
return false;
});
return $response;
}
}
AmqpReplyStamp
doit être attaché à l’enveloppe de la requête quand elle est
envoyée vers AMQP broker. Pour attacher ce stamp il faut d’abord créer une file
temporaire de callback. AmqpSender
est un meilleur endroit cette
fonctionnalité.
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReplyStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpSender implements SenderInterface
{
private $serializer;
private $connection;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
/** @var AmqpStamp|null $amqpStamp */
$amqpStamp = $envelope->last(AmqpStamp::class);
+ if ($replyStamp = $envelope->last(ReplyStamp::class)) {
+ // Déclarer une file exclusive, creer un AmqpReplyStamp et
+ // ajouter-le à l’enveloppe du requete.
+ $replyQueue = $this->connection->createReplyQueue();
+ $amqpReplyStamp = new AmqpReplyStamp($replyQueue);
+ $envelope = $envelope->with($amqpReplyStamp);
+
+ // Ajouter une propriété 'reply_to' a une message pour que le handler
+ // sache dans quelle file mettre la réponse.
+ $amqpStamp = AmqpStamp::createWithAttributes(['reply_to' => $replyQueue->getName()], $amqpStamp);
+ }
// ...
try {
// Déléguer la tâche de transmettre le message a l’objet $connection
$this->connection->publish(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$amqpStamp
);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
return $envelope;
}
}
L’objet de $connection
contient de multiples méthodes utiles pour interagir
avec AMQP broker:
Il faut y ajouter la méthode createReplyQueue
pour créer une file de callback:
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
/**
* An AMQP connection.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class Connection
{
private $amqpFactory;
/**
* @var \AMQPChannel|null
*/
private $amqpChannel;
// ...
+ /**
+ * Create an exclusive queue to get a response from.
+ */
+ public function createReplyQueue(): \AMQPQueue
+ {
+ $queue = $this->amqpFactory->createQueue($this->channel());
+ $queue->setFlags(\AMQP_EXCLUSIVE);
+ $queue->declareQueue();
+
+ return $queue;
+ }
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
// creer un channel
}
return $this->amqpChannel;
}
// ...
}
D’un côté le composant messenger contient désormais tout ce qu’il faut pour permettre aux clients d’attendre les réponses. De l’autre côté, il faut que le résultat renvoyé par un handler soit transmis à un client intéressé.
Le handler reçoit le message tel quel, sans enveloppe ni stamps. Par conséquent
il n’est pas au courant si le client attend une réponse ou pas. Il n’est
d’ailleurs pas une bonne idée de confier une responsabilité d’envoyer la réponse
aux handlers. C’est le composant Messenger qui doit s’en occuper. Il suffit de
trouver un endroit pour ajouter ce code de renvoi. Comme c’est indiqué avant
HandleMessageMiddleware
s’occupe de choisir un bon handler et lui passer une
requête. C’est le dernier middleware dans la queue de middlewares. Il y a donc
deux choix : modifier HandleMessageMiddleware
ou ajouter un nouveau middleware
à la fin de la file des middlewares. A mon avis la création d’un nouveau
ReplyMiddleware
est la meilleure solution :
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\ReplyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
/**
* Middleware responsible for replying results returned by handler.
*/
class ReplyMiddleware implements MiddlewareInterface
{
/**
* {@inheritdoc}
*/
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
// Agir si le message a été traité avec succès et le client attends la réponse
if (($handledStamp = $envelope->last(HandledStamp::class))
&& ($replyStamp = $envelope->last(ReplyStamp::class))
) {
$response = $handledStamp->getResult();
// Si le message a été traité en synchrone
// il faut juste sauvegarder la réponse du handler.
$replyStamp->setResponse($response);
// Si le message a été reçu par AmqpReceiver
// il faut envoyer la réponse vers la queue de callback.
if ($amqpRecievedStamp = $envelope->last(AmqpReceivedStamp::class)) {
$replyTo = $amqpRecievedStamp->getAmqpEnvelope()->getReplyTo();
$amqpRecievedStamp->getConnection()->reply($response, $replyTo);
}
}
return $stack->next()->handle($envelope, $stack);
}
}
Pour que ReplyMiddleware
puisse fonctionner il faut ajouter la méthode reply
dans la classe Symfony\Component\Messenger\Transport\AmqpExt\Connection
:
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
/**
* An AMQP connection.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class Connection
{
// ...
+ /**
+ * Put a response to an exclusive queue
+ *
+ * @param string $response
+ * @param string $replyTo Queue name to reply to
+ */
+ public function reply(string $response, string $replyTo)
+ {
+ $defaultExchange = new \AMQPExchange($this->channel());
+ $defaultExchange->publish($response, $replyTo);
+ }
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
// creer un channel
}
return $this->amqpChannel;
}
// ...
}
Mais d’abord ReplyMiddleware
a besoin d’accéder à la connection AMQP actuelle
afin d’appeler la méthode reply. Bien qu’il puisse exister de meilleures
solutions j’ai mis la connection AMQP dans AmqpReceivedStamp
:
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* Stamp applied when a message is received from Amqp.
*/
class AmqpReceivedStamp implements NonSendableStampInterface
{
+ private $connection;
private $amqpEnvelope;
private $queueName;
- public function __construct(\AMQPEnvelope $amqpEnvelope, string $queueName)
+ public function __construct(Connection $connection, \AMQPEnvelope $amqpEnvelope, string $queueName)
{
+ $this->connection = $connection;
$this->amqpEnvelope = $amqpEnvelope;
$this->queueName = $queueName;
}
+ public function getConnection(): Connection
+ {
+ return $this->connection;
+ }
+
public function getAmqpEnvelope(): \AMQPEnvelope
{
return $this->amqpEnvelope;
}
public function getQueueName(): string
{
return $this->queueName;
}
}
Je me suis permis de ne pas respecter la rétro-compatibilité dans le cadre de cette PR parce qu’il s’agit plutôt d’une preuve de concept. La couche de rétro-compatibilité peut être ajoutée par la suite.
Il reste à ajouter le nouveau middleware dans la pile des middlewares par
défaut. Lorsque FrameworkBundle
enregistre les bus il s’occupe de leurs
middlewares. Il suffit d’ajouter reply_middleware
à la fin de la pile des
middlewares par défaut :
namespace Symfony\Bundle\FrameworkBundle\DependencyInjection;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
/**
* FrameworkExtension.
*
* @author Fabien Potencier <fabien@symfony.com>
* @author Jeremy Mikola <jmikola@gmail.com>
* @author Kévin Dunglas <dunglas@gmail.com>
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class FrameworkExtension extends Extension
{
//...
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig, array $validationConfig)
{
$loader->load('messenger.xml');
$defaultMiddleware = [
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],
'after' => [
['id' => 'send_message'],
['id' => 'handle_message'],
+ ['id' => 'reply_middleware'],
],
];
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['middleware'];
if ($bus['default_middleware']) {
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
}
// ...
}
// ...
}
}
Le code du FrameworkExtension
montre que chaque application peut configurer la
file de middlewares pour ses bus.
Par exemple cette configuration insérera deux middlewares MyMiddleware
et
ValidationMiddleware
entre FailedMessageProcessingMiddleware
(dernier dans
before
) et SendMessageMiddleware
(premier dans after
).
framework:
messenger:
buses:
messenger.bus.default:
middleware:
- 'App\Middleware\MyMiddleware'
- validation
Pour ajouter des middlewares à la fin il faut composer entièrement la file de middlewares :
framework:
messenger:
buses:
messenger.bus.default:
default_middleware: false
middleware:
- 'App\Middleware\MyMiddleware'
- add_bus_name_stamp_middleware
- reject_redelivered_message_middleware
- dispatch_after_current_bus
- failed_message_processing_middleware
- send_message
- handle_message
- reply_middleware
La touche finale est la déclaration du middleware en tant que service dans
symfony/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
:
<?xml version="1.0" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd">
<services>
+ <service id="messenger.middleware.reply_middleware" class="Symfony\Component\Messenger\Middleware\ReplyMiddleware" />
</services>
</container>
Le schéma suivant montre l’envoi d’un message synchrone. Notez les stamps qui sont ajoutés par le code client et le messenger afin d’assurer le renvoi du résultat.
Quand le message est envoyé de façon asynchrone, le Messenger s’occupe de
l’envoi du message pour l’expediteur et de réception du message pour le destinataire.
Notez l’ajout du AmqpReplyStamp
par AmqpSender
lors de l’envoi, elle permet
à l’expediteur d’attendre et de récupérer le résultat.
Le message est sérialisé avec son enveloppe et ses stamps par expéditeur. Il
faut mentionner que les stamps qui implémentent NonSendableStampInterface
ne
sont pas envoyés. C’est le cas notamment de AmqpStamp
, AmapReplyStamp
,
SentStamp
. Le nouveau ReplyStamp
s’envoie avec l’enveloppe pour qu’un
destinataire soit au courant si le client attend une réponse tandis que
AmqpReplyStamp
contenant l’objet de la queue de callback n’a aucune raison
d’être envoyé. Le schéma suivant montre le processus de traitement du message
par destinataire :
En guise de conclusion je vous rappelle le lien vers la documentation expliquant comment contribuer au code de Symfony.
Joyeux Noël à tous !