Русский
Русский
English
Статистика
Реклама

Apache

Одна Kafka хорошо, а несколько лучше

26.01.2021 20:09:19 | Автор: admin

Всем привет! Меня зовут Александр, я инженер команды, отвечающей за развитие централизованныхIT-сервисов, которыми пользуются продуктовые команды вX5RetailGroup.

В этой статье речь пойдёт обApacheKafkaи том, как этот продукт используется для обеспечения потребностей команд разработки. Статья не погружает в технические аспекты, но может быть полезна архитекторам и менеджерам, которые думают о том, чтобы попробовать использоватьKafka, но не знают, подойдёт ли она для их задач, а также разработчикам, которые могут открыть для себя новые инструменты для удобной работы с кластерами.

На момент написания статьи силами нашей команды развёрнуты и поддерживаются 14 продуктивных кластеров (1 централизованный и 13 у продуктовых команд) и 15 непродуктивных.

Централизованный кластер Kafka

Основной сценарий, в рамках которогоKafkaиспользует наша команда доставка логов вElasticsearch.

Немного цифр об этом кластере для начала:

  • брокеры - 5

  • топики 179

  • consumer группы 77

  • средний объем данных[1]в топиках 555.1 ГБ

[1] значение за последние 90 дней

Небольшое лирическое отступление. Многие сталкивались с ситуацией, когда в одно прекрасное утро ты видишь на графиках резкий рост количества логов, но не понимаешь, что именно стало причиной: новые команды не заезжали в сервис, новый функционал не планировался, и команды не предупреждали о том, что ожидается рост (потому что изначально команда закладывает вычислительные мощности под определенный объем данных). В результате расследования выясняется, что разработчики просто включили уровень логированияDebugилиTrace. Также, иногда, встречаются сложные системы, бизнес-логика которых требует сохранять максимально полную информацию, растущая, как снежный ком, с течением времени. Например,X5 использует в работе систему маркировки табачных изделий. В какой-то момент мы обнаружили, что размер одного сообщения с логами достигает порядка 600 кб, потому что вся информация о продукции и ее перемещении дополняется на всем пути до магазина.

Поэтому для нас также было важно обеспечить доступность сервиса и не позволить, чтобы поток логов перегрузил нашу систему до отказа в результате незапланированного роста количества данных.

На этапе проектирования сервиса сбора логов,командапоняла, чтонеобходимо гарантировать запись вElasticsearchвсех без исключения событий. Таким образом обеспечивается целостность данных, которые поступают от внешних систем, и команды всегда могут получить полную картину в том виде, в котором они пришли из внешних источников. Помимо этого, важно было иметь в виду, что количество команд будет расти со временем, а сервисы развиваться. Количество сообщений и информации в них будет увеличиваться, а для нас будет более затруднительно контролировать, какую информацию в логах пишут команды. Это значит, что мы должны сделать отказоустойчивую систему, которую можно сравнительно легко масштабировать в будущем.

Для достижения этих целей нам отлично подошло решениеApacheKafkaпо следующим причинам:

  • репликация и валидация записи.

    Kafkaимеет механизм валидации записи acknowledgements. С помощью параметраacks можно настроить, сколько брокеров (реплик) должны отправить наproducerподтверждение записи. Конечно, использованиеacks, особенно в случае, если мы хотим быть уверены, что данные реплицировались на все брокеры, добавляет небольшую задержку, которая требуется на репликацию. Но для нас важнее быть уверенными, что данные, которые мы хотим передавать дальше, будут записаны вKafka;

  • хранение сообщений в очереди.Если потребитель (в нашем случае этоLogstash, который забирает сообщения изKafka) по какой-то причине не успевает обрабатывать сообщения или просто недоступен, эти данные будут прочитаны и доставлены в конечную систему сразу после стабилизации работы потребителей;

  • хранение сообщений после прочтения.

    Kafkaне удаляет сообщения, а хранит в течение времени, которое описывается в параметрахretention. Это дает возможность восстановить данные в случае, если что-то случится с индексом вElasticsearchи данные станут недоступны;

  • партиционирование.

    За счет увеличения числа партиций топика можно увеличить пропускную способностьKafka, добавив дополнительных потребителей. Это увеличивает количество потоков, которые могут читать данные параллель и полезно в случае, когдаproducerгенерирует большое количество сообщений.

Изначально кластер был развернут натрехмашинах, но после роста числа команд мы масштабировали кластер, добавив ещё две ноды.

#

vCPU

RAM

Storage[2]

Kafka

Kafka Uptime

Zookeeper

ZK Uptime

1

4

16 ГБ

290 ГБ

+

1г1м

+

1г5м

2

4

16ГБ

270 ГБ

+

1г1м

+

1г5м

3

4

16ГБ

290 ГБ

+

1г1м

+

1г5м

4

4

16ГБ

270 ГБ

+

-

-

5

4

16ГБ

270 ГБ

+

1г1м

-

-

[2] учитывается объем, отведенный под данныеKafka

Мы видим, что последние 2 ноды были добавлены в кластер чуть более года назад, как раз в это время и произошел перезапуск сервиса на нодах 1-3, а на 4-й ноде перезапуск происходил позднее, скорее всего, проводились какие-то работы.

Когда внутри одной системы хранятся данные нескольких команд важно обеспечить конфиденциальность данных и разграничить права доступа.

Управление доступами

Чтобы разграничить команды по топикам, мы используемKafkaSecurityManager (https://github.com/simplesteph/kafka-security-manager). Все правила доступа мы описываем в файле сACL. Выглядит это вот так:

User:projectprodwrite@srelogs,Topic,PREFIXED,projectprod,Create,Allow,User:projectprodread@srelogs,Topic,PREFIXED,projectprod,Read,Allow,User:projectprodread@srelogs,Group,PREFIXED,projectprod,All,Allow,

где:

UserCNсертификата, который используется для подключения,

srelogs имя кластера,

Topic/Group объект, которым управляет данная запись,

PREFIXED/LITERAL как будет применяться, относительно именем объекта вKafka(по префиксу или полное совпадение),

project_prod имя объекта и права, которые получает пользователь.

Producer/consumerавторизуются с помощьюSSLсертификатов, которые мы генерируем автоматически и храним вVault.

Интеграция в конвейер поставки логов

Подготовка всех необходимых компонентов выполняется с помощью ролейAnsible. В зависимости от окружения (продуктивное или непродуктивное), и прочих параметров, описанных в инвентаре, создается набор сущностей и конфигурируются нужные сервисы (индексы и тенанты вElasticsearch, пайплайны вLogstash)

После того, как все необходимые компоненты созданы и настроены, топики автоматически создаются, как только первые сообщения начинают отправляться вKafkaблагодаря параметруauto.create.topics.enable=True

Для обеспечения высокой производительности кластера, рекомендуется использоватьKafkaс сообщениями небольшого размера. По этой причине мы настоятельно рекомендуем командам следить за тем, чтобы в логи писалась только полезная информация, а дляElasticsearchпоставили ограничение на размер одного сообщения.

В целом, графики измеряющие поток входящих сообщений показывают нам стабильную и равномерную картину, однако, время от времени возникают неожиданные всплески (фиолетовые и желтые графики), которые создают команды разработки в топикахdevсред, предположительно, во время проведения тестов.

ИспользованиеKafkaв цепочке поставки логов позволяет нам контролировать поток входящих сообщений,Logstash(у каждой команды свой) будет равномерно вычитывать все, что попадает в топикKafka, а мы будем спокойны, что наш конвейер поставки логов не упадет от внезапно возросшей нагрузки. В случае, если нашconsumerстанет недоступен или не будет справляться с нагрузкой, все события так или иначе останутся в топике и будут прочитаны и отправлены вElasticsearchпосле восстановления работоспособностиLogstash.

Кластер для команды

Кроме большого кластера, который мы используем для гарантированной доставки логов, некоторые команды используют кластерыKafkaв своих проектах. При этом задачи, которые они решают с помощью этого продукта, схожи с нашими гарантированная запись данных, возможность обратиться к любым данным в течение времени их жизни в топике, отказоустойчивость и целостность.

У нас есть информационный ресурс, в котором описаны все сервисы, предоставляемые нашей командой, в том числе и параметры по умолчанию, которые устанавливаются для той или иной системы. Аналогичные дефолтные значения прописаны и для кластераKafka. Вот некоторые из них:

auto.create.topics.enable=Truedelete.topic.enable=Truelog.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connection.timeout.ms=6000auto.leader.rebalance.enable=true

Нести в статью все, что описано в роляхAnsibleя не вижу смысла, но приведу некоторые примеры того, что используем мы вX5 .

Для обеспечения надежности кластера рекомендуется использовать количество реплик равное количеству нод и выставить значение минимально синхронизированных реплик на единицу меньше, чтобы в случае вылета одного узла, кластер смог продолжить работу. Для удобства мы используем именно эти значения, если иное не описано в инвентаре для проекта:

- name: 01|Set Kafka replication factor  set_fact:    kafka_cfg_default_replication_factor: "{{  kafka_cfg_default_replication_factor | default(kafka_hosts|length) }}"    kafka_cfg_offsets_topic_replication_factor: "{{ kafka_cfg_offsets_topic_replication_factor | default(kafka_hosts|length) }}"    kafka_cfg_transaction_state_log_replication_factor: "{{ kafka_cfg_transaction_state_log_replication_factor | default(kafka_hosts|length) }}"  run_once: True- name: 01|Set kafka ISR  set_fact:    kafka_cfg_min_insync_replicas: "{{ kafka_cfg_min_insync_replicas | default([kafka_cfg_default_replication_factor|int - 1 , 1] | max) }}"    kafka_cfg_transaction_state_log_min_isr: "{{ kafka_cfg_transaction_state_log_min_isr | default([kafka_cfg_transaction_state_log_replication_factor|int - 1 , 1] | max) }}"  run_once: True

По умолчанию мы выставляем удаление всех событий старше 30 дней, обычно этого хватает командам:

log.retention.hours=720

В случае, если команде требуются иные значения, то изменить какие-то параметры несложно. Нужно просто описать параметры и их значения в инвентаре нужного проекта:

Project.yml---project: name. . .kafka_scala_version: "2.11"kafka_zk_chroot: '/'kafka_enable_protocol: ['PLAINTEXT']kafka_cfg_default_replication_factor: 2kafka_cfg_log_retention_hours: 6kafka_cfg_log_segment_bytes: 52428800

Как и в случае с общим кластером, для обеспечения безопасности, мы используем SSL сертификаты. По умолчанию предоставляем кластер с параметромkafkaenableprotocol: ['SSL'], что гарантирует возможность подключения к кластеру только тех, кто имеет соответствующие клиентские сертификаты.

- name: Lookup for ssl data in Vault  set_fact:    jks_b64: "{{ lookup('hashi_vault', 'secret=sre/{{ env }}/{{ project }}/kafka/{{ inventory_hostname }}:kafka.keystore.jks.b64') }}"- name: Copy keystore data from Vault  copy:    dest: "/opt/kafka/ssl/{{ inventory_hostname }}/kafka.keystore.jks"    content: "{{ jks_b64 | b64decode }}"

Для удобства управления мы заворачиваемKafkaиZookeeperв сервисы, поскольку не используем контейнеры. Пример шаблона сервисаKafka, которыйAnsibleприносит на виртуальную машину:

[Unit]Description=Kafka DaemonAfter=zookeeper.service[Service]Type=simpleUser={{ kafka_user }}Group={{ kafka_group }}LimitNOFILE={{ kafka_nofiles_limit }}Restart=on-failureEnvironmentFile=/etc/default/kafkaExecStart={{ kafka_bin_path }}/kafka-server-start.sh {{ kafka_config_path }}/server.propertiesExecStop={{ kafka_bin_path }}/kafka-server-stop.shWorkingDirectory={{ kafka_bin_path }}[Install]WantedBy=multi-user.target

Плюсытакогоразделения:

  • разграничение ресурсов.

    Каждая команда знает, сколько ресурсов выделено конкретно под их продукт, и не переживает о том, что проект-сосед может занять большой объем оперативной памяти, тем самым повлиять на производительность их системы. Помимо этого, мы можем предоставлять командам разработки дополнительные инструменты и не думать о том, что какие-то из их действий могут навредить кому-то еще кроме них самих;

  • гибкость управления.

    В случае, если необходимо изменить какие-то параметры и перезапустить Kafka, не требуется согласовывать работы с большим количеством команд.

Пример нагрузки на одном из кластеров, который использует система маркировки:

Видно, что нагрузка на систему держится примерно на одном уровне на протяжении любого периода времени (в примере это неделя).

Из минусов можно выделить то, чтоKafkaможет быть избыточна для команд, которым нужен простой брокер сообщений.

Дополнительные инструменты для работы с Kafka

Поскольку основными пользователями отдельных кластеров являются продуктовые команды, мы должны обеспечить их всем необходимым для того, чтобы можно было следить за состоянием кластера, получать информацию о настройках и содержимом топиков.

Таким набором по умолчанию у нас являются экспортеры для сбора метрик и панели графиковGrafanaдля визуализации этих метрик:

jmx-exporter позволяет отслеживать состояниеJavaVirtualMachine,

kafka-exporter,zookeeper-exporter для того чтобы понимать, как себя чувствуют наши сервисы и получать поверхностную картину,

telegraf дает информацию о состоянии ноды, на которой крутитсяKafka.

Большинству команд этого хватает. Для тех, кому нужно чуть больше информативности, мы предлагаемkafka-minionexporter(https://github.com/cloudworkz/kafka-minion), который позволяет получать больше информации о том, что происходит с топиками, например, сколько групп потребителей подключены к топику и т.п.

Поскольку у команд нет прямого доступа на сервер сKafka, им нужно дать возможность просматривать содержимое и, например, быстро удалять топики, не дергая для этого каждый раз нас. Для решения этих задач мы предлагаем использоватьKafdrop (https://github.com/obsidiandynamics/kafdrop). Для оперативного предоставленияKafdrop, мы используем готовыйCIpipeline, который поднимает в окруженииOpenShiftдва пода:Kafdropиnginx. В результате мы получаемwebUIсbasicаутентификацией, настроенной средствамиnginx.

Помимо этого, точечно по запросам команд мы можем подготовить различные коннекторы, например, коннекторы для баз данных (PostgreSQLConnector,MongoDBKafkaConnector),ksqlDBилиKafkaC22CC23CC24Cдля взаимодействия с кластером черезRESTAPI.C25C

Заключение

Как видно из нашего рассказа,Kafkaотлично зарекомендовала себя в качестве вспомогательного сервиса в цепочке поставки логов, в том числе за счет удобства масштабирования и возможности вернуться к сообщениям, которые уже были прочитаны. Для применения в разработке продуктовKafkaимеет всевозможные коннекторы, которые облегчают интеграцию с другими компонентами продукта. Кроме этого, существует большое количество инструментов, облегчающих жизнь инженеров и разработчиков, работающих с кластером.

Тем не менее, для команд не всегда этот инструмент подходит из-за возможной избыточности функционала и объема необходимых вычислительных ресурсов, что в свою очередь увеличивает затраты на бюджет проекта. Именно поэтому сейчас мы начинаем в пилотном режиме предоставлять командамRabbitMQ, но это уже совсем другаяистория.

Подробнее..

Доступны бесплатные уроки видеокурса по Apache Kafka

26.02.2021 06:14:08 | Автор: admin


Мы открыли доступ к базовым темам курса по Apache Kafka, начать учиться можно уже сейчас.


В программе две теоретические темы Введение и Базовые основы технологии и практическая тема Установка Kafka. В ней поработаем с технологией руками:


  1. Развернём Kafka в самом простом её варианте с одним брокером и одной нодой ZooKeeper.
  2. Запишем и прочитаем сообщения, посмотрим в конфиги и увидим, как данные хранятся на диске.

Спикеры курса


  • Анатолий Солдатов, Lead Engineer в Авито;
  • Александр Миронов, Infrastructure Engineer в Stripe, ex-Booking.

Пара отзывов про базовые темы

1.
Спасибо, действительно толково объяснено, вспомнил и узнал некоторые нюансы! Как вариант, посмотрел бы полный курс, без практики и стендов, т.к. я разработчик, а не администратор, для расширения кругозора и понимания нюансов использования тула.


Хорошо бы файл с презентацией заранее показать, а то сидел делал лишние скриншоты со схемами :) + Показать запуск команд на видео, наверное, важно, но здорово, что вы сделали текстовую транскрипцию. Лично мне гораздо проще прочитать ее в такие моменты. + Отличное качество контента, видео/звук/презентации, приятно слушать и смотреть преподавателей. Молодцы!


2.
Очень крутой курс, который дает минимальное понимание, что такое Kafka и как с ней работать. Очень крутые спикеры, которых реально интересно слушать. Информация не скучная и хочется продолжить изучение после каждого этапа/шага. Жаль, что так мало вошло в бесплатную версию. Очень бы хотелось получить больше материала в виде видео, даже без практических заданий и обеспечения тестовых стендов, аналогично тому, как было сделано с курсом Kubernetes База, который выходил на YouTube в открытом доступе. В остальном, курс очень интересный и данную тему хочется изучать глубже. Спасибо за материал и вашу работу!


Доступ к курсу придёт после регистрации: https://slurm.io/kafka


Релиз всех тем видеокурса будет 7 апреля. До релиза действует цена предзаказа 40 000 руб. вместо 50 000 руб.

Подробнее..

Перевод Как Apache Kafka поддерживает 200К партиций в кластере?

02.03.2021 04:06:51 | Автор: admin


В Kafka топик может содержать множество партиций, между которыми распределяются записи. Партиции это единицы параллелизма. В целом, чем больше партиций, тем выше пропускная способность. Однако есть некоторые факторы, которые стоит учитывать, когда в кластере Kafka много партиций. Рад сообщить, что в Apache Kafka, начиная с версии 1.1.0, было существенно увеличено количество партиций, которые может поддерживать один кластер Kafka с точки зрения деплоймента и доступности.


Чтобы понять это улучшение, будет полезно повторить базовую информацию о лидерах партиций и контроллере. Во-первых, каждая партиция может иметь несколько реплик для большей доступности и устойчивости. Одна из реплик назначается лидером, и все запросы клиента обслуживаются этой ведущей репликой. Во-вторых, один из брокеров в кластере выступает в качестве контроллера, который управляет всем кластером. В случае отказа брокера контроллер отвечает за выбор новых лидеров партиций на отказавшем брокере.


Брокер Kafka по умолчанию выполняет контролируемое отключение, чтобы минимизировать сбои в обслуживании клиентов. Контролируемое отключение проходит следующие этапы. (1) Сигнал SIG_TERM отправляется брокеру для завершения работы. (2) Брокер отправляет запрос контроллеру, уведомляя, что он готов к отключению. (3) Контроллер затем меняет лидеров партиций на этом брокере на других брокеров и сохраняет эту информацию в ZooKeeper. (4) Контроллер отправляет информацию о новых лидерах другим брокерам в кластере. (5) Контроллер отправляет выключающемуся брокеру положительный ответ на запрос, и брокер, наконец, завершает свой процесс. Таким образом это никак не влияет на клиентов, потому что их трафик уже перенаправлен на других брокеров. Этот процесс изображен на Рисунке 1. Заметьте, что шаги (4) и (5) могут происходить параллельно.



Рис. 1. (1) Инициация отключения на брокере 1; (2) брокер 1 отправляет запрос о контролируемом отключении контроллеру на брокере 0; (3) контроллер записывает новых лидеров в ZooKeeper; (4) контроллер отправляет информацию о новых лидерах брокеру 2; (5) контроллер отправляет положительный ответ брокеру 1.


До версии Kafka 1.1.0 во время контролируемого отключения контроллер перемещает лидеров по одной партиции за раз. Для каждой партиции контроллер выбирает нового лидера, параллельно записывает его в ZooKeeper и отправляет нового лидера другим брокерам через удаленный запрос. В этом процессе есть несколько недостатков. Во-первых, у синхронной записи в ZooKeeper более высокое время задержки, что замедляет процесс контролируемого отключения. Во-вторых, отправка нового лидера по партиции за раз добавляет много маленьких удаленных запросов к каждому брокеру, что может вызвать задержку обработки новых лидеров.


Начиная с Kafka 1.1.0 и далее контролируемое отключение происходит значительно быстрее. Во-первых, при записи в ZooKeeper используется асинхронный API. Вместо того чтобы записывать лидера для одной партиции, ждать, пока этот процесс завершится, а потом записывать следующего, контроллер асинхронно размещает лидеров для многих партиций в ZooKeeper, а потом ждет завершения процесса. Это позволяет обрабатывать запросы на пайплайне между брокером Kafka и сервером ZooKeeper и снижает общее время задержки. Во-вторых, информация о новых лидерах отправляется батчами. Вместо одного удаленного запроса для каждой партиции контроллер отправляет один удаленный запрос с лидером от всех затронутых партиций.


Время обработки отказа контроллером также было значительно улучшено. Если случается отказ контроллера, кластер Kafka автоматически выбирает контроллером другого брокера. Прежде чем иметь возможность выбрать лидеров партиций свеженазначенный контроллер должен загрузить состояние всех партиций в кластере из ZooKeeper. Если произошел серьезный сбой контроллера, окно недоступности партиции может длиться столько, сколько продолжается срок действия сессии ZooKeeper, плюс время перезагрузки состояния контроллера. Поэтому сокращение времени перезагрузки состояния улучшает доступность в таком редком случаем. До версии 1.1.0 в Kafka для перезагрузки используется синхронный API ZooKeeper. Начиная с версии 1.1.0 это также заменено на асинхронный API для сокращения времени задержки.


Мы провели тесты для оценки улучшений по времени контролируемого отключения и перезагрузки контроллера. Для обоих тестов мы загрузили набор из пяти узлов ZooKeeper на выделенных серверах.


Для первого теста мы подготовили кластер Kafka с пятью брокерами на отдельных серверах. В этом кластере мы создали 25 000 топиков, в каждом топике по одной партиции и две реплики, в общем получив 50 000 партиций. Таким образом на каждом брокере было 10 000 партиций. Затем мы замерили время, которое понадобилось для контролируемого отключения. Результаты представлены в таблице ниже.


Версия Kafka 1.0.0 Kafka 1.1.0
Время контролируемого отключения 6,5 минут 3 секунды

Большую часть улучшений дает исправление затрат на журналирование, при котором проводятся ненужные записи для каждой партиции в кластере при смене лидера одной партиции. Просто исправив затраты на журналирование, мы снизили время контролируемого отключения с 6,5 минут до 30 секунд. Переход на асинхронный API ZooKeeper снизил это время до 3 секунд. Эти улучшения существенно снизили время, необходимое для перезагрузки кластера Kafka.


Для второго теста мы подготовили другой кластер Kafka, состоящий из пяти брокеров, создали 2 000 топиков, в каждом по 50 партиций и одной реплике. В сумме во всем кластере получилось 100 000 партиций. Затем мы замерили время перезагрузки состояния контроллера и увидели 100% улучшение (время перезагрузки снизилось с 28 секунд в Kafka 1.0.0 до 14 секунда в Kafka 1.1.0).


Учитывая эти изменения, на поддержку какого количества партиций вы можете рассчитывать в Kafka? Точное число зависит от таких факторов как допустимое окно недоступности, время задержки ZooKeeper, тип хранения на брокере и т.д. В качестве общего правила мы рекомендуем иметь на каждом брокере до 4 000 партиций и до 200 000 партиций в кластере. Основная причина для лимита на кластере заключается в том, что нужно подстраховаться на тот редкий случай серьезного сбоя контроллера, о котором мы писали выше. Обратите внимание, что другие соображения, связанные с партициями, также применимы, и вам может потребоваться дополнительная настройка конфигурации с большим количеством партиций.


Более подробную информацию вы найдете в KAFKA-5642 и KAFKA-5027.


От редакции: Команда Слёрма готовит подробный видеокурс по Apache Kafka. Спикеры Анатолий Солдатов из Авито и Александр Миронов из Stripe. Вводные уроки уже доступны в личном кабинете. Релиз полной версии курса апрель 2021. Подробности.

Подробнее..

Интеграционный слой с Kafka и микросервисами опыт построения операционной CRM контакт-центра торговой сети Пятерочка

04.03.2021 10:11:49 | Автор: admin

Из этого поста вы узнаете, зачем добавлять в интеграционный слой бизнес-логику, что случается, когда не летит Service mesh, и почему иногда костыли лучшее решение проблемы.



Привет Хабр, на связи Иван Большаков архитектор интеграционных решений, эксперт департамента разработки ПО КРОК. Я расскажу, как мы делали интеграционный слой для CRM-системы группы контакт-центров торговой сети Пятерочка.


Всего в системе одновременно находятся десятки тысяч пассивных пользователей с открытыми интерфейсами и сотни активных, которые пишут в чаты, принимают звонки и нажимают на кнопки. Операторы одновременно работают с десятком различных систем


На старте разработки контакт-центры Пятерочки располагались на 7 разных площадках. И все они использовали разные решения для учета звонков: платформы дискретных каналов, обработки обращений и т. д. Решения по обращениям велись в отдельной системе, а уведомления по обращениям делались в ручном режиме.


Между этими системами не было интеграций, так что операторы переключались между окнами, копировали номера телефонов, искали профили клиентов и выполняли массу других рутинных действий.


Все это замедляло обработку обращений и увеличивало время реакции в каждом конкретном случае. Операторы просто не могли решить проблему клиента здесь и сейчас. Вместо этого они тратили силы и время на то, что стоило бы автоматизировать.




Сотрудники контакт-центров справлялись с работой и могли бы продолжать в том же духе, но заказчик подсчитал, во сколько обходится эта рутина, и уже не мог спать спокойно.


Возможные решения



Первое решение, которое мы рассматривали, перенести логику в веб-приложение оператора контакт-центра. То есть, сделать UI, который будет вызывать методы на бэкендах тех или иных систем.


Логичный выход, который решит проблему заказчика, но не дает дополнительных преимуществ.



Другой вариант enterprise service bus. Это классический сценарий интеграции: разработать пользовательский интерфейс, подключить его к опубликованным на интеграционной шине методам и наслаждаться жизнью.


Но ESB в 2020 году уже не самое популярное решение. К тому же, заказчику хотелось реализовать дополнительную функциональность, например, подсистему отчетности, а она требует разработки бэкенда.



Мы решили использовать для интеграционного слоя микросервисы. Они позволяют реализовать дополнительную бизнес-логику и обеспечивают масштабируемость. Их проще распределить по нескольким ЦОД, по сравнению с ESB.


Кроме того, интеграционный слой на базе микросервисов можно передавать постепенно, с развитием на стороне заказчика.


Архитектура интеграционного слоя


Опыт разработки микросервисов подсказывал нам, что не стоит требовать радикальной доработки информационных систем на стороне заказчика.


Нет гарантий того, что к концу проекта состав и интеграционные контракты систем, с которыми мы работаем, останутся прежними. У Пятерочки несколько контакт-центров от разных подрядчиков, и они могут сменить их в любой момент. Так что, интеграционный слой должен минимально влиять на окружающие системы.


Мы использовали стандартный прием спроектировали интерфейсы, написали адаптеры для каждой системы и связали их с нашим ядром через законтрактованный API.


Так как в интеграционный слой попала бизнес-логика, выбор пал на событийно-ориентированную архитектуру. В ней проще следить за согласованностью данных.


Проектируя архитектуру интеграционного слоя, мы старались не переусложнять, ведь микросервисность не самоцель проекта.


Мы не разделяли по сервисам компоненты, которые будут дорабатываться или масштабироваться вместе. Если можно было разрабатывать в составе одного сервиса несколько бизнес-сущностей, мы это делали. И все равно, в итоге получилось больше 30 микросервисов.


Вся сложность во взаимодействиях


С внешними системами было просто. Мы не могли ничего менять, так что использовали те API, которые они предоставляли.



Внутри системы мы выделили два типа взаимодействий: модифицирующие и читающие. Модифицирующие воздействия реализовали при помощи обмена сообщениями через топики в Kafka. Каждому сервису присвоили топик, который начинается с его имени. Сервис отправляет туда сообщения обо всех событиях, произошедших с его объектами:


с бизнес-сущностью, которую обрабатывает сервис user, что-то произошло, вот ее новые данные

Этот топик читают другие сервисы. Они реагируют на изменения и отписываются в свои топики. Так действие пользователя по каскаду распространяется по всей системе.


Это удобно, ведь при необходимости можно перечитать данные из Kafka, и, если что-то пойдет не так, откатить транзакции сразу в нескольких сервисах.


Как видите, вместо оркестрации мы остановились на хореографии. В нашей системе нет центрального сервиса, в котором закожены бизнес-процессы. Они складываются из множества мелких взаимодействий. Кусочек бизнес-логики заложен в каждом сервисе.



Читающие взаимодействия идут по gRPC. Мы выбрали этот протокол не из-за фич типа стриминга (их пока не используем), а потому, что с его помощью удобно управлять контрактами.


Там, где согласованность не нужна, по gRPC идут и модифицирующие взаимодействия. Так мы снизили сложность системы и сэкономили время разработчиков.


Kafka и очереди сообщений


Мы использовали Kafka в качестве брокера сообщений, потому что купились на отказоустойчивость. Между ЦОДами, где расположены системы заказчика, небольшие задержки, так что она работает неплохо. Однако, где-то в середине проекта мы поняли, что просчитались.



Потребовалась приоритизация сообщений. Нужно было сделать так, чтобы сообщения из одного топика читались, только когда все вычитано из другого. Вообще, приоритизация стандартная вещь, которая есть хоть в каком-то виде в любом брокере, но Kafka не брокер.


Для последовательного чтения пришлось прикрутить некрасивый костыль, который проверяет отставание офсета в первом топике и разрешает чтение из другого. Если бы мы использовали RabbitMQ, было бы проще, но, возможно, возникли бы вопросы относительно производительности.


Взаимодействие с фронтом


Здесь обошлось без rocket science. Взаимодействие с фронтом мы выстроили при помощи WebSocket и REST. WebSocket нужен для передачи событий от бека к фронту и высоконагруженных управляющих взаимодействий, а REST для запроса данных. Правда, пришлось написать отдельный микросервис для фронтенд-разработчиков.



Ходить за несколькими сущностями в три разных сервиса, а потом собирать на фронте какую-то адекватную модель для представления не очень удобно. Куда удобнее использовать composer, который будет брать сразу несколько сущностей со связями между ними и компоновать в один JSON. Для этого отлично подошел Apache Camel. Правда, в следующий раз мы не станем заморачиваться с REST и используем graphQL.


Service mesh и безопасность


Service mesh нужен, чтобы не прописывать вручную всю логику межсервисных взаимодействий. Как правило, для соединения микросервисов мы используем Istio и в этом проекте тоже рассчитывали на эту платформу. Но в этом году все идет как-то не так, как ожидалось.



Мы столкнулись с такой версией OKD, на которую никак не получалось установить Istio. Заказчик собирался в будущем обновить систему оркестрации, но нам нужно было решение здесь и сейчас.


Дедлайн приближался, а инженеры не знали, когда мы справимся с проблемой. Пришлось минимизировать риски и сделать какое-то подобие service mesh, чтобы точно запуститься вовремя.


Конечно, полноценный service mesh на коленке не напишешь, но можно было обойтись без управления трафиком, а мониторинг настроить вручную. Главное, что нам нужно было от service mesh, это безопасность.



Было необходимо идентифицировать запросы от фронта к бекенду, проверять токены пользователей в каждом микросервисе и делать запросы GET, POST и так далее, в зависимости от роли в токене.


Итак, мы начали пилить собственный service mesh. В его основу легла простая идея: если нельзя (не хочется) поместить логику в код сервиса, сделаем внешний sidecar-контейнер по аналогии с Istio, использовав OpenResty и язык Lua.


Написали в контейнере код, который проверяет подпись токена. В качестве параметров к нему подкладываются правила, описывающие, какие запросы пропускать от пользователей с различным ролями.


Аудит


Теперь, когда у нас были sidecar, на них можно было повесить отправку данных в Kafka и реализовать бизнесовый аудит. Так заказчик будет знать, кто из пользователей системы изменял те или иные важные данные.



Существуют штатные библиотеки для работы с Kafka, но и с ними возникли проблемы. Все ломалось, как только для подключения к Kafka требовался SSL. Спас положение старый добрый kafka-console-producer, который лежит в каждом sidecar и вызывается с параметрами из nginx.


Получился рабочий почти-service mesh. Казалось бы, можно было расслабиться, но мы чуть не упустили еще одну важную вещь.


Балансировка gRPC


gRPC то работает на HTTP/2, а этот протокол устанавливает соединение и держит, пока оно не порвется. Возможна ситуация, когда инстанс сервиса A намертво прицепится к одному из инстансов сервиса B и будет направлять все запросы только на него. Тогда от масштабирования сервиса B не будет толка. Второй инстанс сервиса B останется не нагружен и будет впустую тратить процессорное время и память.



Мы могли сделать полноценную балансировку на клиентской стороне, но было бы неправильно тратить на это время. После внедрения полноценного service mesh эти наработки потеряют актуальность.


Поэтому мы реализовали перехват и подсчет всех вызовов gRPC на уровне кода на стороне клиента. Когда счетчики подходят к определенному значению, вызывается метод, приводящий к переустановке соединения, и балансировщик OKD задействует незагруженный инстанс сервиса B.


Конечно, не стоит повторять это идеальных условиях, но если возникли неожиданные препятствия, и нужно срочно запуститься, это решение имеет право на жизнь. Такая балансировка работает. Графики нагрузки по сервисам ровные.


Управление версиями


Напоследок, несколько слов о том, как мы управляли версиями кода.


В проекте задействованы Spring, Maven и Java. Каждый микросервис хранится в собственном микрорепозитории. В первую очередь потому что мы передавали систему заказчику по частям, сервис за сервисом.


Поэтому единой версии коммита для всей системы нет. Но, когда шла активная разработка, нам нужен был механизм контроля версий. Без него систему не получится целостно протестировать, выкатить на стейдж, а затем и на прод.



Прежде всего, мы стали собирать сервисы в образы, которые тегируются коротким хешем коммита. А внутри Maven проекта нет версионирования кода, там вместо версии мы использовали заглушку.


Затем мы выделили отдельный репозиторий со списком всех сервисов и их версиями. Коммит в этот репозиторий и назывался релизом. То есть, мы формировали манифест совместимых между собой версий микросервисов. Конечно, не руками, а через самодельные веб-интерфейсы.


Еще один момент, который стоит упомянуть, управление версиями интеграционных контрактов между сервисами.


Если сервис A и сервис B общаются по gRPC, и разработчик сервиса B исправит что-то в протофайле и сломает обратную совместимость, это выстрелит только, когда выйдет в рантайм. Слишком поздно. Поэтому мы храним все интеграционные контракты (протофайлы, классы для сериализации Kafka и т. д.) отдельно и подключаем как саб-модули. Благодаря этому решению, многие возможные проблемы проявляются на этапе компиляции.


Что мы вынесли из проекта


В этом проекте мы вновь убедились в том, как важно заранее обследовать инфраструктуру. Но даже после тщательной проверки нельзя расслабляться. Некоторые проблемы, типа несовместимости конкретной версии OKD с Istio, практически невозможно предвидеть.


В конечном счете мы рады, что не переусердствовали в разделении на бизнес-сущности и не разобрали систему на винтики. Можно было разбить каждый микросервис еще на три, но это ничего не дало бы заказчику. Архитектура операционной CRM контакт-центра и так смотрится достаточно красиво, и мы потратили на разработку в полтора раза меньше времени, чем могли бы.


Если подвести итог, то интеграция без ESB имеет право на жизнь, особенно, когда нужно решить задачу в сжатые сроки.


Если у вас появились вопросы, или вы знаете, как красиво сделать приоритизацию сообщений в Kafka, пишите в комментариях или на почту IBolshakov@croc.ru.

Подробнее..

Перевод Pulsar vs Kafka сравнение и мифы

26.03.2021 08:19:22 | Автор: admin


Pulsar или Kafka что лучше? Здесь мы обсудим плюсы и минусы, распространенные мифы и нетехнические критерии, чтобы найти лучший инструмент для ваших задач.


Обычно я рассказываю об Apache Kafka и ее экосистеме. О Pulsar за последние годы меня спрашивали только коммитеры и авторы Pulsar. Они задавали сложные технические вопросы, чтобы показать, что Kafka не идет ни в какое сравнение с Pulsar. На Reddit и подобных платформах разгораются яростные и очень субъективные споры на эту тему. Я поделюсь своей точкой зрения, основанной на многолетнем опыте работы со стриминговыми опенсорс-платформами.


Сравнение технологий нынче в моде: Kafka vs. Middleware, Event Streaming и API Platforms


Цель сравнения технологий помочь людям подобрать подходящее решение и архитектуру для своих потребностей. Универсального решения не существует. И это не дело вкуса. Просто для каждой задачи есть свой инструмент.


При этом такие сравнения почти всегда субъективны. Даже если автор не работает на вендора и представляется независимым консультантом, скорее всего, у него все равно есть любимчики, даже если он сам себе в этом не признается. Однако полезно посмотреть на инструменты с разных точек зрения. Поскольку Apache Pulsar сейчас обсуждают, я решил поделиться своим мнением и сравнить его с Kafka. Я работаю в Confluent, а мы лучшие эксперты по Apache Kafka и ее экосистеме. И все же я постараюсь быть максимально объективным и оперировать голыми фактами.


Опенсорс-фреймворки и коммерческие программы постоянно сравнивают. Я и сам делал такие сравнения в своем блоге и на других платформах, например InfoQ: Сравнение платформ интеграции, Выбор подходящего ESB для ваших потребностей, Kafka vs. ETL / ESB / MQ, Kafka vs. Mainframe и Apache Kafka и API Management / API Gateway. Просто заказчики хотели понять, какой инструмент поможет решить те или иные задачи.


Если говорить о сравнении Pulsar и Kafka, ситуация немного отличается.


Зачем сравнивать Pulsar и Kafka?


Реальные и потенциальные заказчики редко спрашивают о Pulsar. Хотя, если честно, в последние месяцы чуть чаще где-то на каждой 15-й или 20-й встрече. Потому что фичи и варианты использования у них частично совпадают. Хотя, по-моему, все дело в паре статей о том, что Pulsar якобы в чем-то лучше Kafka. В пользу противоположной точки зрения нет никаких фактов и очень мало материала.


Я не видел ни одной организации, которая бы всерьез задумывалась о Pulsar в продакшене, хотя по всему миру множество пользователей, которым нужна технология распределенной обработки сообщений, вроде Kafka или Pulsar. По-моему, те, кто рекомендует Pulsar, чего-то не договаривают.


Например, их главный пользователь Tencent, крупная китайская технологическая компания, которая при этом очень активно использует Kafka везде, кроме одного проекта, где пригодился Pulsar. Tencent обрабатывает с Kafka десять триллионов сообщений в день (только представьте: 10 000 000 000 000). Если посчитать, Tencent использует Kafka в тысячу раз активнее, чем Pulsar (10 трлн против десятков млрд). Tencent с удовольствием рассказывают о своем деплое Kafka: Как Tencent PCG использует Apache Kafka для обработки 10+ трлн сообщений в день.


Сравнение двух конкурирующих опенсорс-платформ


Apache Kafka и Apache Pulsar две замечательные конкурирующие технологии. Логично будет их сравнить.


У Apache Kafka и Apache Pulsar очень схожий набор функций. Советую оценить обе платформы на предмет доступных функций, зрелости, распространения на рынке, опенсорс-инструментов и проектов, обучающего материала, проведения митапов по регионам, видео, статей и так далее. Примеры вариантов использования в вашей отрасли помогут принять правильное решение.


Confluent уже писал сравнение Kafka vs. Pulsar vs. RabbitMQ: производительность, архитектура и функции. Я тоже поучаствовал. Значит, сравнение уже есть


О чем тогда эта статья?


Я хочу рассмотреть несколько мифов из споров на тему Kafka против Pulsar, которые регулярно возникают в блогах и на форумах. Затем я сравню их не только по техническим аспектам, потому что обычно никто не говорит о Pulsar с этой стороны.



Kafka vs Pulsar мифы


Мне попадалось несколько мифов. С некоторыми я согласен, другие можно легко развенчать фактами. Ваше мнение может отличаться от моего, это нормально. Я излагаю исключительно свою точку зрения.


Миф 1. У Pulsar есть характеристики, которых нет у Kafka.


Правда.


Если сравнивать Apache Kafka и Apache Pulsar, можно найти различия в многоуровневой архитектуре, очередях и мультитенантности.


Но!


У Kafka тоже есть уникальные особенности:


  • В два раза меньше серверов, которыми приходится управлять.
  • Данные сохраняются на диск только один раз.
  • Данные кэшируются в памяти только однажды.
  • Проверенный протокол репликации.
  • Производительность zero-copy.
  • Транзакции.
  • Встроенная обработка потока.
  • Долгосрочное хранилище.
  • В разработке: удаление ZooKeeper (KIP-500), чтобы использовать и деплоить Kafka было еще проще, чем Pulsar с его четырехкомпонентной архитектурой (Pulsar, ZooKeeper, BookKeeper и RocksDB), а еще чтобы повысить масштабируемость, устойчивость и т. д.
  • В разработке: многоуровневое хранилище (KIP-405), чтобы повысить эластичность и экономичность Kafka.

Спросите себя: можно ли сравнивать опенсорс-платформы или продукты и вендоров с комплексным предложением?


Легко добавлять новые функции, если для них не нужно предоставлять критическую поддержку. Не оценивайте характеристики по списку. Узнайте, насколько они проверены в рабочих сценариях. Сколько из этих уникальных особенностей имеют низкое качество и реализованы наспех?


Например. Понадобилось несколько лет, чтобы реализовать и испытать в бою Kafka Streams в качестве нативного движка обработки потоков. Как это можно сравнивать с Pulsar Functions? Последнее позволяет добавлять определяемые пользователем функции (UDF) безо всякой связи с обработкой реальных потоков. Или это скорее похоже на Single Message Transformations (SMT), основную функцию Kafka Connect? Не сравнивайте круглое с квадратным и не забывайте учитывать зрелость. Чем критичнее функция, тем больше зрелости ей требуется.


Сообщество Kafka вкладывает невероятные усилия в работу над основным проектом и его экосистемой. Только в Confluent 200 инженеров занимаются исключительно проектом Kafka, дополнительными компонентами, коммерческими продуктами и предложениями SaaS в основных облачных провайдерах.


Миф 2. У Pulsar есть несколько крупных пользователей, например китайский Tencent.


Правда.


Но! Tencent использует Kafka активнее, чем Pulsar. Отдел выставления счетов, где используется Pulsar, это лишь малая часть Tencent, в то время как остальные используют Kafka. У них есть глобальная архитектура на основе Kafka, где больше тысячи брокеров сгруппированы в единый логический кластер.


Будьте осторожны с опенсорс-проектами. Проверяйте, какого успеха добились с ними обычные компании. Если эту технологию использует один технологический гигант, это еще не значит, что вам она тоже подойдет. Сколько компаний из Fortune 2000 могут похвастаться историями успеха с Pulsar?


Ищите примеры не только среди гигантов!


Примеры обычных компаний позволят лучше понять, как применяется тот или иной инструмент в реальном мире. Если это не истории успеха от самих вендоров. На сайте Kafka можно найти много примеров компаний. Более того, на конференциях Kafka Summit в Сан-Франциско, Нью-Йорке и Лондоне каждый год разные предприятия из разных отраслей делятся своими историями и кейсами. Это компании из Fortune 2000, предприятия среднего бизнеса и стартапы.


Приведу один пример про Kafka. Для репликации данных в реальном времени между отдельными кластерами Kafka существует много разных инструментов, включая MirrorMaker 1 (часть проекта Apache Kafka), MirrorMaker 2 (часть проекта Apache Kafka), Confluent Replicator (от Confluent, доступен только в рамках Confluent Platform и Confluent Cloud), uReplicator (опенсорс от Uber), Mirus (опенсорс от Salesforce), Brooklin (опенсорс от LinkedIn).


На практике разумно использовать только два варианта, если, конечно, вы не хотите обслуживать и улучшать код самостоятельно. Это MirrorMaker 2 (еще не вполне зрелая новинка, но отличный выбор для средне- и долгосрочной перспективы) и Confluent Replicator (проверенный на практике во многих критически важных для бизнеса деплоях, но платный). Остальные варианты тоже нормальные. Но кто обслуживает эти проекты? Кто разбирается с багами и проблемами безопасности? Кому звонить, если в продакшене что-то случилось? Одно дело деплоить критически важные системы в продакшене, другое оценивать и пробовать опенсорс-проект.


Миф 3. Pulsar предлагает очереди и стриминг в одном решении.


Частично.


Очереди используются для обмена сообщениями между двумя точками. Они предоставляют асинхронный протокол взаимодействия, то есть отправитель и получатель сообщения не должны обращаться к очереди одновременно.


Pulsar предлагает только ограниченную поддержку очередей сообщений и стриминга событий. В обеих областях он пока не может составить крепкую конкуренцию по двум причинам:


  1. Pulsar предлагает ограниченную поддержку очередей сообщений, потому что ему не хватает таких популярных функций, как XA-транзакции, маршрутизация, фильтрация сообщений и т. д. Это обычные функции в таких системах сообщений, как IBM MQ, RabbitMQ, и ActiveMQ. Адаптеры Pulsar для систем сообщений тоже ограничены в этом смысле. В теории все выглядит нормально, но на практике...


  2. Pulsar предлагает ограниченную поддержку стриминга. Например, на практике в большинстве случаев он не поддерживает семантику строго однократной (exactly-once) доставки и обработки. Вряд ли кто-то станет использовать Pulsar для платежной системы, потому что платежи могут дублироваться и теряться. Ему не хватает функционала для обработки потоков с соединением, агрегированием, окнами, отказоустойчивым управлением состоянием и обработкой на основе времени событий. Топики в Pulsar отличаются от топиков в Kafka в худшую сторону из-за BookKeeper, который был придуман в 2008 году как лог упреждающей записи для HDFS namenode в Hadoop в расчете на краткосрочное хранение.



Примечание. Адаптер Kafka для Pulsar тоже ограничен. В теории звучит заманчиво, но в реальности не все получается, потому что он поддерживает только небольшую часть функционала Kafka.


Как и Pulsar, Kafka предлагает ограниченную поддержку очереди.


В Kafka можно использовать разные обходные пути, чтобы реализовать нормальное поведение очереди. Если вы хотите использовать отдельные очереди вместо общих топиков Kafka для:


  • Безопасности => используйте Kafka ACL (и дополнительные инструменты, вроде контроля доступа на основе ролей (RBAC) от Confluent).
  • Семантики (отдельных приложений) => используйте группы консюмеров в Kafka.
  • Балансировки нагрузки => используйте партиции Kafka.

Обычно я спрашиваю заказчиков, зачем им нужны очереди. Kafka предлагает готовые решения для разных сценариев, на которые можно просто посмотреть с другой стороны. Редко встречаются случаи, когда действительно требуется высокая пропускная способность с очередями.


Зная обходные пути и ограничения Pulsar и Kafka в плане сообщений, давайте проясним: ни одна из платформ не предлагает решение для обмена сообщениями.


Если вам нужно именно оно, возьмите что-то вроде RabbitMQ или NATS.


Однозначного решения здесь нет. Многие наши заказчики заменяют имеющиеся системы сообщений, вроде IBM MQ, на Kafka (из соображений масштабируемости и затрат). Просто нужно знать имеющиеся варианты и их недостатки, все взвесить и подобрать решение, которое лучше всего подходит именно вам.


Миф 4. Pulsar предоставляет потоковую обработку.


Неправда.


Или, если по-честному, все зависит от того, что вы называете обработкой потоков. Это какая-то простейшая функция или прямо полноценная обработка?


Если в двух словах, обработкой потоков я называю непрерывное потребление, обработку и агрегирование событий из разных источников данных. В реальном времени. В любом масштабе. Естественно, с защитой от сбоев, особенно если речь идет о stateful.



Pulsar предлагает минимальную потоковую обработку через Pulsar Functions. Она подходит для простых обратных вызовов, но не сравнится с функционалом Kafka Streams или ksqlDB для создания стриминговых приложений со stateful-информацией, скользящими окнами и другими функциями. Варианты применения можно найти в разных отраслях. Например, на сайте Kafka Streams есть кейсы New York Times, Pinterest, Trivago, Zalando и других.


Для стриминга аналитики Pulsar обычно используется в сочетании с нормальной платформой обработки потоков, вроде Apache Spark или Apache Flink, но вам придется управлять дополнительным элементами распределенной архитектуры и разбираться в их сложном взаимодействии.


Миф 5. Pulsar предоставляет семантику exactly-once, как и Kafka.


Неправда.


В Pulsar есть функция дедупликации, чтобы одно сообщение не сохранялось в брокере Pulsar дважды, но ничто не мешает консюмеру прочитать это сообщение несколько раз. Этого недостаточно для любого применения обработки потоков, где для входа и выхода используется Pulsar.


В отличие от функции транзакций в Kafka, здесь нельзя привязывать отправленные сообщения к состоянию, записанному в обработчике потоков.


Семантика Exactly-Once Semantics (EOS) доступна с версии Kafka 0.11, которая вышла три года назад, и используется во многих продакшен-деплоях. Kafka EOS поддерживает всю экосистему Kafka, включая Kafka Connect, Kafka Streams, ksqlDB и такие клиенты, как Java, C, C++, Go и Python. На конференции Kafka Summit было несколько выступлений, посвященных функционалу Kafka EOS, включая это превосходное и понятное введение со слайдами и видео.


Миф 6. У Pulsar производительность выше, чем у Kafka.


Неправда.


Я не поклонник разных бенчмарков производительности и пропускной способности. Обычно они субъективны и относятся к конкретной задаче независимо от того, кто их проводит вендор, независимый консультант или исследователь.


Например, GIGAOM опубликовали один бенчмарк, где сравнивается задержка и производительность в Kafka и Pulsar. Автор намеренно замедлил Kafka, настроив параметр flush.messages = 1, в результате которого каждый запрос вызывает fsync и Kafka синхронизируется с диском при каждом сообщении. В этом же бенчмарке консюмер Kafka отправляет подтверждение синхронно, а консюмер Pulsar асинхронно. Неудивительно, что Pulsar вышел явным победителем. Правда, автор забыл упомянуть или объяснить существенные отличия в конфигурации и измерениях. Давайте не будем путать теплое с мягким.


На самом деле архитектура Pulsar сильнее нагружает сеть (из-за брокера, который выступает как прокси перед серверами bookie BookKeeper) и требует в два раза больше пропускной способности (потому что BookKeeper записывает данные в лог упреждающей записи и в главный сегмент).


Confluent тоже делал бенчмарки. Только там сравнивалось сопоставимое. Неудивительно, что результаты получились другими. Но нужно ли вообще встревать в эти бои на бенчмарках между вендорами?


Оцените свои требования к производительности. Сделайте proof-of-concept с Kafka и Pulsar, если надо. Скорее всего, в 99% случаев обе платформы покажут приемлемую производительность для вашего сценария. Не доверяйте посторонним субъективным бенчмаркам! Ваш случай все равно уникален, и производительность это только один из аспектов.


Миф 7. Pulsar проще в использовании, чем Kafka.


Неправда.


Без дополнительных инструментов вам будет сложно и с Kafka, и с Pulsar.


У Kafka две распределенных системы: сама Kafka и Apache ZooKeeper.


Но! У Pulsar три распределенных системы, а еще хранилище: Pulsar, ZooKeeper и Apache BookKeeper. Как и Pulsar, BookKeeper использует ZooKeeper. Для некоторых задач хранения используется RocksDB. Это означает, что Pulsar гораздо сложнее понять и настроить по сравнению с Kafka. Кроме того, у Pulsar больше параметров конфигурации, чем у Kafka.


Kafka движется в противоположном направлении скоро ZooKeeper будет удален (см. KIP-500), так что останется всего одна распределенная система, которую нужно деплоить, обслуживать, масштабировать и мониторить:



ZooKeeper мешает масштабированию в Kafka и усложняет эксплуатацию. Но у Pulsar все еще хуже!


Одна из главных проблем моих заказчиков использование ZooKeeper в критически важных деплоях в большом масштабе. С нетерпением жду упрощения архитектуры Kafka, чтобы остались только брокеры. Кроме того, мы получим единую модель безопасности, потому что больше не придется отдельно настраивать безопасность для ZooKeeper. Это огромное преимущество, особенно для крупных организаций и отраслей со строгим регулированием. Ребята из комплаенса и ИБ скажут вам спасибо за упрощенную архитектуру.


Использование платформы связано НЕ только с архитектурой


У Kafka гораздо более подробная документация, огромное сообщество экспертов и большой набор поддерживающих инструментов, которые упрощают работу.


Кроме того, по Kafka есть много вариантов обучения онлайн-курсы, книги, митапы и конференции. К сожалению, у Pulsar с этим все плохо.


Миф 8. Архитектура с тремя уровнями лучше, чем с двумя.


Зависит от ситуации.


Лично я скептически отношусь к тому, что трехуровневую архитектуру Pulsar (брокеры Pulsar, ZooKeeper и BookKeeper) можно считать преимуществом для большинства проектов. Тут есть издержки.


Twitter рассказал, как отказался от BookKeeper + DistributedLog (DistributedLog похож на Pulsar по архитектуре и дизайну) около года назад, соблазнившись такими преимуществами одноуровневой архитектуры Kafka, как экономичность и улучшенная производительность, которых недоставало двухуровневой архитектуре с отдельным хранилищем.


Как и Pulsar, DistributedLog построен на BookKeeper и добавляет стриминговый функционал, схожий с Pulsar (например, уровень обработки существует отдельно от уровня хранилища). Изначально DistributedLog был отдельным проектом, но потом присоединился к BookKeeper, хотя сегодня им, похоже, мало кто занимается (всего пара коммитов за последний год). Среди основных причин перехода Twitter на Kafka называлась значительная экономия и повышение производительности, а также обширное сообщество и популярность Kafka. Вот какие выводы они сделали: Для одного консюмера экономия ресурсов составила 68%, а для нескольких целых 75%.


Преимущества трехуровневой архитектуры проявляются при создании масштабируемой инфраструктуры. При этом дополнительный уровень увеличивает нагрузку на сеть минимум на 33%, а данные, которые хранятся в брокерах Pulsar, приходится дополнительно кэшировать на обоих уровнях, чтобы добиться аналогичной производительности, а еще дважды записывать на диск, потому что формат хранения в Bookkeeper основан не на логе.


В облаке, где работает большинство деплоев Kafka, лучшим внешним хранилищем выступает не такая узкоспециализированная технология, как BookKeeper, а популярное и проверенное хранилище объектов, вроде AWS S3 и GCP GCS.


Tiered Storage в Confluent Platform на основе AWS S3, GCP GCS и им подобных, дает те же преимущества без дополнительного слоя BookKeeper, как у Pulsar, и без дополнительных затрат и задержек, связанных с передачей данных по сети. У Confluent ушло два года на то, чтобы выпустить GA-версию Tiered Storage for Kafka с круглосуточной поддержкой для самых критичных данных. Tiered Storage пока недоступно для опенсорсной версии Apache Kafka, но Confluent вместе с сообществом Kafka (включая крупные технологические компании, вроде Uber) работает над KIP-405, чтобы добавить Tiered Storage в Kafka с разными вариантами хранения.


У обеих архитектур есть плюсы и минусы. Лично мне кажется, что 95% проектов не нуждаются в сложной трехуровневой архитектуре. Она может пригодиться там, где требуется внешнее недорогое хранилище, но вам придется отвечать за круглосуточное соблюдение SLA, масштабируемость и пропускную способность. А еще за безопасность, управление, поддержку и интеграцию в вашу экосистему. Если вам действительно нужна трехуровневая архитектура, не стану вас останавливать.


Связанный миф: Pulsar лучше подходит для отстающих консюмеров благодаря уровню кэширования и хранения.


Неправда.


Основная проблема отстающих консюмеров в том, что они занимают page cache, то есть последние сообщения уже кэшированы. При чтении из предыдущих сегментов они заменяются, что снижает производительность консюмеров, читающих с начала лога.


Архитектура Pulsar проявляет себя даже хуже в этом отношении. Проблема сброса кэша сохраняется, при этом для чтения нужно сделать лишний прыжок по сети и загрузить сеть вместо того, чтобы просто прочитать сообщение с локального носителя.


Миф 9. Kafka масштабируется хуже, чем Pulsar.


Неправда.


Это один из главных аргументов сообщества Pulsar. Как я уже говорил, это всегда зависит от выбранного бенчмарка. Например, я видел тесты с эквивалентными вычислительными ресурсами, где на высокой пропускной способности у Kafka результат был гораздо лучше, чем у Pulsar. Вот один из таких бенчмарков:



В большинстве случаев масштабируемость не проблема. Kafka можно легко разогнать до нескольких гигабайтов в секунду, как в демонстрации Масштабирование Apache Kafka до 10+ ГБ в секунду в Confluent Cloud:



Честно говоря, этот вопрос должен волновать менее 1% пользователей. Если у вас требования, как у Netflix (петабайты в день) или LinkedIn (триллионы сообщений), есть смысл обсуждать самую подходящую архитектуру, железо и конфигурацию. Остальным можно не беспокоиться.


Связанный миф: сейчас в Kafka может храниться всего 500к партиций на кластер.


Правда.


Пока у Kafka не лучшая архитектура для масштабных деплоев с сотнями тысяч топиков и партиций.


Но! Pulsar тоже не резиновый. Просто у него другие лимиты.


Ограничения в Kafka связаны с Zookeeper. Когда Zookeeper удалят через KIP-500, верхняя граница уйдет вместе с ним.


Примечание: успех зависит от подходящего устройства архитектуры!

Если у заказчика возникли проблемы с количеством партиций и масштабируемостью в Kafka, скорее всего, он ошибся в архитектуре и приложениях (Pulsar бы тут точно не помог).


Kafka это платформа стриминга, а не очередной IBM MQ. Если вы попробуете воссоздать любимое решение и архитектуру MQ с Kafka, у вас вряд ли что-то получится. У меня было несколько заказчиков, которые потерпели крах, но смогли привести все в порядок с нашей помощью.


Скорее всего, у вас не возникнет проблем с количеством партиций и масштабируемостью, даже пока Kafka еще использует ZooKeeper, если вы правильно все спроектируете и разберетесь в базовых принципах Kafka. Такие проблемы часто возникают с любой технологией, вроде Kafka, которая вводит совершенно новый уровень и парадигму (например, при первом появлении облаков немало компаний набили себе шишки при попытке их освоить).


Связанный миф: Pulsar поддерживает практически неограниченное число партиций.


Неправда.


У BookKeeper существуют те же ограничения по одному файлу на ledger, что и у Kafka, но в одной партиции таких ledger несколько. Брокеры Pulsar объединяют партиции в группы, но на уровне хранения, в Bookkeeper, данные хранятся в сегментах, и на каждую партицию приходится много сегментов.


В Kafka метаданные для этих сегментов хранятся в Zookeeper, откуда и возникают эти ограничения по числу. Когда Kafka избавится от этой зависимости, границы возможного раздвинутся. С нетерпением жду реализации KIP-500. Подробности читайте в статье Apache Kafka справится сама: удаление зависимости от Apache ZooKeeper.


Связанный миф: потребности масштабирования в Kafka необходимо определять при создании топика.


Отчасти это правда.


Если требуется очень большой масштаб, в топиках Kafka можно сразу создать больше партиций, чем обычно требуется для этой задачи. (См. Потоки и таблицы в Apache Kafka: топики, партиции и хранение.) Или можно добавить партиции позже. Это не идеальное решение, но так уж устроены распределенные системы стриминга (которые, кстати, масштабируются лучше, чем традиционные системы обработки сообщений, вроде IBM MQ).


Оптимальные методы создания топиков и изменения их конфигурации уже тщательно продуманы за вас, так что не волнуйтесь.


Но! У топиков Pulsar есть то же ограничение!


Пропускная способность для записи зависит от числа партиций, назначенных топику Pulsar, точно так же, как это происходит в топике Kafka, поэтому там тоже придется создавать партиции с запасом. Дело в том, что в каждой партиции записывать можно только в один ledger за раз (а их может быть несколько). Увеличение числа партиций динамически влияет на порядок сообщений, как и в Kafka (порядок нарушается).


У Kafka и Pulsar с масштабированием все отлично. Этого будет более чем достаточно почти в любом сценарии.


Если вам нужен совсем заоблачный масштаб, лучше взять реализацию без ZooKeeper. Так что KIP-500 это самое ожидаемое изменение в Kafka, судя по сообществу и заказчикам Confluent.


Миф 10. В случае аппаратного сбоя Pulsar восстанавливается моментально, а Kafka приходится перегружать данные.


И да, и нет.


Если брокер Pulsar вырубится, ничего не страшного не будет, это правда, но, в отличие от брокера Kafka, он и не хранит данные, а просто выступает в качестве прокси для уровня хранения, то есть BookKeeper. Так что подобные заявления о Pulsar это просто маркетинговый ход. Почему никто не говорит, что бывает, если полетит нода BookKeeper (bookie)?


Если завершить и перезапустить BookKeeper bookie, данные будут точно так же перераспределяться, как и в Kafka. В этом суть распределенных систем с их репликацией и партициями.


Kafka уже предлагает эластичность.


Это важно. Основатель Confluent, Джей Крепс (Jay Kreps) недавно писал об этом: Эластичные кластеры Apache Kafka в Confluent Cloud. В облачном сервисе SaaS, вроде Confluent Cloud, конечный пользователь не думает об аппаратных сбоях. Он ожидает непрерывный аптайм и SLA на уровне 99,xx. С оплатой за потребление пользователь не заботится об управлении брокерами, изменении размеров нод, увеличении или уменьшении кластеров и подобных деталях.


Самоуправляемым кластерам Kafka нужны такие же возможности. Tiered Storage for Kafka это огромное хранилище, с которым данные не хранятся на брокере и восстановление после сбоев происходит почти моментально. Если добавить сюда такие инструменты, как Self-Balancing Kafka (эта фича от Confluent описана в статье по ссылке выше), можно вообще забыть об эластичности в самоуправляемых кластерах.


К сожалению, в Pulsar вы ничего подобного не найдете.


Миф 11. Pulsar справляется с репликацией между кластерами лучше, чем Kafka.


Неправда.


Любая распределенная система должна решать такие проблемы, как CAP-теорема и кворум в распределенных вычислениях. Кворум это минимальное число голосов, которое распределенная транзакция должна получить, чтобы выполнить операцию в распределенной системе. Такой подход позволяет обеспечить согласованность операций.


Задачи по кворуму Kafka поручает ZooKeeper. Даже после реализации KIP-500 и удаления ZooKeeper законы физики не перестанут действовать: проблемы задержки в распределенных системах существуют в таких регионах, как Восточная, Центральная и Западная часть США и даже по всему миру. Скорость света, конечно, впечатляет, но все же имеет ограничения.


Эту проблему можно обойти разными способами, включая использование инструментов репликации в реальном времени, например MirrorMaker 2 в Apache Kafka, Confluent Replicator или Confluent Multi-Region-Clusters. Эти варианты и советы см. в статье Архитектурные шаблоны для распределенных, гибридных, периферийных и глобальных деплоев Apache Kafka.



Вы не найдете универсальное решение, которое обеспечит глобальную репликацию + нулевой простой + нулевую потерю данных. Для самых критически важных приложений Confluent Multi-Region-Clusters предлагает RTO=0 и RPO=0 (нулевой простой и нулевую потерю данных) с автоматическим аварийным восстановлением и отработкой отказа, даже если упадет целый датацентр или облачный регион.


Для таких случаев архитектура Pulsar будет еще сложнее, чем ее базовая версия. Просто для георепликации Pulsar требует дополнительный глобальный кластер Zookeeper, а для больших расстояний это не годится. Обходной путь есть, но от CAP-теоремы и законов физики не уйдешь.


Что с Kafka, что с Pulsar вам нужна проверенная архитектура для борьбы с физикой в глобальных деплоях.


Миф 12. Pulsar совместим с интерфейсом и API Kafka.


Отчасти.


Pulsar предлагает простейшую реализацию с зачаточной совместимостью с протоколом Kafka v2.0.


У Pulsar есть конвертер для базовых элементов протокола Kafka.


В теории совместимость с Kafka выглядит убедительно, но вряд ли это серьезный аргумент для переноса действующей инфраструктуры Kafka в Pulsar. Зачем такой риск?


Мы слышали заявления о совместимости с Kafka и в других примерах, например для гораздо более зрелого сервиса Azure Event Hubs. Почитайте об ограничивающих факторах их Kafka API. Вы удивитесь. Никакой поддержки основных функций Kafka, вроде транзакций (и семантики exactly-once), сжатия или compaction для логов.


Раз, по сути, это не Kafka, существующие приложения Kafka будут вести себя непредсказуемо в таких вот совместимых системах, будь то Azure Event Hubs, Pulsar или другая оболочка.


Kafka vs. Pulsar комплексное сравнение


Выше мы говорили о мифах, которые гуляют по разным статьям и постам. С ними, вроде, все понятно.


Но я обещал, что мы поговорим не только о технических деталях. При выборе технологии важны и другие аспекты.


Рассмотрим три из них: доля на рынке, корпоративная поддержка и облачные предложения.


Доля на рынке у Apache Kafka и Apache Pulsar


Статистика в Google Trends за последние пять лет совпадает с моими личными наблюдениями интерес к Apache Kafka гораздо выше, чем к Apache Pulsar:



Примерно так же выглядит картинка сравнения на Stack Overflow и аналогичных платформах, по числу и размеру поддерживающих вендоров, открытой экосистеме (интеграция инструментов, обертки, вроде Spring Kafka) и подобным характеристикам.


Открытые вакансии еще один индикатор распространения технологии. Для Pulsar их мало, то есть его использует не так много компаний. Убедитесь сами поищите на любом сайте. Если искать по всему миру, для Pulsar вакансий меньше сотни, а для Kafka тысячи. Кроме того, в большинстве вакансий для Pulsar указано, что требуется опыт с Kafka, Pulsar, Kinesis и аналогичными технологиями.


В большинстве случаев такие характеристики важнее для успешного проекта, чем технические детали. В конце концов ваша цель решить определенную бизнес-задачу, правда же?


Раз Pulsar так мало распространен, почему о нем вообще говорят? Во-первых, потому, что независимые консультанты, аналитики и блоггеры (включая меня) должны рассказывать о новых современных технологиях, чтобы привлечь аудиторию. Если честно, люди любят об этом читать.


Корпоративная поддержка для Kafka и Pulsar


Корпоративная поддержка для Kafka и Pulsar существует.


Но все не совсем так, как можно ожидать. Вот несколько вендоров, к которым вы можете обратиться, если решили работать с Pulsar:


  • Streamlio (теперь принадлежит Splunk) компания, которая раньше стояла за Apache Pulsar. Splunk пока не объявили о своей будущей стратегии для поддержки пользователей, которые работают над проектами на основе Pulsar. Splunk славится своими популярными аналитическими платформами. Это их основной бизнес (1,8 млрд долларов в 2019 году). Единственное, на что жалуются пользователи, ценник. Splunk активно используют Kafka, а сейчас интегрируют Pulsar в Splunk Data Stream Processor (DSP). Сильно сомневаюсь, что Splunk вдруг увлечется опенсорсом, чтобы поддержать ваш драгоценный проект на базе Pulsar (возможно, чего-то можно ожидать от DSP). Будущее покажет.
  • StreamNative основана одним из изначальных разработчиков Apache Pulsar и поддерживает платформу стриминга на основе Pulsar. На июнь 2020 года у StreamNative было 13 (целых 13, да!) сотрудников в LinkedIn. Уж не знаю, хватит этого для поддержки вашего критически важного проекта или нет.
  • TIBCO объявили о поддержке Pulsar в декабре 2019 года. За последние годы стратегия у них сместилась с интеграции на аналитику. Пользователи их промежуточного ПО уходят от TIBCO толпами. В отчаянии они приняли стратегическое решение: поддерживать другие платформы, даже если у них нет никакого опыта и никакой связи с ними. Да, звучит как миф, но TIBCO делают то же самое для Kafka. Любопытный факт: TIBCO предлагает Kafka и ZooKeeper в Windows! Такого больше никто не делает. Все знают, это приводит к нестабильности и несогласованности. Но если что, TIBCO помогут вам с Kafka и Pulsar. Зачем вообще сравнивать эти платформы, если можно получить обе у одного вендора, причем даже на Windows, с .exe и bat-скриптами для запуска серверных компонентов:


Вендоров для Kafka больше с каждым днем.


Это очень распространенная на рынке платформа. Лучшее подтверждение поддержка от крупнейших вендоров. IBM, Oracle, Amazon, Microsoft и многие другие поддерживают Kafka и встраивают возможности интеграции с ней в свои продукты.


Окончательное подтверждение этому я получил на конференции Oracle OpenWorld 2019 в Сан-Франциско, где выступал менеджер по GoldenGate (великолепный и очень дорогой инструмент CDC от Oracle). В основном он рассказывал, как GoldenGate станет платформой интеграции данных вообще для всего. Половина выступления была посвящена стримингу, платформе Kafka и тому, что GoldenGate будет обеспечивать интеграцию с разными базами/озерами данных и Kafka в обоих направлениях.


Полностью управляемые облачные сервисы для Kafka и Pulsar


Какие облачные решения доступны для Kafka и Pulsar?


Для Apache Pulsar есть облачный сервис с очень говорящим названием:


Kafkaesque.


Я серьезно. Сами посмотрите [UPD: 17 июня они переименовали KAFKAESQUE в KESQUE. Видимо, поняли, всю комичность ситуации.]


Для Apache Kafka существуют разные предложения на выбор:


  • Confluent Cloud (SaaS) полностью управляемый сервис с оплатой по мере использования, круглосуточной доступностью, эластичностью и бессерверными функциями для Apache Kafka и связанной экосистемы (Schema Registry, коннекторы Kafka Connect и ksqlDB для обработки потоков).
  • Amazon MSK (PaaS) подготавливает ZooKeeper и брокеры Kafka, а конечные пользователи сами обслуживают их, исправляют баги, устанавливают обновления и т. д. Важный факт: AWS не включает проблемы с Kafka в поддержку и SLA 99,95.
  • Azure Event Hubs (SaaS) предоставляет конечную точку Kafka (с проприетарной реализацией) для взаимодействия с приложениями Kafka. Это легко масштабируемое решение с высокой производительностью. Это не вполне Kafka, просто эмуляция, и тут не хватает некоторых важных функций, вроде семантики exactly-once, compaction логов и сжатия. Не говоря уже о дополнительных возможностях, вроде Kafka Connect и Kafka Streams
  • Big Blue (IBM) и Big Red (Oracle) предлагают облачные решения для Kafka и соответствующих API. Не знаю, использует их кто-нибудь или нет. Понятия не имею, насколько они хороши, никогда не видел их в деле.
  • Много компаний поменьше, вроде Aiven, CloudKarafka, Instaclustr и других.

В общем, про распространенность Kafka и Pulsar на рынке все очевидно.


И все же Apache Kafka или Apache Pulsar?


В двух словах: Pulsar пока здорово отстает от Kafka по зрелости в плане надежности для больших масштабов и обширности сообщества.


И вообще не факт, что Pulsar лучше.


Есть смысл сравнивать их, если вы решили использовать только опенсорс-инструменты. Узнайте, что подойдет лучше именно вам. Не забудьте учесть технические характеристики, уровень зрелости, вендоров, сообщество разработчиков и другие важные факторы. Что вписывается в вашу ситуацию?


Если в Kafka и Pulsar есть не все нужные вам функции, выбирайте Kafka: для нее есть много вендоров и облачных решений. Pulsar, к сожалению, сильно отстает и вряд ли догонит в ближайшем будущем.

Подробнее..

Обогащение данных что это и почему без него никак

15.04.2021 06:04:33 | Автор: admin

Задача обогащения данных напрямую связана с темой их обработки и анализа. Обогащение нужно для того, чтобы конечные потребители данных получали качественную и полную информацию.

Сам термин "обогащение данных" это перевод англоязычного Data enrichment, который проводит аналогию между данными и... ураном. Точно так же, как промышленники насыщают урановую руду, увеличивая долю изотопа 235U, чтобы её можно было использовать (хочется надеяться, в мирных целях), в процессе обогащения данных мы насыщаем их информацией.

Обогащение данных это процесс дополнения сырых данных той информацией, которая в исходном виде в них отсутствует, но необходима для качественного анализа.

Конечными потребителями такой информации чаще всего являются аналитики, но могут быть и нейронные сети, пользователи приложения или предоставляемого API.

Заметим, что обогащение данных термин широкий, и получать данные из внешних источников можно весьма разнообразными способами. Например, представим бизнес-процесс регистрации нового клиента. Если в данных этого клиента отсутствует e-mail, то взаимодействие с внешним источником в этом случае может быть буквально следующим: взяли телефон, позвонили клиенту, узнали его e-mail. Получается, этот процесс может включать в себя такие совершенно не автоматизированные операции, как обычный телефонный разговор со всеми этими эс как доллар, "а" как русская. Это тоже можно считать обогащением данных, но данный пласт проблем в этой статье мы затрагивать не будем. Нас интересуют вполне конкретные случаи, когда данные хранятся в базе данных и именно БД служит внешним источником данных для обогащения.

Источниками сырых исходных данных могут быть:

  • Система кассовых платежей, которая отслеживает транзакции и отправляет в хранилище данных: время, id торговой точки, количество и стоимость купленного товара и т.д.

  • Логистическая система, которая отслеживает движение товара: id транспорта и его водителя, gps-координаты в заданные моменты времени, статус, маршрут и т.д.

  • Телеметрия с датчиков интернета вещей.

  • Система мониторинга инфраструктуры.

Все эти примеры объединяет то, что в них регулярно передаются относительно небольшие пакеты данных, каждый из которых имеет идентификатор. То есть каждый источник знает свой идентификатор и передает его вместе с пакетом данных. По этому идентификатору в хранилище данных можно получить справочную информацию об источнике.

Таким образом, типовую задачу можно сформулировать следующим образом: на вход поступают данные вида <таймстамп, идентификатор источника, содержимое пакета>, а конечному потребителю требуется соединить (в смысле join) справочную информацию об источнике из хранилища данных с идентификатором источника из сырых данных.

Как обогащаем данные

Один из вариантов решения задачи сопоставления данных по расписанию запускать скрипты, которые подготовят подходящие для аналитиков представления. Например, это можно делать с помощью Apache Airflow. Но в любом случае потребуется ждать и сохранять данные из внешних источников.

Другой вариант использовать инструменты потоковой обработки данных. В этом случае нужно определиться, где же всё-таки хранить справочную информацию и что будет являться Single Source of Truth (SSOT), или единым источником истины для справочных данных. Если хранить справочные данные в хранилище, то к нему придется каждый раз обращаться, и это может быть накладным, так как к сетевым издержкам добавится ещё и обращение к диску. Вероятно, оптимальнее хранить справочную информацию в оперативной памяти или другом горячем хранилище, например, в Tarantool.

Мы, очевидно, отдаём предпочтению именно последнему варианту, и наша схема приобретает завершенный вид.

Она отличается от первой иллюстрации процесса обогащения данных тем, что появилась конкретика: процесс обогащения происходит на лету с помощью потокового обработчика, справочные данные хранятся в горячем хранилище (оперативной памяти).

Давайте посмотрим, какие плюсы и минусы мы получим, если возьмем за основу данную схему.

Преимущества потокового обогащения данных:

  • В хранилище всегда актуальные данные, которые готовы к использованию. В нашем динамичном мире зачастую необходимо иметь доступ к реалтайм-данным и сразу их использовать, не тратя время на дополнительную подготовку, распаковку и т.д. Потоковый обработчик, можно сказать, приносит их нам на блюдечке.

  • Схема хорошо масштабируется, если подобрать соответствующие технологии. С правильными инструментами можно масштабироваться под требуемую пропускную способность платформы и под объемы справочной информации.

Недостатки:

  • Источник истины для справочной информации вынесен из надёжного дискового хранилища данных в менее надёжную оперативную память. При сбое это грозит потерей всех несинхронизированных данных.

  • Требуется гораздо больше памяти как оперативной, так и постоянной. В целом, для любой системы аналитики желательно иметь побольше дискового пространства и оперативной памяти, но здесь это становится обязательным условием.

Какие технологии используем

Что касается технологий, то мы выбираем более-менее устоявшиеся на рынке решения с открытым исходным кодом и хорошей документацией, учитывая совместимость.

Выбранный стек технологий:

  • Apache Kafka источник данных и брокер очередей;

  • Apache Spark потоковый обработчик данных;

  • Apache Ignite горячее хранение справочной информации;

  • Greenplum и Apache Hadoop хранилище данных.

В выборе Greenplum мы немного поступились совместимостью. Связать его со Spark не совсем тривиальная задача, для этого не существует стандартного open source коннектора (подробнее рассказывали в этой статье). Поэтому мы разрабатываем такой коннектор сами.

В остальном набор достаточно стандартный. Hadoop держим на всякий случай, например, чтобы использовать тот же Hive поверх HDFS вместо Greenplum.

Каждое из этих решений масштабируемо. Поэтому мы можем построить любую архитектуру, в зависимости от объемов справочной информации и ожидаемой пропускной способности.

Итак, вот окончательные очертания нашей схемы обогащения данных.

Версии, которые используем:

  • Apache Spark 2.4.6.

  • Apache Ignite 2.8.1.

  • Apache Kafka 2.4.1.

  • Greenplum 6.9.0.

  • Apache Hadoop 2.10.1.

Обращаю на это внимание, потому что от версий выбранных компонент зависит совместимость и возможность этих компонент взаимодействовать. Например, если нужно, чтобы Apache Spark писал данные в базу данных Greenplum, это может стать отдельной проблемой, и версии также будут важны. Наш коннектор на текущий момент работает именно со Spark v2, для Spark v3 его нужно отдельно дорабатывать.

Что в результате

Если объединять данные, применяя предложенную схему обогащения, в дальнейшем аналитикам не потребуется каждый раз делать JOIN-запрос, что сэкономит как ценное время людей, так и машинные ресурсы.

Но есть и ограничения, связанные с тем, что source of truth, по сути, находится в оперативной памяти. Поэтому при редактировании справочной информации надо напрямую работать с Ignite через интерфейсы самого Ignite. Кроме этого, нужен аккуратный механизм синхронизации, чтобы кэш Ignite был персистентным. У Ignite есть собственный механизм для записи на диск, но все же Ignite больше ориентирован на работу в ОЗУ, поэтому для резервирования справочной информации в хранилище данных лучше использовать что-нибудь специально для этого предназначенное, например, Airflow.

Полезные ссылки, чтобы понять, как строить взаимодействие между Spark и Ignite и насколько это уже проработанный механизм:

Пользуясь случаем: мы расширяем отдел систем обработки данных. Если вам интересно заниматься с подобного рода задачами, пишите мне в личку, в телеграм @its_ihoziainov или на job@itsumma.ru с пометкой data engineering.

Если вы уже сталкивались с такого рода задачами, делитесь мнением в комментариях, задавайте вопросы они, как известно, очень помогают находить лучшие решения. Спасибо.

Подробнее..

Перевод Apache Kafka скоро без ZooKeeper

16.04.2021 08:17:32 | Автор: admin

image


В основе Apache Kafka находится лог простая структура данных, которая использует последовательные операции, работающие в симбиозе с оборудованием. Эффективное использование дискового буфера и кэша процессора, prefetch, передача данных zero-copy и много других радостей все это благодаря построенной на логе структуре, которая славится своей эффективностью и пропускной способностью. Обычно эти преимущества, а еще базовая реализация в виде лога коммитов, первое, что люди узнают о Kafka.


Код самого лога составляет относительно малую часть всей системы. Гораздо больше занимает код, который отвечает за организацию партиций (т. е. логов) на множестве брокеров в кластере назначает лидеров, обрабатывает сбои и т. д. Этот код и делает Kafka надежной распределенной системой.


Раньше важной частью работы распределенного кода был Apache ZooKeeper. Он хранил самые важные метаданные системы: где находятся партиции, кто из реплик лидер и т. д. Поначалу эффективный и проверенный ZooKeeper действительно был нужен, но, по сути, это что-то вроде специфической файловой системы или API триггеров поверх согласованного лога. А Kafka это API pub/sub (издатель-подписчик) поверх согласованного лога. В результате пользователи настраивают, конфигурируют, мониторят, защищают и обдумывают взаимодействие и производительность между двумя реализациями лога, двумя сетевыми уровнями и двумя реализациями системы безопасности каждая со своими инструментами и хуками мониторинга. Все стало неоправданно сложно, поэтому решено было заменить ZooKeeper сервисом кворума прямо в самой Kafka.


Работы предстояло немало, поэтому в прошлом апреле мы запустили проект с участием сообщества, чтобы ускорить темпы и создать рабочую систему к концу года.


image


Мы с Джейсоном, Колином и командой KIP-500 прогнали полный жизненный цикл сервера Kafka с созданием и потреблением сообщений и все без Zookeeper. Какая же красота!
Бен Стопфорд (Ben Stopford)


Рады сообщить, что ранний доступ к KIP-500 уже закоммичен в ствол и будет включен в предстоящий релиз 2.8. Впервые Kafka можно использовать без ZooKeeper. Мы называем это режимом Kafka Raft Metadata, сокращенно KRaft (читается крафт).


В этом релизе пока доступны не все фичи. Мы еще не поддерживаем ACL и другие транзакции или функции безопасности. Переназначение партиций и JBOD тоже не поддерживаются в режиме KRaft (надеемся, что они будут доступны в одном из релизов Apache Kafka в этом году). Так что контроллер кворума считаем экспериментальным и для прода пока не используем. В остальном преимуществ масса: деплоймент и обслуживание станут проще, всю Kafka можно запустить как один процесс, а на кластер помещается гораздо больше партиций (цифры см. ниже).


Контроллер кворума: консенсус на основе событий


Если запустить Kafka с новым контроллеров кворума, все задачи по метаданным, которые раньше выполнялись контроллером Kafka и ZooKeeper, ложатся на новый сервис, запущенный прямо в кластере Kafka. Контроллер кворума можно запустить на выделенном оборудовании, если этого требует ваш сценарий.



Внутри происходит много интересного. Контроллеры кворума используют новый протокол KRaft, чтобы точно реплицировать метаданные по всему кворуму. Протокол во многом напоминает ZooKeeper ZAB и Raft, но с важными отличиями например, что логично, он использует архитектуру на основе событий.


Контроллер кворума хранит свой стейт с помощью модели на основе событий, чтобы стейт-машину всегда можно было в точности воссоздать. Чтобы лог события, который хранит стейт (также называется топиком метаданных), не разрастался до бесконечности, периодически он обрезается с созданием снапшотов. Остальные контроллеры в кворуме фолловят активный контроллер и реагируют на события, которые он создает и сохраняет в свой лог. Если одна нода приостановлена во время разделения на партиции, например, она быстро догонит пропущенные события с помощью лога. Так мы существенно сокращаем период простоя и время восстановления системы в худшем сценарии.


image
Консенсус на основе событий


Раз протокол KRaft управляется событиями, в отличие от контроллера ZooKeeper контроллер кворума не загружает стейт из ZooKeeper, когда становится активным. При смене лидера у нового активного контроллера уже есть в памяти все закоммиченные записи метаданных. Более того, для отслеживания метаданных в кластере используется тот же механизм на основе событий, что и в протоколе KRaft. Задача, которую раньше обрабатывали RPC, теперь полагается на события и использует лог для коммуникации. Приятное последствие этих изменений (и изменений, которые были предусмотрены изначальным замыслом) Kafka теперь поддерживает гораздо больше партиций, чем раньше. Обсудим подробнее.


Увеличение масштаба Kafka до миллионов партиций


Максимальное число партиций в кластере Kafka определяется двумя свойствами: лимит партиций для ноды и лимит партиций для кластера. Сейчас на ограничение в кластере во многом влияет управление метаданными. Предыдущие KIP улучшили лимит для ноды, хотя и здесь нет предела совершенству. Масштабируемость в Kafka зависит от добавления нод. Здесь нам важен лимит для кластера, который определяет верхнюю границу масштабирования всей системы.


Новый контроллер кворума может обрабатывать гораздо больше партиций на кластере. Чтобы это проверить, можно провести такие же тесты, как в 2018 году, с помощью которых мы демонстрировали лимиты партиций в Kafka. Эти тесты измеряют время, которое требуется для завершения работы и восстановления. В старом контроллере это операция O (число партиций). Она устанавливает верхнюю границу числа партиций, которые сейчас поддерживает один кластер Kafka.


Предыдущая реализация, как рассказывалось в посте по ссылке, могла достигать 200K партиций. Ограничивающим фактором служило время, которое требовалось для переноса важных метаданных между внешним консенсусом (ZooKeeper) и внутренним управлением лидером (контроллер Kafka). С новым контроллером кворума обе роли выполняет один компонент. Подход на основе событий означает, что контроллер отрабатывает отказ почти моментально. Ниже приводятся цифры нашего тестирования для кластера с 2 млн партиций (в 10 раз больше предыдущего лимита):


image


Контроллер С контроллером ZooKeeper С контроллером кворума
Время контролируемой остановки (2 млн партиций) 135 сек 32 сек
Восстановление после неконтролируемой остановки (2 млн партиций) 503 сек 37 сек

Важно учитывать контролируемую и неконтролируемую остановку. Контролируемые остановки включают типичные рабочие сценарии, вроде rolling restart стандартная процедура развертывания изменений без потери доступности. Восстановление после неконтролируемой остановки важнее, потому что оно задает целевое время восстановления (RTO) системы в случае, скажем, неожиданного сбоя, например, на виртуальной машине или поде, или потери связи с дата-центром. Хотя эти цифры указывают на производительность всей системы, они напрямую определяют хорошо известное узкое место, которое возникает при использовании ZooKeeper.


Измерения для контролируемой и неконтролируемой остановки не сравниваются напрямую, потому что неконтролируемая остановка включает выбор новых лидеров, а контролируемая нет. Это расхождение введено специально, чтобы контролируемая остановка соответствовала изначальным тестам 2018 года.


Упрощение Kafka единый процесс


Kafka часто воспринимается как тяжеловес, и такой репутацией она во многом обязана сложностями с управлением ZooKeeper. Из-за этого для запуска проектов часто выбирают более легкую очередь сообщений, вроде ActiveMQ или RabbitMQ, а Kafka появляется на сцене только при разрастании масштаба.


Это печально, ведь Kafka дает удобную абстракцию на основе лога коммитов, которая одинаково хорошо подходит для маленьких рабочих нагрузок у стартапов и для бешеного трафика у Netflix или Instagram. А если вам нужна стриминговая обработка, без Kafka с абстракцией лога коммитов не обойтись, будь то Kafka Streams, ksqlDB или другой аналогичный фреймворк. Управлять двумя отдельными системами (Kafka и Zookeeper) сложно, поэтому пользователи выбирали между масштабированием и простотой.


Теперь можно получить все и сразу. Благодаря KIP-500 и режиму KRaft мы можем легко начать проект с Kafka или использовать его как альтернативу монолитным брокерам, вроде ActiveMQ или RabbitMQ. Благодаря легкости и единому процессу это подходящий вариант для ограниченных в ресурсах устройств. В облаке все еще проще. Управляемые сервисы, например, Confluent Cloud, все берут на себя. Не важно, собственный это будет кластер или управляемый, мы можем начать с малого, а потом расширять его вместе со своими рабочими нагрузками все в рамках одной инфраструктуры. Давайте посмотрим, как выглядит развертывание с одним процессом.


Испытываем Kafka без ZooKeeper


Новый контроллер кворума пока доступен в экспериментальном режиме, но, скорее всего, будет включен в релиз Apache Kafka 2.8. На что он способен? Самое крутое это, конечно, возможность создавать кластер Kafka с одним процессом, как в демо.


Конечно, если вам нужна очень высокая пропускная способность, и вы хотите добавить репликацию для отказоустойчивости, понадобятся еще процессы. Пока контроллер кворума на базе KRaft доступен только как Early Access, для критических рабочих нагрузок он не годится. В ближайшие месяцы мы будем добавлять недостающие фрагменты, моделировать протокол с помощью TLA+ и внедрять контроллер кворума в Confluent Cloud.


image


Но поэкспериментировать вы можете уже сейчас. См. полный README на GitHub.

Подробнее..

Итоговый проект для видеокурса и подкаст Проблемная Kafka

20.04.2021 08:05:05 | Автор: admin

Гостем подкаста The Art Of Programming стал спикер курса Слёрма по Kafka Александр Миронов, Infrastructure Engineer в Stripe. Тема выпуска Проблемная Kafka. Обсудили вопросы, часто возникающие при работе с Kafka: аудит входных данных, квоты, способы хранения данных, возможный даунтайм в консьюмер-группах и др.


Итоговый проект по Kafka


Релиз видеокурса от Александра Миронова и Анатолия Солдатова состоялся 7 апреля. Кроме заявленных тем мы добавили в курс итоговый проект: студентам нужно будет выполнить практическое задание под звёздочкой с применением знаний, полученных на курсе.


Мы добавили два варианта заданий из области разработки и из области администрирования. В первом варианте понадобится создать приложение для генерации real-time статистики продаж интернет-магазина, а во втором спасти кластер от хаоса.


Подробнее о курсе
Бесплатные материалы курса

Подробнее..

Перевод Как объяснить детям, что такое Apache Kafka за 15 минут с картинками и выдрами

19.06.2021 04:20:24 | Автор: admin


Я учусь иллюстрировать сложные процессы с помощью комиксов. Нашла себе в копилку крутой кейс: как с помощью комиксов про милых выдр можно ребенку объяснить такую сложную штуку как Apache Kafka, и сделать мир немного добрее.

Легко по течению легкое введение в потоковую обработку и Apache Kafka. Группа выдр обнаруживает, что они могут использовать гигантскую реку для общения друг с другом. По мере того, как все больше выдр перемещается в лес, они должны научиться адаптировать свою систему, чтобы справиться с возросшей активностью леса.

Под катом 25 слайдов, объясняющие основы Kafka для детей и гуманитариев. И много милых выдр.



Легко по течению



image

Поначалу, в лесу было тихо, и только две семьи выдр жили среди деревьев и рек.

image

Всякий раз, когда им нужно было поделиться новостями, они просто разговаривали друг с другом напрямую.

Они делились новостями о вечеринка в честь дня рождения, новых гостях и других событиях, происходящих в лесу.

image

Со временем, все больше и больше выдр переселялось в лес.

И точно так же росло количество происходящих в лесу событий.

image

Это было целым вызовом для маленьких выдр. Каждый раз, когда у выдры были новости, чтобы поделиться ими, она должна была найти каждую другую семью и рассказать им лично.

image

Это не только занимало много времени, но было чревато ошибками.

Что если какая-то выдрячья семья была на пикнике, и не могла получить уведомление о предстоящем событии?

image

Ну, тогда выдре, которая хотела оповестить соседей, придется вернуться позже или сдаться.

image

Поскольку остальные выдры общались друг с другом лично, говорили, что они Тесно Связаны.

Быть тесно связанными супер мило, но это делает коммуникацию с другими сложнее. Это называется Проблема Масштабирования.

image
Именно тогда у одной маленькой выдры по имени Никси, появилась идея, которая навсегда изменила лес.

По лесу текла огромная река Кафка, и Никси знала, что выдры могу использовать эту реку для общения.

Вы можете установить Kafka в своем лесу:

# clone the repositorygit clone \https://github.com/round-robin-books/gently-kafka.git# start kafkadocker-compose up


image

Она даже сочинила песню, чтобы объяснить, как это работает:

События свои
В реку ты опусти,
Река их отнести
Сможет выдрам другим.

Что о событиях,
Плывущих в потоке,
Этот путь удивителен,
Мы все тут в шоке.

image

Не отстанешь ли ты? вмешался дельфин
Нельзя оставлять все на милость глубин!

Не будет в доставке такого.
Мы командой реку разделим,
На множество мелких потоков
И вместе достигнем цели.

Давайте посмотрим, как это рабоnает



image

Во первых, выдра наблюдает Событие, что-то, что произошло в определенный момент времени.

К примеру, Сегодня вернулись пчелы это событие.

image

Дальше, выдра создает Запись о событии.
Записи (которые иногда называют сообщения) включают время события (отметка о времени) и дополнительную информацию о событии.

image

Тогда выдры смогут решать в какую часть реки направить это сообщение.

Река делится на потоки, которые называют Топики, которые делают организацию сообщений проще.

image

В итоге, выдры бросали свои сообщения в потоки (топики), чтобы другие выдры могли их найти.

По началу они использовали стеклянные бутылки, но бутылки просто уплывали.

image

Тогда, выдры переключились на стеклянные поплавки. Поскольку поплавки оставались на месте и никуда не уплывали, это было названо Устойчивостью.

Устойчивость важна, потому что это позволяет выдрам читать сообщения когда они захотят, к примеру после их пикника.

2 типа выдр



image

Выдры, которые помещают сообщения в реку, называются Продюсеры.

Продюсеры кидают маленькие партии сообщений в реку, не зная, кто придет читать их.

Незнание помогает Разделять системы, то есть выдр, которые создают события, и которые читают события.

image

Выдры, которые читают события в потоке называются Консьюмеры.

Консьюмеры следят за любым топиком, который их интересует.

Например, выдры Первой помощи следят за топиком с лесными алертами, так что они могут реагировать в чрезвычайных ситуациях (как в ситуации с пчелами).

image

Эта система работает хорошо, но спустя какое-то время река становится слишком загруженной огромным количеством событий. Как же маленьким выдрам все успеть?

Будучи социальными животными, они предпочитают работать вместе. Во первых, они скидывают огромные камни в реку, разделяя каждый топик на небольшое количество потоков, или Разделов.

image

Тогда, один член семьи, так называемый Лидер Группы, назначает подмножество мелких потоков (разделов) каждому члену семьи.

Это подразумевает, что каждая выдра ответственна только за ту часть потока, которую она мониторит.

image

Работая совместно в этом случае, каждая выдрячья семья называется Группой Консьюмеров, которая успевает следить за своей частью потока сообщений, сколько бы их не было всего в реке.

image

Более того, если выдра заболела или у нее есть дела, ее работу можно поручить другой выдре в ее группе.

Поскольку кто-то всегда был рядом, выдры были Высоко доступными.

И поскольку они могли справляться с Незапланированными Ситуациями, они были Отказоустойчивыми.

image

Еще существовала магическая часть леса, Земля Потоковой Обработки, где выдры могли делать реально классные вещи с событиями в реке.

image

Целая книга написана об этом магическом месте: Mastering Kafka Streams and ksqlDB

image

Выдры также построили дороги, которые называются Кафка Коннекторы, чтобы донести сообщения других сообществ в реку Кафка. Все присоединились к веселью.

image

Кафка продолжала помогать многим другим во всем мире. И возвращалась обратно в лес.

Жизнь шла своим чередом многие годы, а выдры жили долго и счастливо.



Я не удержалась, вот вам несколько фактов про выдр:



Выдрята, или щенки выдр, спят у мам-выдр на груди.



Взрослые выдры держат друг друга за лапки во сне, чтобы их не разделило течение.



А одинокие выдры, чтобы не дрейфовать, заворачиваются в водоросли.

Строго говоря, автор использует термин otter, что переводится как выдровые, и включает себя 13 видов животных с распространением по всему миру, кроме Австралии.

И если эти слайды вы действительно использовали этот материал на детях в образовательных целях, то завершить можно речевым упражнением:

В недрах тундры выдры в гетрах тырят в ведра ядра кедров. Выдрав с выдры в тундре гетры, вытру выдрой ядра кедра, вытру гетрой выдре морду ядра в вёдра, выдру в тундру.
Подробнее..

Перевод Масштабирование итеративных алгоритмов в Spark

26.01.2021 14:17:08 | Автор: admin

Итеративные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т.д. Эти алгоритмы усложняются итерациями, размеры данных на каждой итерации увеличивается, и сделать их отказоустойчивыми на каждой итерации непросто.

В этой статье я бы подробно остановился на некоторых моментах, которые необходимо учитывать при работе с этими задачами. Мы использовали Spark для реализации нескольких итерационных алгоритмов, таких как построение связанных компонентов, обход больших связанных компонентов и т.д. Ниже приведен мой опыт работы в лабораториях Walmart по построению связанных компонентов для 60 миллиардов узлов клиентской идентификации.

Количество итераций никогда не предопределено, всегда есть условие завершения ?.Количество итераций никогда не предопределено, всегда есть условие завершения ?.

Типы итеративных алгоритмов

Конвергентные данные: Здесь мы видим, что с каждой итерацией количество данных уменьшается, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основной задачей будет работа с огромными наборами данных в первых нескольких итерациях, и после того, как набор данных значительно уменьшится, можно будет легко справляться с дальнейшими итерациями до их завершения.

Расходящиеся данные: Количество данных увеличивается при каждой итерации, и иногда они могут появляться быстрее и сделать невозможной дальнейшую работу. Для работы этих алгоритмов необходимы такие ограничения, как ограничения по количеству итераций, начальному размеру данных, вычислительной мощности и т.д.

Аналогичные данные: На каждой итерации у нас были бы более или менее одинаковые данные, и с таким алгоритмом было бы очень легко работать.

Инкрементные данные: На каждой итерации у нас могут появляться новые данные, особенно в ML у нас могут появляться новые обучающие наборы данных с периодическими интервалами.

Препятствия

  1. RDD линии: Одним из распространенных способов сохранения отказоустойчивости системы является хранение копий данных в разных местах, чтобы в случае падения одного узла у нас была копия, которая помогала бы до тех пор, пока узел не восстановится. Но Spark не поддерживает дубликаты данных, а поддерживает линейный график преобразований, выполненных на данных в драйвере. Поэтому такой линейный график был бы полезен, если какой-либо фрагмент данных отсутствует, он может построить его обратно с помощью линейного графика, следовательно, Spark является отказоустойчивым. По мере того, как линейный график становится большим, становится трудно строить данные обратно, так как количество итераций увеличивается.

  2. Память и дисковый ввод/вывод: В Spark RDD являются непреложными, поэтому на каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование Памяти и Диска. По мере того, как итерации будут увеличивать использование диска/памяти исполнителями, это может привести к замедлению работы из-за нехватки памяти и ожиданию, пока GC выполнит очистку. В некоторых случаях куча памяти будет недостаточной и может привести к невозможности выполнения задачи.

  3. Размер задачи: В некоторых случаях может быть несколько задач, которые могут не подходить для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к препятствию.

Советы по преодолению вышеуказанных проблем

  1. Хранение большого линейного графика в памяти, и, в случае сбоя узла, восстановление потерянных наборов данных займет много времени. В таких случаях можно использовать кэширование или запись данных о состоянии в контрольной точке на каждой N итерации. Это сохранит рассчитанный RDD на каждой N итерации (кэширование будет храниться в памяти или на диске исполнителей, запись данных о состоянии в контрольной точке использует HDFS, мы должны принять решение исходя из нашей потребности, так как скорость будет различаться для каждой из них). В случае неудачи RDD вычисляется обратно от последней контрольной точки/кэширования. Вместо использования двух вышеупомянутых методов можно также создать временную таблицу и сохранить вычисленный набор данных, разделенный итерацией. В случае неудачи задания Spark, можно сделать перезапуск с последней N-ой итерации, а преимущество сохранения во временную таблицу состоит в том, что можно избавиться от линейного графика RDD до этой итерации и запустить свежий линейный график с этой точки. По мере того, как линейный график RDD растет в итерационных алгоритмах, нам необходимо строить гибридные решения с использованием кэширования, контрольных точек (см. ссылку [2]) и временных таблиц для различных вариантов использования.

  2. Как и выше, сохранение во временную таблицу и чтение из временной таблицы может помочь избавиться от линейного графика и очистить память и диск предыдущих RDD. Такая запись и чтение замедляет процесс, но это даст огромное преимущество при работе с большими наборами данных. Особенно при конвергировании наборов данных, нам может понадобиться выполнять этот процесс только в течение первых нескольких итераций и использовать кэширование, когда наборы данных становятся маленькими при работе с итерациями. Экономия во временной таблице в качестве контрольной точки выглядит тривиально, но она не просто действует как контрольная точка. Так как мы избавляемся от истории линейных графов, делая это на периодических итерациях, это уменьшит риск сбоя в работе и сократит время на построение ее обратной копии из потерянных данных.

  3. Работа с расходящимися данными сложна, так как размер каждой задачи будет увеличиваться с увеличением количества итераций и займет намного больше времени для каждого исполнителя. Поэтому нам нужен фактор для вычисления количества задач в ( i + 1) итерации по сравнению с i-й итерацией таким образом, чтобы размер задачи остался прежним. Например, скажем, количество задач в i-й итерации 100, и каждая задача обрабатывает около 100 МБ данных. В i+1 итерации размер каждой задачи увеличивается до 150 МБ, и мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ на каждую задачу. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач, переструктурировав и изменив перетасованные разделы на основе итерации.

  4. В случаях, когда размер spark задачи огромен, попробуйте увеличить память исполнителя в соответствии с размером задачи. А если нужно выполнить соединения на искаженных наборах данных, где 10% задач занимает 90% времени исполнения и 90% задач выполняются за 10% времени, эти задачи предлагается обрабатывать отдельно, выполняя их в виде двух разных запросов. Нужно определить причину больших задач, и можем ли мы разделить их на две группы, т.е. маленькие и большие задачи. В 1-м запросе мы бы обработали 90% задач, т.к. нет никаких препятствий для их обработки, и это заняло бы 10% времени, как и раньше. В другом запросе мы бы обрабатывали большие задачи (10% задач) с помощью всенаправленного соединения, так как количество таких задач меньше, а также избегали бы перетасовки данных.

Пример: Допустим, у нас есть таблица А и таблица Б. Таблица А это данные о населении со столбцами user_id, имя, город, штат. Таблица B это то, что группирует данные со столбцами user_id, group_id. Например, мы пытаемся найти 5 крупнейших городов с наибольшим количеством используемых групп. В этом примере могут быть тупиковые ситуации, как города с большим количеством населения могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Для решения этих тупиковых ситуаций, объединение между этими таблицами может быть сделано в двух запросах. Мы можем отфильтровать больших пользователей с большим количеством групп (скажем, порог в 1000 групп на пользователя) и относиться к ним как к большим задачам. И выполнять соединения отдельно для больших пользователей, используя всенаправленное объединение, так как количество больших пользователей будет мало по сравнению с общими данными. Аналогичным образом, для остальных пользователей выполняйте тасовку объединения и объединяйте результаты и агрегируйте по городам, чтобы найти 5 лучших городов.


А прямо сейчас в OTUS открыт набор на курс Экосистема Hadoop, Spark, Hive.

Всех желающих приглашаем записаться на бесплатный демо-урок по теме Spark Streaming.

ЗАБРАТЬ СКИДКУ

Подробнее..

Перевод Apache Spark на Kubernetes чем полезен Apache YuniKorn

25.02.2021 10:21:06 | Автор: admin

Сунил Говиндан, Вэйвэй Янг, Вангда Tан, Уилфред Шпигельбург

Предпосылки

Почему для Apache Spark выбирают K8s

Apache Spark унифицирует в рамках одной платформы пакетную обработку в режиме реального времени, потоковую аналитику, машинное обучение и интерактивные запросы. Хотя Apache Spark предоставляет множество возможностей для разнообразных сценариев применения, его использование сопряжено с дополнительной сложностью и высокими затратами на обслуживание и администрирование кластера. Давайте рассмотрим некоторые высокоуровневые требования к базовому оркестратору ресурсов, чтобы расширить возможности Spark как единой платформы:

Контейнерные вычисления Spark для предоставления общих ресурсов разным заданиям машинного обучения и ETL.

Поддержка нескольких версий Spark, версий Python с контролем версий, контейнеры на общих кластерах K8s для более быстрой итерации и стабильной работы в продуктиве.

Единая унифицированная инфраструктура для большинства пакетных рабочих нагрузок и микросервисов.

Контроль доступа к общим кластерам.

Kubernetes как фактический стандарт для развертывания сервисов предлагает более точный и детализированный контроль всех вышеупомянутых аспектов по сравнению с другими оркестраторами ресурсов. Kubernetes предлагает упрощенный способ управления инфраструктурой и приложениями с практическим подходом к изоляции рабочих нагрузок, ограничению использования ресурсов, развертыванию ресурсов по запросу и возможностям автоматического масштабирования по мере необходимости.

Проблемы планирования задач Apache Spark на K8s

В планировщике Kubernetes по умолчанию есть пробелы с точки зрения эффективного развертывания пакетных рабочих нагрузок на том же кластере, где также планируется длительная работа других сервисов. Для пакетных рабочих нагрузок из-за требуемого параллелизма вычислений в основном должно планироваться совместное и гораздо более частое выполнение. Давайте подробно рассмотрим некоторые из этих пробелов.

Отсутствие первоклассной концепции приложения

В зависимости от типа развертываемого контейнера пакетные задания часто необходимо планировать к выполнению последовательно. Например, поды драйверов Spark нужно планировать раньше, чем рабочие поды. Четкая концепция первоклассного приложения может помочь с упорядочиванием или постановкой в очередь при каждом развертывании контейнера. Кроме того, такая концепция помогает администратору визуализировать задания, запланированные для отладки.

Отсутствие эффективных возможностей управления емкостью/квотами

Для управления ресурсами при выполнении рабочих нагрузок Spark в случае использования ресурсов несколькими арендаторами могут применяться квоты Kubernetes на ресурсы пространства имен (неймспейсов). Однако здесь есть несколько проблем:

1. Задания Apache Spark в отношении использования ресурсов являются динамическими по своей природе. А квоты пространства имен фиксируются и проверяются на этапе допуска. Запрос пода отклоняется, если он не соответствует квоте пространства имен. Это требует, чтобы задание Apache Spark вместо того, чтобы ставить запрос на выполнение в очередь внутри самого Kubernetes, реализовывало механизм повтора запросов пода.

2. Квота ресурсов пространства имен плоская, она не поддерживает иерархии управления квотами ресурсов.

Также часто пользователь может сталкиваться с нехваткой ресурсов для выполнения пакетных рабочих нагрузок, поскольку квоты пространства имен Kubernetes нередко не соответствуют плану распределения ресурсов на основе организационной иерархии. Эластичное и иерархическое управление приоритетами заданий в K8s сегодня отсутствует.

Недостаточно справедливое распределение ресурсов между арендаторами

В производственной, рабочей среде часто обнаруживается, что планировщик Kubernetes по умолчанию не может эффективно управлять диверсифицированными рабочими нагрузками и обеспечивать справедливость распределения ресурсов для рабочих нагрузок арендаторов. Вот некоторые из основных причин:

Управление рабочими нагрузками в производственной среде часто выполняется большим количеством пользователей.

В плотной производственной среде, где выполняются разные типы рабочих нагрузок, весьма вероятно, что поды драйверов Spark могут занять все ресурсы в пространстве имен. Такие сценарии создают большую проблему для эффективного совместного использования ресурсов.

Неправомерные или поврежденные задания могут легко захватить ресурсы и повлиять на производственные рабочие нагрузки.

Строгие требования SLA с задержкой планирования

На большинстве загруженных производственных кластеров, предназначенных для пакетных рабочих нагрузок, часто выполняются тысячи заданий с сотнями тысяч тасков ежедневно. Эти рабочие нагрузки требуют большего количества параллельно развертываемых контейнеров, и часто время жизни таких контейнеров весьма короткое (от нескольких секунд до часов). Обычно это влечет потребность в развертывании тысяч подов или контейнеров, ожидающих выполнения, и использование по умолчанию планировщика Kubernetes может привести к дополнительным задержкам, что, в свою очередь, приведет к невыполнению SLA.

Как может помочь Apache YuniKorn

Обзор Apache YuniKorn

YuniKorn - это улучшенный планировщик Kubernetes, как для сервисов, так и для пакетных рабочих нагрузок. В зависимости от сценариев использования и развертывания, YuniKorn может либо полностью заменить стандартный планировщик Kubernetes, либо работать с ним совместно.

YuniKorn является унифицированным кроссплатформенным планировщиком смешанных рабочих нагрузок, состоящих как из stateless пакетных рабочих нагрузок, так и stateful сервисов.

Сравнение YuniKorn и стандартного планировщика Kubernetes

Фича

Планировщик по умолчанию

YUNIKORN

Примечание

Концепция приложения

x

Приложения в YuniKorn приоритетны. YuniKorn планирует приложения с учетом, например, порядка их запуска, приоритета, использования ресурсов и т. д.

Упорядочение заданий

x

YuniKorn поддерживает правила упорядочения заданийFIFO/FAIR/Приоритеты (WIP)

Детализированное управление ресурсами

x

Управление ресурсами кластера с помощью иерархии очередей. Очереди предоставляют гарантированные ресурсы (минимум) и предельные квоты ресурсов (максимум).

Справедливое распределение ресурсов

x

Справедливое распределение ресурсов между приложениями и очередями для их идеального распределения для всех запущенных приложений.

Нативная поддержка нагрузок Big Data

x

Стандартный планировщик ориентирован на долговременные сервисы. YuniKorn разработан для рабочих нагрузок приложений Big Data и изначально поддерживает эффективное выполнение Spark / Flink / Tensorflow и т. д. на базе K8s.

Масштабирование и производительность

x

YuniKorn оптимизирован по производительности, он подходит для высокопроизводительных и крупномасштабных сред.


Как YuniKorn помогает в работе Spark на K8s

YuniKorn имеет богатый набор функций, которые помогают более эффективно запускать Apache Spark на Kubernetes. Подробности можно найти здесь (инструкции по запуску Spark на K8s с YuniKorn).

Ознакомьтесь с более подробной информацией о том, как YuniKorn расширяет возможности Spark на K8s: Cloud-Native Spark Scheduling with YuniKorn Scheduler на Spark & AI саммите 2020.

Давайте рассмотрим некоторые варианты использования и то, как YuniKorn помогает улучшить планирование ресурсов Spark в этих сценариях.

Несколько пользователей одновременно запускают разные рабочие нагрузки (шумные соседи)

По мере того, как все больше пользователей начинают одновременно запускать джобы, становится все сложнее изолировать их и справедливо предоставлять им необходимые ресурсы. Планировщик YuniKorn предоставляет оптимальное решение для управления квотами ресурсов путем использования очередей ресурсов.

В приведенном выше примере структуры очередей в YuniKorn и пространства имен, определенные в Kubernetes, сопоставляются с очередями неймспейсов родительской очереди с помощью политики размещения. Очереди сред тестирования и разработки имеют фиксированные ограничения по ресурсам. Все остальные очереди ограничены только размером кластера. Ресурсы распределяются между очередями с использованием справедливой политики, а задания в очереди в производственной среде планируются в порядке FIFO.

Вот некоторые из основных преимуществ:

Одна очередь YuniKorn может автоматически сопоставляться с одним пространством имен в Kubernetes.

Емкость очереди эластична по своей природе и может обеспечивать диапазон ресурсов от минимального до максимального заданного значения.

Справедливость распределения ресурсов, которая поможет избежать возможной нехватки ресурсов.

YuniKorn предоставляет простой способ управления квотой ресурсов для кластера Kubernetes, он может работать как замена квоты ресурсов пространства имен. Управление квотами ресурсов в YuniKorn позволяет задействовать очереди запросов подов и совместное использование ограниченных ресурсов между заданиями на основе подключаемых политик планирования. Все это может быть выполнено без каких-либо дополнительных манипуляций, таких как повторная отправка запроса к поду Apache Spark.

Настройка кластера для модели распределения ресурсов на основе существующей в организации иерархии

В большой производственной среде несколько пользователей будут одновременно запускать рабочие нагрузки разных типов. Часто эти пользователи вынуждены потреблять ресурсы в зависимости от бюджетных ограничений в иерархии организации. Такая производственная настройка помогает эффективно использовать ресурсы кластера в пределах выделенной квоты ресурсов.

YuniKorn позволяет управлять ресурсами кластера с иерархией очередей. Детальное управление распределением ресурсов в многопользовательской среде становится возможным за счет использования очередей ресурсов с четкой иерархией (например, соответствующей иерархии организации). Очереди YuniKorn можно настраивать статически или динамически управлять ими, а с помощью функции динамического управления очередями пользователи могут настраивать правила для делегирования функций управления очередью.

Улучшенное SLA заданий Spark в мультиарендном кластере

Обычные рабочие нагрузки ETL, выполняемые в мультиарендном (мультитенантном) кластере, требуют более простых средств определения детализированных политик для выполнения заданий в желаемой для организации иерархии очередей. Часто такие политики помогают определять для выполнения заданий более строгие SLA.

YuniKorn дает администраторам возможность размещать задания в очередях на основе более простых политик, таких как FIFO, FAIR и т. д. Политика сортировки приложений StateAware упорядочивает задания в очереди в порядке FIFO и планирует их выполнение одно за другим согласно условиям. Это позволяет избежать обычного состояния гонки при отправке большого количества пакетных заданий, например Spark, в одно пространство имен (или кластер). Обеспечивая определенный порядок заданий, она также улучшает планирование заданий, делая его более предсказуемым.

Использование различных фич K8s для планирования заданий Apache Spark

YuniKorn полностью совместим с основными выпущенными версиями K8s. Пользователи могут прозрачным образом менять планировщик в существующем кластере K8s. YuniKorn полностью поддерживает всю родную семантику K8s, которая может использоваться во время планирования, такую как селектор меток, афинность/анти-афинность пода, толерантность, PV/PVC и т. д. YuniKorn также совместим с командами управления и утилитами, такими как узлы cordon, получение событий через kubectl и т. д.

Apache YuniKorn в CDP

Платформа CDP от Cloudera предлагает аналитическое приложение Cloudera Data Engineering на базе Apache YuniKorn (в инкубации).

Некоторые из основных вариантов использования YuniKorn в Cloudera включают в себя:

Управление квотами ресурсов для виртуальных кластеров CDE.

Расширенные возможности планирования заданий Spark.

Ответственность за планирование как микросервисных, так и пакетных заданий.

Работа в облаке с поддержкой автоматического масштабирования.

Планы на улучшение поддержки рабочих нагрузок Spark

Сообщество YuniKorn активно изучает определенные улучшения основных функций для поддержки выполнения рабочих нагрузок Spark. Например, для более эффективного выполнения важно выделить минимальное количество подов драйверов и рабочих подов. Параллельное планирование (gang scheduling) помогает обеспечить выделение для выполнения задания Spark необходимого количества подов. Такая функция будет очень полезна в шумном окружении при развертывании многопользовательского кластера. Для получения более подробной информации см. YUNIKORN-2 (Jira отслеживает прогресс функций).

Поддержка приоритетов заданий/задач

Упорядочение по приоритетам на уровне заданий помогает администраторам расставлять приоритеты и YuniKorn выделять необходимые ресурсы заданиям с высоким уровнем SLA. Это также дает большую гибкость для эффективного использования ресурсов кластера. Для получения более подробной информации см. YUNIKORN-1 (Jira отслеживает прогресс функции).

Распределенная трассировка

В YUNIKORN-387 для улучшения общего контроля планировщика используется открытая трассировка. С помощью этой функции можно собирать и сохранять критические трассировки в рамках основного цикла планирования для устранения неполадок, профилирования системы и мониторинга.

Резюме

YuniKorn помогает эффективно осуществлять детальное распределение ресурсов для различных рабочих нагрузок Spark в крупномасштабных многопользовательских средах, с одной стороны, и динамически создаваемых облачных средах - с другой.

Таким образом, благодаря YuniKorn платформа Apache Spark может стать для пользователей важной платформой корпоративного уровня, предлагающей надежную основу для различных приложений, от крупномасштабного преобразования данных до задач аналитики и машинного обучения.

Подробнее..

Импорт ЕГРЮЛ ФНС средствами Apache NiFi. Шаг 3 преобразование JSON с помощью JOLT

11.02.2021 18:04:31 | Автор: admin

В одном из проектов возникла необходимость перевести процессы импорта данных сторонних систем на микросервисную архитектуру. В качестве инструмента выбран Apache NiFi. В качестве первого подопытного выбран импорт ЕГРЮЛ ФНС.

В предыдущей статье был описан способ преобразования XML в JSON с использованием AVRO schema.

В данной статье описан способ преобразования JSON с помощью JOLT спецификации.

Используемые процессоры и контроллеры

Деление JSON на части

FlowFile, полученный на предыдущем этапе, содержит JSON с массивом выписок ЕГРЮЛ по разным организациям. Для начала разделим его на части, чтобы каждый FlowFile содержал одну выписку.

Для этого используем процессор SplitJson. Из настроек - требуется указать выражение JsonPath для разделения json на части. В данном случае $.*

Документация по JsonPath здесь

Потренироваться можно здесь

Преобразование JSON

Полученный JSON имеет излишне сложную структуру для того, чтобы в дальнейшем хранить и обрабатывать его. Адрес и ФИО лучше объединить в одну строку, некоторые элементы перенести выше по иерархии.

JSON перед трансформацией
{  "reportDate" : "2020-05-20",  "ogrn" : "1234567890123",  "ogrnDate" : "2002-12-30",  "inn" : "1234567890",  "kpp" : "123456789",  "opfCode" : "12300",  "opfName" : "Общества с ограниченной ответственностью",  "name" : {    "fullName" : "ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ",    "shortName" : "ООО"  },  "address" : {    "addressRF" : {      "region" : {        "type" : "ОБЛАСТЬ",        "name" : "МОСКОВСКАЯ"      },      "district" : null,      "town" : {        "type" : "ГОРОД",        "name" : "ИСТРА"      },      "settlement" : null,      "street" : {        "type" : "ПЕРЕУЛОК",        "name" : "ВОЛОКОЛАМСКИЙ"      },      "index" : "143500",      "regionCode" : "50",      "kladr" : "500000570000011",      "house" : null,      "building" : null,      "apartment" : null    }  },  "termination" : null,  "capital" : null,  "manageOrg" : null,  "director" : [ {    "fl" : {      "lastName" : "ИВАНОВ",      "firstName" : "ИВАН",      "patronymic" : "ИВАНОВИЧ",      "inn" : "123456789012"    },    "position" : {      "ogrnip" : null,      "typeCode" : "02",      "typeName" : "Руководитель юридического лица",      "name" : "ГЕНЕРАЛЬНЙ ДИРЕКТОР"    },    "disqualification" : null  } ],  "founders" : {    "founderULRF" : null,    "founderULForeign" : null,    "founderFL" : [ {      "fl" : {        "lastName" : "ИВАНОВ",        "firstName" : "ИВАН",        "patronymic" : "ИВАНОВИЧ",        "inn" : "123456789012"      },      "capitalPart" : {        "nominal" : 20000.0,        "size" : {          "percent" : 50.0,          "decimalPart" : null,          "simplePart" : null        }      }    }, {      "fl" : {        "lastName" : "ПЕТРОВ",        "firstName" : "ПЕТР",        "patronymic" : "ПЕТРОВИЧ",        "inn" : "123456789021"      },      "capitalPart" : {        "nominal" : 20000.0,        "size" : {          "percent" : 50.0,          "decimalPart" : null,          "simplePart" : null        }      }    } ],    "founderGov" : null,    "founderPIF" : null  },  "capitalPart" : null,  "holderReestrAO" : null,  "okved" : {    "mainOkved" : {      "code" : "47.11",      "name" : "Торговля розничная преимущественно пищевыми продуктами, включая напитки, и табачными изделиями в неспециализированных магазинах"    },    "addOkved" : null  }}

Для трансформации JSON используется процессор JoltTransformJSON.

Настройки:

  • Jolt Transformation DSL - тип трансформации. В данном случае Chain - цепочка из нескольких трансформаций

  • Jolt Specification - собственно сама спецификация. Ее разбор ниже

JOLT спецификация

Собственно, сам субъект - ссылка на исходники и документацию.

Потренироваться можно здесь.

Меня интересовали операции сдвига элементов по иерархии - операция shift и преобразование самих данных - операция modify-overwrite-beta. По последней доки как таковой и нет. Исходники операции в Modifier.java, там можно посмотреть список доступных функций. Но на jolt-demo.appspot.com внизу есть примеры для этой операции. Так что методом научного тыка есть возможность прийти к решению.

JOLT спецификация
[{"operation": "modify-overwrite-beta","spec": {"address": {"addressRF": {"region": "=concat(@(type), ' ', @(name))","district": "=concat(@(type), ' ', @(name))","town": "=concat(@(type), ' ', @(name))","settlement": "=concat(@(type), ' ', @(name))","street": "=concat(@(type), ' ', @(name))"}},"director": {"*": {"fl": {"fio": "=concat(@(1,lastName), ' ', @(1,firstName), ' ', @(1,patronymic))"}}},"founders": {"founderFL": {"*": {"fl": {"fio": "=concat(@(1,lastName), ' ', @(1,firstName), ' ', @(1,patronymic))"}}},"founderGov": {"*": {"founderImplFL": {"fl": {"fio": "=concat(@(1,lastName), ' ', @(1,firstName), ' ', @(1,patronymic))"}}}}}}},{"operation": "modify-overwrite-beta","spec": {"address": {"addressRF": {"value": "=concat(@(1,index), ', ', @(1,region), ', ', @(1,district), ', ', @(1,town), ', ', @(1,settlement), ', ', @(1,street), ', ', @(1,house), ', ', @(1,building), ', ', @(1,apartment))","fias": null}}}},{"operation": "shift","spec": {"reportDate|ogrn|ogrnDate|inn|kpp|opfCode|opfName": "&","name": {"*": "&"},"address": {"addressRF": {"kladr|regionCode|value|fias": "&2.&"}},"termination": {"method": {"*": "&2.&"},"*": "&1.&"},"capital": "&","manageOrg": {"egrulData": {"*": "&2.&"}},"director": {"*": {"fl": {"fio|inn": "&3[&2].&"},"position": {"name": "&3[&2].&1","*": "&3[&2].&"},"disqualification": "&2[&1].&"}},"founders": {"founderULRF|founderULForeign": {"*": {"egrulData|foreignReg": {"*": "&4.&3[&2].&"},"*": "&3.&2[&1].&"}},"founderFL": {"*": {"fl": {"fio|inn": "&4.&3[&2].&"},"*": "&3.&2[&1].&"}},"founderGov": {"*": {"govOrg": {"*": "&4.&3[&2].&"},"capitalPart": "&3.&2[&1].&","founderImplUL": {"egrulData": {"*": "&5.&4[&3].&2.&"}},"founderImplFL": {"fl": {"fio|inn": "&5.&4[&3].&2.&"}}}},"founderPIF": {"*": {"PIFName": {"name": "&4.&3[&2].&1"},"manageOrg": {"egrulData": {"*": "&5.&4[&3].&"}},"capitalPart": "&3.&2[&1].&"}}},"capitalPart": "&","holderReestrAO": {"egrulData": {"*": "&2.&"}},"okved": "&"}}]

Операцию modify-overwrite-beta пришлось делать два раза, т.к. по другому собрать адрес в одну строку у меня не получилось.

Как видно, вся спецификация представляет собой массив из трех операций: две - modify-overwrite-beta и одна - shift. Описание каждой операции содержит ее тип - элемент operation и спецификацию - элемент spec.

Преобразование выполняется по цепочке от первого к последнему блоку таким образом, что выходные данные каждого блока служат входными данными для следующего.

Операция modify-overwrite-beta

Спецификация операции содержит левую и правую часть. В левой части требуется указать путь к элементу, который должен быть получен на выходе. В правой части указывается, каким образом должно быть получено содержимое элемента.

Преобразование адреса

Разберем преобразование адреса.

Первый этап (см. первый блок modify-overwrite-beta) - объединить type и name для region, district, town, settlement и street. Для этого прописываем путь к элементам и в правой части для каждого пишем "=concat(@(type), ' ', @(name))" .

"address": {"addressRF": {"region": "=concat(@(type), ' ', @(name))","district": "=concat(@(type), ' ', @(name))","town": "=concat(@(type), ' ', @(name))","settlement": "=concat(@(type), ' ', @(name))","street": "=concat(@(type), ' ', @(name))"}}

Что это означает. Например, "region": "=concat(@(type), ' ', @(name))", означает: на выходе требуется получить элемент region, а в качестве его содержимого требуется получить конкатенацию содержимого элементов type и name. Причем искать эти элементы необходимо непосредственно внутри существующего элемента region, о чем говорит конструкция @(type).

Второй этап (см. второй блок modify-overwrite-beta) - объединить составляющие адреса в одну строку и записать в элемент value.

"address": {"addressRF": {"value": "=concat(@(1,index), ', ', @(1,region), ', ', @(1,district), ', ', @(1,town), ', ', @(1,settlement), ', ', @(1,street), ', ', @(1,house), ', ', @(1,building), ', ', @(1,apartment))","fias": null}}

Здесь примерно то же самое, но теперь используется конструкция вида @(1,index). Она означает, что для поиска элемента index необходимо подняться на один уровень вверх от текущего и искать его там. Т.е. от уровня value необходимо перейти к уровню addressRF, и в пределах addressRF найти элемент index.

Следует обратить внимание, что не должно быть пробелов между = и concat, а также в @(1,index).

Элемент fias был добавлен в надежде в дальнейшем осуществить поиск кода ФИАС по адресу в каком-нибудь стороннем сервисе.

На этом преобразование адреса завершено. Про операцию shift будет ниже.

Объединение ФИО для физических лиц

Объединение ФИО для физических лиц осуществляется аналогичным образом. Здесь обращает на себя внимание только наличие селектора "*" в левой части. Он используется, т.к. содержимое элемента director представляет собой массив, а это отдельный уровень иерархии.

"director": {"*": {"fl": {"fio": "=concat(@(1,lastName), ' ', @(1,firstName), ' ', @(1,patronymic))"}}}

Операция shift

В блок shift на вход поступает следующий JSON.

Промежуточный JSON
{  "reportDate" : "2020-05-20",  "ogrn" : "1234567890123",  "ogrnDate" : "2002-12-30",  "inn" : "1234567890",  "kpp" : "123456789",  "opfCode" : "12300",  "opfName" : "Общества с ограниченной ответственностью",  "name" : {    "fullName" : "ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ",    "shortName" : "ООО"  },  "address" : {    "addressRF" : {      "region" : "ОБЛАСТЬ МОСКОВСКАЯ",      "district" : " ",      "town" : "ГОРОД ИСТРА",      "settlement" : " ",      "street" : "ПЕРЕУЛОК ВОЛОКОЛАМСКИЙ",      "index" : "143500",      "regionCode" : "50",      "kladr" : "500000570000011",      "house" : null,      "building" : null,      "apartment" : null,      "value" : "143500, ОБЛАСТЬ МОСКОВСКАЯ,  , ГОРОД ИСТРА,  , ПЕРЕУЛОК ВОЛОКОЛАМСКИЙ, , , ",      "fias" : null    }  },  "termination" : null,  "capital" : null,  "manageOrg" : null,  "director" : [ {    "fl" : {      "lastName" : "ИВАНОВ",      "firstName" : "ИВАН",      "patronymic" : "ИВАНОВИЧ",      "inn" : "123456789012",      "fio" : "ИВАНОВ ИВАН ИВАНОВИЧ"    },    "position" : {      "ogrnip" : null,      "typeCode" : "02",      "typeName" : "Руководитель юридического лица",      "name" : "ГЕНЕРАЛЬНЙ ДИРЕКТОР"    },    "disqualification" : null  } ],  "founders" : {    "founderULRF" : null,    "founderULForeign" : null,    "founderFL" : [ {      "fl" : {        "lastName" : "ИВАНОВ",        "firstName" : "ИВАН",        "patronymic" : "ИВАНОВИЧ",        "inn" : "123456789012",        "fio" : "ИВАНОВ ИВАН ИВАНОВИЧ"      },      "capitalPart" : {        "nominal" : 20000,        "size" : {          "percent" : 50,          "decimalPart" : null,          "simplePart" : null        }      }    }, {      "fl" : {        "lastName" : "ПЕТРОВ",        "firstName" : "ПЕТР",        "patronymic" : "ПЕТРОВИЧ",        "inn" : "123456789021",        "fio" : "ПЕТРОВ ПЕТР ПЕТРОВИЧ"      },      "capitalPart" : {        "nominal" : 20000,        "size" : {          "percent" : 50,          "decimalPart" : null,          "simplePart" : null        }      }    } ],    "founderGov" : null,    "founderPIF" : null  },  "capitalPart" : null,  "holderReestrAO" : null,  "okved" : {    "mainOkved" : {      "code" : "47.11",      "name" : "Торговля розничная преимущественно пищевыми продуктами, включая напитки, и табачными изделиями в неспециализированных магазинах"    },    "addOkved" : null  }}

Как видно, остались лишние элементы - например, данные адреса, фамилия, имя отчество. Стоит отметить, что если в операции modify-overwrite-beta не указано преобразование для элемента, то он переносится в неизменном виде. В отличие от этого, в операции shift - если преобразование для элемента не указано, то он будет удален.

Описание операции shift
{"operation": "shift","spec": {"reportDate|ogrn|ogrnDate|inn|kpp|opfCode|opfName": "&","name": {"*": "&"},"address": {"addressRF": {"kladr|regionCode|value|fias": "&2.&"}},"termination": {"method": {"*": "&2.&"},"*": "&1.&"},"capital": "&","manageOrg": {"egrulData": {"*": "&2.&"}},"director": {"*": {"fl": {"fio|inn": "&3[&2].&"},"position": {"name": "&3[&2].&1","*": "&3[&2].&"},"disqualification": "&2[&1].&"}},"founders": {"founderULRF|founderULForeign": {"*": {"egrulData|foreignReg": {"*": "&4.&3[&2].&"},"*": "&3.&2[&1].&"}},"founderFL": {"*": {"fl": {"fio|inn": "&4.&3[&2].&"},"*": "&3.&2[&1].&"}},"founderGov": {"*": {"govOrg": {"*": "&4.&3[&2].&"},"capitalPart": "&3.&2[&1].&","founderImplUL": {"egrulData": {"*": "&5.&4[&3].&2.&"}},"founderImplFL": {"fl": {"fio|inn": "&5.&4[&3].&2.&"}}}},"founderPIF": {"*": {"PIFName": {"name": "&4.&3[&2].&1"},"manageOrg": {"egrulData": {"*": "&5.&4[&3].&"}},"capitalPart": "&3.&2[&1].&"}}},"capitalPart": "&","holderReestrAO": {"egrulData": {"*": "&2.&"}},"okved": "&"}}

В операции shift присутствуют левая и правая часть инструкции. Левая часть указывает, где брать данные, а правая указывает путь, куда их разместить. Путь представляет собой цепочку наименований элементов, разделенных точкой. С помощью знака & осуществляется подстановка наименований существующих элементов. Исчисление начинается с того элемента, который указан в левой части, ему соответствует &0. Ноль при этом можно опустить. Выше него по иерархии будет &1, и т.д. К знакам & можно добавлять префиксы и суффиксы - например, pre-&-post. Т.е. если & соответствует элементу name, то на выходе получим pre-name-post. Результирующая цепочка элементов размещается в корне иерархии. Рассмотрим на примерах.

Самое простое - "reportDate|ogrn|ogrnDate|inn|kpp|opfCode|opfName": "&". Будет взят каждый из перечисленных элементов, и они будут размещены в корне. Перечисление осуществляется с помощью |.

Далее переносим fullName и shortName на один уровень вверх с помощью инструкции "name": { "*": "&" }.
"*" означает, что требуется выбрать содержимое всех элементов, вложенных в name.
"&" означает, что их требуется разместить в корне иерархии.

Следующее - перенос данных адреса.

"address": {"addressRF": {"kladr|regionCode|value|fias": "&2.&"}}

Здесь мы указываем нужные элементы. Ненужные не указываем. Инструкция для размещения - "&2.&". Она означает, что требуется составить цепочки из нулевого и второго уровня, минуя первый. &2 соответствует элементу address, а & - элементам из перечисления. &1 соответствует элементу addressRF, он будет удален из иерархии. Т.обр. будут составлены четыре цепочки: address.kladr, address.regionCode, address.value и address.fias. И все они буду размещены в корне результирующего JSON.

Массивы разберем на примере данных о директоре

"director" : [ {    "fl" : {      "lastName" : "ИВАНОВ",      "firstName" : "ИВАН",      "patronymic" : "ИВАНОВИЧ",      "inn" : "123456789012",      "fio" : "ИВАНОВ ИВАН ИВАНОВИЧ"    },    "position" : {      "ogrnip" : null,      "typeCode" : "02",      "typeName" : "Руководитель юридического лица",      "name" : "ГЕНЕРАЛЬНЙ ДИРЕКТОР"    },    "disqualification" : null  } ]

Нужно убрать lastName, firstName и patronymic.
inn и fio перенести на один уровень выше.
ogrnip, typeCode и typeName также перенести на один уровень выше.
Значение name установить в качестве значения position.
disqualification оставить без изменений.

В общем-то алгоритм действий тот же самый, но следуют помнить, что массив - это отдельный уровень иерархии. Когда работаем с массивом, то в цепочке иерархии соответствующий ему & должен быть помещен в квадратные скобки - [&].

"director": {"*": {"fl": {"fio|inn": "&3[&2].&"},"position": {"name": "&3[&2].&1","*": "&3[&2].&"},"disqualification": "&2[&1].&"}}

Например, fio и inn. Для них цепочка &3[&2].&. Точку перед открывающей квадратной скобкой можно опустить. Получаем: &3 - соответствует элементу director, [&2] - соответствует уровню элементов массива, & - сами fio и inn.

Элемент name в position. &3 - соответствует элементу director, [&2] - соответствует уровню элементов массива, &1 - соответствует элементу position. &, соответствующий самому элементу name отсутствует, значит его содержимое будет перенесено в position.

Остальные элементы в position просто переносятся на один уровень вверх. disqualification остается без изменений.

Далее используются аналогичные конструкции.

Пример

Ну и напоследок продублирую исходный JSON, JOLT спецификацию и результирующий JSON

Исходный JSON
{  "reportDate": "2020-05-20",  "ogrn": "1234567890123",  "ogrnDate": "2002-12-30",  "inn": "1234567890",  "kpp": "123456789",  "opfCode": "12300",  "opfName": "Общества с ограниченной ответственностью",  "name": {    "fullName": "ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ",    "shortName": "ООО"  },  "address": {    "addressRF": {      "region": {        "type": "ОБЛАСТЬ",        "name": "МОСКОВСКАЯ"      },      "district": null,      "town": {        "type": "ГОРОД",        "name": "ИСТРА"      },      "settlement": null,      "street": {        "type": "ПЕРЕУЛОК",        "name": "ВОЛОКОЛАМСКИЙ"      },      "index": "143500",      "regionCode": "50",      "kladr": "500000570000011",      "house": null,      "building": null,      "apartment": null    }  },  "termination": null,  "capital": null,  "manageOrg": null,  "director": [    {      "fl": {        "lastName": "ИВАНОВ",        "firstName": "ИВАН",        "patronymic": "ИВАНОВИЧ",        "inn": "123456789012"      },      "position": {        "ogrnip": null,        "typeCode": "02",        "typeName": "Руководитель юридического лица",        "name": "ГЕНЕРАЛЬНЙ ДИРЕКТОР"      },      "disqualification": null    }  ],  "founders": {    "founderULRF": null,    "founderULForeign": null,    "founderFL": [      {        "fl": {          "lastName": "ИВАНОВ",          "firstName": "ИВАН",          "patronymic": "ИВАНОВИЧ",          "inn": "123456789012"        },        "capitalPart": {          "nominal": 20000,          "size": {            "percent": 50,            "decimalPart": null,            "simplePart": null          }        }      },      {        "fl": {          "lastName": "ПЕТРОВ",          "firstName": "ПЕТР",          "patronymic": "ПЕТРОВИЧ",          "inn": "123456789021"        },        "capitalPart": {          "nominal": 20000,          "size": {            "percent": 50,            "decimalPart": null,            "simplePart": null          }        }      }    ],    "founderGov": null,    "founderPIF": null  },  "capitalPart": null,  "holderReestrAO": null,  "okved": {    "mainOkved": {      "code": "47.11",      "name": "Торговля розничная преимущественно пищевыми продуктами, включая напитки, и табачными изделиями в неспециализированных магазинах"    },    "addOkved": null  }}
JOLT спецификация
[{"operation": "modify-overwrite-beta","spec": {"address": {"addressRF": {"region": "=concat(@(type), ' ', @(name))","district": "=concat(@(type), ' ', @(name))","town": "=concat(@(type), ' ', @(name))","settlement": "=concat(@(type), ' ', @(name))","street": "=concat(@(type), ' ', @(name))"}},"director": {"*": {"fl": {"fio": "=concat(@(1,lastName), ' ', @(1,firstName), ' ', @(1,patronymic))"}}},"founders": {"founderFL": {"*": {"fl": {"fio": "=concat(@(1,lastName), ' ', @(1,firstName), ' ', @(1,patronymic))"}}},"founderGov": {"*": {"founderImplFL": {"fl": {"fio": "=concat(@(1,lastName), ' ', @(1,firstName), ' ', @(1,patronymic))"}}}}}}},{"operation": "modify-overwrite-beta","spec": {"address": {"addressRF": {"value": "=concat(@(1,index), ', ', @(1,region), ', ', @(1,district), ', ', @(1,town), ', ', @(1,settlement), ', ', @(1,street), ', ', @(1,house), ', ', @(1,building), ', ', @(1,apartment))","fias": null}}}},{"operation": "shift","spec": {"reportDate|ogrn|ogrnDate|inn|kpp|opfCode|opfName": "&","name": {"*": "&"},"address": {"addressRF": {"kladr|regionCode|value|fias": "&2.&"}},"termination": {"method": {"*": "&2.&"},"*": "&1.&"},"capital": "&","manageOrg": {"egrulData": {"*": "&2.&"}},"director": {"*": {"fl": {"fio|inn": "&3[&2].&"},"position": {"name": "&3[&2].&1","*": "&3[&2].&"},"disqualification": "&2[&1].&"}},"founders": {"founderULRF|founderULForeign": {"*": {"egrulData|foreignReg": {"*": "&4.&3[&2].&"},"*": "&3.&2[&1].&"}},"founderFL": {"*": {"fl": {"fio|inn": "&4.&3[&2].&"},"*": "&3.&2[&1].&"}},"founderGov": {"*": {"govOrg": {"*": "&4.&3[&2].&"},"capitalPart": "&3.&2[&1].&","founderImplUL": {"egrulData": {"*": "&5.&4[&3].&2.&"}},"founderImplFL": {"fl": {"fio|inn": "&5.&4[&3].&2.&"}}}},"founderPIF": {"*": {"PIFName": {"name": "&4.&3[&2].&1"},"manageOrg": {"egrulData": {"*": "&5.&4[&3].&"}},"capitalPart": "&3.&2[&1].&"}}},"capitalPart": "&","holderReestrAO": {"egrulData": {"*": "&2.&"}},"okved": "&"}}]
Результирующий JSON
{  "reportDate" : "2020-05-20",  "ogrn" : "1234567890123",  "ogrnDate" : "2002-12-30",  "inn" : "1234567890",  "kpp" : "123456789",  "opfCode" : "12300",  "opfName" : "Общества с ограниченной ответственностью",  "fullName" : "ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ",  "shortName" : "ООО",  "address" : {    "kladr" : "500000570000011",    "regionCode" : "50",    "value" : "143500, ОБЛАСТЬ МОСКОВСКАЯ,  , ГОРОД ИСТРА,  , ПЕРЕУЛОК ВОЛОКОЛАМСКИЙ, , , ",    "fias" : null  },  "capital" : null,  "director" : [ {    "fio" : "ИВАНОВ ИВАН ИВАНОВИЧ",    "inn" : "123456789012",    "ogrnip" : null,    "typeCode" : "02",    "typeName" : "Руководитель юридического лица",    "position" : "ГЕНЕРАЛЬНЙ ДИРЕКТОР",    "disqualification" : null  } ],  "founders" : {    "founderFL" : [ {      "fio" : "ИВАНОВ ИВАН ИВАНОВИЧ",      "inn" : "123456789012",      "capitalPart" : {        "nominal" : 20000,        "size" : {          "percent" : 50,          "decimalPart" : null,          "simplePart" : null        }      }    }, {      "fio" : "ПЕТРОВ ПЕТР ПЕТРОВИЧ",      "inn" : "123456789021",      "capitalPart" : {        "nominal" : 20000,        "size" : {          "percent" : 50,          "decimalPart" : null,          "simplePart" : null        }      }    } ]  },  "capitalPart" : null,  "okved" : {    "mainOkved" : {      "code" : "47.11",      "name" : "Торговля розничная преимущественно пищевыми продуктами, включая напитки, и табачными изделиями в неспециализированных магазинах"    },    "addOkved" : null  }}

Далее

Далее получившийся JSON следует куда-то разместить для хранения и дальнейшего использования. Но это выходит за рамки повествования. Тут уж кому что удобно.

Подробнее..

Продолжаем знакомство с APIM Gravitee

27.05.2021 20:23:24 | Автор: admin

Всем привет! Меня всё ещё зовут Антон. В предыдущейстатьея провел небольшой обзор APIM Gravitee и в целом систем типа API Management. В этой статье я расскажу,как поднять ознакомительный стенд APIM Gravitee (https://www.gravitee.io), рассмотрим архитектуру системы, содержимое docker compose file, добавим некоторые параметры, запустим APIM Gravitee и сделаем первую API. Статья немного погружает в технические аспекты и может быть полезна администраторам и инженерам, чтобы начать разбираться в системе.

Архитектура

Для ознакомительного стенда будем использовать простейшую архитектуру

Все в докере, в том числе и MongoDB и Elasticsearch. Чтобы сделать полноценную среду для тестирования крайне желательно компоненты MongoDB и Elasticsearch вынести за пределы Docker. Также для простоты манипулирования настройками можно вынести конфигурационные файлы Gateway и Management API: logback.xml и gravitee.yml.

docker-compose.yml

Среду для начальных шагов будем поднимать, используяdocker-compose file, предоставленный разработчиками наgithub. Правда,внесемнесколькокорректив.

docker-compose.yml# Copyright (C) 2015 The Gravitee team (<http://gravitee.io>)## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##<http://www.apache.org/licenses/LICENSE-2.0>## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#version: '3.5'networks:frontend:name: frontendstorage:name: storagevolumes:data-elasticsearch:data-mongo:services:mongodb:image: mongo:${MONGODB_VERSION:-3.6}container_name: gio_apim_mongodbrestart: alwaysvolumes:- data-mongo:/data/db- ./logs/apim-mongodb:/var/log/mongodbnetworks:- storageelasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.7.0}container_name: gio_apim_elasticsearchrestart: alwaysvolumes:- data-elasticsearch:/usr/share/elasticsearch/dataenvironment:- http.host=0.0.0.0- transport.host=0.0.0.0- xpack.security.enabled=false- xpack.monitoring.enabled=false- cluster.name=elasticsearch- bootstrap.memory_lock=true- discovery.type=single-node- "ES_JAVA_OPTS=-Xms512m -Xmx512m"ulimits:memlock:soft: -1hard: -1nofile: 65536networks:- storagegateway:image: graviteeio/apim-gateway:${APIM_VERSION:-3}container_name: gio_apim_gatewayrestart: alwaysports:- "8082:8082"depends_on:- mongodb- elasticsearchvolumes:- ./logs/apim-gateway:/opt/graviteeio-gateway/logsenvironment:- gravitee_management_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000- gravitee_ratelimit_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000- gravitee_reporters_elasticsearch_endpoints_0=http://elasticsearch:9200networks:- storage- frontendmanagement_api:image: graviteeio/apim-management-api:${APIM_VERSION:-3}container_name: gio_apim_management_apirestart: alwaysports:- "8083:8083"links:- mongodb- elasticsearchdepends_on:- mongodb- elasticsearchvolumes:- ./logs/apim-management-api:/opt/graviteeio-management-api/logsenvironment:- gravitee_management_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000- gravitee_analytics_elasticsearch_endpoints_0=http://elasticsearch:9200networks:- storage- frontendmanagement_ui:image: graviteeio/apim-management-ui:${APIM_VERSION:-3}container_name: gio_apim_management_uirestart: alwaysports:- "8084:8080"depends_on:- management_apienvironment:- MGMT_API_URL=http://localhost:8083/management/organizations/DEFAULT/environments/DEFAULT/volumes:- ./logs/apim-management-ui:/var/log/nginxnetworks:- frontendportal_ui:image: graviteeio/apim-portal-ui:${APIM_VERSION:-3}container_name: gio_apim_portal_uirestart: alwaysports:- "8085:8080"depends_on:- management_apienvironment:- PORTAL_API_URL=http://localhost:8083/portal/environments/DEFAULTvolumes:- ./logs/apim-portal-ui:/var/log/nginxnetworks:- frontend

Первичные системы, на основе которых строится весь остальной сервис:<o:p>

  1. MongoDB - хранение настроек системы, API, Application, групп, пользователей и журнала аудита.

  2. Elasticsearch(Open Distro for Elasticsearch) - хранение логов, метрик, данных мониторинга.

MongoDB

docker-compose.yml:mongodb

mongodb:image: mongo:${MONGODB_VERSION:-3.6}<o:p>container_name: gio_apim_mongodb<o:p>restart: always<o:p>volumes:<o:p>- data-mongo:/data/db<o:p>- ./logs/apim-mongodb:/var/log/mongodb<o:p>networks:<o:p>- storage<o:p>

С MongoDB всё просто поднимается единственный экземпляр версии 3.6, если не указано иное, с volume для логов самой MongoDB и для данных в MongoDB.<o:p>

Elasticsearch

docker-compose.yml:elasticsearch

elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.7.0}<o:p>container_name: gio_apim_elasticsearchrestart: alwaysvolumes:- data-elasticsearch:/usr/share/elasticsearch/dataenvironment:- http.host=0.0.0.0- transport.host=0.0.0.0<o:p>- xpack.security.enabled=false- xpack.monitoring.enabled=false- cluster.name=elasticsearch- bootstrap.memory_lock=true- discovery.type=single-node- "ES_JAVA_OPTS=-Xms512m -Xmx512m"ulimitsmemlock:soft: -1hard: -1nofile: 65536networks:- storage

elasticsearch:

elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.7.0}container_name: gio_apim_elasticsearchrestart: alwaysvolumes:- data-elasticsearch:/usr/share/elasticsearch/data<o:p>environment:- http.host=0.0.0.0- transport.host=0.0.0.0- xpack.security.enabled=false- xpack.monitoring.enabled=false- cluster.name=elasticsearch- bootstrap.memory_lock=true- discovery.type=single-node- "ES_JAVA_OPTS=-Xms512m -Xmx512m"ulimits:memlock:soft: -1hard: -1nofile: 65536networks:- storage

С Elasticsearch также всё просто поднимается единственный экземпляр версии 7.7.0, если не указано иное, с volume для данных в Elasticsearch. Сразу стоит убрать строки xpack.security.enabled=false и xpack.monitoring.enabled=false, так как хоть они и указаны как false, Elasticsearch пытается найти XPack и падает. Исправили ли этот баг в новых версиях не понятно, так что просто убираем их, или комментируем. Также стоит обратить внимание на секцию ulimits, так как она требуется для нормальной работы Elasticsearch в docker.<o:p>

Дальше уже поднимаются компоненты сервиса:

  1. Gateway

  2. Management API

  3. Management UI

  4. Portal UI

Gateway/APIM Gateway

docker-compose.yml:gatewaygateway:image: graviteeio/apim-gateway:${APIM_VERSION:-3}container_name: gio_apim_gatewayrestart: alwaysports:- "8082:8082"depends_on:- mongodb- elasticsearchvolumes:- ./logs/apim-gateway:/opt/graviteeio-gateway/logs      environment:    - gravitee_management_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000      - gravitee_ratelimit_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000- gravitee_reporters_elasticsearch_endpoints_0=http://elasticsearch:9200networks:- storage- frontend<o:p>

С Gateway всё несколько сложнее поднимается единственный экземпляр версии 3, если не указано иное. Если мы посмотрим, что лежит наhub.docker.com, то увидим, что у версий 3 и latest совпадают хеши. Дальше мы видим, что запуск данного сервиса, зависит от того, как будут работать сервисы MongoDB иElasticsearch. Самое интересное, что если Gateway запустился и забрал данные по настроенным API из mongodb, то дальше связь с mongodb и elasticsearch не обязательна. Только в логи будут ошибки сыпаться, но сам сервис будет работать и соединения обрабатывать согласно той версии настроек, которую последний раз закачал в себя Gateway. В секции environment можно указать параметры, которые будут использоваться в работе самого Gateway, для переписывания данных из главного файла настроек: gravitee.yml. Как вариант можно поставить теги, тенанты для разграничения пространств, если используется Open Distro for Elasticsearch вместо ванильного Elasticsearch. Например, так мы можем добавить теги, тенанты и включить подсистему вывода данных о работе шлюза.

environment:- gravitee_tags=service-tag #включаемтег: service-tag- gravitee_tenant=service-space #включаемтенант: service-space- gravitee_services_core_http_enabled=true # включаем сервис выдачи данных по работе Gateway  - gravitee_services_core_http_port=18082 # порт сервиса- gravitee_services_core_http_host=0.0.0.0 # адрес сервиса- gravitee_services_core_http_authentication_type=basic # аутентификация либо нет, либо basic - логин + пароль  - gravitee_services_core_http_authentication_type_users_admin=password #логин: admin,пароль: password  Чтобы к подсистеме мониторинга был доступ из вне, надо ещё открыть порт 18082.ports:- "18082:18082"Management API/APIM APIdocker-compose.yml:management_apimanagement_api:image: graviteeio/apim-management-api:${APIM_VERSION:-3}container_name: gio_apim_management_apirestart: alwaysports:- "8083:8083"links:- mongodb- elasticsearchdepends_on:- mongodb- elasticsearchvolumes:- ./logs/apim-management-api:/opt/graviteeio-management-api/logsenvironment:- gravitee_management_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000  - gravitee_analytics_elasticsearch_endpoints_0=http://elasticsearch:9200 networks:    - storage- frontend

ManagementAPI это ядро всей системы и предоставляет службы для управления и настройкиAPI, логи, аналитики и веб-интерфейсовManagementUIиPortalUI. Зависит от MongoDB и Elasticsearch. Также можно через секцию environment указать параметры, которые будут использоваться в работе самого ядра системы. Дополним наши настройки:

environment:- gravitee_email_enable=true # включаем возможность отправлять письма- gravitee_email_host=smtp.domain.example # указываем сервер через который будем отправлять письма- gravitee_email_port=25 # указываем порт для сервера- gravitee_email_username=domain.example/gravitee #логиндлясервера- gravitee_email_password=password #парольдлялогинаотсервера  - gravitee_email_from=noreply@domain.example # указываем от чьего имени будут письма- gravitee_email_subject="[Gravitee.io] %s" #указываемтемуписьма

Management UI/APIM Console

docker-compose.yml:apim_consolemanagement_ui:image: graviteeio/apim-management-ui:${APIM_VERSION:-3}container_name: gio_apim_management_uirestart: alwaysports:- "8084:8080"depends_on:- management_apienvironment:- MGMT_API_URL=http://localhost:8083/management/organizations/DEFAULT/environments/DEFAULT/      volumes:    - ./logs/apim-management-ui:/var/log/nginxnetworks:- frontend

Management UI предоставляет интерфейс для работы администраторам и разработчикам. Все основные функции можно осуществлять и выполняя запросы непосредственно к REST API. По опыту могу сказать, что в переменной MGMT_API_URL вместоlocalhostнадо указать IP адрес или название сервера, где вы это поднимаете, иначе контейнер не найдет Management API.

Portal UI/APIM Portaldocker-compose.yml:apim_portalportal_ui:image: graviteeio/apim-portal-ui:${APIM_VERSION:-3}container_name: gio_apim_portal_uirestart: alwaysports:- "8085:8080"depends_on:- management_apienvironment:- PORTAL_API_URL=http://localhost:8083/portal/environments/DEFAULTvolumes:- ./logs/apim-portal-ui:/var/log/nginxnetworks:- frontend

Portal UI это портал для менеджеров. Предоставляет доступ к логам, метрикам и документации по опубликованным API. По опыту могу сказать, что в переменной PORTAL_API_URL вместоlocalhostнадо указать IP-адрес или название сервера, где вы это поднимаете, иначе контейнер не найдет Management API.<o:p>

Теперь соберем весь файл вместе.

docker-compose.yml

# Copyright (C) 2015 The Gravitee team (<http://gravitee.io>)## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##         <http://www.apache.org/licenses/LICENSE-2.0>## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#version: '3.5'networks:  frontend:    name: frontend  storage:    name: storagevolumes:  data-elasticsearch:  data-mongo:services:  mongodb:    image: mongo:${MONGODB_VERSION:-3.6}    container_name: gio_apim_mongodb    restart: always    volumes:      - data-mongo:/data/db      - ./logs/apim-mongodb:/var/log/mongodb    networks:      - storage  elasticsearch:    image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.7.0}    container_name: gio_apim_elasticsearch    restart: always    volumes:      - data-elasticsearch:/usr/share/elasticsearch/data    environment:      - http.host=0.0.0.0      - transport.host=0.0.0.0      - cluster.name=elasticsearch      - bootstrap.memory_lock=true      - discovery.type=single-node      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"    ulimits:      memlock:        soft: -1        hard: -1      nofile: 65536    networks:      - storage  gateway:    image: graviteeio/apim-gateway:${APIM_VERSION:-3}    container_name: gio_apim_gateway    restart: always    ports:      - "8082:8082"      - "18082:18082"    depends_on:      - mongodb      - elasticsearch    volumes:      - ./logs/apim-gateway:/opt/graviteeio-gateway/logs    environment:      - gravitee_management_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000      - gravitee_ratelimit_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000      - gravitee_reporters_elasticsearch_endpoints_0=http://elasticsearch:9200         - gravitee_tags=service-tag # включаем тег: service-tag         - gravitee_tenant=service-space # включаем тенант: service-space         - gravitee_services_core_http_enabled=true # включаем сервис выдачи данных по работе Gateway         - gravitee_services_core_http_port=18082 # порт сервиса         - gravitee_services_core_http_host=0.0.0.0 # адрес сервиса          - gravitee_services_core_http_authentication_type=basic # аутентификация либо нет, либо basic - логин + пароль         - gravitee_services_core_http_authentication_type_users_admin=password # логин: admin, пароль: password    networks:      - storage      - frontend  management_api:    image: graviteeio/apim-management-api:${APIM_VERSION:-3}    container_name: gio_apim_management_api    restart: always    ports:      - "8083:8083"    links:      - mongodb      - elasticsearch    depends_on:      - mongodb      - elasticsearch    volumes:      - ./logs/apim-management-api:/opt/graviteeio-management-api/logs    environment:      - gravitee_management_mongodb_uri=mongodb://mongodb:27017/gravitee?serverSelectionTimeoutMS=5000&connectTimeoutMS=5000&socketTimeoutMS=5000      - gravitee_analytics_elasticsearch_endpoints_0=http://elasticsearch:9200         - gravitee_email_enable=true # включаем возможность отправлять письма         - gravitee_email_host=smtp.domain.example # указываем сервер через который будем отправлять письма         - gravitee_email_port=25 # указываем порт для сервера         - gravitee_email_username=domain.example/gravitee # логин для сервера         - gravitee_email_password=password # пароль для логина от сервера         - gravitee_email_from=noreply@domain.example # указываем от чьего имени будут письма          - gravitee_email_subject="[Gravitee.io] %s" # указываем тему письма    networks:      - storage      - frontend  management_ui:    image: graviteeio/apim-management-ui:${APIM_VERSION:-3}    container_name: gio_apim_management_ui    restart: always    ports:      - "8084:8080"    depends_on:      - management_api    environment:      - MGMT_API_URL=http://localhost:8083/management/organizations/DEFAULT/environments/DEFAULT/    volumes:      - ./logs/apim-management-ui:/var/log/nginx    networks:      - frontend  portal_ui:    image: graviteeio/apim-portal-ui:${APIM_VERSION:-3}    container_name: gio_apim_portal_ui    restart: always    ports:      - "8085:8080"    depends_on:      - management_api    environment:      - PORTAL_API_URL=http://localhost:8083/portal/environments/DEFAULT    volumes:      - ./logs/apim-portal-ui:/var/log/nginx    networks:      - frontend

Запускаем

Итоговый файл закидываем на сервер с примерно следующими характеристиками:

vCPU: 4

RAM: 4 GB

HDD: 50-100 GB

Для работы Elasticsearch, MongoDB и Gravitee Gateway нужно примерно по 0.5 vCPU, больше только лучше. Примерно тоже самое и с оперативной памятью - RAM. Остальные сервисы по остаточному принципу. Для хранения настроек много места не требуется, но учитывайте, что в MongoDB еще хранятся логи аудита системы. На начальном этапе это будет в пределах 100 MB. Остальное место потребуется для хранения логов в Elasticsearch.

docker-compose up -d # если не хотите видеть логиdocker-compose up # если хотите видеть логи и как это все работает

Как только в логах увидите строки:

gio_apim_management_api_dev | 19:57:12.615 [graviteeio-node] INFO  i.g.r.a.s.node.GraviteeApisNode - Gravitee.io - Rest APIs id[5728f320-ba2b-4a39-a8f3-20ba2bda39ac] version[3.5.3] pid[1] build[23#2f1cec123ad1fae2ef96f1242dddc0846592d222] jvm[AdoptOpenJDK/OpenJDK 64-Bit Server VM/11.0.10+9] started in 31512 ms.

Можно переходить по адресу: http://ваш_адрес:8084/.

Нужно учесть, что Elasticsearch может подниматься несколько дольше, поэтому не пугайтесь если увидите такое "приглашение":

Надо просто ещё немного подождать. Если ошибка не ушла, то надо закапываться в логи и смотреть, что там за ошибки. Видим такое приглашение отлично!

Вводим стандартный логин и пароль: admin/admin и мы в системе!

Первичная настройка

Настройки самой системы

Переходим в менюSettings PORTAL Settings

Здесь можно настроить некоторый параметры системы. Например: доступные методы аутентификации наших клиентов: Keyless, API_KEY, Oauth2 или JWT. Подробно их мы рассмотрим скорее всего в третьей статье, а пока оставим как есть. Можно подключить Google Analytics. Время обновления по задачам и нотификациям. Где лежит документация и много ещё чего по мелочи.

Добавление tags и tenant

ПереходимвменюSettings GATEWAY Shardings Tags

Здесь надо добавить теги, по которым у нас будут различаться шлюзы. Нажимаем "+" и добавляем тег и его описание.

Сохраняем. Также здесь можно тег закрепить за какой-либо командой, чтобы только они могли пользоваться теми шлюзами, которые зарегистрированы с указанными тегами.

ПереходимвменюSettings GATEWAY Tenants

То же самое и с настройкой тенантов. Только тут нет кнопки "+", но есть серенькая надпись "New tenant", которую надо нажать для добавления нового тенанта. Естественно, данный тенант должен быть создан в Open Distro for Elasticsearch, и к нему должны быть выданы права.

Добавление пользователей

Переходим вSettings USER MANAGEMENT Users

Здесь можно добавлять пользователей, вот только работать это будет, если у нас настроена рассылка по email. Иначе новым пользователям не придёт рассылка от системы со ссылкой на сброс пароля. Есть ещё один способ добавить пользователя, но это прям хардкод-хардкод!

В файле настроек Management API: gravitee.yml есть такой кусочек настроек:

security:  providers:  # authentication providers    - type: memory      # password encoding/hashing algorithm. One of:      # - bcrypt : passwords are hashed with bcrypt (supports only $2a$ algorithm)      # - none : passwords are not hashed/encrypted      # default value is bcrypt      password-encoding-algo: bcrypt      users:        - user:          username: admin          password: $2a$10$Ihk05VSds5rUSgMdsMVi9OKMIx2yUvMz7y9VP3rJmQeizZLrhLMyq          roles: ORGANIZATION:ADMIN,ENVIRONMENT:ADMIN

Здесьперечисленытипыхранилищдляпользователей: memory, graviteeиldap.Данные из хранилищаmemoryберутся из файла настроек:gravitee.yml. Данные из хранилищаgraviteeхранятся вMongoDB. Для хранения пользовательских паролей, по умолчанию используется тип хеширования BCrypt с алгоритмом $2a$. В представленных настройках мы видим пользователя: admin с хешированным паролем: admin и его роли. Если мы будем добавлять пользователей через UI, то пользователи будут храниться в MongoDB и тип их будет уже gravitee.

Создание групп пользователей

Переходим вSettings USER MANAGEMENT Groups

При нажатии на "+" получаем возможность добавить группу и пользователей в эту группу.

Проверка доступных шлюзов

При переходе в меню Gateways у нас отображаются все шлюзы, которые у нас есть в доступе.

Здесь мы видим настройки шлюза. В частности, Sharding tags и Tenant. Их мы добавили чуть раньше.

Естественно, есть возможность мониторинга состояния шлюза. Данные по мониторингу хранятся в Elasticsearch в отдельном индексе.

Публикация первого API

Для публикации первого API нам сначала потребуется сделать какой-нибудь backend с API.

BackEnd с API, балеринами и Swagger.

Возьмём FastAPI и сделаем простейшее backend с API.

#!/bin/env python3import uvicornfrom fastapi import FastAPIapp = FastAPI()@app.get('/')@app.get('/{name}')def read_root(name: str = None):    """    Hello world    :return: str = Hello world    """    if name:        return {"Hello": name}    return {"Hello": "World"}@app.get("/items/{item_id}")@app.post("/items/{item_id}")@app.put("/items/{item_id}")def gpp_item(item_id: str):    """    Get items    :param item_id: id    :return: dict    """    return {"item_id": item_id}if __name__ == "__main__":    uvicorn.run(app, host="0.0.0.0", port=8000)

Это иAPIможно назвать с трудом, но для примера работы вполне хватит.

Теперь запустим его! Можно даже на том же сервере.

python3 main.py

Теперь, если мы зайдем на этот серверhttp://backend_server:8000/,мы увидим приветствие миру! Если подставим своё имя, типа так:http://backend_server:8000/Anton, то приветствовать уже будут вас! Если же хотите познать всю мощьFastAPI, то сразу заходите на адрес:http://backend_server:8000/docsилиhttp://backend_server:8000/redoc. На этих страницах можно найти все методы, с которым работает данное API и также ссылку на swagger файл. Копируем URL до swagger файла.

В прошлый раз мы создавали наш план вручную. Было несколько утомительно. Сегодня мы сделаем все быстрее, через импорт swagger файла!

На главном экране Gravitee нажимаем большой "+", выбираем "IMPORT FROM LINK", вставляем URL и нажимаем "IMPORT".

Получается как-то так

Нажимаем "IMPORT"!

Почти полностью сформированный API! Теперь немного доработаем напильником...

Для начала нажимаем "START THE API" чтобы API запустить.

Переходим в "Plans" и нажимаем "+".

Создаем тестовый план.

Тип аутентификации выбираем Keyless (public) и нажимаем "NEXT".

Ограничения по количеству запросов и путям пропускаем. Нажимаем "NEXT".

Политики нам тоже пока не нужны. Нажимаем "SAVE".

План создан, но пока находиться в стадии "Staging"

Нажимаем на кнопку публикации плана - синее облачко со стрелочкой вверх! Подтверждаем кнопкой "PUBLISH"

И получаем опубликованный план и какую-то желтую полоску с призывом синхронизировать новую версию API.

Нажимаем "deploy your API" и подтверждаем наше желание "OK"

Переходим вAPIs Proxy Entrypoints

Здесь можно указать точки входа для нашего API и URL пути. У нас указан только путь "/fastapi". Можно переключиться в режим "virtual-hosts" и тогда нам будет доступен вариант с указанием конкретных серверов и IP. Это может быть актуально для серверов с несколькими сетевыми интерфейсами.

ВAPIs Proxy GENERAL CORSможнопроизвестинастройкиCross-origin resource sharing.

ВAPIs Proxy GENERAL Deploymentsнадоуказатьвсеsharding tags,которыебудутиспользоватьсяэтимAPI.

ВAPIs Proxy BACKEND SERVICES Endpointsможно указать дополнительные точки API и настроить параметры работы с ними.

Сейчас нас интересуют настройки конкретной Endpoint, поэтому нажимаем на нижнюю шестеренку.

Исправляем "Target" на http://backend_server:8000/, устанавливаем tenant, сохраняем и деплоим!

ВAPIs Proxy Deploymentsнадо указать те sharding tags, которые могут использовать данное API. После этого необходимо вернуться в созданный план и в списке Sharding tags выбрать тег "service-tag".

ВAPIs Designможно указать политики, которые будут отрабатывать при обработке запросов.

ВAPIs Analytics Overviewможно смотреть статистику по работе данного конкретного API.

ВAPIs Analytics Logsможно настроить логи и потом их смотреть.

ВAPIs Auditможно посмотреть, как изменялись настройки API.

Остальное пока рассматривать не будем, на работу оно не влияет.

После всего, что мы тут сделали, можно пойти проверять, как работает наш шлюз и API.

Переходим наhttp://gravitee_host:8082/fastapi/ , и вам покажется приветствие миру:

Также сразу можно заглянуть вAPIs Analytics Overview/Logsдля просмотра статистики обработки запросов.

Заключение

Итак, поздравляю всех, кто дочитал до сюда и остался в живых! Теперь вы знаете, как поднять ознакомительный стенд APIM Gravitee, как его настроить, создать новое API из swagger файла и проверить, что всё работает. Вариантов настройки шлюзов, точек входа и выхода, сертификатов, балансировок нагрузки и записи логов много. В одной статье всего и не расскажешь. Так что в следующей статье я расскажу о более продвинутых настройках системы APIM Gravitee. В Телеграмме есть канал по данной системе:https://t.me/gravitee_ru, вопросы по нюансам настройки можно задавать туда.

Подробнее..

Cassandra Day Russia 2021 онлайн-конференция 27 марта

18.02.2021 16:06:37 | Автор: admin
image

Что объединяет Apple, Netflix, Huawei и Instragram? Не только миллиарды запросов, петабайты данных и пользователи по всему миру. Все эти компании используют распределённую NoSQL базу данных Apache Cassandra.

Приглашаем на однодневную онлайн-конференцию Cassandra Day Russia 2021 в субботу 27 марта. Опытные NoSQL специалисты расскажут о возможностях одной из самых мощных баз данных современности и поделятся практическим опытом управления СУБД Cassandra.

О мероприятии


Конференция будет состоять из двух параллельных потоков:

  • Воркшопы для тех, кто только начинает или планирует работу с Cassandra;
  • Доклады для опытных специалистов.

Время проведения: 27 марта, 10:0017:00 (UTC+3)

Ведущий Александр Волочнев, Developer Advocate, DataStax, Inc.

Партнеры конференции: DataStax, Слёрм.

Для участников конференции DataStax, Inc. спонсирует сертификацию по Apache Cassandra: вы сможете бесплатно пройти обучение, сдать экзамен и стать Certified Apache Cassandra Developer/Administrator (обычная стоимость экзамена $145). Также будут другие подарки от партнёров.

Участие бесплатное, необходима регистрация.
Подробнее..

Перевод Apache Cassandra 4.0 бенчмарки

20.02.2021 14:06:35 | Автор: admin


Apache Cassandra 4.0 приближается к бете (прим. переводчика: на текущий момент уже доступна бета 4, выпущенная в конце декабря 2020), и это первая версия, которая будет поддерживать JDK 11 и более поздних версий. Пользователей Apache Cassandra, очевидно, волнует задержка, так что мы возлагаем большие надежды на ZGC новый сборщик мусора с низкой задержкой, представленный в JDK 11.


В JDK 14 он был выпущен уже в GA-версии, и нам было очень интересно оценить, насколько он подходит для кластеров Apache Cassandra. Мы хотели сравнить производительность Apache Cassandra 3.11.6 и 4.0 и проверить, подходит ли Shenandoah, сборщик мусора от Red Hat, для продакшена. Спойлер: Cassandra 4.0 значительно лучше по производительности сама по себе, а с новыми сборщиками мусора (ZGC и особенно Shenandoah) будет совсем хорошо.


Методология бенчмарков


Мы провели следующие бенчмарки, используя утилиту tlp-cluster для развертывания и настройки кластеров Apache Cassandra в AWS и tlp-stress для создания нагрузок и сбора метрик. Все эти инструменты доступны в опенсорсе, все бенчмарки легко воспроизвести при наличии учетки AWS.


Кластеры состояли из трех нод на инстансах r3.2xlarge и одной стресс-ноды на c3.2xlarge.


Мы использовали дефолтные параметры Apache Cassandra, кроме настроек кучи и сборщика мусора.


Для развертывания и настройки кластера мы взяли последний релиз tlp-cluster. Недавно мы добавили скрипты, чтобы автоматизировать создание кластера и установку Reaper и Medusa.


Установите и настройте tlp-cluster по инструкциям в документации, чтобы создать такие же кластеры, которые мы использовали для бенчмарков:


# 3.11.6 CMS JDK8build_cluster.sh -n CMS_3-11-6_jdk8 -v 3.11.6 --heap=16 --gc=CMS -s 1 -i r3.2xlarge --jdk=8 --cores=8# 3.11.6 G1 JDK8build_cluster.sh -n G1_3-11-6_jdk8 -v 3.11.6 --heap=31 --gc=G1 -s 1 -i r3.2xlarge --jdk=8 --cores=8# 4.0 CMS JDK11build_cluster.sh -n CMS_4-0_jdk11 -v 4.0~alpha4 --heap=16 --gc=CMS -s 1 -i r3.2xlarge --jdk=11 --cores=8# 4.0 G1 JDK14build_cluster.sh -n G1_4-0_jdk14 -v 4.0~alpha4 --heap=31 --gc=G1 -s 1 -i r3.2xlarge --jdk=14 --cores=8# 4.0 ZGC JDK11build_cluster.sh -n ZGC_4-0_jdk11 -v 4.0~alpha4 --heap=31 --gc=ZGC -s 1 -i r3.2xlarge --jdk=11 --cores=8# 4.0 ZGC JDK14build_cluster.sh -n ZGC_4-0_jdk14 -v 4.0~alpha4 --heap=31 --gc=ZGC -s 1 -i r3.2xlarge --jdk=14 --cores=8# 4.0 Shenandoah JDK11build_cluster.sh -n Shenandoah_4-0_jdk11 -v 4.0~alpha4 --heap=31 --gc=Shenandoah -s 1 -i r3.2xlarge --jdk=11 --cores=8

Примечание. Чтобы воспроизвести схожие условия, мы использовали для всех тестов один набор инстансов EC2.


Мы сделали апгрейд с Cassandra 3.11.6 на Cassandra 4.0~alpha4 и меняли JDK следующим кодом:


#!/usr/bin/env bashOLD=$1NEW=$2curl -sL https://github.com/shyiko/jabba/raw/master/install.sh | bash. ~/.jabba/jabba.shjabba uninstall $OLDjabba install $NEWjabba alias default $NEWsudo update-alternatives --install /usr/bin/java java ${JAVA_HOME%*/}/bin/java 20000sudo update-alternatives --install /usr/bin/javac javac ${JAVA_HOME%*/}/bin/javac 20000

Мы использовали следующие значения JDK при вызове jabba:


  • openjdk@1.11.0-2
  • openjdk@1.14.0
  • openjdk-shenandoah@1.8.0
  • openjdk-shenandoah@1.11.0

OpenJDK 8 был установлен с помощью Ubuntu apt.


Вывод команды java -version для разных JDK, которые мы использовали для бенчмарков:


jdk8


openjdk version "1.8.0_252"OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)

jdk8 с Shenandoah


openjdk version "1.8.0-builds.shipilev.net-openjdk-shenandoah-jdk8-b712-20200629"OpenJDK Runtime Environment (build 1.8.0-builds.shipilev.net-openjdk-shenandoah-jdk8-b712-20200629-b712)OpenJDK 64-Bit Server VM (build 25.71-b712, mixed mode)

jdk11


openjdk version "11.0.2" 2019-01-15OpenJDK Runtime Environment 18.9 (build 11.0.2+9)OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

jdk11 с Shenandoah


openjdk version "11.0.8-testing" 2020-07-14OpenJDK Runtime Environment (build 11.0.8-testing+0-builds.shipilev.net-openjdk-shenandoah-jdk11-b277-20200624)OpenJDK 64-Bit Server VM (build 11.0.8-testing+0-builds.shipilev.net-openjdk-shenandoah-jdk11-b277-20200624, mixed mode)

jdk14


openjdk version "14.0.1" 2020-04-14OpenJDK Runtime Environment (build 14.0.1+7)OpenJDK 64-Bit Server VM (build 14.0.1+7, mixed mode, sharing)

CMS


CMS (Concurrent Mark Sweep) это текущий дефолтный сборщик мусора в Apache Cassandra. Он был удален из JDK 14, так что все тесты проводились с JDK 8 или 11.


Для CMS параметры были такие:


-XX:+UseParNewGC-XX:+UseConcMarkSweepGC-XX:+CMSParallelRemarkEnabled-XX:SurvivorRatio=8-XX:MaxTenuringThreshold=1-XX:CMSInitiatingOccupancyFraction=75-XX:+UseCMSInitiatingOccupancyOnly-XX:CMSWaitDuration=10000-XX:+CMSParallelInitialMarkEnabled-XX:+CMSEdenChunksRecordAlways-XX:+CMSClassUnloadingEnabled-XX:ParallelGCThreads=8-XX:ConcGCThreads=8-Xms16G-Xmx16G-Xmn8G

Флаг -XX:+UseParNewGC удален из JDK 11 и является неявным. С этим флагом JVM не запустится.


Для CMS мы задали максимальный размер кучи 16 ГБ, иначе между сборками будут слишком длинные паузы.


G1


G1GC (сборщик мусора по принципу Garbage-First) настроить легче, чем CMS, потому что он динамически изменяет размер младшего поколения, но лучше работает с большими кучами (от 24 ГБ). Это объясняет, почему он не стал дефолтным сборщиком мусора. Задержки у него выше, чем у настроенного CMS, зато пропускная способность лучше.


Для G1 параметры были такие:


-XX:+UseG1GC-XX:G1RSetUpdatingPauseTimePercent=5-XX:MaxGCPauseMillis=300-XX:InitiatingHeapOccupancyPercent=70-XX:ParallelGCThreads=8-XX:ConcGCThreads=8-Xms31G-Xmx31G

Для версии 4.0 мы использовали JDK 14 в тестах с G1.


Мы взяли размер кучи 31 ГБ, чтобы использовать преимущества compressed oops и получить больше адресуемых объектов для кучи минимального размера.


ZGC


ZGC (Z Garbage Collector) это новейший сборщик мусора в JDK, который минимизирует задержку благодаря паузам stop-the-world меньше 10 мс. Предполагается, что благодаря этому размер кучи не будет влиять на продолжительность пауз, что позволит увеличить его до 16 ТБ. Если так и будет, хранилище вне кучи не понадобится, и некоторые аспекты разработки в Apache Cassandra станут проще.


Для ZGC параметры были такие:


-XX:+UnlockExperimentalVMOptions-XX:+UseZGC-XX:ConcGCThreads=8-XX:ParallelGCThreads=8-XX:+UseTransparentHugePages-verbose:gc-Xms31G-Xmx31G

-XX:+UseTransparentHugePages здесь нужен как обходной путь, чтобы избежать включения больших страниц в Linux. В официальной документации по ZGC сказано, что потенциально могут возникать пики задержки, но на практике мы такого не увидели. Может быть, следует протестировать пропускную способность на больших страницах и посмотреть, как это скажется на результате.


ZGC не может использовать compressed oops и не соблюдает порог 32 ГБ. Мы взяли кучу размером 31 ГБ, как и для G1, чтобы у системы был тот же объем свободной оперативки.


Shenandoah


Shenandoah это сборщик мусора с низкой задержкой от Red Hat. Он доступен как бэкпорт в JDK 8 и 11 и в mainline-сборках OpenJDK начиная с Java 13.


Как и ZGC, Shenandoah стремится к параллелизму, чтобы паузы не зависели от размера кучи.
Для Shenandoah параметры были такие:


-XX:+UnlockExperimentalVMOptions-XX:+UseShenandoahGC-XX:ConcGCThreads=8-XX:ParallelGCThreads=8-XX:+UseTransparentHugePages-Xms31G-Xmx31G

Shenandoah может использовать compressed oops, а значит лучше работает с кучами размером чуть меньше 32 ГБ.


Конфигурация Cassandra 4.0 JVM


Cassandra 4.0 поставляется с отдельными файлами jvm.options для Java 8 и Java 11, а именно:


  • conf/jvm-server.options
  • conf/jvm8-server.options
  • conf/jvm11-server.options

Апгрейд до 4.0 будет работать с имеющимся файлом jvm.options версии 3.11, если его переименовать в jvm-server.options, а файлы jvm8-server.options и jvm11-server.options удалить. Но мы такой подход не рекомендуем.


Лучше повторно применить параметры из предыдущего файла jvm.options к новым файлам jvm-server.options и jvm8-server.options. Конкретные файлы опций Java, в основном, связаны с флагами для сборки мусора. Если обновить эти два файла, будет проще настроить jvm11-server.options и перейти с JDK 8 на JDK 11.


Рабочие нагрузки


В бенчмарках мы использовали восемь потоков с ограничением скорости и соотношением записи и чтения 80/20%. tlp-stress широко использует асинхронные запросы, которые легко могут перегрузить ноды Cassandra с ограниченным числом стресс-потоков. В нагрузочных тестах каждый поток отправлял 50 параллельных запросов за раз. Мы создали keyspace с коэффициентом репликации 3, и все запросы выполнялись на уровне согласованности LOCAL_ONE.


Все сборщики мусора и версии Cassandra тестировались по нарастанию 25к, 40к, 45к и 50к операций в секунду, чтобы оценить производительность при разных уровнях нагрузки.


Команды tlp-stress:


tlp-stress run BasicTimeSeries -d 30m -p 100M -c 50 --pg sequence -t 8 -r 0.2 --rate <desired rate> --populate 200000

Все рабочие нагрузки выполнялись 30 минут, загружая от 5 до 16 ГБ данных на ноду и допуская нагрузку от compaction в разумной степени.


Примечание. Мы не пытались оценить максимальную производительность Cassandra, которую можно по-разному настроить в зависимости от рабочей нагрузки. И настраивать сборщики мусора мы тоже не планировали, потому что у них множество параметров для разных нагрузок. Мы просто хотели честно сравнить разные сборщики мусора, в основном на дефолтных параметрах, при одинаковой нагрузке в Cassandra.


Результаты бенчмарков


3.11.6, 25-40к операций/с:



4.0, 25-40к операций/с:



4.0, 45-50к операций/с:




В плане пропускной способности Cassandra 3.11.6 осилила 41к операций в секунду, а Cassandra 4.0 51к, то есть на 25% больше. В обоих случаях использовался CMS. В версии 4.0 много улучшений, которые объясняют эти результаты, особенно в вопросах нагрузки на кучу, вызванной compaction (см. CASSANDRA-14654, например).


Shenandoah в jdk8 на Cassandra 3.11.6 не достигает максимальной пропускной способности в тесте на 40к операций в секунду появляются невыполненные запросы. С jdk11 и Cassandra 4.0 результат получше 49,6к, почти как у CMS. У G1 и Shenandoah с jdk 8 получилось 36к/с на Cassandra 3.11.6.


G1, видимо, усовершенствован в jdk14 и показал себя немного лучше, чем с jdk11, 47 против 50к/с.


ZGC не дотянул до конкурентов ни с jdk11, ни с jdk14, показав 41к/с максимум.








При умеренной нагрузке Shenandoah в jdk8 показывает хороший результат на Cassandra 3.11.6, но под высокой нагрузкой серьезно увеличиваются задержки.


С CMS Cassandra 4.0 показывает среднюю задержку для p99 (99-го перцентиля) от 11 до 31 мс при 50к операций в секунду. Средняя задержка операций чтения для p99 при умеренной нагрузке снизилась с 17 мс в Cassandra 3.11.6 до 11,5 мс в Cassandra 4.0, то есть на целых 30%.


В целом Cassandra 4.0 лучше Cassandra 3.11.6 с теми же сборщиками мусора на 2530% по пропускной способности и задержке.


Shenandoah дает очень низкие задержки при умеренной нагрузке в Cassandra 3.11.6, но с высокими нагрузками справляется не очень.


По задержкам ZGC хорошо себя проявляет при умеренной нагрузке, особенно с jdk14, но при более высокой скорости отстает от Shenandoah. Почти во всех нагрузочных тестах Shenandoah показал минимальные средние задержки для p99 для чтения и записи. Если сложить эти задержки с пропускной способностью в Cassandra 4.0, получится что этот сборщик мусора неплохой выбор, если вы обновляете версию. Средняя задержка 2,64 мс при чтении для p99 при умеренной нагрузке это впечатляет. Тем более на клиенте.


G1, в основном, соответствует настроенному максимальному времени паузы 300 мс для максимального значения p99, но с меньшей паузой при высоких нагрузках могут возникать нежелательные эффекты, и пауза может быть больше.


При умеренной нагрузке Shenandoah дает среднюю задержку для p99 на 77% ниже. 2,64 мс это самый низкий результат. Полезно для сценариев, где важна задержка. По сравнению с CMS в Cassandra 3.11.6 это на целых 85% ниже для операций чтения на p99!


Отдадим должное ZGC в jdk14, который хорошо себя показал при умеренных нагрузках, но с увеличением скорости, увы, не справился. Мы надеемся, что через несколько месяцев он станет лучше и сможет конкурировать с Shenandoah.


Итоги


G1 улучшает юзабилити Cassandra не нужно настраивать размеры поколений в ущерб производительности. В версии Apache Cassandra 4.0, которая и без того работает гораздо лучше, можно будет использовать сборщики мусора нового поколения, например Shenandoah и ZGC, которые не требуют тщательной настройки и сокращают задержки.


Мы не рекомендуем Shenandoah для Cassandra 3.11.6, поскольку он не справляется с высокими нагрузками, но начиная с jdk11 и Cassandra 4.0 этот сборщик мусора показывает низкие задержки и обеспечивает почти максимальную пропускную способность для базы данных.


Ваши цифры могут отличаться в зависимости от рабочей нагрузки, но результаты позволяют надеяться, что в будущем Apache Cassandra станет неплохим инструментом для сценариев, где особо важны задержки. Во всяком случае, новая версия показала себя гораздо лучше, чем Cassandra 3.11.6.


Загрузите последнюю сборку Apache 4 и попробуйте сами. Будем рады обратной связи тут или в Slack компании ASF.


От редакции: приглашаем на открытую онлайн-конференцию Cassandra Day Russia 2021 в субботу 27 марта. В программе доклады и воркшопы от опытных NoSQL-специалистов.
Подробнее..

Cassandra в Yelp

07.05.2021 10:12:10 | Автор: admin

image


Yelp это крупнейшее в США приложение для заказа еды и услуг. Оно установлено более чем на 30 млн уникальных устройств, в нём зарегистрировано более 5 млн. компаний. Для хранения и доступа к данным в Yelp используют Cassandra. Как и для каких задач применяется эта база данных, на конференции Cassandra Day Russia 2021 рассказал Александр Широков, Database Reliability Engineer в Yelp.


Cassandra это база данных из семейства NoSQL. Согласно полному определению, это distributed wide-column NoSQL datastore. Cassandra оптимизирована для writeов, и consistency запросов можно двигать на очень низком уровне.


Не углубляясь в детали, я расскажу, как мы используем Cassandra в Yelp. У нас это один из самых популярных инструментов. Мы применяем MySQL и Cassandra. Если MySQL все девелоперы знают, то Cassandra нет. Но это то, что мы используем постоянно и на довольно большом масштабе.


Я расскажу:


  1. Как мы автоматизируем изменение схемы: как девелоперы изменяют таблицы, добавляют таблицы или колонки. Если позволить им делать это по своему усмотрению, то получается очень много проблем. Чтобы их избежать, мы этот процесс автоматизировали.
  2. Как разработчик получает доступ к Cassandra. Один из вариантов находить каждый инстанс Cassandra-кластера через host и port, но это решение не масштабируется. Я покажу статистику по нашим кластерам и вы увидите, что на нашем масштабе нужно что-то другое.
  3. Закончу одним из проектов, над которым мы работаем сейчас, и который выглядит довольно многообещающим.

Как используется Cassandra в Yelp


В любой месяц отправляется около 90 млрд запросов, в любой момент времени задеплоено около 60 Kubernetes-кластеров и около 500 подов (один под один инстанс Cassandra).


С точки зрения нагрузки на кластеры, наша самая высокая Read Load около 600 тыс. RPS на один из наших кластеров, Write Load около 400 RPS. В целом довольно высокая нагрузка.


image


В Yelp мы поддерживаем широкий спектр пользовательских кейсов. Мы поддерживаем и аналитический процессинг, и процессинг транзакций. Например, есть какой-то batch-сервис, онлайн или веб-сервис, который отправляет запросы в Cassandra и что-то пишет в Cassandra.


Также наша стриминг-инфраструктура зависит от Cassandra. Плюс мы собираем в Cassandra данные для аналитики, агрегируем какие-то метрики; от Cassandra зависит наша caching-инфраструктура и distributed tracing возможность для разработчиков трейсить жизненный цикл реквеста в Yelp, смотреть, как реквест движется между микросервисами (трейсинг инфраструктуры).


Как вы видите, спектр юзкейсов довольно широкий.


Мы деплоим Cassandra в основном в двух регионах: us-east и us-west.


image


Несмотря на то, что мы деплоим на us-east и us-west, каждый наш кластер называется multi region cluster. Если клиент хочет написать в us-east, а тот недоступен, тогда он напишет в us-west.


В каждом регионе мы деплоим (так как у нас инфраструктура основана на Kubernetes) наш собственный разработанный Kubernetes-оператор. Роль этого оператора мы даём ему кастомные ресурсы смотреть на Cassandra: количество подов, которые должны быть в любой момент времени, или какие-то значения CPU и памяти. Оператор просто сравнивает те значения, которые мы ему даём, с тем, что происходит на самом деле мониторит кластер. И если случается несовпадение, то оператор поднимет ещё один под.


Например, оператору сказали, что нужно пять подов, а сейчас задеплоено только четыре, он поднимет ещё один. Это снимает много работы с нас, как с команды, которая занимается поддержкой инфраструктуры. Синхронизация происходит с помощью etcd. Etcd это простенький distributed key-value store.


Мы деплоим всё на AWS, для хранения самих данных используем EBS volumes Amazon Elastic Block Storage. Вся инфраструктура основана на Kubernetes, плюс мы разработали собственный оператор.


Для подключения клиента к каждому кластеру, каждому инстансу кластера мы используем продукт Smartstack, а в следующей итерации Envoy.


Как в Yelp делают изменение схемы (Schema Changes)


С чего можно начать автоматизировать этот процесс: можно взять все схемы (schema), которые есть, и засунуть в один репозиторий самый простой вариант. И потом на каждой Cassandra node запустить процесс, который будет смотреть на то, что у нас в репозитории и то, что у нас в Cassandra, сравнивать, и если у нас тут есть какая-нибудь схема (например, какая-то таблица), а в кластере Cassandra она не существует, она просто создаст новую таблицу так, чтобы они просто совпадали.
Если, например, несовпадение, так, чтобы мы знали, что происходит, что делает этот процесс, процесс файлит Jira-тикет.


Это то как мы начинали. Но в этом есть несколько проблем: довольно сложно поддерживать Alter Table стейтменты. Сложно понять, куда это вообще, как это поддерживать. Разработчики не знали про это. Каждый раз, когда разработчикам нужно было изменить таблицу, они шли к нам и спрашивали. Это такая проблема, о которой нам нужно было бы рассказывать разработчикам каждый раз. Можно было бы, конечно, записать это куда-то в документацию, но у кого есть время её читать?


Если разработчики создают какую-то схему, они засабмитят в гит-репозиторий, но у них никакого фидбека, нет обратной связи, когда эта схема будет применена на сам кластер, потому что процесс, который запускается на Cassandra, он, естественно, не постоянный, как butch-процесс, который периодически пулит и пытается применить изменения. Также изменения схем были применены в очень рандомном порядке в наших окружениях. То есть у нас есть несколько окружений: development, staging, production, и возможен был такой вариант, когда production была применена ранее, чем development.


Вторая проблема это то, что нет контекста. Когда разработчики самбитили новую схему или новую таблицу, то мы получали вот такую историю в гит-комит, и непонятно было, что произошло.


image


Если нам нужно было ревью каждого изменения схемы, не было процессов, когда мы, как инженеры по Database Reliability, обязательно бы это делали. И в том числе, когда разработчики пишут их SQL-команды, чтобы создать таблицу, например, не было никакой обратной связи с точки зрения ошибок: они написали, но не было возможности им сказать, что, например, вот этот тип колонки не поддерживается, или вы что-то делаете не так.


То есть довольно много проблем с таким процессом. Даже несмотря на то, что он работал изначально, мы открыли такие небольшие сюрпризы. И нам нужно было объяснять это разработчикам каждый раз, что вот это происходит, и если какие-то изменения были закомичены, то очень сложно найти, где была проблема. Нужен был постоянный дебаг что произошло. Если какая-то SQL statement была написана неправильно или неоптимально.


Как всё работает сейчас


Для начала надо дать терминологию. В Yelp используется pushplan это файл, который хранит инструкции по тому, как задеплоить какой-то код. Мы используем это в довольно широком контексте. Например, в контексте любой инфраструктурной работы. Если нам надо изменить часть инфраструктуры, добавить значение в конфиг, то мы обычно пишем pushplan. И эта pushplan-терминология используется и в случае с Cassandra.


Расскажу буквально в паре слов, как процесс выглядит сейчас с точки зрения инженера в Yelp. Если они хотят создать новую таблицу или новый keyspace в Cassandra, они создают папку pushplan и на каждом dev-боксе, который инженер использует, есть utility-функция или команда, которая называется pushplan. Они создают этот pushplan, этот текстовый файл с инструкциями, где они задают буквально пару значений: Jira-тикет, кластер, где они хотят создать, название keyspace, название таблицы и тип базы данных. В итоге эта функция генерирует SQL-файл.


image


Затем они пишут саму SQL-команду. То есть разработчики пишут, например, create table. Когда они пытаются сгенерировать pushplan, мы тут же можем применить так как всё автоматизировано, у нас есть контроль по тому, что мы можем делать с SQL-командами.


image


Разработчики видят, вот прямо сейчас, получают этот фидбек на их команду. Они видят, например, что стринг это не вали
дный тип данных. Рядом с указанием на ошибку мы даём ссылку на информацию о том, как пофиксить эту ошибку.


image


Разработчик исправляет ошибку, генерирует файл снова, теперь всё сработало отлично.


image


Пример файла, который сгенерируется: есть уникальный ID, имя автора, время, версия и сама команда.


image


Дальше разработчики открывают пул-реквест. Кто-то из команды DRE проверяет и говорит, что всё отлично, и мы мёрджим этот коммит.


После того, как мы замёрджили коммит, девелопер получает уведомление в слаке с результатами его pushplan. То есть он может увидеть в реальном времени, что их команда применилась сначала на окружение dev, потом на stage и prod. И видит, когда это случилось. Разработчики также могут запрашивать статус и видеть, на каких окружениях уже применяется и на каких нет. Если где-то ошибка, то могут увидеть, где она произошла. Для каждого кластера можно видеть историю, как изменялась схема всех таблиц.


image


Какие преимущества мы получили:


  • Разработчикам не надо вручную тестировать SQL-команды. Им не нужно переживать, что они не смогут исправить ошибку, если вдруг допустят её. Мы дадим им фидбек сразу, как только они напишут, довольно автоматизированным способом.
  • Мы как DRE-команда можем делиться best practices. Мы можем им сказать, например, что каждая таблица должна содержать как минимум один primary key; не нужно изменять какие-то колонки или что 20 колонок в одном pushplan это не лучшее решение. Мы также можем добавить правила проверки.

Вот так это и работает.


Cassandra для разработчиков


Теперь я немного расскажу, как Cassandra используется разработчиками: как они получают доступ к Cassandra-кластерам или Cassandra-инстансам.


Снова начну с истории. Около семи или восьми лет назад, когда мы начали использовать Cassandra, у нас был один главный web application и буквально несколько сервисов.


image


Потом мы задеплоили несколько Cassandra-кластеров и каждый сервис или каждая команда, которая работала с этим сервисом, должна была искать Cassandra-кластер вручную: искать host-порт, забивать это вручную в конфиг. С другой стороны, им тоже нужно было искать какой-то драйвер, чтобы отправлять запросы на Cassandra, эти драйверы имели разную версию получалась очень беспорядочная ситуация. И как вы видите, нагрузка между кластерами распределялась неравномерно.


image


К чему мы в итоге пришли довольно рано, это мы задеплоили proxy-сервис, который мы назвали Apollo. По сути это очень простой сервис на Python, который находится между сервисами и между Cassandra-кластерами.


image


Эта абстракция позволила нам решить несколько интересных проблем. Во-первых, разработчикам больше не нужно было думать, как подключиться к Cassandra, какие host и port, не искать всё это вручную, не поддерживать драйверы Cassandra. То есть вся сложность инфраструктуры была абстрагирована от разработчиков. Всё что им нужно знать это host и port сервиса Apollo.


image


Мы используем SmartStack, который я уже упоминал ранее, чтобы автоматически найти инстанс Apollo. То есть сервисам даже не нужно искать host и port, им просто нужно сказать: подключись к Apollo, и SmartStack найдёт нужный инстанс.


Это одна проблема, которую мы решили.


Другая решённая проблема это то, что сейчас мы, как команда DRE, могли производить эксперименты над нашим Cassandra-кластером. Мы могли, например, изменить версию драйвера Cassandra, если нам хотелось, или сделать scale up или scale down кластера, в зависимости от нагрузки, которую мы получаем. Это позволило нам равномерно распределить нагрузку на кластеры.


То есть одним proxy-сервисом мы решили несколько проблем.


Также мы могли изолировать клиентов в их собственный Apollo-кластер. Если, например, есть какие-то клиенты или сервисы, юзкейсы которых очень важны для бизнеса, мы просто изолируем их в отдельный кластер.


Ну и то что я сказал: изолирование сложности инфраструктуры от разработчиков это большая победа. Разработчикам стало очень легко работать над их Query-кодом. Нам было очень легко работать над инфраструктурой. Но проблема в том, что за семь лет, которые этот сервис существует, сервис вырос до довольно большого размера.


image


Идея сервиса в том, чтобы разработчики могли там создавать свой код добавлять бизнес-логику И мы заметили буквально недавно, что сервис вырос и сейчас поддерживает где-то около 140 юзкейсов и коннектится практически к 40 кластерам. И этот сервис хранит около 60 тыс. строк только бизнес-логики. И каждый раз, когда мы деплоим, мы раним 60 тысяч строк integration-тестов, что занимает около часа. Масштаб сервиса расширился до сотен инстансов.


Если разработчикам нужно написать самую простую команду, select-команду, им нужно написать довольно много кода.



Например, тут вы видите, что если они хотят написать простую функцию GET_REVIEWS, им нужно писать клиент, им нужно писать стейтменты, вызвать SQL-команду и возвратить результат.


Так же нужно писать unit-тесты, нужно писать swagger spec, потому что каждый клиент это end point или API в Apollo, и надо писать fake data и конфиги чтобы end-to-end тесты работали правильно.


image


image


Мы заметили, что (как и с pushplan) то, что работало вначале, сейчас не работает. Просто потому, что масштаб уже совсем другой. Мы заметили, что нужен какой-то другой, новый подход к решению этой проблемы, но мы не хотели начинать с абсолютного нуля. У Apollo есть некоторые преимущества, которые решали интересные проблемы: например, изоляция сложности в доступе и управлении инфраструктурой. Это важная проблема.


Но с другой стороны, с каким-то новым продуктом, с заменой Apollo, решить несколько проблем:


  1. Увеличить скорость итераций над клиентским кодом. Избавить инженеров от необходимости писать так много кода.
  2. Облегчить доступ к Cassandra-кластеру, ну и в целом модернизировать технологии, которые существовали.

В итоге мы пришли к продукту, который называется Stargate. Это Open Source продукт, который сделан и используется компанией DataStax. Про него можно думать как про API для базы данных. По сути это очень лёгкий слой между слоем приложения и базой данных.


Как это работает


Вы можете думать о Cassandra-кластере, как о коллекции нод, инстансов, организованных в кольцо. Когда приходит запрос от клиента, одна из нод выбирается как координатор, и её задача скоординировать запрос. Выбрать метрики, которые приходят от consistency requirements, от клиента, и отправить результат обратно клиенту.


image


Как это работает со Stargate. Stargate присоединяется к Cassandra-кластеру как координатор нод. И его задача просто скоординировать запрос: найти реплики Но единственное отличие в том, что Stargate не хранит никакие данные. То есть он решает две проблемы: работает как координатор, и он очень лёгкий просто потому, что ему не нужно думать о том, как хранить данные. Мы добавили небольшой proxy поверх Stargate.


image


Ну и вообще, Stargate можно думать об этом слое поверх Cassandra, который работает как координатор, но также предоставляет API для разработчиков. Он предоставляет REST API, можно коннектиться через gRPC, WebSocket или GraphQL.


image


Мы выбрали GraphQL API.


Почему GraphQL


Stargate берёт Cassandra schema все таблицы, которые в key space существуют, и автоматически генерирует GraphQL-запросы. Что это даёт разработчикам: Type-safe API. GraphQL решает очень интересную проблему over и under fetching. То есть клиенты могут очень чётко сказать: нам нужно вот такие колонки возвратить. Например, то что раньше было сложнее сделать с Apollo. И также мы можем запускать несколько Cassandra-запросов в одном запросе. Если нам нужно, например, синхронизировать какие-то таблицы в разных кластерах в одном запросе.


В целом GraphQL становится новым стандартом в API development и используется в больших технологических компаниях. Например, в Facebook (откуда он и произошёл), в Twitter, Github. В Yelp мы тоже довольно давно используем GraphQL.


Что дальше


Расскажу, что мы планируем делать с проектом дальше. Надеюсь, это вам даст немного больше информации о том, как мы делаем какие-то вещи в Yelp. Расскажу, как мы задеплоили MVP (взяли Stargate и просто задеплоили одну ноду) и потом расскажу немного про клиентские библиотеки то, как микросервисы взаимодействуют друг с другом в Yelp.


Тестирование Stargate


Перед тем как начать любую работу по замене Apollo, мы взяли Stargate и подумали: а что если мы задеплоим одну ноду (один node), которая присоединится к Cassandra-кластеру из трёх реплик, и потом запустим тестирование нагрузки запустим 100 тредов, поднимем нагрузку до 3 тыс. RPS и будем ранить это в течение 24 часов. Что тогда произойдёт? А что если потестим парочку сценариев: зарестартим один из Cassandra нодов или зарестартим сам инстанс? Что произойдёт тогда?


На графике видно, что мы проводили этот тест в течение 24 часов и подняли нагрузку примерно до 3 тыс. RPS.


image


Если посмотреть на rage of latencies всех запросов, то они довольно в допустимом пределе. Если посмотреть на высокий хвост (high tail) задержек (99.9+), то он немного spite.


image


Это дало нам несколько интересных инсайтов по тому, как Stargate работает:


  1. Да, память стабильная, CPU повышается вместе с нагрузкой, которую мы даём на кластер. Чтобы немного понизить high tail latencies, мы немного подтюнили JVM, на которой Stargate основан.
  2. В конце мы автоматизировали это, чтобы мы могли ранить это в любой момент времени, как мы девелопим Stargate.

Client libraries


Если так подумать про микросервисы, то какие варианты того, как они могут общаться между собой? Один из вариантов: отправлять друг другу http-запросы, основанные на REST-интерфейсе. Но мы идём немного дальше и пытаемся облегчить этот процесс для разработчиков.


Каждый сервис определяет свой swagger spec каждый endpoint, который экспозится сервисом. Для нашего сервиса есть один API endpoint GraphQL/{keyspace}. То есть разработчики дают по сути keyspace и отправляют их GraphQL-запрос. Что происходит дальше: мы берём swagger spec и генерируем Client Libraries.


image


На другой стороне сервис, который хочет законнектиться в наш сервис, работает с Client Libraries как с пакетом или локальной функцией, несмотря на то, что внизу, внутри этого пакета, на самом деле абстрагируется сложность отправления всех запросов, таймауты, ретраи.


Вот пример, как клиент коннектился бы к нашему Stargate-клиенту. Ему надо просто зарепортить Client Library, вызвать GraphQL Query метод и отправить запрос.


image


Что это даёт:


  1. Мы автоматически можем абстрагировать сложность Service Discovery (обратите внимание, другой сервис нигде не пишет host и port).
  2. Мы имеем тонкий контроль над request budget. Например, говорим, что на какую-то страницу в Yelp максимально можем потратить 2 секунды. Запрос идёт между кучей микросервисов и мы можем сказать, что некие два микросервиса имеют бюджет 100 миллисекунды. Если мы превышаем этот бюджет, то отправлять таймаут-ответ.
  3. Мы можем работать с distributed tracing инженеры могут следить, что происходит с их запросами.
  4. Можем обеспечивать метрики прозрачности: количество запросов, количество запросов, которые отправили ответ со статусом 200 или другим.

GraphQL Playground


Playground это интерактивный development environment, который экспозится. Если у вас бэкенд на GraphQL, вы можете использовать Playground. Stargate делает это out of the box. По сути это такой IDE, которому вы просто говорите /graphqlschema, и вы можете в интерактивном режиме ранить запросы на Cassandra-кластер, быстро итерировать над своим GraphQL Queries.


Что он даёт: автоматически можно экспозить, автоматически открыть все возможные операции, которые можно сделать над этим кластером.


Operability


С точки зрения Operability, когда мы работали над Stargate, то было буквально несколько вещей, которые нам нужно было сделать так, чтобы включить его в нашу Yelp-инфраструктуру. Первое это Distributed Tracing Integration. Второе это то, что нужно заэкспортить какие-то метрики Stargate. Опять же, Load чтобы мы могли в реальном времени наблюдать, что происходит в кластере, смотреть CPU, память


Так как Stargate присоединяется к Cassandra-кластеру, то это даёт нам такую интересную property, чтобы мы могли заэкспортить Cassandra-метрики из этого кластера. Потом нам нужно было сделать немного изменений с точки зрения логинга (чтобы могли логить какие-то ошибки) и добавить TLS между нодами.

Подробнее..

Перевод Apache Ozone следующее поколение хранилища для платформы больших данных

16.03.2021 10:10:28 | Автор: admin

Распределенная файловая система Apache Hadoop (HDFS) де-факто является файловой системой для больших данных. При этом легко забыть, насколько масштабируемая и надежная HDFS в реальном мире. Наши клиенты управляют кластерами с тысячами узлов; в этих кластерах хранится более 100 петабайт данных, обслуживающих тысячи одновременных клиентов.

Верная своим корням big data, HDFS работает лучше всего, когда большинство файлов имеют большой размер - от десятков до сотен мегабайт. HDFS страдает от известного ограничения небольших файлов, а производительность начинает ухудшаться при более чем 350 миллионах файлов. Существует повышенный спрос на HDFS-подобные системы хранения данных, которые могут масштабироваться до миллиардов маленьких файлов.

Ozone - это распределенное объектное хранилище, которое может управлять как малыми, так и большими файлами. В то время как HDFS предоставляет семантику, подобную POSIX, Ozone архитектурно выглядит и ведет себя как Object Store. Ozone разрабатывается и внедряется командой инженеров и архитекторов, имеющих значительный опыт управления большими кластерами Apache Hadoop. Это дало нам представление о том, что HDFS делает хорошо, и о некоторых вещах, которые можно делать по-другому. Эти уроки повлияли на дизайн и эволюцию Ozone.

При проектировании Озона мы руководствовались следующими принципами:

Strongly Consistent
Строгая консистенция упрощает проектирование приложений. Озон разработан для обеспечения строгой серийности.

Архитектурная простота
Простая архитектура проще в рассуждениях и проще в отладке, когда что-то идет не так. Мы постарались сделать архитектуру Ozone простой даже ценой потенциальной масштабируемости. Однако Ozone не сдаёт позиции, когда речь заходит о масштабировании. Мы разработали его для хранения более 100 миллиардов объектов в одном кластере.

Многослойная архитектура
Для достижения масштабов современных систем хранения данных Ozone является многоуровневой файловой системой. Она отделяет управление пространством имён от слоя управления блоками и узлами, что позволяет пользователям независимо масштабировать по обеим осям.

Безболезненное восстановление
Ключевым преимуществом HDFS является то, что она может эффективно восстанавливаться после таких катастрофических событий, как отключение электроэнергии по всему кластеру без потери данных и без дорогостоящих шагов по восстановлению. Потери в стойке и в узле - относительно незначительные. Озон является таким же надежным перед лицом сбоев.

Открытые исходные коды в Apache
Мы считаем, что сообщество Apache с открытым исходным кодом имеет решающее значение для успеха Ozone. Весь дизайн и разработка Ozone осуществляется в сообществе Apache Hadoop.

Взаимодействие с экосистемой Hadoop
Озон должен использоваться существующей экосистемой Apache Hadoop и связанными с ней приложениями, такими как Apache Hive, Apache Spark и традиционные задания MapReduce. Следовательно, Озон поддерживает:
- Hadoop Compatible FileSystem API (он же OzoneFS). Это позволяет Hive, Spark и т.д. использовать Ozone в качестве хранилища с нулевыми модификациями.
- Data Locality (Локальность данных) была ключом к оригинальной архитектуре HDFS/MapReduce, позволяя планировать задачи на тех же узлах, что и данные. Ozone также поддерживает локализацию данных для приложений.
- Совместное развертывание рядом с HDFS. Ozone может быть установлен в существующем кластере Hadoop и может совместно использовать диски хранения данных с HDFS.

Озон состоит из знакомых для S3 хранилищ понятий - Volumes, Buckets и keys.
Volumes - Volumes аналогичны аккаунтам. Они могут быть созданы или удалены только администраторами. Администратор обычно создает том для всей организации или команды.

Buckets - Том может содержать ноль и более корзин. Корзины Озона похожи на корзины Amazon S3.

Keys - Ключи уникальны внутри данной корзины и похожи на Объекты в S3. Имя ключа может быть любой строкой. Значения представляют собой данные, которые вы храните внутри этих ключей, в настоящее время Ozone не применяет верхний предел размера ключа.

Чтобы масштабировать Ozone до миллиардов файлов, нам нужно было решить два узких места, которые существуют в HDFS:
Масштабируемость пространства имён
Мы больше не можем хранить все пространство имен в памяти одного узла. Ключевым моментом является то, что пространство имен имеет опорную точку, поэтому мы можем хранить только рабочий набор в памяти. Пространство имен управляется сервисом под названием Ozone Manager.

Масштабируемость карты блоков
Эту проблему решить сложнее. В отличие от пространства имён, карта блоков не имеет опорного местоположения, так как узлы хранения (DataNodes) периодически посылают отчеты о каждом блоке в системе. Озон делегирует эту проблему на общий уровень хранилища под названием Hadoop Distributed DataStore (HDDS).

Основные блоки архитектуры:

  • Storage Containers - Контейнеры для хранения, построенные с использованием готовой key-value БД (RocksDB). Типичный размер контейнера - 5 ГБ. Единица репликации, реплики синхронизированы через консенсус-протокол RAFT. Хранится на DataNodes, управляется Storage Container Manager. Эквивалент менеджера блоков HDFS.

  • Консенсусный протокол RAFT через Apache Ratis. RAFT - это консенсус-протокол, похожий по духу на Paxos, но предназначенный для простоты понимания и реализации. Apache Ratis - это Java-реализация с открытым исходным кодом RAFT, оптимизированная для высокой пропускной способности.

  • Storage Container Manager (SCM) - сервис, который управляет жизненным циклом реплицируемых контейнеров. Масштабируется, не отслеживая отдельные блоки данных. Вместо этого он отслеживает контейнеры, которые являются аггрегатами информации о блоках. Storage Container Manager выступает в качестве кластер менеджера, центра сертификации, блок-менеджера и менеджера по репликам. SCM выделяет блоки и распределяет их по узлам. Клиенты читают и записывают эти блоки напрямую. SCM отслеживает все реплики блоков.

  • Hadoop Distributed Data Store (HDDS) - общий уровень распределенного хранения для блоков, не предоставляющих пространства имен.

  • Ozone Manager (OM) - менеджер пространства имен, реализующий архитектуру ключ-значение. Пространство имён Озона - это коллекция томов, каждый из которых состоит из корзин и ключей. OM ведет список томов, корзин и ключей. Ozone Manager будет использовать Apache Ratis для воспроизведения состояния OM. Это обеспечивает высокую доступность для Ozone.

Основные свойства Озона, которые помогают ему достичь масштабируемости:

  • Пространство имён (namespace) в Ozone сохраняется в локальную RocksDB, при таком дизайне может быть легко настроен баланс между производительностью (сохранение всего в памяти) и масштабируемостью (сохранение на диске менее используемых метаданных).

  • Управление пространством имён и блоками разделено на два разных сервиса OzoneManager(OM) и StorageContainerManager(SCM) соответственно. Каждый из них может масштабироваться независимо друг от друга.

  • В отличие от HDFS, отчеты по блокам Ozone передаются через контейнерные отчеты, которые агрегируют отчеты по нескольким блокам в контейнере.

Бенчмаркинг Ozone

Apache Ozone был разработан для преодоления ограничений масштабирования HDFS при небольших файлах и большом общем количестве объектов файловой системы. На текущем оборудовании центра обработки данных ограничение HDFS составляет около 350 миллионов файлов и 700 миллионов объектов файловой системы. Архитектура Ozone устраняет эти ограничения. Ниже сравнивается производительность Ozone при работе с HDFS.

Для тестирования мы выбрали широко используемый эталонный тест TPC-DS и обычный стек Hadoop, состоящий из Hive, Tez, YARN и HDFS наряду с Ozone. В соответствии с текущей потребностью отрасли в разделении вычислений и хранилищ данных, когда используются высокоплотные узлы хранения и эластичные вычисления, мы выполняем эти тесты, используя отдельные DataNodes и NodeManagers. Фундаментальная цель данного исследования и последующих усилий по оптимизации продукта - сделать его сопоставимым по стабильности и производительности с HDFS. И мы хотели бы отметить невероятный объем работы, проделанной сообществом за последние несколько месяцев для достижения этой цели.

Hive на Ozone работает быстрее

Следующие измерения были получены путем создания двух независимых наборов данных по 100 ГБ и 1 ТБ в кластере с 12 выделенными узлами для хранения и 12 выделенными вычислительными узлами. В последнем разделе этой статьи будет представлена более подробная информация о конфигурации.

Следующие иллюстрации показывают, что, учитывая общее время выполнения наших 99 тестовых запросов, на обоих наборах данных Ozone превзошел HDFS в среднем на 3,5%.

Чтобы лучше понять подробные результаты, мы разделили наши запросы на три группы:

1. Более быстрые запросы (запросы, в которых Ozone превосходит HDFS).

2. Незначительно более медленные запросы (запросы, в которых Ozone уступает HDFS на 25% или меньше).

3. Выбросы - запросы, в которых Ozone уступает HDFS более чем на 25%).

Когда данные находятся в Ozone, более чем в 70% случаев запросы выполняются быстрее по сравнению с HDFS. Усилия сообщества, направленные на стабилизацию и улучшение производительности, похоже, окупаются. Но есть еще куда расти.

На следующем графике разброса показана средняя разница во времени выполнения между Ozone и HDFS для каждого отдельного запроса TPC-DS и каждого набора данных. Каждый запрос на графике, который колеблется в районе 0%, показывает незначительную разницу в производительности между Ozone и HDFS. Чтобы нормализовать дисперсию из-за шума, числа были усреднены для каждого запроса за 10 последовательных прогонов.

Значения по оси Y представляют собой разницу во времени выполнения по сравнению со временем выполнения запроса в HDFS. Так, например, 50% означает, что разница составляет половину времени выполнения в HDFS. Это фактически означает, что в Ozone запрос выполнялся в 2 раза быстрее, а -50% (отрицательное значение) означает, что для выполнения запроса в Ozone потребовалось в 1,5 раза большее время, чем в HDFS.

Заключение

Тестовые прогоны показывают, что Ozone с небольшим отрывом лидирует - немногим более чем на 70% запросов TPC-DS. Однако есть несколько выбросов, которые мы активно изучаем, чтобы найти узкие места и сгладить их с наивысшим приоритетом.

В настоящее время Ozone выпущен в GA-версии, готов к работе в продуктиве и включен в дистрибутив Cloudera Data Platform (CDP), мы постоянно находимся в процессе сбора отзывов и продолжения развития распределенной системы хранения больших данных следующего поколения. В рамках платформы Озон интегрирован с Ranger и поддерживает шифрование данных как в пути, так и в покое.
Сообщество пользователей активно растёт, так например Tencent использует Озон в продуктиве на около 1000 узлах, также в списке активных пользователей Bloomberg и Target. Доступны или готовятся к выходу референсные архитектуры от Cisco, Dell и HP, а также материалы от Клаудеры. Из основных планов на ближайшее будущее: реализация HA Storage Container Manager, поддержка Erasure Coding, интеграция с Atlas, NiFi/Kafka/Flink коннекторы и тул для миграции HDFS->Ozone.

Подробная информация о среде

Кластер состоит из 28 однородных физических узлов, оснащенных 20-ядерными процессорами Intel Xeon, 128 ГБ оперативной памяти, 4 дисками по 2 ТБ и сетью 10 ГБ/с. Узлы, работающие под управлением CentOS 7.4 и Cloudera Runtime 7.0.3, которые содержат Hive 3, Hadoop 3, Tez 0.9 и Ozone, созданы из основной ветки Apache с начала января 2020 г.

4 главных узла, один - для Cloudera Manager, Prometheus и Zookeeper, другой - для YARN, третий - для Hive и четвертый - для HDFS и Ozone, поскольку мы используем их в разное время.

12 узлов хранения, с 2 выделенными дисками для хранения данных и 12 вычислительных узлов с 3 дисками, выделенными для YARN и журналов (логов).

Шифрование SSL/TLS было отключено, и использовалась простая аутентификация.

Ozone был настроен на использование одного тома с одним бакетом для имитации семантики HDFS.

Высокая доступность не была включена ни для одной из служб.

Внутренняя сеть кластера используется совместно с другими хостами, и мы знаем, что трафик может быть нестабильным - от довольно низкого до пикового. Отчасти это причина того, что мы усреднили результаты десяти полных тестовых прогонов как для HDFS, так и для Ozone, прежде чем прийти к окончательному результату.

Кэширование результатов в Hive было отключено для всех запросов, чтобы все данные считывались из файловой системы.

Инструменты, которые мы использовали для тестирования, также доступны и имеют открытый исходный код - если хотите, попробуйте его в своей среде. Все инструменты можно найти в этом репозитории.

Дополнительная литература

1. Толеранция аварий в Озоне

2. Apache Hadoop Ozone - обзор хранилища объектов

3. 1 миллиард файлов в Озон

4. Apache Hadoop Ozone - архитектура хранилища объектов

Подробнее..

Поиск по синонимам контролируем процесс или доверяемся нейросетям

28.01.2021 10:04:30 | Автор: admin
image

Первое что нужно сделать при разработке поисковых, диалоговых и прочих систем, основанных на natural language processing это научиться разбирать тексты пользовательских запросов и находить в них сущности рабочей модели. Задача нахождения стандартных сущностей (geo, date, money и т.д.) в целом уже решена, остается лишь выбрать подходящий NER компонент и воспользоваться его функционалом. Если же вам нужно найти элемент, характерный для вашей конкретной модели или вы нуждаетесь в улучшенном качестве поиска стандартного элемента, придется создать свой собственный NER компонент или обучить какой-то уже существующий под свои цели.

Если вы работаете с системами вроде Alexa или Google Dialogflow процесс обучения сводится к созданию простейшей конфигурации. Для каждой сущности модели вы должны создать список синонимов. Далее в дело вступают нейронные сети. Это быстро, просто, очень удобно, все заработает сразу. Из минусов отсутствует контроль за настройками нейронных сетей, а также одна общая для данных систем проблема вероятностный характер поиска. Все эти минусы могут быть совершенно не важны для вашей модели, особенно если в ней ищется одна-две принципиально отличающиеся друг от друга сущности. Но если элементов модели достаточно много, а особенно если они в чем-то пересекаются, проблема становится более значимой.

Если вы проектируете собственную систему, обучаете и настраиваете поисковые компоненты, например от Apache OpenNlp, Stanford NLP, Google Language API, Spacy или Apache NlpCraft для поиска собственных элементов, забот, разумеется, несколько больше, но и контроль над такой системой заметно выше.

Ниже поговорим о том, как нейронные сети используются при поиске сущностей в проекте Apache NlpCraft. Для начала вкратце опишем все возможности поиска в системе.

Поиск пользовательских сущностей в Apache NlpCraft

При построении систем на базе Apache NlpCraft вы можете использовать следующие возможности по поиску собственных элементов:
  • Встроенные компоненты поиска, основанные на конфигурации синонимов элементов. Пример описания элементов моделей, основанных на синонимах, Synonym DSL т.д. приведены в ознакомительной статье о проекте.
  • Использование любого из вышеупомянутых внешних компонентов, интеграция с ними уже предусмотрена, вы просто подключаете их в конфигурации.
  • Применение составных сущностей. Суть в возможности построения новых NER компонентов на основе уже существующих. Подробнее тут.
  • Самый низкоуровневый вариант программирование и подключение в систему своего собственного парсера. Данная задача сводится к имплементации интерфейса и добавлении его в систему через IoC. На входе реализуемого компонента есть все для написания логики поиска сущностей в запросе: сам запрос и его NLP представление, модель и все уже найденные в тексте запроса другими компонентами сущности. Эта имплементация место для подключения нейросетей, использования собственных алгоритмов, интеграции с любыми внешними системами и т.д., то есть точка полного контроля над поиском.

Первый подход, работающий на основе настройки синонимов, не требующий ни программирования, ни текстовых корпусов для обучения модели, является самым простым, универсальным и быстрым для разработки.

Ниже представлен фрагмент конфигурации модели умный дом (подробнее о макросах и synonym DSL можно прочесть по ссылке).
macros: - name: "<ACTION>"   macro: "{turn|switch|dial|control|let|set|get|put}" - name: "<ENTIRE_OPT>"   macro: "{entire|full|whole|total|*}" - name: "<LIGHT>"   macro: "{all|*} {it|them|light|illumination|lamp|lamplight}"...- id: "ls:on" description: "Light switch ON action." synonyms:   - "<ACTION> {on|up|*} <LIGHT> {on|up|*}"   - "<LIGHT> {on|up}"

Элемент ls:on описан очень компактно, при этом данное описание содержит в себе более 3000 синонимов. Вот их малая часть: set lamp, light on, control lamp, put them, switch it all Синонимы конфигурируются в весьма сжатом виде, при этом вполне читаемы.

Несколько замечаний:
  • Разумеется, при работе с поиском по синонимам учитываются начальные формы слов (леммы, стеммы), стоп-слова текста запроса, конфигурируется поддержка прерывистости многословных синонимов и т.д. и т.п.
  • Часть сгенерированных синонимов не будет иметь практического смысла, это вполне ожидаемая плата за компактность записи. Если использование памяти станет узким местом (тут речь должна идти о миллионах и более вариантов синонимов на сущность), стоит задуматься об оптимизации. Вы получите все необходимые warnings при старте системы.

Итак вы полностью управляете процессом поиска ваших элементов в тексте запросов, этот процесс является детерминированным, то есть в итоге отлаживаемым, контролируемым, и имеет возможности для последовательного улучшения. Теперь вам нужно лишь аккуратно составить достаточный список синонимов. Здесь вступает в игру человеческий фактор. На этапе старта проекта можно ограничиться лишь несколькими основными синонимами на элемент, достаточно добавить буквально одно-два слова, все будет работать, но в итоге конечно хочется поддержать максимально полный список синонимов для обеспечения наиболее качественного процесса распознавания.

Что может подсказать нам недостающие синонимы в конфигурации, ведь от ее полноты напрямую будет зависеть качество нашей системы?

Расширение списка синонимов

Первое очевидное направление это в ручном режиме отслеживать логи и анализировать неотвеченные вопросы.

Второе посмотреть в словаре синонимов, что может быть достаточно полезно для очевидных случаев. Один их самых известных словарей wordnet.

image

Работа в ручном режиме может принести определенную пользу, но процесс поиска и конфигурирования дополнительных синонимов элементов здесь явно не стоит автоматизировать.

Помимо этого, разработчики Apache NlpCraft добавили в проект инструмент sugsyn, предлагающий, в процессе использования, дополнительные синонимы для элементов модели.

Компонент sugsyn работает стандартным образом через REST API, взаимодействуя с дополнительным сервером, поставляемым в бинарных релизах ContextWordServer.

Описание ContextWordServer

ContextWordServer позволяет искать синонимы к слову в заданном контексте. В качестве запроса пользователь передает предложение с помеченным словом, к которому нужно подобрать синонимы, а сервер, используя выбранную модель, возвращает список наиболее подходящих слов заменителей.

Изначально в качестве базовой модели был использован word2vec (skip-gram модель), позволяющий строить векторные представления слов (или эмбеддинги). Идея заключалась в том, чтобы посчитать эмбеддинги слов и основываясь на полученном векторном пространстве отобрать слова, находящиеся ближе всего к целевому.

В целом данный подход работал удовлетворительно, но контекст слов учитывался недостаточно хорошо, даже при больших значениях N для n-граммов. Тогда было предложено использование Bert для решения задачи masked language modeling (поиск наиболее подходящих слов, которые можно подставить вместо маски в предложение). В предложении, которое передавал пользователь, маскировалось целевое слово, и выдача Bert являлась ответом. Недостатком использования одного лишь Bert также было получение не совсем удовлетворительного списка слов, подходящих по контексту использования, но не являющихся близкими синонимами заданного.

Решением этой проблемы стало комбинирование двух методов. Теперь, выдача Bert отфильтровывается, выкидывая слова с векторным расстоянием до целевого слова выше определенного порога.

Далее, по результатам экспериментов, word2vec был заменен на fasttext (логический наследник word2vec), показывающий лучшие результаты. Была использована готовая модель, предобученная на статьях из Wikipedia, а модель Bert была заменена на улучшенную модель Roberta, смотри ссылку на предобученную модель.

Итоговый алгоритм не требует использования именно fasttext или Roberta в качестве составных частей. Обе подсистемы могут быть заменены альтернативными, способными решить схожие задачи. Кроме того, для лучшего результата предобученной модели могут быть fine-tuned, либо обучены с нуля.

Описание инструмента sugsyn

Работа с sugsyn, удобней всего осуществляется через CLI. Требуется скачать последнюю версию бинарного релиза, использование NlpCraft через maven central в данном случае будет недостаточно.

Общий принцип работы sugsyn достаточно прост. Он собирает примеры использования всех интентов запрашиваемой модели и объединяет их со всеми сконфигурированными синонимами каждого элемента модели. Созданные гибридные предложения посылаются на ContextWordServer, который возвращает рекомендации по использованию дополнительных слов в запрашиваемом контексте. Обратите внимание, для обучения нейронной сети требуются подготовленные и размеченные данные, но для работы sugsyn тоже требуются подготовленные данные примеры использования, выигрыш в том, что данных для sugsyn нужно совсем немного, кроме того мы используем уже имеющиеся примеры из тестов системы.

Пример. Пусть некий элемент какой-то модели сконфигурирован с помощью двух синонимов: ping и buzz (сокращенный вариант модели, взятой из примера с будильником), и пусть единственный интент модели содержит два примера: Ping me in 3 minutes и In an hour and 15mins, buzz me. Тогда sugsyn пошлет на ContextWordServer 4 запроса:
  • text=ping me in 3 minutes, index=0
  • text=buzz me in 3 minutes, index=0
  • text=in an hour and 15mins, ping me, index=6
  • text=in an hour and 15mins, buzz me, index=6

Это означает следующее для каждого предложения (поле text) предложи мне какие-нибудь дополнительные подходящие слова, кроме уже имеющихся слов в позиции index. ContextWordServer вернет несколько вариантов, отсортированных по итоговому весу (комбинации результатов обеих моделей, влияние каждой модели может быть сконфигурировано), далее отсортированных по повторяемости.

Использование инструмента sugsyn

Приступим к работе
  1. Запускам ContextWordServer, то есть сервер с готовыми, предобученными моделями.
    > cd ~/apache/incubator-nlpcraft/nlpcraft/src/main/python/ctxword
    > ./bin/start_server.sh

    Обратите внимание на то, что ContextWordServer должен быть предварительно проинсталирован и для успешной работы ему требуется python версий 3.6 3.8. См. install_dependencies.sh для linux и mac или мануал по установке для windows. Имейте в виду, что процесс инсталляции влечет за собой скачивание файлов моделей существенных размеров, будьте внимательны.
  2. Запускаем CLI, стартуем в нем server и probe с подготовленной моделью, подробности см. в разделе Quick Start. Для начальных экспериментов можно подготовить свою собственную модель или воспользоваться уже имеющимися в поставке примерами.
  3. Используем команду sugsyn, с одним обязательным параметром идентификатором модели, которая должна быть уже запущена в probe. Второй, необязательный параметр значение минимального коэффициента достоверности результата, поговорим о нем ниже.

image

Все описанное выше расписано шаг за шагом в мануале по ссылке.

Получение результатов для разных моделей

Начнем с примера по прогнозу погоды. Элемент wt:phen сконфигурирован с помощью множества синонимов среди которых rain, storm, sun, sunshine, cloud, dry и т.д., а элемент wt:fcast с помощью future, forecast, prognosis, prediction и т.д.

Вот часть ответа sugsyn на запрос со значением коэффициента minScore = 0.
> sugsyn --mdlId=nlpcraft.weather.ex --minScore=0

"wt:phen": [   { "score": 1.00000, "synonym": "flooding" },   ...   { "score": 0.55013,  "synonym": "freezing" },   ...   { "score": 0.09613, "synonym": "stop"},   { "score": 0.09520, "synonym": "crash" },   { "score": 0.09207, "synonym": "radar" },   ..."wt:fcast": [   { "score": 1.00000, "synonym": "outlook" },   { "score": 0.69549, "synonym": "news" },   { "score": 0.68009, "synonym": "trend" },   ...   { "score": 0.04898, "synonym": "where" },   { "score": 0.04848, "synonym": "classification" },   { "score": 0.04826, "synonym": "the" },   ...

Как мы видим из полученного ответа, чем больше значение коэффициента тем выше ценность предложенных синонимов для элементов модели. Можно сказать, что интересные для использования синонимы здесь начинаются со значения коэффициента равного 0.5. Коэффициент достоверности результата интегральный показатель, учитывающий коэффициенты выдачи моделей и частоту, с которой системой предлагается синоним в разных контекстах.

Посмотрим, что будет предложено в качестве дополнительных синонимов для элементов примера умный дом. Для ls:loc, описывающего расположение элементов освещения (синонимы: kitchen, library, closet и т.д.), предложенные варианты с высокими значениями коэффициента, большим чем 0.5, тоже выглядят заслуживающими внимания:

"lc:loc": [   { "score": 1.00000, "synonym": "apartment" },   { "score": 0.96921, "synonym": "bed" },   { "score": 0.93816, "synonym": "area" },   { "score": 0.91766, "synonym": "hall" },   ...   { "score": 0.53512, "synonym": "attic" },   { "score": 0.51609, "synonym": "restroom" },   { "score": 0.51055, "synonym": "street" },   { "score": 0.48782, "synonym": "lounge" },   ...

Но около коэффициента 0.5 для нашей модели уже попадается откровенный мусор street.

Для элемента x:alarm модели будильник, с синонимами: ping, buzz, wake, call, hit и т.д. имеем следующий результат:

"x:alarm": [   { "score": 1.00000, "synonym": "ask" },   { "score": 0.94770, "synonym": "join" },   { "score": 0.73308, "synonym": "remember" },   ...   { "score": 0.51398, "synonym": "stop" },   { "score": 0.51369, "synonym": "kill" },   { "score": 0.50011, "synonym": "send" },   ... 

То есть для элементов данной модели, с понижением коэффициента качество предложенных синонимов снижается заметно быстрее.

Для элемента x:time модели текущее время (1, 2), с синонимами типа what time, clock, date time, date and time и т.д. имеем следующий результат:
"x:time": [   { "score": 1.00000, "synonym": "night" },   { "score": 0.92325, "synonym": "year" },   { "score": 0.58671, "synonym": "place" },   { "score": 0.55458, "synonym": "month" },   { "score": 0.54937, "synonym": "events" },   { "score": 0.54466, "synonym": "pictures"},   ...    

Качество предложенных синонимов оказалось неудовлетворительным даже при высоких коэффициентах.

Оценка результатов

Перечислим факторы, от которых зависит качество синонимов, предлагаемых sugsyn для поиска в тексте элементов модели:
  • Количество синонимов, определенных пользователем в конфигурации элементов.
  • Количество и качество примеров запросов к интентам. Под качеством понимается естественность и распространенность добавленных примеров. Спросите гугл который час, и число полученных результатов будет примерно 2 630 000. Спросите час который, результатов будет примерно 136 000. Текст первого запроса более качественный, и для примера он подойдет лучше.
  • Самое главное и непредсказуемое качество зависит от типа самого элемента и типа его модели.

Иными словами, даже при прочих равных условиях, таких как достаточное количество заранее сконфигурированных синонимов и примеров использования, для некоторых видов сущностей нейронная сеть предложит более качественный набор синонимов, а для некоторых менее качественный, и это зависит от природы самих сущностей и моделей. То есть для достижения сопоставимого уровня качества поиска, выбирая из списка предлагаемых к использованию синонимов, для разных элементов и разных моделей мы должны использовать разные значения минимального коэффициента достоверности предлагаемых вариантов. В целом это вполне объяснимо, так как даже одни и те же слова в разных смысловых контекстах могут быть заменены несколько разными наборами слов заменителей. Предобученная модель, используемая по умолчанию с ContectWordServer может быть адаптирована для одних типов сущностей лучше чем для других, однако перенастройка модели может и не улучшить итоговый результат. Так как Apache NlpCraft это opensource решение, вы всегда можете изменить все настройки модели, как параметры, так и саму модель с учетом специфики вашей предметной области. К примеру, для специфичной, изолированной области, имеет смысл обучить модель bert с нуля, поскольку данных об этой области в стандартной модели может оказаться недостаточно.

При работе с sugsyn, помимо идентификатора модели, вы можете использовать всего один параметр minScore, просто ограничивая размер выборки синонимов, отсортированных в порядке снижения их качества. Подобное упрощение используется для облегчения работы специалистов, ответственных за расширение списка синонимов элементов настраиваемой модели. Обилие конфигурационных параметров, свойственное работе с нейронными сетями в данном случае способно лишь запутать пользователей системы.

Заключение

Если вы полностью делегируете задачу поиска ваших сущностей нейронным сетям, то для разных элементов и разных типов моделей вы будете получать результаты, заметно отличающиеся качеством распознавания. При этом вы даже не сможете оценить эту разницу, особенно если вы не контролируете настройки сети. Но и самостоятельная конфигурация для установки желаемого порога срабатывания, не будет являться простой задачей. Причиной, как правило, является недостаток достаточного количества данных, необходимых для корректного обучения и тестирования сети. Используя предлагаемый Apache NlpCraft механизм поиска сущностей через набор синонимов и инструмент, предлагающий варианты обогащения данного списка, вы можете самостоятельно контролировать степень достоверности синонимов вашей модели и выбрать из предлагаемых сетью только одобренные вами подходящие варианты. Фактически вы получаете возможность заглянуть в черный ящик и достать из него лишь то, что вам нужно.
Подробнее..

Как создать субдомен на том же ip, что и основной сайт? Используем apache web server

07.04.2021 10:06:45 | Автор: admin

В прошлой статье я писал, как добавить на сервер мониторинг. Вот только доступ до него не очень удобный, через порт: домен.com:8080/monitorix

Решил переделать, чтобы можно было достучаться по адресу monitorix.домен.com

И тут столкнулся с проблемой:

  • если я хочу делать субдомен только через А-запись DNS - то мне нужен будет отдельный ip адрес, а аренда отдельного ip стоит лишних денег

  • если я делаю через redirect - то в названии не будет monitorix.домен.com - будет просто при вводе адреса субдомен перенаправлять в домен.com:8080/monitorix - а это не то, что мне нужно

Как же быть? Самый простой рабочий вариант, который я нашёл - сделать распределение по субдоменам на уровне веб-сервера, на котором работает мой сайт.

Как это делается (считаем, что apache web server уже установлен):

Добавление A-записи сервера

Необходимо будет добавить А-запись.

А-запись должна вести на ваш основной сайт - на тот ip адрес, по которому работает apache web server:

Активация модулей apache web server

Для активации модулей нужно запустить последовательно следующий код,

2enmod proxy   --у меня сработало без этой строкиa2enmod proxy_httpa2enmod proxy_balancera2enmod lbmethod_byrequests

После чего необходимо перезапустить службу,

systemctl restart apache2

Добавление конфига для субдомена

Нужно перейти в папку etc/apache2/sites-available/ после чего создать в ней файл с конфигом вашего субдомена:

Файл субдомен.домен.com.conf

<VirtualHost *:80>

ServerAdmin admin@домен.com

ServerName субдомен.домен.com

ServerAlias субдомен.доменcom

ProxyRequests Off


#ProxyPass / http://localhost:8080/

<Location />

ProxyPreserveHost On

ProxyPass http://домен.com:8080/monitorix/

ProxyPassReverse http://домен.com:8080/monitorix/

</Location>

# Uncomment the line below if your site uses SSL.

#SSLProxyEngine On

</VirtualHost>

После чего добавить этот конфиг в apache web server:

a2ensite субдомен.домен.com.conf

Ещё раз перезапускаем web сервер:

systemctl restart apache2

После чего вы можете перейти на ваш субдомен и наслаждаться тем, что он открылся с другого порта того же ip адреса, что и основной сайт. И в адресной строке будет название субдомена, а не порта сайта.

Подробнее..

Перевод Как Apache Spark 3.0 увеличивает производительность ваших SQL рабочих нагрузок

01.06.2021 12:20:22 | Автор: admin

Практически в каждом секторе, работающем со сложными данными, Spark "де-факто" быстро стал средой распределенных вычислений для команд на всех этапах жизненного цикла данных и аналитики. Одна из наиболее ожидаемых функций Spark 3.0 - это новая платформа Adaptive Query Execution (AQE), устраняющая проблемы, которые возникают при многих рабочих нагрузках Spark SQL. Они были задокументированы в начале 2018 года командой специалистов Intel и Baidu. Для более глубокого изучения фреймворка можно пройти наш обновленный курс по тюнингу производительности Apache Spark (Apache Spark Performance Tuning).

Наш опыт работы с Workload XM, безусловно, подтверждает реальность и серьезность этих проблем.

AQE был впервые представлен в Spark 2.4, но в Spark 3.0 и 3.1 он стал намного более развитым. Для начала, давайте посмотрим, какие проблемы решает AQE.

Недостаток первоначальной архитектуры Catalyst

На диаграмме ниже представлен вид распределенной обработки, которая происходит, когда вы выполняете простой group-by-count запрос с использованием DataFrames.

Spark определяет подходящее количество партиций для первого этапа, но для второго этапа использует по умолчанию "магическое число" - 200.

И это плохо по трем причинам:

1. 200 вряд ли будет идеальным количеством партиций, а именно их количество является одним из критических факторов, влияющих на производительность;

2. Если вы запишете результат этого второго этапа на диск, у вас может получиться 200 маленьких файлов;

3. Оптимизация и ее отсутствие имеют косвенный эффект: если обработка должна продолжаться после второго этапа, то вы можете упустить потенциальные возможности дополнительной оптимизации.

Что можно сделать? Вручную установить значение этого свойства перед выполнением запроса с помощью такого оператора:

spark.conf.set(spark.sql.shuffle.partitions,2)

Но это также создает некоторые проблемы:

  • Задавать данный параметр перед каждым запросом утомительно.

  • Эти значения станут устаревшими по мере эволюции ваших данных.

  • Этот параметр будет применяться ко всем шаффлингах в вашем запросе.

Перед первым этапом в предыдущем примере известно распределение и объем данных, и Spark может предложить разумное значение для количества партиций. Однако для второго этапа эта информация еще не известна, так как цена, которую нужно заплатить за ее получение, - это выполнение фактической обработки на первом этапе: отсюда и обращение к магическому числу.

Принцип работы Adaptive Query Execution

Основная идея AQE состоит в том, чтобы сделать план выполнения не окончательным и перепроверять статус после каждого этапа. Таким образом, план выполнения разбивается на новые абстракции этапов запроса, разделенных этапами.

Catalyst теперь останавливается на границе каждого этапа, чтобы попытаться применить дополнительную оптимизацию с учетом информации, доступной на основе промежуточных данных.

Поэтому AQE можно определить как слой поверх Spark Catalyst, который будет изменять план Spark "на лету".

Есть недостатки? Некоторые есть, но они второстепенные:

  • Выполнение останавливается на границе каждого этапа, чтобы Spark проверил свой план, но это компенсируется увеличением производительности.

  • Пользовательский интерфейс Spark труднее читать, потому что Spark создает для данного приложения больше заданий, и эти задания не подхватывают группу заданий и описание, которое вы задали.

Адаптивное количество перемешиваемых партиций

Эта функция AQE доступна, начиная с версии Spark 2.4.

Чтобы включить ее, вам нужно установить для spark.sql.adaptive.enabled значение true, значение по умолчанию - false. Когда AQE включено, количество партиций в случайном порядке регулируется автоматически и больше не равно 200 по умолчанию или заданному вручную значению.

Вот как выглядит выполнение первого запроса TPC-DS до и после включения AQE:

Динамическая конвертация Sort Merge Joins в Broadcast Joins

AQE преобразует соединения sort-merge в broadcast хэш-соединения, если статистика времени выполнения любой из сторон соединения меньше порога broadcast хэш-соединения.

Вот как выглядят последние этапы выполнения второго запроса TPC-DS до и после включения AQE:

Динамическое объединение shuffle партиций

Если количество разделов в случайном порядке больше, чем количество групп по ключам, то много циклов ЦП теряется из-за несбалансированного распределения ключей.

Когда оба:

spark.sql.adaptive.enabled и

spark.sql.adaptive.coalescePartitions.enabled

установлены на true, Spark объединит смежные перемешанные разделы в соответствии с целевым размером, указанным в spark.sql.adaptive.advisoryPartitionSizeInBytes. Это делается, чтобы избежать слишком большого количества мелких задач.

Динамическая оптимизация обьединений с перекосом

Skew (перекос) - это камень преткновения распределенной обработки. Это может задержать обработку буквально на несколько часов:

Без оптимизации время, необходимое для выполнения объединения, будет определяться самым большим разделом.

Оптимизация skew join, таким образом, разобъет раздел A0 на подразделы, используя значение, указанное park.sql.adaptive.advisoryPartitionSizeInBytes, и присоединит каждый из них к соответствующему разделу B0 таблицы B.

Следовательно, вам необходимо предоставить AQE свое определение перекоса.

Это включает в себя два параметра:

1. spark.sql.adaptive.skewJoin.skewedPartitionFactor является относительным: партиция считается с пересом, если ее размер больше, чем этот коэффициент, умноженный на средний размер партиции, а также, если он больше, чем

2. spark.sql.adaptive.skewedPartitionThresholdInBytes, который является абсолютным: это порог, ниже которого перекос будет игнорироваться.

Динамическое сокращение разделов

Идея динамического сокращения разделов (dynamic partition pruning, DPP) - один из наиболее эффективных методов оптимизации: считываются только те данные, которые вам нужны. Если в вашем запросе есть DPP, то AQE не запускается. DPP было перенесено в Spark 2.4 для CDP.

Эта оптимизация реализована как на логическом, так и на физическом уровне.

1. На логическом уровне фильтр размера идентифицируется и распространяется через обьединение на другую часть сканирования.

2. Затем на физическом уровне фильтр выполняется один раз в части измерения, и результат транслируется в основную таблицу, где также применяется фильтр.

DPP в действительности может работать с другими типами обьединений (например, SortMergeJoin), если вы отключите spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly.

В этом случае Spark оценит, действительно ли фильтр DPP улучшает производительность запроса.

DPP может привести к значительному увеличению производительности высокоселективных запросов, например, если ваш запрос фильтрует данные за один месяц из выборки за 5 лет.

Не все запросы получают такой впечатляющий прирост производительности, но 72 из 99 запросов TPC-DS положительно влияют на DPP.

Заключение

Spark прошел долгий путь от своей первоначальной базовой парадигмы: неспешного выполнения оптимизированного статического плана для статического набора данных.

Анализ статического набора данных был пересмотрен из-за потоковой передачи: команда Spark сначала создала довольно неуклюжий дизайн на основе RDD, прежде чем придумать лучшее решение с использованием DataFrames.

Часть статического плана подверглась сомнению со стороны SQL, а структура адаптивного выполнения запросов в некотором роде - это то, чем является структурированная потоковая передача для исходной потоковой библиотеки: элегантное решение, которым она должна была быть с самого начала.

Благодаря фреймворку AQE, DPP, усиленной поддержке графических процессоров и Kubernetes перспективы увеличения производительности теперь весьма многообещающие, поэтому мы и наблюдаем повсеместный переход на Spark 3.1

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru