Русский
Русский
English
Статистика
Реклама

Kafka apache

Перевод Производительпотребитель на Kafka и Kotlin

13.07.2020 20:06:08 | Автор: admin

Перевод статьи подготовлен в преддверии старта курса Backend-разработка на Kotlin




В этой статье мы поговорим о том, как создать простое приложение на Spring Boot с Kafka и Kotlin.


Введение


Начните с посещения https://start.spring.io и добавьте следующие зависимости:


Groovy


implementation("org.springframework.boot:spring-boot-starter-data-rest")implementation("org.springframework.boot:spring-boot-starter-web")implementation("com.fasterxml.jackson.module:jackson-module-kotlin")implementation("org.apache.kafka:kafka-streams")implementation("org.jetbrains.kotlin:kotlin-reflect")implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")implementation("org.springframework.kafka:spring-kafka")

В нашем примере для сборки мы воспользуемся Gradle. Вы вполне можете выбрать Maven.


Создайте и загрузите проект. Затем импортируйте его в IntelliJ IDEA.


Скачайте Apache Kafka


Загрузите последнюю версию Apache Kafka с их сайта и распакуйте в папку. Я пользуюсь операционной системой Windows 10. При запуске Kafka вы можете столкнуться с некоторыми проблемами по типу too many lines encountered. Так происходит потому что Kafka добавляет большую структуру папок в свое имя пути. Если эта проблема не будет устранена автоматически, вам придется переименовать структуру папок как-нибудь покороче и запустить приложение из Power Shell.


Чтобы запустить Kafka, воспользуйтесь следующими командами:


Shell


.\zookeeper-server-start.bat ..\..\config\zookeeper.properties.\kafka-server-start.bat ..\..\config\server.properties

Эти две команды вы увидите в папке /bin/windows.


Чтобы запустить Kafka, сначала нужно запустить Zookeeper. Zookeeper это продукт Apache, который предоставляет сервис распределенной конфигурации.


Запуск Spring Boot


Первым шагом создайте в своей IDE класс, который называется KafkaDemoApplication.kt. При создании проекта с сайта Spring, класс будет создан автоматически.


Добавьте следующий код:


Kotlin


import org.springframework.boot.autoconfigure.SpringBootApplicationimport org.springframework.boot.runApplication@SpringBootApplicationclass KafkaDemoApplication fun main(args: Array<String>) {   runApplication<KafkaDemoApplication>(*args)}

Производитель


Мы можем отправлять сообщения в топики двумя способами. Их мы рассмотрим ниже.


Мы разработаем класс-контроллер, который нужен для отправки и получения сообщений. Назовем этот класс KafkaController.kt. И добавим следующий метод:


Kotlin


var kafkaTemplate:KafkaTemplate<String, String>? = null;val topic:String = "test_topic"@GetMapping("/send")fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> {    var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!!    var sendResult: SendResult<String, String> = lf.get()    return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")}

Для отправки сообщений в топик, который называется test_topic, мы используем KafkaTemplate. Он будет возвращать объект ListenableFuture, из которого мы можем получить результат этого действия. Такой подход является самым простым, если вы просто хотите отправлять сообщение в топик.


Второй способ


Следующий способ отправки сообщения в топик Kafka это использование объекта KafkaProducer. Для этого мы напишем следующий код:


Kotlin


@GetMapping("/produce")fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> {    var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)    val map = mutableMapOf<String, String>()    map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"    map["bootstrap.servers"] = "localhost:9092"    var producer = KafkaProducer<String, String>(map as Map<String, Any>?)    var future:Future<RecordMetadata> = producer?.send(producerRecord)!!    return ResponseEntity.ok(" message sent to " + future.get().topic());}

И тут нужно сделать небольшое пояснение.


Нам нужно инициализировать объект KafkaProduce с Map, которая будет содержать ключ и значение для сериализации. В нашем примере речь идет о строковом сообщении, поэтому нам нужен только StringSerializer.


В принципе, Serializer это интерфейс Kafka, который преобразует строки в байты. В Apache Kafka есть и другие сериализаторы, такие как ByteArraySerializer, ByteSerializer, FloatSerializer и др.


Для map мы указываем ключ и значение с StringSerializer.


Kotlin


map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

Следующее значение это сведения о bootstrap-сервере, необходимые для коммуникации с кластером Kafka.


Kotlin


map["bootstrap.servers"] = "localhost:9092"

Все эти атрибуты нужны, если мы используем KafkaProducer.


Затем нам нужно создать ProducerRecord с именем топика и самим сообщением. Именно это мы и сделаем в следующей строке:


Kotlin


var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)

Теперь мы можем отправить наше сообщение в топик с помощью следующего кода:


Kotlin


var future:Future<RecordMetadata> = producer?.send(producerRecord)!!

Эта операция вернет future с именем топика, который используется для отправки сообщения.


Потребитель


Мы посмотрели, как отправлять сообщения в топики. Но нам также нужно слушать входящие сообщения. Чтобы это сделать, нужно создать слушателя, который будет потреблять сообщения.
Давайте создадим класс MessageConsumer.kt и пометим его с помощью Service.


Kotlin


@KafkaListener(topics= ["test_topic"], groupId = "test_id")fun consume(message:String) :Unit {    println(" message received from topic : $message");}

Этот метод можно использовать для прослушивания сообщения с помощью аннотации @KafkaListener и вывода сообщения в консоль, как только оно появляется в топике. Только убедитесь, что вы используете то же имя топика, что и для отправки сообщения.
Исходный код вы можете посмотреть в моем репозитории на GitHub.




Узнать подробнее о курсе Backend-разработка на Kotlin



Подробнее..

Kafka Streams непростая жизнь в production

14.01.2021 16:09:15 | Автор: admin

Привет, Хабр! Вокруг меня сформировался позитивный информационный фон на тему обработки событий через Kafka Streams. Этот инструмент привлекает множеством видео-докладов и статей на Хабре, подробной документацией, понятным API и красивой архитектурой. Некоторые мои знакомые и коллеги разрабатывают с его помощью свои системы. Но что происходит с в реальной жизни, когда эти системы уходят в production?

В этой статье я опущу введение в Kafka Streams, предполагая, что читатель уже знаком с ней, и расскажу о нашем опыте жизни с этой библиотекой на примере достаточно нагруженной системы.

Коротко о проекте

Внутренней командой вместе с партнерами мы работаем над биржой Ad Exchange, которая помогает перепродавать рекламный трафик. Специфику подобных инструментов мы уже когда-то описывали в статье на Хабре. По мере роста числа партнеров среди SSP и DSP, нагрузка на сервера биржи растет. А для повышения ценности самой биржи мы должны собирать с этого трафика развернутую аналитику. Тут-то мы и попытались применить Kafka Streams.

Внедрить новый инструмент у себя на продакшене - это всегда некоторый риск. Мы его осознавали, но смирились с этим, поскольку в целом Kafka Streams концептуально должен был хорошо подойти для расчета агрегатов. И хотя первое впечатление было отличным, далее я расскажу о проблемах, к которым это привело.

Статья не претендует на объективность и описывает только лишь наш опыт и проблемы, с которыми мы столкнулись, работая с этой технологией. Надеюсь, кому-то она поможет избежать ошибок при использовании Kafka Streams. А может быть даст повод посмотреть по сторонам.

Агрегаты

В нашу систему приходят десятки и сотни тысяч сообщений в секунду. Первое, что нам необходимо было сделать, - агрегации по различным ключам. Например, идёт поток событий "Показ рекламы". Для него надо считать на лету общую стоимость и количество, группируя по различным ключам: стране, партнёру, рекламной площадке и т.д.

Вроде бы стандартный кейс для Kafka Streams: используем функции groupBy и aggregate и получаем нужный результат. Всё работает именно так, причем с отличной скоростью за счёт внутреннего кэша: несколько последовательных изменений по одному и тому же ключу сначала выполняются в кэше и только в определённые моменты отправляются в changelog-топик. Далее Kafka в фоне удаляет устаревшие дублирующиеся ключи через механизм log compaction. Что здесь может пойти не так?

Репартиционирование

Если ваш ключ группировки отличается от ключа, под которым изначально пришло событие, то Kafka Streams создаёт специальный repartition-топик, отправляет в него событие уже под новым ключом, а потом считывает его оттуда и только после этого проводит агрегацию и отправку в changelog-топик. В нашем примере вполне может быть, что событие "Показ рекламы" пришло с ключом в виде UUID. Почему нет? Если вам надо сделать группировку, например по трём другим ключам, то это будет три разных repartition-топика. На каждый топик будет одно дополнительное чтение и одна дополнительная запись в Kafka. Чувствуете, к чему я веду?

Предположим, на входе у вас 100 тысяч показов рекламы в секунду. В нашем примере вы создадите дополнительную нагрузку на брокер сообщений в размере +600 тысяч сообщений в секунду (300 на запись и 300 на чтение). И ведь не только на брокер. Для таких объёмов надо добавлять дополнительные сервера с сервисами Kafka Streams. Можете посчитать, во сколько тысяч долларов обойдется такое решение с учётом цен на железо.

Для читателей, не очень хорошо знакомых с механизмом репартиционирования, я поясню один момент. Это не баг или недоработка Kafka Streams. С учётом её идеологии и архитектуры это единственное возможное поведение - его нельзя просто взять и отключить. Когда у сообщения меняется ключ, этот новый ключ надо равномерно "размазать" по кластеру так, чтобы каждый инстанс Kafka Streams имел собственный набор ключей. Для этого и служат дополнительные запись/чтение в repartition-топик. При этом если инстанс А записал в топик сообщение, то не факт, что он же его и прочитает. Это может сделать инстанс Б, В и т.д. в зависимости от того, кто какую партицию читает. В итоге каждый ключ группировки у вас будет более-менее равномерно распределён по серверам кластера (если вы его хэшируете, конечно).

Запросы к агрегатам

Данные пишут для того, чтобы с ними потом можно было работать. Например делать к ним запросы. И здесь наши возможности оказались серьезно ограничены. Исчерпывающий список того, как мы можем запрашивать данные у Kafka Streams, есть здесь или здесь для оконных агрегатов. Если используется стандартная реализация state-store на основе RocksDB (а скорее всего это так), фактически данные можно получать только по ключам.

Предположу, что первое, что вам хочется сделать с полученными данными, - это фильтрация, сортировка и пагинация. Но здесь таких возможностей нет. Максимум, что вы можете, - это считать всё, что есть, используя метод all(), а потом вручную делать нужные операции. Вам повезет, если ключей не слишком много и данные уместятся в оперативной памяти. Нам не повезло. Пришлось писать дополнительный модуль, который по ночам выгружал данные из RocksDB и отправлял в Postgres.

Ещё надо помнить, что на каждом отдельно взятом сервисе находится только часть ключей. Что если вам необходимо отдавать ваши агрегаты, скажем, по HTTP? Вот кто-то сделал запрос к одному из сервисов Kafka Streams: мол, дай мне такие-то данные. Сервис смотрит у себя локально - данных нет. Или есть, но какая-то часть. Другая часть может быть на другом сервере. Или на всех. Что делать? Документация Kafka Streams говорит, что это наши проблемы.

Стабильность Kafka Streams

У нас бывают проблемы с хостинг провайдером. Иногда выходит из строя оборудование, иногда человеческий фактор, иногда и то, и другое. Если по какой-то причине теряется связь с Kafka, то Kafka Streams переводит все свои потоки в состояние DEAD и ждёт, когда мы проснёмся и сделаем ей рестарт. При этом рядом стоят соседние сервисы, которые работают с той же Kafka через Spring и @KafkaListener. Они восстанавливаются сами, как ни в чём не бывало.

Kafka Streams может умереть и по другой причине: не пойманные исключения в вашем коде. Предположим, вы делаете какой-то внешний вызов внутри потоков Kafka Streams. Пусть это будет обращение к базе данных. Когда база данных падает, у вас два варианта. Первый - вы ловите эту ошибку и продолжаете работать дальше. Но если база будет лежать долго, а работа с ней - критична, вы будете считывать большое количество сообщений из топиков и некорректно их обрабатывать. Второй вариант кажется лучше - совсем не ловить исключение и дать Kafka Streams умереть. Дальше как обычно: ночной подъём, рестарт всех серверов с Kafka Streams. "Из коробки" у вас нет варианта подождать, пока база восстановится, и продолжить работу.

Нам пришлось дописать в каждый сервис Kafka Streams дополнительный модуль, который работает как watchdog: поднимает Kafka Streams, если видит, что она умерла.

Кстати, если вы работаете с Kafka Streams через Spring, то не забудьте переопределить стандартный StreamsBuilderFactoryBean, указав в нём свой CleanupConfig. Иначе будете неприятно удивлены тем, что при каждом рестарте будет удаляться вся локальная база RocksDB. Напомню, что это приведёт к тому, что при каждом рестарте все сервера начнут активно считывать данные из changelog-топика. Поверьте, вам это не нужно.

KStream-KStream Join

Здесь можно было бы обойтись одной фразой: никогда это не используйте. Джоин двух потоков создаёт десятки топиков в Kafka и огромную нагрузку на всю систему. Просто не делайте этого. Ну или хотя бы проверьте все под нагрузкой, прежде чем ставить в production.

Вообще Kafka Streams любит создавать топики под различные свои нужды. Если вы не изучили под лупой документацию и не протестировали, как это работает, то это может стать для вас и ваших DevOps неприятным сюрпризом. Чем больше топиков, тем сложнее их администрировать.

Масштабируемость

Любой человек, знакомый с Kafka, скажет, что масштабируемость - одна из главных её преимуществ: "В чём проблема? Добавляем партиций и радуемся". Всё так. Но не для Kafka Streams.

Если вы используете джоины, то все ваши топики должны быть ко-партиционированы (co-partitioning), что, в числе прочего, означает, что у них должно быть одинаковое количество партиций. Так в чём же проблема?

Представим, что вы сделали несколько топологий: с агрегатами, джоинами, всё как положено. Поставили в production, оно работает, вы радуетесь. Дальше бизнес пошел в гору, нагрузка начала расти, вы добавили серверов и хотите увеличить количество партиций, чтобы загрузить эти новые сервера. Но Kafka Streams наплодил десятки топиков. И у всех надо поднимать количество партиций, иначе следующий рестарт ваших сервисов может стать последним. Если Kafka Streams увидит, что топики не ко-партиционированы, она выдаст ошибку и просто не запустится.

На вопрос, что с этим делать, у меня сегодня ответа нет. Вероятно, можно потушить все рабочие инстансы Kafka Streams, потом поднять число партиций на всех причастных топиках, затем поднять Kafka Streams обратно и молиться. А может быть последовать совету отсюда: Matthias J. Sax пишет, что это нужно делать, создавая новый топик с новым количеством партиций и подключать к нему Kafka Streams с новым application.id. Там же есть совет, что если вы знаете заранее, что нагрузка будет большая, то лучше сделать партиций с запасом.

Заключение

Если у вас в системе нет и не планируется высоких нагрузок, то со всеми этими проблемами вы скорее всего не столкнётесь. Всё будет работать отлично. В противном случае стоит серьёзно задуматься, нужно ли вам это, особенно если планируете использовать следующий уровень абстракции - KSQL.

Автор статьи: Андрей Буров, Максилект.

P.S. Мы публикуем наши статьи на нескольких площадках Рунета. Подписывайтесь на наши страницы в VK, FB, Instagram или Telegram-канал, чтобы узнавать обо всех наших публикациях и других новостях компании Maxilect.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru