Русский
Русский
English
Статистика
Реклама

Retail

Одна Kafka хорошо, а несколько лучше

26.01.2021 20:09:19 | Автор: admin

Всем привет! Меня зовут Александр, я инженер команды, отвечающей за развитие централизованныхIT-сервисов, которыми пользуются продуктовые команды вX5RetailGroup.

В этой статье речь пойдёт обApacheKafkaи том, как этот продукт используется для обеспечения потребностей команд разработки. Статья не погружает в технические аспекты, но может быть полезна архитекторам и менеджерам, которые думают о том, чтобы попробовать использоватьKafka, но не знают, подойдёт ли она для их задач, а также разработчикам, которые могут открыть для себя новые инструменты для удобной работы с кластерами.

На момент написания статьи силами нашей команды развёрнуты и поддерживаются 14 продуктивных кластеров (1 централизованный и 13 у продуктовых команд) и 15 непродуктивных.

Централизованный кластер Kafka

Основной сценарий, в рамках которогоKafkaиспользует наша команда доставка логов вElasticsearch.

Немного цифр об этом кластере для начала:

  • брокеры - 5

  • топики 179

  • consumer группы 77

  • средний объем данных[1]в топиках 555.1 ГБ

[1] значение за последние 90 дней

Небольшое лирическое отступление. Многие сталкивались с ситуацией, когда в одно прекрасное утро ты видишь на графиках резкий рост количества логов, но не понимаешь, что именно стало причиной: новые команды не заезжали в сервис, новый функционал не планировался, и команды не предупреждали о том, что ожидается рост (потому что изначально команда закладывает вычислительные мощности под определенный объем данных). В результате расследования выясняется, что разработчики просто включили уровень логированияDebugилиTrace. Также, иногда, встречаются сложные системы, бизнес-логика которых требует сохранять максимально полную информацию, растущая, как снежный ком, с течением времени. Например,X5 использует в работе систему маркировки табачных изделий. В какой-то момент мы обнаружили, что размер одного сообщения с логами достигает порядка 600 кб, потому что вся информация о продукции и ее перемещении дополняется на всем пути до магазина.

Поэтому для нас также было важно обеспечить доступность сервиса и не позволить, чтобы поток логов перегрузил нашу систему до отказа в результате незапланированного роста количества данных.

На этапе проектирования сервиса сбора логов,командапоняла, чтонеобходимо гарантировать запись вElasticsearchвсех без исключения событий. Таким образом обеспечивается целостность данных, которые поступают от внешних систем, и команды всегда могут получить полную картину в том виде, в котором они пришли из внешних источников. Помимо этого, важно было иметь в виду, что количество команд будет расти со временем, а сервисы развиваться. Количество сообщений и информации в них будет увеличиваться, а для нас будет более затруднительно контролировать, какую информацию в логах пишут команды. Это значит, что мы должны сделать отказоустойчивую систему, которую можно сравнительно легко масштабировать в будущем.

Для достижения этих целей нам отлично подошло решениеApacheKafkaпо следующим причинам:

  • репликация и валидация записи.

    Kafkaимеет механизм валидации записи acknowledgements. С помощью параметраacks можно настроить, сколько брокеров (реплик) должны отправить наproducerподтверждение записи. Конечно, использованиеacks, особенно в случае, если мы хотим быть уверены, что данные реплицировались на все брокеры, добавляет небольшую задержку, которая требуется на репликацию. Но для нас важнее быть уверенными, что данные, которые мы хотим передавать дальше, будут записаны вKafka;

  • хранение сообщений в очереди.Если потребитель (в нашем случае этоLogstash, который забирает сообщения изKafka) по какой-то причине не успевает обрабатывать сообщения или просто недоступен, эти данные будут прочитаны и доставлены в конечную систему сразу после стабилизации работы потребителей;

  • хранение сообщений после прочтения.

    Kafkaне удаляет сообщения, а хранит в течение времени, которое описывается в параметрахretention. Это дает возможность восстановить данные в случае, если что-то случится с индексом вElasticsearchи данные станут недоступны;

  • партиционирование.

    За счет увеличения числа партиций топика можно увеличить пропускную способностьKafka, добавив дополнительных потребителей. Это увеличивает количество потоков, которые могут читать данные параллель и полезно в случае, когдаproducerгенерирует большое количество сообщений.

Изначально кластер был развернут натрехмашинах, но после роста числа команд мы масштабировали кластер, добавив ещё две ноды.

#

vCPU

RAM

Storage[2]

Kafka

Kafka Uptime

Zookeeper

ZK Uptime

1

4

16 ГБ

290 ГБ

+

1г1м

+

1г5м

2

4

16ГБ

270 ГБ

+

1г1м

+

1г5м

3

4

16ГБ

290 ГБ

+

1г1м

+

1г5м

4

4

16ГБ

270 ГБ

+

-

-

5

4

16ГБ

270 ГБ

+

1г1м

-

-

[2] учитывается объем, отведенный под данныеKafka

Мы видим, что последние 2 ноды были добавлены в кластер чуть более года назад, как раз в это время и произошел перезапуск сервиса на нодах 1-3, а на 4-й ноде перезапуск происходил позднее, скорее всего, проводились какие-то работы.

Когда внутри одной системы хранятся данные нескольких команд важно обеспечить конфиденциальность данных и разграничить права доступа.

Управление доступами

Чтобы разграничить команды по топикам, мы используемKafkaSecurityManager (https://github.com/simplesteph/kafka-security-manager). Все правила доступа мы описываем в файле сACL. Выглядит это вот так:

User:projectprodwrite@srelogs,Topic,PREFIXED,projectprod,Create,Allow,User:projectprodread@srelogs,Topic,PREFIXED,projectprod,Read,Allow,User:projectprodread@srelogs,Group,PREFIXED,projectprod,All,Allow,

где:

UserCNсертификата, который используется для подключения,

srelogs имя кластера,

Topic/Group объект, которым управляет данная запись,

PREFIXED/LITERAL как будет применяться, относительно именем объекта вKafka(по префиксу или полное совпадение),

project_prod имя объекта и права, которые получает пользователь.

Producer/consumerавторизуются с помощьюSSLсертификатов, которые мы генерируем автоматически и храним вVault.

Интеграция в конвейер поставки логов

Подготовка всех необходимых компонентов выполняется с помощью ролейAnsible. В зависимости от окружения (продуктивное или непродуктивное), и прочих параметров, описанных в инвентаре, создается набор сущностей и конфигурируются нужные сервисы (индексы и тенанты вElasticsearch, пайплайны вLogstash)

После того, как все необходимые компоненты созданы и настроены, топики автоматически создаются, как только первые сообщения начинают отправляться вKafkaблагодаря параметруauto.create.topics.enable=True

Для обеспечения высокой производительности кластера, рекомендуется использоватьKafkaс сообщениями небольшого размера. По этой причине мы настоятельно рекомендуем командам следить за тем, чтобы в логи писалась только полезная информация, а дляElasticsearchпоставили ограничение на размер одного сообщения.

В целом, графики измеряющие поток входящих сообщений показывают нам стабильную и равномерную картину, однако, время от времени возникают неожиданные всплески (фиолетовые и желтые графики), которые создают команды разработки в топикахdevсред, предположительно, во время проведения тестов.

ИспользованиеKafkaв цепочке поставки логов позволяет нам контролировать поток входящих сообщений,Logstash(у каждой команды свой) будет равномерно вычитывать все, что попадает в топикKafka, а мы будем спокойны, что наш конвейер поставки логов не упадет от внезапно возросшей нагрузки. В случае, если нашconsumerстанет недоступен или не будет справляться с нагрузкой, все события так или иначе останутся в топике и будут прочитаны и отправлены вElasticsearchпосле восстановления работоспособностиLogstash.

Кластер для команды

Кроме большого кластера, который мы используем для гарантированной доставки логов, некоторые команды используют кластерыKafkaв своих проектах. При этом задачи, которые они решают с помощью этого продукта, схожи с нашими гарантированная запись данных, возможность обратиться к любым данным в течение времени их жизни в топике, отказоустойчивость и целостность.

У нас есть информационный ресурс, в котором описаны все сервисы, предоставляемые нашей командой, в том числе и параметры по умолчанию, которые устанавливаются для той или иной системы. Аналогичные дефолтные значения прописаны и для кластераKafka. Вот некоторые из них:

auto.create.topics.enable=Truedelete.topic.enable=Truelog.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connection.timeout.ms=6000auto.leader.rebalance.enable=true

Нести в статью все, что описано в роляхAnsibleя не вижу смысла, но приведу некоторые примеры того, что используем мы вX5 .

Для обеспечения надежности кластера рекомендуется использовать количество реплик равное количеству нод и выставить значение минимально синхронизированных реплик на единицу меньше, чтобы в случае вылета одного узла, кластер смог продолжить работу. Для удобства мы используем именно эти значения, если иное не описано в инвентаре для проекта:

- name: 01|Set Kafka replication factor  set_fact:    kafka_cfg_default_replication_factor: "{{  kafka_cfg_default_replication_factor | default(kafka_hosts|length) }}"    kafka_cfg_offsets_topic_replication_factor: "{{ kafka_cfg_offsets_topic_replication_factor | default(kafka_hosts|length) }}"    kafka_cfg_transaction_state_log_replication_factor: "{{ kafka_cfg_transaction_state_log_replication_factor | default(kafka_hosts|length) }}"  run_once: True- name: 01|Set kafka ISR  set_fact:    kafka_cfg_min_insync_replicas: "{{ kafka_cfg_min_insync_replicas | default([kafka_cfg_default_replication_factor|int - 1 , 1] | max) }}"    kafka_cfg_transaction_state_log_min_isr: "{{ kafka_cfg_transaction_state_log_min_isr | default([kafka_cfg_transaction_state_log_replication_factor|int - 1 , 1] | max) }}"  run_once: True

По умолчанию мы выставляем удаление всех событий старше 30 дней, обычно этого хватает командам:

log.retention.hours=720

В случае, если команде требуются иные значения, то изменить какие-то параметры несложно. Нужно просто описать параметры и их значения в инвентаре нужного проекта:

Project.yml---project: name. . .kafka_scala_version: "2.11"kafka_zk_chroot: '/'kafka_enable_protocol: ['PLAINTEXT']kafka_cfg_default_replication_factor: 2kafka_cfg_log_retention_hours: 6kafka_cfg_log_segment_bytes: 52428800

Как и в случае с общим кластером, для обеспечения безопасности, мы используем SSL сертификаты. По умолчанию предоставляем кластер с параметромkafkaenableprotocol: ['SSL'], что гарантирует возможность подключения к кластеру только тех, кто имеет соответствующие клиентские сертификаты.

- name: Lookup for ssl data in Vault  set_fact:    jks_b64: "{{ lookup('hashi_vault', 'secret=sre/{{ env }}/{{ project }}/kafka/{{ inventory_hostname }}:kafka.keystore.jks.b64') }}"- name: Copy keystore data from Vault  copy:    dest: "/opt/kafka/ssl/{{ inventory_hostname }}/kafka.keystore.jks"    content: "{{ jks_b64 | b64decode }}"

Для удобства управления мы заворачиваемKafkaиZookeeperв сервисы, поскольку не используем контейнеры. Пример шаблона сервисаKafka, которыйAnsibleприносит на виртуальную машину:

[Unit]Description=Kafka DaemonAfter=zookeeper.service[Service]Type=simpleUser={{ kafka_user }}Group={{ kafka_group }}LimitNOFILE={{ kafka_nofiles_limit }}Restart=on-failureEnvironmentFile=/etc/default/kafkaExecStart={{ kafka_bin_path }}/kafka-server-start.sh {{ kafka_config_path }}/server.propertiesExecStop={{ kafka_bin_path }}/kafka-server-stop.shWorkingDirectory={{ kafka_bin_path }}[Install]WantedBy=multi-user.target

Плюсытакогоразделения:

  • разграничение ресурсов.

    Каждая команда знает, сколько ресурсов выделено конкретно под их продукт, и не переживает о том, что проект-сосед может занять большой объем оперативной памяти, тем самым повлиять на производительность их системы. Помимо этого, мы можем предоставлять командам разработки дополнительные инструменты и не думать о том, что какие-то из их действий могут навредить кому-то еще кроме них самих;

  • гибкость управления.

    В случае, если необходимо изменить какие-то параметры и перезапустить Kafka, не требуется согласовывать работы с большим количеством команд.

Пример нагрузки на одном из кластеров, который использует система маркировки:

Видно, что нагрузка на систему держится примерно на одном уровне на протяжении любого периода времени (в примере это неделя).

Из минусов можно выделить то, чтоKafkaможет быть избыточна для команд, которым нужен простой брокер сообщений.

Дополнительные инструменты для работы с Kafka

Поскольку основными пользователями отдельных кластеров являются продуктовые команды, мы должны обеспечить их всем необходимым для того, чтобы можно было следить за состоянием кластера, получать информацию о настройках и содержимом топиков.

Таким набором по умолчанию у нас являются экспортеры для сбора метрик и панели графиковGrafanaдля визуализации этих метрик:

jmx-exporter позволяет отслеживать состояниеJavaVirtualMachine,

kafka-exporter,zookeeper-exporter для того чтобы понимать, как себя чувствуют наши сервисы и получать поверхностную картину,

telegraf дает информацию о состоянии ноды, на которой крутитсяKafka.

Большинству команд этого хватает. Для тех, кому нужно чуть больше информативности, мы предлагаемkafka-minionexporter(https://github.com/cloudworkz/kafka-minion), который позволяет получать больше информации о том, что происходит с топиками, например, сколько групп потребителей подключены к топику и т.п.

Поскольку у команд нет прямого доступа на сервер сKafka, им нужно дать возможность просматривать содержимое и, например, быстро удалять топики, не дергая для этого каждый раз нас. Для решения этих задач мы предлагаем использоватьKafdrop (https://github.com/obsidiandynamics/kafdrop). Для оперативного предоставленияKafdrop, мы используем готовыйCIpipeline, который поднимает в окруженииOpenShiftдва пода:Kafdropиnginx. В результате мы получаемwebUIсbasicаутентификацией, настроенной средствамиnginx.

Помимо этого, точечно по запросам команд мы можем подготовить различные коннекторы, например, коннекторы для баз данных (PostgreSQLConnector,MongoDBKafkaConnector),ksqlDBилиKafkaC22CC23CC24Cдля взаимодействия с кластером черезRESTAPI.C25C

Заключение

Как видно из нашего рассказа,Kafkaотлично зарекомендовала себя в качестве вспомогательного сервиса в цепочке поставки логов, в том числе за счет удобства масштабирования и возможности вернуться к сообщениям, которые уже были прочитаны. Для применения в разработке продуктовKafkaимеет всевозможные коннекторы, которые облегчают интеграцию с другими компонентами продукта. Кроме этого, существует большое количество инструментов, облегчающих жизнь инженеров и разработчиков, работающих с кластером.

Тем не менее, для команд не всегда этот инструмент подходит из-за возможной избыточности функционала и объема необходимых вычислительных ресурсов, что в свою очередь увеличивает затраты на бюджет проекта. Именно поэтому сейчас мы начинаем в пилотном режиме предоставлять командамRabbitMQ, но это уже совсем другаяистория.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru