Symfony Messenger asynchronicznie

gildia-developerow.pl 1 rok temu

To, iż Symfony Messenger jest niezastąpiony, wszyscy wiedzą. Za to, jak go skonfigurować – niekoniecznie. Z tego powodu właśnie powstał dzisiejszy post. Skonfigurujmy razem Messengera, aby przeprocesował komendę asynchronicznie!

Command Pattern zawsze na propsie

Jedną z zalet pracy z Symfony Messengerem jest to, iż nasz kod układa się w bardzo interesujący schemat komend, które będąc przez nas wydanymi, zostają przez system przeprocesowane. Logika, którą mamy w aplikacji staje się reużywalna; tą samą komendę możemy wykorzystać zarówno w kontekście HTTP (przy obsłudze formularza, w API) oraz w CLI. Dodatkowo, o ile dobrze się zakręcimy, to możemy zoptymalizować naszą aplikację, zlecając jej wykonanie mniej krytycznych czasowo operacji do tzw. backgroundu, czyli wykonania asynchronicznego.

Samo skonfigurowanie Symfony Messengera w aplikacji nie jest niczym skomplikowanym. Schody zaczynają się właśnie wtedy, kiedy chcemy pobawić się w konfigurację asynchronicznych transportów. Zwłaszcza, kiedy nie mamy jeszcze doświadczenia z systemami kolejkowymi. Dlatego właśnie przejdziemy przez ten proces tu – razem

Postawmy sobie kolejkę!

Ponieważ będziemy pracowali na systemie kolejkowym, to fajnie by było mieć gdzieś w systemie postawiony system obsługujący kolejki. Nie chciałbym się skupiać nad tym, który system warto postawić. Dla mnie wystarczy zwykły RabbitMQ postawiony na Dockerze:

# docker-compose.rabbitmq.yaml version: "3.4" services: rabbitmq: image: rabbitmq:3-management-alpine ports: - 5672:5672 - 15672:15672 volumes: - rabbit-data:/var/lib/rabbitmq - rabbit-log:/var/log/rabbitmq volumes: rabbit-data: rabbit-log:

To wyżej, to jest plik konfiguracyjny docker-compose. Aby uruchomić Rabbita na jego podstawie, trzeba skorzystać z komendy:

$ docker-compose -f ./docker-compose.rabbitmq.yaml up

Nie wiem, jak Wy, ale ja w taki sposób odpalam sobie mnóstwo zewnętrznych serwisów: Redisa, Elastic Searcha, Mailhoga, czasami choćby MySQLa. Jest to fajne rozwiązanie, kiedy przełączamy się między mnóstwem projektów, które do developmentu potrzebują różnych serwisów w różnych wersjach.

Teraz popracujmy na Symfony…

Wszystko, co przedstawiam poniżej, można wyklikać sobie na czystym Symfony. o ile nie chcecie brudzić sobie swojego pobocznego projektu, to postawcie sobie nową apkę.

Aplikacja, którą tworzymy ma proste zadanie – stworzyć endpoint, którego wywołanie ma wrzucić wiadomość na kolejkę, do której będzie podpięty skrypt konsumujący wiadomości. Sama wiadomość ma dotyczyć tworzenia nowego produktu, o ile ten nie istnieje.

Do pracy będzie potrzebna bardzo prosta encja produktu:

<?php declare(strict_types=1); namespace App\Entity; class Product { protected ?int $id; public function __construct( protected string $sku, protected string $name, protected int $price, ) { } public function getId(): ?int { return $this->id; } public function getSku(): string { return $this->sku; } public function getName(): string { return $this->name; } public function getPrice(): int { return $this->price; } }

Konfiguracja mapowania naszej encji ma się następująco:

<?xml version="1.0" encoding="UTF-8"?> <doctrine-mapping xmlns="http://doctrine-project.org/schemas/orm/doctrine-mapping" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://doctrine-project.org/schemas/orm/doctrine-mapping http://doctrine-project.org/schemas/orm/doctrine-mapping.xsd"> <entity name="App\Entity\Product" table="product"> <id name="id" type="integer" column="id"> <generator strategy="AUTO"/> </id> <field name="name" type="string" nullable="false" /> <field name="sku" type="string" nullable="false" /> <field name="price" type="integer" nullable="false" /> </entity> </doctrine-mapping>

Jak widać, rocket science tutaj nie ma. Trzy pola na krzyż plus identyfikator. Tak na prawdę, to chodzi o to, aby mieć jakiś zasób, który możemy utworzyć, aby stwierdzić, iż skrypt konsumujący działa dobrze. Tego, w jaki sposób podpiąć mapowanie encji pod Doctrine tłumaczyć nie będę – zakładam, iż tego typu wiedza znajdzie się gdzieś w głowie, albo w internecie.

Teraz tworzymy komendę, czyli wiadomość, która zostanie wysłana na kolejkę:

<?php declare(strict_types=1); namespace App\Command; final class CreateNewProductCommand { public function __construct( readonly private string $sku, readonly private string $name, readonly private int $price, ) { } public function getSku(): string { return $this->sku; } public function getName(): string { return $this->name; } public function getPrice(): int { return $this->price; } }

Ponieważ Symfony Messenger domyślnie serializuje wiadomości do postaci JSONa, polecam korzystać z typów prostych, wbudowanych w PHPa. Do tego, w ślad za sentencją „Raz wydanej komendy nie można zmienić” – setterów nie implementujemy .

Następnym krokiem będzie utworzenie klasy handlera, który zwykle będzie entrypointem do logiki biznesowej:

<?php declare(strict_types=1); namespace App\CommandHandler; use App\Command\CreateNewProductCommand; use App\Entity\Product; use Doctrine\ORM\EntityManagerInterface; use RuntimeException; final class CreateNewProductCommandHandler { public function __construct( readonly private EntityManagerInterface $entityManager, ) { } public function __invoke(CreateNewProductCommand $command): void { $repository = $this->entityManager->getRepository(Product::class); $sku = $command->getSku(); $existingProduct = $repository->findOneBy(['sku' => $sku]); if ($existingProduct !== null) { throw new RuntimeException(sprintf('Can\'t create product with SKU %s', $sku)); } $newProduct = new Product($sku, $command->getName(), $command->getPrice()); $this->entityManager->persist($newProduct); } }

Ponieważ u nas logika jest ultra prosta, to zawarłem ją całą w handlerze. Możecie zauważyć, iż nie mamy tutaj żadnego wykonania metody flush() na obiekcie Entity Managera. To nie jest pomyłka – ten temat załatwimy gdzie indziej

Dalej tworzymy kontroler, który przechwyci żądanie, a następnie utworzy obiekt komendy i wrzuci go na szynę zdefiniowaną w konfiguracji Messengera:

<?php declare(strict_types=1); namespace App\Controller; use App\Command\CreateNewProductCommand; use Symfony\Component\HttpFoundation\JsonResponse; use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Response; use Symfony\Component\Messenger\MessageBusInterface; final class CreateNewProductController { public function __construct( readonly private MessageBusInterface $commandBus, ) { } public function __invoke(Request $request): Response { $payload = $request->getPayload(); $sku = $payload->get('sku', 'no-sku'); $name = $payload->get('name', 'no-name'); $price = (int)$payload->get('price', 0); $this->commandBus->dispatch( new CreateNewProductCommand($sku, $name, $price), ); return new JsonResponse(['queued' => true]); } }

A teraz trochę konfiguracji…

Kodzik mamy za sobą, teraz pobawmy się w konfigurowanie. Na pewno będziemy potrzebowali definicję dwóch serwisów: kontrolera, oraz handlera.

# config/services.yaml services: App\Controller\CreateNewProductController: arguments: - "@app.command_bus" public: true App\CommandHandler\CreateNewProductCommandHandler: arguments: - "@doctrine.orm.entity_manager" tags: - "messenger.message_handler"

Kontroler jako argument potrzebuje instancję Message Busa. Tak się składa, iż na podstawie konfiguracji, Symfony Messenger tworzy w locie serwisy odpowiadające zdefiniowanym Message Busom. Nazwa serwisu jest taka sama, co skonfigurowana nazwa Message Busa. Do tego, ze względu na wymogi Symfony – serwis konfigurujemy jako publiczny.

Oprócz kontrolera definiujemy również serwis dla Command Handlera. Argument, który jest potrzebny ze względu na naszą logikę biznesową – Entity Manager – wstawiamy w dosyć domyślny sposób. Oprócz tego, musimy jakoś skonfigurować nasz Command Handler, aby Messenger o nim wiedział. Po to właśnie konfigurujemy tag messenger.message_handler. Resztę, czyli to, do jakiej komendy ten Handler jest przypisany – Messenger sam sobie wyciągnie.

Oprócz serwisów konfigurujemy również routing:

# config/routes.yaml products_post: path: /products methods: [POST] defaults: _controller: App\Controller\CreateNewProductController

Podsumowując: mamy endpoint doczepiony do kontrolera, który wrzuca wiadomość na szynę. Mamy dla tej szyny skonfigurowaną klasę Handlera. Czego nam tutaj brakuje? A no tak, samego Messengera.

Wisienka na torcie – konfiguracja Symfony Messengera

Do obsługi Rabbita będą nam potrzebne trzy paczki:

$ composer require symfony/messenger $ composer require sroze/messenger-enqueue-transport $ composer require enqueue/amqp-bunny

Symfony Messenger wystepuje tutaj jako biblioteka, która daje nam abstrakcję związaną z dispatchowaniem wiadomości. Dalej mamy paczkę sroze/messenger-enqueue-transport, która jest rozszerzeniem Messengera – implementuje ona transport (w przełożeniu na nomenklaturę Messengera) realizowany przez Enqueue – bibliotekę dającą jednolitą abstrakcję kolejkową dla wielu systemów kolejkowych. To właśnie dzięki Enqueue jesteśmy w stanie w relatywnie prosty sposób przełączyć się między różnego typu systemami kolejkowymi.

Na koniec, AMQP-Bunny występuje tutaj jako driver dający nam możliwość nawiązania komunikacji z RabbitMQ przez protokół AMQP. Jest to zależność wymagana przez Enqueue do działania z Rabbitem. Co do samego AMQP to należy pamiętać, iż jest on szerszym pojęciem, które nie dotyczy wyłącznie Rabbita. Istnieje wiele różnych systemów kolejkowych opartych o ten protokół.

Tyle o samych paczkach, przechodzimy teraz do ich konfiguracji. Najpierw konfigurujemy DSN, po którym Enqueue będzie komunikował się z Rabbitem. Robimy to w miejscu, gdzie definiujemy zmienne środowiskowe:

# .env.local ###> enqueue/enqueue-bundle ### ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f ###< enqueue/enqueue-bundle ###

Dalej konfigurujemy Messengera. Tworzymy Message Bus o nazwie app.command_bus, do którego dopinamy middleware doctrine_transaction, który opakowuje nasze wiadomości w transakcję bazodanową. Sam w sobie Messenger nie posiada tego Middleware. Jest on dostępny z poziomu paczki symfony/doctrine-bridge.

# config/packages/messenger.yaml framework: messenger: default_bus: app.command_bus buses: app.command_bus: middleware: - doctrine_transaction transports: async_transport: dsn: enqueue://default options: delayStrategy: Enqueue\AmqpTools\RabbitMqDlxDelayStrategy routing: App\Command\CreateNewProductCommand: async_transport

Oprócz definicji Message Busa, mamy definicję transportu. Na pewno Waszą uwagę zwróci opcja delayStrategy. I na pewno zadacie sobie (mi) pytanie, po co nam tutaj ta opcja. Dodałem ją, bo gdzieś pod spodem Enqueue w sposób domyślny wykorzystuje opóźnienia w odbiorze wiadomości, przy wykorzystaniu RabbitMqDelayPluginDelayStrategy. Niestety, ten feature jest dostępny po doinstalowaniu odpowiedniego pluginu do Rabbita. Miałem do wyboru albo bawić się ze snippetem docker-compose z początku posta, albo – wyłączyć to tutaj.

No to co, uruchamiamy?

Aby uruchomić całość, potrzebujemy dwóch rzeczy: działającego skryptu konsumującego oraz wywołania utworzonego wcześniej endpointu. Skrypt konsumujący dostarcza nam Messenger przy pomocą komendy:

$ bin/console messenger:consume async_transport

A request na endpoint puszczamy chociażby w terminalu:

curl --location --request POST 'localhost:8000/products' \ --header 'Content-Type: application/json' \ --data '{ "name": "Test", "sku": "test", "price": 123 }'

Jeżeli nie chcecie bawić się CURLem w konsoli, to polecam Postmana lub Insomnię.

Mamy to!

Chciałem napomknąć, iż dzisiejszy wpis miał za zadanie przedstawić jedynie koncept, który nie powinien być wykorzystywany produkcyjnie, bez uprzedniego zbadania tematu i dostosowania konfiguracji użytych bibliotek do swoich potrzeb.

Aha, no i zapomniałbym. Dwie lekcje, które należy odrobić, aby żyło nam się lepiej:

  1. Pomimo, iż w Messengerze można, to nie zwracaj nic z klasy Command Handlera. o ile cokolwiek będzie uzależnione od tej wartości zwróconej, to przeniesienie wykonania do backgroundu serwerowego może okazać się trudne, bądź choćby niemożliwe do wykonania.
  2. Tworząc komendy pracuj na typach prostych. Im prostsza serializacja, tym lepiej. A już na pewno nie wrzucaj tam encji ani innego rodzaju obiektów, które przetrzymują połączenia, otwarte deskryptory do plików itp. Zamiast tego, wrzucaj do komend takie dane, które pozwolą Ci na nowo zbudować ten obiekt.
Idź do oryginalnego materiału