Русский
Русский
English
Статистика
Реклама

Kafka

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду

21.01.2021 20:07:17 | Автор: admin

Однажды я вел вебинар про то, как принимать 10 000 ивентов в секунду. Показал вот такую картинку, зрители увидели сиреневый слой, и началось: Ребят, а зачем нам все эти кафки и рэббиты, неужели без них не обойтись? Мы и ответили: Зачем-зачем, чтобы пройти собес!

Очень смешно, но давайте я все-таки объясню.


Мы можем принимать ивенты сразу в зеленой области и заставить наши приложения писать их в кликхаус.

Но кликхаус любит, когда в него пишут сообщения пачками

Другими словами, в него лучше запихнуть миллион сообщений, вместо того чтобы писать по одному. Kafka, Rabbit или Яндекс.Кью выступают как буфер, и мы можем контролировать с его помощью входящую нагрузку.

Как бывает: в одну секунду пришло 10 тысяч ивентов, в следующую тысяча, в другую 50 тысяч. Это нормально, пользователи рандомно включают свои мобильные приложения. В таком случае в кликхаус напрямую будет заходить то 2 тысячи, то 10 тысяч сообщений. Но с помощью буфера вы можете подкопить сообщения, потом достать из этой копилки миллион ивентов и направить в кликхаус. И вот она желанная стабильная нагрузка на ваш кластер.

Это все история про очереди

Во-первых, очереди можно использовать для передачи сообщений между различными сервисами.

Например, для бэкграунд задач. Вы заходите в админку магазина и генерируете отчет по продажам за год. Задача трудоемкая: нужно прочитать миллионы строк из базы, это хлопотно и очень долго. Если клиент будет висеть постоянно с открытым http-коннектом 5, 10 минут связь может оборваться, и он не получит файл.

Логично выполнить эту задачу асинхронно на фоне. Пользователь нажимает кнопку сгенерировать отчет, ему пишут: Все окей, отчет генерируется, он придет на вашу почту в течение часа. Задача автоматически попадает в очередь сообщений и далее на стол воркера, который выполнит ее и отправит пользователю на ящик.

Второй кейс про кучу микросервисов, которые общаются через шину.

Например, один сервис принимает ивенты от пользователей, передает их в очередь. Следующий сервис вытаскивает ивенты и нормализует их, к примеру, проверяет, чтобы у них был валидный e-mail или телефон. Если все хорошо, он перекладывает сообщение дальше, в следующую очередь, из которой данные будут записываться в базу.

Еще один поинт это падение дата-центра, в котором хостится база.

Конец, ничего не работает. Что будет с сообщениями? Если писать без буфера, сообщения потеряются. Кликхаус недоступен, клиенты отвалились. До какого-то предела выручит память, но с буфером безопаснее вы просто взяли и записали сообщения в очередь (например, в кафку). И сообщения будут храниться там, пока не закончится место или пока их не прочитают и не обработают.


Как автоматически добавлять новые виртуалки при увеличении нагрузки

Чтобы протестить нагрузку, я написал приложение и протестил автоматически масштабируемые группы.

Мы создаем инстанс-группу. Задаем ей имя и указываем сервисный аккаунт. Он будет использоваться для создания виртуалок.

resource "yandex_compute_instance_group" "events-api-ig" {  name               = "events-api-ig"  service_account_id = yandex_iam_service_account.instances.id

Затем указываем шаблон виртуалки. Указываем CPU, память, размер диска и т.д.

instance_template {    platform_id = "standard-v2"    resources {      memory = 2      cores  = 2    }    boot_disk {      mode = "READ_WRITE"      initialize_params {        image_id = data.yandex_compute_image.container-optimized-image.id        size = 10      }

Указываем, к какому сетевому интерфейсу его подрубить.

}    network_interface {      network_id = yandex_vpc_network.internal.id      subnet_ids = [yandex_vpc_subnet.internal-a.id, yandex_vpc_subnet.internal-b.id, yandex_vpc_subnet.internal-c.id]      nat = true    }

Самое интересное это scale_policy.

Можно задать группу фиксированного размера fixed scale с тремя инстансами A, B, C.

scale_policy {    fixed_scale {      size = 3    }  }  allocation_policy {    zones = ["ru-central1-a", "ru-central1-b", "ru-central1-c"]  }

Либо использовать auto_scale тогда группа будет автоматически масштабироваться в зависимости от нагрузки и параметров.

scale_policy {auto_scale {    initial_size = 3    measurment_duration = 60    cpu_utilization_target = 60    min_zone_size = 1    max_size = 6    warmup_duration = 60    stabilization_duration = 180}

Главный параметр, на который надо обратить внимание, это cpu utilization target. Можно выставить значение, при превышении которого Яндекс.Облако автоматически создаст нам новую виртуалку.

Теперь протестируем автомасштабирование при увеличении нагрузки

Наше приложение принимает различные ивенты, проверяет джейсонку и направляет в кафку.

Перед нашей инстанс-группой стоит load-балансер. Он принимает все запросы, которые приходят на адрес 84.201.147.84 на порту 80, и направляет их на нашу инстанс-группу на порт 8080.

У меня есть виртуалка, которая с помощью Yandex.Tank делает тестовую нагрузку. Для теста я установил 20 тысяч запросов в течение 5 минут.


Итак, нагрузка пошла.

Сначала все ноды будут загружены во всех трех зонах (A, B и C), но когда мы превысим нагрузку, Яндекс.Облако должно развернуть дополнительные инстансы.

По логам будет видно, что нагрузка выросла и в каждом регионе количество нод увеличилось до двух. В балансировку добавилась еще одна машина, количество инстансов тоже везде увеличилось.

При этом у меня был интересный момент. Один инстанс, который находится в регионе С, записывал данные (от момента приема данных до записи) за 23 миллисекунды, а у инстанса из региона А было 12,8 миллисекунд. Такое происходит из-за расположения кафки. Кафка находится в регионе А, поэтому в нее записи идут быстрее.

Ставить все инстансы кафки в одном регионе не надо.

Когда добавилась еще одна машина, новая нагрузка спала, показатель CPU вернулся к норме. Полную аналитику по тестовому запуску можно посмотреть по ссылке: overload.yandex.net/256194.


Как написать приложение для работы с очередями и буферами обмена

Приложение написано на golang. Сначала мы импортируем встроенные модули.

package mainimport (    "encoding/json"    "flag"    "io"    "io/ioutil"    "log"    "net/http"    "strings")

Затем подключаем github.com/Shopify/sarama это библиотека для работы с кафкой.

Прописываем github.com/prometheus/client_golang/prometheus, чтобы метрики передавались в API Metrics.

Также подключаем github.com/streadway/amqp для работы с rabbitmq.

Затем следуют параметры бэкендов, в которые мы будем записывать.

var (    // Config options    addr     = flag.String("addr", ":8080", "TCP address to listen to")    kafka    = flag.String("kafka", "127.0.0.1:9092", "Kafka endpoints")    enableKafka    = flag.Bool("enable-kafka", false, "Enable Kafka or not")amqp    = flag.String("amqp", "amqp://guest:guest@127.0.0.1:5672/", "AMQP URI")enableAmqp    = flag.Bool("enable-amqp", false, "Enable AMQP or not")sqsUri    = flag.String("sqs-uri", "", "SQS URI")sqsId    = flag.String("sqs-id", "", SQS Access id")sqsSecret    = flag.String("sqs-secret", "", "SQS Secret key")enableSqs    = flag.Bool("enable-sqs", false, "Enable SQS or not")        // Declaring prometheus metrics    apiDurations = prometheus.NewSummary(        prometheus.SummaryOpts{            Name:       "api_durations_seconds",            Help:       "API duration seconds",            Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},        },    )

Адрес кафки (строка).

Включить кафку или нет поскольку приложение может писать в несколько разных бэкендов.

В приложении реализована возможность работы с тремя очередям.

Первое это кафка.

Второе amqp для рэббита.

И третья очередь sqs для Яндекс.Кью.

Дальше мы открываем и задаем общие глобальные переменные для работы с нашим бэкендом. Прописываем настройки prometheus для отображения и визуализации.

В main мы включаем кафку, рэббит и создаем очередь с названием Load.

И если у нас включен sqs, мы создаем клиент для Яндекс.Кью.

Дальше наше приложение по http принимает несколько инпоинтов:

/status просто отдает okey, это сигнал для load-балансера, что наше приложение работает.

Если вы кидаете запрос на /post/kafka, ваша джейсонка попадет в кафку. Также работают /post/amqp и /post/sqs.

Как работает кафка

Кафка простой, некапризный и очень эффективный инструмент. Если вам нужно быстро принять и сохранить много сообщений, кафка к вашим услугам.

Как-то на одном из проектов важно было уложиться в маленький бюджет. И вот представьте, мы берем самые дешевые машины без SSD (а кафка пишет последовательно и читает последовательно, так что можно не тратиться на дорогие диски), ставим кафку и zookeeper. Наше скромное решение на три ноды спокойно выдерживает нагрузку 200 тысяч сообщений в секунду! Кафка это про поставил и забыл, за пару лет работы кластер ни разу нас не потревожил. И стоил 120 евро в месяц.

Единственное, что нужно запомнить кафка очень требовательна к CPU, и ей очень не нравится, когда кто-то рядом поджирает проц. Если у нее будет сосед под боком, она начнет тормозить.

Кафка устроена так: у вас есть topic, можно сказать, что это название очереди. Каждый topic бьется на части до 50 partitions. Эти партиции размещаются на разных серверах.

Как вы видите на схемке, topic load разбит на 3 партиции. Partition 1 оказывается на Kafka 1, вторая партиция на кафка 2, третья на 3. Тем самым нагрузка полностью распределяется. Когда кластер начинает принимать нагрузку, сообщения пишутся в один топик, а кафка раскидывает их по партициям, гоняет их по кругу. В итоге все ноды нагружаются равномерно.

Можно заморочиться и разбить топик на 50 партиций, поставить 50 серверов и расположить на каждом сервере по 1 партиции нагрузка распределится на 50 нод. И это очень круто.

Партиции могут реплицироваться благодаря zookeeper. Кафке необходимо минимум 3 ноды зукипера. Например, вы хотите, чтобы ваша партиция реплицировались на 2 ноды. Указываете репликейшн фактор 2 и каждая партиция будет закинута 2 раза на рандомные хосты. И если ваша нода упадет, то благодаря зукиперу кафка это увидит: ага, первая нода в дауне, кафка 2 заберет себе первую партицию.

Как я разворачивал кафку с помощью Terraform

В репозитории у нас есть terraform-файл, он называется kafka.tf .

Вначале мы поднимем 3 зукипера: resource yandex compute instance zookeeper count = 3.

Потом находим zookeeper_deploy, который деплоит наш зукипер. Хорошо, если он будет вынесен на отдельные машины, где кроме него ничего нет. Далее собираем айдишники нод и генерируем файл. Запускаем ansible для настройки зукипера.

Кафку поднимаем аналогично зукиперу и, что важно, после него.

Как работает RabbitMQ

Благодаря тому, что кафка по сути просто сохраняет сообщения на диск и по запросу отдает клиенту с нужного места, она очень и очень шустрая. Производительность рэббита значительно ниже, но он напичкан фичами под завязку! Рэббит пытается сделать очень многое, что естественным образом влечет за собой потребление ресурсов.

Рэббит уже не так прост тут вам и exchanges с роутингом, и куча плагинов для delayed messages, deadletter и прочего хлама. За сообщениями следит сам кролик. Как только консьюмер подтвердил обработку сообщения, оно удаляется. Если консьюмер отвалился посередине рэббит вернет сообщение в очередь. В общем, хороший комбайн, когда нужно перекидывать сообщения между сервисами. Цена этого производительность.

Рэббит практически все делает внутри себя. В нем есть много встроенных инструментов, можно подключать разные плагины, много настроек для работы с сообщениями и очередями.

Если вам нужно перекидывать сообщения между сервисами в небольшом количестве ваш выбор однозначно RabbitMQ. Если вам необходимо быстро сохранять кучу событий метрики от клиентов, логи, аналитика и т.д. ваш выбор kafka. Подробнее о сравнении двух инструментов можно прочитать в моей статье.

И еще: рэббиту не нужен зукипер.

Подробнее..

Доступны бесплатные уроки видеокурса по Apache Kafka

26.02.2021 06:14:08 | Автор: admin


Мы открыли доступ к базовым темам курса по Apache Kafka, начать учиться можно уже сейчас.


В программе две теоретические темы Введение и Базовые основы технологии и практическая тема Установка Kafka. В ней поработаем с технологией руками:


  1. Развернём Kafka в самом простом её варианте с одним брокером и одной нодой ZooKeeper.
  2. Запишем и прочитаем сообщения, посмотрим в конфиги и увидим, как данные хранятся на диске.

Спикеры курса


  • Анатолий Солдатов, Lead Engineer в Авито;
  • Александр Миронов, Infrastructure Engineer в Stripe, ex-Booking.

Пара отзывов про базовые темы

1.
Спасибо, действительно толково объяснено, вспомнил и узнал некоторые нюансы! Как вариант, посмотрел бы полный курс, без практики и стендов, т.к. я разработчик, а не администратор, для расширения кругозора и понимания нюансов использования тула.


Хорошо бы файл с презентацией заранее показать, а то сидел делал лишние скриншоты со схемами :) + Показать запуск команд на видео, наверное, важно, но здорово, что вы сделали текстовую транскрипцию. Лично мне гораздо проще прочитать ее в такие моменты. + Отличное качество контента, видео/звук/презентации, приятно слушать и смотреть преподавателей. Молодцы!


2.
Очень крутой курс, который дает минимальное понимание, что такое Kafka и как с ней работать. Очень крутые спикеры, которых реально интересно слушать. Информация не скучная и хочется продолжить изучение после каждого этапа/шага. Жаль, что так мало вошло в бесплатную версию. Очень бы хотелось получить больше материала в виде видео, даже без практических заданий и обеспечения тестовых стендов, аналогично тому, как было сделано с курсом Kubernetes База, который выходил на YouTube в открытом доступе. В остальном, курс очень интересный и данную тему хочется изучать глубже. Спасибо за материал и вашу работу!


Доступ к курсу придёт после регистрации: https://slurm.io/kafka


Релиз всех тем видеокурса будет 7 апреля. До релиза действует цена предзаказа 40 000 руб. вместо 50 000 руб.

Подробнее..

Перевод Как Apache Kafka поддерживает 200К партиций в кластере?

02.03.2021 04:06:51 | Автор: admin


В Kafka топик может содержать множество партиций, между которыми распределяются записи. Партиции это единицы параллелизма. В целом, чем больше партиций, тем выше пропускная способность. Однако есть некоторые факторы, которые стоит учитывать, когда в кластере Kafka много партиций. Рад сообщить, что в Apache Kafka, начиная с версии 1.1.0, было существенно увеличено количество партиций, которые может поддерживать один кластер Kafka с точки зрения деплоймента и доступности.


Чтобы понять это улучшение, будет полезно повторить базовую информацию о лидерах партиций и контроллере. Во-первых, каждая партиция может иметь несколько реплик для большей доступности и устойчивости. Одна из реплик назначается лидером, и все запросы клиента обслуживаются этой ведущей репликой. Во-вторых, один из брокеров в кластере выступает в качестве контроллера, который управляет всем кластером. В случае отказа брокера контроллер отвечает за выбор новых лидеров партиций на отказавшем брокере.


Брокер Kafka по умолчанию выполняет контролируемое отключение, чтобы минимизировать сбои в обслуживании клиентов. Контролируемое отключение проходит следующие этапы. (1) Сигнал SIG_TERM отправляется брокеру для завершения работы. (2) Брокер отправляет запрос контроллеру, уведомляя, что он готов к отключению. (3) Контроллер затем меняет лидеров партиций на этом брокере на других брокеров и сохраняет эту информацию в ZooKeeper. (4) Контроллер отправляет информацию о новых лидерах другим брокерам в кластере. (5) Контроллер отправляет выключающемуся брокеру положительный ответ на запрос, и брокер, наконец, завершает свой процесс. Таким образом это никак не влияет на клиентов, потому что их трафик уже перенаправлен на других брокеров. Этот процесс изображен на Рисунке 1. Заметьте, что шаги (4) и (5) могут происходить параллельно.



Рис. 1. (1) Инициация отключения на брокере 1; (2) брокер 1 отправляет запрос о контролируемом отключении контроллеру на брокере 0; (3) контроллер записывает новых лидеров в ZooKeeper; (4) контроллер отправляет информацию о новых лидерах брокеру 2; (5) контроллер отправляет положительный ответ брокеру 1.


До версии Kafka 1.1.0 во время контролируемого отключения контроллер перемещает лидеров по одной партиции за раз. Для каждой партиции контроллер выбирает нового лидера, параллельно записывает его в ZooKeeper и отправляет нового лидера другим брокерам через удаленный запрос. В этом процессе есть несколько недостатков. Во-первых, у синхронной записи в ZooKeeper более высокое время задержки, что замедляет процесс контролируемого отключения. Во-вторых, отправка нового лидера по партиции за раз добавляет много маленьких удаленных запросов к каждому брокеру, что может вызвать задержку обработки новых лидеров.


Начиная с Kafka 1.1.0 и далее контролируемое отключение происходит значительно быстрее. Во-первых, при записи в ZooKeeper используется асинхронный API. Вместо того чтобы записывать лидера для одной партиции, ждать, пока этот процесс завершится, а потом записывать следующего, контроллер асинхронно размещает лидеров для многих партиций в ZooKeeper, а потом ждет завершения процесса. Это позволяет обрабатывать запросы на пайплайне между брокером Kafka и сервером ZooKeeper и снижает общее время задержки. Во-вторых, информация о новых лидерах отправляется батчами. Вместо одного удаленного запроса для каждой партиции контроллер отправляет один удаленный запрос с лидером от всех затронутых партиций.


Время обработки отказа контроллером также было значительно улучшено. Если случается отказ контроллера, кластер Kafka автоматически выбирает контроллером другого брокера. Прежде чем иметь возможность выбрать лидеров партиций свеженазначенный контроллер должен загрузить состояние всех партиций в кластере из ZooKeeper. Если произошел серьезный сбой контроллера, окно недоступности партиции может длиться столько, сколько продолжается срок действия сессии ZooKeeper, плюс время перезагрузки состояния контроллера. Поэтому сокращение времени перезагрузки состояния улучшает доступность в таком редком случаем. До версии 1.1.0 в Kafka для перезагрузки используется синхронный API ZooKeeper. Начиная с версии 1.1.0 это также заменено на асинхронный API для сокращения времени задержки.


Мы провели тесты для оценки улучшений по времени контролируемого отключения и перезагрузки контроллера. Для обоих тестов мы загрузили набор из пяти узлов ZooKeeper на выделенных серверах.


Для первого теста мы подготовили кластер Kafka с пятью брокерами на отдельных серверах. В этом кластере мы создали 25 000 топиков, в каждом топике по одной партиции и две реплики, в общем получив 50 000 партиций. Таким образом на каждом брокере было 10 000 партиций. Затем мы замерили время, которое понадобилось для контролируемого отключения. Результаты представлены в таблице ниже.


Версия Kafka 1.0.0 Kafka 1.1.0
Время контролируемого отключения 6,5 минут 3 секунды

Большую часть улучшений дает исправление затрат на журналирование, при котором проводятся ненужные записи для каждой партиции в кластере при смене лидера одной партиции. Просто исправив затраты на журналирование, мы снизили время контролируемого отключения с 6,5 минут до 30 секунд. Переход на асинхронный API ZooKeeper снизил это время до 3 секунд. Эти улучшения существенно снизили время, необходимое для перезагрузки кластера Kafka.


Для второго теста мы подготовили другой кластер Kafka, состоящий из пяти брокеров, создали 2 000 топиков, в каждом по 50 партиций и одной реплике. В сумме во всем кластере получилось 100 000 партиций. Затем мы замерили время перезагрузки состояния контроллера и увидели 100% улучшение (время перезагрузки снизилось с 28 секунд в Kafka 1.0.0 до 14 секунда в Kafka 1.1.0).


Учитывая эти изменения, на поддержку какого количества партиций вы можете рассчитывать в Kafka? Точное число зависит от таких факторов как допустимое окно недоступности, время задержки ZooKeeper, тип хранения на брокере и т.д. В качестве общего правила мы рекомендуем иметь на каждом брокере до 4 000 партиций и до 200 000 партиций в кластере. Основная причина для лимита на кластере заключается в том, что нужно подстраховаться на тот редкий случай серьезного сбоя контроллера, о котором мы писали выше. Обратите внимание, что другие соображения, связанные с партициями, также применимы, и вам может потребоваться дополнительная настройка конфигурации с большим количеством партиций.


Более подробную информацию вы найдете в KAFKA-5642 и KAFKA-5027.


От редакции: Команда Слёрма готовит подробный видеокурс по Apache Kafka. Спикеры Анатолий Солдатов из Авито и Александр Миронов из Stripe. Вводные уроки уже доступны в личном кабинете. Релиз полной версии курса апрель 2021. Подробности.

Подробнее..

Интеграционный слой с Kafka и микросервисами опыт построения операционной CRM контакт-центра торговой сети Пятерочка

04.03.2021 10:11:49 | Автор: admin

Из этого поста вы узнаете, зачем добавлять в интеграционный слой бизнес-логику, что случается, когда не летит Service mesh, и почему иногда костыли лучшее решение проблемы.



Привет Хабр, на связи Иван Большаков архитектор интеграционных решений, эксперт департамента разработки ПО КРОК. Я расскажу, как мы делали интеграционный слой для CRM-системы группы контакт-центров торговой сети Пятерочка.


Всего в системе одновременно находятся десятки тысяч пассивных пользователей с открытыми интерфейсами и сотни активных, которые пишут в чаты, принимают звонки и нажимают на кнопки. Операторы одновременно работают с десятком различных систем


На старте разработки контакт-центры Пятерочки располагались на 7 разных площадках. И все они использовали разные решения для учета звонков: платформы дискретных каналов, обработки обращений и т. д. Решения по обращениям велись в отдельной системе, а уведомления по обращениям делались в ручном режиме.


Между этими системами не было интеграций, так что операторы переключались между окнами, копировали номера телефонов, искали профили клиентов и выполняли массу других рутинных действий.


Все это замедляло обработку обращений и увеличивало время реакции в каждом конкретном случае. Операторы просто не могли решить проблему клиента здесь и сейчас. Вместо этого они тратили силы и время на то, что стоило бы автоматизировать.




Сотрудники контакт-центров справлялись с работой и могли бы продолжать в том же духе, но заказчик подсчитал, во сколько обходится эта рутина, и уже не мог спать спокойно.


Возможные решения



Первое решение, которое мы рассматривали, перенести логику в веб-приложение оператора контакт-центра. То есть, сделать UI, который будет вызывать методы на бэкендах тех или иных систем.


Логичный выход, который решит проблему заказчика, но не дает дополнительных преимуществ.



Другой вариант enterprise service bus. Это классический сценарий интеграции: разработать пользовательский интерфейс, подключить его к опубликованным на интеграционной шине методам и наслаждаться жизнью.


Но ESB в 2020 году уже не самое популярное решение. К тому же, заказчику хотелось реализовать дополнительную функциональность, например, подсистему отчетности, а она требует разработки бэкенда.



Мы решили использовать для интеграционного слоя микросервисы. Они позволяют реализовать дополнительную бизнес-логику и обеспечивают масштабируемость. Их проще распределить по нескольким ЦОД, по сравнению с ESB.


Кроме того, интеграционный слой на базе микросервисов можно передавать постепенно, с развитием на стороне заказчика.


Архитектура интеграционного слоя


Опыт разработки микросервисов подсказывал нам, что не стоит требовать радикальной доработки информационных систем на стороне заказчика.


Нет гарантий того, что к концу проекта состав и интеграционные контракты систем, с которыми мы работаем, останутся прежними. У Пятерочки несколько контакт-центров от разных подрядчиков, и они могут сменить их в любой момент. Так что, интеграционный слой должен минимально влиять на окружающие системы.


Мы использовали стандартный прием спроектировали интерфейсы, написали адаптеры для каждой системы и связали их с нашим ядром через законтрактованный API.


Так как в интеграционный слой попала бизнес-логика, выбор пал на событийно-ориентированную архитектуру. В ней проще следить за согласованностью данных.


Проектируя архитектуру интеграционного слоя, мы старались не переусложнять, ведь микросервисность не самоцель проекта.


Мы не разделяли по сервисам компоненты, которые будут дорабатываться или масштабироваться вместе. Если можно было разрабатывать в составе одного сервиса несколько бизнес-сущностей, мы это делали. И все равно, в итоге получилось больше 30 микросервисов.


Вся сложность во взаимодействиях


С внешними системами было просто. Мы не могли ничего менять, так что использовали те API, которые они предоставляли.



Внутри системы мы выделили два типа взаимодействий: модифицирующие и читающие. Модифицирующие воздействия реализовали при помощи обмена сообщениями через топики в Kafka. Каждому сервису присвоили топик, который начинается с его имени. Сервис отправляет туда сообщения обо всех событиях, произошедших с его объектами:


с бизнес-сущностью, которую обрабатывает сервис user, что-то произошло, вот ее новые данные

Этот топик читают другие сервисы. Они реагируют на изменения и отписываются в свои топики. Так действие пользователя по каскаду распространяется по всей системе.


Это удобно, ведь при необходимости можно перечитать данные из Kafka, и, если что-то пойдет не так, откатить транзакции сразу в нескольких сервисах.


Как видите, вместо оркестрации мы остановились на хореографии. В нашей системе нет центрального сервиса, в котором закожены бизнес-процессы. Они складываются из множества мелких взаимодействий. Кусочек бизнес-логики заложен в каждом сервисе.



Читающие взаимодействия идут по gRPC. Мы выбрали этот протокол не из-за фич типа стриминга (их пока не используем), а потому, что с его помощью удобно управлять контрактами.


Там, где согласованность не нужна, по gRPC идут и модифицирующие взаимодействия. Так мы снизили сложность системы и сэкономили время разработчиков.


Kafka и очереди сообщений


Мы использовали Kafka в качестве брокера сообщений, потому что купились на отказоустойчивость. Между ЦОДами, где расположены системы заказчика, небольшие задержки, так что она работает неплохо. Однако, где-то в середине проекта мы поняли, что просчитались.



Потребовалась приоритизация сообщений. Нужно было сделать так, чтобы сообщения из одного топика читались, только когда все вычитано из другого. Вообще, приоритизация стандартная вещь, которая есть хоть в каком-то виде в любом брокере, но Kafka не брокер.


Для последовательного чтения пришлось прикрутить некрасивый костыль, который проверяет отставание офсета в первом топике и разрешает чтение из другого. Если бы мы использовали RabbitMQ, было бы проще, но, возможно, возникли бы вопросы относительно производительности.


Взаимодействие с фронтом


Здесь обошлось без rocket science. Взаимодействие с фронтом мы выстроили при помощи WebSocket и REST. WebSocket нужен для передачи событий от бека к фронту и высоконагруженных управляющих взаимодействий, а REST для запроса данных. Правда, пришлось написать отдельный микросервис для фронтенд-разработчиков.



Ходить за несколькими сущностями в три разных сервиса, а потом собирать на фронте какую-то адекватную модель для представления не очень удобно. Куда удобнее использовать composer, который будет брать сразу несколько сущностей со связями между ними и компоновать в один JSON. Для этого отлично подошел Apache Camel. Правда, в следующий раз мы не станем заморачиваться с REST и используем graphQL.


Service mesh и безопасность


Service mesh нужен, чтобы не прописывать вручную всю логику межсервисных взаимодействий. Как правило, для соединения микросервисов мы используем Istio и в этом проекте тоже рассчитывали на эту платформу. Но в этом году все идет как-то не так, как ожидалось.



Мы столкнулись с такой версией OKD, на которую никак не получалось установить Istio. Заказчик собирался в будущем обновить систему оркестрации, но нам нужно было решение здесь и сейчас.


Дедлайн приближался, а инженеры не знали, когда мы справимся с проблемой. Пришлось минимизировать риски и сделать какое-то подобие service mesh, чтобы точно запуститься вовремя.


Конечно, полноценный service mesh на коленке не напишешь, но можно было обойтись без управления трафиком, а мониторинг настроить вручную. Главное, что нам нужно было от service mesh, это безопасность.



Было необходимо идентифицировать запросы от фронта к бекенду, проверять токены пользователей в каждом микросервисе и делать запросы GET, POST и так далее, в зависимости от роли в токене.


Итак, мы начали пилить собственный service mesh. В его основу легла простая идея: если нельзя (не хочется) поместить логику в код сервиса, сделаем внешний sidecar-контейнер по аналогии с Istio, использовав OpenResty и язык Lua.


Написали в контейнере код, который проверяет подпись токена. В качестве параметров к нему подкладываются правила, описывающие, какие запросы пропускать от пользователей с различным ролями.


Аудит


Теперь, когда у нас были sidecar, на них можно было повесить отправку данных в Kafka и реализовать бизнесовый аудит. Так заказчик будет знать, кто из пользователей системы изменял те или иные важные данные.



Существуют штатные библиотеки для работы с Kafka, но и с ними возникли проблемы. Все ломалось, как только для подключения к Kafka требовался SSL. Спас положение старый добрый kafka-console-producer, который лежит в каждом sidecar и вызывается с параметрами из nginx.


Получился рабочий почти-service mesh. Казалось бы, можно было расслабиться, но мы чуть не упустили еще одну важную вещь.


Балансировка gRPC


gRPC то работает на HTTP/2, а этот протокол устанавливает соединение и держит, пока оно не порвется. Возможна ситуация, когда инстанс сервиса A намертво прицепится к одному из инстансов сервиса B и будет направлять все запросы только на него. Тогда от масштабирования сервиса B не будет толка. Второй инстанс сервиса B останется не нагружен и будет впустую тратить процессорное время и память.



Мы могли сделать полноценную балансировку на клиентской стороне, но было бы неправильно тратить на это время. После внедрения полноценного service mesh эти наработки потеряют актуальность.


Поэтому мы реализовали перехват и подсчет всех вызовов gRPC на уровне кода на стороне клиента. Когда счетчики подходят к определенному значению, вызывается метод, приводящий к переустановке соединения, и балансировщик OKD задействует незагруженный инстанс сервиса B.


Конечно, не стоит повторять это идеальных условиях, но если возникли неожиданные препятствия, и нужно срочно запуститься, это решение имеет право на жизнь. Такая балансировка работает. Графики нагрузки по сервисам ровные.


Управление версиями


Напоследок, несколько слов о том, как мы управляли версиями кода.


В проекте задействованы Spring, Maven и Java. Каждый микросервис хранится в собственном микрорепозитории. В первую очередь потому что мы передавали систему заказчику по частям, сервис за сервисом.


Поэтому единой версии коммита для всей системы нет. Но, когда шла активная разработка, нам нужен был механизм контроля версий. Без него систему не получится целостно протестировать, выкатить на стейдж, а затем и на прод.



Прежде всего, мы стали собирать сервисы в образы, которые тегируются коротким хешем коммита. А внутри Maven проекта нет версионирования кода, там вместо версии мы использовали заглушку.


Затем мы выделили отдельный репозиторий со списком всех сервисов и их версиями. Коммит в этот репозиторий и назывался релизом. То есть, мы формировали манифест совместимых между собой версий микросервисов. Конечно, не руками, а через самодельные веб-интерфейсы.


Еще один момент, который стоит упомянуть, управление версиями интеграционных контрактов между сервисами.


Если сервис A и сервис B общаются по gRPC, и разработчик сервиса B исправит что-то в протофайле и сломает обратную совместимость, это выстрелит только, когда выйдет в рантайм. Слишком поздно. Поэтому мы храним все интеграционные контракты (протофайлы, классы для сериализации Kafka и т. д.) отдельно и подключаем как саб-модули. Благодаря этому решению, многие возможные проблемы проявляются на этапе компиляции.


Что мы вынесли из проекта


В этом проекте мы вновь убедились в том, как важно заранее обследовать инфраструктуру. Но даже после тщательной проверки нельзя расслабляться. Некоторые проблемы, типа несовместимости конкретной версии OKD с Istio, практически невозможно предвидеть.


В конечном счете мы рады, что не переусердствовали в разделении на бизнес-сущности и не разобрали систему на винтики. Можно было разбить каждый микросервис еще на три, но это ничего не дало бы заказчику. Архитектура операционной CRM контакт-центра и так смотрится достаточно красиво, и мы потратили на разработку в полтора раза меньше времени, чем могли бы.


Если подвести итог, то интеграция без ESB имеет право на жизнь, особенно, когда нужно решить задачу в сжатые сроки.


Если у вас появились вопросы, или вы знаете, как красиво сделать приоритизацию сообщений в Kafka, пишите в комментариях или на почту IBolshakov@croc.ru.

Подробнее..

Аналитика событий на опасном производстве, или зачем Цифровому рабочему Kafka, Esper и Clickhouse

23.03.2021 10:07:12 | Автор: admin

Привет, Хабр! Я Алексей Коняев. Последние пару лет участвую в развитии платформы Цифровой рабочий в роли ведущего java-разработчика.

Представьте, что вы приехали на экскурсию на завод. Там огромная территория, и вы вместе с гидом передвигаетесь на машине. Он рассказывает: Посмотрите направо, здесь новое здание литейного цеха, а вот слева старое здание, которое скоро должны снести... Как вдруг через минуту это старое здание взрывают! Гид, конечно, в шоке, да и вы тоже, но, к счастью, всё обошлось. Спрашивается, какого черта машина с экскурсантами оказалась в месте проведения взрывных работ?! И наш Цифровой рабочий на этот вопрос тоже не ответит, но он поможет вовремя предупредить всех заинтересованных лиц о том, что в геозоне, где сейчас проводятся опасные работы, появились посторонние в машине местного гида.

Если в двух словах, то система позволяет предупреждать опасные ситуации на производстве благодаря носимым устройствам Outdoor/Indoor-навигации и видеоаналитике. Цифровой рабочий может определять местоположение, физическое состояние или опасное поведение людей, строить различную аналитику, в том числе realtime, и выполнять разбор полётов, т.е. воспроизводить историю событий, чтобы можно было выяснить, что привело к нежелательной ситуации.

Дальше расскажу про архитектуру нашей системы, как мы используем Kafka, Esper и Clickhouse и на какие грабли уже наступили.

Вообще делать свой продукт это очень интересно, особенно когда ощущаешь пользу от его применения на промышленных объектах. Там бывает очень опасно работать, но люди привыкли рисковать и, как говорят наши заказчики, иногда теряют чувство самосохранения.

Пример из жизни Цифрового рабочего на пилоте в нефтянке. Работник залез на стремянку, чтобы затянуть вентиль газовой трубы. Резьба вдруг сорвалась, он не удержался и, упав с высоты 5 метров, от удара потерял сознание. А вентиль при этом открылся, и газ начал поступать в помещение. Но датчики падения и удара, которые встроены в носимое устройство (у него был модуль на каске) передали сигналы на сервер. Там эти два сигнала были обработаны на платформе, и алгоритм выявления внештатных ситуаций сформировал событие-тревогу. Оно было передано оператору, а также другим сотрудникам, которые в тот момент были недалеко от пострадавшего и смогли быстро прийти на помощь.

Архитектура

Когда мы только начинали проектировать Цифровой рабочий, решили пойти по пути Событийно-ориентированной архитектуры. Рассуждали так: всё начинается с носимых устройств, или, как мы их называем, меток. Они, по сути, просто умеют передавать с определённой частотой какую-то телеметрию или, другими словами, информацию об изменении своего внутреннего состояния. И вот это изменение состояния метки и есть входное для системы событие.

Далее, нам нужно уметь обрабатывать поток этих входных событий, причём это должен быть не просто последовательный конвейер, а параллельная обработка разными модулями, которые в процессе работы будут порождать новые события, обрабатываемые другими модулями, и т.д. В итоге какие-то события будут передаваться в UI, чтобы отрисовывать перемещение объектов на карте.

В качестве шины или слоя передачи событий выбрали Apache Kafka. На тот момент Kafka уже зарекомендовала себя как зрелый и надежный продукт (мы начинали с версии 2.0.0). Кроме того, выбирали только среди Open-source решений, чтобы удовлетворить требованиям импортозамещения. А с функциональной точки зрения в Kafkа нам понравилась возможность независимо подключать различных консьюмеров к одному и тому же топику, возможность прочитать события из топика ещё раз за период в прошлом, механизм стриминговой обработки Kafka Streams, ну и, конечно, масштабируемость благодаря партиционированию топиков.

Архитектура системы включает следующие компоненты:

  • Есть различные носимые устройства (метки) и системы позиционирования, которые уже существуют на рынке и которые умеют работать с теми или иными девайсами.

  • Для каждой такой системы позиционирования, с которой мы решили интегрироваться, у нас есть свой модуль Адаптер, который публикует в Kafka события с телеметрией от меток.

  • Далее эти события обрабатываются Транслятором, который выполняет первичную обработку (связывание метки с сотрудником, вычисление геозоны, в которой находится сейчас метка и др.).

  • Модуль Complex Event Processing-а (CEP-процессор) обрабатывает события, которые порождает Транслятор; здесь мы занимаемся выявлением внештатных ситуаций, анализируя различные типы событий, в том числе от разных меток.

  • В UI поступают события как от Транслятора (для отрисовки перемещения сотрудников), так и от CEP-процессора (отображение алертов).

  • Для хранения справочных данных, как то список меток, сотрудников, геозон и пр., используем реляционную БД PostgreSQL.

  • А для хранения данных, по которым строится аналитика ClickHouse.

  • Но в ClickHouse напрямую никто не ходит для этого используется модуль Reports, который выполняет обработку запросов к аналитическим данным (запросы на обновление данных виджетов на аналитической панели в UI, запросы на формирование различных отчётов и др.).

  • И ещё есть файловое хранилище S3, где мы храним файлы 3D-моделей и файлы сформированных отчётов.

Ну а теперь давайте расскажу поподробнее про все модули системы, как они устроены внутри.

Интегрируемся со всеми: адаптеры

Основная задача Адаптера взаимодействие с системой позиционирования через её API для того, чтобы получать информацию о координатах метки и значения встроенных в метку датчиков. Например, заряд аккумулятора, статус нажатия тревожной кнопки, статус датчика падения и неподвижности, температура, влажность, уровень CO2 и др.

Системы позиционирования это не наша разработка, а уже существующие на рынке системы, которые, как правило, умеют взаимодействовать с одним каким-то конкретным девайсом. Мы же, подключая через адаптеры такие системы, получаем возможность использовать в Цифровом рабочем большой ассортимент разных меток.

Хотя для некоторых меток мы всё-таки запилили систему позиционирования сами, когда не удавалось найти существующего решения или хотелось поэкспериментировать. А один из экспериментов привел к тому, кто мы разработали свою метку с креплением на каску:

Ещё одна важная особенность адаптеров это сложность с их масштабированием. Когда меток много, то важно успевать обрабатывать информацию от всех, и решение в лоб поставить несколько адаптеров. Но сделать это не всегда просто, т.к. мы можем упереться в ограничения API системы позиционирования, например, когда адаптер с определённой периодичностью дергает рест-апи и получает статус сразу всех меток.

Поэтому основное требование к адаптеру быть максимально производительным, т.е. получил данные в чужом формате, преобразовал к нашему внутреннему и отправил событие в Kafka. Эти события, которые порождает адаптер, мы называем Сырые.

Сырые события бывают такие:

  • TagMovement перемещение метки, содержит координаты;

  • TagInfo телеметрия со значениями датчиков метки;

  • TagAlert события нажатия тревожной кнопки, события падения и удара.

Разделяй и властвуй: топики Kafka

Отдельно хочу остановиться на топиках.

Для каждого типа события в Цифровом рабочем используется свой отдельный топик. Такой подход позволяет:

  1. Индивидуально настраивать retention для каждого топика. Так, например, для сырых событий ретеншн всего 1 сутки, потому что эти события практически сразу будут обработаны, и хранить их долго не нужно (но 1 сутки всё-таки храним на случай сбоя).

  2. Использовать надстройку над Kafka, которую мы сделали, чтобы модули, которым нужно читать или писать из Kafka, могли просто подписываться на события заданного типа, или просто публиковать события, не задумываясь о топике, который определяется автоматически.

  3. Реализовать автоматическую сборку топологии Kafka Streams процессоров если вкратце, то работает она так:

    3.1. в прикладном модуле нужно объявить процессоры, указав тип входного события и тип выходного;

    3.2. также можно указать, будет ли данный процессор использовать состояние;

    3.3. все процессоры это Spring Bean-ы;

    3.4. при запуске приложения сборщик топологии находит все процессоры и собирает их в граф, стыкуя процессоры так, чтобы тип выхода одного подходил под тип входа другого.

Примерно так выглядит топология процессоров Транслятора, о котором речь пойдет ниже:

Шеф-повар: транслятор

Транслятор это модуль, который готовит сырые события, поступающие от адаптеров. При этом то, какой конкретно адаптер был источником события, на этом этапе обработки уже не важно. Все сырые события одинаковые.

Это, кстати, позволило реализовать адаптер-заглушку, который мы используем для отладки и тестирования. С его помощью можно управлять перемещением сотрудников клавишами на клавиатуре, так как бегать каждый раз по офису с реальной меткой не очень прикольно, хотя с точки зрения физкультуры и полезно.

Внутри Транслятора несколько процессоров на Kafka Streams (см. processor-api), причём многие из них используют состояние.

Kafka Streams предоставляет API для работы с состоянием как с Key-Value таблицей. Тут важно понимать, что для каждого ключа входного события процессора существует свое состояние. Ключ у каждого типа события свой. Например, для события перемещения метки это будет серийный номер метки. Это позволяет выполнять обработку очередного события от какой-то конкретной метки с учетом истории обработки событий от этой же метки.

Транслятор решает следующие задачи:

  • Вычисление географических координат. Дело в том, что координаты, которые приходят в сырых событиях, могут содержать не широту и долготу, а, например, смещение по оси X и Y в метрах относительно внутренней системы координат, системы позиционирования. И помня о том, что Адаптеры должны работать максимально быстро, приведением координат к абсолютным или географическим занимается Транслятор.

  • Связывание метки и сотрудника. Здесь процессор Транслятора обращается к реляционной БД за справочной информацией, чтобы определить, какой именно сотрудник сейчас носит эту метку.

  • Определение остановки сотрудника. Здесь мы смотрим, если новые координаты метки не сильно отличаются от предыдущих в течение некоторого таймаута, то значит, сотрудник остановился.

  • Обнаружение потери сигнала от метки. Тут используем такую штуку, как Punctuator это механизм Kafka Streams, который позволяет с заданной периодичностью просматривать состояния процессора по всем ключам. И логика простая: если нашли состояние, в котором время последних полученных координат позже, чем максимально разрешенное, то значит, сигнала от метки не было давно, и следует считать сигнал потерянным.

  • Ещё есть процессоры, которые работают с геозонами. Суть в том, что вся территория и здания размечаются на геозоны. Например, Корпус Альфа, который, в свою очередь, разделяется на геозоны этажей, а каждый этаж на коридоры и кабинеты. И есть процессор, который определяет, что координаты метки попали внутрь той или иной геозоны, а другой процессор выполняет подсчёт количества сотрудников внутри геозон.

События, которые создаются в результате работы Транслятора, мы называем бизнес-события, потому что:

  • эти события уже представляют интерес для пользователей системы события перемещения сотрудников передаются в UI для отрисовки их на карте; также в UI отображается телеметрия метки, которая уже ассоциирована с определённым сотрудником;

  • почти все бизнес-события сохраняются в аналитическую БД;

  • многие бизнес-события используются для выявления внештатных ситуаций;

  • и ещё эти события мы храним в Kafka долго (1 месяц) для того, чтобы иметь возможность воспроизвести их и посмотреть, что происходило в определённый интервал времени в прошлом.

Спасительный кэш при потоковой обработке

Когда мы начали проводить нагрузочное тестирование, то оказалось, что транслятор сильно тормозит, и связано это было с тем, что многие его процессоры при обработке каждого события ходили в БД за справочной информацией. Естественным решением проблемы было кэширование доступа к БД. Но кэш нужен был не совсем уж простой, т.к. информация в справочниках хоть и меняется редко, но всё равно это может произойти в процессе работы. Например, сотрудник потерял метку, и ему выдали другую.

И, кроме того, некоторые процессоры в результате работы могут обновлять какие-то данные в БД. Поэтому нужен был кэш, который будет:

  • обновляться с некоторой периодичностью;

  • периодически сливать изменения в БД.

Также нужно было иметь в виду, что Трансляторы мы точно будет запускать в несколько экземпляров, чтобы масштабировать на нагрузку. И при этом всем нодам Транслятора должен быть доступен одинаковый кэш. Т.е. запаздывание при чтении справочников из БД не страшны, но если в ходе обработки сообщений какой-то процессор меняет значение в кэше, то это новое значение должно быть сразу же доступно всем нодам Транслятора.

Мы сделали выбор в пользу Hazelcast-а, потому что:

  • Его достаточно легко использовать это просто библиотека, которую вы подключаете в проект.

  • Кэши Hazelcast-а автоматически синхронизируются на всех нодах. Но тут нужно быть осторожным если не ограничить область видимости кэша через задание имени группы (в конфиге экземпляра Hazelcast-а), то он может незаметно для вас реплицироваться в каком-нибудь ещё модуле, где вы тоже решили использовать Hazelcast. Т.е. в нашем случае, все кэши, которые нужны Трансляторам и только им, объединены в группу translator.

После добавления кэширования производительность Транслятора выросла на несколько порядков! Цифры получились такие: одна нода, которой выделены ресурсы, эквивалентные инстансу t4g.micro в облаке Amazon EC2 (2CPU + 2Gb), обрабатывала без задержки до 500 входящих сырых событий в секунду. 500 кажется не много, но метки разных производителей передают данные с разной частотой от 3 событий в минуту до 5 событий в секунду. И высокопроизводительные метки, которые дают большую точность, могут использоваться не на всей территории объекта. Таким образом, в худшем случае одна нода Транслятора выдерживает нагрузку от 100 меток, а в лучшем от 10 тысяч.

Высоко сижу далеко гляжу: отображение объектов на карте

UI состоит из двух частей серверная часть и клиент.

Серверная часть подписывается на определённые бизнес-события, в первую очередь события перемещения сотрудников. Эти события через WebSocket передаются на клиента, но предварительно выполняется:

  • фильтрация каждому клиенту отправляются только те события, которые относятся к текущей выбранной оператором геозоне, т.е. если оператор выбрал, например, 7 этаж, то клиент будет получать события только по этому этажу;

  • редукция событий события накапливаются в буфере и отправляются на клиента 1 раз в секунду.

Это нужно для того, чтобы снизить нагрузку на клиента.

Клиентская часть это web-приложение на React-е. Основную рабочую область UI занимает карта с 3D-моделями зданий, которые можно посмотреть в разрезе провалиться на любой этаж и увидеть, что там происходит. Для отрисовки 3D-моделей мы используем библиотеку CesiumJS.

Complex Event Processing

Термин Complex Event Processing (CEP) придумал профессор Стенфордского университета David Luckham. Вкратце определение звучит так: Complex Event Processing это обработка потока различных событий в реальном времени с целью выявления паттернов значимых событий. Ещё часто CEP сравнивают с ESP (Event Stream Processing). И здесь David Luckham выделяет следующие отличия между ними:

ESP это обработка упорядоченного потока события одного типа, которая выполняется, как правило, с целью фильтрации, трансформации событий и/или выполнения над потоком событий каких-либо математических операций, в том числе, агрегация и группировка.

CEP это обработка нескольких потоков различных событий, причём не обязательно упорядоченных (т.е. допускается нарушение порядка поступления событий в сравнении с порядком их возникновения), которая выполняется внутри некоторого окна (временного, например) с учетом причинно-следственной связи между различными событиями.

Например, обработка событий от термометра с целью управления кондиционером (температура стала ниже порога включили обогрев, и наоборот) это ESP. А вот обработка событий от термометра и одновременно от датчика освещённости в течение суток позволяет сделать вывод, что на улице зима.

Но в тоже время, в одной из своих статей David Luckham с коллегой, видя, что современные инструменты Event Stream Processing-а всё больше и больше приобретают возможности CEP-инструментов, делают вывод, что со временем разница между ними будет стерта (см. The Future of Event Stream Analytics and CEP).

CEP-процессор

Давайте перейдем от теории к практике!

Как вы уже поняли, в Цифровом рабочем именно модуль CEP-процессор выполняет сложную обработку событий, которыми в контексте нашей системы являются внештатные ситуации, такие как:

  • Вход сотрудника в геозону, имеющую в данный момент статус опасная.

  • Пульс выше или ниже нормы.

  • Срабатывание двух из трех датчиков: падение/удар/неподвижность (заставлять реагировать оператора на единичный сигнал падения неправильно, т.к. метка просто могла упасть, но, если в течение небольшого промежутка времени пришло сразу несколько таких сигналов, то регистрируется внештатная ситуация).

В качестве CEP-движка мы используем Open-source библеотеку Esper, которую с 2006 года разрабатывает компания EsperTech.

Esper мы выбрали по следующим соображениям:

  • Описание паттернов сложных событий (или правил) выполняется на языке Event Processing Language (EPL), который является расширением стандарта SQL-92; соответственно, правила описываются в декларативном виде, а нам очень хотелось, чтобы эти правила могли понимать не только программисты, но и, например, аналитики (хотя, если правило действительно сложное, то без подготовки его будет трудно понять).

  • Esper мощный инструмент, и достаточно сложные паттерны можно описать в несколько строк на EPL.

  • Есть интеграция с Kafka, которая позволяет описывать правила, оперируя событиями, потребляемыми из Kafka, а результат работы правила также оформлять в виде события и публиковать в Kafka.

Пример реализации правила на Esper-е

Давайте рассмотрим небольшую задачу и её решение на Esper-е. В КРОКе есть свой ЦОД и газотурбинные генераторы, которые используются как резервный источник питания. Эти штуки нужно периодически запускать, но оставлять их включёнными без присмотра надолго нельзя. Предположим, что сотрудник, который их обслуживает, не всегда соблюдает регламент и может отлучиться на более продолжительное время, чем это разрешено.

Задача: если оборудование включено, и в помещении, где оно расположено, нет ни одного сотрудника больше 10 минут, то необходимо сформировать событие-тревогу.

Для отладки решения будем использовать Esper Notebook, в котором можно описать правило на EPL-е и сценарий с входными данными.

Чтобы Esper Notebook понял, что текст это правило, в самом начале нужно написать ключевое слово %esperepl.

Далее, нужно определить схемы или таблицы, строки которых будут представлять входные события, в потоке которых мы будем искать интересующий нас паттерн:

@Description('Кол-во людей в геозоне')create schema PersonsInZone(zoneId string, number int);@Description('Статус устройства (вкл / выкл)')create schema DeviceStatus(deviceId string, zoneId string, turnedOn bool);

Нас интересует, что происходит, когда оборудование включено, поэтому определим контекст, который откроется при включении оборудования, и закроется при его выключении. При этом, для каждой единицы оборудования будет свой собственный контекст:

create context TurnerOnDeviceContext    partition by deviceId from DeviceStatus    initiated by DeviceStatus(turnedOn = true)    terminated by DeviceStatus(turnedOn = false);

Контекст это своего рода окно, в рамках которого мы будет наблюдать входящие события.

// для выражения можно задать имя, чтобы видеть его в логах@Name('Unattended device')// указывая контекст, мы говорим, чтобы данное выражение// выполнялось только для событий из контекстаcontext TurnerOnDeviceContextselect    ds.zoneId,    ds.deviceIdfrom    // в качестве источника событий задаем шаблон,    // который словами можно сформулировать так:    // Устройство включили  потом Все ушли  потом (Прошло 10 минут И Никто не вернулся)    pattern [        ds = DeviceStatus(turnedOn = true)        -> every (            PersonsInZone(zoneId = ds.zoneId and number = 0)            -> (timer:interval(10 minutes)                    and not PersonsInZone(zoneId = ds.zoneId and number > 0)               )        )    ];

Здесь ещё используется ключевое слово every, которое нужно для того, чтобы следующая за ним часть шаблона продолжала находиться даже после того, как один раз целый шаблон уже был найден.

Например, если сотрудник включил генератор, потом ушел, и прошло больше 10 минут, сработает правило, и мы пошлем нотификацию оператору, тот позвонит сотруднику, отругает его, и тот вернется. Но генератор сотрудник так и не выключит, поработает с ним ещё какое то время и, опять всё забыв, снова уйдет больше, чем на 10 минут.

Таким образом, мы получим ситуацию, когда часть шаблона Все ушли ("Прошло 10 минут И Никто не вернулся) будет найден снова, в то время, как первая часть Устройство включили уже была найдена ранее. И, чтобы целый шаблон найти ещё раз, нужно написать every перед второй его чатью.

Теперь нам нужно протестировать наше правило. Для этого используется сценарий (начинается с ключевого слова %esperscenario), в котором можно публиковать входные события, увеличивая текущее время сценария:

%esperscenario// задаем начальное времяt = 2020-12-10 12:00:00.000// публикуем событие в помещение A вошёл 1 сотрудникPersonsInZone = {zoneId=room-A, number=1}// через 1 минуту включаем генератор 1 в помещении Аt = t.plus(1 minute)DeviceStatus = {deviceId=generator-1, zoneId=room-A, turnedOn=true}// через 4 часа сотрудник вышел из помещения А, не выключив генераторt = t.plus(4 hours)PersonsInZone = {zoneId=room-A, number=0}// прошло ещё 10 минут  и должно сработать наше правило!t = t.plus(10 minute)

Если в правиле сработает какое-то выражение, то в блоке сценария мы должны увидеть его вывод. В нашем случае выражение это select, а вывод будет такой:

Unattended device-output={ds.zoneId='room-A', ds.deviceId='generator-1'}

Вот ещё один пример сценария, посложнее:

%esperscenariot = 2020-12-10 12:00:00.000// сотрудник вошёл в помещение АPersonsInZone = {zoneId=room-A, number=1}// через 1 минуту включил генератор 1t = t.plus(1 minute)DeviceStatus = {deviceId=generator-1, zoneId=room-A, turnedOn=true}// через 30 минут вышел, не выключив генераторt = t.plus(30 minutes)PersonsInZone = {zoneId=room-A, number=0}// но через 5 минут вернулсяt = t.plus(5 minute)PersonsInZone = {zoneId=room-A, number=1}// прошло ещё 5 минут  тревоги не должно быть, сотрудник ведь вернулсяt = t.plus(5 minute)// ещё через 3 часа ушёл, а генератор всё также остался включённымt = t.plus(3 hour)PersonsInZone = {zoneId=room-A, number=0}// через 10 минут  тревога!t = t.plus(10 minute)

Этот же пример, но с полноценной интеграцией с Kafka, который можно собрать и запустить, доступен по ссылке digital-worker-architecture.

В двух словах, его отличие в том, что нужно:

  1. настроить движок Esper-а подключить к нему плагины для взаимодействия с Kafka;

  2. зарегистрировать типы классов, которые будут представлять события в Kafka, и десериализатор для них.

И тогда при описании правил можно уже не создавать схемы событий, а использовать имена классов событий. А результат работы выражений (select-ов) оборачивать в создание выходных событий, которые публиковать в соответствующие топики Kafka.

Всех посчитали: регистрация внештатных ситуаций

CEP-процессор при срабатывании правил делает следующее:

  • Регистрирует внештатную ситуацию в журнале событий. Мы предполагаем, что события в этот журнал не должны попадать часто, но если уж попали, то требуется их явная обработка оператором (написать резолюцию, закрыть событие).

  • Формирует событие-нотификацию, которое, как и все остальные события в Цифровом рабочем, публикуются в свой топик в Kafka. На этот топик подписан модуль Нотификаций, который выполняет их маршрутизацию. В результате нотификация передается в UI (оператор видит уведомление) или конкретному сотруднику через тот канал, который для него определён (например, на почту, в Telegram или на метку, которую носит сотрудник, если она поддерживает обратный канал).

Летим по приборам: аналитика

Аналитика доступна оператору в виде панели инструментов, на которую он может вынести интересующие его виджеты, и в виде механизма формирования отчётов.

Виджеты это те же отчёты, у которых есть какие-то свои входные параметры, но результат выводится не в Excel или PDF-файл, а в виде графика или диаграммы. Еще виджеты обновляются раз в 15 секунд, что позволяет оператору видеть актуальные на текущий момент времени данные.

Также существуют отчёты со специализированным отображением информации непосредственно на карте (тепловая карта и маршруты):

Данные, по которым строится аналитика, хранятся в БД ClickHouse. Но прежде, чем попасть в ClickHouse, они проходят дополнительную обработку в модуле Подготовки данных. А, чтобы достать эти данные из БД и передать потребителю (в данном случае, в UI), используется модуль Формирования отчётов.

ClickHouse

К аналитической БД у нас были следующие требования:

  • нужно уметь хранить очень много данных. Наши данные это события, у которых есть ключ и временная метка;

  • нужно быстро выполнять запросы с условием С ПО, т.е. за определённый период времени.

Когда мы выбирали БД, то понимали, что здесь точно нужна не реляционка, а, скорее, NoSql, которая умеет линейно масштабироваться и обеспечивает отказоустойчивость. На момент проектирования Цифрового рабочего у нас был опыт применения NoSql БД Cassandra, но ClickHouse был на слуху и, изучив документацию и посмотрев несколько презентаций от ребят, которые уже успешно используют его в проде, мы тоже решили попробовать.

Из особенностей ClickHouse мы для себя выделили следующие:

  • Это колоночная БД наши события имеют много атрибутов, которые удобно разложить по колонкам.

  • Хорошо подходит для Time-serias данных ClickHouse может партиционировать таблицу по ключу, который является функцией от временной метки события, что позволяет хранить в общей партиции все события полученные, например, за один и тот же день. А это позволяет эффективно выполнять запросы, у которых задан период времени (сперва будут выбраны только нужные партиции, а уже потом только по ним выполняется остальная часть запроса).

  • Есть встроенная интеграция с Kafka ClickHouse напрямую из Kafka умеет загружать данные в таблицы, при этом можно дополнительно выполнять обработку данных непосредственно в момент их загрузки.

  • Есть возможность подключить внешнюю реляционную БД мы подключаем справочники, которые храним в PostgreSql, после чего их можно использовать как обычные таблицы в Sql-выражениях.

  • Богатый набор встроенных аналитических функций например, для формирования Тепловой карты мы используем функцию Квантиль. А ещё мне очень нравится работа с массивами в ClickHouse можно джойнить элементы колонки-массива одной таблицы со строками другой таблицы или трансформировать строки таблицы в колонку-массив.

  • Масштабируемая и отказоустойчивая чем больше нод базы поднимете, тем больше данных можно будет хранить. Плюс есть встроенный механизм сжатия данных.

Что касается схемы БД, то она устроена примерно так:

  • Есть таблицы, которые содержат данные в более или менее исходном виде, т.е. в том, в котором поступают события из Kafka. Эти таблицы используются для формирования большинства отчётов.

  • Но также есть специализированные таблицы, которые хранят данные в ином виде, удобном для формирования каких-то конкретных отчётов.

  • И придерживаемся принципа: данные в БД готовим исходя из сценария их использования (какие именно запросы будем выполнять к БД и с какими условиями фильтрации).

Подготовка данных

Подготовка данных выполняется, по большому счёту, для того, чтобы сделать максимально больше работы заранее, а не в тот момент, когда придет запрос в ClickHouse для формирования отчёта.

Модуль подготовки данных это Kafka Streams приложение, которое обрабатывает бизнес-события и выполняет следующие действия:

  • фильтрация бизнес-событий может быть очень много (например, события перемещения сотрудника), а для некоторых отчётов все они не нужны, и их можно проредить;

  • денормализация и обогащение чтобы при формировании отчёта лишний раз не выполнять join-ы, здесь мы заранее собираем все необходимые данные, например, по ИД сотрудника получаем его ФИО и название должностной позиции;

  • форматирование приведение дат к нужному формату, форматирование дробных чисел, локализация текста и т.п.

Формирование отчётов

Для того, чтобы получить отчёт, заказчик отчёта отправляет в топик Kafka report-request событие-запрос. Здесь мы отходим от принципа один тип события один топик для каждого конкретного отчёта существует свой тип события, который содержит специфичные для данного отчёта параметры. Но при этом все они унаследованы от базового класса ReportRequest.

Эти ReportRequest-ы обрабатывает модуль формирования отчётов:

  • по типу запроса определяется генератор запроса;

  • генератор формирует специфичный для отчёта SQL-запрос в ClickHouse, и, при необходимости, выполняет дополнительную обработку результата SQL-запроса уже на стороне java-кода;

  • результат работы генератора, если нужно, преобразуется в Excel или Pdf файл, который сохраняется в файловое хранилище;

  • создается событие-ответ, унаследованное от базового ReportResponse-а, которое содержит данные отчёта или ссылку на файл в хранилище.

  • в событие-ответ добавляется информация о событии-запросе, чтобы заказчик отчёта мог связать ответ с ранее отправленных запросов, после чего оно публикуется в топик report-response.

Мы выбрали такую схему формирования отчётов, потому что:

  • хотели сделать запрос отчётов асинхронным, чтобы не блокировать заказчика отчёта, пока тот формируется (обычно отчёты формируются быстро, но если задать очень большой временной интервал, за который нужно получить данные, то придется подождать);

  • хотели предоставить возможность формировать отчёты любым клиентам, у которых будет доступ к соответствующим топикам Kafka; это могут быть как модули Цифрового рабочего, так и внешние системы, с которыми мы интегрируемся;

  • использование Kafka автоматически позволяет масштабировать модули формирования просто поднимаем несколько узлов и получаем балансировку нагрузки (каждый узел обрабатывает свои партиции топика report-request).

Файловое хранилище

Изначально мы не очень понимали, какой именно интерфейс файлового хранилища нам подойдет, но было понятно, что нужно уметь хранить в нем:

  • 3D-модели зданий, которые загружаются на клиента при отрисовке карты.

  • Сформированные отчёты.

  • Возможно, какой-то ещё статический контент.

Сначала выбрали Apache Sling. Это контент-система, которая хранит данные в иерархической структуре и позволяет обращаться к файлам по URL-у, в котором отражен путь до соответствующего узла в иерархии. С точки зрения работы клиента, Sling довольно удобное решение просто загружаешь файл по URL-у напрямую. Со стороны backend-а тоже особо проблем не было java-api довольно простой.

Но через некоторое время мы столкнулись с тем, что Sling начал тормозить и тем сильнее, чем больше файлов в нем хранилось. Оказалось, для нашего случая с получением списка всех отчётов, которые сформировал пользователь, мы заставляли Sling обходить какой-то кусок его иерархии файлов и вычитывать каждый файл, чтобы посчитать его размер. Варианты, как это оптимизировать, виделись такие:

  • Научиться получать большой список файлов из Sling-а так, чтобы не вычитывать каждый файл всякий раз для получения его размера;

  • Попробовать другое хранилище.

Мы пошли сразу двумя путями, и второй путь оказался пройден быстрее, а также давал ещё другие преимущества.

Заменили Sling на S3. У него не было проблем с получением большого списка файлов, а как бонус мы перестали поднимать свое файловое хранилище на стендах, а стали использовать S3, поднятое в облаке КРОК. Т.е. немного упростили процесс развертывания системы.

Как это было: потоки событий и воспроизведение истории

Если посмотреть на все модули системы вместе, то можно связать их потоками событий, которые протекают между ними:

Потоки событий, которые приходят в UI, используются для отображения перемещения сотрудников, отрисовки статуса геозон, вывода предупреждений и другой информации, которая отражает состояние дел в текущий момент времени. Но когда происходит внештатная ситуация, то бывает очень полезно посмотреть, а что происходило в тот момент, когда эта ситуация возникла, и за некоторое время до неё.

Для этого в Цифровом рабочем используется механизм воспроизведения истории, где оператор может задать период времени в прошлом, и система начнет воспроизводить перемещения сотрудников и прочую информацию, как если бы это происходило сейчас. При этом оператор может изменять скорость воспроизведения.

Этот механизм реализован следующим образом:

  • UI подписывается на нужные ему события с помощью нашего KafkaConsumer-а, который инкапсулирует взаимодействие с Kafka и имеет два режима работы: REAL_TIME и PAST_TIME.

  • Если включен режим REAL_TIME, то KafkaConsumer просто передает события, которые в данный момент поступают в топики Kafka.

  • А когда включен режим PAST_TIME (при его включении необходимо задать период времени в прошлом), то KafkaConsumer вычитывает события из топиков, начиная с offset-а, соответствующего началу периода, и заканчивая offset-ом конца периода. При этом KafkaConsumer делает задержки между событиями, чтобы передавать их потребителю с тем же темпом, с которым они поступали в реальном времени.

Плюсы этого решения такие:

  • для истории не нужно отдельного хранилища вся история хранится в Kafka; при этом retention можно задавать большим только для тех топиков, которые нужны для воспроизведения истории;

  • KafkaConsumer с двумя режимами работы обеспечивает прозрачность для потребителя, основная логика которого никак не зависит от выбранного режима.

Но есть и минус: в начальный момент времени периода в прошлом мы не можем получить полное состояние системы. Предположим, в 12:00 в кабинет вошло 10 сотрудников, а к 13:00 у половины из них сел аккумулятор метки, и она перестала передавать события со своим местоположением. Тогда воспроизводя историю с 13:00, мы увидим в этом кабинете только 5 сотрудников, у которых метки продолжили работать, т.к. только от этих меток в топиках будут события, начиная с 13:00.

Я подозреваю, что решить эту проблему только на базе Kafka не получится. Но сейчас текущее решение удовлетворяет нашим потребностям, поэтому мы его не трогаем. А как вариант на будущее, можно будет перенести хранение истории в ClickHouse и сделать Consumer, который будет для REAL_TIME-режима ходить в Kafka, а для PAST_TIME в ClickHouse.

А что у вас?

Несмотря на то, что Цифровой рабочий уже много чего умеет, продукт продолжает развиваться. Мы постоянно пилим какие-то новые фичи, подключаем новые системы позиционирования, что-то переделываем и оптимизируем, экспериментируем с разными технологиями потоковой обработки, прикручиваем ML (вот здесь, например, о том, что сделали интеграцию с системой видеоаналитики). В общем, куча идей и планов на будущее!

Было бы интересно услышать в комментариях, разрабатываете ли вы что-то подобное, например, в области IoT с потоковой обработкой данных, какие технологии применяете и как?

И, кстати, вот: Оцифровка рабочего в режиме реального времени - мой доклад на SmartData conf.

Подробнее..

Перевод Pulsar vs Kafka сравнение и мифы

26.03.2021 08:19:22 | Автор: admin


Pulsar или Kafka что лучше? Здесь мы обсудим плюсы и минусы, распространенные мифы и нетехнические критерии, чтобы найти лучший инструмент для ваших задач.


Обычно я рассказываю об Apache Kafka и ее экосистеме. О Pulsar за последние годы меня спрашивали только коммитеры и авторы Pulsar. Они задавали сложные технические вопросы, чтобы показать, что Kafka не идет ни в какое сравнение с Pulsar. На Reddit и подобных платформах разгораются яростные и очень субъективные споры на эту тему. Я поделюсь своей точкой зрения, основанной на многолетнем опыте работы со стриминговыми опенсорс-платформами.


Сравнение технологий нынче в моде: Kafka vs. Middleware, Event Streaming и API Platforms


Цель сравнения технологий помочь людям подобрать подходящее решение и архитектуру для своих потребностей. Универсального решения не существует. И это не дело вкуса. Просто для каждой задачи есть свой инструмент.


При этом такие сравнения почти всегда субъективны. Даже если автор не работает на вендора и представляется независимым консультантом, скорее всего, у него все равно есть любимчики, даже если он сам себе в этом не признается. Однако полезно посмотреть на инструменты с разных точек зрения. Поскольку Apache Pulsar сейчас обсуждают, я решил поделиться своим мнением и сравнить его с Kafka. Я работаю в Confluent, а мы лучшие эксперты по Apache Kafka и ее экосистеме. И все же я постараюсь быть максимально объективным и оперировать голыми фактами.


Опенсорс-фреймворки и коммерческие программы постоянно сравнивают. Я и сам делал такие сравнения в своем блоге и на других платформах, например InfoQ: Сравнение платформ интеграции, Выбор подходящего ESB для ваших потребностей, Kafka vs. ETL / ESB / MQ, Kafka vs. Mainframe и Apache Kafka и API Management / API Gateway. Просто заказчики хотели понять, какой инструмент поможет решить те или иные задачи.


Если говорить о сравнении Pulsar и Kafka, ситуация немного отличается.


Зачем сравнивать Pulsar и Kafka?


Реальные и потенциальные заказчики редко спрашивают о Pulsar. Хотя, если честно, в последние месяцы чуть чаще где-то на каждой 15-й или 20-й встрече. Потому что фичи и варианты использования у них частично совпадают. Хотя, по-моему, все дело в паре статей о том, что Pulsar якобы в чем-то лучше Kafka. В пользу противоположной точки зрения нет никаких фактов и очень мало материала.


Я не видел ни одной организации, которая бы всерьез задумывалась о Pulsar в продакшене, хотя по всему миру множество пользователей, которым нужна технология распределенной обработки сообщений, вроде Kafka или Pulsar. По-моему, те, кто рекомендует Pulsar, чего-то не договаривают.


Например, их главный пользователь Tencent, крупная китайская технологическая компания, которая при этом очень активно использует Kafka везде, кроме одного проекта, где пригодился Pulsar. Tencent обрабатывает с Kafka десять триллионов сообщений в день (только представьте: 10 000 000 000 000). Если посчитать, Tencent использует Kafka в тысячу раз активнее, чем Pulsar (10 трлн против десятков млрд). Tencent с удовольствием рассказывают о своем деплое Kafka: Как Tencent PCG использует Apache Kafka для обработки 10+ трлн сообщений в день.


Сравнение двух конкурирующих опенсорс-платформ


Apache Kafka и Apache Pulsar две замечательные конкурирующие технологии. Логично будет их сравнить.


У Apache Kafka и Apache Pulsar очень схожий набор функций. Советую оценить обе платформы на предмет доступных функций, зрелости, распространения на рынке, опенсорс-инструментов и проектов, обучающего материала, проведения митапов по регионам, видео, статей и так далее. Примеры вариантов использования в вашей отрасли помогут принять правильное решение.


Confluent уже писал сравнение Kafka vs. Pulsar vs. RabbitMQ: производительность, архитектура и функции. Я тоже поучаствовал. Значит, сравнение уже есть


О чем тогда эта статья?


Я хочу рассмотреть несколько мифов из споров на тему Kafka против Pulsar, которые регулярно возникают в блогах и на форумах. Затем я сравню их не только по техническим аспектам, потому что обычно никто не говорит о Pulsar с этой стороны.



Kafka vs Pulsar мифы


Мне попадалось несколько мифов. С некоторыми я согласен, другие можно легко развенчать фактами. Ваше мнение может отличаться от моего, это нормально. Я излагаю исключительно свою точку зрения.


Миф 1. У Pulsar есть характеристики, которых нет у Kafka.


Правда.


Если сравнивать Apache Kafka и Apache Pulsar, можно найти различия в многоуровневой архитектуре, очередях и мультитенантности.


Но!


У Kafka тоже есть уникальные особенности:


  • В два раза меньше серверов, которыми приходится управлять.
  • Данные сохраняются на диск только один раз.
  • Данные кэшируются в памяти только однажды.
  • Проверенный протокол репликации.
  • Производительность zero-copy.
  • Транзакции.
  • Встроенная обработка потока.
  • Долгосрочное хранилище.
  • В разработке: удаление ZooKeeper (KIP-500), чтобы использовать и деплоить Kafka было еще проще, чем Pulsar с его четырехкомпонентной архитектурой (Pulsar, ZooKeeper, BookKeeper и RocksDB), а еще чтобы повысить масштабируемость, устойчивость и т. д.
  • В разработке: многоуровневое хранилище (KIP-405), чтобы повысить эластичность и экономичность Kafka.

Спросите себя: можно ли сравнивать опенсорс-платформы или продукты и вендоров с комплексным предложением?


Легко добавлять новые функции, если для них не нужно предоставлять критическую поддержку. Не оценивайте характеристики по списку. Узнайте, насколько они проверены в рабочих сценариях. Сколько из этих уникальных особенностей имеют низкое качество и реализованы наспех?


Например. Понадобилось несколько лет, чтобы реализовать и испытать в бою Kafka Streams в качестве нативного движка обработки потоков. Как это можно сравнивать с Pulsar Functions? Последнее позволяет добавлять определяемые пользователем функции (UDF) безо всякой связи с обработкой реальных потоков. Или это скорее похоже на Single Message Transformations (SMT), основную функцию Kafka Connect? Не сравнивайте круглое с квадратным и не забывайте учитывать зрелость. Чем критичнее функция, тем больше зрелости ей требуется.


Сообщество Kafka вкладывает невероятные усилия в работу над основным проектом и его экосистемой. Только в Confluent 200 инженеров занимаются исключительно проектом Kafka, дополнительными компонентами, коммерческими продуктами и предложениями SaaS в основных облачных провайдерах.


Миф 2. У Pulsar есть несколько крупных пользователей, например китайский Tencent.


Правда.


Но! Tencent использует Kafka активнее, чем Pulsar. Отдел выставления счетов, где используется Pulsar, это лишь малая часть Tencent, в то время как остальные используют Kafka. У них есть глобальная архитектура на основе Kafka, где больше тысячи брокеров сгруппированы в единый логический кластер.


Будьте осторожны с опенсорс-проектами. Проверяйте, какого успеха добились с ними обычные компании. Если эту технологию использует один технологический гигант, это еще не значит, что вам она тоже подойдет. Сколько компаний из Fortune 2000 могут похвастаться историями успеха с Pulsar?


Ищите примеры не только среди гигантов!


Примеры обычных компаний позволят лучше понять, как применяется тот или иной инструмент в реальном мире. Если это не истории успеха от самих вендоров. На сайте Kafka можно найти много примеров компаний. Более того, на конференциях Kafka Summit в Сан-Франциско, Нью-Йорке и Лондоне каждый год разные предприятия из разных отраслей делятся своими историями и кейсами. Это компании из Fortune 2000, предприятия среднего бизнеса и стартапы.


Приведу один пример про Kafka. Для репликации данных в реальном времени между отдельными кластерами Kafka существует много разных инструментов, включая MirrorMaker 1 (часть проекта Apache Kafka), MirrorMaker 2 (часть проекта Apache Kafka), Confluent Replicator (от Confluent, доступен только в рамках Confluent Platform и Confluent Cloud), uReplicator (опенсорс от Uber), Mirus (опенсорс от Salesforce), Brooklin (опенсорс от LinkedIn).


На практике разумно использовать только два варианта, если, конечно, вы не хотите обслуживать и улучшать код самостоятельно. Это MirrorMaker 2 (еще не вполне зрелая новинка, но отличный выбор для средне- и долгосрочной перспективы) и Confluent Replicator (проверенный на практике во многих критически важных для бизнеса деплоях, но платный). Остальные варианты тоже нормальные. Но кто обслуживает эти проекты? Кто разбирается с багами и проблемами безопасности? Кому звонить, если в продакшене что-то случилось? Одно дело деплоить критически важные системы в продакшене, другое оценивать и пробовать опенсорс-проект.


Миф 3. Pulsar предлагает очереди и стриминг в одном решении.


Частично.


Очереди используются для обмена сообщениями между двумя точками. Они предоставляют асинхронный протокол взаимодействия, то есть отправитель и получатель сообщения не должны обращаться к очереди одновременно.


Pulsar предлагает только ограниченную поддержку очередей сообщений и стриминга событий. В обеих областях он пока не может составить крепкую конкуренцию по двум причинам:


  1. Pulsar предлагает ограниченную поддержку очередей сообщений, потому что ему не хватает таких популярных функций, как XA-транзакции, маршрутизация, фильтрация сообщений и т. д. Это обычные функции в таких системах сообщений, как IBM MQ, RabbitMQ, и ActiveMQ. Адаптеры Pulsar для систем сообщений тоже ограничены в этом смысле. В теории все выглядит нормально, но на практике...


  2. Pulsar предлагает ограниченную поддержку стриминга. Например, на практике в большинстве случаев он не поддерживает семантику строго однократной (exactly-once) доставки и обработки. Вряд ли кто-то станет использовать Pulsar для платежной системы, потому что платежи могут дублироваться и теряться. Ему не хватает функционала для обработки потоков с соединением, агрегированием, окнами, отказоустойчивым управлением состоянием и обработкой на основе времени событий. Топики в Pulsar отличаются от топиков в Kafka в худшую сторону из-за BookKeeper, который был придуман в 2008 году как лог упреждающей записи для HDFS namenode в Hadoop в расчете на краткосрочное хранение.



Примечание. Адаптер Kafka для Pulsar тоже ограничен. В теории звучит заманчиво, но в реальности не все получается, потому что он поддерживает только небольшую часть функционала Kafka.


Как и Pulsar, Kafka предлагает ограниченную поддержку очереди.


В Kafka можно использовать разные обходные пути, чтобы реализовать нормальное поведение очереди. Если вы хотите использовать отдельные очереди вместо общих топиков Kafka для:


  • Безопасности => используйте Kafka ACL (и дополнительные инструменты, вроде контроля доступа на основе ролей (RBAC) от Confluent).
  • Семантики (отдельных приложений) => используйте группы консюмеров в Kafka.
  • Балансировки нагрузки => используйте партиции Kafka.

Обычно я спрашиваю заказчиков, зачем им нужны очереди. Kafka предлагает готовые решения для разных сценариев, на которые можно просто посмотреть с другой стороны. Редко встречаются случаи, когда действительно требуется высокая пропускная способность с очередями.


Зная обходные пути и ограничения Pulsar и Kafka в плане сообщений, давайте проясним: ни одна из платформ не предлагает решение для обмена сообщениями.


Если вам нужно именно оно, возьмите что-то вроде RabbitMQ или NATS.


Однозначного решения здесь нет. Многие наши заказчики заменяют имеющиеся системы сообщений, вроде IBM MQ, на Kafka (из соображений масштабируемости и затрат). Просто нужно знать имеющиеся варианты и их недостатки, все взвесить и подобрать решение, которое лучше всего подходит именно вам.


Миф 4. Pulsar предоставляет потоковую обработку.


Неправда.


Или, если по-честному, все зависит от того, что вы называете обработкой потоков. Это какая-то простейшая функция или прямо полноценная обработка?


Если в двух словах, обработкой потоков я называю непрерывное потребление, обработку и агрегирование событий из разных источников данных. В реальном времени. В любом масштабе. Естественно, с защитой от сбоев, особенно если речь идет о stateful.



Pulsar предлагает минимальную потоковую обработку через Pulsar Functions. Она подходит для простых обратных вызовов, но не сравнится с функционалом Kafka Streams или ksqlDB для создания стриминговых приложений со stateful-информацией, скользящими окнами и другими функциями. Варианты применения можно найти в разных отраслях. Например, на сайте Kafka Streams есть кейсы New York Times, Pinterest, Trivago, Zalando и других.


Для стриминга аналитики Pulsar обычно используется в сочетании с нормальной платформой обработки потоков, вроде Apache Spark или Apache Flink, но вам придется управлять дополнительным элементами распределенной архитектуры и разбираться в их сложном взаимодействии.


Миф 5. Pulsar предоставляет семантику exactly-once, как и Kafka.


Неправда.


В Pulsar есть функция дедупликации, чтобы одно сообщение не сохранялось в брокере Pulsar дважды, но ничто не мешает консюмеру прочитать это сообщение несколько раз. Этого недостаточно для любого применения обработки потоков, где для входа и выхода используется Pulsar.


В отличие от функции транзакций в Kafka, здесь нельзя привязывать отправленные сообщения к состоянию, записанному в обработчике потоков.


Семантика Exactly-Once Semantics (EOS) доступна с версии Kafka 0.11, которая вышла три года назад, и используется во многих продакшен-деплоях. Kafka EOS поддерживает всю экосистему Kafka, включая Kafka Connect, Kafka Streams, ksqlDB и такие клиенты, как Java, C, C++, Go и Python. На конференции Kafka Summit было несколько выступлений, посвященных функционалу Kafka EOS, включая это превосходное и понятное введение со слайдами и видео.


Миф 6. У Pulsar производительность выше, чем у Kafka.


Неправда.


Я не поклонник разных бенчмарков производительности и пропускной способности. Обычно они субъективны и относятся к конкретной задаче независимо от того, кто их проводит вендор, независимый консультант или исследователь.


Например, GIGAOM опубликовали один бенчмарк, где сравнивается задержка и производительность в Kafka и Pulsar. Автор намеренно замедлил Kafka, настроив параметр flush.messages = 1, в результате которого каждый запрос вызывает fsync и Kafka синхронизируется с диском при каждом сообщении. В этом же бенчмарке консюмер Kafka отправляет подтверждение синхронно, а консюмер Pulsar асинхронно. Неудивительно, что Pulsar вышел явным победителем. Правда, автор забыл упомянуть или объяснить существенные отличия в конфигурации и измерениях. Давайте не будем путать теплое с мягким.


На самом деле архитектура Pulsar сильнее нагружает сеть (из-за брокера, который выступает как прокси перед серверами bookie BookKeeper) и требует в два раза больше пропускной способности (потому что BookKeeper записывает данные в лог упреждающей записи и в главный сегмент).


Confluent тоже делал бенчмарки. Только там сравнивалось сопоставимое. Неудивительно, что результаты получились другими. Но нужно ли вообще встревать в эти бои на бенчмарках между вендорами?


Оцените свои требования к производительности. Сделайте proof-of-concept с Kafka и Pulsar, если надо. Скорее всего, в 99% случаев обе платформы покажут приемлемую производительность для вашего сценария. Не доверяйте посторонним субъективным бенчмаркам! Ваш случай все равно уникален, и производительность это только один из аспектов.


Миф 7. Pulsar проще в использовании, чем Kafka.


Неправда.


Без дополнительных инструментов вам будет сложно и с Kafka, и с Pulsar.


У Kafka две распределенных системы: сама Kafka и Apache ZooKeeper.


Но! У Pulsar три распределенных системы, а еще хранилище: Pulsar, ZooKeeper и Apache BookKeeper. Как и Pulsar, BookKeeper использует ZooKeeper. Для некоторых задач хранения используется RocksDB. Это означает, что Pulsar гораздо сложнее понять и настроить по сравнению с Kafka. Кроме того, у Pulsar больше параметров конфигурации, чем у Kafka.


Kafka движется в противоположном направлении скоро ZooKeeper будет удален (см. KIP-500), так что останется всего одна распределенная система, которую нужно деплоить, обслуживать, масштабировать и мониторить:



ZooKeeper мешает масштабированию в Kafka и усложняет эксплуатацию. Но у Pulsar все еще хуже!


Одна из главных проблем моих заказчиков использование ZooKeeper в критически важных деплоях в большом масштабе. С нетерпением жду упрощения архитектуры Kafka, чтобы остались только брокеры. Кроме того, мы получим единую модель безопасности, потому что больше не придется отдельно настраивать безопасность для ZooKeeper. Это огромное преимущество, особенно для крупных организаций и отраслей со строгим регулированием. Ребята из комплаенса и ИБ скажут вам спасибо за упрощенную архитектуру.


Использование платформы связано НЕ только с архитектурой


У Kafka гораздо более подробная документация, огромное сообщество экспертов и большой набор поддерживающих инструментов, которые упрощают работу.


Кроме того, по Kafka есть много вариантов обучения онлайн-курсы, книги, митапы и конференции. К сожалению, у Pulsar с этим все плохо.


Миф 8. Архитектура с тремя уровнями лучше, чем с двумя.


Зависит от ситуации.


Лично я скептически отношусь к тому, что трехуровневую архитектуру Pulsar (брокеры Pulsar, ZooKeeper и BookKeeper) можно считать преимуществом для большинства проектов. Тут есть издержки.


Twitter рассказал, как отказался от BookKeeper + DistributedLog (DistributedLog похож на Pulsar по архитектуре и дизайну) около года назад, соблазнившись такими преимуществами одноуровневой архитектуры Kafka, как экономичность и улучшенная производительность, которых недоставало двухуровневой архитектуре с отдельным хранилищем.


Как и Pulsar, DistributedLog построен на BookKeeper и добавляет стриминговый функционал, схожий с Pulsar (например, уровень обработки существует отдельно от уровня хранилища). Изначально DistributedLog был отдельным проектом, но потом присоединился к BookKeeper, хотя сегодня им, похоже, мало кто занимается (всего пара коммитов за последний год). Среди основных причин перехода Twitter на Kafka называлась значительная экономия и повышение производительности, а также обширное сообщество и популярность Kafka. Вот какие выводы они сделали: Для одного консюмера экономия ресурсов составила 68%, а для нескольких целых 75%.


Преимущества трехуровневой архитектуры проявляются при создании масштабируемой инфраструктуры. При этом дополнительный уровень увеличивает нагрузку на сеть минимум на 33%, а данные, которые хранятся в брокерах Pulsar, приходится дополнительно кэшировать на обоих уровнях, чтобы добиться аналогичной производительности, а еще дважды записывать на диск, потому что формат хранения в Bookkeeper основан не на логе.


В облаке, где работает большинство деплоев Kafka, лучшим внешним хранилищем выступает не такая узкоспециализированная технология, как BookKeeper, а популярное и проверенное хранилище объектов, вроде AWS S3 и GCP GCS.


Tiered Storage в Confluent Platform на основе AWS S3, GCP GCS и им подобных, дает те же преимущества без дополнительного слоя BookKeeper, как у Pulsar, и без дополнительных затрат и задержек, связанных с передачей данных по сети. У Confluent ушло два года на то, чтобы выпустить GA-версию Tiered Storage for Kafka с круглосуточной поддержкой для самых критичных данных. Tiered Storage пока недоступно для опенсорсной версии Apache Kafka, но Confluent вместе с сообществом Kafka (включая крупные технологические компании, вроде Uber) работает над KIP-405, чтобы добавить Tiered Storage в Kafka с разными вариантами хранения.


У обеих архитектур есть плюсы и минусы. Лично мне кажется, что 95% проектов не нуждаются в сложной трехуровневой архитектуре. Она может пригодиться там, где требуется внешнее недорогое хранилище, но вам придется отвечать за круглосуточное соблюдение SLA, масштабируемость и пропускную способность. А еще за безопасность, управление, поддержку и интеграцию в вашу экосистему. Если вам действительно нужна трехуровневая архитектура, не стану вас останавливать.


Связанный миф: Pulsar лучше подходит для отстающих консюмеров благодаря уровню кэширования и хранения.


Неправда.


Основная проблема отстающих консюмеров в том, что они занимают page cache, то есть последние сообщения уже кэшированы. При чтении из предыдущих сегментов они заменяются, что снижает производительность консюмеров, читающих с начала лога.


Архитектура Pulsar проявляет себя даже хуже в этом отношении. Проблема сброса кэша сохраняется, при этом для чтения нужно сделать лишний прыжок по сети и загрузить сеть вместо того, чтобы просто прочитать сообщение с локального носителя.


Миф 9. Kafka масштабируется хуже, чем Pulsar.


Неправда.


Это один из главных аргументов сообщества Pulsar. Как я уже говорил, это всегда зависит от выбранного бенчмарка. Например, я видел тесты с эквивалентными вычислительными ресурсами, где на высокой пропускной способности у Kafka результат был гораздо лучше, чем у Pulsar. Вот один из таких бенчмарков:



В большинстве случаев масштабируемость не проблема. Kafka можно легко разогнать до нескольких гигабайтов в секунду, как в демонстрации Масштабирование Apache Kafka до 10+ ГБ в секунду в Confluent Cloud:



Честно говоря, этот вопрос должен волновать менее 1% пользователей. Если у вас требования, как у Netflix (петабайты в день) или LinkedIn (триллионы сообщений), есть смысл обсуждать самую подходящую архитектуру, железо и конфигурацию. Остальным можно не беспокоиться.


Связанный миф: сейчас в Kafka может храниться всего 500к партиций на кластер.


Правда.


Пока у Kafka не лучшая архитектура для масштабных деплоев с сотнями тысяч топиков и партиций.


Но! Pulsar тоже не резиновый. Просто у него другие лимиты.


Ограничения в Kafka связаны с Zookeeper. Когда Zookeeper удалят через KIP-500, верхняя граница уйдет вместе с ним.


Примечание: успех зависит от подходящего устройства архитектуры!

Если у заказчика возникли проблемы с количеством партиций и масштабируемостью в Kafka, скорее всего, он ошибся в архитектуре и приложениях (Pulsar бы тут точно не помог).


Kafka это платформа стриминга, а не очередной IBM MQ. Если вы попробуете воссоздать любимое решение и архитектуру MQ с Kafka, у вас вряд ли что-то получится. У меня было несколько заказчиков, которые потерпели крах, но смогли привести все в порядок с нашей помощью.


Скорее всего, у вас не возникнет проблем с количеством партиций и масштабируемостью, даже пока Kafka еще использует ZooKeeper, если вы правильно все спроектируете и разберетесь в базовых принципах Kafka. Такие проблемы часто возникают с любой технологией, вроде Kafka, которая вводит совершенно новый уровень и парадигму (например, при первом появлении облаков немало компаний набили себе шишки при попытке их освоить).


Связанный миф: Pulsar поддерживает практически неограниченное число партиций.


Неправда.


У BookKeeper существуют те же ограничения по одному файлу на ledger, что и у Kafka, но в одной партиции таких ledger несколько. Брокеры Pulsar объединяют партиции в группы, но на уровне хранения, в Bookkeeper, данные хранятся в сегментах, и на каждую партицию приходится много сегментов.


В Kafka метаданные для этих сегментов хранятся в Zookeeper, откуда и возникают эти ограничения по числу. Когда Kafka избавится от этой зависимости, границы возможного раздвинутся. С нетерпением жду реализации KIP-500. Подробности читайте в статье Apache Kafka справится сама: удаление зависимости от Apache ZooKeeper.


Связанный миф: потребности масштабирования в Kafka необходимо определять при создании топика.


Отчасти это правда.


Если требуется очень большой масштаб, в топиках Kafka можно сразу создать больше партиций, чем обычно требуется для этой задачи. (См. Потоки и таблицы в Apache Kafka: топики, партиции и хранение.) Или можно добавить партиции позже. Это не идеальное решение, но так уж устроены распределенные системы стриминга (которые, кстати, масштабируются лучше, чем традиционные системы обработки сообщений, вроде IBM MQ).


Оптимальные методы создания топиков и изменения их конфигурации уже тщательно продуманы за вас, так что не волнуйтесь.


Но! У топиков Pulsar есть то же ограничение!


Пропускная способность для записи зависит от числа партиций, назначенных топику Pulsar, точно так же, как это происходит в топике Kafka, поэтому там тоже придется создавать партиции с запасом. Дело в том, что в каждой партиции записывать можно только в один ledger за раз (а их может быть несколько). Увеличение числа партиций динамически влияет на порядок сообщений, как и в Kafka (порядок нарушается).


У Kafka и Pulsar с масштабированием все отлично. Этого будет более чем достаточно почти в любом сценарии.


Если вам нужен совсем заоблачный масштаб, лучше взять реализацию без ZooKeeper. Так что KIP-500 это самое ожидаемое изменение в Kafka, судя по сообществу и заказчикам Confluent.


Миф 10. В случае аппаратного сбоя Pulsar восстанавливается моментально, а Kafka приходится перегружать данные.


И да, и нет.


Если брокер Pulsar вырубится, ничего не страшного не будет, это правда, но, в отличие от брокера Kafka, он и не хранит данные, а просто выступает в качестве прокси для уровня хранения, то есть BookKeeper. Так что подобные заявления о Pulsar это просто маркетинговый ход. Почему никто не говорит, что бывает, если полетит нода BookKeeper (bookie)?


Если завершить и перезапустить BookKeeper bookie, данные будут точно так же перераспределяться, как и в Kafka. В этом суть распределенных систем с их репликацией и партициями.


Kafka уже предлагает эластичность.


Это важно. Основатель Confluent, Джей Крепс (Jay Kreps) недавно писал об этом: Эластичные кластеры Apache Kafka в Confluent Cloud. В облачном сервисе SaaS, вроде Confluent Cloud, конечный пользователь не думает об аппаратных сбоях. Он ожидает непрерывный аптайм и SLA на уровне 99,xx. С оплатой за потребление пользователь не заботится об управлении брокерами, изменении размеров нод, увеличении или уменьшении кластеров и подобных деталях.


Самоуправляемым кластерам Kafka нужны такие же возможности. Tiered Storage for Kafka это огромное хранилище, с которым данные не хранятся на брокере и восстановление после сбоев происходит почти моментально. Если добавить сюда такие инструменты, как Self-Balancing Kafka (эта фича от Confluent описана в статье по ссылке выше), можно вообще забыть об эластичности в самоуправляемых кластерах.


К сожалению, в Pulsar вы ничего подобного не найдете.


Миф 11. Pulsar справляется с репликацией между кластерами лучше, чем Kafka.


Неправда.


Любая распределенная система должна решать такие проблемы, как CAP-теорема и кворум в распределенных вычислениях. Кворум это минимальное число голосов, которое распределенная транзакция должна получить, чтобы выполнить операцию в распределенной системе. Такой подход позволяет обеспечить согласованность операций.


Задачи по кворуму Kafka поручает ZooKeeper. Даже после реализации KIP-500 и удаления ZooKeeper законы физики не перестанут действовать: проблемы задержки в распределенных системах существуют в таких регионах, как Восточная, Центральная и Западная часть США и даже по всему миру. Скорость света, конечно, впечатляет, но все же имеет ограничения.


Эту проблему можно обойти разными способами, включая использование инструментов репликации в реальном времени, например MirrorMaker 2 в Apache Kafka, Confluent Replicator или Confluent Multi-Region-Clusters. Эти варианты и советы см. в статье Архитектурные шаблоны для распределенных, гибридных, периферийных и глобальных деплоев Apache Kafka.



Вы не найдете универсальное решение, которое обеспечит глобальную репликацию + нулевой простой + нулевую потерю данных. Для самых критически важных приложений Confluent Multi-Region-Clusters предлагает RTO=0 и RPO=0 (нулевой простой и нулевую потерю данных) с автоматическим аварийным восстановлением и отработкой отказа, даже если упадет целый датацентр или облачный регион.


Для таких случаев архитектура Pulsar будет еще сложнее, чем ее базовая версия. Просто для георепликации Pulsar требует дополнительный глобальный кластер Zookeeper, а для больших расстояний это не годится. Обходной путь есть, но от CAP-теоремы и законов физики не уйдешь.


Что с Kafka, что с Pulsar вам нужна проверенная архитектура для борьбы с физикой в глобальных деплоях.


Миф 12. Pulsar совместим с интерфейсом и API Kafka.


Отчасти.


Pulsar предлагает простейшую реализацию с зачаточной совместимостью с протоколом Kafka v2.0.


У Pulsar есть конвертер для базовых элементов протокола Kafka.


В теории совместимость с Kafka выглядит убедительно, но вряд ли это серьезный аргумент для переноса действующей инфраструктуры Kafka в Pulsar. Зачем такой риск?


Мы слышали заявления о совместимости с Kafka и в других примерах, например для гораздо более зрелого сервиса Azure Event Hubs. Почитайте об ограничивающих факторах их Kafka API. Вы удивитесь. Никакой поддержки основных функций Kafka, вроде транзакций (и семантики exactly-once), сжатия или compaction для логов.


Раз, по сути, это не Kafka, существующие приложения Kafka будут вести себя непредсказуемо в таких вот совместимых системах, будь то Azure Event Hubs, Pulsar или другая оболочка.


Kafka vs. Pulsar комплексное сравнение


Выше мы говорили о мифах, которые гуляют по разным статьям и постам. С ними, вроде, все понятно.


Но я обещал, что мы поговорим не только о технических деталях. При выборе технологии важны и другие аспекты.


Рассмотрим три из них: доля на рынке, корпоративная поддержка и облачные предложения.


Доля на рынке у Apache Kafka и Apache Pulsar


Статистика в Google Trends за последние пять лет совпадает с моими личными наблюдениями интерес к Apache Kafka гораздо выше, чем к Apache Pulsar:



Примерно так же выглядит картинка сравнения на Stack Overflow и аналогичных платформах, по числу и размеру поддерживающих вендоров, открытой экосистеме (интеграция инструментов, обертки, вроде Spring Kafka) и подобным характеристикам.


Открытые вакансии еще один индикатор распространения технологии. Для Pulsar их мало, то есть его использует не так много компаний. Убедитесь сами поищите на любом сайте. Если искать по всему миру, для Pulsar вакансий меньше сотни, а для Kafka тысячи. Кроме того, в большинстве вакансий для Pulsar указано, что требуется опыт с Kafka, Pulsar, Kinesis и аналогичными технологиями.


В большинстве случаев такие характеристики важнее для успешного проекта, чем технические детали. В конце концов ваша цель решить определенную бизнес-задачу, правда же?


Раз Pulsar так мало распространен, почему о нем вообще говорят? Во-первых, потому, что независимые консультанты, аналитики и блоггеры (включая меня) должны рассказывать о новых современных технологиях, чтобы привлечь аудиторию. Если честно, люди любят об этом читать.


Корпоративная поддержка для Kafka и Pulsar


Корпоративная поддержка для Kafka и Pulsar существует.


Но все не совсем так, как можно ожидать. Вот несколько вендоров, к которым вы можете обратиться, если решили работать с Pulsar:


  • Streamlio (теперь принадлежит Splunk) компания, которая раньше стояла за Apache Pulsar. Splunk пока не объявили о своей будущей стратегии для поддержки пользователей, которые работают над проектами на основе Pulsar. Splunk славится своими популярными аналитическими платформами. Это их основной бизнес (1,8 млрд долларов в 2019 году). Единственное, на что жалуются пользователи, ценник. Splunk активно используют Kafka, а сейчас интегрируют Pulsar в Splunk Data Stream Processor (DSP). Сильно сомневаюсь, что Splunk вдруг увлечется опенсорсом, чтобы поддержать ваш драгоценный проект на базе Pulsar (возможно, чего-то можно ожидать от DSP). Будущее покажет.
  • StreamNative основана одним из изначальных разработчиков Apache Pulsar и поддерживает платформу стриминга на основе Pulsar. На июнь 2020 года у StreamNative было 13 (целых 13, да!) сотрудников в LinkedIn. Уж не знаю, хватит этого для поддержки вашего критически важного проекта или нет.
  • TIBCO объявили о поддержке Pulsar в декабре 2019 года. За последние годы стратегия у них сместилась с интеграции на аналитику. Пользователи их промежуточного ПО уходят от TIBCO толпами. В отчаянии они приняли стратегическое решение: поддерживать другие платформы, даже если у них нет никакого опыта и никакой связи с ними. Да, звучит как миф, но TIBCO делают то же самое для Kafka. Любопытный факт: TIBCO предлагает Kafka и ZooKeeper в Windows! Такого больше никто не делает. Все знают, это приводит к нестабильности и несогласованности. Но если что, TIBCO помогут вам с Kafka и Pulsar. Зачем вообще сравнивать эти платформы, если можно получить обе у одного вендора, причем даже на Windows, с .exe и bat-скриптами для запуска серверных компонентов:


Вендоров для Kafka больше с каждым днем.


Это очень распространенная на рынке платформа. Лучшее подтверждение поддержка от крупнейших вендоров. IBM, Oracle, Amazon, Microsoft и многие другие поддерживают Kafka и встраивают возможности интеграции с ней в свои продукты.


Окончательное подтверждение этому я получил на конференции Oracle OpenWorld 2019 в Сан-Франциско, где выступал менеджер по GoldenGate (великолепный и очень дорогой инструмент CDC от Oracle). В основном он рассказывал, как GoldenGate станет платформой интеграции данных вообще для всего. Половина выступления была посвящена стримингу, платформе Kafka и тому, что GoldenGate будет обеспечивать интеграцию с разными базами/озерами данных и Kafka в обоих направлениях.


Полностью управляемые облачные сервисы для Kafka и Pulsar


Какие облачные решения доступны для Kafka и Pulsar?


Для Apache Pulsar есть облачный сервис с очень говорящим названием:


Kafkaesque.


Я серьезно. Сами посмотрите [UPD: 17 июня они переименовали KAFKAESQUE в KESQUE. Видимо, поняли, всю комичность ситуации.]


Для Apache Kafka существуют разные предложения на выбор:


  • Confluent Cloud (SaaS) полностью управляемый сервис с оплатой по мере использования, круглосуточной доступностью, эластичностью и бессерверными функциями для Apache Kafka и связанной экосистемы (Schema Registry, коннекторы Kafka Connect и ksqlDB для обработки потоков).
  • Amazon MSK (PaaS) подготавливает ZooKeeper и брокеры Kafka, а конечные пользователи сами обслуживают их, исправляют баги, устанавливают обновления и т. д. Важный факт: AWS не включает проблемы с Kafka в поддержку и SLA 99,95.
  • Azure Event Hubs (SaaS) предоставляет конечную точку Kafka (с проприетарной реализацией) для взаимодействия с приложениями Kafka. Это легко масштабируемое решение с высокой производительностью. Это не вполне Kafka, просто эмуляция, и тут не хватает некоторых важных функций, вроде семантики exactly-once, compaction логов и сжатия. Не говоря уже о дополнительных возможностях, вроде Kafka Connect и Kafka Streams
  • Big Blue (IBM) и Big Red (Oracle) предлагают облачные решения для Kafka и соответствующих API. Не знаю, использует их кто-нибудь или нет. Понятия не имею, насколько они хороши, никогда не видел их в деле.
  • Много компаний поменьше, вроде Aiven, CloudKarafka, Instaclustr и других.

В общем, про распространенность Kafka и Pulsar на рынке все очевидно.


И все же Apache Kafka или Apache Pulsar?


В двух словах: Pulsar пока здорово отстает от Kafka по зрелости в плане надежности для больших масштабов и обширности сообщества.


И вообще не факт, что Pulsar лучше.


Есть смысл сравнивать их, если вы решили использовать только опенсорс-инструменты. Узнайте, что подойдет лучше именно вам. Не забудьте учесть технические характеристики, уровень зрелости, вендоров, сообщество разработчиков и другие важные факторы. Что вписывается в вашу ситуацию?


Если в Kafka и Pulsar есть не все нужные вам функции, выбирайте Kafka: для нее есть много вендоров и облачных решений. Pulsar, к сожалению, сильно отстает и вряд ли догонит в ближайшем будущем.

Подробнее..

Обогащение данных что это и почему без него никак

15.04.2021 06:04:33 | Автор: admin

Задача обогащения данных напрямую связана с темой их обработки и анализа. Обогащение нужно для того, чтобы конечные потребители данных получали качественную и полную информацию.

Сам термин "обогащение данных" это перевод англоязычного Data enrichment, который проводит аналогию между данными и... ураном. Точно так же, как промышленники насыщают урановую руду, увеличивая долю изотопа 235U, чтобы её можно было использовать (хочется надеяться, в мирных целях), в процессе обогащения данных мы насыщаем их информацией.

Обогащение данных это процесс дополнения сырых данных той информацией, которая в исходном виде в них отсутствует, но необходима для качественного анализа.

Конечными потребителями такой информации чаще всего являются аналитики, но могут быть и нейронные сети, пользователи приложения или предоставляемого API.

Заметим, что обогащение данных термин широкий, и получать данные из внешних источников можно весьма разнообразными способами. Например, представим бизнес-процесс регистрации нового клиента. Если в данных этого клиента отсутствует e-mail, то взаимодействие с внешним источником в этом случае может быть буквально следующим: взяли телефон, позвонили клиенту, узнали его e-mail. Получается, этот процесс может включать в себя такие совершенно не автоматизированные операции, как обычный телефонный разговор со всеми этими эс как доллар, "а" как русская. Это тоже можно считать обогащением данных, но данный пласт проблем в этой статье мы затрагивать не будем. Нас интересуют вполне конкретные случаи, когда данные хранятся в базе данных и именно БД служит внешним источником данных для обогащения.

Источниками сырых исходных данных могут быть:

  • Система кассовых платежей, которая отслеживает транзакции и отправляет в хранилище данных: время, id торговой точки, количество и стоимость купленного товара и т.д.

  • Логистическая система, которая отслеживает движение товара: id транспорта и его водителя, gps-координаты в заданные моменты времени, статус, маршрут и т.д.

  • Телеметрия с датчиков интернета вещей.

  • Система мониторинга инфраструктуры.

Все эти примеры объединяет то, что в них регулярно передаются относительно небольшие пакеты данных, каждый из которых имеет идентификатор. То есть каждый источник знает свой идентификатор и передает его вместе с пакетом данных. По этому идентификатору в хранилище данных можно получить справочную информацию об источнике.

Таким образом, типовую задачу можно сформулировать следующим образом: на вход поступают данные вида <таймстамп, идентификатор источника, содержимое пакета>, а конечному потребителю требуется соединить (в смысле join) справочную информацию об источнике из хранилища данных с идентификатором источника из сырых данных.

Как обогащаем данные

Один из вариантов решения задачи сопоставления данных по расписанию запускать скрипты, которые подготовят подходящие для аналитиков представления. Например, это можно делать с помощью Apache Airflow. Но в любом случае потребуется ждать и сохранять данные из внешних источников.

Другой вариант использовать инструменты потоковой обработки данных. В этом случае нужно определиться, где же всё-таки хранить справочную информацию и что будет являться Single Source of Truth (SSOT), или единым источником истины для справочных данных. Если хранить справочные данные в хранилище, то к нему придется каждый раз обращаться, и это может быть накладным, так как к сетевым издержкам добавится ещё и обращение к диску. Вероятно, оптимальнее хранить справочную информацию в оперативной памяти или другом горячем хранилище, например, в Tarantool.

Мы, очевидно, отдаём предпочтению именно последнему варианту, и наша схема приобретает завершенный вид.

Она отличается от первой иллюстрации процесса обогащения данных тем, что появилась конкретика: процесс обогащения происходит на лету с помощью потокового обработчика, справочные данные хранятся в горячем хранилище (оперативной памяти).

Давайте посмотрим, какие плюсы и минусы мы получим, если возьмем за основу данную схему.

Преимущества потокового обогащения данных:

  • В хранилище всегда актуальные данные, которые готовы к использованию. В нашем динамичном мире зачастую необходимо иметь доступ к реалтайм-данным и сразу их использовать, не тратя время на дополнительную подготовку, распаковку и т.д. Потоковый обработчик, можно сказать, приносит их нам на блюдечке.

  • Схема хорошо масштабируется, если подобрать соответствующие технологии. С правильными инструментами можно масштабироваться под требуемую пропускную способность платформы и под объемы справочной информации.

Недостатки:

  • Источник истины для справочной информации вынесен из надёжного дискового хранилища данных в менее надёжную оперативную память. При сбое это грозит потерей всех несинхронизированных данных.

  • Требуется гораздо больше памяти как оперативной, так и постоянной. В целом, для любой системы аналитики желательно иметь побольше дискового пространства и оперативной памяти, но здесь это становится обязательным условием.

Какие технологии используем

Что касается технологий, то мы выбираем более-менее устоявшиеся на рынке решения с открытым исходным кодом и хорошей документацией, учитывая совместимость.

Выбранный стек технологий:

  • Apache Kafka источник данных и брокер очередей;

  • Apache Spark потоковый обработчик данных;

  • Apache Ignite горячее хранение справочной информации;

  • Greenplum и Apache Hadoop хранилище данных.

В выборе Greenplum мы немного поступились совместимостью. Связать его со Spark не совсем тривиальная задача, для этого не существует стандартного open source коннектора (подробнее рассказывали в этой статье). Поэтому мы разрабатываем такой коннектор сами.

В остальном набор достаточно стандартный. Hadoop держим на всякий случай, например, чтобы использовать тот же Hive поверх HDFS вместо Greenplum.

Каждое из этих решений масштабируемо. Поэтому мы можем построить любую архитектуру, в зависимости от объемов справочной информации и ожидаемой пропускной способности.

Итак, вот окончательные очертания нашей схемы обогащения данных.

Версии, которые используем:

  • Apache Spark 2.4.6.

  • Apache Ignite 2.8.1.

  • Apache Kafka 2.4.1.

  • Greenplum 6.9.0.

  • Apache Hadoop 2.10.1.

Обращаю на это внимание, потому что от версий выбранных компонент зависит совместимость и возможность этих компонент взаимодействовать. Например, если нужно, чтобы Apache Spark писал данные в базу данных Greenplum, это может стать отдельной проблемой, и версии также будут важны. Наш коннектор на текущий момент работает именно со Spark v2, для Spark v3 его нужно отдельно дорабатывать.

Что в результате

Если объединять данные, применяя предложенную схему обогащения, в дальнейшем аналитикам не потребуется каждый раз делать JOIN-запрос, что сэкономит как ценное время людей, так и машинные ресурсы.

Но есть и ограничения, связанные с тем, что source of truth, по сути, находится в оперативной памяти. Поэтому при редактировании справочной информации надо напрямую работать с Ignite через интерфейсы самого Ignite. Кроме этого, нужен аккуратный механизм синхронизации, чтобы кэш Ignite был персистентным. У Ignite есть собственный механизм для записи на диск, но все же Ignite больше ориентирован на работу в ОЗУ, поэтому для резервирования справочной информации в хранилище данных лучше использовать что-нибудь специально для этого предназначенное, например, Airflow.

Полезные ссылки, чтобы понять, как строить взаимодействие между Spark и Ignite и насколько это уже проработанный механизм:

Пользуясь случаем: мы расширяем отдел систем обработки данных. Если вам интересно заниматься с подобного рода задачами, пишите мне в личку, в телеграм @its_ihoziainov или на job@itsumma.ru с пометкой data engineering.

Если вы уже сталкивались с такого рода задачами, делитесь мнением в комментариях, задавайте вопросы они, как известно, очень помогают находить лучшие решения. Спасибо.

Подробнее..

Перевод Apache Kafka скоро без ZooKeeper

16.04.2021 08:17:32 | Автор: admin

image


В основе Apache Kafka находится лог простая структура данных, которая использует последовательные операции, работающие в симбиозе с оборудованием. Эффективное использование дискового буфера и кэша процессора, prefetch, передача данных zero-copy и много других радостей все это благодаря построенной на логе структуре, которая славится своей эффективностью и пропускной способностью. Обычно эти преимущества, а еще базовая реализация в виде лога коммитов, первое, что люди узнают о Kafka.


Код самого лога составляет относительно малую часть всей системы. Гораздо больше занимает код, который отвечает за организацию партиций (т. е. логов) на множестве брокеров в кластере назначает лидеров, обрабатывает сбои и т. д. Этот код и делает Kafka надежной распределенной системой.


Раньше важной частью работы распределенного кода был Apache ZooKeeper. Он хранил самые важные метаданные системы: где находятся партиции, кто из реплик лидер и т. д. Поначалу эффективный и проверенный ZooKeeper действительно был нужен, но, по сути, это что-то вроде специфической файловой системы или API триггеров поверх согласованного лога. А Kafka это API pub/sub (издатель-подписчик) поверх согласованного лога. В результате пользователи настраивают, конфигурируют, мониторят, защищают и обдумывают взаимодействие и производительность между двумя реализациями лога, двумя сетевыми уровнями и двумя реализациями системы безопасности каждая со своими инструментами и хуками мониторинга. Все стало неоправданно сложно, поэтому решено было заменить ZooKeeper сервисом кворума прямо в самой Kafka.


Работы предстояло немало, поэтому в прошлом апреле мы запустили проект с участием сообщества, чтобы ускорить темпы и создать рабочую систему к концу года.


image


Мы с Джейсоном, Колином и командой KIP-500 прогнали полный жизненный цикл сервера Kafka с созданием и потреблением сообщений и все без Zookeeper. Какая же красота!
Бен Стопфорд (Ben Stopford)


Рады сообщить, что ранний доступ к KIP-500 уже закоммичен в ствол и будет включен в предстоящий релиз 2.8. Впервые Kafka можно использовать без ZooKeeper. Мы называем это режимом Kafka Raft Metadata, сокращенно KRaft (читается крафт).


В этом релизе пока доступны не все фичи. Мы еще не поддерживаем ACL и другие транзакции или функции безопасности. Переназначение партиций и JBOD тоже не поддерживаются в режиме KRaft (надеемся, что они будут доступны в одном из релизов Apache Kafka в этом году). Так что контроллер кворума считаем экспериментальным и для прода пока не используем. В остальном преимуществ масса: деплоймент и обслуживание станут проще, всю Kafka можно запустить как один процесс, а на кластер помещается гораздо больше партиций (цифры см. ниже).


Контроллер кворума: консенсус на основе событий


Если запустить Kafka с новым контроллеров кворума, все задачи по метаданным, которые раньше выполнялись контроллером Kafka и ZooKeeper, ложатся на новый сервис, запущенный прямо в кластере Kafka. Контроллер кворума можно запустить на выделенном оборудовании, если этого требует ваш сценарий.



Внутри происходит много интересного. Контроллеры кворума используют новый протокол KRaft, чтобы точно реплицировать метаданные по всему кворуму. Протокол во многом напоминает ZooKeeper ZAB и Raft, но с важными отличиями например, что логично, он использует архитектуру на основе событий.


Контроллер кворума хранит свой стейт с помощью модели на основе событий, чтобы стейт-машину всегда можно было в точности воссоздать. Чтобы лог события, который хранит стейт (также называется топиком метаданных), не разрастался до бесконечности, периодически он обрезается с созданием снапшотов. Остальные контроллеры в кворуме фолловят активный контроллер и реагируют на события, которые он создает и сохраняет в свой лог. Если одна нода приостановлена во время разделения на партиции, например, она быстро догонит пропущенные события с помощью лога. Так мы существенно сокращаем период простоя и время восстановления системы в худшем сценарии.


image
Консенсус на основе событий


Раз протокол KRaft управляется событиями, в отличие от контроллера ZooKeeper контроллер кворума не загружает стейт из ZooKeeper, когда становится активным. При смене лидера у нового активного контроллера уже есть в памяти все закоммиченные записи метаданных. Более того, для отслеживания метаданных в кластере используется тот же механизм на основе событий, что и в протоколе KRaft. Задача, которую раньше обрабатывали RPC, теперь полагается на события и использует лог для коммуникации. Приятное последствие этих изменений (и изменений, которые были предусмотрены изначальным замыслом) Kafka теперь поддерживает гораздо больше партиций, чем раньше. Обсудим подробнее.


Увеличение масштаба Kafka до миллионов партиций


Максимальное число партиций в кластере Kafka определяется двумя свойствами: лимит партиций для ноды и лимит партиций для кластера. Сейчас на ограничение в кластере во многом влияет управление метаданными. Предыдущие KIP улучшили лимит для ноды, хотя и здесь нет предела совершенству. Масштабируемость в Kafka зависит от добавления нод. Здесь нам важен лимит для кластера, который определяет верхнюю границу масштабирования всей системы.


Новый контроллер кворума может обрабатывать гораздо больше партиций на кластере. Чтобы это проверить, можно провести такие же тесты, как в 2018 году, с помощью которых мы демонстрировали лимиты партиций в Kafka. Эти тесты измеряют время, которое требуется для завершения работы и восстановления. В старом контроллере это операция O (число партиций). Она устанавливает верхнюю границу числа партиций, которые сейчас поддерживает один кластер Kafka.


Предыдущая реализация, как рассказывалось в посте по ссылке, могла достигать 200K партиций. Ограничивающим фактором служило время, которое требовалось для переноса важных метаданных между внешним консенсусом (ZooKeeper) и внутренним управлением лидером (контроллер Kafka). С новым контроллером кворума обе роли выполняет один компонент. Подход на основе событий означает, что контроллер отрабатывает отказ почти моментально. Ниже приводятся цифры нашего тестирования для кластера с 2 млн партиций (в 10 раз больше предыдущего лимита):


image


Контроллер С контроллером ZooKeeper С контроллером кворума
Время контролируемой остановки (2 млн партиций) 135 сек 32 сек
Восстановление после неконтролируемой остановки (2 млн партиций) 503 сек 37 сек

Важно учитывать контролируемую и неконтролируемую остановку. Контролируемые остановки включают типичные рабочие сценарии, вроде rolling restart стандартная процедура развертывания изменений без потери доступности. Восстановление после неконтролируемой остановки важнее, потому что оно задает целевое время восстановления (RTO) системы в случае, скажем, неожиданного сбоя, например, на виртуальной машине или поде, или потери связи с дата-центром. Хотя эти цифры указывают на производительность всей системы, они напрямую определяют хорошо известное узкое место, которое возникает при использовании ZooKeeper.


Измерения для контролируемой и неконтролируемой остановки не сравниваются напрямую, потому что неконтролируемая остановка включает выбор новых лидеров, а контролируемая нет. Это расхождение введено специально, чтобы контролируемая остановка соответствовала изначальным тестам 2018 года.


Упрощение Kafka единый процесс


Kafka часто воспринимается как тяжеловес, и такой репутацией она во многом обязана сложностями с управлением ZooKeeper. Из-за этого для запуска проектов часто выбирают более легкую очередь сообщений, вроде ActiveMQ или RabbitMQ, а Kafka появляется на сцене только при разрастании масштаба.


Это печально, ведь Kafka дает удобную абстракцию на основе лога коммитов, которая одинаково хорошо подходит для маленьких рабочих нагрузок у стартапов и для бешеного трафика у Netflix или Instagram. А если вам нужна стриминговая обработка, без Kafka с абстракцией лога коммитов не обойтись, будь то Kafka Streams, ksqlDB или другой аналогичный фреймворк. Управлять двумя отдельными системами (Kafka и Zookeeper) сложно, поэтому пользователи выбирали между масштабированием и простотой.


Теперь можно получить все и сразу. Благодаря KIP-500 и режиму KRaft мы можем легко начать проект с Kafka или использовать его как альтернативу монолитным брокерам, вроде ActiveMQ или RabbitMQ. Благодаря легкости и единому процессу это подходящий вариант для ограниченных в ресурсах устройств. В облаке все еще проще. Управляемые сервисы, например, Confluent Cloud, все берут на себя. Не важно, собственный это будет кластер или управляемый, мы можем начать с малого, а потом расширять его вместе со своими рабочими нагрузками все в рамках одной инфраструктуры. Давайте посмотрим, как выглядит развертывание с одним процессом.


Испытываем Kafka без ZooKeeper


Новый контроллер кворума пока доступен в экспериментальном режиме, но, скорее всего, будет включен в релиз Apache Kafka 2.8. На что он способен? Самое крутое это, конечно, возможность создавать кластер Kafka с одним процессом, как в демо.


Конечно, если вам нужна очень высокая пропускная способность, и вы хотите добавить репликацию для отказоустойчивости, понадобятся еще процессы. Пока контроллер кворума на базе KRaft доступен только как Early Access, для критических рабочих нагрузок он не годится. В ближайшие месяцы мы будем добавлять недостающие фрагменты, моделировать протокол с помощью TLA+ и внедрять контроллер кворума в Confluent Cloud.


image


Но поэкспериментировать вы можете уже сейчас. См. полный README на GitHub.

Подробнее..

Итоговый проект для видеокурса и подкаст Проблемная Kafka

20.04.2021 08:05:05 | Автор: admin

Гостем подкаста The Art Of Programming стал спикер курса Слёрма по Kafka Александр Миронов, Infrastructure Engineer в Stripe. Тема выпуска Проблемная Kafka. Обсудили вопросы, часто возникающие при работе с Kafka: аудит входных данных, квоты, способы хранения данных, возможный даунтайм в консьюмер-группах и др.


Итоговый проект по Kafka


Релиз видеокурса от Александра Миронова и Анатолия Солдатова состоялся 7 апреля. Кроме заявленных тем мы добавили в курс итоговый проект: студентам нужно будет выполнить практическое задание под звёздочкой с применением знаний, полученных на курсе.


Мы добавили два варианта заданий из области разработки и из области администрирования. В первом варианте понадобится создать приложение для генерации real-time статистики продаж интернет-магазина, а во втором спасти кластер от хаоса.


Подробнее о курсе
Бесплатные материалы курса

Подробнее..

Kafka Streams непростая жизнь в production

14.01.2021 16:09:15 | Автор: admin

Привет, Хабр! Вокруг меня сформировался позитивный информационный фон на тему обработки событий через Kafka Streams. Этот инструмент привлекает множеством видео-докладов и статей на Хабре, подробной документацией, понятным API и красивой архитектурой. Некоторые мои знакомые и коллеги разрабатывают с его помощью свои системы. Но что происходит с в реальной жизни, когда эти системы уходят в production?

В этой статье я опущу введение в Kafka Streams, предполагая, что читатель уже знаком с ней, и расскажу о нашем опыте жизни с этой библиотекой на примере достаточно нагруженной системы.

Коротко о проекте

Внутренней командой вместе с партнерами мы работаем над биржой Ad Exchange, которая помогает перепродавать рекламный трафик. Специфику подобных инструментов мы уже когда-то описывали в статье на Хабре. По мере роста числа партнеров среди SSP и DSP, нагрузка на сервера биржи растет. А для повышения ценности самой биржи мы должны собирать с этого трафика развернутую аналитику. Тут-то мы и попытались применить Kafka Streams.

Внедрить новый инструмент у себя на продакшене - это всегда некоторый риск. Мы его осознавали, но смирились с этим, поскольку в целом Kafka Streams концептуально должен был хорошо подойти для расчета агрегатов. И хотя первое впечатление было отличным, далее я расскажу о проблемах, к которым это привело.

Статья не претендует на объективность и описывает только лишь наш опыт и проблемы, с которыми мы столкнулись, работая с этой технологией. Надеюсь, кому-то она поможет избежать ошибок при использовании Kafka Streams. А может быть даст повод посмотреть по сторонам.

Агрегаты

В нашу систему приходят десятки и сотни тысяч сообщений в секунду. Первое, что нам необходимо было сделать, - агрегации по различным ключам. Например, идёт поток событий "Показ рекламы". Для него надо считать на лету общую стоимость и количество, группируя по различным ключам: стране, партнёру, рекламной площадке и т.д.

Вроде бы стандартный кейс для Kafka Streams: используем функции groupBy и aggregate и получаем нужный результат. Всё работает именно так, причем с отличной скоростью за счёт внутреннего кэша: несколько последовательных изменений по одному и тому же ключу сначала выполняются в кэше и только в определённые моменты отправляются в changelog-топик. Далее Kafka в фоне удаляет устаревшие дублирующиеся ключи через механизм log compaction. Что здесь может пойти не так?

Репартиционирование

Если ваш ключ группировки отличается от ключа, под которым изначально пришло событие, то Kafka Streams создаёт специальный repartition-топик, отправляет в него событие уже под новым ключом, а потом считывает его оттуда и только после этого проводит агрегацию и отправку в changelog-топик. В нашем примере вполне может быть, что событие "Показ рекламы" пришло с ключом в виде UUID. Почему нет? Если вам надо сделать группировку, например по трём другим ключам, то это будет три разных repartition-топика. На каждый топик будет одно дополнительное чтение и одна дополнительная запись в Kafka. Чувствуете, к чему я веду?

Предположим, на входе у вас 100 тысяч показов рекламы в секунду. В нашем примере вы создадите дополнительную нагрузку на брокер сообщений в размере +600 тысяч сообщений в секунду (300 на запись и 300 на чтение). И ведь не только на брокер. Для таких объёмов надо добавлять дополнительные сервера с сервисами Kafka Streams. Можете посчитать, во сколько тысяч долларов обойдется такое решение с учётом цен на железо.

Для читателей, не очень хорошо знакомых с механизмом репартиционирования, я поясню один момент. Это не баг или недоработка Kafka Streams. С учётом её идеологии и архитектуры это единственное возможное поведение - его нельзя просто взять и отключить. Когда у сообщения меняется ключ, этот новый ключ надо равномерно "размазать" по кластеру так, чтобы каждый инстанс Kafka Streams имел собственный набор ключей. Для этого и служат дополнительные запись/чтение в repartition-топик. При этом если инстанс А записал в топик сообщение, то не факт, что он же его и прочитает. Это может сделать инстанс Б, В и т.д. в зависимости от того, кто какую партицию читает. В итоге каждый ключ группировки у вас будет более-менее равномерно распределён по серверам кластера (если вы его хэшируете, конечно).

Запросы к агрегатам

Данные пишут для того, чтобы с ними потом можно было работать. Например делать к ним запросы. И здесь наши возможности оказались серьезно ограничены. Исчерпывающий список того, как мы можем запрашивать данные у Kafka Streams, есть здесь или здесь для оконных агрегатов. Если используется стандартная реализация state-store на основе RocksDB (а скорее всего это так), фактически данные можно получать только по ключам.

Предположу, что первое, что вам хочется сделать с полученными данными, - это фильтрация, сортировка и пагинация. Но здесь таких возможностей нет. Максимум, что вы можете, - это считать всё, что есть, используя метод all(), а потом вручную делать нужные операции. Вам повезет, если ключей не слишком много и данные уместятся в оперативной памяти. Нам не повезло. Пришлось писать дополнительный модуль, который по ночам выгружал данные из RocksDB и отправлял в Postgres.

Ещё надо помнить, что на каждом отдельно взятом сервисе находится только часть ключей. Что если вам необходимо отдавать ваши агрегаты, скажем, по HTTP? Вот кто-то сделал запрос к одному из сервисов Kafka Streams: мол, дай мне такие-то данные. Сервис смотрит у себя локально - данных нет. Или есть, но какая-то часть. Другая часть может быть на другом сервере. Или на всех. Что делать? Документация Kafka Streams говорит, что это наши проблемы.

Стабильность Kafka Streams

У нас бывают проблемы с хостинг провайдером. Иногда выходит из строя оборудование, иногда человеческий фактор, иногда и то, и другое. Если по какой-то причине теряется связь с Kafka, то Kafka Streams переводит все свои потоки в состояние DEAD и ждёт, когда мы проснёмся и сделаем ей рестарт. При этом рядом стоят соседние сервисы, которые работают с той же Kafka через Spring и @KafkaListener. Они восстанавливаются сами, как ни в чём не бывало.

Kafka Streams может умереть и по другой причине: не пойманные исключения в вашем коде. Предположим, вы делаете какой-то внешний вызов внутри потоков Kafka Streams. Пусть это будет обращение к базе данных. Когда база данных падает, у вас два варианта. Первый - вы ловите эту ошибку и продолжаете работать дальше. Но если база будет лежать долго, а работа с ней - критична, вы будете считывать большое количество сообщений из топиков и некорректно их обрабатывать. Второй вариант кажется лучше - совсем не ловить исключение и дать Kafka Streams умереть. Дальше как обычно: ночной подъём, рестарт всех серверов с Kafka Streams. "Из коробки" у вас нет варианта подождать, пока база восстановится, и продолжить работу.

Нам пришлось дописать в каждый сервис Kafka Streams дополнительный модуль, который работает как watchdog: поднимает Kafka Streams, если видит, что она умерла.

Кстати, если вы работаете с Kafka Streams через Spring, то не забудьте переопределить стандартный StreamsBuilderFactoryBean, указав в нём свой CleanupConfig. Иначе будете неприятно удивлены тем, что при каждом рестарте будет удаляться вся локальная база RocksDB. Напомню, что это приведёт к тому, что при каждом рестарте все сервера начнут активно считывать данные из changelog-топика. Поверьте, вам это не нужно.

KStream-KStream Join

Здесь можно было бы обойтись одной фразой: никогда это не используйте. Джоин двух потоков создаёт десятки топиков в Kafka и огромную нагрузку на всю систему. Просто не делайте этого. Ну или хотя бы проверьте все под нагрузкой, прежде чем ставить в production.

Вообще Kafka Streams любит создавать топики под различные свои нужды. Если вы не изучили под лупой документацию и не протестировали, как это работает, то это может стать для вас и ваших DevOps неприятным сюрпризом. Чем больше топиков, тем сложнее их администрировать.

Масштабируемость

Любой человек, знакомый с Kafka, скажет, что масштабируемость - одна из главных её преимуществ: "В чём проблема? Добавляем партиций и радуемся". Всё так. Но не для Kafka Streams.

Если вы используете джоины, то все ваши топики должны быть ко-партиционированы (co-partitioning), что, в числе прочего, означает, что у них должно быть одинаковое количество партиций. Так в чём же проблема?

Представим, что вы сделали несколько топологий: с агрегатами, джоинами, всё как положено. Поставили в production, оно работает, вы радуетесь. Дальше бизнес пошел в гору, нагрузка начала расти, вы добавили серверов и хотите увеличить количество партиций, чтобы загрузить эти новые сервера. Но Kafka Streams наплодил десятки топиков. И у всех надо поднимать количество партиций, иначе следующий рестарт ваших сервисов может стать последним. Если Kafka Streams увидит, что топики не ко-партиционированы, она выдаст ошибку и просто не запустится.

На вопрос, что с этим делать, у меня сегодня ответа нет. Вероятно, можно потушить все рабочие инстансы Kafka Streams, потом поднять число партиций на всех причастных топиках, затем поднять Kafka Streams обратно и молиться. А может быть последовать совету отсюда: Matthias J. Sax пишет, что это нужно делать, создавая новый топик с новым количеством партиций и подключать к нему Kafka Streams с новым application.id. Там же есть совет, что если вы знаете заранее, что нагрузка будет большая, то лучше сделать партиций с запасом.

Заключение

Если у вас в системе нет и не планируется высоких нагрузок, то со всеми этими проблемами вы скорее всего не столкнётесь. Всё будет работать отлично. В противном случае стоит серьёзно задуматься, нужно ли вам это, особенно если планируете использовать следующий уровень абстракции - KSQL.

Автор статьи: Андрей Буров, Максилект.

P.S. Мы публикуем наши статьи на нескольких площадках Рунета. Подписывайтесь на наши страницы в VK, FB, Instagram или Telegram-канал, чтобы узнавать обо всех наших публикациях и других новостях компании Maxilect.

Подробнее..

Перевод Как справиться с более чем двумя миллиардами записей в SQL-базе данных

22.03.2021 18:15:22 | Автор: admin

В рамках набора группы учащихся на курс "Highload Architect" подготовили перевод интересной статьи.

Приглашаем также посетить вебинар на тему
Выбор архитектурного стиля. На этом открытом уроке участники вместе с экспертом рассмотрят различия между микросервисным и монолитным подходами, преимущества и недостатки подходов, обсудят принципы выбора архитектурного стиля.


У одного из наших клиентов возникла проблема с большой, постоянно растущей, таблицей в MySQL с более чем 2 миллиардами записей. Без модернизации инфраструктуры была опасность исчерпания дискового пространства, что потенциально могло сломать все приложение. С такой большой таблицей были и другие проблемы: низкая производительность запросов, плохая схема, и, из-за огромного количества записей, не было простого способа анализировать эти данные. Также нам нужно было решить эти проблемы без простоев в работе приложения.

В этом посте я хотел рассказать о нашем подходе к данной проблеме, но сразу хочу оговориться, что это не универсальное решение: каждый случай индивидуален и требует разных подходов. Но, возможно, вы найдете здесь некоторые полезные идеи для себя.

Спасение в облаках

После оценки нескольких альтернативных решений мы решили отправлять данные в какое-нибудь облачное хранилище. И наш выбор пал на Google Big Query. Мы выбрали его, потому что клиент предпочитал облачные решения от Google, а также данные были структурированными, предназначались для аналитики и нам не требовалась низкая задержка передачи данных (low latency). Поэтому BigQuery, казалась, идеальным решением (см. диаграмму ниже).

После тестов, о которых вы можете прочитать в посте Анджея Людвиковски (Andrzej Ludwikowski), мы убедились, что Big Query достаточно хорошее решение, отвечающее потребностям наших клиентов и легко позволяет использовать аналитические инструменты для анализа данных. Но, как вы, возможно, уже знаете, большое количество запросов в BigQuery может привести к увеличению стоимости, поэтому мы хотели избежать запросов в BigQuery напрямую из приложения и использовать его только для аналитики и как что-то вроде резервной копии.

https://cloud.google.com/solutions/infrastructure-options-for-data-pipelines-in-advertising#storing_data

Передача данных в облако

Для передачи потока данных есть много разных способов, но наш выбор был очень прост. Мы использовали Apache Kafka просто потому, что она уже широко использовалась в проекте и не было смысла внедрять другое решение. Использование Kafka дало нам еще одно преимущество мы могли передавать все данные в Kafka и хранить их там в течение необходимого времени, а затем использовать для миграции в выбранное решение, которое справилось бы со всеми проблемами без большой нагрузки на MySQL. С таким подходом мы подготовили себе запасной вариант в случае проблем с BigQuery, например, слишком высокой стоимости или сложностей и с выполнением необходимых запросов. Как вы увидите ниже, это было важное решение, которое дало нам много преимуществ без каких-то серьезных накладных расходов.

Потоковая передача из MySQL

Итак, когда речь заходит о передаче потока данных из MySQL в Kafka, вы, вероятно, думаете о Debezium или Kafka Connect. Оба решения отличный выбор, но в нашем случае не было возможности их использовать. Версия сервера MySQL была настолько старой, что Debezium ее не поддерживал, а обновление MySQL было невозможным. Мы также не могли использовать Kafka Connect из-за отсутствия автоинкрементного столбца в таблице, который мог бы использоваться коннектором для запроса новых записей без потери каких-либо из них. Мы знали, что можно использовать timestamp-столбцы, но при этом подходе могли быть потери строк из-за того, что запрос использовал более низкую точность timestamp, чем указано в определении столбца.

Конечно, оба решения хороши, и если нет никаких препятствий для их использования, то я могу рекомендовать их для передачи данных из вашей базы данных в Kafka. В нашем случае нам нужно было разработать простого Kafka Producer, который запрашивал данные без потери каких-либо записей и передавал их в Kafka. И Kafka Consumer, отправляющего данные в BigQuery, как показано на диаграмме ниже.

Отправка данных в BigQueryОтправка данных в BigQuery

Секционирование как способ экономии места

Итак, мы отправили все данные в Kafka (сжимая их для уменьшения полезной нагрузки), а затем в BigQuery. Это помогло нам решить проблемы с производительностью запросов и быстро анализировать большой объем данных. Но осталась проблема с доступным местом. Мы хотели найти решение с заделом на будущее, которое справилось бы с проблемой сейчас и могло быть легко использовано в будущем. Мы начали с разработки новой таблицы. Мы использовали serial id в качестве первичного ключа и секционирование по месяцам. Секционирование этой большой таблицы дало нам возможность создавать резервные копии старых секций и усекать (truncate) / удалять (drop) их, чтобы освободить место, когда секция больше не нужна. Итак, мы создали новую таблицу с новой схемой и использовали данные из Kafka для ее заполнения. После переноса всех записей мы развернули новую версию приложения, которая для INSERT использовала новую таблицу с секционированием и удалили старую, чтобы освободить место. Конечно, вам понадобится достаточно свободного места для переноса старых данных в новую таблицу, но в нашем случае во время миграции мы постоянно делали резервные копии и удаляли старые разделы, чтобы быть уверенными, что у нас хватит места для новых данных.

Передача данных в секционированную таблицуПередача данных в секционированную таблицу

Сжатие данных как еще один способ освободить пространство

Как я уже упоминал, после передачи данных в BigQuery мы могли легко анализировать их, и это дало нам возможность проверить несколько новых идей, которые могли бы позволить нам уменьшить пространство, занимаемое таблицей в базе данных.

Одна из идей была посмотреть, как различные данные распределены по таблице. После нескольких запросов выяснилось, что почти 90% данных никому не нужны. Поэтому мы решили их сжать, написав Kafka Consumer, который отфильтровал бы ненужные записи и вставлял только нужные в еще одну таблицу. Назовем ее сжатой таблицей (compacted table), что показано на приведенной ниже диаграмме.

После сжатия (строки со значением "A" и "B" в колонке type были отфильтрованы во время миграции).

Передача данных в compacted-таблицуПередача данных в compacted-таблицу

После этого мы обновили наше приложение и теперь выполняли чтение из новой таблицы (compacted table), а запись делали в секционированную таблицу (partitioned table), из которой мы непрерывно передавали данные с помощью Kafka в сжатую таблицу (compacted table).

Итак, как видите, мы устранили проблемы, с которыми столкнулся наш клиент. Благодаря секционированию была устранена проблема нехватки места. Сжатие и правильное проектирование индексов решили некоторые проблемы с производительностью запросов из приложения, и, наконец, передача всех данных в облако дала нашему клиенту возможность легко анализировать все данные.

Так как мы используем BigQuery только для аналитических запросов, а остальные запросы, отправляемые пользователями через приложение, по-прежнему выполняются в MySQL, то затраты оказались не такие и большие, как можно было бы ожидать. Еще одна важная деталь все было выполнено без простоев, ни один клиент не пострадал.

Резюме

Итак, подведем итоги. Мы начали с использования Kafka в качестве инструмента для потоковой передачи данных в BigQuery. Но так как все данные были в Kafka, это дало нам возможность легко решить другие проблемы, которые были важны для нашего клиента.


Узнать подробнее о курсе "Highload Architect".

Смотреть вебинар на тему Выбор архитектурного стиля.

Подробнее..

Тестирование в Apache Spark Structured Streaming

02.01.2021 20:04:09 | Автор: admin

Введение


На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.


Все примеры используют: Apache Spark 3.0.1.


Подготовка


Необходимо установить:


  • Apache Spark 3.0.x
  • Python 3.7 и виртуальное окружение для него
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • В примерах для Scala используется версия 2.12.10.

  1. Загрузить Apache Spark
  2. Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7

Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.


SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;

Тесты


Пример с scikit-learn


При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.


Для написания тестов будет использоваться следующий пример: LinearRegression.


Итак, пусть код для тестирования использует следующий "шаблон" для Python:


class XService:    def __init__(self):        # Инициализация    def train(self, ds):        # Обучение    def predict(self, ds):        # Предсказание и вывод результатов

Для Scala шаблон выглядит соответственно.


Полный пример:


from sklearn import linear_modelclass LocalService:    def __init__(self):        self.model = linear_model.LinearRegression()    def train(self, ds):        X, y = ds        self.model.fit(X, y)    def predict(self, ds):        r = self.model.predict(ds)        print(r)

Тест.


Импорт:


import unittestimport numpy as np

Основной класс:


class RunTest(unittest.TestCase):

Запуск тестов:


if __name__ == "__main__":    unittest.main()

Подготовка данных:


X = np.array([    [1, 1],  # 6    [1, 2],  # 8    [2, 2],  # 9    [2, 3]  # 11])y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3

Создание модели и обучение:


service = local_service.LocalService()service.train((X, y))

Получение результатов:


service.predict(np.array([[3, 5]]))service.predict(np.array([[4, 6]]))

Ответ:


[16.][19.]

Все вместе:


import unittestimport numpy as npfrom spark_streaming_pp import local_serviceclass RunTest(unittest.TestCase):    def test_run(self):        # Prepare data.        X = np.array([            [1, 1],  # 6            [1, 2],  # 8            [2, 2],  # 9            [2, 3]  # 11        ])        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3        # Create model and train.        service = local_service.LocalService()        service.train((X, y))        # Predict and results.        service.predict(np.array([[3, 5]]))        service.predict(np.array([[4, 6]]))        # [16.]        # [19.]if __name__ == "__main__":    unittest.main()

Пример с Spark и Python


Будет использован аналогичный алгоритм LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.


Инициализация:


self.service = LinearRegression(maxIter=10, regParam=0.01)self.model = None

Обучение:


self.model = self.service.fit(ds)

Получение результатов:


transformed_ds = self.model.transform(ds)q = transformed_ds.select("label", "prediction").writeStream.format("console").start()return q

Все вместе:


from pyspark.ml.regression import LinearRegressionclass StructuredStreamingService:    def __init__(self):        self.service = LinearRegression(maxIter=10, regParam=0.01)        self.model = None    def train(self, ds):        self.model = self.service.fit(ds)    def predict(self, ds):        transformed_ds = self.model.transform(ds)        q = transformed_ds.select("label", "prediction").writeStream.format("console").start()        return q

Сам тест.


Обычно в тестах можно использовать данные, которые создаются прямо в тестах.


train_ds = spark.createDataFrame([    (6.0, Vectors.dense([1.0, 1.0])),    (8.0, Vectors.dense([1.0, 2.0])),    (9.0, Vectors.dense([2.0, 2.0])),    (11.0, Vectors.dense([2.0, 3.0]))],    ["label", "features"])

Это очень удобно и код получается компактным.


Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.


def test_stream_read_options_overwrite(self):    bad_schema = StructType([StructField("test", IntegerType(), False)])    schema = StructType([StructField("data", StringType(), False)])    df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \        .schema(bad_schema)\        .load(path='python/test_support/sql/streaming', schema=schema, format='text')    self.assertTrue(df.isStreaming)    self.assertEqual(df.schema.simpleString(), "struct<data:string>")

И так.


Создается контекст для работы:


spark = SparkSession.builder.enableHiveSupport().getOrCreate()spark.sparkContext.setLogLevel("ERROR")

Подготовка данных для обучения (можно сделать обычным способом):


train_ds = spark.createDataFrame([    (6.0, Vectors.dense([1.0, 1.0])),    (8.0, Vectors.dense([1.0, 2.0])),    (9.0, Vectors.dense([2.0, 2.0])),    (11.0, Vectors.dense([2.0, 3.0]))],    ["label", "features"])

Обучение:


service = structure_streaming_service.StructuredStreamingService()service.train(train_ds)

Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.


def extract_features(x):    values = x.split(",")    features_ = []    for i in values[1:]:        features_.append(float(i))    features = Vectors.dense(features_)    return featuresextract_features_udf = udf(extract_features, VectorUDT())def extract_label(x):    values = x.split(",")    label = float(values[0])    return labelextract_label_udf = udf(extract_label, FloatType())predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \    .withColumn("features", extract_features_udf(col("value"))) \    .withColumn("label", extract_label_udf(col("value")))service.predict(predict_ds).awaitTermination(3)

Ответ:


15.9669918.96138

Все вместе:


import unittestimport warningsfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, udffrom pyspark.sql.types import FloatTypefrom pyspark.ml.linalg import Vectors, VectorUDTfrom spark_streaming_pp import structure_streaming_serviceclass RunTest(unittest.TestCase):    def test_run(self):        spark = SparkSession.builder.enableHiveSupport().getOrCreate()        spark.sparkContext.setLogLevel("ERROR")        # Prepare data.        train_ds = spark.createDataFrame([            (6.0, Vectors.dense([1.0, 1.0])),            (8.0, Vectors.dense([1.0, 2.0])),            (9.0, Vectors.dense([2.0, 2.0])),            (11.0, Vectors.dense([2.0, 3.0]))        ],            ["label", "features"]        )        # Create model and train.        service = structure_streaming_service.StructuredStreamingService()        service.train(train_ds)        # Predict and results.        def extract_features(x):            values = x.split(",")            features_ = []            for i in values[1:]:                features_.append(float(i))            features = Vectors.dense(features_)            return features        extract_features_udf = udf(extract_features, VectorUDT())        def extract_label(x):            values = x.split(",")            label = float(values[0])            return label        extract_label_udf = udf(extract_label, FloatType())        predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \            .withColumn("features", extract_features_udf(col("value"))) \            .withColumn("label", extract_label_udf(col("value")))        service.predict(predict_ds).awaitTermination(3)        # +-----+------------------+        # |label|        prediction|        # +-----+------------------+        # |  1.0|15.966990887541273|        # |  2.0|18.961384020443553|        # +-----+------------------+    def setUp(self):        warnings.filterwarnings("ignore", category=ResourceWarning)        warnings.filterwarnings("ignore", category=DeprecationWarning)if __name__ == "__main__":    unittest.main()

Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:


implicit val sqlCtx = spark.sqlContextimport spark.implicits._val source = MemoryStream[Record]source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))val predictDs = source.toDF()service.predict(predictDs).awaitTermination(2000)

Полный пример на Scala (здесь, для разнообразия, не используется sql):


package aaa.abc.dd.spark_streaming_pr.clusterimport org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}import org.apache.spark.sql.DataFrameimport org.apache.spark.sql.functions.udfimport org.apache.spark.sql.streaming.StreamingQueryclass StructuredStreamingService {  var service: LinearRegression = _  var model: LinearRegressionModel = _  def train(ds: DataFrame): Unit = {    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)    model = service.fit(ds)  }  def predict(ds: DataFrame): StreamingQuery = {    val m = ds.sparkSession.sparkContext.broadcast(model)    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {      m.value.predict(features)    }    val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun    val toUpperUdf = udf(transform)    val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))    predictionDs      .writeStream      .foreachBatch((r: DataFrame, i: Long) => {        r.show()        // scalastyle:off println        println(s"$i")        // scalastyle:on println      })      .start()  }}

Тест:


package aaa.abc.dd.spark_streaming_pr.clusterimport org.apache.spark.ml.linalg.Vectorsimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.execution.streaming.MemoryStreamimport org.scalatest.{Matchers, Outcome, fixture}class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {  test("run") { spark =>    // Prepare data.    val trainDs = spark.createDataFrame(Seq(      (6.0, Vectors.dense(1.0, 1.0)),      (8.0, Vectors.dense(1.0, 2.0)),      (9.0, Vectors.dense(2.0, 2.0)),      (11.0, Vectors.dense(2.0, 3.0))    )).toDF("label", "features")    // Create model and train.    val service = new StructuredStreamingService()    service.train(trainDs)    // Predict and results.    implicit val sqlCtx = spark.sqlContext    import spark.implicits._    val source = MemoryStream[Record]    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))    val predictDs = source.toDF()    service.predict(predictDs).awaitTermination(2000)    // +-----+---------+------------------+    // |label| features|        prediction|    // +-----+---------+------------------+    // |  1.0|[3.0,5.0]|15.966990887541273|    // |  2.0|[4.0,6.0]|18.961384020443553|    // +-----+---------+------------------+  }  override protected def withFixture(test: OneArgTest): Outcome = {    val spark = SparkSession.builder().master("local[2]").getOrCreate()    try withFixture(test.toNoArgTest(spark))    finally spark.stop()  }  override type FixtureParam = SparkSession  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)}

Выводы


При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.


Такие абстракции как DataFrame позволяют это сделать легко и просто.


При использовании Python данные придется хранить в файлах.


Ссылки и ресурсы


Подробнее..
Категории: Scala , Python , Testing , Apache , Spark , Apache spark , Kafka , Streaming

Перевод Использование микросервисов в работе с Kubernetes и GitOps

10.06.2021 18:12:02 | Автор: admin

Архитектуры микросервисов продолжают развиваться в инженерных организациях, поскольку команды стремятся увеличить скорость разработки. Микросервисы продвигают идею модульности как объекты первого класса в распределенной архитектуре, обеспечивая параллельную разработку и компоненты с независимыми циклами выпуска. Как и при принятии любых технологических решений, необходимо учитывать компромиссы. В случае микросервисов они включают потенциальную потерю централизованных стандартов разработки, а также повышенную сложность эксплуатации.

К счастью, существуют стратегии решения этих проблем. Сначала мы рассмотрим рефакторинг сервиса на основе Kafka Streams с использованием Microservices Framework, который обеспечивает стандарты для тестирования, конфигурации и интеграции. Затем мы используем существующий проект streaming-ops для создания, проверки и продвижения нового сервиса из среды разработки в рабочую среду. Хотя это и не обязательно, но вы если хотите выполнить шаги, описанные в этой заметке, то вам понадобится собственная версия проекта streaming-ops, как описано в документации.

Проблемы микросервисной архитектуры

По мере того как инженерные группы внедряют архитектуры микросервисов, отдельные команды могут начать расходиться в своих технических решениях. Это может привести к различным проблемам:

  • Множественные решения общих потребностей в рамках всей организации нарушают принцип "Не повторяйся".

  • Разработчики, желающие сменить команду или перейти в другую, сталкиваются с необходимостью изучения нескольких технологических стеков и архитектурных решений.

  • Операционные команды, которые проверяют и развертывают несколько приложений, сталкиваются с трудностями, поскольку им приходится учитывать технологические решения каждой команды.

Spring Boot

Чтобы снизить эти риски, разработчики обращаются к микросервисным фреймворкам для стандартизации общих задач разработки, и Spring Boot (расширение фреймворка Spring) является популярным примером одного из таких фреймворков.

Spring Boot предоставляет согласованные решения для общих проблем разработки программного обеспечения, например, конфигурация, управление зависимостями, тестирование, веб-сервисы и другие внешние системные интеграции, такие как Apache Kafka. Давайте рассмотрим пример использования Spring Boot для переписывания существующего микросервиса на основе Kafka Streams.

Сервис заказов

Проект streaming-ops - это среда, похожая на рабочую, в которой работают микросервисы, основанные на существующих примерах Kafka Streams. Мы рефакторизовали один из этих сервисов для использования Spring Boot, а полный исходный код проекта можно найти в репозитории GitHub. Давайте рассмотрим некоторые основные моменты.

Интеграция Kafka

Библиотека Spring for Apache Kafka обеспечивает интеграцию Spring для стандартных клиентов Kafka, Kafka Streams DSL и приложений Processor API. Использование этих библиотек позволяет сосредоточиться на написании логики обработки потоков и оставить конфигурацию и построение зависимых объектов на усмотрение Spring dependency injection (DI) framework. Здесь представлен компонент сервиса заказов Kafka Streams, который агрегирует заказы и хранит их по ключу в хранилище состояний:

@Autowiredpublic void orderTable(final StreamsBuilder builder) {  logger.info("Building orderTable");  builder    .table(this.topic,    Consumed.with(Serdes.String(), orderValueSerde()),    Materialized.as(STATE_STORE))    .toStream()    .peek((k,v) -> logger.info("Table Peek: {}", v));}

Аннотация @Autowired выше предписывает фреймворку Spring DI вызывать эту функцию при запуске, предоставляя инстанс StreamsBuilder, который мы используем для построения нашего DSL-приложения Kafka Streams. Этот метод позволяет нам написать класс с узкой направленностью на бизнес-логику, оставляя детали построения и конфигурирования объектов поддержки Kafka Streams фреймворку.

Конфигурация

Spring предоставляет надежную библиотеку конфигурации, позволяющую использовать различные методы для внешней настройки вашего сервиса. Во время выполнения Spring может объединять значения из файлов свойств, переменных окружения и аргументов программы для конфигурирования приложения по мере необходимости (порядок приоритета доступен в документации).

В примере с сервисом заказов мы решили использовать файлы свойств Spring для конфигурации, связанной с Apache Kafka. Значения конфигурации по умолчанию предоставляются во встроенном ресурсе application.properties, и мы переопределяем их во время выполнения с помощью внешних файлов и функции Profiles в Spring. Здесь вы можете увидеть сниппет ресурсного файла application.properties по умолчанию:

# ################################################ For Kafka, the following values can be# overridden by a 'traditional' Kafka# properties filebootstrap.servers=localhost:9092...# Spring Kafkaspring.kafka.properties.bootstrap.servers=${bootstrap.servers}...

Например, значение spring.kafka.properties.bootstrap.servers обеспечивается значением в bootstrap.servers с использованием синтаксиса плейсхолдер ${var.name} .

Во время выполнения Spring ищет папку config в текущем рабочем каталоге запущенного процесса. Файлы, найденные в этой папке, которые соответствуют шаблону application-<profile-name>.properties, будут оценены как активная конфигурация. Активными профилями можно управлять, устанавливая свойство spring.profiles.active в файле, в командной строке или в переменной окружения. В проекте streaming-ops мы разворачиваем набор файлов свойств, соответствующих этому шаблону, и устанавливаем соответствующие активные профили с помощью переменной окружения SPRING_PROFILES_ACTIVE.

Управление зависимостями

В приложении сервиса заказов мы решили использовать Spring Gradle и плагин управления зависимостями Spring. dependency-management plugin впоследствии будет управлять оставшимися прямыми и переходными зависимостями за нас, как показано в файле build.gradle:

plugins {  id 'org.springframework.boot' version '2.3.4.RELEASE'  id 'io.spring.dependency-management' version '1.0.10.RELEASE'  id 'java'}

Следующие библиотеки Spring могут быть объявлены без конкретных номеров версий, поскольку плагин предоставит совместимые версии от нашего имени:

dependencies {  implementation 'org.springframework.boot:spring-boot-starter-web'  implementation 'org.springframework.boot:spring-boot-starter-actuator'  implementation 'org.springframework.boot:spring-boot-starter-webflux'  implementation 'org.apache.kafka:kafka-streams'  implementation 'org.springframework.kafka:spring-kafka'  ...

REST-сервисы

Spring предоставляет REST-сервисы с декларативными аннотациями Java для определения конечных точек HTTP. В сервисе заказов мы используем это для того, чтобы использовать фронтенд API для выполнения запросов в хранилище данных Kafka Streams. Мы также используем асинхронные библиотеки, предоставляемые Spring, например, для неблокирующей обработки HTTP-запросов:

@GetMapping(value = "/orders/{id}", produces = "application/json")public DeferredResult<ResponseEntity> getOrder(  @PathVariable String id,  @RequestParam Optional timeout) {     final DeferredResult<ResponseEntity> httpResult =     new DeferredResult<>(timeout.orElse(5000L));...

Смотрите полный код в файле OrdersServiceController.java.

Тестирование

Блог Confluent содержит много полезных статей, подробно описывающих тестирование Spring для Apache Kafka (например, смотрите Advanced Testing Techniques for Spring for Apache Kafka). Здесь мы кратко покажем, как легко можно настроить тест с помощью Java-аннотаций, которые будут загружать Spring DI, а также встроенный Kafka для тестирования клиентов Kafka, включая Kafka Streams и использование AdminClient:

@RunWith(SpringRunner.class)@SpringBootTest@EmbeddedKafka@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)public class OrderProducerTests {...

С помощью этих полезных аннотаций и фреймворка Spring DI создание тестового класса, использующего Kafka, может быть очень простым:

@Autowiredprivate OrderProducer producer;...@Testpublic void testSend() throws Exception {  ...  List producedOrders = List.of(o1, o2);  producedOrders.forEach(producer::produceOrder);  ...

Смотрите полный файл OrderProducerTests.java для наглядного примера.

Проверка в dev

Код сервиса заказов содержит набор интеграционных тестов, которые мы используем для проверки поведения программы; репозиторий содержит задания CI, которые вызываются при появлении PR или переносе в основную ветвь. Убедившись, что приложение ведет себя так, как ожидается, мы развернем его в среде dev для сборки, тестирования и дальнейшего подтверждения поведения кода.

Проект streaming-ops запускает свои рабочие нагрузки микросервисов на Kubernetes и использует подход GitOps для управления операционными проблемами. Чтобы установить наш новый сервис в среде dev, мы изменим развернутую версию в dev, добавив переопределение Kustomize в сервис заказов Deployment, и отправим PR на проверку.

Когда этот PR будет объединен, запустится процесс GitOps, модифицируя объявленную версию контейнера службы заказов. После этого контроллеры Kubernetes развертывают новую версию, создавая заменяющие Поды и завершая работу предыдущих версий.

После завершения развертывания мы можем провести валидацию новой службы заказов, проверив, правильно ли она принимает REST-звонки, и изучив ее журналы. Чтобы проверить конечную точку REST, мы можем открыть приглашение внутри кластера Kubernetes с помощью хелпер-команды в предоставленном Makefile, а затем использовать curl для проверки конечной точки HTTP:

$ make promptbash-5.0# curl -XGET http://orders-servicecurl: (7) Failed to connect to orders-service port 80: Connection refused

Наша конечная точка HTTP недостижима, поэтому давайте проверим журналы:

kubectl logs deployments/orders-service | grep ERROR2020-11-22 20:56:30.243 ERROR 21 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread     : stream-thread [order-table-4cca220a-53cb-4bd5-8c34-d00a5aa77e63-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:           org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: order-table

Эти ошибки, скорее всего, ортогональны и поэтому потребуют независимых исправлений. Не имеет значения, как они будут устранены, необходимо быстро вернуть нашу систему в работоспособное состояние. GitOps предоставляет хороший путь для ускорения этого процесса путем отмены предыдущего коммита. Мы используем функцию возврата GitHub PR, чтобы организовать последующий PR, который отменяет изменения.

Как только PR будет объединен, процесс GitOps применит отмененные изменения, возвращая систему в предыдущее функциональное состояние. Для лучшей поддержки этой возможности целесообразно сохранять изменения небольшими и инкрементными. Среда dev полезна для отработки процедур отката.

Мы выявили две проблемы в новом сервисе, которые вызвали эти ошибки. Обе они связаны со значениями конфигурации по умолчанию в этом сервисе, которые отличаются от первоначальных.

  1. HTTP-порт по умолчанию был другим, из-за чего служба Kubernetes не могла правильно направить трафик сервису заказов.

  2. Идентификатор приложения Kafka Streams по умолчанию отличался от настроенного списка контроля доступа (ACL) в Confluent Cloud, что лишало наш новый сервис заказов доступа к кластеру Kafka.

Мы решили отправить новый PR, исправляющий значения по умолчанию в приложении. Изменения содержатся в конфигурационных файлах, расположенных в развернутых ресурсах Java Archive (JAR).

В файле application.yaml мы изменяем порт HTTP-сервиса по умолчанию:

Server:  Port: 18894

А в файле application.properties (который содержит соответствующие конфигурации Spring для Apache Kafka) мы модифицируем ID приложения Kafka Streams на значение, заданное декларациями Confluent Cloud ACL:

spring.kafka.streams.application-id=OrdersService

Когда новый PR будет отправлен, процесс CI/CD на основе GitHub Actions запустит тесты. После слияния PR другой Action опубликует новую версию Docker-образа службы заказов.

Еще один PR с новой версией службы заказов позволит нам развернуть новый образ с правильными настройками по умолчанию обратно в среду dev и повторно провести валидацию. На этот раз после развертывания мы сможем взаимодействовать с новым сервисом заказов, как и ожидалось.

$ make promptbash-5.0# curl http://orders-service/actuator/health{"status":"UP","groups":["liveness","readiness"]}bash-5.0# curl -XGET http://orders-service/v1/orders/284298{"id":"284298","customerId":0,"state":"FAILED","product":"JUMPERS","quantity":1,"price":1.0}

Наконец, с нашего устройства разработки мы можем использовать Confluent Cloud CLI для потоковой передачи заказов из темы orders в формате Avro (см. документацию Confluent Cloud CLI для инструкций по настройке и использованию CLI).

 ccloud kafka topic consume orders --value-format avroStarting Kafka Consumer. ^C or ^D to exit{"quantity":1,"price":1,"id":"284320","customerId":5,"state":"CREATED","product":"UNDERPANTS"}{"id":"284320","customerId":1,"state":"FAILED","product":"STOCKINGS","quantity":1,"price":1}{"id":"284320","customerId":1,"state":"FAILED","product":"STOCKINGS","quantity":1,"price":1}^CStopping Consumer.

Продвижение в prd

Имея на руках наш новый отрефакторенный и валидированный сервис заказов, мы хотим завершить работу, продвинув его в продакшн. С нашим инструментарием GitOps это простой процесс. Давайте посмотрим, как это сделать.

Сначала оценим хелпер-команду, которую можно запустить для проверки разницы в объявленных версиях сервиса заказов в каждой среде. С устройства разработчика в репозитории проекта мы можем использовать Kustomize для сборки и оценки окончательно материализованных манифестов Kubernetes, а затем поиска в них визуальной информации о сервисе заказов. Наш проект streaming-ops предоставляет полезные команды Makefile для облегчения этой задачи:

 make test-prd test-dev >/dev/null; diff .test/dev.yaml .test/prd.yaml | grep "orders-service"< image: cnfldemos/orders-service:sha-82165db > image: cnfldemos/orders-service:sha-93c0516

Здесь мы видим, что версии тегов образов Docker отличаются в средах dev и prd. Мы сохраним финальный PR, который приведет среду prd в соответствие с текущей версией dev. Для этого мы модифицируем тег изображения, объявленный в базовом определении для службы заказов, и оставим на месте переопределение dev. В данном случае оставление dev-переопределения не оказывает существенного влияния на развернутую версию службы заказов, но облегчит будущие развертывания на dev. Этот PR развернет новую версию на prd:

Перед слиянием мы можем повторно выполнить наши тестовые команды, чтобы убедиться, что в развернутых версиях службы заказов не будет различий, о чем свидетельствует отсутствие вывода команд diff и grep:

 make test-prd test-dev >/dev/null; diff .test/dev.yaml .test/prd.yaml | grep "orders-service"

Этот PR был объединен, и контроллер FluxCD в среде prd развернул нужную версию. Используя jq и kubectl с флагом --context, мы можем легко сравнить развертывание сервиса заказов на кластерах dev и prd:

 kubectl --context= get deployments/orders-service -o json | jq -r '.spec.template.spec.containers | .[].image'cnfldemos/orders-service:sha-82165db kubectl --context= get deployments/orders-service -o json | jq -r '.spec.template.spec.containers | .[].image'cnfldemos/orders-service:sha-82165db

Мы можем использовать curl внутри кластера, чтобы проверить, что развертывание работает правильно. Сначала установите контекст kubectl на ваш рабочий кластер:

 kubectl config use-context <your-prd-k8s-context>Switched to context "kafka-devops-prd".

Хелпер-команда подсказки в репозитории кода помогает нам создать терминал в кластере prd, который мы можем использовать для взаимодействия с REST-сервисом службы заказов:

 make promptLaunching-util-pod-------------------------------- kubectl run --tty -i --rm util --image=cnfldemos/util:0.0.5 --restart=Never --serviceaccount=in-cluster-sa --namespace=defaultIf you don't see a command prompt, try pressing enter.bash-5.0#

Внутри кластера мы можем проверить работоспособность (здоровье - health) службы заказов:

bash-5.0# curl -XGET http://orders-service/actuator/health{"status":"UP","groups":["liveness","readiness"]}bash-5.0# exit

Наконец, мы можем убедиться, что заказы обрабатываются правильно, оценив журналы из orders-and-payments-simulator:

 kubectl logs deployments/orders-and-payments-simulator | tail -n 5Getting order from: http://orders-service/v1/orders/376087   .... Posted order 376087 equals returned order: OrderBean{id='376087', customerId=2, state=CREATED, product=STOCKINGS, quantity=1, price=1.0}Posting order to: http://orders-service/v1/orders/   .... Response: 201Getting order from: http://orders-service/v1/orders/376088   .... Posted order 376088 equals returned order: OrderBean{id='376088', customerId=5, state=CREATED, product=STOCKINGS, quantity=1, price=1.0}Posting order to: http://orders-service/v1/orders/   .... Response: 201Getting order from: http://orders-service/v1/orders/376089   .... Posted order 376089 equals returned order: OrderBean{id='376089', customerId=1, state=CREATED, product=JUMPERS, quantity=1, price=1.0}

Симулятор заказов и платежей взаимодействует с конечной точкой REST сервиса заказов, публикуя новые заказы и получая их обратно от конечной точки /v1/validated. Здесь мы видим код 201 ответа в журнале, означающий, что симулятор и сервис заказов взаимодействуют правильно, и сервис заказов правильно считывает заказы из хранилища состояния Kafka Streams.

Резюме

Успешное внедрение микросервисов требует тщательной координации в вашей инженерной организации. В этом посте вы увидели, как микросервисные фреймворки полезны для стандартизации практики разработки в ваших проектах. С помощью GitOps вы можете уменьшить сложность развертывания и расширить возможности таких важных функций, как откат. Если у вас есть идеи относительно областей, связанных с DevOps, о которых вы хотите узнать от нас, пожалуйста, не стесняйтесь задать вопрос в проекте, или, что еще лучше - PRs открыты для этого!

Все коды на изображениях для копирования доступны здесь.


Перевод материала подготовлен в рамках курса Microservice Architecture. Всех желающих приглашаем на открытый урок Атрибуты качества, тактики и паттерны. На этом вебинаре рассмотрим, что такое качественная архитектура, основные атрибуты качества и тактики работы с ними.

Подробнее..

Анонс, предзаказ и бесплатные уроки видеокурса по Apache Kafka

24.12.2020 18:19:54 | Автор: admin


Открываем предзаказ продвинутого курса по Apache Kafka.


Видеокурс о том, как настроить и оптимизировать Apache Kafka брокер сообщений для микросервисов. Вы последовательно узнаете, откуда взялась технология, как настраивать распределенный отказоустойчивый кластер, как отслеживать метрики, работать с балансировкой.


Курс подойдет опытным инженерам.


Программа


Базовая часть (будет доступна бесплатно в личном кабинете Слёрма)


Введение


  • Немного истории: почему лог и стриминг важны, и занимают значительное место в бизнес процессах компаний;
  • Fun fact: почему "кафка"? А не почему, просто хорошо звучит.

Базовые основы технологии


  • Сравнения с подобными технологиями;
  • Базовые примитивы: брокеры, топики, партиции, оффсеты, retention.

Установка, запись и чтение создание и конфигурация топиков


  • Первая запись и первое чтение;
  • Базовые основы Apache Zookeeper.

Продвинутая часть


Тема 1: Работа с распределенным кластером


  • Конфигурация кластера,
  • Отказоустойчивость,
  • Контроллер,
  • Репликация между кластерами в разных ЦОД,
  • Примеры архитектуры.

Тема 2: Клиентские библиотеки


  • Producer,
  • Consumer,
  • Как (не) потерять данные: консистентность против доступности.

Тема 3: Мониторинг


  • Ключевые метрики и алерты,
  • SLI & SLO.

Тема 4: Анализ производительности


  • Почему Кафка такая быстрая?
  • Бенчмаркинг.

Тема 5: Поддержка работоспособности кластера и траблшутинг


  • Балансировка нагрузки: partition assignment & repartitioning,
  • Обновление версии кластера и клиентов,
  • Чтение логов,
  • Продвинутый функционал траблшутинга,
  • Примеры сбоев из жизни.

Тема 6: Развертывание кластера в проде


  • Рекомендуемая конфигурация и архитектура,
  • Практики и примеры из жизни.

Авторы курса:


  • Анатолий Солдатов, Lead Engineer в Avito;
  • Александр Миронов, Infrastructure Engineer в Stripe.

Релиз курса в апреле 2021 года. Сейчас собираем обратную связь по программе, готовим материалы и уже записываем бесплатную базовую часть курса. Оставить заявку на весь курс или бесплатные вводные уроки можно здесь.

Подробнее..

Перевод Практический взгляд на хранение в Kafka

29.12.2020 06:05:42 | Автор: admin


Kafka повсюду. Где есть микросервисы и распределенные вычисления, а они сейчас популярны, там почти наверняка есть и Kafka. В статье я попытаюсь объяснить, как в Kafka работает механизм хранения.


Я, конечно, постарался не усложнять, но копать будем глубоко, поэтому какое-то базовое представление о Kafka не помешает. Иначе не все будет понятно. В общем, продолжайте читать на свой страх и риск.


Обычно считается, что Kafka это распределенная и реплицированная очередь сообщений. С технической точки зрения все верно, но термин очередь сообщений не все понимают одинаково. Я предпочитаю другое определение: распределенный и реплицированный журнал коммитов. Эта формулировка кажется более точной, ведь мы все прекрасно знаем, как журналы записываются на диск. Просто в этом случае на диск попадают сообщения, отправленные в Kafka.



Применительно к хранению в Kafka используется два термина: партиции и топики. Партиции это единицы хранения сообщений, а топики что-то вроде контейнеров, в которых эти партиции находятся.


С основной теорией мы определились, давайте перейдем к практике.


Я создам в Kafka топик с тремя партициями. Если хотите повторять за мной, вот как выглядит команда для локальной настройки Kafka в Windows.


kafka-topics.bat --create --topic freblogg --partitions 3 --replication-factor 1 --zookeeper localhost:2181

В каталоге журналов Kafka создано три каталога:


$ tree freblogg*freblogg-0|-- 00000000000000000000.index|-- 00000000000000000000.log|-- 00000000000000000000.timeindex`-- leader-epoch-checkpointfreblogg-1|-- 00000000000000000000.index|-- 00000000000000000000.log|-- 00000000000000000000.timeindex`-- leader-epoch-checkpointfreblogg-2|-- 00000000000000000000.index|-- 00000000000000000000.log|-- 00000000000000000000.timeindex`-- leader-epoch-checkpoint

Мы создали в топике три партиции, и у каждой свой каталог в файловой системе. Еще тут есть несколько файлов (index, log и т д.), но о них чуть позже.


Обратите внимание, что в Kafka топик это логическое объединение, а партиция фактическая единица хранения. То, что физически хранится на диске. Как устроены партиции?


Партиции


В теории партиция это неизменяемая коллекция (или последовательность) сообщений. Мы можем добавлять сообщения в партицию, но не можем удалять. И под мы я подразумеваю продюсеров в Kafka. Продюсер не может удалять сообщения из топика.


Сейчас мы отправим в топик пару сообщений, но сначала обратите внимание на размер файлов в папках партиций.


$ ls -lh freblogg-0total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpoint

Как видите, файлы index вместе весят 20 МБ, а файл log совершенно пустой. В папках freblogg-1 и freblogg-2 то же самое.
Давайте отправим сообщения через console producer и посмотрим, что будет:


kafka-console-producer.bat --topic freblogg --broker-list localhost:9092

Я отправил два сообщения сначала ввел стандартное Hello World, а потом нажал на Enter, и это второе сообщение. Еще раз посмотрим на размеры файлов:


$ ls -lh freblogg*freblogg-0:total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpointfreblogg-1:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  68 Aug  5 10:15 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 10:15 leader-epoch-checkpointfreblogg-2:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  79 Aug  5 09:59 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 09:59 leader-epoch-checkpoint

Два сообщения заняли две партиции, и файлы log в них теперь имеют размер. Это потому, что сообщения в партиции хранятся в файле xxxx.log. Давайте заглянем в файл log и убедимся, что сообщение и правда там.


$ cat freblogg-2/*.log@^@^B^@^K^X^@^@^@^A"^@^@^A^VHello World^@

Файлы с форматом log не очень удобно читать, но мы все же видим в конце Hello World, то есть файл обновился, когда мы отправили сообщение в топик. Второе сообщение мы отправили в другую партицию.


Обратите внимание, что первое сообщение попало в третью партицию (freblogg-2), а второе во вторую (freblogg-1). Для первого сообщения Kafka выбирает партицию произвольно, а следующие просто распределяет по кругу (round-robin). Если мы отправим третье сообщение, Kafka запишет его во freblogg-0 и дальше будет придерживаться этого порядка. Мы можем и сами выбирать партицию, указав ключ. Kafka хранит все сообщения с одним ключом в одной и той же партиции.


Каждому новому сообщению в партиции присваивается Id на 1 больше предыдущего. Этот Id еще называют смещением (offset). У первого сообщения смещение 0, у второго 1 и т. д., каждое следующее всегда на 1 больше предыдущего.



<Небольшое отступление>


Давайте используем инструмент Kafka, чтобы понять, что это за странные символы в файле log. Нам они кажутся бессмысленными, но для Kafka это метаданные каждого сообщения в очереди. Выполним команду:


kafka-run-class.bat kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files logs\freblogg-2\00000000000000000000.log

Получим результат:


umping logs\freblogg-2\00000000000000000000.logStarting offset: 0offset: 0 position: 0 CreateTime: 1533443377944 isvalid: true keysize: -1 valuesize: 11 producerId: -1 headerKeys: [] payload: Hello Worldoffset: 1 position: 79 CreateTime: 1533462689974 isvalid: true keysize: -1 valuesize: 6 producerId: -1 headerKeys: [] payload: amazon

(Я удалил из выходных данных кое-что лишнее.)


Здесь мы видим смещение, время создания, размер ключа и значения, а еще само сообщение (payload).


</Небольшое отступление>


Надо понимать, что партиция привязана к брокеру. Если у нас, допустим, три брокера, а папка freblogg-0 существует в broker-1, в других брокерах ее не будет. У одного топика могут быть партиции в нескольких брокерах, но одна партиция всегда существует в одном брокере Kafka (если установлен коэффициент репликации по умолчанию 1, но об этом чуть позже).



Сегменты


Что это за файлы index и log в каталоге партиции? Партиция, может, и единица хранения в Kafka, но не минимальная. Каждая партиция разделена на сегменты, то есть коллекции сообщений. Kafka не хранит все сообщения партиции в одном файле (как в файле лога), а разделяет их на сегменты. Это дает несколько преимуществ. (Разделяй и властвуй, как говорится.)


Главное это упрощает стирание данных. Я уже говорил, что сами мы не можем удалить сообщение из партиции, но Kafka может это сделать на основе политики хранения для топика. Удалить сегмент проще, чем часть файла, особенно когда продюсер отправляет в него данные.


$ ls -lh freblogg-0total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpoint

Нули (00000000000000000000) в файлах log и index в каждой папке партиции это имя сегмента. У файла сегмента есть файлы segment.log, segment.index и segment.timeindex.


Kafka всегда записывает сообщения в файлы сегментов в рамках партиции, причем у нас всегда есть активный сегмент для записи. Когда Kafka достигает лимита по размеру сегмента, создается новый файл сегмента, который станет активным.



В имени каждого файла сегмента отражается смещение от первого сообщения. На картинке выше в сегменте 0 содержатся сообщения со смещением от 0 до 2, в сегменте 3 от 3 до 5, и так далее. Последний сегмент, шестой, сейчас активен.


$ ls -lh freblogg*freblogg-0:total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpointfreblogg-1:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  68 Aug  5 10:15 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 10:15 leader-epoch-checkpointfreblogg-2:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  79 Aug  5 09:59 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 09:59 leader-epoch-checkpoint

У нас всего по одному сегменту в каждой партиции, поэтому они называются 00000000000000000000. Раз других файлов сегментов нет, сегмент 00000000000000000000 и будет активным.


По умолчанию сегменту выдается целый гигабайт, но представим, что мы изменили параметры, и теперь в каждый сегмент помещается только три сообщения. Посмотрим, что получится.


Допустим, мы отправили в партицию freblogg-2 три сообщения, и она выглядит так:



Три сообщения это наш лимит. На следующем сообщении Kafka автоматически закроет текущий сегмент, создаст новый, сделает его активным и сохранит новое сообщение в файле log этого сегмента. (Я не показываю предыдущие нули, чтобы было проще воспринять).


freblogg-2|-- 00.index|-- 00.log|-- 00.timeindex|-- 03.index|-- 03.log|-- 03.timeindex`--

Удивительное дело, но новый сегмент называется не 01. Мы видим 03.index, 03.log. Почему так?



Kafka называет сегмент по имени минимального смещения в нем. Новое сообщение в партиции имеет смещение 3, поэтому Kafka так и называет новый сегмент. Раз у нас есть сегменты 00 и 03, мы можем быть уверены, что сообщения со смещениями 0, 1 и 2 и правда находятся в сегменте 00. Новые сообщения в партиции freblogg-2 со смещениями 3 ,4 и 5 будут храниться в сегменте 03.


В Kafka мы часто читаем сообщения по определенному смещению. Искать смещение в файле log затратно, особенно если файл разрастается до неприличных размеров (по умолчанию это 1 ГБ). Для этого нам и нужен файл .index. В файле index хранятся смещения и физическое расположение сообщения в файле log.


Файл index для файла log, который я приводил в кратком отступлении, будет выглядеть как-то так:



Если нужно прочитать сообщение со смещением 1, мы ищем его в файле index и видим, что его положение 79. Переходим к положению 79 в файле log и читаем. Это довольно эффективный способ мы быстро находим нужное смещение в уже отсортированном файле index с помощью бинарного поиска.


Параллелизм в партициях


Чтобы гарантировать порядок чтения сообщений из партиции, Kafka дает доступ к партиции только одному консюмеру (из группы консюмеров). Если партиция получает сообщения a, f и k, консюмер читает их в том же порядке: a, f и k. Это важно, ведь порядок потребления сообщений на уровне топика не гарантирован, если у вас несколько партиций.


Если консюмеров будет больше, параллелизм не увеличится. Нужно больше партиций. Чтобы два консюмера параллельно считывали данные из топика, нужно создать две партиции по одной на каждого. Партиции в одном топике могут находиться в разных брокерах, поэтому два консюмера топика могут считывать данные из двух разных брокеров.


Топики


Наконец, переходим к топикам. Мы уже кое-что знаем о них. Главное, что нужно знать: топик это просто логическое объединение нескольких партиций.


Топик может быть распределен по нескольким брокерам через партиции, но каждая партиция находится только в одном брокере. У каждого топика есть уникальное имя, от которого зависят имена партиций.


Репликация


Как работает репликация? Создавая топик в Kafka, мы указываем для него коэффициент репликации replication-factor. Допустим, у нас два брокера и мы устанавливаем replication-factor 2. Теперь Kafka попытается всегда создавать бэкап, или реплику, для каждой партиции в топике. Kafka распределяет партиции примерно так же, как HDFS распределяет блоки данных по нодам.


Допустим, для топика freblogg мы установили коэффициент репликации 2. Мы получим примерно такое распределение партиций:



Даже если реплицированная партиция находится в другом брокере, Kafka не разрешает ее читать, потому что в каждом наборе партиций есть LEADER, то есть лидер, и FOLLOWERS ведомые, которые остаются в резерве. Ведомые периодически синхронизируются с лидером и ждут своего звездного часа. Когда лидер выйдет из строя, один из in-sync ведомых будет выбран новым лидером, и вы будете получать данные из этой партиции.


Лидер и ведомый одной партиции всегда находятся в разных брокерах. Думаю, не нужно объяснять, почему.


Мы подошли к концу этой длинной статьи. Если вы добрались до этого места поздравляю. Теперь вы знаете почти все о хранении данных в Kafka. Давайте повторим, чтобы ничего не забыть.


Итоги


  • В Kafka данные хранятся в топиках.
  • Топики разделены на партиции.
  • Каждая партиция разделена на сегменты.
  • У каждого сегмента есть файл log, где хранится само сообщение, и файл index, где хранится позиция сообщения в файле log.
  • У одного топика могут быть партиции в разных брокерах, но сама партиция всегда привязана к одному брокеру.
  • Реплицированные партиции существуют пассивно. Вы обращаетесь к ним, только если сломался лидер.

От редакции:
Более подробно от работе с Apache Kafka можно узнать на курсе Слёрма. Курс сейчас в разработке. В программе бесплатные базовые уроки и платная продвинутая часть. Оставьте заявку на курс, чтобы получать уведомления.


Ресурсы:


Схема Kafka https://kafka.apache.org/images/kafka_diagram.png

Подробнее..

Сервисы с Apache Kafka и тестирование

09.01.2021 18:16:01 | Автор: admin

Когда сервисы интегрируются при помощи Kafka очень удобно использовать REST API, как универсальный и стандартный способ обмена сообщениями. При увеличении количества сервисов сложность коммуникаций увеличивается. Для контроля можно и нужно использовать интеграционное тестирование. Такие библиотеки как testcontainers или EmbeddedServer прекрасно помогают организовать такое тестирование. Существуют много примеров для micronaut, Spring Boot и т.д. Но в этих примерах опущены некоторые детали, которые не позволяют с первого раза запустить код. В статье приводятся примеры с подробным описанием и ссылками на код.


Пример


Для простоты можно принять такой REST API.


/runs POST-метод. Инициализирует запрос в канал связи. Принимает данные и возвращает ключ запроса.
/runs/{key}/status GET-метод. По ключу возвращает статус запроса. Может принимать следующие значения: UNKNOWN, RUNNING, DONE.
/runs /{key} GET-метод. По ключу возвращает результат запроса.


Подобный API реализован у livy, хотя и для других задач.


Реализация


Будут использоваться: micronaut, Spring Boot.


micronaut


Контроллер для API.


import io.micronaut.http.annotation.Body;import io.micronaut.http.annotation.Controller;import io.micronaut.http.annotation.Get;import io.micronaut.http.annotation.Post;import io.reactivex.Maybe;import io.reactivex.schedulers.Schedulers;import javax.inject.Inject;import java.util.UUID;@Controller("/runs")public class RunController {    @Inject    RunClient runClient;    @Inject    RunCache runCache;    @Post    public String runs(@Body String body) {        String key = UUID.randomUUID().toString();        runCache.statuses.put(key, RunStatus.RUNNING);        runCache.responses.put(key, "");        runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body));        return key;    }    @Get("/{key}/status")    public Maybe<RunStatus> getRunStatus(String key) {        return Maybe.just(key)                .subscribeOn(Schedulers.io())                .map(it -> runCache.statuses.getOrDefault(it, RunStatus.UNKNOWN));    }    @Get("/{key}")    public Maybe<String> getRunResponse(String key) {        return Maybe.just(key)                .subscribeOn(Schedulers.io())                .map(it -> runCache.responses.getOrDefault(it, ""));    }}

Отправка сообщений в kafka.


import io.micronaut.configuration.kafka.annotation.*;import io.micronaut.messaging.annotation.Body;@KafkaClientpublic interface RunClient {    @Topic("runs")    void sendRun(@KafkaKey String key, @Body Run run);}

Получение сообщений из kafka.


import io.micronaut.configuration.kafka.annotation.*;import io.micronaut.messaging.annotation.Body;import javax.inject.Inject;@KafkaListener(offsetReset = OffsetReset.EARLIEST)public class RunListener {    @Inject    RunCalculator runCalculator;    @Topic("runs")    public void receive(@KafkaKey String key, @Body Run run) {        runCalculator.run(key, run);    }}

Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.


import io.micronaut.context.annotation.Replaces;import javax.inject.Inject;import javax.inject.Singleton;import java.util.UUID;@Replaces(RunCalculatorImpl.class)@Singletonpublic class RunCalculatorWithWork implements RunCalculator {    @Inject    RunClient runClient;    @Inject    RunCache runCache;    @Override    public void run(String key, Run run) {        if (RunType.REQUEST.equals(run.getType())) {            String runKey = run.getKey();            String newKey = UUID.randomUUID().toString();            String runBody = run.getBody();            runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated"));        } else if (RunType.RESPONSE.equals(run.getType())) {            runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);            runCache.responses.replace(run.getResponseKey(), run.getBody());        }    }}

Тест.


import io.micronaut.http.HttpRequest;import io.micronaut.http.client.HttpClient;import static org.junit.jupiter.api.Assertions.assertEquals;public abstract class RunBase {    void run(HttpClient client) {        String key = client.toBlocking().retrieve(HttpRequest.POST("/runs", "body"));        RunStatus runStatus = RunStatus.UNKNOWN;        while (runStatus != RunStatus.DONE) {            runStatus = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key + "/status"), RunStatus.class);            try {                Thread.sleep(500);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        String response = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key), String.class);        assertEquals("body_calculated", response);    }}

Для использования EmbeddedServer необходимо.


Подключить библиотеки:


testImplementation("org.apache.kafka:kafka-clients:2.6.0:test")testImplementation("org.apache.kafka:kafka_2.12:2.6.0")testImplementation("org.apache.kafka:kafka_2.12:2.6.0:test")

Тест может выглядеть так.


import io.micronaut.context.ApplicationContext;import io.micronaut.http.client.HttpClient;import io.micronaut.runtime.server.EmbeddedServer;import org.junit.jupiter.api.Test;import java.util.HashMap;import java.util.Map;public class RunKeTest extends RunBase {    @Test    void test() {        Map<String, Object> properties = new HashMap<>();        properties.put("kafka.bootstrap.servers", "localhost:9092");        properties.put("kafka.embedded.enabled", "true");        try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {            ApplicationContext applicationContext = embeddedServer.getApplicationContext();            HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());            run(client);        }    }}

Для использования testcontainers необходимо.


Подключить библиотеки:


implementation("org.testcontainers:kafka:1.14.3")

Тест может выглядеть так.


import io.micronaut.context.ApplicationContext;import io.micronaut.http.client.HttpClient;import io.micronaut.runtime.server.EmbeddedServer;import org.junit.jupiter.api.Test;import org.testcontainers.containers.KafkaContainer;import org.testcontainers.utility.DockerImageName;import java.util.HashMap;import java.util.Map;public class RunTcTest extends RunBase {    @Test    public void test() {        try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"))) {            kafka.start();            Map<String, Object> properties = new HashMap<>();            properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());            try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {                ApplicationContext applicationContext = embeddedServer.getApplicationContext();                HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());                run(client);            }        }    }}

Spring Boot


Контроллер для API.


import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.util.UUID;@RestController@RequestMapping("/runs")public class RunController {    @Autowired    private RunClient runClient;    @Autowired    private RunCache runCache;    @PostMapping()    public String runs(@RequestBody String body) {        String key = UUID.randomUUID().toString();        runCache.statuses.put(key, RunStatus.RUNNING);        runCache.responses.put(key, "");        runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body));        return key;    }    @GetMapping("/{key}/status")    public RunStatus getRunStatus(@PathVariable String key) {        return runCache.statuses.getOrDefault(key, RunStatus.UNKNOWN);    }    @GetMapping("/{key}")    public String getRunResponse(@PathVariable String key) {        return runCache.responses.getOrDefault(key, "");    }}

Отправка сообщений в kafka.


import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class RunClient {    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;    @Autowired    private ObjectMapper objectMapper;    public void sendRun(String key, Run run) {        String data = "";        try {            data = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(run);        } catch (JsonProcessingException e) {            e.printStackTrace();        }        kafkaTemplate.send("runs", key, data);    }}

Получение сообщений из kafka.


import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class RunListener {    @Autowired    private ObjectMapper objectMapper;    @Autowired    private RunCalculator runCalculator;    @KafkaListener(topics = "runs", groupId = "m-group")    public void receive(ConsumerRecord<?, ?> consumerRecord) {        String key = consumerRecord.key().toString();        Run run = null;        try {            run = objectMapper.readValue(consumerRecord.value().toString(), Run.class);        } catch (JsonProcessingException e) {            e.printStackTrace();        }        runCalculator.run(key, run);    }}

Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.


import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.UUID;@Componentpublic class RunCalculatorWithWork implements RunCalculator {    @Autowired    RunClient runClient;    @Autowired    RunCache runCache;    @Override    public void run(String key, Run run) {        if (RunType.REQUEST.equals(run.getType())) {            String runKey = run.getKey();            String newKey = UUID.randomUUID().toString();            String runBody = run.getBody();            runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated"));        } else if (RunType.RESPONSE.equals(run.getType())) {            runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);            runCache.responses.replace(run.getResponseKey(), run.getBody());        }    }}

Тест.


import com.fasterxml.jackson.databind.ObjectMapper;import org.springframework.http.MediaType;import org.springframework.test.web.servlet.MockMvc;import org.springframework.test.web.servlet.MvcResult;import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;import static org.junit.jupiter.api.Assertions.assertEquals;import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;public abstract class RunBase {    void run(MockMvc mockMvc, ObjectMapper objectMapper) throws Exception {        MvcResult keyResult = mockMvc.perform(MockMvcRequestBuilders.post("/runs")                .content("body")                .contentType(MediaType.APPLICATION_JSON)                .accept(MediaType.APPLICATION_JSON))                .andExpect(status().isOk())                .andReturn();        String key = keyResult.getResponse().getContentAsString();        RunStatus runStatus = RunStatus.UNKNOWN;        while (runStatus != RunStatus.DONE) {            MvcResult statusResult = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key + "/status")                    .contentType(MediaType.APPLICATION_JSON)                    .accept(MediaType.APPLICATION_JSON))                    .andExpect(status().isOk())                    .andReturn();            runStatus = objectMapper.readValue(statusResult.getResponse().getContentAsString(), RunStatus.class);            try {                Thread.sleep(500);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        String response = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key)                .contentType(MediaType.APPLICATION_JSON)                .accept(MediaType.APPLICATION_JSON))                .andExpect(status().isOk())                .andReturn().getResponse().getContentAsString();        assertEquals("body_calculated", response);    }}

Для использования EmbeddedServer необходимо.


Подключить библиотеки:


<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId>    <version>2.5.10.RELEASE</version></dependency><dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka-test</artifactId>    <version>2.5.10.RELEASE</version>    <scope>test</scope></dependency>

Тест может выглядеть так.


import com.fasterxml.jackson.databind.ObjectMapper;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.boot.test.context.TestConfiguration;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Import;import org.springframework.kafka.test.context.EmbeddedKafka;import org.springframework.test.web.servlet.MockMvc;@AutoConfigureMockMvc@SpringBootTest@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})@Import(RunKeTest.RunKeTestConfiguration.class)public class RunKeTest extends RunBase {    @Autowired    private MockMvc mockMvc;    @Autowired    private ObjectMapper objectMapper;    @Test    void test() throws Exception {        run(mockMvc, objectMapper);    }    @TestConfiguration    static class RunKeTestConfiguration {        @Autowired        private RunCache runCache;        @Autowired        private RunClient runClient;        @Bean        public RunCalculator runCalculator() {            RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();            runCalculatorWithWork.runCache = runCache;            runCalculatorWithWork.runClient = runClient;            return runCalculatorWithWork;        }    }}

Для использования testcontainers необходимо.


Подключить библиотеки:


<dependency>    <groupId>org.testcontainers</groupId>    <artifactId>kafka</artifactId>    <version>1.14.3</version>    <scope>test</scope></dependency>

Тест может выглядеть так.


import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.junit.ClassRule;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.boot.test.context.TestConfiguration;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Import;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.*;import org.springframework.test.web.servlet.MockMvc;import org.testcontainers.containers.KafkaContainer;import org.testcontainers.utility.DockerImageName;import java.util.HashMap;import java.util.Map;@AutoConfigureMockMvc@SpringBootTest@Import(RunTcTest.RunTcTestConfiguration.class)public class RunTcTest extends RunBase {    @ClassRule    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"));    static {        kafka.start();    }    @Autowired    private MockMvc mockMvc;    @Autowired    private ObjectMapper objectMapper;    @Test    void test() throws Exception {        run(mockMvc, objectMapper);    }    @TestConfiguration    static class RunTcTestConfiguration {        @Autowired        private RunCache runCache;        @Autowired        private RunClient runClient;        @Bean        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();            factory.setConsumerFactory(consumerFactory());            return factory;        }        @Bean        public ConsumerFactory<Integer, String> consumerFactory() {            return new DefaultKafkaConsumerFactory<>(consumerConfigs());        }        @Bean        public Map<String, Object> consumerConfigs() {            Map<String, Object> props = new HashMap<>();            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");            props.put(ConsumerConfig.GROUP_ID_CONFIG, "m-group");            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);            return props;        }        @Bean        public ProducerFactory<String, String> producerFactory() {            Map<String, Object> configProps = new HashMap<>();            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);            return new DefaultKafkaProducerFactory<>(configProps);        }        @Bean        public KafkaTemplate<String, String> kafkaTemplate() {            return new KafkaTemplate<>(producerFactory());        }        @Bean        public RunCalculator runCalculator() {            RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();            runCalculatorWithWork.runCache = runCache;            runCalculatorWithWork.runClient = runClient;            return runCalculatorWithWork;        }    }}

Перед всеми тестами необходимо стартовать kafka. Это делается вот таким вот образом:


kafka.start();

Дополнительные свойства для kafka в тестах можно задать в ресурсном файле.


application.yml

spring:  kafka:    consumer:      auto-offset-reset: earliest

Ресурсы и ссылки


Код для micronaut


Код для Spring Boot


PART 1: TESTING KAFKA MICROSERVICES WITH MICRONAUT


Testing Kafka and Spring Boot


Micronaut Kafka


Spring for Apache Kafka

Подробнее..

Поиск в Кафке

26.02.2021 10:11:26 | Автор: admin

Меня зовут Сергей Калинец, я архитектор в компании Parimatch Tech, и в этой публикации хочу поделиться нашим опытом в области поиска сообщений в Kafka.

Для нашей компании Kafka является центральной нервной системой, через которую микросервисы обмениваются информацией. От входа до выхода сообщение может пройти через десяток сервисов, которые его фильтруют и трансформируют, перекладывая из одного топика в другой. Этими сервисами владеют разные команды, и очень полезно бывает посмотреть, что же содержится в том или ином сообщении. Особенно интересно это в случаях, когда что-то идет не по плану важно понять на каком этапе все превратилось в тыкву (ну и кому нужно в тыкву дать, чтобы такого больше не повторялось). С высоты птичьего полета решение простое нужно взять соответствующие сообщения из кафки и посмотреть что в них не так. Но, как обычно, интересноеначинаетсяв деталях.

Начнем с того, что кафка это не просто брокер сообщений, как многие думают и пользуются ей, но и распределенный лог. Это значит много чего, но нам интересно то, что сообщения не удаляются из топиков после того, как их прочитали получатели, и технически можно в любой момент их прочитать заново и посмотреть, что там внутри. Однако все усложняется тем, что читать из Kafka можно только последовательно. Нужно знать смещение (для упрощения это порядковый номер в топике), с которого нам нужны сообщения. Также возможно в качестве начальной точки указать время, но потом все равно можно только читать все сообщения по порядку.

Получается, что для того, чтобы найти нужное сообщение, нужно прочитать пачку и найти среди них те, которые интересны. Например, если мы хотим разобраться в проблемах игрока с id=42, нужно найти все сообщения, где он упоминается (playerId: 42), выстроить их в цепочку, ну и дальше уже смотреть, на каком этапе все пошло не так.

В отличии от баз данных вроде MySQL или MSSQL, у которых в комплекте поставки сразу есть клиентские приложения с графическим интерфейсом, ванильная Kafka не балует нас такими изысками и предлагает разве что консольные утилиты с довольно узким (на первый взгляд) функционалом.

Но есть и хорошие новости. На рынке есть ряд решений, которые помогают облегчить весь процесс. Сразу отмечу, что на рынке тут не в смысле за деньги все рассмотренные ниже инструменты бесплатны.

Я опишу некоторые наиболее часто используемые, а в конце более детально разберем самое, на наш взгляд, близкое к идеальному.

Итак, из чего можно выбирать?

Kafka Tool

(скриншот с официального сайта https://www.kafkatool.com/features.html)

Наверное самый популярный инструмент, которым пользуются люди, привыкшие к GUI инструментам. Позволяет увидеть список топиков, и прочитать отдельные сообщения. Есть возможность фильтровать по содержимому сообщений, указывать с какого смещения читать и сколько сообщений нужно прочитать. Но все же нужно знать примерные смещения интересующих нас сообщений, потому что Kafka Tool ищет только в рамках тех сообщений, которые были прочитаны. Говоря словами Дятлова из сериала Чернобыль (в оригинальной озвучке): Not great not terrible.

Часто это единственная альтернатива, с которой вынуждены работать люди, волей судьбы столкнувшиеся с кафкой. Но есть и другие средства.

Kafka Console Consumer

Это одна из утилит, входящая в комплект поставки кафки. И она позволяет читать данные из нее. Как и сама Kafka, это JVM приложение, которое для своей работы требует установленной Java. В принципе, это справедливо и для Kafka Tool, но в данном случае можно обойтись без Java если запускать через docker:

По сути, нужно просто перед самой командой добавить docker run --rm -it taion809/kafka-cli:2.2.0, что на докерском значит запусти вот такой образ, выведи, то, что он показывает, на мой экран и удали образ, когда он закончит работу. Можно пойти еще дальшеи добавить алиас типа

Утилита кажется архаичной и ее консольность может отпугнуть неискушенных адептов графических интерфейсов, но, как и большинство консольных инструментов, при правильном использовании она дает более мощный результат. Как именно рассмотрим на примере следующего инструмента, тоже консольного.

Kafkacat

Это уже весьма могучая штука, которая позволяет читать и писать из Кафки, а также получать список топиков. Она тоже консольная, но при этом поудобнее в использовании, чем стандартные утилиты вроде kafka-console-consumer (и ее тоже можно запустить из докера).

Вот так можно сохранить 10 сообщений в файле messages (в формате JSON):

Файл можно будет использовать для анализа или чтобы воспроизвести какой-то сценарий.Безусловно, для решения этой задачи можно было бы и использовать родной консьюмер, но kafkacat позволяет выразить это короче.

Или вот пример посложнее (в смысле букв немного больше, но решение проще многих других альтернатив):

Код взят с блога одного из самых топовых популяризаторов Kafka экосистемы Robin Moffatt. В двух словах один инстанс kafkacat читает сообщения из топика Kafka, потом сообщения трансформируются в необходимый формат и скармливаются другому инстансу kafkacat, который пишет их в другой топик. Многим такое может показаться вырвиглазным непонятным брейнфаком, однако на самом деле у нас готовое решение, которое можно запустить в докере и оно будет работать. Реализация такого сценария на вот этих ваших дотнетах и джавах потребует больше букв однозначно.

Но это меня уже занесло немного не в ту степь. Статья же про поиск. Так вот, похожим способом можно организовать и поиск сообщений просто перенаправить вывод в какой-то grep и дело с концом.

Из возможностей для улучшений можно отметить тот факт, что kafkacat поддерживает Avro из коробки, а вот protobuf нет.

Kafka Connect + ELK

Все вышеперечисленные штуки работают и решают поставленные задачи, однако не всем удобны. При разборе инцидентов нужно именно поискать сообщения в разных топиках по определенному тексту это может быть определенный идентификатор или имя. Наши QA (а именно они в 90% случаев занимаются подобными расследованиями) наловчились пользоваться Kafka Tool, а некоторые и консольными утилитами. Но это все меркнет по сравнению с возможностями, которые дает Kibana, UI оболочка вокруг базы данных Elasticsearch. Kibana тоже используется нами QA для анализа логов. И не раз поднимался вопрос давайте логировать все сообщения, чтобы можно было искать в Kibana. Но оказалось, что есть способ намного проще, чем добавление вызова логера в каждый из наших сервисов, и имя ему Kafka Connect.

Kafka Connect это решение для интеграции Kafka с другими системами. В двух словах,оно (или он?) позволяет экспортировать из и импортировать данные в Kafka без написания кода. Все, что нужно это поднятый кластер Connect и конфиги наших хотелок в формате JSON. Слово кластер звучит дорого и сложно, но на самом деле это один или больше инстансов, которые можно поднять где угодно мы, например, запускаем их там же, где и обычные сервисы в Kubernetes.

Kafka Connect предоставляет REST API, c помощью которого можно управлять коннекторами, занимающимися перегонкой данных из и в Kafka. Коннекторы задаются конфигурациями, в случае Elasticsearch эта конфигурация может быть вот такой:

Если такой конфиг через HTTP PUT передать на сервер Connect, то, при определенном стечении обстоятельств, создастся коннектор с именем ElasticSinkConnector, который будет в три потока читать данные из топика и писать их в Elastic.

Выглядит всё крайне просто, но самое интересное, конечно же, в деталях. А деталей тут есть )

Большинство проблем связанно с данными. Обычно, как и в нашем случае, нужно работать с форматами данных, разработчики которого явно не думали, что когда-то эти данные будут попадать в Elasticsearch.

Для решения нюансов с данными есть трансформации. Это такие себе функции, которые можно применять к данным и мутировать их, подстраивая под требования получателя. При этом всегда есть возможность использовать любую Kafka клиент технологию для случаев, когда трансформации бессильны. Какие же сценарии мы решали с помощью трансформаций?

В нашем случае есть 4 трансформации. Вначале мы их перечисляем, а потом конфигурим. Трансформации применяются в порядке их перечисления, и это позволяет интересно их комбинировать.

Имена

Вначале добавляем возможность поиска по имени топика просто дополняем наши сообщения полем с нужной информацией.

Индексы

Elasticsearch работает с индексами, которые имеют свойство забиваться и нервировать девопсов. Нужна поддержка удобной для управления схемой индексирования. В нашем случае мы остановились на индексе на сервис / команду с ежедневной ротацией. Плюс решения хранение данные из разных топиков в одном индексе с возможностью контролировать его возраст.

Даты

Для того, чтобы в Kibana можно было искать по дате, необходимо задать поле, в котором эта дата содержится. Мы используем дату публикации сообщения в Kafka. Чтобы ее получить, мы вначале вставляем поле с датой сообщения, а потом конвертируем ее в UTC формат. Конвертация была добавлена, чтобы помочь Elasticsearch распознать в этом поле timestamp, однако в нашем случае это не всегда происходило, поэтому мы добавили index template, который явно говорил в этом поле дата:

В результате у нас сообщения практически мгновенно становятся доступными для анализа, и время, потраченное на разбор инцидентов, уменьшается.

Тут, конечно, стоит отметить, что данная инициатива сейчас у нас на этапе внедрения, поэтому нельзя сказать,что все массово ищут все что нужно в Kibana, но мы к этому уверенно идем.

Вообще, Kafka Connect применимо не только для таких задач. Его можно вполне использовать в тех случаях, где нужна интеграция с другими системами, Реально, например, реализовать полнотекстовый поиск в вашем приложении с помощью двух коннекторов. Один будет читать из операционной базы обновления и писать их в Kafka. А второй читать из Kafka и отсылать в Elasticsearch. Приложение делает поисковый запрос в Elasticsearch, получает id и по нему находит нужные данные в базе.

Заключение

Ну а наша публикация подходит к концу. Очень надеюсь, что вы узнали что-то новое для себя, а иначе зачем это все? Если что-то не получилось раскрыть, или вы категорически не согласны с чем-то, или может есть более удобные способы решения подобных проблем напишите про это в комментариях, обсудим )

Подробнее..

Гибриды побеждают или холивары дорого

11.01.2021 02:05:19 | Автор: admin

Мотивом для написания данной статьи послужил тот факт, что на habr.com участилось появление материалов маркетингового характера про Apache Kafka. А также тот факт, что из статей складывается впечатление что пишут их немного далекие от реального использования люди это конечно же только впечателение, но почему-то в большинстве своем статьи обязательно содержат сравнение Apache Kafka с RabbitMQ, причем не в пользу последнего. Что самое интересное читая подобные статьи управленцы без технического бэкграунда начинают тратить деньги на внутренние исследования, чтобы ведущие разработчики и технические директора выбрали одно из решений. Так как я очень жадный/домовитый, а также так как я сторонник тезиса "В споре НЕ рождается истина" предлагаю вам ознакомится с другим подходом почти без сравнения разных брокеров.


Без сравнения никуда


Вообще, по правильному, я должен был сделать статью в формате Kafka+RabbitMQ+Nats+ActiveMQ+Mosquito+etc, но мне кажется, что для Вас дорогие читатели это будет перебор, хотя обычно в моих архитектурных решениях присутствуют все вышеуказанные сервисы (и не только). И это я еще не рассказываю про AzureServiceBus/AmazonServiceBus которые также участвуют в "гибридах" при крупных программах проектов. Поэтому пока остановимся на связке Kafka+RabbitMQ и далее вы поймете почему: по аналогии можно подключить любой сервис с его протоколом. Потому что:


сравнивая Apache Kafka и RabbitMQ вы сравниваете 2 (два) бренда, а точнее 2 коммерческие компании Confluent и vmWare, и немножко Apache Software Foundation (но это не компания)

то есть формально при сравнении мы должны сравнивать бизнес-модели компаний которые являются основными драйверами развития наших сегодняшних подоопытных. Так как Хабр все таки не портал экономических исследований, поэтому мы для начала должны вспомнить не бренды, а те описания которые стоят за этими брендами (то как сами себя называют наши сегодняшние участники).


  • RabbitMQ мультипротокольный и расширяемый брокер сообщений
  • Apache Kafka платформа для распределенной потоковой передачи событий
  • Confluent Platform платформа потоковой передачи событий с возможностью создания высокопроизводительных конвейеров обработки данных для целей аналитики и интеграции в бизнес-сценариях

Я не зря третьим пунктом выделяю наработки компании Confluent те кто собирается использовать Apache Kafka в продуктиве должны хотя бы видеть какую функциональность дополнительно добавляет Confluent к Apache Kafka. А это SchemeRegistry, RestProxy, kSQL и еще несколько интересных штук, о одной из которых мы поговорим ниже, она называется Kafka-Connect.


Но вернемся к сравнению внимательный читатель видит, что RabbitMQ сам себя называет брокером сообщений выделяя свою главную фишку "мультипротокольность", а товарищи из экосистемы Kafka почему-то называют себя аж платформой (завышенное самомнение оно такое).


Итак чтобы было совсем понятно, куда я веду.


  • ключевая особенность RabbitMQ мультипротокольность и расширяемость. (основной язык якобы Erlang)
  • ключевая особенность экосистемы Kafka потоковая передача с обработкой (основной язык якобы Scala/Java)

Отсюда и возникают минусы каждого из решений


  • для RabbitMQ мы не сможем построить нормального решения для потоковой обработки. Точнее сможем, но НЕ штатно.
  • а для Kafka мы не сможем сделать мультипротокольность, точнее сможем но НЕ штатно.

Сократ не говорил, что в споре рождается истина


Еще одна новость: действительно если почитать источник, то Сократ вообще-то в итоге пришел к тому, что нужно обеспечить диалог, а если по научному то истина рождается в научном споре, который формально представляет собой процесс публикация со ссылкой на источники -> а затем научная критика оппонентов -> истина


А значит перейдем к ссылкам для начала их будет три. Когда 14 лет назад я совместно с коллегами начинал использовать брокеры сообщений в качестве основы для построения своих интеграционных решений, мы сразу обратили внимание, что фактически с точки зрения "клиента" (конечного приложения), под разные задачи подходят разные протоколы интеграции.


  • ODBC
  • AMQP
  • MSMQ
  • XMPP
  • IP over Avian Carriers

так как тогда наша задача была интегрировать всякое (python, C#, java) и 1С был придуман проект One-S-Connectors (https://code.google.com/archive/p/one-c-connectors/source/default/source). Сейчас он имеет сугубо академический интерес (так как в 1С мире моя персона достаточно известна и на Хабре много 1С специалистов из сообщества "воинствующих 1С-ников" эта ссылка специально для них).
Однако уже тогда (в 2006 году) стало понятно, что по большому счету конечному разработчику придется менять/выбирать протокол под бизнес-задачу. А инфраструктурщикам придется обеспечить максимально широкий спектр интеграционных протоколов. От ODBC до Kafka/NATs/ModBus.


Но вернемся к дню сегодняшнему когда я начал использовать в проектах уровня ГИС (госсударственные информационные системы) различные транспорта данных внезапно выяснилось, что универсальные адаптеры это не только концепт воинствующих 1С-ников, но и соседей. Поэтому многие идеи при внедрении черпались из еще двух интересных проектов



маленькое примечание для менеджеров про Kombu как то так получилось, что имплементация протокола Apache Kafka до сих пор открыта https://github.com/celery/kombu/issues/301 и почему-то перешла в разряд "Дайте денег", поэтому для Python проектов приходится использовать дополнительно https://github.com/confluentinc/confluent-kafka-python

Когда вы дочитаете до этого момента предполагаю, что вы зададите вопрос про остальные языки: Java, GoLang, RUST, etc. Но во первых я не зря выше указал что по серьезному в наш обсуждаемый сегодня гибрид нужно добавить историю про NATs и ActiveMQ и внезапно JMS поэтому просьба дочитать до конца: Java будет, а во вторых мы переходим к еще трем полезным ссылкам



Прокоментируем их? Дело в том, что как бы вы не хотели, а для полноценного использования "в длинную" вам придется подписаться на историю релизов как сервера RabbitMQ и самое главное на те самые расширения (лежат в каталоге /deps) которые постоянно добавляются в ядро RabbitMQ, так и на портал компании Confluent где она публикует приложения полезные для конечного бизнеса использующего Apache Kafka в продуктиве.


подход к расширяемости за счет активируемых расширений также используется в экосистеме PostgreSQL тот который CREATE EXTENSION hypopg, так что подход реализованный компанией Pivotal/vmWare далеко не новый в нашем чудесном мире архитектуры программного обеспечения

Дополнительно на чудесном рынке облачных услуг в формате "Серьезная штука как сервис" есть еще один игрок это компания 84Codes https://github.com/84codes. Когда в рамках проектов внедрения нет нормальных инженеров по инфраструктуре именно 84Codes спасает пилотные проекты, потому как у них можно легко арендовать бесплатные/сильнодешевые контура CloudAMQP и CloudKarafka.


Я как бы обещал, что не буду ничего говорить про деньги, однако придется отразить 2 ключевых момента:


  • компания vmWare зарабатывает известно на чем, поэтому RabbitMQ ей развивается как часть своей платформы то есть они инвестируют в открытый проект не особо занимаясь его монетизацией. Возврат их инвестиций происходит в других местах, ну и также за счет контрибьторов на GitHub.
  • а вот компания Confuent собирается монетизировать свою платформу через Enterprise лицензию в которую включает те самые коннекторы Enterprise-Kafka-Connect, а также GUI для управления платформой.

Когда-то давно существовал https://github.com/jcustenborder/kafka-connect-rabbitmq, примечателен тот факт что товарищ Джереми его скрыл, оставив только свои наработки для Java разработчиков в виде Maven Archetype https://github.com/jcustenborder/kafka-connect-archtype еще раз обращаю Ваше внимание, что компания Confluent будет и дальше пытаться монетизировать свою деятельность, так что переводить всю интеграцию только на Kafka я бы на вашем месте поостерегся.

Поэтому когда вам топят за Kafka учитывайте, что вы либо изучаете Java, либо платите за Enterprise лицензию. А когда вам топят за RabbitMQ учитывайте, что либо вы изучаете системное администрирование (Erlang накладывает особенности системного администрирования), либо покупаете сервис у провайдеров типа 84Codes. Кодить на Erlang никогда не придется там это не нужно, если только вы не контрибьюторы OpenStack.


Поставил и забыл уже не работает


Приближаемся к дальнейшему пониманию. Данный раздел уже будет полезен инфраструктурщикам, хотя и разработчикам важно знать, что в эпоху когда семимильными шагами развивается имплементация ITILv4, для того чтобы перейти от текста и менеджерских хитростей про риски и деньги к реальности нам придется осознать 3 тезиса


  • использование только одного протокола интеграции приводит к появлению ProtocolLock и как следствие к VendorLock я же не зря выше написал, что за каждым открытым продуктом, стоит какой-то ключевой комплект вендоров как они себя поведут: мы не знаем.
  • в мире ИТ больше нет серьезных продуктов, которые бы представляли собой монолитную службу все приложения давно стали композитными.
  • все нормальные вендоры сокращают свои релизные циклы по ключевым продуктам нормальной практикой стало выпускать редакции раз в 3 месяца TDD, BDD, CICD, ScallableAgile и DevOps (DocOps, DevSecOps) эти инженерные практики и методики управления не просто так развиваются. Всем очень хочется сокращать себестоимость и TimeToMarket.

Абзац выше важен, как финальный аккорд, прежде чем мы перейдем к Docker-Compose. А именно к нему я вел чтобы и разработчики и инфраструктурщики понимали что такое гибридная инфраструктура в режиме мультипротокольности (с) нужно сделать так, чтобы каждый мог поэкспериментировать с предлагаемым контуром. Как я уже указал выше первично подобное применительно к Kafka+RabbitMQ было подсмотрено именно у коллег из 84Codes (хорошие ребята всем советую https://www.84codes.com/).


Чтобы вы смогли поэкспериментировать сами


Итак подходим к примерам, так как обоснования и вводных уже хватит. Предположим вы уже поняли, что вам также нужна мультипротокольность, однако мы же помним, что все рекламные материалы про Apache Kafka нам рассказывают что это единственное решение с реализацией exactly-ones доставки сообщений от отправителя получателю. Собственно на самом деле нам и нужен гибрид, чтобы сделать из связки ТочкаОбмена->Очередь журнал Kafka (это тот который Topic) чтобы возникла сущность под называнием Offsets у нашей очереди событий.


exactly-ones

проверка на внимательность читающего exactly-ones это шутка в формате "Хотя бы один раз из 1С", а имеется в виду концепт Exactly once строго однократная доставка сообщений получателю, без необходимости повторной отправки от отправителя.


Предлагаю попробовать. Концепт для проверки Вашими руками будет состоять из:


  • Zookeper
  • KafkaBroker
  • RabbitMQ
  • KafkaConnect

и трех приложений приложений


  • отправитель на Python по протоколу AMQP 0.9
  • получатель на С# по протоколу AMQP 1.0
  • получатель на C# по протоколу Kafka

Еще интересное замечание: когда вы смотрите на всякие обучающие видео по Apache Kafka авторы часто (но не всегда) старательно пишут примеры на Java, это они делают скорее всего для того, чтобы скрыть от вас особенности использования librdkafka C++ библиотеки на основе которой сделаны многие не-джава адаптеры,. Я же наоборот предлагаю вам начинать исследование интеграции с Kafka именно с неё, чтобы четко оценивать риски "куда вы ввязываетесь": очень примечательно что там работает фактически один разработчик, формально в одиночку https://github.com/edenhill/librdkafka/pulse/monthly, а допустим wmWare старается поддерживать свою линейку клиентов под своим брендом https://github.com/rabbitmq

ну и самое главное и тяжелое:


контур содержит открытый форк старого RabbitMQ-Kafka-Sinc-Connector того самого который товарищи из Confluent в своё время скрыли с Github.


Докер контура для экспериментов


Для показательного эксперимента мы сделаем 2 композитных приложения инфраструктурное-трансформационное и непосредственно бизнес-приложения.


Развертываем RabbitMQ и Kafka


контур инфраструктуры который нам понадобится запускается достаточно просто


docker-compose -f dockers/infra.yml up -d

Если вам интересно что же там внутри, нашего композитного приложения, то в конце статьи дается ссылка на полный комплект исходников, наиболее интересен в нем Kafka-UI и непосредственно RabbitMQ-Sinc, все остальное обычно и штатно для всех известных примеров по Kafka или RabbitMQ


    image: provectuslabs/kafka-ui:latest    ports:      - 8080:8080    depends_on:      - kafka-broker      - zookeeper    environment:      KAFKA_CLUSTERS_0_NAME: local      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181      KAFKA_CLUSTERS_0_JMXPORT: 9101

Но самое главное кроется в репозитории Java


    <parent>        <groupId>com.github.jcustenborder.kafka.connect</groupId>        <artifactId>kafka-connect-parent</artifactId>        <version>1.0.0</version>    </parent>

Если подробно изучить pom.xml то выяснится, что существует заглавный проект для всех конекторов к Кафка https://github.com/jcustenborder/kafka-connect-parent, в котором используется Java-Kafka-Adapter


И непосредственно синхронизацией c RMQ занимается штатный Java клиент https://www.rabbitmq.com/java-client.html


            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>${rabbitmq.version}</version>

Таким образом по правильному, чтобы получились повторить тот же эксперимент что и у меня, необходимо выполнить:


  • собрать из исходников java синхронизатор -1-build-connect-jar.bat
  • собрать контейнер с синхрозатором 00-build-connect-image.sh

и уже потом запустить полный инфраструктурный контур


  • стартуем полный инфраструктурный контур 01-start-infra.sh

обратите внимание так как Docker использует разное поведение при работе с PWD для Windows и Linux приходится делать дубликаты скриптов. В остальных случаях под обоими операционными системами используется интерпретатор sh

В итоге вы получите следующий комплект сервисов



На картинке можно увидеть как подключаются конфигурационные файлы к RabbitMQ и какая топология сетевых портов у нас будет участвовать в эксперименте:


Назначение портов:


  • 9092 будет использоваться для Kafka протокола
  • 8080 используется для отображения красивой картинки состояния Apache Kafka UI
  • 5672 будет использоваться для протокола AMQP 0.9 и он же будет работать и как AMQP 1.0
  • 15672 используется для красивой картинки управления RabbitMQ
  • 28082 отладочный порт для управления через curl трансформатором протоколов

В этот момент нужно остановится и прокомментировать особенность развертывания RabbitMQ в Docker:


  • хорошей практикой является версионирование включенных плагинов расширений enabled-rmq-plugins

[    rabbitmq_management,     rabbitmq_amqp1_0,     rabbitmq_mqtt,     rabbitmq_federation,     rabbitmq_federation_management,    rabbitmq_shovel,    rabbitmq_shovel_management,    rabbitmq_prometheus].

  • а также в крупных проектах когда нужно передать разработчику преднастроенную топологию точек обмена и очередей, можно и нужно добавлять это в виде конфигурационного файла rmq_definitions.json

     "bindings":[        {           "source":"orders-send",           "vhost":"/",           "destination":"orders-amqp-10-consumer",           "destination_type":"queue",           "routing_key":"",           "arguments":{

Запускаем наши приложения


Остается только запустить наши приложения эмулирующие подключения


docker-compose -f dockers/infra.yml restart protocol-connect-syncdocker-compose -f applications.yml builddocker-compose -f applications.yml up

Топология наших тестовых приложений достаточно простая



Исходный код также максимально упрощён:


  • отправляется как-будто бы заказ Васи с периодичностью в 2 секунды

        producer = conn.Producer(serializer='json')        producer.publish({'client': 'Вася', 'count': 10, 'good': 'АйФончик'},                      exchange=order_exchange,                      declare=[kafka_queue, amqp10_queue])        time.sleep(2)

RUN python -m pip install \    kombu \    librabbitmq

причем используется для этого максимально производительная библиотека на Си для AMQP 0.9 librabbitmq наследуется именно от неё https://github.com/alanxz/rabbitmq-c


  • создан подписчик который уже по протоколу AMQP 1.0 смотрит в свою очередь и получает события, соответственно очередь очищается и больше мы заказов Васи не получим. В этом потоке нам это и не нужно.

            Attach recvAttach = new Attach()            {                Source = new Source()                {                    Address = "orders-amqp-10-consumer",                    Durable = 1,                },

            ReceiverLink receiver =                 new ReceiverLink(session,"netcore_amqp_10_consumer", recvAttach, null);            Console.WriteLine("Receiver connected to broker.");            while (true) {                Message message = receiver.Receive();                if (message == null)                {                    Console.WriteLine("Client exiting.");                    break;                }                Console.WriteLine("Received "                   + System.Text.Encoding.UTF8.GetString((byte[])message.Body)

Причем в качестве драйвера выбран


  <ItemGroup>    <PackageReference Include="AMQPNetLite.Core" Version="2.4.1" />  </ItemGroup>

именно его https://github.com/Azure/amqpnetlite Microsoft использует для маркетинга своей реализации сервисной шины. Собственно именно AMQP 1.0 как протокол они и рекламируют https://docs.microsoft.com/ru-ru/azure/service-bus-messaging/service-bus-amqp-overview


Ну и финально


  • создан подписчик по протоколу Kafka который при каждом старте перечитывает с нуля журнал отправленных заказов Васи. Тот самый Exactly once.

                AutoOffsetReset = AutoOffsetReset.Earliest

                c.Subscribe("orders-from-amqp");

                    while (true)                    {                        try                        {                            var cr = c.Consume(cts.Token);

Выглядит наш контур в итоге следующим образом:


  • 5 инфраструктурных контейнеров


  • 3 контейнера с приложениями


  • готовый журнал транзакций заказов который можно посмотреть через Kafka-Ui


  • и готовый контур связей для RabbitMQ


А где же Java ?


Не волнуйтесь при таком гибридном подходе, без неё никуда, для того чтобы всё вышеуказанное заработало пришлось сделать форк и актуализировать версии Kafka-Connect-Base


[submodule "dockers/rabbitmq-kafka-sink"]    path = dockers/rabbitmq-kafka-sink    url = https://github.com/aliczin/kafka-connect-rabbitmq

Но самое интересное не это, самое интересное что в этом самом Kafka-Connect нет по сути никакой магии только код трансформации.


По сути нам предлагают:


  • создать наследника абстрактной задачи Источника

public class RabbitMQSourceTask extends SourceTask {

  • выполнить подписку на очередь сообщений

        this.channel.basicConsume(queue, this.consumer);        log.info("Setting channel.basicQos({}, {});", this.config.prefetchCount, this.config.prefetchGlobal);        this.channel.basicQos(this.config.prefetchCount, this.config.prefetchGlobal);

  • трасформировать полученные сообщения в абстрактные записи причем с буфером.

  @Override  public List<SourceRecord> poll() throws InterruptedException {    List<SourceRecord> batch = new ArrayList<>(4096);    while (!this.records.drain(batch)) {

Отдельно можно выделить чудесный трансформатор сообщений из AMQP 0.9 в Кафка. У несведующего в Java глаз может задергаться. У автора чувствуется многолетный опыт работы в J2EE.


  private static final Logger log = LoggerFactory.getLogger(MessageConverter.class);  static final String FIELD_ENVELOPE_DELIVERYTAG = "deliveryTag";  static final String FIELD_ENVELOPE_ISREDELIVER = "isRedeliver";  static final String FIELD_ENVELOPE_EXCHANGE = "exchange";  static final String FIELD_ENVELOPE_ROUTINGKEY = "routingKey";  static final Schema SCHEMA_ENVELOPE = SchemaBuilder.struct()      .name("com.github.jcustenborder.kafka.connect.rabbitmq.Envelope")      .doc("Encapsulates a group of parameters used for AMQP's Basic methods. See " +          "`Envelope <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html>`_")      .field(FIELD_ENVELOPE_DELIVERYTAG, SchemaBuilder.int64().doc("The delivery tag included in this parameter envelope. See `Envelope.getDeliveryTag() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getDeliveryTag-->`_").build())      .field(FIELD_ENVELOPE_ISREDELIVER, SchemaBuilder.bool().doc("The redelivery flag included in this parameter envelope. See `Envelope.isRedeliver() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#isRedeliver-->`_").build())      .field(FIELD_ENVELOPE_EXCHANGE, SchemaBuilder.string().optional().doc("The name of the exchange included in this parameter envelope. See `Envelope.getExchange() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getExchange-->`_"))      .field(FIELD_ENVELOPE_ROUTINGKEY, SchemaBuilder.string().optional().doc("The routing key included in this parameter envelope. See `Envelope.getRoutingKey() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getRoutingKey-->`_").build())      .build();

Но Не будем критиковать, мы же в самом начале договорились что наша главная задача добиться конечного результата удобным на сегодня способом. А итоги у нас получаются следующие.


Итоги


Все что здесь продемонстрировано естественно лежит на Github.


В репозитории https://github.com/aliczin/hybrid-eventing. Лицензия выставленна простая до невозможности Creative Commons Attribution 4.0 International.


Полезно использовать в обучающих целях для команд разработки и инфраструктуры и поиграться с DevOps и поиграться с мультипротокольными приложениями. Ничего особо экстравагантного в данном концепте конечно нет, ключевое тут я как я написал в самом начале мы делаем избыточное количество интеграционных протоколов, добавляя транформаторов между потоками интеграции.


Схема коммуникации в итоге для "разработчика интеграционных потоков" (с) выглядит следующим образом для источника и брокеров


orderEventsApp->Amqp09: send orderAmqp09->Amqp10: fanout\n copy eventAmqp09->KafkaQ: fanout\n copy eventKafkaQ->KafkaConnect: consume\n on messageKafkaConnect->KafkaConnect: transform\n messageKafkaConnect->Kafka: publish to topic


а для приемников все упрощается


Amqp10->orderEventSubApp: subcribe\n for eventorderJournalApp->Kafka: read kafka journal


Приемники берут нужные им данные только по нужному им протоколу

Ключевые посылы


Ключевые моменты которые я хотел расскрыть данной статьей


  • стройте эксперименты и продуктивы с Apache Kafka не со штатным Java клиентом, а librdkafka и базирующихся на ней адаптерах это позволит вам отладить сценарии разных версий протоколов KafkaAPI. Java вам пригодится в другом месте.


  • не ввязывайтесь с священные войны, что лучше RabbitMQ/Kafka/Nats/ActiveMQ просто развертывайте сервисы и публикуйте протоколы и пробуйте свои бизнес-сценарии.


  • начните уже внедрять продуктивный Docker, или хотя бы пилотные и разработческие контура.


  • реальный ИТ ландшафт почти всегда будет мультипротокольным



Примечание для понимающих


чтобы гибриды развивались дальше:


  • Mosquito очень удобен как встраиваемый брокер на уровне контролера SCADA для преобразования из ModBus/OPC-UA. Хотя как вы уже поняли из статьи интересны реализации "мостов из протокола в протокол" пример https://github.com/mainflux/mainflux


  • ActiveMQ удобен для Java разработчиков, потому что у них есть боязнь Erlang, но как мы выше уже сказали мост RabbitMQ AMQP 1.0 -> ActiveMQ легко организуется средствами RabbitMQ, кстати также как и JMS.


  • NATs интересен как часть OpenFaaS платформы, при внедрении "своего маленького" Amazon Lambda с преферансом. И опять же подход будет всё тот же мультипротокольные мосты с трансформацией: https://github.com/nats-io/nats-kafka если Вам не страшно посмотрите эксперименты с OpenFaaS веселых 1С-ников 2.5 часа примеров https://youtu.be/8sF-oGGVa9M



Надеюсь мой архитектурный подход Вам придется по душе и вы перестанете тратить деньги заказчика (инвестора/свои если вы стартапщик: Маша это замечание специально для тебя) на бессмысленные обсуждения что же выбрать в качестве брокера/платформы, и начнете наконец-то делать функциональность, которая будет использовать тот протокол, который удобен прямо сейчас. С возможностью переключения в случае "если чё"


Функциональность: Мультипротокольный адаптер    Как разработчик я хочу иметь абстракцию Produser/Consumer    С возможность изменения протокола интеграции    Чтобы под каждую задачу выбирать разные протоколы     и единый интерфейс вызова для обеспечения независимости от вендора предоставляющего транспортСценарий: vmWare реализует протокол Stream средствами RabbitMQ     Когда vmWare закончит свой плагин для потоков    Тогда я активирую новый протокол     И быстро воткну его в приложение    И так как у меня есть продуктивный кластер RabbitMQ    И мне нужно будет просто поменять канал для отдельных бизнес сценариевСценарий: Завтра придут 1С-ники со своим ActiveMQ из Шины для 1С    Когда мне нужно быстро включить очереди 1С в общий контур    И чтобы на Питоне использовать старые наработки с Kafka API    Тогда я добавляю трансформацию ActivemeMQ2Kafka    и живу по старому а события ходят уже и из 1Сetc

А чтобы вы не думали, что данный подход это нечто уникальное вот Вам еще интересная ссылка: https://github.com/fclairamb/ftpserver/pull/34 это когда нужен FTP сервер, а хочется S3.


Ну и в качестве финального момента обратите внимание: есть и риски данного подхода: но они я думаю Вам и так понятны.


  • Придется оркестрировать такой комплект сервисов и вручную это почти невозможно. Придется использовать DevOps штуки типа k8s, OpenShift, etc но если вы уже решились на интеграцию в режимах слабой связаности приложений в режиме онлайн, у вас что-то на эту тему уже скорее всего есть.
  • Трансформаторы между протоколами приходится дорабатывать ничего готового открытого и PRODUCTION-READY на данный момент найти почти невозможно.

Финальное примечение для любителей писать ТЗ по ГОСТу


так как Хабр читают любители цифровой трансформации (чтобы кто не понимал под этим словом) советую в техническое задание добавлять не упоминание конкретных реализации серверов, а что-то примерно следующее:


комплект программ для интеграции должен реализовывать коммуникацию конечных приложений по открытым протоколам HTTP, AMQP 0.9, AMQP 1.0, Apache Kafka не ниже версии 23, MQTT, WebSockets, <ЛюбойДругойХотьSOAPХотяЭтоЖуть> с возможность преобразования между протоколами дополнительными средствами администрирования

Надеюсь моя публикация после долгого перерыва Вам будет полезна в ваших интеграционных проектах. Предполагаю что будет вопрос про 1С и тут у меня совет только один. Используйте Google по ключевым словам 1С+RabbitMQ или 1С+Kafka или 1С+OpenFaas и RabbitMQ и Kafka "в 1С" давно и непринужденно используются. Потому что 1С это не только язык, но и несколько сообществ где уже давно сделаны все возможные адаптеры и платные и бесплатные. Собственно как и в Java/C#/Python/C++/Rust/etc.


Данная статья написана с применением расширения https://shd101wyy.github.io/markdown-preview-enhanced для Visual Studio Code за что автору летят дополнительные лучи добра.


Ну и в качестве финального момента хотел бы заметить, что выбор Cunfluent Inc в качестве платформы разработки Kafka-Connect экосистемы JDK выглядит все таки странно. Не удивлюсь если их конкуренты сделают такое же, но на GoLang, NodeJS (что-нибудь типа Kafka-Beats-Hub)



Красивые картинки в формате GraphViz я делаю при помощи хитрого проекта Docker2GraphViz помогает поддерживать актуальный контур и техническую документацию в формате Markdown


set CURPATH=%~dp0set DOCKER_DIR=%CURPATH%\dockersdocker run --rm -it --name dcv -v %DOCKER_DIR%\:/input pmsipilot/docker-compose-viz render -m image --force --output-file=infra-topology.png infra.ymldocker run --rm -it --name dcv -v %CURPATH%\:/input pmsipilot/docker-compose-viz render -m image --force --output-file=apps-topology.png applications.ymlcopy /b/v/y dockers\infra-topology.png content\assets\infra-topology.pngcopy /b/v/y apps-topology.png content\assets\apps-topology.png
Подробнее..

First touch of Kafka

23.02.2021 12:21:01 | Автор: admin

Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике

И так приступим!

Единственный брокер Kafka и необходимый для его работы ZooKeeper я буду запускать в Docker

Сперва создам отдельную сеть kafkanet

docker network create kafkanet

Запуск контейнера с ZooKeeper

docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

Запуск контейнера с Kafka

docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka

Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafka

Далее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потребление

Для этого сценария подключусь к контейнеру kafka

docker exec -it kafka bash

Создам топик demo-topic

/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092

Выведу список всех топиков

/bin/kafka-topics --list --zookeeper zookeeper:2181

И выведу описание созданного топика

/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092

Сгенерирую несколько сообщений

/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092

И после прочитаю эти сообщения

/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092

Далее я создам два небольших .NET приложения: KafkaProducer, которое будет генерировать сообщения, и KafkaConsumer, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting

В проект KafkaProducer добавлю класс KafkaProducerService

using Confluent.Kafka;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using System.Threading;using System.Threading.Tasks;namespace KafkaProducer{    public class KafkaProducerService : IHostedService    {        private readonly ILogger<KafkaProducerService> _logger;        private readonly IProducer<Null, string> _producer;        public KafkaProducerService(ILogger<KafkaProducerService> logger)        {            _logger = logger;            var config = new ProducerConfig            {                BootstrapServers = "localhost:9092"            };            _producer = new ProducerBuilder<Null, string>(config).Build();        }        public async Task StartAsync(CancellationToken cancellationToken)        {            for (var i = 0; i < 5; i++)            {                var value = $"Event N {i}";                _logger.LogInformation($"Sending >> {value}");                await _producer.ProduceAsync(                    "demo-topic",                    new Message<Null, string> { Value = value },                    cancellationToken);            }        }        public Task StopAsync(CancellationToken cancellationToken)        {            _producer?.Dispose();            _logger.LogInformation($"{nameof(KafkaProducerService)} stopped");            return Task.CompletedTask;        }    }}

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Hosting;using System;namespace KafkaProducer{    class Program    {        static void Main(string[] args)        {            CreateHostBuilder(args).Build().Run();            Console.ReadKey();        }        private static IHostBuilder CreateHostBuilder(string[] args) =>            Host                .CreateDefaultBuilder(args)                .ConfigureServices((context, collection) =>                    collection.AddHostedService<KafkaProducerService>());    }}

В проект KafkaConsumer добавлю класс KafkaConsumerService

using Confluent.Kafka;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using System.Threading;using System.Threading.Tasks;namespace KafkaConsumer{    public class KafkaConsumerService : IHostedService    {        private readonly ILogger<KafkaConsumerService> _logger;        private readonly IConsumer<Ignore, string> _consumer;        public KafkaConsumerService(ILogger<KafkaConsumerService> logger)        {            _logger = logger;            var config = new ConsumerConfig            {                BootstrapServers = "localhost:9092",                GroupId = "demo-group",                AutoOffsetReset = AutoOffsetReset.Earliest            };            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();        }        public Task StartAsync(CancellationToken cancellationToken)        {            _consumer.Subscribe("demo-topic");            while (!cancellationToken.IsCancellationRequested)            {                var consumeResult = _consumer.Consume(cancellationToken);                _logger.LogInformation($"Received >> {consumeResult.Message.Value}");            }            return Task.CompletedTask;        }        public Task StopAsync(CancellationToken cancellationToken)        {            _consumer?.Dispose();            _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");            return Task.CompletedTask;        }    }}

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Hosting;using System;namespace KafkaConsumer{    class Program    {        static void Main(string[] args)        {            CreateHostBuilder(args).Build().Run();            Console.ReadKey();        }        private static IHostBuilder CreateHostBuilder(string[] args) =>            Host                .CreateDefaultBuilder(args)                .ConfigureServices((context, collection) =>                    collection.AddHostedService<KafkaConsumerService>());    }}

Результат работы приложений (ссылка на репозиторий)

Подробнее..
Категории: Net , Kafka

Как мы Schema Registry для Kafka настраивали, и что могло пойти не так

16.03.2021 14:17:52 | Автор: admin

Всем привет.

В статье я опишу, как мы настраивали реестр схем данных для того, чтобы использовать его для сериализации и десериализации сообщений Kafka.

Спойлер: на данный момент реестр схем данных настроен и используется в боевой системе, каких-то проблем, связанных с SR, замечено не было.

Исходные данные (небольшое оправдание)

Разработчикам не всегда выпадает возможность выбирать от и до инструменты, которые следует использовать при реализации той или иной задачи. Например, если вы приходите в команду, где уже начали реализовывать что-то на чем-то, то полностью изменить направление может быть неподъемной задачей. Таким образом, примем за данность, что используется Kafka для асинхронного взаимодействия сервисов - передачи сообщений по событиям.

Что? Почему? Зачем?

А вот то, что мы выбрали осознанно, так это использование Apache Avro как систему сериализации данных.Почему мы выбрали Apache Avro?

Если кратко, то это:

  • компактность представления бинарного формата данных (а также есть возможность кодировки в JSON);

  • поддержка логических типов данных: BigDecimal, дата, дата/время в нативном виде;

  • версионирование моделей;

Этих пунктов хватило, чтобы выбрать Avro как схему данных. А версию взяли 1.8.2, которая вышла в мае 2017 года.

Последняя релизная версия Avro - 1.10.1 от декабря 2020-го. В ней решены некоторые проблемы, которые нам пришлось чинить костылями, но взять ее мы не могли по определенным причинам (так как в одном из сервисов для сериализации ответа внешней системе используется схема версии 1.8.2 и поднятие версии может привести к нарушению этого взаимодействия)

До версии 1.9 в Avro используется библиотека Joda Time для обработки логических типов, связанных с датой и временем. А начиная с версии 1.9 для этого используются нативные Java 8 библиотеки.

Перед непосредственно разработкой функциональности мы все-таки попробовали новую версию Avro, но испытали проблемы и конверсиями дат (например, сериализатор не смог корректно обработать LocalDateTime). Точных примеров я не вспомню, но нам нужно было очень аккуратно передавать время в разных временных зонах, а это не самая простая часть для конвертации.

Несомненно, можно найти еще преимущества и недостатки этой схемы данных, но мы остановились на ней.

Итак, с системой сериализации определились.

Далее речь пойдет о реестре схем данных.

Реестр схем данных, или The Confluent Schema Registry for Kafka. Благодаря SR можно обеспечить совместимость схем данных между продюсером и консьюмером Kafka. И, чего нам очень хотелось, SR поможет не потерять сообщения из-за ошибок сериализации и десериализации во время эволюции схемы данных.

В процессе настройки и при тестовых прогонах мы выявили некоторые, скажем так, особенности использования SR, о которых я хочу рассказать вам и попытаюсь собрать все неувязки в одном месте.

Настройка деплоя моделей

Прежде всего, модели нужно описать в JSON-формате. Мы создали репозиторий и описали там нужные нам модели. Описывать модели в JSON просто и понятно, например:

{ "type": "record", "name": "SomeMessage", "namespace": "com.trthhrts", "fields": [   {     "name": "title",     "type": "string"   }  ]}

Если хотим nullable-поле, то добавляем в тип возможность null:

{ "type": "record", "name": "AnotherMessage", "namespace": "com.trthhrts", "fields": [    {     "name": "messageDate",     "type": [       "null",       {         "type": "long",         "logicalType": "timestamp-millis"       }     ],     "default": null    }  ]}

Модели описали, теперь воспользуемся maven-плагином для взаимодействия с SR:

<plugin>   <groupId>io.confluent</groupId>   <artifactId>kafka-schema-registry-maven-plugin</artifactId>   <version>6.0.1</version>   <configuration>     <schemaRegistryUrls>         <param>${SCHEMA_REGISTRY_URL}</param>     </schemaRegistryUrls>     <outputDirectory>${project.build}/avro</outputDirectory>     <userInfoConfig />     <subjects>       <com.trthhrts.SomeMessage>src/SomeMessage.avsc</com.trthhrts.SomeMessage>     </subjects>     <schemaTypes>       <com.trthhrts.SomeMessage>AVRO</com.trthhrts.SomeMessage>     </schemaTypes>   </configuration></plugin>

Далее смотрим здесь описание возможностей плагина, и как мы можем управлять реестром схем:

  • проверить (validate) на корректность локальную схему;

  • проверить изменение схемы на совместимость (test-compatibility);

  • скачать схему с SR (download);

  • зарегистрировать схему (register).

Мы настроили в Gitlab CI stage с деплоем схем так (ручной запуск и только на master-ветке):

Примечание: ручной запуск, потому что в репозитории находятся не только модели Avro, но, например, тесты моделей, вспомогательные библиотеки для работы с Kafka. Смысла регистрировать новые версии схем, если в них не было изменений - нет.

register_schemas: stage: deploy script:   - mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B   - mvn schema-registry:register $MAVEN_CLI_OPTS -B when: manual only:   - master
Возможно, стоит добавить фазу validate в самое начало скрипта:
register_schemas: stage: deploy before_script:   - *scm_connect script:   - mvn schema-registry:validate $MAVEN_CLI_OPTS -B   - mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B   - mvn schema-registry:register $MAVEN_CLI_OPTS -B when: manual only:   - master

Итак, после деплоя наши схемы успешно хранятся в Kafka. Стоп, скажете вы, в смысле в Kafka? А как же SR?! Ну, SR живет отдельно от Kafka брокеров. Это дополнительный компонент, который может быть настроен на любом Kafka кластере и который использует Kafka для хранения в том числе и схем. И, SR предоставляет пользователю REST API для взаимодействия со схемами данных.

В специальном топике Kafka (<kafkastore.topic>, по умолчанию _schemas), который имеет 1 партицию и используется как WAL. И все данные, которыми оперирует SR, добавляются как сообщения в этот лог.

Упрощенно взаимодействие с SR выглядит так:

Настройка использования Schema Registry

Чтобы использовать SR, достаточно добавить свойство с URL подключения к SR и назначить в качестве сериализатора и десериализатора соответствующие классы: KafkaAvroSerializer и KafkaAvroDeserializer

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

И для консьюмера, например, так:

    @Bean    public ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaConfigs(), new StringDeserializer(), deserializer()));        factory.setErrorHandler(errorHandler());        return factory;    }    private KafkaAvroDeserializer deserializer() {        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();        deserializer.configure(kafkaConfigs(), false);        return deserializer;    }

НО...

Есть несколько нюансов, которые следует учесть перед тестом и тем более перед деплоем на бой.

Стратегия наименования сабжектов (subjects)

Если не кратко, то: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#sr-schemas-subject-name-strategy

А если кратко, то существует 3 стратегии:

  • TopicNameStrategy - название сабжекта основывается на названии топика (стратегия по умолчанию!);

  • RecordNameStrategy - название сабжекта основывается на названии модели данных (как способ логически объединить в группу похожие события, которые, однако, могут иметь различную структуру данных, но передавать в рамках одного топика);

  • TopicRecordNameStrategy - название сабжекта основывается и на названии топика, и на названии модели данных (опять же как способ логической группировки событий с различной структурой данных);

Для себя мы выбрали стратегию наименования по названию модели данных:

props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);

Выбрали такой вариант потому что:

  • топики для тестовых стендов и боевого названы по разному, а значит, выбери мы другую стратегию, в реестре схем данных копилось бы много схем для каждого топика, это привело бы к дополнительным сложностям при анализе работы и вообще поиску багов;

  • возможность на разных топиках протестировать именно те схемы, которые будут использоваться в боевом окружении.

Имейте это ввиду, если тоже зададитесь вопросом "А какую стратегию выбрать?".

Аналогично, можно задать стратегию наименования схемы для ключа (key) сообщения, но мы не используем ключ для сообщений (гарантируя очередность сообщений тем, что у нас задана только 1 партиция на брокерах), поэтому и задавать стратегию для ключа смысла нет.

auto.register.schemas

Свойство, которое отвечает за то, чтобы клиентские приложения автоматически регистрировали новые схемы в SR. Для dev-окружения это может быть полезно, но для боевого сервера все-таки лучше отключить. И да, по умолчанию это свойство активировано, имейте ввиду.

Ну а мы продюсерам это свойство отключили на первых этапах внедрения SR (еще в тестовом режиме):

props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);

use.latest.version

Применимо, только если предыдущее свойство (auto.register.schemas) выставлено в false. Если use.latest.version установлено в true, это говорит SR брать последнюю версию в сабжекте для сериализации, а не ту, которая идет вместе с данными клиента. По умолчанию false. Мы обнаружили, что сериализатор той версии, которую мы используем - 5.3.0, не задействует это свойство.

Тип совместимости схем

В SR доступно несколько типов совместимости схем. Подробно о них написано, например, в документации Confluent.

По умолчанию используется тип BACKWARD, который разрешает удалять поля из модели данных и добавлять optional-поля (те, которые могут быть null). Сравнение схемы идет только с последней версией, а при обновлении модели в первую очередь нужно обновлять консьюмеры, чтобы не получить ошибку десериализации (например, удалили поле, обновили продюсер, который послал сообщение без этого поля, а консьюмер будет его ожидать и свалится с ошибкой десериализации).

Нас устроил тип по умолчанию, не меняли.

specific.avro.reader

Мы захотели использовать типы BigDecimal и Timestamp в моделях. Чтобы такие типы корректно сериализовать и десериализовать, следует добавить это свойство:

put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

А так же добавить в класс SpecificData при запуске приложения нужные конверторы, например, так:

static {  SpecificData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());  SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());}

После этого, если ваши модели данных наследуются от SpecificRecordBase (что по умолчанию при типе записи record) - данные будут корректно сериализовываться и десериализовываться.

Вот пример описания модели с такими логическими типами:

{  "type": "record",  "name": "SomePaymentInfoModel",  "namespace": "com.trthhrts",  "fields": [    {      "name": "paymentDate",      "type": [        "null",        {          "type": "long",          "logicalType": "timestamp-millis"        }      ],      "default": null    },    {      "name": "amount",      "type": [        "null",        {          "type": "bytes",          "logicalType": "decimal",          "precision": 9,          "scale": 2        }      ],      "default": null    }  ]}

Кэширование

В KafkaAvroSerializer и KafkaAvroDeserializer используется локальное кэширование, а значит клиенты не будут каждый раз запрашивать схему у SR, только в случае отсутствия нужной схемы нужной версии в локальном кэше.

Для нас это означало, что сервисы не попадут в постоянную зависимость на доступность SR и отказ SR далеко не всегда будет означать невозможность сериализации и десериализации сообщений Kafka - по крайней мере, пока жив локальный кэш в приложении.

И все?

Нет.

В процессе тестирования и разработки мы столкнулись с не совсем очевидными вещами, о которых я расскажу, возможно, это сэкономит вам времени.

Joda Time

Avro 1.8.2 использует Joda Time библиотеку для работы с полями дат и времени. А мы пишем на Java 8 и используем нативные классы для работы с датами.

В процессе разработки был составлен класс утилитных методов для конвертаций дат. Мы нашли это наименьшей головной болью при передаче дат с часовым поясом с помощью Avro по Kafka.

Вот так мы, например, конвертируем даты перед отправкой:

convertDate(model.getBeginDate()).ifPresent(builder::setBeginDate)convertLocalDateToJodaDateTime(model.getPaymentDate()).ifPresent(builder::setPaymentDate);

А вот так, например, принимаем:

convertToLocalDateTime(message.getLastUpdate()).ifPresent(model::setLastUpdate);
Привожу класс целиком, собрали методы из разных уголков вселенной
/** * Класс с утилитными методам для конвертации типов */public final class MessageUtils {    /** Кол-во наносекунд в миллисекунде */    private static final int NANO_SECONDS_IN_MILLISECOND = 1_000_000;    /** Кол-во миллисекунд в секунде */    private static final int MILLISECONDS_IN_SECONDS = 1000;    /**     * Запрещаем создание     */    private MessageUtils() {    }    /**     * Конвертирует Joda {@link DateTime} в Java 8 {@link ZonedDateTime}     * @param dateTime дата в формате Joda {@link DateTime}     * @return {@link ZonedDateTime}     */    public static Optional<ZonedDateTime> convertToZonedDateTime(DateTime dateTime) {        return Optional.ofNullable(dateTime).map(dt -> {            DateTime localDateTime = dateTime.withZone(DateTimeZone.getDefault());            return ZonedDateTime.ofLocal(                    LocalDateTime.of(                            localDateTime.getYear(),                            localDateTime.getMonthOfYear(),                            localDateTime.getDayOfMonth(),                            localDateTime.getHourOfDay(),                            localDateTime.getMinuteOfHour(),                            localDateTime.getSecondOfMinute(),                            localDateTime.getMillisOfSecond() * NANO_SECONDS_IN_MILLISECOND),                    ZoneId.of(localDateTime.getZone().getID(), ZoneId.SHORT_IDS),                    ZoneOffset.ofTotalSeconds(localDateTime.getZone().getOffset(localDateTime) / MILLISECONDS_IN_SECONDS));        });    }    /**     * Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDateTime}     * @param dateTime дата в формате Joda {@link DateTime}     * @return {@link LocalDateTime}     */    public static Optional<LocalDateTime> convertToLocalDateTime(DateTime dateTime) {        return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDateTime);    }    /**     * Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDate}     * @param dateTime дата в формате Joda {@link DateTime}     * @return {@link LocalDate}     */    public static Optional<LocalDate> convertToLocalDate(DateTime dateTime) {        return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDate);    }    /**     * Конвертирует {@link LocalDate} в {@link org.joda.time.LocalDate}     * @param ld {@link LocalDate}     * @return {@link org.joda.time.LocalDate}     */    public static Optional<org.joda.time.LocalDate> convertLocalDateToJodaLocalDate(LocalDate ld) {        return Optional.ofNullable(ld).map(date -> new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()));    }    /**     * Конвертирует {@link org.joda.time.LocalDate} в {@link LocalDate}     * @param ld {@link org.joda.time.LocalDate}     * @return {@link LocalDate}     */    public static Optional<LocalDate> convertJodaLocalDateToLocalDate(org.joda.time.LocalDate ld) {        return Optional.ofNullable(ld).map(date -> LocalDate.of(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth()));    }    /**     * Конвертирует {@link LocalDateTime} в Joda {@link DateTime}     * @param ldt дата в формате {@link LocalDateTime}     * @return дата в формате Joda {@link DateTime}     */    public static Optional<DateTime> convertLocalDateTimeToJodaDateTime(LocalDateTime ldt) {        return Optional.ofNullable(ldt)                .map(date -> new org.joda.time.LocalDateTime(date.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()).toDateTime());    }    /**     * Конвертирует {@link LocalDate} в Joda {@link DateTime}     * @param ld дата в формате {@link LocalDate}     * @return дата в формате Joda {@link DateTime}     */    public static Optional<DateTime> convertLocalDateToJodaDateTime(LocalDate ld) {        return Optional.ofNullable(ld)                .map(date ->                        new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()).toDateTimeAtStartOfDay());    }}

"avro.java.string": "String"

Сразу кину ссылку на issue: https://issues.apache.org/jira/browse/AVRO-2702

Если грубо, то в самой схеме, допустим, нет такой строчки. Но maven-плагин пишет в схему внутри модели именно так:

{  "type": "record",  "name": "FromMaven",  "namespace": "com.trthhrts",  "fields": [    {      "name": "message",      "type": {        "type": "string",        "avro.java.string": "String"      }    }  ]}

От этого стратегия наименования значений сабжектов по названию модели - RecordNameStrategy - отрабатывает некорректно, и мы получаем ошибку при десериализации, как будто схема не найдена.

Workaround: добавить "avro.java.string": "String" для каждого string-поля в моделях (тогда будет совпадение с тем, как схему опишет maven-плагин).

Решение совсем неэлегантное, если есть варианты изящнее - с удовольствием обсудим в комментариях.

Выводы

В итоге, мы внедрили Schema Registry, и успешно его используем. Инструмент очень полезный, но есть определенные подводные грабли, которые мы успешно протестировали и отложили в сторону еще на этапе разработки.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru