В этой статье пройдемся по еще не рассмотренным библиотекам для работы с распределенными транзакциями, очередями и БД, которые можно найти в нашем репозитории на GitHub (исходники лежат здесь), а Nuget-пакеты здесь.

ViennaNET.Sagas
Когда в проекте происходит переход на DDD и микросервисную архитектуру, то при разнесении бизнес-логики по разным сервисам, возникает проблема, связанная с необходимостью реализации механизма распределенных транзакций, ведь многие сценарии часто затрагивают сразу несколько доменов. С такими механизмами подробнее можно познакомиться, например, в книге Microservices Patterns, Chris Richardson.
В наших проектах мы реализовали простой, но полезный механизм: сага, а точнее сага на основе оркестрации. Суть ее в следующем: есть некий бизнес-сценарий, в котором необходимо последовательно совершить операции в разных сервисах, при этом, в случае возникновения каких-либо проблем на любом шаге, необходимо вызвать процедуру отката всех предыдущих шагов, где она предусмотрена. Таким образом, в конце выполнения саги, независимо от успешности, мы получаем консистентные данные во всех доменах.
Наша реализация пока сделана в базовом виде и не завязана на использовании каких-либо способов взаимодействия с другими сервисами. Применять её несложно: достаточно сделать наследника от базового абстрактного класса SagaBase<Т>, где T это ваш класс контекста, в котором можно хранить исходные данные, необходимые для работы саги, а также некоторые промежуточные результаты. Экземпляр контекста будет пробрасываться во все шаги во время выполнения. Сама сага является stateless классом, поэтому экземпляр может быть помещен в DI как Singleton, чтобы получить необходимые зависимости.
Пример объявления:
public class ExampleSaga : SagaBase<ExampleContext>{ public ExampleSaga() { Step("Step 1") .WithAction(c => ...) .WithCompensation(c => ...); AsyncStep("Step 2") .WithAction(async c => ...); }}
Пример вызова:
var saga = new ExampleSaga();var context = new ExampleContext();await saga.Execute(context);
Полноценные примеры разных реализаций можно посмотреть здесь и в сборке с тестами.
ViennaNET.Orm.*
Набор библиотек для работы с различными БД через Nhibernate. У нас используется подход DB-First с применением Liquibase, поэтому здесь присутствует только функционал по работе с данными в готовой БД.
ViennaNET.Orm.Seedwork и ViennaNET.Orm
главные сборки,
содержащие базовые интерфейсы и их реализации соответственно.
Остановимся на их содержимом подробнее.Интерфейс
IEntityFactoryService
и его реализация
EntityFactoryService
являются главной отправной точкой
для работы с БД, так как здесь создается Unit of Work, репозитории
для работы с конкретными сущностями, а также исполнители команд и
прямых SQL-запросов. Иногда удобно ограничить возможности класса по
работе с БД, например, дать возможность только для чтения данных.
Для таких случаев у IEntityFactoryService
есть предок
интерфейс IEntityRepositoryFactory
, в котором объявлен
только метод для создания репозиториев.Для непосредственного обращения к БД используется механизм провайдеров. Для каждой используемой у нас в командах СУБД есть своя реализация:
ViennaNET.Orm.MSSQL, ViennaNET.Orm.Oracle,
ViennaNET.Orm.SQLite, ViennaNET.Orm.PostgreSql
.При этом в одном приложении может быть зарегистрировано несколько провайдеров одновременно, что позволяет, например, в рамках одного сервиса без каких-либо затрат на доработку инфраструктуры провести пошаговую миграцию с одной СУБД на другую. Механизм выбора необходимого подключения и, следовательно, провайдера для конкретного класса-сущности (для которого и пишется маппинг на таблицы БД) реализован через регистрацию сущности в классе BoundedContext (содержит метод для регистрации доменных сущностей) или его наследника ApplicationContext (содержит методы для регистрации аппликационных сущностей, прямых запросов и команд), где в качестве аргумента принимается идентификатор подключения из конфигурации:
"db": [ { "nick": "mssql_connection", "dbServerType": "MSSQL", "ConnectionString": "...", "useCallContext": true }, { "nick": "oracle_connection", "dbServerType": "Oracle", "ConnectionString": "..." }],
Пример ApplicationContext:
internal sealed class DbContext : ApplicationContext{ public DbContext() { AddEntity<SomeEntity>("mssql_connection"); AddEntity<MigratedSomeEntity>("oracle_connection"); AddEntity<AnotherEntity>("oracle_connection"); }}
Если идентификатор подключения не указан, то будет использоваться подключение с именем default.
Непосредственно маппинг сущностей на таблицы БД реализуется стандартными средствами NHibernate. Можно использовать описание как через xml-файлы, так и через классы. Для удобного написания репозиториев-заглушек в Unit-тестах, имеется библиотека
ViennaNET.TestUtils.Orm
.Полноценные примеры использования ViennaNET.Orm.* можно найти здесь.
ViennaNET.Messaging.*
Набор библиотек для работы с очередями.
Для работы с очередями был выбран такой же подход, что и с различными СУБД, а именно максимально возможный унифицированный подход с точки зрения работы с библиотекой, независимо от используемого менеджера очередей. Библиотека
ViennaNET.Messaging
как раз отвечает за эту
унификацию, а ViennaNET.Messaging.MQSeriesQueue,
ViennaNET.Messaging.RabbitMQQueue и
ViennaNET.Messaging.KafkaQueue
содержат реализации адаптеров
для IBM MQ, RabbitMQ и Kafka соответственно.В работе с очередями есть два процесса: получение сообщения и отправка.
Рассмотрим получение. Здесь есть 2 варианта: для постоянного прослушивания и для получения единичного сообщения. Для постоянного прослушивания очереди необходимо для начала описать класс процессора, унаследованный от
IMessageProcessor
,
который будет отвечать за обработку входящего сообщения. Далее его
необходмо привязать к определенной очереди, делается это через
регистрацию в IQueueReactorFactory
с указанием
идентификатора очереди из конфигурации:
"messaging": { "ApplicationName": "MyApplication"},"rabbitmq": { "queues": [ { "id": "myQueue", "queuename": "lalala", ... } ]},
Пример запуска прослушивания:
_queueReactorFactory.Register<MyMessageProcessor>("myQueue");var queueReactor = queueReactorFactory.CreateQueueReactor("myQueue");queueReactor.StartProcessing();
Затем, при старте сервиса и вызова метода для начала прослушивания, все сообщения из указанной очереди будут попадать в соответствующий процессор.
Для получения единичного сообщения в интерфейсе-фабрике
IMessagingComponentFactory
есть метод
CreateMessageReceiver
, который создаст получателя,
ожидающего сообщения из указанной ему очереди:
using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("myQueue")){ var message = receiver.Receive();}
Для отправки сообщения необходимо воспользоваться всё той же
IMessagingComponentFactory
и создать отправителя
сообщения:
using (var sender = _messagingComponentFactory.CreateMessageSender<MyMessage>("myQueue")){ sender.SendMessage(new MyMessage { Value = ...});}
Для сериализации и десереализации сообщения есть три готовых варианта: просто текст, XML и JSON, но при необходимости спокойно можно сделать свои реализации интерфейсов
IMessageSerializer
и IMessageDeserializer
.Мы постарались сохранить уникальные возможности каждого менеджера очередей, например,
ViennaNET.Messaging.MQSeriesQueue
позволяет отправлять не только текстовые, но и байтовые сообщения,
а ViennaNET.Messaging.RabbitMQQueue
поддерживает
роутинг и создание очередей на лету. В нашей обертке адаптера для
RabbitMQ также реализовано некоторое подобие RPC: отправляем
сообщение и ожидаем ответа из специальной временной очереди,
которая создается только для одного ответного сообщения.Вот пример использования очередей с основными нюансами подключения.
ViennaNET.CallContext
Мы используем очереди не только для интеграции между разными системами, но и для общения между микросервисами одного приложения, например, в рамках саги. Это привело к необходимости передачи вместе с сообщением таких вспомогательных данных, как логин пользователя, идентификатор запроса для сквозного логирования, ip-адрес источника и авторизационые данные. Для реализации пробрасывания этих данных разработали библиотеку
ViennaNET.CallContext
, которая позволяет хранить
данные из входящего в сервис запроса. При этом то, каким образом
был сделан запрос, через очередь или через Http, не играет роли.
Затем, перед отправкой исходящего запроса или сообщения, достаются
данные из контекста и помещаются в заголовки. Таким образом,
следующий сервис получает вспомогательные данные и аналогично ими
распоряжается.Спасибо за внимание, ждём ваших комментариев и pull request-ов!