Русский
Русский
English
Статистика
Реклама

Netcore

Интеграция с Госуслугами. Применение Workflow Core (часть II)

23.09.2020 18:06:49 | Автор: admin
В прошлый раз мы рассмотрели место СМЭВ в задаче интеграции с порталом Госуслуг. Предоставляя унифицированный протокол общения между участниками, СМЭВ существенно облегчает взаимодействие между множеством различных ведомств и организаций, желающих предоставлять свои услуги с помощью портала.

Услугу можно рассматривать как распределённый во времени процесс, имеющий несколько точек, через которые можно повлиять на его исход (отменить через портал, отказать в ведомстве, отправить на портал сведения об изменении статуса услуги, а также отправить результат её оказания). В этой связи каждая услуга проходит свой собственный жизненный цикл по этому процессу, накапливая в его ходе данные о запросе пользователя, полученных ошибках, результатах оказания услуги и т.п. Это позволяет в любой момент времени иметь возможность контроля и принятия решения о дальнейших действиях по обработке услуги.

О том, как и с помощью чего можно организовать подобную обработку, мы и поговорим далее.

Выбор движка автоматизации бизнес-процессов


Для организации процессной обработки данных существуют библиотеки и системы автоматизации бизнес-процессов, широко представленные на рынке: от встраиваемых решений до полнофункциональных систем, предоставляющих каркас для управления процессами. В качестве средства автоматизации бизнес-процессов мы выбрали Workflow Core. Такой выбор сделан по нескольким причинам: во-первых, движок написан на C# для платформы .NET Core (это наша основная платформа для разработки), поэтому включить его в общую канву продукта проще, в отличие от, например, Camunda BPM. Кроме того, это встраиваемый (embedded) движок, что даёт широкие возможности по управлению экземплярами бизнес-процессов. Во-вторых, среди множества поддерживаемых вариантов хранения данных есть и используемый в наших решениях PostgreSQL. В-третьих, движок предоставляет простой синтаксис для описания процесса в виде fluent API (также есть вариант описания процесса в JSON-файле, однако, он показался менее удобным для использования в силу того, что становится сложно обнаружить ошибку в описании процесса до момента его фактического выполнения).

Бизнес-процессы


Среди общепринятых инструментов описания бизнес-процессов следует отметить нотацию BPMN. Например, решение задачи FizzBuzz в нотации BPMN может выглядеть так:


Движок Workflow Core содержит большинство стандартных блоков и операторов, представленных в нотации, и, как уже говорилось выше, позволяет пользоваться fluent API или данными в формате JSON для описания конкретных процессов. Реализация этого процесса средствами движка Workflow Core может принять такой вид:

// Класс с данными процесса.public class FizzBuzzWfData{  public int Counter { get; set; } = 1;  public StringBuilder Output { get; set; } = new StringBuilder();}// Описание процесса.public class FizzBuzzWorkflow : IWorkflow<FizzBuzzWfData>{  public string Id => "FizzBuzz";  public int Version => 1;  public void Build(IWorkflowBuilder<FizzBuzzWfData> builder)  {    builder      .StartWith(context => ExecutionResult.Next())      .While(data => data.Counter <= 100)        .Do(a => a          .StartWith(context => ExecutionResult.Next())            .Output((step, data) => data.Output.Append(data.Counter))          .If(data => data.Counter % 3 == 0 || data.Counter % 5 == 0)            .Do(b => b              .StartWith(context => ExecutionResult.Next())                .Output((step, data) => data.Output.Clear())              .If(data => data.Counter % 3 == 0)                .Do(c => c                  .StartWith(context => ExecutionResult.Next())                    .Output((step, data) =>                       data.Output.Append("Fizz")))              .If(data => data.Counter % 5 == 0)                .Do(c => c                  .StartWith(context => ExecutionResult.Next())                    .Output((step, data) =>                      data.Output.Append("Buzz"))))              .Then(context => ExecutionResult.Next())                .Output((step, data) =>                {                  Console.WriteLine(data.Output.ToString());                  data.Output.Clear();                  data.Counter++;                }));  }}

Безусловно, процесс можно описать проще, добавив вывод нужных значений прямо в шагах, следующих за проверками кратности. Однако при текущей реализации можно видеть, что каждый шаг способен вносить какие-то изменения в общую копилку данных процесса, а также может воспользоваться результатами работы выполненных ранее шагов. При этом данные процесса хранятся в экземпляре FizzBuzzWfData, доступ к которому предоставляется каждому шагу в момент его выполнения.

Метод Build получает в качестве аргумента объект построителя процесса, который служит отправной точкой для вызова цепочки методов расширения, последовательно описывающих шаги бизнес-процесса. Методы расширения, в свою очередь, могут содержать описание действий непосредственно в текущем коде в виде лямбда-выражений, переданных в качестве аргументов, а могут быть параметризованными. В первом случае, который представлен в листинге, простой алгоритм выливается в достаточно непростой набор инструкций. Во втором логика шагов прячется в отдельных классах-наследниках от типа Step (или AsyncStep для асинхронных вариантов), что позволяет вместить сложные процессы в более лаконичное описание. На практике более пригодным представляется второй подход, первый же достаточен для простых примеров или предельно простых бизнес-процессов.

Собственно класс-описание процесса реализует параметризованный интерфейс IWorkflow, и, выполняя контракт, содержит идентификатор и номер версии процесса. Благодаря этим сведениям движок способен порождать экземпляры процессов в памяти, наполняя их данными и фиксируя их состояние в хранилище. Поддержка версионности позволяет создавать новые вариации процессов без риска затронуть существующие в хранилище экземпляры. Для создания новой версии достаточно создать копию имеющегося описания, присвоить очередной номер свойству Version и изменить нужным образом поведение этого процесса (идентификатор при этом следует оставить неизменным).

Примерами бизнес-процессов в контексте нашей задачи служат:

  • Опрос очереди сообщений СМЭВ с новыми заявлениями на получение услуг обеспечивает периодический мониторинг очереди сообщений СМЭВ на предмет наличия новых заявлений от пользователей портала.
  • Обработка заявления содержит шаги по анализу содержимого заявления, отражению его в ИАС, отправке подготовленных результатов на портал.
  • Обработка запроса на отмену заявления позволяет отразить факт отмены поданного с портала заявления на услугу в ИАС.
  • Опрос очереди сообщений СМЭВ с ответами на запросы о смене статуса заявления делает возможным изменять статус заявления на портале через ИАС, периодически контролируя очередь ответов портала на поданные запросы о смене статуса.

Как видно из примеров, все процессы условно подразделяются на циклические, выполнение которых предполагает периодическое повторение, и линейные, выполняемые в контексте конкретных заявлений и, впрочем, не исключающие наличия неких циклических конструкций внутри себя.

Рассмотрим пример одного из работающих в нашем решении процессов по опросу входящей очереди запросов:

public class LoadRequestWf : IWorkflow<LoadRequestWfData>{  public const string DefinitionId = "LoadRequest";  public string Id => DefinitionId;  public int Version => 1;  public void Build(IWorkflowBuilder<LoadRequestWfData> builder)  {    builder      .StartWith(then => ExecutionResult.Next())        .While(d => !d.Quit)          .Do(x => x            .StartWith<LoadRequestStep>() // *              .Output(d => d.LoadRequest_Output, s => s.Output)            .If(d => d.LoadRequest_Output.Exception != null)              .Do(then => then                .StartWith(ctx => ExecutionResult.Next()) // *                  .Output((s, d) => d.Quit = true))            .If(d => d.LoadRequest_Output.Exception == null                && d.LoadRequest_Output.Result.SmevReqType                  == ReqType.Unknown)              .Do(then => then                .StartWith<LogInfoAboutFaultResponseStep>() // *                  .Input((s, d) =>                    { s.Input = d.LoadRequest_Output?.Result?.Fault; })                  .Output((s, d) => d.Quit = false))            .If(d => d.LoadRequest_Output.Exception == null               && d.LoadRequest_Output.Result.SmevReqType                 == ReqType.DataRequest)              .Do(then => then                .StartWith<StartWorkflowStep>() // *                  .Input(s => s.Input, d => BuildEpguNewApplicationWfData(d))                  .Output((s, d) => d.Quit = false))            .If(d => d.LoadRequest_Output.Exception == null              && d.LoadRequest_Output.Result.SmevReqType == ReqType.Empty)              .Do(then => then                .StartWith(ctx => ExecutionResult.Next()) // *                  .Output((s, d) => d.Quit = true))          .If(d => d.LoadRequest_Output.Exception == null             && d.LoadRequest_Output.Result.SmevReqType               == ReqType.CancellationRequest)            .Do(then => then              .StartWith<StartWorkflowStep>() // *                .Input(s => s.Input, d => BuildCancelRequestWfData(d))                .Output((s, d) => d.Quit = false)));  }}

В строках, отмеченных *, можно наблюдать использование параметризованных методов расширения, которые инструктируют движок о необходимости использовать классы шагов (об этом далее), соответствующие параметрам-типам. С помощью методов расширения Input и Output мы имеем возможность задавать исходные данные, передаваемые шагу перед началом выполнения, и, соответственно, изменить данные процесса (а они представлены экземпляром класса LoadRequestWfData) в связи с произведёнными шагом действиями. А так процесс выглядит на BPMN-диаграмме:


Шаги


Как говорилось выше, логику шагов разумно размещать в отдельных классах. Помимо придания более лаконичного вида процессу, это позволяет создавать повторно используемые шаги для типовых операций.

По степени уникальности выполняемых действий в нашем решении шаги разделяются на две категории: общие и специфические. Первые могут быть повторно использованы в любых модулях для любых проектов, поэтому они размещаются в общей библиотеке решения. Вторые уникальны для каждого заказчика, через это их место в соответствующих проектных модулях. Примерами общих шагов могут служить:

Отправка подтверждающих (Ack) запросов о получении ответа.

  • Выгрузка файлов в файловое хранилище.
  • Извлечение данных из пакета СМЭВ и т.п.

Специфические шаги:

  • Создание объектов в ИАС, обеспечивающих возможность оператору предоставить услугу.
  • Генерирование документов заданной структуры с данными из заявления и размещение их в ИАС.
  • Отправка результата оказания услуги на портал и т.п.

При описании шагов процесса мы придерживались принципа ограниченной ответственности для каждого шага. Это позволило не прятать фрагменты высокоуровневой логики бизнес-процесса в шагах и явно выражать её в описании процесса. Например, если на обнаруженную в данных заявления ошибку необходимо отправить в СМЭВ сообщение об отказе в обработке заявления, то соответствующий блок условия будет находиться прямо в коде бизнес-процесса, а шагам определения факта ошибки и реагирования на неё будут соответствовать разные классы.

Следует заметить, что шаги должны быть зарегистрированы в контейнере зависимостей, благодаря чему движок получит возможность пользоваться экземплярами шагов при движении каждого процесса по своему жизненному циклу.

Каждый шаг представляет собой связующее звено между кодом, содержащим высокоуровневое описание процесса, и кодом, решающим прикладные задачи сервисами.

Сервисы


Сервисы представляют собой следующий, более низкий уровень решения задач. Каждый шаг при выполнении своей обязанности опирается, как правило, на один или более сервисов (N.B. Понятие сервис в данном контексте более близко к аналогичному понятию сервис уровня приложения из области предметно-ориентированного проектирования (DDD)).

Примерами сервисов служат:

  • Сервис получения ответа из очереди ответов СМЭВ готовит соответствующий пакет данных в формате SOAP, отправляет его в СМЭВ и преобразует ответ в вид, пригодный для дальнейшей обработки.
  • Сервис загрузки файлов из хранилища СМЭВ обеспечивает считывание файлов, приложенных к заявлению с портала, из файлового хранилища по протоколу FTP.
  • Сервис получения результата оказания услуги считывает из ИАС данные о результатах услуги и формирует соответствующий объект, на основе которого другой сервис построит SOAP-запрос для отправки на портал.
  • Сервис выгрузки файлов, связанных с результатом оказания услуги, в файловое хранилище СМЭВ.

Сервисы в решении подразделяются на группы по признаку системы, взаимодействие с которой они обеспечивают:

  • Сервисы СМЭВ.
  • Сервисы ИАС.

Сервисы для работы со внутренней инфраструктурой интеграционного решения (журналирование информации о пакетах данных, связывание сущностей интеграционного решения с объектами ИАС и пр.).

В архитектурном плане сервисы являются наиболее низким уровнем, однако, они при решении своих задач также могут опираться на утилитарные классы. Так, например, в решении существует пласт кода, решающий задачи сериализации и десериализации SOAP-пакетов данных для разных версий протокола СМЭВ. В общем виде приведённое выше описание можно резюмировать в диаграмме классов:


Непосредственно к движку относятся интерфейс IWorkflow и абстрактный класс StepBodyAsync (впрочем, можно использовать и его синхронный аналог StepBody). Ниже на схеме представлены реализации строительных блоков конкретные классы с описаниями бизнес-процессов Workflow и используемые в них шаги (Step). На нижнем уровне представлены сервисы, которые, по существу, являются уже спецификой именно данной реализации решения и, в отличие от процессов и шагов не являются обязательными.

Сервисы, как и шаги, должны быть зарегистрированы в контейнере зависимостей с тем, чтобы шаги, которые пользуются их услугами, могли посредством инъекции через конструктор получать нужные их экземпляры.

Встраивание движка в решение


На момент начала создания системы интеграции с порталом в репозитории Nuget была доступна версия движка 2.1.2. Он встраивается в контейнер зависимостей стандартным образом в методе ConfigureServices класса Startup:

public void ConfigureServices(IServiceCollection services){  // ...  services.AddWorkflow(opts =>    opts.UsePostgreSQL(connectionString, false, false, schemaName));  // ...}

Движок можно настроить на одно из поддерживаемых хранилищ данных (среди таковых есть и другие: MySQL, MS SQL, SQLite, MongoDB). В случае PostgreSQL для работы с процессами движок использует Entity Framework Core в варианте Code First. Соответственно, при наличии пустой базы данных есть возможность применить миграцию и получить нужную структуру таблиц. Применение миграции является опциональным, этим можно управлять с помощью аргументов метода UsePostgreSQL: второй (canCreateDB) и третий (canMigrateDB) аргументы логического типа позволяют сообщить движку, может ли он создать БД при её отсутствии и применять миграции.

Поскольку при очередном обновлении движка есть ненулевая вероятность изменения его модели данных, а соответствующее применение очередной миграции может повредить уже накопленные данные, мы решили отказаться от этой опции и поддерживать структуру БД самостоятельно, на основе механизма компонентов БД, который используется в других наших проектах.

Итак, вопрос с хранением данных и регистрацией движка в контейнере зависимостей решён, перейдём к запуску движка. Для этой задачи подошёл вариант размещённой службы (hosted service, а здесь можно посмотреть пример базового класса для создания такой службы). Код, взятый за основу, был незначительно доработан для поддержания модульности, под которой понимается разделение интеграционного решения (получившего название Оникс) на общую часть, обеспечивающую инициализацию движка и выполнение некоторых служебных процедур, и часть специфическую для каждого конкретного заказчика (модули интеграции).

Каждый модуль содержит описания процессов, инфраструктуру для выполнения бизнес-логики, а также некоторый унифицированный код для предоставления возможности разработанной системе интеграции распознать и динамически загрузить описания процессов в экземпляр движка Workflow Core:



Регистрация и запуск бизнес-процессов


Теперь, когда мы имеем в распоряжении готовые описания бизнес-процессов и подключенный к решению движок, пришло время сообщить движку о том, с какими процессами ему предстоит работать.

Делается это с помощью следующего кода, который может быть расположен в рамках упомянутой ранее размещённой службы (также здесь может быть размещён код, инициирующий регистрацию процессов в подключенных модулях):

public async Task RunWorkflowsAsync(IWorkflowHost host,  CancellationToken token){  host.RegisterWorkflow<LoadRequestWf, LoadRequestWfData>();  // Регистрируем другие процессы...  await host.StartAsync(token);  token.WaitHandle.WaitOne();  host.Stop();}

Заключение


В общих чертах мы рассмотрели действия, которые необходимо предпринять для использования Workflow Core в интеграционном решении. Движок позволяет описывать бизнес-процессы в достаточно гибкой и удобной манере. Держа в уме тот факт, что мы имеем дело с задачей интеграции с порталом Госуслуг посредством СМЭВ, следует ожидать, что проектируемые бизнес-процессы будут охватывать спектр довольно разнообразных задач (опрос очереди, загрузка/выгрузка файлов, гарантирование соблюдения протокола обмена и обеспечение подтверждения получения данных, обработка ошибок на разных этапах и т.п.). Отсюда вполне естественным будет ожидать возникновения некоторых на первый взгляд неочевидных моментов реализации, и именно им мы посвятим следующую, заключительную статью цикла.

Ссылки для изучения


Подробнее..
Категории: C , Net , Бизнес-процессы , Workflow , Netcore

Интеграция с Госуслугами. Особенности реализации задачи средствами Workflow Core (часть III)

29.12.2020 16:19:06 | Автор: admin
Ранее мы рассмотрели роль СМЭВ в обеспечении работоспособности портала Госуслуг, а также общие принципы организации взаимодействия с ним на стороне поставщика сведений посредством Workflow Core.

Поскольку задача интеграции решается через посредника (СМЭВ) и, помимо прочего, с использованием движка, опыта работы с которым прежде не было, то наивно будет ожидать, что всё пройдёт гладко. Некоторым сложностям, с которыми мы встретились при решении задачи, посвящена данная статья.


Реализация циклических бизнес-процессов


Внешние циклы


Как было отмечено ранее, некоторые процессы предполагают периодическое повторение включённых в них действий. Рассмотрим процесс опроса очереди СМЭВ (пример которого был приведён в предыдущей статье) на наличие новых заявлений на оказание услуг: заявления, поданные пользователями портала, помещаются в очередь запросов СМЭВ. Мы, как сторона, отвечающая на запросы (поставщики в терминологии СМЭВ), должны периодически опрашивать эту очередь с помощью специальных SOAP-запросов. Если очередь не пуста, то забираем имеющиеся в ней запросы. Когда все запросы выбраны из очереди, СМЭВ даёт пустой ответ, означающий, что данных для нас больше нет.

Итак, в процессе намечается два цикла: первый, запускающий опрос очереди, и второй, вложенный, извлекающий запросы из очереди до тех пор, пока не будет получен пустой ответ. Вложенный цикл может работать без задержек, т.к. его задача опустошить очередь запросов, а вот каждую очередную итерацию внешнего следует выполнять с задержкой, т.к. заявления поступают с разной интенсивностью в разное время суток и существуют интервалы времени, когда новых заявлений нет вовсе.

В переложении на схему BPMN процесс может выглядеть так:



Первый цикл приступает к выполнению итерации через каждые 10 минут. Вложенный цикл после обработки очередного сообщения из очереди определяет категорию пакета (заявление на получение услуги, запрос на отмену поданного заявления) и, отталкиваясь от контекста, запускает соответствующий бизнес-процесс обработки данных (заявление, отмена).

С точки зрения бизнес-логики всё организовано верно и процесс выполняет свою задачу, но на практике с течением времени всё складывается не так безупречно. Дело в том, что при работе в режиме фиксации состояния процессов в хранилище данных (persistence mode) факты выполнения каждого шага находят своё отображение в соответствующих таблицах БД. Чем дольше работает процесс, тем большую часть эфирного времени по его обработке начинают занимать операции обмена данными между движком и базой. Это приводит к тому, что производительность системы, выражающаяся, в частности, в скорости движения процессов по шагам и, как следствие, в обработке поступающих заявлений, падает практически до нуля.

Так, описанный выше процесс мы проверяли на интервале 5 сек. (такое сокращение с изначальных 10 минут было допущено на этапе отладки и на первых порах опытной эксплуатации). Суточный прогон такого процесса оставлял за собой след из порядка 17200 записей в соответствующей таблице ExecutionPointer. Пары суток работы без ручного вмешательства хватало для того, чтобы процесс переставал функционировать, что закономерно приводило к невозможности получения новых заявлений.

Таким образом, перед нами предстала первая проблемная ситуация, вызванная циклами. Решением для неё был отказ от первого цикла (разница только в стартовом блоке процесса, который в прежней версии символизировал циклическое повторение через заданный период). Вложенный цикл работает по-прежнему, но сам процесс (и подобные ему циклические) теперь запускается отдельным сервисом, который работает независимо от движка и с его помощью следит за активностью таких циклических экземпляров. В случае окончания работы таких процессов сервис осуществляет очистку их служебных данных и запускает процессы вновь. Служебные данные можно чистить без оглядки, поскольку они не представляют интереса с точки зрения интеграции (в них не хранятся полученные сообщения, эти данные передаются дальше, в линейные, долгоживущие процессы по заявлениям).

Как мы отмечали выше, каждый процесс имеет свой идентификатор. С помощью движка по идентификатору можно получить все имеющиеся экземпляры такого процесса и, в частности, получать служебную информацию о них. Именно благодаря этому мы имеем возможность в настройках системы интеграции ограничить множество циклических процессов, за которыми и будет следить сервис, периодически отдавая команду на очистку данных и перезапуск. Прочие процессы, не требующие к себе особенного отношения, продолжат привычную жизнь в рамках движка.

Реализация циклов внутри процессов


Возникают ситуации, когда нужно обеспечить реакцию процесса на некоторые внешние события. Представим, что в рамках оказания услуги пользователь ИАС оформил пакет документов и средствами интерфейса инициировал отправку результата. Важно понимать, что задачей ИАС (информационно-аналитической системы Градоустройство) здесь является лишь сообщение системе интеграции о том, что данные по конкретному заявлению на оказание услуги готовы и можно выполнять шаги по отправке результатов на портал.

Одним из вариантов решения такой задачи может быть очередь, в которую внешний по отношению к системе интеграции код помещает команду на отправку результата, а экземпляры процессов с заданным интервалом ожидания опрашивают эту очередь и, если для какого-то из них есть соответствующая команда, выходят из цикла ожидания и продолжают выполнение шагов:



Это работающий вариант, который, однако, таит в себе часть описанной ранее проблемы: подготовка ответа может затянуться, что приведёт к повторению цикла во множестве итераций. Как результат, служебные таблицы будут содержать значительно больше записей о пройденных шагах для данного экземпляра процесса. В итоге, производительность снова падает, но на этот раз чистить служебные данные будет плохой идеей, т.к. этому процессу предстоит работать дальше, а вмешательство в его внутренние данные грозит нарушением работы по конкретному заявлению.

Решением в этом случае является использование механизма событий. Отправной точкой для событий служит экземпляр движка бизнес-процессов, который можно получить в любом месте, где доступно использование инъекций зависимостей или под рукой имеется экземпляр поставщика служб IServiceProvider, запросив объект IWorkflowHost. Например, в нашем решении одним из таких мест служит метод, вызываемый нажатием кнопки отправки результата в ИАС. А вот и инструкция, инициирующая запуск события:

await host.PublishEvent(Workflow.Events.SEND_FINAL_RESULT_EVENT,  workflow.Id, null);

Первым аргументом передаётся название события (здесь константа), вторым его ключ (некая дополнительная информация, позволяющая процессам-подписчикам определить, предназначено ли событие с данным названием именно им), третьим данные (аргумент типа object, в нашем примере данные не нужны, поэтому передаём null).

Подписка на событие происходит с помощью метода расширения WaitFor, вызванного в теле описания бизнес-процесса:

// ....WaitFor(Workflow.Events.SEND_FINAL_RESULT_EVENT,  (data, step) => s.Workflow.Id)// ...

Состав аргументов прежний: первый отвечает за название прослушиваемого события, второй позволяет передать лямбда-выражение, вычисляющее ключ события. Данные, которые могут быть переданы вместе с событием, обрабатываются последующим вызовом метода расширения Output. Вот пример ожидания и обработки другого события:

// ....WaitFor(Workflow.Events.INTERMEDIATE_STATUS_RESPONSE,    (data, step) => s.Workflow.Id)  .Output((e, d) =>  {    if (e.EventData == null)      throw new Exception("В событии нет данных.");    if (e.EventData is IntermediateStatusEventResponse eventResult)      d.IntermediateStatusWaitEvent_Output = eventResult;    else      throw new Exception("Неожиданный тип данных в событии.");})// ...

Итак, практика показала, что решение задач опроса очередей и подобных им с помощью циклов внутри бизнес-процессов путь в никуда. Но устоять перед кажущейся простотой оформления петли в процессе в первый раз нам не удалось, за что пришлось заплатить пересмотром реализации нескольких бизнес-процессов.

Обработка ошибок и журналирование


Когда речь идёт об интеграционных задачах, важно обеспечить варианты ответа на ошибочные ситуации, а также предусмотреть журналирование таких случаев для дальнейшего оперативного реагирования.

Ошибки можно подразделить на три категории:
  • Ошибки ввиду сбоев связи (напомним, что взаимодействие через СМЭВ ведётся посредством защищённых каналов, которые могут по какой-то причине выйти из строя, а то и вовсе не быть в него введёнными).
  • Ошибки уровня протокола обмена данными (например, мы послали некорректный запрос, на который получили ответ с ошибкой в соответствующем специальном блоке пакета данных).
  • Непредвиденные или специально запущенные исключения в коде реализации логики шагов.

Так или иначе, практически на все ошибки любой категории следует предусмотреть обработку на уровне описания бизнес-процесса. Например, если не удалось отправить какой-то важный с точки зрения процесса запрос по причине ошибки, то следует предусмотреть повторную попытку передачи данных.

Движок содержит в своём арсенале средства реагирования на ошибки. Так, если в шаге возникнет необработанное исключение, то движок попытается выполнить его заново через одну минуту. Таково поведение по умолчанию, но его можно изменить на уровне всего процесса или отдельных шагов. Ниже приведён пример задания реакции на ошибки на уровне шага процесса:

// ....Then<ExtractSmevPackageDataRequestInfoStep>()  .Input((step, data) =>  {    step.Input = new ExtractSmevPackageDataRequestInfoStep_Input    {      RegisteredApplicationKey = data.RegisteredApplicationKey,      SmevPackageXml = data.Input.SmevPackageXmlBody    };  })  .Output((step, data) =>    data.ExtractSmevPackageDataRequestInfoStep_Output = step.Output)  .OnError(WorkflowErrorHandling.Terminate)// ...

Фрагмент кода включает в себя шаг по извлечению данных из пакета СМЭВ с сохранением результата работы в объект данных экземпляра процесса. Последней строкой кода для шага устанавливается следующий способ реагирования на непредвиденную ошибку: прерывание работы процесса. В данном случае такое поведение уместно, т.к. позволяет не зацикливаться на шаге и попытаться повторно получить и обработать заявление (данный шаг располагается до шага отправки в СМЭВ запроса на подтверждение получения заявления, что означает, что заявление не будет удалено из очереди запросов и через определённое время мы сможем получить его снова). Если это не удастся, то благодаря подсистеме журналирования запросов у нас останется возможность предпринять меры по устранению ошибки в коде и скорректировать поведение шага.

Кроме описанного выше способа (Terminate) движок располагает ещё тремя:
  • Compensate выполнение компенсационного шага, т.е. некоторого известного действия на случай ошибки, призванного, например, откатить возможные изменения (этот термин относится к области т.н. Saga-транзакций, которые, к слову, тоже поддерживаются движком).
  • Suspend приостановка процесса с возможностью последующего продолжения выполнения по команде извне.
  • Retry уже знакомый нам вариант по умолчанию, предполагающий перезапуск шага через одну минуту (интервал можно регулировать с помощью второго аргумента метода OnError).

Кроме описанного выше порядка работы с ошибками, стоит отметить ещё один способ сообщение о проблеме с помощью выходных данных шага, например, в виде некоторого логического значения. Далее с помощью ветвления описывается способ реагирования на эту ситуацию, приемлемый в данном контексте, будь то завершение процесса или отправка сообщения в СМЭВ.

Также нужно упомянуть о журналировании: помимо традиционного вывода сообщений различных уровней средствами подходящей библиотеки (в нашем случае это NLog), нелишним будет организовать и сохранение запросов и ответов в удобном для работы хранилище (например, с помощью отдельной таблицы в БД). Последняя мера позволит проследить за обменом данными со СМЭВ и в случае проблем принять более взвешенное решение об их устранении.

Работа с Saga-транзакциями


Отдельного внимания заслуживает использование транзакционности в Workflow Core. Данное средство помогает организовать последовательность шагов процесса таким образом, что любая ошибка, возникшая в одном из них, позволяет организовать реакцию по откату результатов каждого шага или транзакции в целом. Ниже приведён пример участка бизнес-процесса, описывающего последовательность шагов по отправке результата оказания услуги на портал:

// ....WaitFor(Workflow.Events.SEND_FINAL_RESULT_EVENT,  (d, s) => s.Workflow.Id, d => DateTime.Now.ToUniversalTime()).Then(o => ExecutionResult.Next()).Saga(saga => saga // *  .StartWith<CheckFinalResultQueueStep>()    .Input((s, d) => { /* ... */ })    .Output((s, d) => { /* ... */ })  .If(d => d.CheckFinalResultQueueStep_Output.Data != null      && d.CheckFinalResultQueueStep_Output.Data.IsSent)    .Do(f => f      .StartWith(r => ExecutionResult.Next())      .EndWorkflow())  .If(d => d.CheckFinalResultQueueStep_Output.Data != null      && !d.CheckFinalResultQueueStep_Output.Data.IsSent)    .Do(f => f      .StartWith(r => ExecutionResult.Next())      .Parallel()        .Do(resultSendingBranch => resultSendingBranch          .StartWith<UploadFilesToSmevStep>()            .Input((s, d) => { /* ... */ })            .Output((s, d) => { /* ... */ })          .Then<SendFinalApplicationStatusStep>()            .Input((s, d) => { /* ... */ })            .Output((s, d) => { /* ... */ })          .Then<Steps.SaveSentFinalStatusInformationToIasStep>()            .Input((s, d) => { /* ... */ }))        .Do(eventEmitBranch => eventEmitBranch          .StartWith<PublishEventStep>()            .Input((s, d) => { /* ... */ })))      .Join()      .EndWorkflow()).OnError(WorkflowErrorHandling.Retry, // **  TimeSpan.FromSeconds(DEFAULT_ONERROR_RETRY_INTERVAL))// ...

Здесь важно отметить, что при описании транзакций крайне желательно явным образом задавать способ реагирования на ошибки (строка ** для соответствующей транзакции, открытой на строке *). Дело в том, что отсутствие такого указания приведёт к прекращению выполнения ветки процесса, обёрнутой в транзакцию. Это может стать большой неожиданностью, особенно на этапе опытной эксплуатации. Конкретно для приведённого выше примера отсутствие вызова метода расширения OnError означало бы, что, скажем, ошибка в шаге CheckFinalResultQueueStep (на котором делается обращение к таблице в БД) приведёт к тому, что результат, подготовленный оператором ИАС, никогда уйдёт на портал. И наоборот, наличие явно указанной реакции на ошибку позволит повторить всю последовательность шагов через указанный интервал времени и с большой вероятностью гарантировать, что рано или поздно результат будет доставлен адресату.

Отказы при растущих объёмах данных


С течением времени количество данных (в том числе служебных) о бизнес-процессах в хранилище неизбежно растёт. Опыт использования Workflow Core показал, что этот рост приводит к постепенному замедлению работы движка с последующим отказом в работе. По достижении критической точки процессы начинают зависать на некоторых шагах без явных предпосылок к этому из текущих данных или логики описания. Более конкретно: шаги таких процессов останавливаются в статусе 1 (Pending) и не находят дальнейшего продвижения по цепочке состояний.

Столкнувшись с этой ситуацией примерно через полгода поддержки решения в эксплуатации, мы изрядно поломали голову над возможными причинами такого поведения и в итоге пришли к необходимости периодической очистки бизнес-процессов и связанных с ними данных. Очистку, разумеется, допустимо вести только относительно тех экземпляров бизнес-процессов, по которым работа была завершена.

Удобным средством в этом деле оказался существующий в движке сервис IWorkflowPurger, с помощью которого можно очистить данные из таблиц бизнес-процессов и шагов. В дополнение к нему мы предусмотрели удаление и других служебных данных из нашей модели, обернув этот композитный сервис в размещённую службу.

Указанная мера позволила стабилизировать работу решения и на протяжении последних двух месяцев наблюдать нормальный полёт.

Новые версии бизнес-процессов


Бизнес-процессы могут пересматриваться и меняться в части существующего или добавления нового поведения. Особенно остро этот вопрос встаёт, когда в эксплуатации находится некоторое количество экземпляров бизнес-процесса, построенных и работающих по текущей версии его описания. В таких условиях необходимо обеспечить возможность завершения их работы по старым правилам и, одновременно, дать ход экземплярам новой версии.

Движок предполагает возможность ввода новых версий описаний бизнес-процессов с помощью свойства Version интерфейса. В коде, соответственно, появятся новые описания бизнес-процессов, причём изменения могут, на примере нашей архитектуры, проникнуть на более низкие уровни шагов, сервисов и т.п. Организация и поддержка такого каскадного изменения может оказаться непростой задачей, чреватой ошибками и просчётами.

Стоит признать, что первый и пока единственный опыт подобных изменений в нашей практике дался непросто и породил целый шлейф экземпляров бизнес-процессов прежних версий, требовавших чуть ли не индивидуального сопровождения. В этой же связи нужно отметить, что на первый взгляд малейшее изменение в теле описания процесса без явного ввода новой версии может вылиться в отказ существующих экземпляров в продолжении работы. Поэтому хотелось бы лишь порекомендовать перед вводом в эксплуатацию предварительно проверить работоспособность системы в условиях присутствия экземпляров изменяемого бизнес-процесса обеих версий старой и новой. Проверку необходимо производить по полному циклу, с учётом всех штатных сценариев. Это трудоёмкий процесс и лишь после устранения всех найденных нестыковок можно с большой степенью уверенности сказать, что переход в опытную эксплуатацию окажется гладким.

Особенности отладки и тестирования


Отладка бизнес-процессов является отдельной темой для рассуждений, здесь лишь кратко остановимся на паре ключевых моментов, имея в виду, что движок в нашей задаче работает в режиме сохранения данных процессов в базу данных.

Первым делом следует отметить очевидный момент: перед отладкой необходимо убедиться, что на целевой базе данных не работает другой экземпляр того же движка (на сервере или у коллеги). В такую ситуацию легко можно попасть, когда над проектом работает несколько сотрудников, а, попав в неё впервые, придётся потратить время на понимание того, что кто-то ещё прикладывает руку к движению процессов по их жизненным циклам.

Далее обратим внимание на то, что каждый бизнес-процесс в контексте нашей интеграционной задачи имеет ряд точек, требующих взаимодействия со СМЭВ. Для проверки добавляемых в систему функций не всегда допустимо использовать один из контуров СМЭВ (а таких существует три: тестовый, промышленный, разработческий). Например, на первых порах разработки доступа к может не быть в принципе, поэтому мы создали заглушку, позволяющую имитировать поведение СМЭВ для нашей системы с помощью обработки стандартных запросов и возможности передачи заготовленных на них ответов. Естественно, полностью воспроизводить поведение СМЭВ ничем не оправданная задача. Однако, для проверки базовых сценариев такого решения вполне достаточно.

Кроме того, заметим, что для тестирования непосредственно бизнес-процессов существует специальный Nuget-пакет. А тестирование отдельных шагов или сервисов вполне укладывается в парадигму модульного тестирования.

Общие рекомендации


Подводя итог сказанному выше, можно выделить следующие общие рекомендации по использованию движка в задачах, подобных описанной:
  • Постарайтесь важные для бизнес-процесса действия вынести на уровень его описания, не пряча логику в шаги или более низкие уровни. Это повысит прозрачность и читаемость процесса, позволит легче диагностировать проблемы.
  • Аккуратно используйте циклы и по возможности заменяйте их механизмом событий в задачах, требующих ожидания некоторых внешних действий.
  • Обеспечьте достаточный уровень журналирования, это также облегчит диагностику ошибок.
  • Предусмотрите способ реагирования на растущее со временем количество данных по бизнес-процессам, в том числе завершённым. Некоторые данные перестают представлять какую-либо ценность и от них можно избавиться.
  • Соблюдайте осторожность при вводе в эксплуатацию новых версий бизнес-процессов. Постарайтесь предварительно провести разностороннюю проверку их работоспособности. Любое, даже самое незначительное изменение в описании процесса (на уровне порядка и состава шагов), следует оформлять отдельной его версией.
  • Заранее подумайте о средствах, которые облегчат отладку и тестирование разрабатываемого решения. Здесь подойдут как модульные тесты, так и самодельные заглушки, имитирующие в необходимых пределах работу СМЭВ.

Заключение


Итак, наше интеграционное решение было запущено в работу в январе 2020 г. Первое заявление от пользователя портала получено 24 января. С тех пор через СМЭВ с портала Госуслуг операторы системы получили и обработали порядка 8000 заявлений от граждан и юридических лиц. На диаграмме ниже представлена динамика поступления новых заявлений в период с января по декабрь:



На протяжении этого периода мы сталкивались с отмеченными ранее проблемами и планомерно их решали, что в итоге дало достаточно стабильно работающий, тиражируемый продукт.

Благодаря возможностям версионирования процессов и событий, поддерживаемых движком, открыта возможность для расширения спектра решаемых задач. Так, к существующей функциональности мы добавили интеграцию с многофункциональными центрами (МФЦ), вследствие чего пользователям портала стало доступно получение результатов услуг не только в электронном виде, почтовым отправлением или лично в ведомстве, но и посредством МФЦ. В рамках этой работы существующие бизнес-процессы претерпели минимальные изменения (с порождением соответствующих новых версий), при этом появились и новые, с которыми был организован обмен данными через события.

Таким образом, наш пусть пока и небольшой опыт позволяет судить о том, что использование движка Workflow Core для целей интеграции с порталом Госуслуг является работоспособным решением. А некоторые нюансы его использования, которые мы рассмотрели, могут помочь сократить время на реализацию проектов с нуля.

Ссылки для изучения


Подробнее..
Категории: C , Net , Бизнес-процессы , Workflow , Netcore

Interprocess communication с использованием GRPC

15.10.2020 18:13:18 | Автор: admin

Сегодня хочу рассказать о нашем пути реализации межпроцессного взаимодействия между приложениями на NET Core и NET Framework при помощи протокола GRPC. Ирония заключается в том, что GRPC, продвигаемый Microsoft как замена WCF на своих платформах NET Core и NET5, в нашем случае случился именно из-за неполноценной реализации WCF в NET Core.


Я надеюсь эта статья найдется поиском когда кто-то будет рассматривать варианты организации IPC и позволит посмотреть на такое высокоуровневаое решение как GRPC с этой, низкоуровневой, стороны.


Вот уже более 7 лет моя трудовая деятельность связана с тем, что называют "информатизация здравоохранения". Это довольно интересная сфера, хотя и имеющая свои особенности. Некоторыми из них явяляются зашкаливающее количество легаси технологий (консервативность) и определнная закрытость для интеграции у большинства существующих решений (вендор-лок на экосистеме одного производителя).


Контекст


С комбинацией этих двух особенностей мы и столкнулись на текущем проекте: нам понадобилось инициировать работу и получать данные из некоего программно-аппаратного комплекса. Поначалу все выглядело очень неплохо: софтовая часть комплекса поднимает WCF службу, которая принимает команды на выполнение и выплёвывает результаты в файл. Более того, производитель предоставляет SDK с примерами! Что может пойти не так? Всё вполне технологично и современно. Никакого ASTM с палочками-разеделителями, ни даже обмена файлами через общую папку.


Но по какой-то странной причине, WCF служба использует дуплексные каналы и привязку WSDualHttpBinding, которая недоступна под .NET Core 3.1, только в "большом" фреймворке (или уже в "старом"?). При этом дуплексность каналов никак не используется! Она просто есть в описании службы. Облом! Ведь остальной проект живет на NET Core и отказываться от этого нет никакого желания. Придется собирать этот "драйвер" в виде отдельного приложения на NET Framework 4.8 и каким то образом пытаться организовать хождение данных между процессами.


Межпроцессное взаимодействие


В теме межпроцессного взаимодействия действительно есть из чего выбрать. Можно обмениваться файлами, кидать сигналы, использовать мьютексы, именованные каналы, поднять соединение через tcp-сокет, или даже воспользоваться каким-нибудь RPC протоколом верхнего уровня. Чтобы не потонуть в этом многообразии попробуем сформулировать требования к службе IPC:


  • возможность передавать команды, в том числе параметризованные
  • возможность получать ответы о результате выполнения команды
  • возможность асинхронно получать ответы о ходе выполнения долгоиграющей задачи
  • контроль получения сообщений второй стороной
  • Поддержка в Windows (начиная с 7 версии)
  • Поддержка в NET Framework и NET Core
  • Строгая типизация сообщений
  • Расширяемость
  • Поменьше боли

Последний пункт самый важный, потому что всегда можно сгородить сколь угодно сложный и замороченный велосипед на базе самых низкоуровневых примитивов, например разделяемой памяти и мьютексов. Но зачем?


Именованные каналы


Да, мы сделали это. В первой итерации были выбраны именно именованные каналы, по которым передавались команды и данные. У нас были абстракции для команды и для данных, и два однонаправленных канала в каждом "соединении". У нас не было контроля получения сообщений, зато была боль с поддержкой стейта канал мог развалиться по каким то странным причинам. У нас не было связного асинхронного потока ответов о прогрессе долгоиграющей операци, зато были отдельные события получения сообщения и серверу надо было передавать исходную команду к которой относится ответ. У нас не было настоящей строгой типизации, а только надежда что на той стороне данные упаковали в правильную "команду" или "ответ". Боль? Боль была, постоянная: трудновоспроизводимые баги, никакущий дебаг, мрак и заброшенность.


На самом деле все было не так плохо. Это решение работало и проработало почти год. Мы потратили много времени на его тестирование, отлавливание багов, восстановление стейта, выработку workaround, чтобы его можно было использовать у конечных пользователей. Но мне кажется его никто не любил.


GRPC


Уныло поглядев на в очередной раз глюканувший софт, рассказывающий о сломанных трубах, было принято решение все переписать. И переписать на GRPC. Почему GRPC? Ну, мы используем его в хвост и гриву для взаимодействия между клиентом и основным сервером приложения. Он произвел впечатление достаточно быстрого и богатого возможностями решения.
Что касается чеклиста с требованиями, то получается примерно так:


  • возможность передавать команды, в том числе параметризованные да, это то что называется Unary call
  • возможность получать ответы о результате выполнения команды то же
  • возможность асинхронно получать ответы о ходе выполнения долгоиграющей задачи да, это server streaming rpc
  • контроль получения сообщений второй стороной обеспечивается транспортным уровнем HTTP/2
  • Поддержка в Windows (начиная с 7 версии) да, но в семёрке есть определённые ограничения
  • Поддержка в NET Framework и NET Core да
  • Строгая типизация сообщений да, обеспечивается форматом protobuf
  • Расширяемость и расширяемость
  • Поменьше боли больно почти не будет , как комарик укусит

Затаскиваем GRPC за 5 минут


Я позволю себе написать очередной tutorial. Туториалов по GRPC много, но они все одинаковые и не об этом, а в начале своего пути по дороге IPC я бы хотел чтобы что-то такое попалось мне на глаза. Все примеры кода есть в репозитории тут.


Подготовка


Наше решение будет состоять из трёх сборок:


  • IpcGrpcSample.CoreClient консольное приложение под NET Core 3.1, будет играть роль клиента RPC
  • IpcGrpcSample.NetServer консольное приложение под NET Framework 4.8, будет за сервер RPC
  • IpcGrpcSample.Protocol библиотека, нацеленная на NET Standard 2.0. В ней будет определен контракт RPC

Для удобства работы отредактируем формат файла проекта на NET Framework под новый лад и снесем Properties\AssemblyInfo.cs


<Project Sdk="Microsoft.NET.Sdk">  <PropertyGroup>    <TargetFramework>net48</TargetFramework>  </PropertyGroup>  <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />  <PropertyGroup>...</PropertyGroup>  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">...</PropertyGroup>  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">...</PropertyGroup>  <ItemGroup>    <Reference Include="System" />    <Reference Include="System.Core" />    <Reference Include="System.Xml.Linq" />    <Reference Include="System.Data.DataSetExtensions" />    <Reference Include="Microsoft.CSharp" />    <Reference Include="System.Data" />    <Reference Include="System.Net.Http" />    <Reference Include="System.Xml" />  </ItemGroup>  <ItemGroup>    <None Include="App.config" />  </ItemGroup>  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /></Project>

Настало время тащить NuGet!


  • В проект IpcGrpcSample.Protocol подключаем пакеты Google.Protobuf, Grpc и Grpc.Tools
  • В сервер затащим Grpc, Grpc.Core, Microsoft.Extensions.Hosting и Microsoft.Extensions.Hosting.WindowsServices.
  • В клиента тащим Grpc.Net.Client и OneOf он нам пригодится.

Описываем gRPC службу


Наверно всем уже надоел пример с GreeterService? Давайте что-нибудь повеселее. Сделаем две службы. Одна будет управлять термоциклером-амплификатором, другая будет запускать протокол экстракции на автомтаизированной станции экстракции.


Службы описываются в виде .proto файлов в разделяемой сборке IpcGrpcSample.Protocol. Protobuf-компилятор потом соберет из них нужные нам классы и заготовки под сервисы и клиентов.


Описание службы работы с роботизированной станцией


//указание синтаксиса описанияsyntax = "proto3"; // импорт типа Emptyimport "google/protobuf/empty.proto";// классы будут генерироваться в этом пространстве именoption csharp_namespace = "IpcGrpcSample.Protocol.Extractor"; // описание методов RPC службы управления экстракторомservice ExtractorRpcService {    // унарный вызов "запускающий" операцию  rpc Start (google.protobuf.Empty) returns (StartResponse);  }// ответ на стартmessage StartResponse {    bool Success = 1;}

Описание службы работы с термоциклером


//указание синтаксиса описанияsyntax = "proto3"; // классы будут генерироваться в этом пространстве именoption csharp_namespace = "IpcGrpcSample.Protocol.Thermocycler"; // описание методов RPC службы управления термоциклеромservice ThermocyclerRpcService {    // server-streaming вызов "запускающий эксперимент". На один запрос отправит множество сообщений-ответов, которые будут доступны асинхронно  rpc Start (StartRequest) returns (stream StartResponse);  }// описание сообщения - запроса на запуск экспериментаmessage StartRequest {  // поля запроса - это поле будет названием эксперимента  string ExperimentName = 1;  // а числовое поле - количество циклов, в ходе которых прибор будет "снимать показания" и отправлять назад  int32 CycleCount = 2;}// сообщение из стрима после стартаmessage StartResponse {  // номер цикла  int32 CycleNumber = 1;  // поле в виде конструкции oneof - сообщение может содержать объект одного из типов.   // Что-то вроде discriminated union, но попроще  oneof Content {    // прочитанные данные реакционного блока    PlateRead plate = 2;    // сообщение статуса прибора    StatusMessage status = 3;  }}message PlateRead {  string ExperimentalData = 1;}message StatusMessage {  int32 PlateTemperature = 2;}

Все proto-файлы надо пометить как компилируемые protobuf компилятором. Ручками по одному или добавить в csproj эти строки:


  <ItemGroup>    <Protobuf Include="**\*.proto" />  </ItemGroup>

Строим сервер


На дворе 2020 год и поэтому сервер соберем на базе Hosting абстракций из NET Core. Сразу вставляем такой сниппет в Program.cs:


class Program{    static Task Main(string[] args) => CreateHostBuilder(args).Build().RunAsync();    public static IHostBuilder CreateHostBuilder(string[] args) =>        Host.CreateDefaultBuilder(args)        .UseWindowsService()        .ConfigureServices(services =>        {            services.AddLogging(loggingBuilder =>            {                loggingBuilder.ClearProviders();                loggingBuilder.SetMinimumLevel(LogLevel.Trace);                                    loggingBuilder.AddConsole();            });            services.AddTransient<ExtractorServiceImpl>(); // регистрация зависимостей - сервисов с логикой управления приборами            services.AddTransient<ThermocyclerServiceImpl>();            services.AddHostedService<GrpcServer>(); // регистрация GRPC сервера как HostedService        });}

Обойдемся в примере без конфигурации и логгирования. Реализация служб (контроллеров) будет внедряться в сервер через конструктор.
Код сервера достаточно прямолинеен в методе старта службы сервер поднимается, в методе остановки останавливается. Здесь можно определить будет ли сервер использовать TLS (тогда ему нужен сертификат для шифрования) или трафик будет ходить без шифрования и тогда необходимо использовать ServerCredentials.Insecure. Зачем это может пригодиться и что еще необходимо сделать чтобы гонять голый http/2 трафик в конце.


internal class GrpcServer : IHostedService{    private readonly ILogger<GrpcServer> logger;    private readonly Server server;    private readonly ExtractorServiceImpl extractorService;    private readonly ThermocyclerServiceImpl thermocyclerService;    public GrpcServer(ExtractorServiceImpl extractorService, ThermocyclerServiceImpl thermocyclerService, ILogger<GrpcServer> logger)    {        this.logger = logger;        this.extractorService = extractorService;        this.thermocyclerService = thermocyclerService;        var credentials = BuildSSLCredentials(); // строим креды из сертификата и приватного ключа.         server = new Server //создаем объект сервера        {            Ports = { new ServerPort("localhost", 7001, credentials) }, // биндим сервер к адресу и порту            Services = // прописываем службы которые будут доступны на сервере            {                ExtractorRpcService.BindService(this.extractorService),                ThermocyclerRpcService.BindService(this.thermocyclerService)            }        };                }    /// <summary>    /// Вспомогательный метод генерации серверных кредов из сертификата    /// </summary>    private ServerCredentials BuildSSLCredentials()    {        var cert = File.ReadAllText("cert\\server.crt");        var key = File.ReadAllText("cert\\server.key");        var keyCertPair = new KeyCertificatePair(cert, key);        return new SslServerCredentials(new[] { keyCertPair });    }    public Task StartAsync(CancellationToken cancellationToken)    {        logger.LogInformation("Запуск GRPC сервера");        server.Start();        logger.LogInformation("GRPC сервер запущен");        return Task.CompletedTask;    }    public async Task StopAsync(CancellationToken cancellationToken)    {        logger.LogInformation("Останов GRPC сервера");        await server.ShutdownAsync();        logger.LogInformation("GRPC сервер остановлен");    }}

Осталось закинуть какую нибудь логику в контроллеры и стартовать!
У экстрактора очень простой сервис. Будет через раз отвечать что все получилось или нет:


internal class ExtractorServiceImpl : ExtractorRpcService.ExtractorRpcServiceBase{    private static bool success = true;    public override Task<StartResponse> Start(Empty request, ServerCallContext context)    {        success = !success;        return Task.FromResult(new StartResponse { Success = success });    }}

А у термоциклера организуем что-нибудь повеселее:


internal class ThermocyclerServiceImpl : ThermocyclerRpcService.ThermocyclerRpcServiceBase{    private readonly ILogger<ThermocyclerServiceImpl> logger;    public ThermocyclerServiceImpl(ILogger<ThermocyclerServiceImpl> logger)    {        this.logger = logger;    }    public override async Task Start(StartRequest request, IServerStreamWriter<StartResponse> responseStream, ServerCallContext context)    {        logger.LogInformation("Эксперимент начинается");        var rand = new Random(42);        for(int i = 1; i <= request.CycleCount; ++i)        {            logger.LogInformation($"Отправка цикла {i}");            var plate = new PlateRead { ExperimentalData = $"Эксперимент {request.ExperimentName}, шаг {i} из {request.CycleCount}: {rand.Next(100, 500000)}" };            await responseStream.WriteAsync(new StartResponse { CycleNumber = i, Plate = plate });            var status = new StatusMessage { PlateTemperature = rand.Next(25, 95) };            await responseStream.WriteAsync(new StartResponse { CycleNumber = i, Status = status });            await Task.Delay(500);        }        logger.LogInformation("Эксперимент завершен");    }}

Сервер готов. Можно позапускать консольку и убедиться что GRPC сервер поднимается и останавливается по нажатию на Ctrl-C:


dbug: Microsoft.Extensions.Hosting.Internal.Host[1]      Hosting startinginfo: IpcGrpcSample.NetServer.GrpcServer[0]      Запуск GRPC сервераinfo: IpcGrpcSample.NetServer.GrpcServer[0]      GRPC сервер запущенinfo: Microsoft.Hosting.Lifetime[0]      Application started. Press Ctrl+C to shut down.info: Microsoft.Hosting.Lifetime[0]      Hosting environment: Productioninfo: Microsoft.Hosting.Lifetime[0]      Content root path: C:\Users\user\source\repos\IpcGrpcSample\IpcGrpcSample.NetServer\bin\Debugdbug: Microsoft.Extensions.Hosting.Internal.Host[2]      Hosting startedinfo: Microsoft.Hosting.Lifetime[0]      Application is shutting down...dbug: Microsoft.Extensions.Hosting.Internal.Host[3]      Hosting stoppinginfo: IpcGrpcSample.NetServer.GrpcServer[0]      Останов GRPC сервераinfo: IpcGrpcSample.NetServer.GrpcServer[0]      GRPC сервер остановленdbug: Microsoft.Extensions.Hosting.Internal.Host[4]      Hosting stopped

Соответственно в контроллерах может быть все что угодно: маппинг с транспортной модели на модель бизнеслогики которую нужно дергать из NET Framework, вызов WCF etc. И не надо тащить никакого Kestrel!
Сервер без клиента можно подергать например при помощи grpcurl, но все городилось для межпроцессного взаимодействия. Нужен клиент на NET Core.


Клиентская часть на NET Core


Здесь все проще. Реализуем пару клиентов для сервисов и будем слушать ввод от пользователя.


Код сервиса экстрактора прямолинеен и тривиален. В конструкторе настраивается канал и создается gRPC клиент. Единственный метод дергает RPC метод и разворачивает результат.


class ExtractorClient{    private readonly ExtractorRpcService.ExtractorRpcServiceClient client;    public ExtractorClient()    {        //AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); //без этого кода невозможна коммуникация через http/2 без TLS                var httpClientHandler = new HttpClientHandler        {            ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator // заглушка для самоподписанного сертификата        };        var httpClient = new HttpClient(httpClientHandler);        var channel = GrpcChannel.ForAddress("https://localhost:7001", new GrpcChannelOptions { HttpClient = httpClient });        client = new ExtractorRpcService.ExtractorRpcServiceClient(channel);    }    public async Task<bool> StartAsync()    {        var response = await client.StartAsync(new Empty());        return response.Success;    }}

В клиенте термоциклера используем IAsyncEnumerable<> возвращающим последовательность из монад OneOf<,> эмулируется асинхронная последовательность данных от прибора.


public async IAsyncEnumerable<OneOf<string, int>> StartAsync(string experimentName, int cycleCount){    var request = new StartRequest { ExperimentName = experimentName, CycleCount = cycleCount };    using var call = client.Start(request, new CallOptions().WithDeadline(DateTime.MaxValue)); // настройка времени ожидания    while (await call.ResponseStream.MoveNext())    {        var message = call.ResponseStream.Current;        switch (message.ContentCase)        {            case StartResponse.ContentOneofCase.Plate:                yield return message.Plate.ExperimentalData;                break;            case StartResponse.ContentOneofCase.Status:                yield return message.Status.PlateTemperature;                break;            default:                break;        };    }}

Остальной код клиента тривиален и позволяет только позапускать клиентов в цикле и вывести результаты на консоль.


HTTP/2 и Windows 7


Оказалось, что в седьмой версии Windows нет поддержки TLS в HTTP/2. Поэтому сервер надо бинить немного иначе, используя небезопасные креды:


server = new Server //создаем объект сервера{    Ports = { new ServerPort("localhost", 7001, ServerCredentials.Insecure) }, // биндим сервер к адресу и порту    Services = // прописываем службы которые будут доступны на сервере    {        ExtractorRpcService.BindService(this.extractorService),        ThermocyclerRpcService.BindService(this.thermocyclerService)    }};            

И у клиентов необходимо указать в качестве протокола http, а не https. Но это не все. Необходимо так же сообщить среде, что будет осознанно использоваться небезопасное соединение по http/2:


AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);

В коде проекта специально сделано много упрощений не обрабатываются исключения, не ведется нормально логгирование, параметры захардкожены в код. Это не production-ready, а заготовка для решения проблем. Надеюсь, было интересно, задавайте вопросы!

Подробнее..
Категории: Net , Netframework , Grpc , Netcore , Ipc

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru