Русский
Русский
English
Статистика
Реклама

Partition

Перевод Как Apache Kafka поддерживает 200К партиций в кластере?

02.03.2021 04:06:51 | Автор: admin


В Kafka топик может содержать множество партиций, между которыми распределяются записи. Партиции это единицы параллелизма. В целом, чем больше партиций, тем выше пропускная способность. Однако есть некоторые факторы, которые стоит учитывать, когда в кластере Kafka много партиций. Рад сообщить, что в Apache Kafka, начиная с версии 1.1.0, было существенно увеличено количество партиций, которые может поддерживать один кластер Kafka с точки зрения деплоймента и доступности.


Чтобы понять это улучшение, будет полезно повторить базовую информацию о лидерах партиций и контроллере. Во-первых, каждая партиция может иметь несколько реплик для большей доступности и устойчивости. Одна из реплик назначается лидером, и все запросы клиента обслуживаются этой ведущей репликой. Во-вторых, один из брокеров в кластере выступает в качестве контроллера, который управляет всем кластером. В случае отказа брокера контроллер отвечает за выбор новых лидеров партиций на отказавшем брокере.


Брокер Kafka по умолчанию выполняет контролируемое отключение, чтобы минимизировать сбои в обслуживании клиентов. Контролируемое отключение проходит следующие этапы. (1) Сигнал SIG_TERM отправляется брокеру для завершения работы. (2) Брокер отправляет запрос контроллеру, уведомляя, что он готов к отключению. (3) Контроллер затем меняет лидеров партиций на этом брокере на других брокеров и сохраняет эту информацию в ZooKeeper. (4) Контроллер отправляет информацию о новых лидерах другим брокерам в кластере. (5) Контроллер отправляет выключающемуся брокеру положительный ответ на запрос, и брокер, наконец, завершает свой процесс. Таким образом это никак не влияет на клиентов, потому что их трафик уже перенаправлен на других брокеров. Этот процесс изображен на Рисунке 1. Заметьте, что шаги (4) и (5) могут происходить параллельно.



Рис. 1. (1) Инициация отключения на брокере 1; (2) брокер 1 отправляет запрос о контролируемом отключении контроллеру на брокере 0; (3) контроллер записывает новых лидеров в ZooKeeper; (4) контроллер отправляет информацию о новых лидерах брокеру 2; (5) контроллер отправляет положительный ответ брокеру 1.


До версии Kafka 1.1.0 во время контролируемого отключения контроллер перемещает лидеров по одной партиции за раз. Для каждой партиции контроллер выбирает нового лидера, параллельно записывает его в ZooKeeper и отправляет нового лидера другим брокерам через удаленный запрос. В этом процессе есть несколько недостатков. Во-первых, у синхронной записи в ZooKeeper более высокое время задержки, что замедляет процесс контролируемого отключения. Во-вторых, отправка нового лидера по партиции за раз добавляет много маленьких удаленных запросов к каждому брокеру, что может вызвать задержку обработки новых лидеров.


Начиная с Kafka 1.1.0 и далее контролируемое отключение происходит значительно быстрее. Во-первых, при записи в ZooKeeper используется асинхронный API. Вместо того чтобы записывать лидера для одной партиции, ждать, пока этот процесс завершится, а потом записывать следующего, контроллер асинхронно размещает лидеров для многих партиций в ZooKeeper, а потом ждет завершения процесса. Это позволяет обрабатывать запросы на пайплайне между брокером Kafka и сервером ZooKeeper и снижает общее время задержки. Во-вторых, информация о новых лидерах отправляется батчами. Вместо одного удаленного запроса для каждой партиции контроллер отправляет один удаленный запрос с лидером от всех затронутых партиций.


Время обработки отказа контроллером также было значительно улучшено. Если случается отказ контроллера, кластер Kafka автоматически выбирает контроллером другого брокера. Прежде чем иметь возможность выбрать лидеров партиций свеженазначенный контроллер должен загрузить состояние всех партиций в кластере из ZooKeeper. Если произошел серьезный сбой контроллера, окно недоступности партиции может длиться столько, сколько продолжается срок действия сессии ZooKeeper, плюс время перезагрузки состояния контроллера. Поэтому сокращение времени перезагрузки состояния улучшает доступность в таком редком случаем. До версии 1.1.0 в Kafka для перезагрузки используется синхронный API ZooKeeper. Начиная с версии 1.1.0 это также заменено на асинхронный API для сокращения времени задержки.


Мы провели тесты для оценки улучшений по времени контролируемого отключения и перезагрузки контроллера. Для обоих тестов мы загрузили набор из пяти узлов ZooKeeper на выделенных серверах.


Для первого теста мы подготовили кластер Kafka с пятью брокерами на отдельных серверах. В этом кластере мы создали 25 000 топиков, в каждом топике по одной партиции и две реплики, в общем получив 50 000 партиций. Таким образом на каждом брокере было 10 000 партиций. Затем мы замерили время, которое понадобилось для контролируемого отключения. Результаты представлены в таблице ниже.


Версия Kafka 1.0.0 Kafka 1.1.0
Время контролируемого отключения 6,5 минут 3 секунды

Большую часть улучшений дает исправление затрат на журналирование, при котором проводятся ненужные записи для каждой партиции в кластере при смене лидера одной партиции. Просто исправив затраты на журналирование, мы снизили время контролируемого отключения с 6,5 минут до 30 секунд. Переход на асинхронный API ZooKeeper снизил это время до 3 секунд. Эти улучшения существенно снизили время, необходимое для перезагрузки кластера Kafka.


Для второго теста мы подготовили другой кластер Kafka, состоящий из пяти брокеров, создали 2 000 топиков, в каждом по 50 партиций и одной реплике. В сумме во всем кластере получилось 100 000 партиций. Затем мы замерили время перезагрузки состояния контроллера и увидели 100% улучшение (время перезагрузки снизилось с 28 секунд в Kafka 1.0.0 до 14 секунда в Kafka 1.1.0).


Учитывая эти изменения, на поддержку какого количества партиций вы можете рассчитывать в Kafka? Точное число зависит от таких факторов как допустимое окно недоступности, время задержки ZooKeeper, тип хранения на брокере и т.д. В качестве общего правила мы рекомендуем иметь на каждом брокере до 4 000 партиций и до 200 000 партиций в кластере. Основная причина для лимита на кластере заключается в том, что нужно подстраховаться на тот редкий случай серьезного сбоя контроллера, о котором мы писали выше. Обратите внимание, что другие соображения, связанные с партициями, также применимы, и вам может потребоваться дополнительная настройка конфигурации с большим количеством партиций.


Более подробную информацию вы найдете в KAFKA-5642 и KAFKA-5027.


От редакции: Команда Слёрма готовит подробный видеокурс по Apache Kafka. Спикеры Анатолий Солдатов из Авито и Александр Миронов из Stripe. Вводные уроки уже доступны в личном кабинете. Релиз полной версии курса апрель 2021. Подробности.

Подробнее..

Перевод Практический взгляд на хранение в Kafka

29.12.2020 06:05:42 | Автор: admin


Kafka повсюду. Где есть микросервисы и распределенные вычисления, а они сейчас популярны, там почти наверняка есть и Kafka. В статье я попытаюсь объяснить, как в Kafka работает механизм хранения.


Я, конечно, постарался не усложнять, но копать будем глубоко, поэтому какое-то базовое представление о Kafka не помешает. Иначе не все будет понятно. В общем, продолжайте читать на свой страх и риск.


Обычно считается, что Kafka это распределенная и реплицированная очередь сообщений. С технической точки зрения все верно, но термин очередь сообщений не все понимают одинаково. Я предпочитаю другое определение: распределенный и реплицированный журнал коммитов. Эта формулировка кажется более точной, ведь мы все прекрасно знаем, как журналы записываются на диск. Просто в этом случае на диск попадают сообщения, отправленные в Kafka.



Применительно к хранению в Kafka используется два термина: партиции и топики. Партиции это единицы хранения сообщений, а топики что-то вроде контейнеров, в которых эти партиции находятся.


С основной теорией мы определились, давайте перейдем к практике.


Я создам в Kafka топик с тремя партициями. Если хотите повторять за мной, вот как выглядит команда для локальной настройки Kafka в Windows.


kafka-topics.bat --create --topic freblogg --partitions 3 --replication-factor 1 --zookeeper localhost:2181

В каталоге журналов Kafka создано три каталога:


$ tree freblogg*freblogg-0|-- 00000000000000000000.index|-- 00000000000000000000.log|-- 00000000000000000000.timeindex`-- leader-epoch-checkpointfreblogg-1|-- 00000000000000000000.index|-- 00000000000000000000.log|-- 00000000000000000000.timeindex`-- leader-epoch-checkpointfreblogg-2|-- 00000000000000000000.index|-- 00000000000000000000.log|-- 00000000000000000000.timeindex`-- leader-epoch-checkpoint

Мы создали в топике три партиции, и у каждой свой каталог в файловой системе. Еще тут есть несколько файлов (index, log и т д.), но о них чуть позже.


Обратите внимание, что в Kafka топик это логическое объединение, а партиция фактическая единица хранения. То, что физически хранится на диске. Как устроены партиции?


Партиции


В теории партиция это неизменяемая коллекция (или последовательность) сообщений. Мы можем добавлять сообщения в партицию, но не можем удалять. И под мы я подразумеваю продюсеров в Kafka. Продюсер не может удалять сообщения из топика.


Сейчас мы отправим в топик пару сообщений, но сначала обратите внимание на размер файлов в папках партиций.


$ ls -lh freblogg-0total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpoint

Как видите, файлы index вместе весят 20 МБ, а файл log совершенно пустой. В папках freblogg-1 и freblogg-2 то же самое.
Давайте отправим сообщения через console producer и посмотрим, что будет:


kafka-console-producer.bat --topic freblogg --broker-list localhost:9092

Я отправил два сообщения сначала ввел стандартное Hello World, а потом нажал на Enter, и это второе сообщение. Еще раз посмотрим на размеры файлов:


$ ls -lh freblogg*freblogg-0:total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpointfreblogg-1:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  68 Aug  5 10:15 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 10:15 leader-epoch-checkpointfreblogg-2:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  79 Aug  5 09:59 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 09:59 leader-epoch-checkpoint

Два сообщения заняли две партиции, и файлы log в них теперь имеют размер. Это потому, что сообщения в партиции хранятся в файле xxxx.log. Давайте заглянем в файл log и убедимся, что сообщение и правда там.


$ cat freblogg-2/*.log@^@^B^@^K^X^@^@^@^A"^@^@^A^VHello World^@

Файлы с форматом log не очень удобно читать, но мы все же видим в конце Hello World, то есть файл обновился, когда мы отправили сообщение в топик. Второе сообщение мы отправили в другую партицию.


Обратите внимание, что первое сообщение попало в третью партицию (freblogg-2), а второе во вторую (freblogg-1). Для первого сообщения Kafka выбирает партицию произвольно, а следующие просто распределяет по кругу (round-robin). Если мы отправим третье сообщение, Kafka запишет его во freblogg-0 и дальше будет придерживаться этого порядка. Мы можем и сами выбирать партицию, указав ключ. Kafka хранит все сообщения с одним ключом в одной и той же партиции.


Каждому новому сообщению в партиции присваивается Id на 1 больше предыдущего. Этот Id еще называют смещением (offset). У первого сообщения смещение 0, у второго 1 и т. д., каждое следующее всегда на 1 больше предыдущего.



<Небольшое отступление>


Давайте используем инструмент Kafka, чтобы понять, что это за странные символы в файле log. Нам они кажутся бессмысленными, но для Kafka это метаданные каждого сообщения в очереди. Выполним команду:


kafka-run-class.bat kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files logs\freblogg-2\00000000000000000000.log

Получим результат:


umping logs\freblogg-2\00000000000000000000.logStarting offset: 0offset: 0 position: 0 CreateTime: 1533443377944 isvalid: true keysize: -1 valuesize: 11 producerId: -1 headerKeys: [] payload: Hello Worldoffset: 1 position: 79 CreateTime: 1533462689974 isvalid: true keysize: -1 valuesize: 6 producerId: -1 headerKeys: [] payload: amazon

(Я удалил из выходных данных кое-что лишнее.)


Здесь мы видим смещение, время создания, размер ключа и значения, а еще само сообщение (payload).


</Небольшое отступление>


Надо понимать, что партиция привязана к брокеру. Если у нас, допустим, три брокера, а папка freblogg-0 существует в broker-1, в других брокерах ее не будет. У одного топика могут быть партиции в нескольких брокерах, но одна партиция всегда существует в одном брокере Kafka (если установлен коэффициент репликации по умолчанию 1, но об этом чуть позже).



Сегменты


Что это за файлы index и log в каталоге партиции? Партиция, может, и единица хранения в Kafka, но не минимальная. Каждая партиция разделена на сегменты, то есть коллекции сообщений. Kafka не хранит все сообщения партиции в одном файле (как в файле лога), а разделяет их на сегменты. Это дает несколько преимуществ. (Разделяй и властвуй, как говорится.)


Главное это упрощает стирание данных. Я уже говорил, что сами мы не можем удалить сообщение из партиции, но Kafka может это сделать на основе политики хранения для топика. Удалить сегмент проще, чем часть файла, особенно когда продюсер отправляет в него данные.


$ ls -lh freblogg-0total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpoint

Нули (00000000000000000000) в файлах log и index в каждой папке партиции это имя сегмента. У файла сегмента есть файлы segment.log, segment.index и segment.timeindex.


Kafka всегда записывает сообщения в файлы сегментов в рамках партиции, причем у нас всегда есть активный сегмент для записи. Когда Kafka достигает лимита по размеру сегмента, создается новый файл сегмента, который станет активным.



В имени каждого файла сегмента отражается смещение от первого сообщения. На картинке выше в сегменте 0 содержатся сообщения со смещением от 0 до 2, в сегменте 3 от 3 до 5, и так далее. Последний сегмент, шестой, сейчас активен.


$ ls -lh freblogg*freblogg-0:total 20M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121   0 Aug  5 08:26 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121   0 Aug  5 08:26 leader-epoch-checkpointfreblogg-1:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  68 Aug  5 10:15 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 10:15 leader-epoch-checkpointfreblogg-2:total 21M- freblogg 197121 10M Aug  5 08:26 00000000000000000000.index- freblogg 197121  79 Aug  5 09:59 00000000000000000000.log- freblogg 197121 10M Aug  5 08:26 00000000000000000000.timeindex- freblogg 197121  11 Aug  5 09:59 leader-epoch-checkpoint

У нас всего по одному сегменту в каждой партиции, поэтому они называются 00000000000000000000. Раз других файлов сегментов нет, сегмент 00000000000000000000 и будет активным.


По умолчанию сегменту выдается целый гигабайт, но представим, что мы изменили параметры, и теперь в каждый сегмент помещается только три сообщения. Посмотрим, что получится.


Допустим, мы отправили в партицию freblogg-2 три сообщения, и она выглядит так:



Три сообщения это наш лимит. На следующем сообщении Kafka автоматически закроет текущий сегмент, создаст новый, сделает его активным и сохранит новое сообщение в файле log этого сегмента. (Я не показываю предыдущие нули, чтобы было проще воспринять).


freblogg-2|-- 00.index|-- 00.log|-- 00.timeindex|-- 03.index|-- 03.log|-- 03.timeindex`--

Удивительное дело, но новый сегмент называется не 01. Мы видим 03.index, 03.log. Почему так?



Kafka называет сегмент по имени минимального смещения в нем. Новое сообщение в партиции имеет смещение 3, поэтому Kafka так и называет новый сегмент. Раз у нас есть сегменты 00 и 03, мы можем быть уверены, что сообщения со смещениями 0, 1 и 2 и правда находятся в сегменте 00. Новые сообщения в партиции freblogg-2 со смещениями 3 ,4 и 5 будут храниться в сегменте 03.


В Kafka мы часто читаем сообщения по определенному смещению. Искать смещение в файле log затратно, особенно если файл разрастается до неприличных размеров (по умолчанию это 1 ГБ). Для этого нам и нужен файл .index. В файле index хранятся смещения и физическое расположение сообщения в файле log.


Файл index для файла log, который я приводил в кратком отступлении, будет выглядеть как-то так:



Если нужно прочитать сообщение со смещением 1, мы ищем его в файле index и видим, что его положение 79. Переходим к положению 79 в файле log и читаем. Это довольно эффективный способ мы быстро находим нужное смещение в уже отсортированном файле index с помощью бинарного поиска.


Параллелизм в партициях


Чтобы гарантировать порядок чтения сообщений из партиции, Kafka дает доступ к партиции только одному консюмеру (из группы консюмеров). Если партиция получает сообщения a, f и k, консюмер читает их в том же порядке: a, f и k. Это важно, ведь порядок потребления сообщений на уровне топика не гарантирован, если у вас несколько партиций.


Если консюмеров будет больше, параллелизм не увеличится. Нужно больше партиций. Чтобы два консюмера параллельно считывали данные из топика, нужно создать две партиции по одной на каждого. Партиции в одном топике могут находиться в разных брокерах, поэтому два консюмера топика могут считывать данные из двух разных брокеров.


Топики


Наконец, переходим к топикам. Мы уже кое-что знаем о них. Главное, что нужно знать: топик это просто логическое объединение нескольких партиций.


Топик может быть распределен по нескольким брокерам через партиции, но каждая партиция находится только в одном брокере. У каждого топика есть уникальное имя, от которого зависят имена партиций.


Репликация


Как работает репликация? Создавая топик в Kafka, мы указываем для него коэффициент репликации replication-factor. Допустим, у нас два брокера и мы устанавливаем replication-factor 2. Теперь Kafka попытается всегда создавать бэкап, или реплику, для каждой партиции в топике. Kafka распределяет партиции примерно так же, как HDFS распределяет блоки данных по нодам.


Допустим, для топика freblogg мы установили коэффициент репликации 2. Мы получим примерно такое распределение партиций:



Даже если реплицированная партиция находится в другом брокере, Kafka не разрешает ее читать, потому что в каждом наборе партиций есть LEADER, то есть лидер, и FOLLOWERS ведомые, которые остаются в резерве. Ведомые периодически синхронизируются с лидером и ждут своего звездного часа. Когда лидер выйдет из строя, один из in-sync ведомых будет выбран новым лидером, и вы будете получать данные из этой партиции.


Лидер и ведомый одной партиции всегда находятся в разных брокерах. Думаю, не нужно объяснять, почему.


Мы подошли к концу этой длинной статьи. Если вы добрались до этого места поздравляю. Теперь вы знаете почти все о хранении данных в Kafka. Давайте повторим, чтобы ничего не забыть.


Итоги


  • В Kafka данные хранятся в топиках.
  • Топики разделены на партиции.
  • Каждая партиция разделена на сегменты.
  • У каждого сегмента есть файл log, где хранится само сообщение, и файл index, где хранится позиция сообщения в файле log.
  • У одного топика могут быть партиции в разных брокерах, но сама партиция всегда привязана к одному брокеру.
  • Реплицированные партиции существуют пассивно. Вы обращаетесь к ним, только если сломался лидер.

От редакции:
Более подробно от работе с Apache Kafka можно узнать на курсе Слёрма. Курс сейчас в разработке. В программе бесплатные базовые уроки и платная продвинутая часть. Оставьте заявку на курс, чтобы получать уведомления.


Ресурсы:


Схема Kafka https://kafka.apache.org/images/kafka_diagram.png

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru