Русский
Русский
English
Статистика
Реклама

Data-flow

Spring Integration динамические потоки данных

05.07.2020 20:11:11 | Автор: admin
Салют Хабр! Сегодня мы разберем достаточно специфичную область потоковая обработка данных, с помощью Spring Integration фреймворка и как сделать эти потоки в runtime без предварительной инициализации в контексте приложения. Полный пример приложения лежит в Гите.

Введение


Spring Integration фреймворк корпоративной интеграции (EIP), использующий под капотом механизмы обмена сообщениями между адаптерами различных протоколов/систем интеграции на основе каналов сообщений (условные очереди). Известными аналогами являются Camel, Mule, Nifi.

Из тестового кейса у нас будет сделать REST сервис, который умеет считывать полученные параметры запроса, ходить в нашу базу, к примеру, postgres, делать обновление и выборку из данных таблиц по параметрам, полученных от источника, и отдавать результат в очередь обратно (request/response), а также сделать несколько экземпляров с разными путями запроса.

Условно диаграмма data flow (потока) будет выглядеть так:

image

Далее я покажу, как это можно просто сделать без особых танцев с бубном, с помощью IntegrationFlowContext, с REST-управляющими эндепоинтами компонентов/потоков. Весь основной код проекта будет расположен в репозитории, здесь укажу лишь некоторые вырезки. Что ж, кто заинтересован, прошу под кат.


Инструментарий


По стандарту начнем с блока зависимостей. В основном нам понадобятся проекты spring-boot для REST идеологии управления потоками/компонентами, spring-integration для создания нашего кейса на основе каналов и адаптеров.

И сразу думаем, что же нам еще понадобиться для воспроизведения кейса. Кроме core зависимостей, нам понадобятся подпроекты integration-http, integration-jdbc, integration-groovy (обеспечивает динамически-настраиваемые преобразователи данных на основе Goovy скриптов). Отдельно скажу, что в данном примере мы не будем использовать groovy преобразователь за ненадобностью, но предоставим возможность его настройки из вне.

Dependency list
 <!-- Spring block -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.data</groupId>            <artifactId>spring-data-commons</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-integration</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-groovy</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-http</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-jdbc</artifactId>        </dependency>        <!-- Db block -->        <dependency>            <groupId>org.postgresql</groupId>            <artifactId>postgresql</artifactId>        </dependency>        <dependency>            <groupId>com.zaxxer</groupId>            <artifactId>HikariCP</artifactId>        </dependency>        <!-- Utility block -->        <dependency>            <groupId>org.apache.commons</groupId>            <artifactId>commons-lang3</artifactId>        </dependency>        <dependency>            <groupId>org.reflections</groupId>            <artifactId>reflections</artifactId>            <version>0.9.12</version>        </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <version>1.18.4</version>            <scope>provided</scope>        </dependency>



Внутренняя кухня


Перейдем к созданию необходимых компонентов системы (оберток/моделей). Нам понадобятся модели channel, bean, httpInboundGateway, handler, jdbcOutboundGateway и result.

bean вспомогательный объект, необходимый для работы адаптеров, потока
channel канал поставки сообщений в/из компонентов потока
httpInboundGateway http точка доступа, которой мы дальше будем отправлять запрос с данными для последующей обработки
handler обобщенный тип обработчика (груви трансформеры, различные адаптеры, etc.)
jdbcOutboundGateway адаптер jdbc
result обработчик отдачи информации в определенный канал

Обертки нам понадобятся для хранения параметров и корректной инициализации компонентов цельного потока, поэтому сразу делаем хранилище компонентов, доп. функционал конверторов JSON -> Definition Model. Прямое сопоставление полей с помощью jackson и объектов в моем кейсе не было применимо имеем еще один велосипед под специфический протокол общения.

Сразу сделаем красиво, на аннотациях:
StreamComponent отвечает за идентификацию классов, как настроечную модель компонента потока и имеет в себе служебную информацию имя компонента, тип компонента, вложенный ли компонент и описание;
SettingClass отвечает за дополнительные опции сканирования модели, такие как сканирования полей супер класса и игнорировании полей при инициализации значений;
SettingValue отвечает за идентификацию поля класса, как настраиваемого из вне, с настройками именования в JSON, описанием, преобразователем типов, флагом обязательного поля и флагом внутреннего объекта для информативности;

Менеджер хранения компонентов

Вспомогательные методы работы с моделями для REST контроллеров

Базовая модель абстракция с набором вспомогательных полей/методов модели

Текущие модели настройки потока

Маппер JSON -> Definition Model

Основную почву для работы подготовили. Теперь приступим к реализации, непосредственно, сервисов, которые будут отвечать за жизненный цикл, хранение и инициализацию потоков и сразу будем закладывать идею того, что 1 поток с тем же именованием мы можем расспараллелить на несколько экземпляров, т.е. нам нужно будет делать уникальные идентификаторы (гуиды) для всех компонентов потока иначе в контексе приложения могу возникнуть коллизии с другими singleton компонентами (бинами, каналами и пр.). Но предварительно сделаем мапперы двух компонентов это http и jdbc, т.е. приращение моделей, сделанных ранее к компонентам самого потока (HttpRequestHandlerEndpointSpec и JdbcOutboundGateway).

HttpRegistry

JdbcRegistry

Центральный управляющий сервис (StreamDeployingService) выполняет функции хранения рабочих/неактивных, регистрирует новые, запускает, останавливает и удаляет потоки полностью из контекста приложения. Важной особенностью сервиса является внедрение зависимости IntegrationFlowBuilderRegistry, который нам и помогает делать динамику приложения (возможно вспомните эти конфигурационные xml файлы или DSL классы на километры). По спецификации потока он должен всегда начинаться с inbound компонента или канала, поэтому учитываем это в реализации метода registerStreamContext.

И вспомогательный менеджер (IntegrationFlowBuilderRegistry), выполняющий функцию как маппера моделей на компоненты потока, так и инициализацию самого потока средствами IntegrationFlowBuilder. Так же я внедрил обработчик логов в пайплайн потока, сервис сбора метрик каналов потока (выключаемая опция) и возможную реализацию преобразователей сообщений потока, основанных на Groovy реализации (если вдруг этот пример станет основой прода, то предкомпиляцию groovy скриптов обязательно нужно делать на этапе инициализации потока, ибо упретесь на нагрузочных тестах в ОЗУ и без разницы сколько у Вас ядер и мощности). В зависимости от конфигурации модели параметров log-stages и log-level, он будет активен после каждой передачи сообщения от компонента к компоненту. Мониторинг включается и отключается параметром в application.yml:
monitoring:  injectction:    default: true


Теперь у нас есть вся механика для инициализации динамических потоков обработки данных, можно дополнительно написать мапперы для различных протоколов и адаптеров типа RabbitMQ, Kafka, Tcp, Ftp, etc. тем более собственноручно в большинстве случаев ничего самому (кроме, естественно, моделей настройки и вспомогательных методов) ничего писать уже не нужно достаточно большое количество компонентов уже присутствуют в репозитории.

Заключительным этапом будет реализация контроллеров получения информации о существующих компонентах системы, управления потоками и получения метрик.

ComponentsController обеспечивает отдачу информации как о всех компонентах в удобочитаемой модели, так и одного компонента по имени и типу.

StreamController обеспечивает полноценное управление потоками, а именно инициализацию новых по JSON модели, запуск, остановку, удаление и выдачу метрик по идентификатору.

Конечный продукт


Поднимаем получившееся приложение и описываем тестовый кейс в JSON формате.

Sample Data Stream
Скрипт инициализации базы данных:
CREATE TABLE IF NOT EXISTS account_data(    id          INT                      NOT NULL,    accountname VARCHAR(45)              NOT NULL,    password    VARCHAR(128),    email       VARCHAR(255),    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL);CREATE UNIQUE INDEX account_data_username_uindex    ON account_data (accountname);ALTER TABLE account_data    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (        SEQUENCE NAME account_data_id_seq            START WITH 1            INCREMENT BY 1            NO MINVALUE            NO MAXVALUE            CACHE 1        );ALTER TABLE account_data    ADD CONSTRAINT account_data_pk        PRIMARY KEY (id);CREATE TABLE IF NOT EXISTS account_info(    id             INT NOT NULL,    banned         BOOLEAN  DEFAULT FALSE,    premium_points INT      DEFAULT 0,    premium_type   SMALLINT DEFAULT -1);ALTER TABLE account_info    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (        SEQUENCE NAME account_info_id_seq            START WITH 1            INCREMENT BY 1            NO MINVALUE            NO MAXVALUE            CACHE 1        );ALTER TABLE account_info    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)        ON UPDATE CASCADE ON DELETE CASCADE;ALTER TABLE account_info    ADD CONSTRAINT account_info_pk        PRIMARY KEY (id);INSERT INTO account_data (accountname, password, email, last_ip)VALUES ('test', 'test', 'test@test', '127.0.0.1');INSERT INTO account_info (banned, premium_points, premium_type)VALUES (false, 1000, 1);


Важно: параметр order служит для последовательной инициализации компонентов в контексте потока, т.е. как выстроены компоненты по этому параметру, так и будет воспроизведена обработка входящего сообщения. (каналы и бины всегда выставляются первыми в списке). А по хорошему нужно сделать обработку графа и надобность в этом параметре отпадет сам собой.
{  "flowName": "Rest Postgres stream",  "components": [    {      "componentName": "bean",      "componentType": "other",      "componentParameters": {        "id": "pgDataSource",        "bean-type": "com.zaxxer.hikari.HikariDataSource",        "property-args": [          {            "property-name": "username",            "property-value": "postgres"          },          {            "property-name": "password",            "property-value": "postgres"          },          {            "property-name": "jdbcUrl",            "property-value": "jdbc:postgresql://localhost:5432/test"          },          {            "property-name": "driverClassName",            "property-value": "org.postgresql.Driver"          }        ]      }    },    {      "componentName": "message-channel",      "componentType": "source",      "componentParameters": {        "id": "jdbcReqChannel",        "order": 1,        "channel-type": "direct",        "max-subscribers": 1000      }    },    {      "componentName": "message-channel",      "componentType": "source",      "componentParameters": {        "id": "jdbcRepChannel",        "order": 1,        "channel-type": "direct"      }    },    {      "componentName": "http-inbound-gateway",      "componentType": "source",      "componentParameters": {        "order": 2,        "http-inbound-supported-methods": [          "POST"        ],        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",        "log-stages": true,        "log-level": "INFO",        "request-channel": "jdbcReqChannel",        "reply-channel": "jdbcRepChannel"      }    },    {      "componentName": "handler",      "componentType": "processor",      "componentParameters": {        "order": 3,        "handler-definition": {          "componentName": "jdbc-outbound-adapter",          "componentType": "app",          "componentParameters": {            "data-source": "pgDataSource",            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",            "jdbc-reply-channel": "jdbcRepChannel",            "log-stages": true,            "log-level": "INFO"          }        }      }    },    {      "componentName": "result",      "componentType": "app",      "componentParameters": {        "order": 4,        "cancel": false,        "result-channel": "jdbcRepChannel"      }    }  ]}



Тестируем:
1) Инициализируем новый поток методом:
POST /stream/deploy, где в теле запроса будет наш JSON.

В ответ система должна будет прислать, если все корректно, в противном же случаи будет видно сообщение об ошибке:
{    "status": "SUCCESS", - статус инициализации    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" - идентификатор потока}


2) Инициируем запуск методом:
GET /stream/2bf65d9d-97c6-4199-86aa-0c808c25071b/start, где указываем идентификатор проинициализированного потока ранее.

В ответ система должна будет прислать, если все корректно, в противном же случаи будет видно сообщение об ошибке:
{    "status": "SUCCESS", - статус инициализации}


3) Вызываем поток по идентификатору в системе? Как, что и где в маппере модели HttpRegistry я прописал условие
Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))

где, учитывается параметр http-inbound-path, и, если он не указан явно в конфигурации компонента, то игнорируется и выставляется системный путь вызова. В нашем случаи это будет:

POST /stream/ece4d4ac-3b46-4952-b0a6-8cf334074b99/call где присутствует идентификатор потока, c телом запроса:
{    "accountId": 1}


В ответ получим, если этапы обработки запроса отработали корректно, получим плоскую структуру записей таблиц account_data и account_info.

{    "accountname": "test",    "password": "test",    "email": "test@test",    "last_ip": "127.0.0.1",    "banned": true,    "premium_points": 1000,    "premium_type": 1}


Специфика адаптера JdbcOutboundGateway такая, что если указывать параметр update-query, то регистрируется дополнительный обработчик, который выполняет сначала обновление данных, а только потом выборку по параметру query.

Если указывать одинаковые пути вручную, то возможность запуска с HttpInboundGateway компонентов как точкой доступа к потоку в несколько экземпляров будет упразднена ибо система не даст зарегистрировать аналогичный путь.

4) Посмотрим метрики методом GET /stream/2bf65d9d-97c6-4199-86aa-0c808c25071b/metrics
Содержимое ответа
Показана информация о отправленном количестве сообщений в физические каналы компонентов потока, рейте ошибочных/успешных сообщений, среднее/минимальное и максимальное время обработки на каждом канале/компоненте потока:
[    {        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",        "sendDuration": {            "count": 1,            "min": 153.414,            "max": 153.414,            "mean": 153.414,            "standardDeviation": 0.0,            "countLong": 1        },        "maxSendDuration": 153.414,        "minSendDuration": 153.414,        "meanSendDuration": 153.414,        "meanSendRate": 0.001195117818082359,        "sendCount": 1,        "sendErrorCount": 0,        "errorRate": {            "count": 0,            "min": 0.0,            "max": 0.0,            "mean": 0.0,            "standardDeviation": 0.0,            "countLong": 0        },        "meanErrorRate": 0.0,        "meanErrorRatio": 1.1102230246251565E-16    },    {        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",        "sendDuration": {            "count": 1,            "min": 0.1431,            "max": 0.1431,            "mean": 0.1431,            "standardDeviation": 0.0,            "countLong": 1        },        "maxSendDuration": 0.1431,        "minSendDuration": 0.1431,        "meanSendDuration": 0.1431,        "meanSendRate": 0.005382436008121413,        "sendCount": 1,        "sendErrorCount": 0,        "errorRate": {            "count": 0,            "min": 0.0,            "max": 0.0,            "mean": 0.0,            "standardDeviation": 0.0,            "countLong": 0        },        "meanErrorRate": 0.0,        "meanErrorRatio": 0.0    },    {        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",        "sendDuration": {            "count": 1,            "min": 0.0668,            "max": 0.0668,            "mean": 0.0668,            "standardDeviation": 0.0,            "countLong": 1        },        "maxSendDuration": 0.0668,        "minSendDuration": 0.0668,        "meanSendDuration": 0.0668,        "meanSendRate": 0.001195118373693797,        "sendCount": 1,        "sendErrorCount": 0,        "errorRate": {            "count": 0,            "min": 0.0,            "max": 0.0,            "mean": 0.0,            "standardDeviation": 0.0,            "countLong": 0        },        "meanErrorRate": 0.0,        "meanErrorRatio": 1.1102230246251565E-16    }]



Заключение


Таким образом было показано, как можно потратив чуть больше времени и сил, написать приложении интеграции с различными системами, чем каждый раз в своем же приложении писать дополнительные ручные обработчики (пайплайны) для интеграции с другими системами по 200-500 строк кода.
В текущем примере можно распараллеливать работу однотипных потоков на несколько экземпляров средством уникальных идентификаторов избегая коллизий в глобальном контексте приложения между зависимостями потока (бинами, каналами, etc.)
В дополнении можно развивать проект:
сделать сохранение потоков в бд;
сделать поддержку всех интеграционных компонентов, что предоставляет нам сообщество spring и spring-integration;
сделать воркеры, которые по расписанию бы выполняли работу с потоками;
сделать вменяемый UI по конфигурированию потоков условной мышкой и кубиками компонентов (кстати, пример частично затачивался под проект github.com/spring-cloud/spring-cloud-dataflow-ui);

И еще раз продублирую ссылку на репозиторий.
Подробнее..
Категории: Java , Spring , Integration , Data-flow

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru