Русский
Русский
English
Статистика
Реклама

Kafkaрианский переезд или как расшевелить партиции

Вступление

Привет, Хабр! Работаю java программистом в одной финансовой организации. Решил оставить свой след на хабре и написать первую свою статью. В силу проблем с наличием девопсеров передо мной была поставлена задача обновить кластер кафки с 2.0 до 2.6 без даунтайма и потери сообщений (сами понимаете, никто не любит, когда денежки зависают в воздухе или где-то теряются). Хочу поделиться этим опытом с Вами и получить конструктивный фидбек. Итак, хватит воды, переходим сразу к делу.

Схема миграции

Задача усложнялась тем, что надо было мигрировать со старых виртуалок на новые, поэтому вариант с тем, чтобы выключить брокера, поменять бинарники и запустить, мне не подходил.

Ниже представлена схема миграции. Имеем 3 брокеров на старых ВМ, 3 брокеров на новых ВМ, также для каждого брокера имеется свой zookeeper.

План миграции

Для того чтобы нам мигрировать так, чтобы этого никто не заметил, придется "немного" попотеть. Ниже приведен общий план.

  1. Добавить в настройки приложения адреса новых брокеров

  2. Пробить доступы между всеми и вся

  3. Подготовить инфраструктуру на новых виртуалках

  4. Поднять новый кластер зукиперов и объединить его со старым

  5. Поднять новых брокеров кафки

  6. Мигрировать все партиции со старых брокеров на новые

  7. Отключить старые брокеры кафки и старых зукиперов

  8. Убрать из новых конфигов старых брокеров и зукиперов

А теперь поехали по порядку разбирать подробнее каждый пункт

Добавление брокеров в приложение

Самый простой пункт. В коде клиента, там где были прописаны адреса старых брокеров, туда же дописываем новых. В свойство "bootstrap.servers"

Строка настройки

old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092

Пробитие доступов

Одно из самых нелюбимых занятий - это пробитие доступов. Заявки согласуются долго, а выполняются еще дольше. Доступов нам придется заказать немало, давайте разберемся какие именно.

  1. Как вы уже догадались по строке настройки брокеров, нам нужен порт 9092 в направлении Приложение -> новые брокеры кафки (в сумме 3 доступа)

  2. Этот же порт для Новые брокеры <----> Старые брокеры (+18 доступов)

  3. Все тот же порт 9092 между каждым новым брокером (+6 доступов)

  4. Аналогичные доступы из пунктов 2 и 3 для портов зукиперов: 2181, 2888, 3888 ( (18+6)*3 = 72)

Итого: 99 доступов. Получите и распишитесь! А потом еще проверьте все.

обидно что до 100 не дотянули, было бы посолиднее

Когда мы прошли через эти страдания и настроили доступы, приступаем к настройке инфраструктуры.

Инфраструктура нового кластера

Для начала нам нужно создать пользователя kafka на новых виртуалках

Далее очень важный пункт - необходимо прописать разрешенное количество открытых файлов пользователю kafka. А именно, в файле /etc/security/limits.conf дописываем строки

kafka hard nofile 262144kafka soft nofile 262144

Если мы этого не сделаем, то это грозит нам внезапным падением брокера, так как кафка по своей природе любит открывать много файлов, поэтому надо подготовиться к ее аппетитам.

Честно говоря, число 262144, взято просто потому, что на старом кластере было установлено такое значение (исторически сложилось). Хотя у нас кафка открывает не больше 10 тысяч файлов, поэтому вы можете провести анализ и выставить более подходящее.

Далее нам надо прописать переменную среды к java

в файл /home/kafka/.bash_profile дописываем

export JAVA_HOME=/opt/javaexport PATH=JAVA_HOME/bin:PATH

Соответственно у вас должна лежать jre в папке /opt/java

Далее раскидываем папки по нужным директориям и устанавливаем права

Ниже описан скрипт, реализующий, то что я описал выше. Его надо выполнить на каждом новом брокере.

setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/mv /home/kafka/kafka /optmv /home/kafka/zookeeper /optmv /home/kafka/kafka-start.sh /optmv /home/kafka/scripts /optln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/currentln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/currentln -sfn /opt/jdk1.8.0_181/ /opt/javachown -R kafka:kafka /optchmod -R 700 /opt#env_var start ------------------------------->kafkaProfile=/home/kafka/.bash_profilehomeVar="export JAVA_HOME=/opt/java"javaHome=$(cat $kafkaProfile | grep "$homeVar")if [ "$javaHome" != "$homeVar" ]; then    echo -e "\n$homeVar\n" >> $kafkaProfilefipathVar="export PATH=\$JAVA_HOME/bin:\$PATH"path=$(cat $kafkaProfile | grep "$pathVar")if [ "$path" != "$pathVar" ]; then    echo -e "\n$pathVar\n" >> $kafkaProfilefi#env_var end --------------------------------->#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>limitsFile=/etc/security/limits.confsoft="kafka soft nofile 262144"limitSoft=$(cat $limitsFile | grep "$soft")if [ "$limitSoft" != "$soft" ]; then    echo -e "\n$soft\n" >> $limitsFilefihard="kafka hard nofile 262144"limitHard=$(cat $limitsFile | grep "$hard")if [ "$limitHard" != "$hard" ]; then    echo -e "\n$hard\n" >> $limitsFilefi#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Далее из под пользователя kafka проверяем что у нас все установлено правильно

ulimit -n  = 262144echo $JAVA_HOME  = /opt/javaecho $PATH  должен содержать /opt/java/bin

Новый кластер зукиперов

Необходимо проверить, что в конфиге новых зукиперов прописаны все участники ансабля, а также указана директория, для работы зукипера, где хранится файл myid, в котором прописаны id зукиперов.

ниже кусок файла конфигурации зукипера /opt/zookeeper/current/conf/zoo.cfg

zoo.cfg
dataDir=/opt/zookeeper/zookeeper-dataserver.1=server1:2888:3888server.2=server2:2888:3888server.3=server3:2888:3888server.4=server4:2888:3888server.5=server5:2888:3888server.6=server6:2888:3888

Стартуем все зукиперы /opt/zookeeper/current/bin/zkServer.sh start

Далее нам надо добавить в конфиг старых зукиперов новых участников. Для этого на старых серверах выполняем такой скрипт

zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfgecho -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfgecho -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg

Теперь нужно определить кто из зукиперов был лидером, и перезапустить все зукиперы. Лидера в последнюю очередь.

/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart

Наконец проверяем, что все зукиперы увидели друг друга и выбрали лидера. Везде выполняем

.../zkServer.sh status

Настройка ансамбля зукиперов завершена.

Поднятие новых брокеров

Теперь можно заняться запуском новых брокеров, но перед этим нужно убедиться что в server.properties у нас проставлен broker.id и zookeeper.connect

Пример для 4 брокера

broker.id=4zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name

Здесь мы указали только новых зукиперов, но так как они находятся в ансамбле, то новые брокеры узнают через них о старом кластере.

Также необходимо там же в server.properties прописать текущую версию протокола взаимодействия между брокерами (2.0.0). Иначе мы не сможем переместить партиции.

inter.broker.protocol.version=2.0.0

Запускаем новых брокеров

nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &

Теперь необходимо проверить что новые брокеры присоединились к кластеру. Для этого заходим в cli любого из зукиперов.

/opt/zookeeper/current/bin/zkCli.sh

и выполняем там команду

ls /cluster_name/brokers/ids

Должен быть выведен такой ответ

[1,2,3,4,5,6]

Это значит что в кластере у нас все 6 штук брокеров, чего мы и добивались.

Перемещение партиций

Сейчас начинается самое интересное. Изначально где-то сидело в голове, а иногда и подкреплялось чатиками по кафке, что партиции сами переедут, нужно только поднять новых брокеров и поочереди гасить старые. replication.factor должен сработать и реплицировать все на новых брокеров. В действительности все оказалось не так просто. Текущие партиции мертво приклеены к старым брокерам и только создание новых топиков приводит к появлению партиций на новых брокерах.

Заглянув в документацию я увидел , что перемещение партиций "автоматическое", но надо проделать немалое количество манипуляций, чтобы запустить все это дело.

Сначала нужно сгенерировать json в котором надо указать все топики, которые будут перемещаться.

{"topics": [{"topic": "foo1"},              {"topic": "foo2"}],  "version":1 }

Так как топиков у нас было много, я сгенерил несколько таких файлов

Далее все эти json файлы надо скормить утилите kafka-reassign-partitions.sh с аргументом --generate и указать id брокеров куда мы хотим переехать --broker-list "4,5,6".

У нас получится несколько файлов такого вида

generate1.json
{"version":1,  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},                {"topic":"foo1","partition":0,"replicas":[3,2,1]},                {"topic":"foo2","partition":2,"replicas":[1,2,3]},                {"topic":"foo2","partition":0,"replicas":[3,2,1]},                {"topic":"foo1","partition":1,"replicas":[2,3,1]},                {"topic":"foo2","partition":1,"replicas":[2,3,1]}]  }  Proposed partition reassignment configuration  {"version":1,  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},                {"topic":"foo1","partition":0,"replicas":[4,5,6]},                {"topic":"foo2","partition":2,"replicas":[6,4,5]},                {"topic":"foo2","partition":0,"replicas":[4,5,6]},                {"topic":"foo1","partition":1,"replicas":[5,4,6]},                {"topic":"foo2","partition":1,"replicas":[4,5,6]}]  }

Все что идет до Proposed partition reassignment configuration это текущее распределение партиций (может пригодиться для роллбека). Все что после - распределение после переезда. Соответственно нам надо собрать все эти нижние половины со всех файлов, сформировать новые json.

Так как все это делать руками ну ооочень долго. я "быстро" накатал скрипт, который вызывает все эти команды и режет файлы как нам надо. За резку файлов отвечает джарник kafka-reassign-helper.jar код можно глянуть тут.

Ниже скрипт подготовки файлов для переезда партиций

prepare-for-reassignment.sh
#параметр отвечает за количество топиков в одном файле. у нас было 20if [ "$#" -eq 0 ]then    echo "no arguments"    exit 1fiecho "Start reassignment preparing"/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txtecho created topics.txtjava -jar kafka-reassign-helper.jar generate topics.txt $1fileCount=$(ls -dq generate*.json | wc -l)echo "created $fileCount file for topics to move"echo -e "\nCreating generated files\n"mkdir -p generatedfor ((i = 1; i < $fileCount+1; i++ ))do/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"echo "generated/generated$i.txt" createddoneecho -e "\nCreating execute/rollback files"java -jar kafka-reassign-helper.jar execute $fileCountecho -e "\nexecute/rollback files created"echo -e "\nPreparing finished successfully!"

После выполнения скрипта скармливаем сгенеренные файлы по очереди утилите kafka-reassign-partitions.sh, но на этот раз с параметром --execute

move-partitions.sh
#параметр отвечает за номер файла execute1.json, execute2.json ....if [ "$#" -eq 0 ]then    echo "no arguments"    exit 1fi        /opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute

Вот теперь кафка начнет перемещать партиции. В логе новых брокеров судорожно забегают строки, как и в логах старых. Пока все это происходит, чтобы не скучать, нам предоставили третий аргумент kafka-reassign-partitions.sh, а именно --verify, который предоставляет узнать текущий статус переезда.

Опять же в силу лени, чтобы постоянно не дергать руками эту команду, был написан очередной скрипт.

reassign-verify.sh
progress=-1while [ $progress != 0 ]do    progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c)    complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c)    failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c)    echo "In progress:" $progress;    echo "Is complete:" $complete;    echo "Failed:" $failed;    sleep 2sdone

Вызываем и ждем пока In progress станет равным 0. а Is complete примет какое-то конечное значение. И надеемся что Failed так и останется 0.

Когда все закончилось вызываем снова move-partitions.sh и reassign-verify.sh для следующего файла, до тех пор пока не переберем все наши файлы миграции.

После обработки всех файлов идем в log.dirs старых брокеров и убеждаемся, что там остались только технические логи самой кафки - значит все партиции точно на новом кластере.

Отключение старого кластера

Тут все просто. выполняем kafka-server-stop.sh и zkStop.sh для брокеров и зукиперов соответственно.

Чистим конфиги нового кластера

Начинаем с зукиперов. Из файлов zoo.cfg удаляем лишние записи

zk-remove-old-servers.sh
sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfgsed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfgsed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg

Далее, как обычно, у зукиперов определяем кто лидер и перезапускаем по очереди, лидера в последнюю очередь. После перезапуска проверяем статусы: 2 фолловера и 1 лидер.

Удаляем из server.properties брокеров старый протокол общения

remove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties

Теперь вроде как можно перезапустить брокеров, но надо убедиться, что все партиции находятся в состоянии insync, причем не в количестве min.insync.replicas (у нас равно 2), а в количестве default.replication.factor (у нас 3)

Иначе если где-то будет 2 реплики, а не 3, то при перезапуске брокера, на котором лежит одна из этих двух реплик, мы получим ошибку на клиенте, что не хватает insync реплик.

Ниже скриптик для проверки

check-insync.sh
input="check-topics.txt"rm -f $input/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txtcheckPerIter=100i=0list=""notInsync=0while IFS= read -r linedo ((i=i+1)) list+="${line}|" if [ $i -eq $checkPerIter ]  then   list=${list::${#list}-1}   echo "checking $list"   count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c)   if [ "$count" -ne 0 ]    then     /opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$"   fi   ((notInsync=notInsync+count))   list=""   i=0 fidone < "$input"echo "not insync: $notInsync"

Если на выходе получаем not insync: 0, то можно по очереди перезапустить брокеров.

Вот и все. Миграция брокеров на этом завершена, если не считать последущей перенастройки мониторинга и прочих вспомогательных дел.

Вот как выглядит инструкция, которую я отправил админам, которые все это проворачивали на бою. На удивление справились с первого раза и без вопросов.

README.txt
Инструкция по миграции кафка 2.0.0 -> 2.6.01 этап. Подготовка инфраструктуры1.1 Скопировать архивы для миграции на сервера кафкиNEW4.tar.gz -> адрес 4 брокераmigration.tar.gz -> адрес 4 брокераNEW5.tar.gz -> адрес 5 брокераNEW6.tar.gz -> адрес 6 брокера1.2 Разархивировать архивыtar -xf NEW4.tar.gz -C /home/kafkatar -xf NEW5.tar.gz -C /home/kafkatar -xf NEW6.tar.gz -C /home/kafka1.3 От пользователя root выполнить скрипт на каждом новом сервере (установка файлов в нужные директории, прав и лимитов)/home/kafka/scripts/setup.sh1.4 Перейти на пользователя kafka (на каждом новом сервере)1.5 Проверить следующие параметры среды (на каждом новом сервере)ulimit -n  = 262144echo $JAVA_HOME  = /opt/javaecho $PATH  должен содержать /opt/java/binсодержимое папки /opt должно принадлежать kafka kafka2 этап. Запуск зукиперов и брокеров2.1 Перейти под пользователя kafka (на каждом новом сервере)2.2 Выполнить скрипт на каждом новом сервере (запускаются новые зукиперы)/opt/scripts/zkStart.sh2.3 Скопировать архивы на старые сервера кафки OLD1.tar.gz -> адрес 1 брокераOLD2.tar.gz -> адрес 2 брокераOLD3.tar.gz -> адрес 3 брокера2.4 Разархивировать архивыtar -xf OLD1.tar.gz -C /home/kafkatar -xf OLD2.tar.gz -C /home/kafkatar -xf OLD3.tar.gz -C /home/kafka2.5 От пользователя root выполнить скрипт на каждом старом сервере (установка прав на скрипты миграции)/home/kafka/scripts/setup-old.sh2.6 Перейти под пользователя kafka на каждом старом сервере2.7 Выполнить скрипт на каждом старом сервере (добавляем новые зукиперы в конфиг старых зукиперов)/home/kafka/scripts/zk-add-new-servers.sh2.8 Выполнить скрипт на каждом старом сервере (проверка статуса зукиперов)/home/kafka/scripts/zkStatus.shопределить какой из старых серверов является лидером2.9 Перезапустить по очереди зукиперы на старых серверахлидера перезапускать в последнюю очередь/home/kafka/scripts/zkRestart.sh2.10 Проверить что кластер зукиперов подхватил новые серверадля этого на всех серверах выполнить/home/kafka/scripts/zkStatus.sh (на старых)/opt/scripts/zkStatus.sh (на новых)все сервера кроме одного должны быть в состоянии followerодин сервер leader2.11 запуск новых брокеровна новых серверах выполнить скрипт/opt/kafka-start.sh2.12 проверить что новые брокеры в кластеревыполнить на новом сервере/home/kafka/migration/zkCli.shls /cluster_name/brokers/idsдолжно быть выведено [1,2,3,4,5,6]3 этап Перемещение партиций на новых брокеров3.1 Выполнить скрипт (подготовка файлов для перемещения партиций)/home/kafka/migration/prepare-for-assignment.sh  203.2 Выполнить данный пункт по очереди для каждого файла в директории /home/kafka/migration/executeПример:для файла execute1.json/home/kafka/migration/move-partitions.sh 1после начала выполнения скрипта выполнить скрипт Пример:/home/kafka/migration/reassign-verify.sh 1когда количество партиций в состоянии "in progress" достигнет 0 дождаться ОК от программиста и приступить к п.3.2 для следующего файла3.3 После завершения перемещения партиций производится проверка логов на наличие ошибок, а также проверка директории /opt/kafka/kafka-data на всех старых брокерахДолжны остаться только технические логи кафки3.4 По очереди выключить на старых серверах брокеров, выполнив скрипт/opt/kafka/current/bin/kafka-server-stop.sh3.5 По очереди выключить зукиперов на старых серверах, выполнив скрипт/home/kafka/scripts/zkStop.sh3.6 На новых серверах выполнить скрипт (удаляем старых зукиперов из конфига новых зукиперов)/opt/scripts/zk-remove-old-servers.sh3.7 На новых серверах перезапустить зукиперов/opt/scripts/zkRestart.sh3.8 На новых серверах проверить статус зукиперов (один лидер, 2 фоловера)/opt/scripts/zkStatus.sh3.9 На новых серверах выполнить скрипт (удаляем из конфига старый протокол общения между брокерами)/opt/scripts/remove-old-protocol.sh3.10 Проверить что все реплики в статусе insyncдля этого запустить скрипт /home/kafka/migration/check-insync.shnot insync должно быть равно 03.11 По очереди для каждого нового брокера выполнить/opt/kafka/current/bin/kafka-server-stop.shдождаться пока брокер остановиться (ps aux | grep kafka должен показывать только зукипера)выполнить /opt/kafka-start.shПриступить к 3.10 следующего брокераМиграция завершена!

Надеюсь, что кому-то помог в этом вопросе. Жду ваших комментов и замечаний.

Все скрипты и инструкции, в том числе для роллбеков можно найти тут

Источник: habr.com
К списку статей
Опубликовано: 06.02.2021 12:22:42
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Java

It-инфраструктура

Devops

Кафка

Миграция

Девопс

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru