Русский
Русский
English
Статистика
Реклама

Kafkaconnect

Поиск в Кафке

26.02.2021 10:11:26 | Автор: admin

Меня зовут Сергей Калинец, я архитектор в компании Parimatch Tech, и в этой публикации хочу поделиться нашим опытом в области поиска сообщений в Kafka.

Для нашей компании Kafka является центральной нервной системой, через которую микросервисы обмениваются информацией. От входа до выхода сообщение может пройти через десяток сервисов, которые его фильтруют и трансформируют, перекладывая из одного топика в другой. Этими сервисами владеют разные команды, и очень полезно бывает посмотреть, что же содержится в том или ином сообщении. Особенно интересно это в случаях, когда что-то идет не по плану важно понять на каком этапе все превратилось в тыкву (ну и кому нужно в тыкву дать, чтобы такого больше не повторялось). С высоты птичьего полета решение простое нужно взять соответствующие сообщения из кафки и посмотреть что в них не так. Но, как обычно, интересноеначинаетсяв деталях.

Начнем с того, что кафка это не просто брокер сообщений, как многие думают и пользуются ей, но и распределенный лог. Это значит много чего, но нам интересно то, что сообщения не удаляются из топиков после того, как их прочитали получатели, и технически можно в любой момент их прочитать заново и посмотреть, что там внутри. Однако все усложняется тем, что читать из Kafka можно только последовательно. Нужно знать смещение (для упрощения это порядковый номер в топике), с которого нам нужны сообщения. Также возможно в качестве начальной точки указать время, но потом все равно можно только читать все сообщения по порядку.

Получается, что для того, чтобы найти нужное сообщение, нужно прочитать пачку и найти среди них те, которые интересны. Например, если мы хотим разобраться в проблемах игрока с id=42, нужно найти все сообщения, где он упоминается (playerId: 42), выстроить их в цепочку, ну и дальше уже смотреть, на каком этапе все пошло не так.

В отличии от баз данных вроде MySQL или MSSQL, у которых в комплекте поставки сразу есть клиентские приложения с графическим интерфейсом, ванильная Kafka не балует нас такими изысками и предлагает разве что консольные утилиты с довольно узким (на первый взгляд) функционалом.

Но есть и хорошие новости. На рынке есть ряд решений, которые помогают облегчить весь процесс. Сразу отмечу, что на рынке тут не в смысле за деньги все рассмотренные ниже инструменты бесплатны.

Я опишу некоторые наиболее часто используемые, а в конце более детально разберем самое, на наш взгляд, близкое к идеальному.

Итак, из чего можно выбирать?

Kafka Tool

(скриншот с официального сайта https://www.kafkatool.com/features.html)

Наверное самый популярный инструмент, которым пользуются люди, привыкшие к GUI инструментам. Позволяет увидеть список топиков, и прочитать отдельные сообщения. Есть возможность фильтровать по содержимому сообщений, указывать с какого смещения читать и сколько сообщений нужно прочитать. Но все же нужно знать примерные смещения интересующих нас сообщений, потому что Kafka Tool ищет только в рамках тех сообщений, которые были прочитаны. Говоря словами Дятлова из сериала Чернобыль (в оригинальной озвучке): Not great not terrible.

Часто это единственная альтернатива, с которой вынуждены работать люди, волей судьбы столкнувшиеся с кафкой. Но есть и другие средства.

Kafka Console Consumer

Это одна из утилит, входящая в комплект поставки кафки. И она позволяет читать данные из нее. Как и сама Kafka, это JVM приложение, которое для своей работы требует установленной Java. В принципе, это справедливо и для Kafka Tool, но в данном случае можно обойтись без Java если запускать через docker:

По сути, нужно просто перед самой командой добавить docker run --rm -it taion809/kafka-cli:2.2.0, что на докерском значит запусти вот такой образ, выведи, то, что он показывает, на мой экран и удали образ, когда он закончит работу. Можно пойти еще дальшеи добавить алиас типа

Утилита кажется архаичной и ее консольность может отпугнуть неискушенных адептов графических интерфейсов, но, как и большинство консольных инструментов, при правильном использовании она дает более мощный результат. Как именно рассмотрим на примере следующего инструмента, тоже консольного.

Kafkacat

Это уже весьма могучая штука, которая позволяет читать и писать из Кафки, а также получать список топиков. Она тоже консольная, но при этом поудобнее в использовании, чем стандартные утилиты вроде kafka-console-consumer (и ее тоже можно запустить из докера).

Вот так можно сохранить 10 сообщений в файле messages (в формате JSON):

Файл можно будет использовать для анализа или чтобы воспроизвести какой-то сценарий.Безусловно, для решения этой задачи можно было бы и использовать родной консьюмер, но kafkacat позволяет выразить это короче.

Или вот пример посложнее (в смысле букв немного больше, но решение проще многих других альтернатив):

Код взят с блога одного из самых топовых популяризаторов Kafka экосистемы Robin Moffatt. В двух словах один инстанс kafkacat читает сообщения из топика Kafka, потом сообщения трансформируются в необходимый формат и скармливаются другому инстансу kafkacat, который пишет их в другой топик. Многим такое может показаться вырвиглазным непонятным брейнфаком, однако на самом деле у нас готовое решение, которое можно запустить в докере и оно будет работать. Реализация такого сценария на вот этих ваших дотнетах и джавах потребует больше букв однозначно.

Но это меня уже занесло немного не в ту степь. Статья же про поиск. Так вот, похожим способом можно организовать и поиск сообщений просто перенаправить вывод в какой-то grep и дело с концом.

Из возможностей для улучшений можно отметить тот факт, что kafkacat поддерживает Avro из коробки, а вот protobuf нет.

Kafka Connect + ELK

Все вышеперечисленные штуки работают и решают поставленные задачи, однако не всем удобны. При разборе инцидентов нужно именно поискать сообщения в разных топиках по определенному тексту это может быть определенный идентификатор или имя. Наши QA (а именно они в 90% случаев занимаются подобными расследованиями) наловчились пользоваться Kafka Tool, а некоторые и консольными утилитами. Но это все меркнет по сравнению с возможностями, которые дает Kibana, UI оболочка вокруг базы данных Elasticsearch. Kibana тоже используется нами QA для анализа логов. И не раз поднимался вопрос давайте логировать все сообщения, чтобы можно было искать в Kibana. Но оказалось, что есть способ намного проще, чем добавление вызова логера в каждый из наших сервисов, и имя ему Kafka Connect.

Kafka Connect это решение для интеграции Kafka с другими системами. В двух словах,оно (или он?) позволяет экспортировать из и импортировать данные в Kafka без написания кода. Все, что нужно это поднятый кластер Connect и конфиги наших хотелок в формате JSON. Слово кластер звучит дорого и сложно, но на самом деле это один или больше инстансов, которые можно поднять где угодно мы, например, запускаем их там же, где и обычные сервисы в Kubernetes.

Kafka Connect предоставляет REST API, c помощью которого можно управлять коннекторами, занимающимися перегонкой данных из и в Kafka. Коннекторы задаются конфигурациями, в случае Elasticsearch эта конфигурация может быть вот такой:

Если такой конфиг через HTTP PUT передать на сервер Connect, то, при определенном стечении обстоятельств, создастся коннектор с именем ElasticSinkConnector, который будет в три потока читать данные из топика и писать их в Elastic.

Выглядит всё крайне просто, но самое интересное, конечно же, в деталях. А деталей тут есть )

Большинство проблем связанно с данными. Обычно, как и в нашем случае, нужно работать с форматами данных, разработчики которого явно не думали, что когда-то эти данные будут попадать в Elasticsearch.

Для решения нюансов с данными есть трансформации. Это такие себе функции, которые можно применять к данным и мутировать их, подстраивая под требования получателя. При этом всегда есть возможность использовать любую Kafka клиент технологию для случаев, когда трансформации бессильны. Какие же сценарии мы решали с помощью трансформаций?

В нашем случае есть 4 трансформации. Вначале мы их перечисляем, а потом конфигурим. Трансформации применяются в порядке их перечисления, и это позволяет интересно их комбинировать.

Имена

Вначале добавляем возможность поиска по имени топика просто дополняем наши сообщения полем с нужной информацией.

Индексы

Elasticsearch работает с индексами, которые имеют свойство забиваться и нервировать девопсов. Нужна поддержка удобной для управления схемой индексирования. В нашем случае мы остановились на индексе на сервис / команду с ежедневной ротацией. Плюс решения хранение данные из разных топиков в одном индексе с возможностью контролировать его возраст.

Даты

Для того, чтобы в Kibana можно было искать по дате, необходимо задать поле, в котором эта дата содержится. Мы используем дату публикации сообщения в Kafka. Чтобы ее получить, мы вначале вставляем поле с датой сообщения, а потом конвертируем ее в UTC формат. Конвертация была добавлена, чтобы помочь Elasticsearch распознать в этом поле timestamp, однако в нашем случае это не всегда происходило, поэтому мы добавили index template, который явно говорил в этом поле дата:

В результате у нас сообщения практически мгновенно становятся доступными для анализа, и время, потраченное на разбор инцидентов, уменьшается.

Тут, конечно, стоит отметить, что данная инициатива сейчас у нас на этапе внедрения, поэтому нельзя сказать,что все массово ищут все что нужно в Kibana, но мы к этому уверенно идем.

Вообще, Kafka Connect применимо не только для таких задач. Его можно вполне использовать в тех случаях, где нужна интеграция с другими системами, Реально, например, реализовать полнотекстовый поиск в вашем приложении с помощью двух коннекторов. Один будет читать из операционной базы обновления и писать их в Kafka. А второй читать из Kafka и отсылать в Elasticsearch. Приложение делает поисковый запрос в Elasticsearch, получает id и по нему находит нужные данные в базе.

Заключение

Ну а наша публикация подходит к концу. Очень надеюсь, что вы узнали что-то новое для себя, а иначе зачем это все? Если что-то не получилось раскрыть, или вы категорически не согласны с чем-то, или может есть более удобные способы решения подобных проблем напишите про это в комментариях, обсудим )

Подробнее..

Гибриды побеждают или холивары дорого

11.01.2021 02:05:19 | Автор: admin

Мотивом для написания данной статьи послужил тот факт, что на habr.com участилось появление материалов маркетингового характера про Apache Kafka. А также тот факт, что из статей складывается впечатление что пишут их немного далекие от реального использования люди это конечно же только впечателение, но почему-то в большинстве своем статьи обязательно содержат сравнение Apache Kafka с RabbitMQ, причем не в пользу последнего. Что самое интересное читая подобные статьи управленцы без технического бэкграунда начинают тратить деньги на внутренние исследования, чтобы ведущие разработчики и технические директора выбрали одно из решений. Так как я очень жадный/домовитый, а также так как я сторонник тезиса "В споре НЕ рождается истина" предлагаю вам ознакомится с другим подходом почти без сравнения разных брокеров.


Без сравнения никуда


Вообще, по правильному, я должен был сделать статью в формате Kafka+RabbitMQ+Nats+ActiveMQ+Mosquito+etc, но мне кажется, что для Вас дорогие читатели это будет перебор, хотя обычно в моих архитектурных решениях присутствуют все вышеуказанные сервисы (и не только). И это я еще не рассказываю про AzureServiceBus/AmazonServiceBus которые также участвуют в "гибридах" при крупных программах проектов. Поэтому пока остановимся на связке Kafka+RabbitMQ и далее вы поймете почему: по аналогии можно подключить любой сервис с его протоколом. Потому что:


сравнивая Apache Kafka и RabbitMQ вы сравниваете 2 (два) бренда, а точнее 2 коммерческие компании Confluent и vmWare, и немножко Apache Software Foundation (но это не компания)

то есть формально при сравнении мы должны сравнивать бизнес-модели компаний которые являются основными драйверами развития наших сегодняшних подоопытных. Так как Хабр все таки не портал экономических исследований, поэтому мы для начала должны вспомнить не бренды, а те описания которые стоят за этими брендами (то как сами себя называют наши сегодняшние участники).


  • RabbitMQ мультипротокольный и расширяемый брокер сообщений
  • Apache Kafka платформа для распределенной потоковой передачи событий
  • Confluent Platform платформа потоковой передачи событий с возможностью создания высокопроизводительных конвейеров обработки данных для целей аналитики и интеграции в бизнес-сценариях

Я не зря третьим пунктом выделяю наработки компании Confluent те кто собирается использовать Apache Kafka в продуктиве должны хотя бы видеть какую функциональность дополнительно добавляет Confluent к Apache Kafka. А это SchemeRegistry, RestProxy, kSQL и еще несколько интересных штук, о одной из которых мы поговорим ниже, она называется Kafka-Connect.


Но вернемся к сравнению внимательный читатель видит, что RabbitMQ сам себя называет брокером сообщений выделяя свою главную фишку "мультипротокольность", а товарищи из экосистемы Kafka почему-то называют себя аж платформой (завышенное самомнение оно такое).


Итак чтобы было совсем понятно, куда я веду.


  • ключевая особенность RabbitMQ мультипротокольность и расширяемость. (основной язык якобы Erlang)
  • ключевая особенность экосистемы Kafka потоковая передача с обработкой (основной язык якобы Scala/Java)

Отсюда и возникают минусы каждого из решений


  • для RabbitMQ мы не сможем построить нормального решения для потоковой обработки. Точнее сможем, но НЕ штатно.
  • а для Kafka мы не сможем сделать мультипротокольность, точнее сможем но НЕ штатно.

Сократ не говорил, что в споре рождается истина


Еще одна новость: действительно если почитать источник, то Сократ вообще-то в итоге пришел к тому, что нужно обеспечить диалог, а если по научному то истина рождается в научном споре, который формально представляет собой процесс публикация со ссылкой на источники -> а затем научная критика оппонентов -> истина


А значит перейдем к ссылкам для начала их будет три. Когда 14 лет назад я совместно с коллегами начинал использовать брокеры сообщений в качестве основы для построения своих интеграционных решений, мы сразу обратили внимание, что фактически с точки зрения "клиента" (конечного приложения), под разные задачи подходят разные протоколы интеграции.


  • ODBC
  • AMQP
  • MSMQ
  • XMPP
  • IP over Avian Carriers

так как тогда наша задача была интегрировать всякое (python, C#, java) и 1С был придуман проект One-S-Connectors (https://code.google.com/archive/p/one-c-connectors/source/default/source). Сейчас он имеет сугубо академический интерес (так как в 1С мире моя персона достаточно известна и на Хабре много 1С специалистов из сообщества "воинствующих 1С-ников" эта ссылка специально для них).
Однако уже тогда (в 2006 году) стало понятно, что по большому счету конечному разработчику придется менять/выбирать протокол под бизнес-задачу. А инфраструктурщикам придется обеспечить максимально широкий спектр интеграционных протоколов. От ODBC до Kafka/NATs/ModBus.


Но вернемся к дню сегодняшнему когда я начал использовать в проектах уровня ГИС (госсударственные информационные системы) различные транспорта данных внезапно выяснилось, что универсальные адаптеры это не только концепт воинствующих 1С-ников, но и соседей. Поэтому многие идеи при внедрении черпались из еще двух интересных проектов



маленькое примечание для менеджеров про Kombu как то так получилось, что имплементация протокола Apache Kafka до сих пор открыта https://github.com/celery/kombu/issues/301 и почему-то перешла в разряд "Дайте денег", поэтому для Python проектов приходится использовать дополнительно https://github.com/confluentinc/confluent-kafka-python

Когда вы дочитаете до этого момента предполагаю, что вы зададите вопрос про остальные языки: Java, GoLang, RUST, etc. Но во первых я не зря выше указал что по серьезному в наш обсуждаемый сегодня гибрид нужно добавить историю про NATs и ActiveMQ и внезапно JMS поэтому просьба дочитать до конца: Java будет, а во вторых мы переходим к еще трем полезным ссылкам



Прокоментируем их? Дело в том, что как бы вы не хотели, а для полноценного использования "в длинную" вам придется подписаться на историю релизов как сервера RabbitMQ и самое главное на те самые расширения (лежат в каталоге /deps) которые постоянно добавляются в ядро RabbitMQ, так и на портал компании Confluent где она публикует приложения полезные для конечного бизнеса использующего Apache Kafka в продуктиве.


подход к расширяемости за счет активируемых расширений также используется в экосистеме PostgreSQL тот который CREATE EXTENSION hypopg, так что подход реализованный компанией Pivotal/vmWare далеко не новый в нашем чудесном мире архитектуры программного обеспечения

Дополнительно на чудесном рынке облачных услуг в формате "Серьезная штука как сервис" есть еще один игрок это компания 84Codes https://github.com/84codes. Когда в рамках проектов внедрения нет нормальных инженеров по инфраструктуре именно 84Codes спасает пилотные проекты, потому как у них можно легко арендовать бесплатные/сильнодешевые контура CloudAMQP и CloudKarafka.


Я как бы обещал, что не буду ничего говорить про деньги, однако придется отразить 2 ключевых момента:


  • компания vmWare зарабатывает известно на чем, поэтому RabbitMQ ей развивается как часть своей платформы то есть они инвестируют в открытый проект не особо занимаясь его монетизацией. Возврат их инвестиций происходит в других местах, ну и также за счет контрибьторов на GitHub.
  • а вот компания Confuent собирается монетизировать свою платформу через Enterprise лицензию в которую включает те самые коннекторы Enterprise-Kafka-Connect, а также GUI для управления платформой.

Когда-то давно существовал https://github.com/jcustenborder/kafka-connect-rabbitmq, примечателен тот факт что товарищ Джереми его скрыл, оставив только свои наработки для Java разработчиков в виде Maven Archetype https://github.com/jcustenborder/kafka-connect-archtype еще раз обращаю Ваше внимание, что компания Confluent будет и дальше пытаться монетизировать свою деятельность, так что переводить всю интеграцию только на Kafka я бы на вашем месте поостерегся.

Поэтому когда вам топят за Kafka учитывайте, что вы либо изучаете Java, либо платите за Enterprise лицензию. А когда вам топят за RabbitMQ учитывайте, что либо вы изучаете системное администрирование (Erlang накладывает особенности системного администрирования), либо покупаете сервис у провайдеров типа 84Codes. Кодить на Erlang никогда не придется там это не нужно, если только вы не контрибьюторы OpenStack.


Поставил и забыл уже не работает


Приближаемся к дальнейшему пониманию. Данный раздел уже будет полезен инфраструктурщикам, хотя и разработчикам важно знать, что в эпоху когда семимильными шагами развивается имплементация ITILv4, для того чтобы перейти от текста и менеджерских хитростей про риски и деньги к реальности нам придется осознать 3 тезиса


  • использование только одного протокола интеграции приводит к появлению ProtocolLock и как следствие к VendorLock я же не зря выше написал, что за каждым открытым продуктом, стоит какой-то ключевой комплект вендоров как они себя поведут: мы не знаем.
  • в мире ИТ больше нет серьезных продуктов, которые бы представляли собой монолитную службу все приложения давно стали композитными.
  • все нормальные вендоры сокращают свои релизные циклы по ключевым продуктам нормальной практикой стало выпускать редакции раз в 3 месяца TDD, BDD, CICD, ScallableAgile и DevOps (DocOps, DevSecOps) эти инженерные практики и методики управления не просто так развиваются. Всем очень хочется сокращать себестоимость и TimeToMarket.

Абзац выше важен, как финальный аккорд, прежде чем мы перейдем к Docker-Compose. А именно к нему я вел чтобы и разработчики и инфраструктурщики понимали что такое гибридная инфраструктура в режиме мультипротокольности (с) нужно сделать так, чтобы каждый мог поэкспериментировать с предлагаемым контуром. Как я уже указал выше первично подобное применительно к Kafka+RabbitMQ было подсмотрено именно у коллег из 84Codes (хорошие ребята всем советую https://www.84codes.com/).


Чтобы вы смогли поэкспериментировать сами


Итак подходим к примерам, так как обоснования и вводных уже хватит. Предположим вы уже поняли, что вам также нужна мультипротокольность, однако мы же помним, что все рекламные материалы про Apache Kafka нам рассказывают что это единственное решение с реализацией exactly-ones доставки сообщений от отправителя получателю. Собственно на самом деле нам и нужен гибрид, чтобы сделать из связки ТочкаОбмена->Очередь журнал Kafka (это тот который Topic) чтобы возникла сущность под называнием Offsets у нашей очереди событий.


exactly-ones

проверка на внимательность читающего exactly-ones это шутка в формате "Хотя бы один раз из 1С", а имеется в виду концепт Exactly once строго однократная доставка сообщений получателю, без необходимости повторной отправки от отправителя.


Предлагаю попробовать. Концепт для проверки Вашими руками будет состоять из:


  • Zookeper
  • KafkaBroker
  • RabbitMQ
  • KafkaConnect

и трех приложений приложений


  • отправитель на Python по протоколу AMQP 0.9
  • получатель на С# по протоколу AMQP 1.0
  • получатель на C# по протоколу Kafka

Еще интересное замечание: когда вы смотрите на всякие обучающие видео по Apache Kafka авторы часто (но не всегда) старательно пишут примеры на Java, это они делают скорее всего для того, чтобы скрыть от вас особенности использования librdkafka C++ библиотеки на основе которой сделаны многие не-джава адаптеры,. Я же наоборот предлагаю вам начинать исследование интеграции с Kafka именно с неё, чтобы четко оценивать риски "куда вы ввязываетесь": очень примечательно что там работает фактически один разработчик, формально в одиночку https://github.com/edenhill/librdkafka/pulse/monthly, а допустим wmWare старается поддерживать свою линейку клиентов под своим брендом https://github.com/rabbitmq

ну и самое главное и тяжелое:


контур содержит открытый форк старого RabbitMQ-Kafka-Sinc-Connector того самого который товарищи из Confluent в своё время скрыли с Github.


Докер контура для экспериментов


Для показательного эксперимента мы сделаем 2 композитных приложения инфраструктурное-трансформационное и непосредственно бизнес-приложения.


Развертываем RabbitMQ и Kafka


контур инфраструктуры который нам понадобится запускается достаточно просто


docker-compose -f dockers/infra.yml up -d

Если вам интересно что же там внутри, нашего композитного приложения, то в конце статьи дается ссылка на полный комплект исходников, наиболее интересен в нем Kafka-UI и непосредственно RabbitMQ-Sinc, все остальное обычно и штатно для всех известных примеров по Kafka или RabbitMQ


    image: provectuslabs/kafka-ui:latest    ports:      - 8080:8080    depends_on:      - kafka-broker      - zookeeper    environment:      KAFKA_CLUSTERS_0_NAME: local      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181      KAFKA_CLUSTERS_0_JMXPORT: 9101

Но самое главное кроется в репозитории Java


    <parent>        <groupId>com.github.jcustenborder.kafka.connect</groupId>        <artifactId>kafka-connect-parent</artifactId>        <version>1.0.0</version>    </parent>

Если подробно изучить pom.xml то выяснится, что существует заглавный проект для всех конекторов к Кафка https://github.com/jcustenborder/kafka-connect-parent, в котором используется Java-Kafka-Adapter


И непосредственно синхронизацией c RMQ занимается штатный Java клиент https://www.rabbitmq.com/java-client.html


            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>${rabbitmq.version}</version>

Таким образом по правильному, чтобы получились повторить тот же эксперимент что и у меня, необходимо выполнить:


  • собрать из исходников java синхронизатор -1-build-connect-jar.bat
  • собрать контейнер с синхрозатором 00-build-connect-image.sh

и уже потом запустить полный инфраструктурный контур


  • стартуем полный инфраструктурный контур 01-start-infra.sh

обратите внимание так как Docker использует разное поведение при работе с PWD для Windows и Linux приходится делать дубликаты скриптов. В остальных случаях под обоими операционными системами используется интерпретатор sh

В итоге вы получите следующий комплект сервисов



На картинке можно увидеть как подключаются конфигурационные файлы к RabbitMQ и какая топология сетевых портов у нас будет участвовать в эксперименте:


Назначение портов:


  • 9092 будет использоваться для Kafka протокола
  • 8080 используется для отображения красивой картинки состояния Apache Kafka UI
  • 5672 будет использоваться для протокола AMQP 0.9 и он же будет работать и как AMQP 1.0
  • 15672 используется для красивой картинки управления RabbitMQ
  • 28082 отладочный порт для управления через curl трансформатором протоколов

В этот момент нужно остановится и прокомментировать особенность развертывания RabbitMQ в Docker:


  • хорошей практикой является версионирование включенных плагинов расширений enabled-rmq-plugins

[    rabbitmq_management,     rabbitmq_amqp1_0,     rabbitmq_mqtt,     rabbitmq_federation,     rabbitmq_federation_management,    rabbitmq_shovel,    rabbitmq_shovel_management,    rabbitmq_prometheus].

  • а также в крупных проектах когда нужно передать разработчику преднастроенную топологию точек обмена и очередей, можно и нужно добавлять это в виде конфигурационного файла rmq_definitions.json

     "bindings":[        {           "source":"orders-send",           "vhost":"/",           "destination":"orders-amqp-10-consumer",           "destination_type":"queue",           "routing_key":"",           "arguments":{

Запускаем наши приложения


Остается только запустить наши приложения эмулирующие подключения


docker-compose -f dockers/infra.yml restart protocol-connect-syncdocker-compose -f applications.yml builddocker-compose -f applications.yml up

Топология наших тестовых приложений достаточно простая



Исходный код также максимально упрощён:


  • отправляется как-будто бы заказ Васи с периодичностью в 2 секунды

        producer = conn.Producer(serializer='json')        producer.publish({'client': 'Вася', 'count': 10, 'good': 'АйФончик'},                      exchange=order_exchange,                      declare=[kafka_queue, amqp10_queue])        time.sleep(2)

RUN python -m pip install \    kombu \    librabbitmq

причем используется для этого максимально производительная библиотека на Си для AMQP 0.9 librabbitmq наследуется именно от неё https://github.com/alanxz/rabbitmq-c


  • создан подписчик который уже по протоколу AMQP 1.0 смотрит в свою очередь и получает события, соответственно очередь очищается и больше мы заказов Васи не получим. В этом потоке нам это и не нужно.

            Attach recvAttach = new Attach()            {                Source = new Source()                {                    Address = "orders-amqp-10-consumer",                    Durable = 1,                },

            ReceiverLink receiver =                 new ReceiverLink(session,"netcore_amqp_10_consumer", recvAttach, null);            Console.WriteLine("Receiver connected to broker.");            while (true) {                Message message = receiver.Receive();                if (message == null)                {                    Console.WriteLine("Client exiting.");                    break;                }                Console.WriteLine("Received "                   + System.Text.Encoding.UTF8.GetString((byte[])message.Body)

Причем в качестве драйвера выбран


  <ItemGroup>    <PackageReference Include="AMQPNetLite.Core" Version="2.4.1" />  </ItemGroup>

именно его https://github.com/Azure/amqpnetlite Microsoft использует для маркетинга своей реализации сервисной шины. Собственно именно AMQP 1.0 как протокол они и рекламируют https://docs.microsoft.com/ru-ru/azure/service-bus-messaging/service-bus-amqp-overview


Ну и финально


  • создан подписчик по протоколу Kafka который при каждом старте перечитывает с нуля журнал отправленных заказов Васи. Тот самый Exactly once.

                AutoOffsetReset = AutoOffsetReset.Earliest

                c.Subscribe("orders-from-amqp");

                    while (true)                    {                        try                        {                            var cr = c.Consume(cts.Token);

Выглядит наш контур в итоге следующим образом:


  • 5 инфраструктурных контейнеров


  • 3 контейнера с приложениями


  • готовый журнал транзакций заказов который можно посмотреть через Kafka-Ui


  • и готовый контур связей для RabbitMQ


А где же Java ?


Не волнуйтесь при таком гибридном подходе, без неё никуда, для того чтобы всё вышеуказанное заработало пришлось сделать форк и актуализировать версии Kafka-Connect-Base


[submodule "dockers/rabbitmq-kafka-sink"]    path = dockers/rabbitmq-kafka-sink    url = https://github.com/aliczin/kafka-connect-rabbitmq

Но самое интересное не это, самое интересное что в этом самом Kafka-Connect нет по сути никакой магии только код трансформации.


По сути нам предлагают:


  • создать наследника абстрактной задачи Источника

public class RabbitMQSourceTask extends SourceTask {

  • выполнить подписку на очередь сообщений

        this.channel.basicConsume(queue, this.consumer);        log.info("Setting channel.basicQos({}, {});", this.config.prefetchCount, this.config.prefetchGlobal);        this.channel.basicQos(this.config.prefetchCount, this.config.prefetchGlobal);

  • трасформировать полученные сообщения в абстрактные записи причем с буфером.

  @Override  public List<SourceRecord> poll() throws InterruptedException {    List<SourceRecord> batch = new ArrayList<>(4096);    while (!this.records.drain(batch)) {

Отдельно можно выделить чудесный трансформатор сообщений из AMQP 0.9 в Кафка. У несведующего в Java глаз может задергаться. У автора чувствуется многолетный опыт работы в J2EE.


  private static final Logger log = LoggerFactory.getLogger(MessageConverter.class);  static final String FIELD_ENVELOPE_DELIVERYTAG = "deliveryTag";  static final String FIELD_ENVELOPE_ISREDELIVER = "isRedeliver";  static final String FIELD_ENVELOPE_EXCHANGE = "exchange";  static final String FIELD_ENVELOPE_ROUTINGKEY = "routingKey";  static final Schema SCHEMA_ENVELOPE = SchemaBuilder.struct()      .name("com.github.jcustenborder.kafka.connect.rabbitmq.Envelope")      .doc("Encapsulates a group of parameters used for AMQP's Basic methods. See " +          "`Envelope <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html>`_")      .field(FIELD_ENVELOPE_DELIVERYTAG, SchemaBuilder.int64().doc("The delivery tag included in this parameter envelope. See `Envelope.getDeliveryTag() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getDeliveryTag-->`_").build())      .field(FIELD_ENVELOPE_ISREDELIVER, SchemaBuilder.bool().doc("The redelivery flag included in this parameter envelope. See `Envelope.isRedeliver() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#isRedeliver-->`_").build())      .field(FIELD_ENVELOPE_EXCHANGE, SchemaBuilder.string().optional().doc("The name of the exchange included in this parameter envelope. See `Envelope.getExchange() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getExchange-->`_"))      .field(FIELD_ENVELOPE_ROUTINGKEY, SchemaBuilder.string().optional().doc("The routing key included in this parameter envelope. See `Envelope.getRoutingKey() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getRoutingKey-->`_").build())      .build();

Но Не будем критиковать, мы же в самом начале договорились что наша главная задача добиться конечного результата удобным на сегодня способом. А итоги у нас получаются следующие.


Итоги


Все что здесь продемонстрировано естественно лежит на Github.


В репозитории https://github.com/aliczin/hybrid-eventing. Лицензия выставленна простая до невозможности Creative Commons Attribution 4.0 International.


Полезно использовать в обучающих целях для команд разработки и инфраструктуры и поиграться с DevOps и поиграться с мультипротокольными приложениями. Ничего особо экстравагантного в данном концепте конечно нет, ключевое тут я как я написал в самом начале мы делаем избыточное количество интеграционных протоколов, добавляя транформаторов между потоками интеграции.


Схема коммуникации в итоге для "разработчика интеграционных потоков" (с) выглядит следующим образом для источника и брокеров


orderEventsApp->Amqp09: send orderAmqp09->Amqp10: fanout\n copy eventAmqp09->KafkaQ: fanout\n copy eventKafkaQ->KafkaConnect: consume\n on messageKafkaConnect->KafkaConnect: transform\n messageKafkaConnect->Kafka: publish to topic


а для приемников все упрощается


Amqp10->orderEventSubApp: subcribe\n for eventorderJournalApp->Kafka: read kafka journal


Приемники берут нужные им данные только по нужному им протоколу

Ключевые посылы


Ключевые моменты которые я хотел расскрыть данной статьей


  • стройте эксперименты и продуктивы с Apache Kafka не со штатным Java клиентом, а librdkafka и базирующихся на ней адаптерах это позволит вам отладить сценарии разных версий протоколов KafkaAPI. Java вам пригодится в другом месте.


  • не ввязывайтесь с священные войны, что лучше RabbitMQ/Kafka/Nats/ActiveMQ просто развертывайте сервисы и публикуйте протоколы и пробуйте свои бизнес-сценарии.


  • начните уже внедрять продуктивный Docker, или хотя бы пилотные и разработческие контура.


  • реальный ИТ ландшафт почти всегда будет мультипротокольным



Примечание для понимающих


чтобы гибриды развивались дальше:


  • Mosquito очень удобен как встраиваемый брокер на уровне контролера SCADA для преобразования из ModBus/OPC-UA. Хотя как вы уже поняли из статьи интересны реализации "мостов из протокола в протокол" пример https://github.com/mainflux/mainflux


  • ActiveMQ удобен для Java разработчиков, потому что у них есть боязнь Erlang, но как мы выше уже сказали мост RabbitMQ AMQP 1.0 -> ActiveMQ легко организуется средствами RabbitMQ, кстати также как и JMS.


  • NATs интересен как часть OpenFaaS платформы, при внедрении "своего маленького" Amazon Lambda с преферансом. И опять же подход будет всё тот же мультипротокольные мосты с трансформацией: https://github.com/nats-io/nats-kafka если Вам не страшно посмотрите эксперименты с OpenFaaS веселых 1С-ников 2.5 часа примеров https://youtu.be/8sF-oGGVa9M



Надеюсь мой архитектурный подход Вам придется по душе и вы перестанете тратить деньги заказчика (инвестора/свои если вы стартапщик: Маша это замечание специально для тебя) на бессмысленные обсуждения что же выбрать в качестве брокера/платформы, и начнете наконец-то делать функциональность, которая будет использовать тот протокол, который удобен прямо сейчас. С возможностью переключения в случае "если чё"


Функциональность: Мультипротокольный адаптер    Как разработчик я хочу иметь абстракцию Produser/Consumer    С возможность изменения протокола интеграции    Чтобы под каждую задачу выбирать разные протоколы     и единый интерфейс вызова для обеспечения независимости от вендора предоставляющего транспортСценарий: vmWare реализует протокол Stream средствами RabbitMQ     Когда vmWare закончит свой плагин для потоков    Тогда я активирую новый протокол     И быстро воткну его в приложение    И так как у меня есть продуктивный кластер RabbitMQ    И мне нужно будет просто поменять канал для отдельных бизнес сценариевСценарий: Завтра придут 1С-ники со своим ActiveMQ из Шины для 1С    Когда мне нужно быстро включить очереди 1С в общий контур    И чтобы на Питоне использовать старые наработки с Kafka API    Тогда я добавляю трансформацию ActivemeMQ2Kafka    и живу по старому а события ходят уже и из 1Сetc

А чтобы вы не думали, что данный подход это нечто уникальное вот Вам еще интересная ссылка: https://github.com/fclairamb/ftpserver/pull/34 это когда нужен FTP сервер, а хочется S3.


Ну и в качестве финального момента обратите внимание: есть и риски данного подхода: но они я думаю Вам и так понятны.


  • Придется оркестрировать такой комплект сервисов и вручную это почти невозможно. Придется использовать DevOps штуки типа k8s, OpenShift, etc но если вы уже решились на интеграцию в режимах слабой связаности приложений в режиме онлайн, у вас что-то на эту тему уже скорее всего есть.
  • Трансформаторы между протоколами приходится дорабатывать ничего готового открытого и PRODUCTION-READY на данный момент найти почти невозможно.

Финальное примечение для любителей писать ТЗ по ГОСТу


так как Хабр читают любители цифровой трансформации (чтобы кто не понимал под этим словом) советую в техническое задание добавлять не упоминание конкретных реализации серверов, а что-то примерно следующее:


комплект программ для интеграции должен реализовывать коммуникацию конечных приложений по открытым протоколам HTTP, AMQP 0.9, AMQP 1.0, Apache Kafka не ниже версии 23, MQTT, WebSockets, <ЛюбойДругойХотьSOAPХотяЭтоЖуть> с возможность преобразования между протоколами дополнительными средствами администрирования

Надеюсь моя публикация после долгого перерыва Вам будет полезна в ваших интеграционных проектах. Предполагаю что будет вопрос про 1С и тут у меня совет только один. Используйте Google по ключевым словам 1С+RabbitMQ или 1С+Kafka или 1С+OpenFaas и RabbitMQ и Kafka "в 1С" давно и непринужденно используются. Потому что 1С это не только язык, но и несколько сообществ где уже давно сделаны все возможные адаптеры и платные и бесплатные. Собственно как и в Java/C#/Python/C++/Rust/etc.


Данная статья написана с применением расширения https://shd101wyy.github.io/markdown-preview-enhanced для Visual Studio Code за что автору летят дополнительные лучи добра.


Ну и в качестве финального момента хотел бы заметить, что выбор Cunfluent Inc в качестве платформы разработки Kafka-Connect экосистемы JDK выглядит все таки странно. Не удивлюсь если их конкуренты сделают такое же, но на GoLang, NodeJS (что-нибудь типа Kafka-Beats-Hub)



Красивые картинки в формате GraphViz я делаю при помощи хитрого проекта Docker2GraphViz помогает поддерживать актуальный контур и техническую документацию в формате Markdown


set CURPATH=%~dp0set DOCKER_DIR=%CURPATH%\dockersdocker run --rm -it --name dcv -v %DOCKER_DIR%\:/input pmsipilot/docker-compose-viz render -m image --force --output-file=infra-topology.png infra.ymldocker run --rm -it --name dcv -v %CURPATH%\:/input pmsipilot/docker-compose-viz render -m image --force --output-file=apps-topology.png applications.ymlcopy /b/v/y dockers\infra-topology.png content\assets\infra-topology.pngcopy /b/v/y apps-topology.png content\assets\apps-topology.png
Подробнее..

Как синхронизировать сотни таблиц базы в Kafka, не написав ни одного продюсера

25.11.2020 14:11:26 | Автор: admin


Привет, Хабр! Меня зовут Сергей Бевзенко, я ведущий разработчик Delivery Club в команде Discovery. Наша команда занимается навигацией пользователя по приложению Delivery Club: мы отвечаем за основную выдачу ресторанов, поиск и всё, что с этим связано.

Я расскажу про Kafka Connect: что это такое, какова общая концепция и как работать с этим фреймворком. Это будет полезно тем, кто использует Kafka, но не знаком с Kafka Connect. Если у вас огромный монолит и вы хотите перейти на событийную модель, но сталкиваетесь со сложностью написания продюсеров, то вы тоже найдёте здесь ответы на свои вопросы. В комментариях можем сравнить ваш опыт использования Kafka Connect с нашим и обсудить любые вопросы, которые с этим связаны.

План


1. Предпосылки
2. Как используется Kafka Connect
2.1. Как запустить Kafka Connect
3. Запуск коннекторов
4. Настройка коннекторов
4.1. Причины выбора коннекторов
4.2. Jdbc и Debezium
4.3. Debezium Connector
4.4. JdbcSinkConnector
4.5. Трансформеры
5. Deploy
5.1. Deploy Kafka Connect Delivery Club
6. Что нам дало использование Kafka Connect

Предпосылки


Delivery Club не молодая компания. Она основана в сентябре 2009 года. Мы постоянно развиваемся и улучшаем наши сервисы, без этого рост невозможен.

У нас есть 10-летний Legacy-монолит. Он служит основой многих процессов. Да, новые сервисы мы, конечно же, пишем. Делаем это на Go, и иногда на PHP. Это два основных языка backend-разработки в Delivery Club. Также мы переходим на событийную модель с использованием шины событий: все изменения данных в системе это события, попадающие в шину, и любой сервис может подписаться на них.

Какие это события?


В компании есть множество интеграции с различными ресторанами, магазинами, аптеками и т.д. Также у нас есть служба логистики, которая работает с курьерами, их маршрутами, заказами, распределением. Есть и отличный отдел R&D, который занимается различными исследованиями и околонаучной разработкой. И, конечно, есть другие отделы. У каждого направления множество сервисов, и все они генерируют огромное количество событий. В качестве шины для них мы используем Apache Kafka. Но десятилетний Legacy никуда не делся. Внутри него множество админок, которые являются источниками данных. Без крайней нужды трогать их не рекомендуется.

Сервис Каталог


Как один из этапов развития, перед нашей командой стояла задача переписать основную выдачу приложения. За неё отвечал монолит, как и за большую часть функциональности. И наступил момент, когда вносить какие-либо изменения в эту функциональность стало невероятно долго.

В нашем случае всё началось с небольшой задачи: отображать в основной выдаче дополнительные ярлыки у ресторанов, в которых есть какие-то акции. Решений было несколько, но большинство из них сильно повышало нагрузку на базу и увеличивало время ответа. Но, надо признаться, выдача и так была не особо быстрой.

Единственным оптимальным решением было написать на Go новый сервис, который помог бы решить все проблемы, имевшиеся в монолите. К тому же мы смогли сильно (в три раза) сократить время ответа.

Но наш монолит является мастером данных для основной выдачи, и новый сервис должен иметь к ним доступ.

Как писать продюсеры в условиях 10-летнего Legacy


В самой первой версии Catalog MVP мы ходили в реплику монолита, чтобы быстро запуститься (для нас важен Time to market). Но оставлять так мы не хотели, поэтому нужно было денормализовать данные из монолита. А для этого необходимо начать продьюсить данные.

Есть несколько подходов:

  1. Переписать монолит. Тут вспоминаем все те статьи, доклады и книги о том, как переписывать монолит. Это сложный и долгий процесс. Он связан с большим количеством рисков. Конечно, мы выносим функциональность из монолита, но делаем это постепенно, аккуратно. Не в ущерб бизнесу.
  2. Писать свои продюсеры в монолите. Надо найти все места в коде, где происходит изменения в базе. В этих местах добавлять также отправку событий в шину. Если у вас хорошая архитектура монолита, с выделенным слоем репозитория, то сделать это лишь вопрос времени. Но Legacy не будет Legacy, если там всё хорошо с архитектурой. Так что этот вариант тоже очень сложен и трудозатратен.
  3. Использовать готовые решения для интеграции базы данных и Kafka. Можно использовать фреймворк Kafka Connect.

Kafka Connect


Как он используется


Чаще всего Kafka используют так:

Source => Kafka

Kafka => Kafka

Kafka => Storage

Kafka => App



То есть нам приходится писать собственные консьюмеры и продюсеры и решать однообразные задачи при их разработке:

  • Прописывать правила подключения к источникам.
  • Обрабатывать ошибки.
  • Прописывать правила ретраев.

Наиболее полно API Kafka поддерживается только в языках Java и Scala. В других языках поддержка не всегда полная. Поэтому разработчики Kafka предложили свои инструменты для решения таких задач: фреймворки Kafka Connect и Kafka Streams:

Source => Kafka (connect)

Kafka => Kafka (streams)

Kafka => Storage (connect)

Kafka => App



Когда говорят, что Kafka Connect поставляется вместе с Kafka, это не какая-то скрытая функциональность Kafka-брокеров. Это именно отдельное приложение, которое имеет настройки подключения к Kafka и источнику/приёмнику. Работу с Kafka Connect мы рассмотрим ниже.

Но сначала нужно ввести три важных термина:

  • worker инстанс/сервер Kafka Connect;
  • connector Java class + пользовательские настройки;
  • task один процесс connector'a.

Worker экземпляр Kafka Connect. Kafka Connect можно запускать в двух режимах: standalone и distributed, на нескольких нодах или виртуальных машинах. То есть можно просто запустить один worker или собрать кластер workerов. Рекомендуется использовать standalone-режим при локальной разработке, настройке и отладке коннекторов, а distributed в боевых условиях.

Преимущество distributed mode


Предположим, мы запустили четыре worker'а Kafka Connect и создали три connector'а с разным количеством task'ов.

  • Во-первых, Kafka Connect автоматически распределит таски коннекторов по разным воркерам.
  • Во-вторых, Kafka Connect отслеживает своё состояние в кластере. Если обнаружит, что один из воркеров недоступен, выполнит перебалансировку и перераспределит недоступные таски по работающим воркерам.

Какие ещё задачи решает Kafka Connect:

  • отказоустойчивость (fault tolerance);
  • принцип только один раз (exactly once);
  • распределение (distribution);
  • упорядочивание (ordering).



Как я говорил выше, фреймворк используется для передачи данных из источника в Kafka либо из Kafka в приёмник. В соответствии с этим коннекторы делятся на два вида:

  • Source Connectors;
  • Sink Connectors.



Коннекторов уже очень много написано. Например, на сайте confluent их сейчас 163, а на просторах интернета ещё больше.

Вы можете написать свой коннектор на Java и Scala. Для этого нужно создать подключаемый jar-файл, реализовав простой интерфейс коннектора.

Как запустить Kafka Connect


Локально


Поставляется вместе с Kafka

Идём на сайт Kafka и скачиваем нужную нам версию: http://kafka.apache.org/downloads.

Binary downloads:

  • Scala 2.12 - kafka_2.12-2.6.0.tgz (asc, sha512)
  • Scala 2.13 - kafka_2.13-2.6.0.tgz (asc, sha512)

Например, выберем версию Scala 2.12 (kafka_2.12-2.6.0.tgz). Распакуем архив и посмотрим в директорию kafka_2.12-2.6.0/bin. Там будут скрипты для запуска Apache Kafka (kafka-server-start.sh, kafka-server-stop.sh) и утилиты для работы с ней. Например, kafka-console-consumer.sh, kafka-console-producer.sh. А также там будут скрипты для запуска Kafka Connect (connect-distributed.sh, connect-standalone.sh), и многое другое.

Рекомендую зайти в директорию kafka_2.12-2.6.0/config там вы увидите настройки по умолчанию запуска и Kafka-брокера, и Kafka Connect.

  • connect-distributed.properties
  • connect-standalone.properties

Вот так выглядит конфигурация по умолчанию config/connect-distributed.properties:

bootstrap.servers=localhost:9092rest.port=8083group.id=connect-clusterkey.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=truevalue.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverterinternal.value.converter=org.apache.kafka.connect.json.JsonConverterinternal.key.converter.schemas.enable=falseinternal.value.converter.schemas.enable=falseoffset.storage.topic=connect-offsetsconfig.storage.topic=connect-configsstatus.storage.topic=connect-statusoffset.flush.interval.ms=10000plugin.path=/opt/kafka/plugins

Kafka Connect можно запускать в режиме standalone. Это удобно для локальной разработки и тестирования, но в боевых условиях рекомендуется использовать connect-distributed (причины были описаны выше).

Режим standalone чаще всего используется для локальной разработке и тестирования.

Чтобы запустить Kafka Connect, выполните команду:

cd kafka_2.12-2.6.0bin/connect-standalone.sh config/connect-standalone.properties

Docker

Во многих Docker-образах используется этот же подход, поэтому вам достаточно переопределить CMD в Dockerfile, чтобы получить образ с Kafka Connect.

Например:

CMD ["bin/connect-distributed.sh", "cfg/connect-distributed.properties"]

Конечно, есть и готовые образы. Я рекомендую использовать варианты от компании Confluent:


Запуск коннекторов


После того, как вы запустите Kafka Connect, вы можете запускать на нём свои коннекторы.

Для управления Kafka Connect используется REST API. Полную документацию по нему можно посмотреть на сайте. Я опишу лишь те методы, которые нам понадобятся для демонстрации работы Kafka Connect.

Запросим список классов коннекторов, которые добавлены в ваш Kafka Connect:

curl -X GET "${KAFKA_CONNECT_HOST}/connector-plugins" -H "Content-Type: application/json"

В ответ мы получим нечто подобное:

HTTP/1.1 200 OK

[    {        "class": "io.debezium.connector.mysql.MySqlConnector"    },    {        "class": "io.confluent.connect.jdbc.JdbcSinkConnector"    }]

То есть вы можете создавать коннекторы только этих классов. Если хотите добавить новый класс, нужно скачать jar этого коннектора и добавить в директорию plugin.path из настройки Kafka Connect. См. файл connect-distributed.properties.

Запросим список запущенных коннекторов:

curl -X GET "${KAFKA_CONNECT_HOST}/connectors" -H "Content-Type: application/json"

В ответ получим:

HTTP/1.1 200 OK

Content-Type: application/json ["my-source-debezium", "my-sink-jdbc"]

Видим, что у нас создано два коннектора с именами my-source-debezium и my-sink-jdbc.

Получение информации о запущенном коннекторе

Общая информация:

curl -X GET "${KAFKA_CONNECT_HOST}/connectors/my-sink-jdbc" -H "Content-Type: application/json"

Конфигурация запущенного коннектора (config):

curl -X GET "${KAFKA_CONNECT_HOST}/connectors/my-sink-jdbc/config" -H "Content-Type: application/json"

Состояние запущенного коннектора (status):

curl -X GET "${KAFKA_CONNECT_HOST}/connectors/my-sink-jdbc/status" -H "Content-Type: application/json"

Создание коннектора


Пример:

curl -X POST "${KAFKA_CONNECT_HOST}/connectors" -H "Content-Type: application/json" -d '{ \    "name": "my-new-connector", \    "config": { \      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", \      "tasks.max": 1,      "topics": "mysql-table01,mysql-table02", \      "connection.url": "jdbc:postgresql://postgres:5432/catalog", \      "connection.user": "postgres", \      "connection.password": "postgres", \      "auto.create": "true" \    } \  }'

То есть необходимо методом POST отправить конфигурацию коннектора.

Обратите внимание, что имя коннектора должно быть уникальным в вашем кластере Kafka Connect. Но вы можете создавать несколько коннекторов одного класса с разными настройками.

Также у любого коннектора есть три обязательных параметра:

  • name уникальное имя;
  • connector.class класс коннектора;
  • tasks.max максимальное количество потоков, в которых может работать коннектор.

Настройка коннекторов


Я хотел бы рассказать про настройку коннекторов на примере DebeziumMysqlConnector и JdbcSinkConnector. С этих классов мы в Delivery Club начали работу. Но сначала я расскажу, почему вы выбрали именно их.

Причины выбора коннекторов


Как я рассказывал, мы выносили функциональность из нашего монолита. Сделали новый сервис Каталог, который отвечает за основную выдачу ресторанов.

Но для этой функциональности были необходимы данные, мастером которых был монолит. Эти данные ещё не отправлялись в шину событий.

Для MVP Каталога решили использовать Shared Database. То есть наш новый сервис обращался в базу монолита.



Таким образом мы сняли нагрузку с монолита, но нагрузка на старую базу осталась. После создания MVP нужно закрыть технический долг и отказаться от этого антипаттерна.



Две главные задачи, которые мы решали:

  • переход на событийную модель (первый этап);
  • разгрузка базы данных.

Jdbc и Debezium


Когда ищешь коннекторы для баз данных, первое, что находишь JdbcSourceConnector и JdbcSinkConnector.

Нам отлично подходит JdbcSinkConnector в качестве sink-коннектора. Он подписывается на топик Kafka и выполняет запросы на добавление, изменение и удаление данных в базе.

Но в качестве Source-коннектора он нам не подходит, так как делает SQL-запросы в базу по таймеру, а это создает ещё большую нагрузку на базу-источник. Мы как раз хотим от этого уйти.

Но нам подходит DebeziumMysqlConnector. Он делает одну классную вещь: подключается к MySQL-кластеру как обычная MySQL-реплика и умеет читать бинлог. Таким образом, мы не создаём дополнительную нагрузку на базу (за исключением встроенных механизмов MySQL-репликации).



Помимо этого, у Debezium-коннектора есть ещё одно преимущество перед Jdbc. Так как Debezium отслеживает бинлог, он может определять моменты удаления записей в базе данных. У Jdbc нет такой возможности, так как он берёт текущее состояние базы и ничего не знает о предыдущем состоянии.

Debezium Connector




Все настройки коннектора можно посмотреть на сайте.

Давайте рассмотрим настройки коннектора и обсудим выбор некоторых параметров.

Файл debezium-config.json:

{  "name": "my-debezium-mysql-connector",  "config": {    "tasks.msx": 1,    "connector.class": "io.debezium.connector.mysql.MySqlConnector",    "database.hostname": "${MYSQL_HOST}",    "database.serverTimezone": "Europe/Moscow",    "database.port": "${MYSQL_PORT}",    "database.user": "${MYSQL_USER}",    "database.password": "${MYSQL_PASS}",    "database.server.id": "223355",    "database.server.name": "monolyth_db",    "table.whitelist": "${MYSQL_DB}.table_name1",    "database.history.kafka.bootstrap.servers": "${KAFKA_BROKER}",    "database.history.kafka.topic": "monolyth_db.debezium.history",    "database.history.skip.unparseable.ddl": true,    "snapshot.mode": "initial",    "time.precision.mode": "connect"  }}

Подключения к базе данных:

    "database.hostname": "${MYSQL_HOST}",    "database.serverTimezone": "Europe/Moscow",    "database.port": "${MYSQL_PORT}",    "database.user": "${MYSQL_USER}",    "database.password": "${MYSQL_PASS}",

Следует иметь в виду, что этот пользователь должен иметь права:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

ID реплики, под которым будет зарегистрирован коннектор, и его имя сервера:

    "database.server.id": "223355",    "database.server.name": "monolyth_db",

Список таблиц для синхронизации:

"table.whitelist": "${MYSQL_DB}.table_name1,${MYSQL_DB}.table_name2",

Имена базы и таблицы необходимо указывать через запятую.

Настройки создания snapshot'а:

    "database.history.kafka.bootstrap.servers": "${KAFKA_BROKER}",    "database.history.kafka.topic": "debezium.db.history",    "snapshot.mode": "initial",

Для чего нужен snapshot

Когда ваш коннектор Debezium MySQL запускается в первый раз, он выполняет начальный согласованный снимок вашей базы данных и сохраняет его в топик Kafka. Даже если вы будете отслеживать только несколько таблиц из базы, в database.history будет записана вся схема. Но можно не переживать из-за размера этого топика, он будет очень маленьким (менее 1 Мб).

Пропуск определений в снимке, которые по каким-то причинам не удалось распарсить:

    "database.history.skip.unparseable.ddl": true,

Эту опцию мы включили, потому что сталкивались с такими ошибками, когда определения в бинлоге использовали неверный синтаксис. Сервер MySQL более-менее интерпретирует эти инструкции и потому не падает. Но анализатор SQL-запросов в DebeziumConnector'е с ними не справляется и падает с ошибкой. Чтобы не падать, а игнорировать нечитаемые запросы, необходимо включить эту опцию.

Точность типа данных time:

"time.precision.mode": "connect",

Эта настройка уменьшает точность типа данных time с микросекунд до миллисекунд.

Описанную конфигурацию уже можно использовать для production-окружения. А в документации есть полный перечень настроек с подробным описанием.

Также нашу конфигурацию можно дополнить различными трансформерами по преобразованию данных и маршрута топиков. И один из важнейших трансформеров в проекте Debezium io.debezium.transforms.ExtractNewRecordState. Почитать подробнее о нём можно в документации. Если кратко: вам потребуется его использовать для преобразования формата Debezium в формат Jdbc.

В целом, все трансформации рекомендуется использовать на стороне Sink-коннектора, а Source-коннекторы должны отправлять данные в топик Kafka без изменений.

Создание Debezium MySqlConnector:


curl  -X POST ${KAFKA_CONNECT_HOST}/connectors -H "Content-Type: application/json" -d @debezium-config.json

При создании коннектора вы можете получить ошибку:

Connector configuration is invalid and contains the following 1 error(s):
Configuration is not defined: database.history.connector.id
Configuration is not defined: database.history.connector.class
Unable to connect: Communications link failure


The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate````


Эта ошибка говорит о том, что у вас указаны некорректные параметры подключения к MySQL. Проверьте ваши логин/пароль, а также убедитесь, что у пользователя есть права на репликацию (см. выше).

После создания коннектора можно проверить его состояние. Этот метод также используется в качестве метрики.

curl  -X GET ${KAFKA_CONNECT_HOST}/connectors/my-debezium-mysql-connector/status

Мы увидим такой ответ:

{  "name": "my-debezium-mysql-connector",  "connector": {    "state": "RUNNING",    "worker_id": "connect:8080"  },  "tasks": [    {      "id": 0,      "state": "RUNNING",      "worker_id": "connect:8080"    }  ],  "type": "source"}

После того, как мы запустили source connector, можно убедиться, что топики были созданы и можно прочитать из них данные. Для работы с Kafka будем использовать удобную утилиту kafkacat.

Какие топики были созданы нашим коннектором:

kafkacat -b ${KAFKA_BROKER} -L | grep 'monolyth_db'

Чтение данных из топика monolyth_db.debezium.history:

kafkacat -b ${KAFKA_BROKER} -t monolyth_db.debezium.history -C -f 'Offset: %o\nKey: %k\nPayload: %s\n--\n'

Чтение данных из топика monolyth_db.table_name1 (${MYSQL_DB} имя вашей базы данных):

kafkacat -b ${KAFKA_BROKER} -t monolyth_db.${MYSQL_DB}.table_name1 -C -f 'Offset: %o\nKey: %k\nPayload: %s\n--\n'

В топиках вы увидите сообщения в формате avro (если вы использовали JsonSerializer для key, value серилизаторов). Вид и описание формата лучше прочитать в документации.

JdbcSinkConnector


В качестве Sink коннектора будем использовать JdbcSinkConnector.



Рассмотрим его конфигурацию

Создадим файл my-jdbc-sink-connector.json:

{  "name": "my-jdbc-sink-connector",  "config": {    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",    "tasks.max": "2",    "connection.url": "jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}",    "connection.user": "${POSTGRES_USER}",    "connection.password": "${POSTGRES_PASS}",    "topics": "monolyth_db.${MYSQL_DB}.table_name1,monolyth_db.${MYSQL_DB}.table_name2",    "pk.fields": "id",    "pk.mode": "record_key",    "auto.create": "false",    "auto.evolve": "false",    "insert.mode": "upsert",    "delete.enabled": "true",    "transforms": "route,unwrap,rename_field,ts_updated_at,only_fields",    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",    "transforms.route.replacement": "${PG_DB}.public.$3",    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",    "transforms.unwrap.drop.tombstones": "false",    "transforms.rename_field.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",    "transforms.rename_field.renames": "isDeleted:is_deleted,isActive:is_active",    "transforms.ts_updated_at.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",    "transforms.ts_updated_at.target.type": "Timestamp",    "transforms.ts_updated_at.field": "updated_at",    "transforms.ts_updated_at.format": "yyyy-MM-dd'T'HH:mm:ssXXX",    "transforms.only_fields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",    "transforms.only_fields.whitelist": "id,title,url_tag,sort,hide,created_at,updated_at"  }}

Тут, конечно, три обязательных для любого коннектора параметра:

"name": "my-jdbc-sink-connector","connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "2",

Настройки подключения:

"connection.url": "jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}","connection.user": "${POSTGRES_USER}","connection.password": "${POSTGRES_PASS}",

Потом перечисление топиков, на которые будем подписываться:

"topics": "monolyth_db.${MYSQL_DB}.table_name1,monolyth_db.${MYSQL_DB}.table_name2",

JdbcConnector использует один топик для одной таблицы. Сопоставление топика и таблицы происходит по имени. Для коррекции используется route-трансформер. О трансформерах поговорим чуть ниже.

Если вы указываете несколько топиков, то у них у всех должны быть одинаковые pk.fields.

Сообщения в Kafka имеют ключ, создаваемый на основании первичного ключа (Primary Key) таблицы. Какой именно PR в таблице, необходимо указать в параметрах pk.fields, чаще всего это просто id:

"pk.fields": "id","pk.mode": "record_key",

Ключ может быть составной. Например, для кросс-таблиц:

"pk.fields": "user_id,service_id",

Следующие параметры очень красноречивые. Мы отключаем автоматическое создание и удаление таблиц, и разрешаем удалять данные:

"auto.create": "false","auto.evolve": "false","insert.mode": "upsert","delete.enabled": "true",

Трансформеры


Последний блок настроек касается трансформеров.

"transforms": "route, unwrap, rename_field, ts_updated_at, only_fields",

Этот параметр указывает, какие трансформеры и в каком порядке выполнять. Они расположены в этой же конфигурации коннектора. Каждый трансформер имеет type (класс) и параметры.

Например, трансформер route отвечает за сопоставление имени топика и имени таблицы:

"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)","transforms.route.replacement": "${PG_DB}.public.$3",

Он используется в Debezium MySqlConnector: отправляет данные в Kafka топики с именами {server_name}.{database_name}.{table_name}, а JdbcSinkConnector принимает {database_name}.{schema_name}.{table_name}. Так как целевая база и таблица могут отличаться по именам (и у вас вряд ли имя базы будет public), то этот коннектор изменяет целевое имя топика.

Второй важный трансформер unwrap:

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false",

Он преобразует формат Debezium в формат, с которым прекрасно работает JdbcSinkConnector.

Трансформеры rename_field, ts_updated_at и only_fields используются для переименования полей, преобразования дат и указания списка тех полей, которые необходимо синхронизировать. Так указывается конфигурация трансформера ts_updated_at:

"transforms.ts_updated_at.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.ts_updated_at.target.type": "Timestamp", "transforms.ts_updated_at.field": "updated_at", "transforms.ts_updated_at.format": "yyyy-MM-dd'T'HH:mm:ssXXX",

Deploy


В каждой компании деплой происходит по-разному: где-то используют Jenkins, где-то Gitlab CI или Bitbucket Pipelines, а кто-то пишет скрипты.

С Kafka Connect вы будете деплоить точно так же, как и в случае с другими сервисами в вашей компании.

Как я отмечал, Kafka Connect это отдельное stateless-приложение. Оно не зависит от Kafka-брокера и даже от версии Kafka. Если у вас уже есть Kafka старой версии, можно использовать новую версию Kafka Connect. Я рекомендую это и сделать. Например, мы использовали последнюю на тот момент версию Kafka Connect 2.5.0 с Kafka-брокером 0.10.х.

Поэтому нет каких-то общих советов и нюансов, как деплоить сервисы. Расскажу, как это происходит у нас.

Deploy Kafka Connect в Delivery Club


Kubernetes

Перед запуском в стейдж мы экспериментировали локально. Создавали свой Docker-образ на основе cp-kafka-connect, куда просто добавляли свои коннекторы.

Для стейджа было достаточно из этого образа собрать контейнер и выложить в Kubernetes, что мы и сделали.

Отмечу только, что 2 Гб памяти поду под Kafka Connect не хватает, и у нас поды по 4 Гб.

Production

На проде у нас внедрение совпало с внедрением нового кластера Kafka-брокеров. Мы приняли специфическое решение поднимать Kafka Connect на тех же серверах, где будут находиться Kafka-брокеры. Для этого использовали rpm-пакет от Confluent.

Сами настройки конфигов мы храним в репозитории. У нас есть несколько скриптов, которые позволяют управлять коннекторами: создавать, останавливать, перезапускать их.

Но это уже отдельная история как работать с Kafka Connect в проде, которая зависит от инфраструктуры компании.

Что нам дало использование Kafka Connect


Мы не стали писать множество продьюсеров в монолите для более чем 600 таблиц. По приблизительным подсчётам, это сэкономило нам более месяца работы пары разработчиков. И, конечно же, снизило возможность наделать множество ошибок в монолите. То есть мы избавились от потенциальных падений приложения.

Это позволило написать новый сервис выдачи ресторанов силами одной команды за один месяц.

Другие команды в компании тоже пользуются нашими топиками. Оценить выгоду очень сложно, но точно ясно: это позволило нам разрабатывать новую функциональность, не завязываясь на данные, источником которых является только монолит.

Мы сняли нагрузку с самой нагруженной нашей части база данных монолита. Это примерно 150 RPS запросов к базе. И синхронизируем более 40 таблиц со скоростью 300 RPS.

Также мы разделил ответственность сервисов, что является первым шагов к разделению доменной области.

Резюме


Я очень рад, что вам удалось добраться до конца. В этой статье вы:

  • познакомились с общими принципами работы с Kafka Connect;
  • узнали, как запустить приложение Kafka Connect в разных режимах;
  • разобрались, как запускать и настраивать коннекторы для работы с базой и Kafka.

И я рад, что вас не испугал внушительный размер статьи, и рассчитываю, что вы будете обращать к ней в качестве примера работы с Kafka Connect и краткого справочника.
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru