Русский
Русский
English
Статистика
Реклама

Hadoop

Перевод Экономичная конфигурация исполнителей Apache Spark

20.11.2020 16:08:20 | Автор: admin

Привет, Хабр! В преддверии старта курса "Экосистема Hadoop, Spark, Hive" подготовили для вас перевод полезной статьи. А также предлагаем посмотреть бесплатную запись демо-урока по теме: "Spark 3.0: Что нового?".


Ищем наиболее оптимальную конфигурацию исполнителей для вашего узла

Количество ЦП на узел

Первый этап в определении оптимальной конфигурации исполнителей (executor) - это выяснить, сколько фактических ЦП (т.е. не виртуальных ЦП) доступно на узлах (node) в вашем кластер. Для этого вам необходимо выяснить, какой тип инстанса EC2 использует ваш кластер. В этой статье мы будем использовать r5.4xlarge, который, согласно прейскуранту на инстансы AWS EC2, насчитывает 16 процессоров.

Когда мы запускаем наши задачи (job), нам нужно зарезервировать один процессор для операционной системы и системы управления кластерами (Cluster Manager). Поэтому мы не хотели бы задействовать под задачу сразу все 16 ЦП. Таким образом, когда Spark производит вычисления, на каждом узле у нас остается только 15 доступных для аллоцирования ЦП.

Количество ЦП на исполнителя

Теперь, когда мы узнали, сколько ЦП доступно для использования на каждом узле, нам нужно определить, сколько ядер (core) Spark мы хотим назначить каждому исполнителю. С помощью базовой математики (X * Y = 15), мы можем посчитать, что существует четыре различных комбинации ядер и исполнителей, которые могут подойти для нашего случая с 15 ядрам Spark на узел:

Возможные конфигурации исполнителейВозможные конфигурации исполнителей

Давайте исследуем целесообразность каждой из этих конфигураций.

Один исполнитель с пятнадцатью ядрами

Самое очевидное решение, которое приходит на ум, - создать одного исполнителя с 15 ядрами. Проблема с большими жирными исполнителями, подобными этому, заключается в том, что исполнитель, поддерживающий такое количество ядер, обычно будет иметь настолько большой пул памяти (64 ГБ+), что задержки на сборку мусора будут неоправданно замедлять вашу работу. Поэтому мы сразу исключаем эту конфигурацию.

Пятнадцать одноядерных исполнителей

Следующее очевидное решение, которое приходит на ум создать 15 исполнителей, каждый из которых имеет только одно ядро. Проблема здесь в том, что одноядерные исполнители неэффективны, потому что они не используют преимуществ параллелизма, которые обеспечивают несколько ядер внутри одного исполнителя. Кроме того, найти оптимальный объем служебной памяти для одноядерных исполнителей может быть достаточно сложно. Давайте немного поговорим о накладных расходах памяти.

Накладные расходы памяти для исполнителя по умолчанию составляют 10% от размера выделенной вашему исполнителю памяти или 384 MB (в зависимости от того, что больше). Однако на некоторых big data платформах, таких как Qubole, накладные расходы зафиксированы на определенном значении по умолчанию, вне зависимости от размера вашего исполнителя. Вы можете проверить ваш показатель накладных расходов, перейдя во вкладку Environments в логе Spark и выполнив поиск параметра spark.executor.memoryOverhead.

Накладные расходы памяти по умолчанию в Spark будут очень маленьким, что результирует в проблемах с вашими задачами. С другой стороны, фиксированное значение накладных расходов для всех исполнителей приведет к слишком большому объему служебной памяти и, следовательно, оставит меньше места самим исполнителям. Выверить идеальный размер служебной памяти сложно, поэтому это еще одна причина, по которой одноядерный исполнитель нам не подходит.

Пять исполнителей с тремя ядрами или три исполнителя с пятью ядрами

Итак, у нас осталось два варианта. Большинство руководств по настройке Spark сходятся во мнении, что 5 ядер на исполнителя это оптимальное количество ядер с точки зрения параллельной обработки. И я тоже пришел к выводу, что это правда, в том числе, на основании моих собственных изысканий в оптимизации. Еще одно преимущество использования пятиядерных исполнителей по сравнению с трехъядерными заключается в том, что меньшее количество исполнителей на узел требует меньшее количество служебной памяти. Поэтому мы остановимся на пятиядерных исполнителях, чтобы минимизировать накладные расходы памяти на узле и максимизировать параллелизм внутри каждого исполнителя.

--executor-cores 5

Объем памяти на узел

Наш следующий шаг определить, сколько памяти назначить каждому исполнителю. Прежде чем мы сможем это сделать, мы должны определить, сколько физической памяти на нашем узле нам вообще доступно. Это важно, потому что физическая память это жесткое ограничение для ваших исполнителей. Если вы знаете, какой инстанс EC2 используете, значит, вы знаете и общий объем памяти, доступной на узле. Про наш инстанс r5.4xlarge AWS сообщает, что у него 128 ГБ доступной памяти.

Однако доступными для использования вашими исполнителями будут не все 128 ГБ, так как память нужно будет выделить также и для вашей системы управления кластерами. На рисунке ниже показано, где в YARN искать сколько памяти доступно для использования после того, как память была выделена для системы управления кластерами.

Мы видим, что на узлах этого кластера исполнителям доступно 112 ГБ.

Объем памяти на исполнителя

Если мы хотим, чтобы три исполнителя использовали 112 ГБ доступной памяти, то нам следует определить оптимальный размер памяти для каждого исполнителя. Чтобы вычислить объем памяти доступной исполнителю, мы попросту делим доступную память на 3. Затем мы вычитаем накладные расходы на память и округляем до ближайшего целого числа.

Если служебная память у вас фиксированная (как в случае с Qubole), вы будете использовать эту формулу. (112/3) = 372,3 = 34,7 = 34.

Если вы используете дефолтный метод Spark для расчета накладных расходов на память, вы будете использовать эту формулу. (112/3) = 37 / 1,1 = 33,6 = 33.

В оставшейся части этой статьи мы будем использовать фиксированный объем накладных расходов памяти для Qubole.

--executor-memory 34G

Чтобы по настоящему начать экономить, опытным тюнерам Spark необходим следующий сдвиг в парадигме. Я рекомендую вам в ваших исполнителях для всех задач использовать фиксированные размер памяти и количество ядер. Понимаю, что использование фиксированной конфигурации исполнителей для большинства задач Spark кажется противоречащим надлежащей практике тюнинга Spark. Даже если вы настроены скептически, я прошу вас опробовать эту стратегию, чтобы убедиться, что она работает. Выясните, как рассчитать затраты на выполнение вашей задачи, как описано в Части 2, а затем проверьте это на практике. Считаю, что если вы это сделаете, вы обнаружите, что единственный способ добиться эффективной экономичности облачных затрат это использовать фиксированные размеры памяти для ваших исполнителей, которые оптимально используют ЦП.

С учетом вышесказанного, если при использовании эффективного объема памяти для ваших исполнителей у вас остается много неиспользуемой памяти, подумайте о переносе вашего процесса на другой тип инстанса EC2, у которого меньше памяти на ЦП узла. Этот инстанс обойдется вам дешевле и, следовательно, поможет снизить стоимость выполнения вашей задачи.

Наконец, будут моменты, когда эта экономичная конфигурация не будет обеспечивать достаточную пропускную способность для ваших данных в вашем исполнителе. В примерах, приведенных во второй части, было несколько задач, в которых мне приходилось уходить от использования оптимального размера памяти, потому что нагрузка на память была максимальной во время всего выполнения задачи.

В этом руководстве я все же рекомендую вам начинать с оптимального размера памяти при переносе ваших задач. Если при оптимальной конфигурации исполнителей у вас возникают ошибки памяти, я поделюсь конфигурациями, которые избегают этих ошибок позже в Части 5.

Количество исполнителей на задачу

Теперь, когда мы определились с конфигурацией исполнителей, мы готовы настроить количество исполнителей, которые мы хотим использовать для нашей задачи. Помните, что наша цель - убедиться, что используются все 15 доступных ЦП на узел, что означает, что мы хотим, чтобы каждому узлу было назначено по три исполнителя. Если мы установим количество наших исполнителей кратное 3, мы добьемся своей цели.

Однако с такой конфигурацией есть одна проблема. Нам также нужно назначить драйвер для обработки всех исполнителей в узле. Если мы используем количество исполнителей, кратное 3, то наш одноядерный драйвер будет размещен в своем собственном 16-ядерном узле, что означает, что аж 14 ядер на этом последнем узле не будут использоваться в течение всего выполнения задачи. Это не очень хорошая практика использования облака!

Мысль, которую я хочу здесь донести, заключается в том, что идеальное количество исполнителей должно быть кратным 3 минус один исполнитель, чтобы освободить место для нашего драйвера.

--num-executors (3x - 1)

В Части 4 я дам вам рекомендации о том, сколько исполнителей вы должны использовать при переносе существующей задачи в экономичную конфигурацию исполнителей.

Объем памяти на драйвер

Обычной практикой для data-инженеров является выделение относительно небольшого размера памяти под драйвер по сравнению с исполнителями. Однако AWS на самом деле рекомендует устанавливать размер памяти вашего драйвера таким же, как и у ваших исполнителей. Я обнаружил, что это также очень помогает при оптимизации затрат.

--driver-memory 34G

В редких случаях могут возникать ситуации, когда вам нужен драйвер, память которого больше, чем у исполнителя. В таких случаях устанавливайте размер памяти драйвера в 2 раза больше памяти исполнителя, а затем используйте формулу (3x - 2), чтобы определить количество исполнителей для вашей задачи.

Количество ядер на драйвер

По умолчанию количество ядер на драйвер равно одному. Однако я обнаружил, что задачи, использующие более 500 ядер Spark, могут повысить производительность, если количество ядер на драйвер установлено в соответствии с количеством ядер на исполнителя. Однако не стоит сразу менять дефолтное количество ядер в драйвере. Просто протестируйте это на своих наиболее крупных задачах, чтобы увидеть, ощутите ли вы прирост производительности.

--driver-cores 5

Конфигурация универсальна?

Таким образом, конфигурация исполнителей, которую я рекомендую для узла с 16 процессорами и 128 ГБ памяти, будет выглядеть следующим образом.

--driver-memory 34G --executor-memory 34G --num-executors (3x - 1) --executor-cores 5

Но помните:

Не существует универсальных конфигураций просто продолжайте экспериментировать и рано или поздно вы найдете конфигурацию, идеально подходящую для ваших задач.

Как я уже упоминал выше, эта конфигурация может выглядеть не подходящей для ваших конкретных нужд. Я рекомендую вам использовать эту конфигурацию в качестве отправной точки в процессе оптимизации затрат. Если в этой конфигурации у вас возникают проблемы с памятью, в следующих частях этой серии я порекомендую вам конфигурации, которые позволят вам решить большинство проблем с памятью, возникающих при переходе на экономичные конфигурации.

Поскольку конфигурация узла, используемая в этой статье, достаточно распространена в Expedia Group , я буду ссылаться на нее в остальной части серии. Если ваши узлы имеют другой размер, вам следует использовать метод, который я изложил здесь, чтобы определить идеальную конфигурацию.

Теперь, когда у вас есть оптимальная экономичная конфигурация исполнителей, вы можете попробовать перенести на нее текущие задачи. Но какие задачи вам следует перенести в первую очередь? И сколько исполнителей вы должны запустить с этой новой конфигурацией? А что произойдет, если задача с оптимизированной стоимостью выполняется дольше, чем неоптимизированная задача? И уместно ли когда-либо избыточное использование ЦП? Я отвечу на эти вопросы в Части 4: Как перенести существующие задачи Apache Spark на экономичные конфигурации исполнителей.


Подробнее о курсе "Экосистема Hadoop, Spark, Hive" можно узнать здесь. Также можно посмотреть запись открытого урока "Spark 3.0: что нового?".

Читать ещё:

Подробнее..

Перевод Цепочка пользовательских преобразований DataFrame в Spark

13.05.2021 16:14:30 | Автор: admin

Перевод материала подготовлен в рамках набора студентов на онлайн-курс Экосистема Hadoop, Spark, Hive.

Всех желающих приглашаем на открытый вебинар Тестирование Spark приложений. На этом открытом уроке рассмотрим проблемы в тестировании Spark приложений: стат данные, частичную проверку и запуск/остановку тяжелых систем. Изучим библиотеки для решения и напишем тесты. Присоединяйтесь!


Для цепочки преобразований DataFrame в Spark можно использовать implicit classes или метод Dataset#transform. В этой статье блога будет продемонстрировано, как выстраивать цепочки преобразований DataFrame, и объяснено, почему метод Dataset#transform предпочтительнее, чем implicit classes.

Структурирование кода Spark в виде преобразований DataFrame отличает сильных программистов Spark от "спагетти-хакеров", как подробно описано в статье "Написание идеального кода Spark (Writing Beautiful Spark Code)". После публикации в блоге, ваш код Spark будет намного проще тестировать и повторно использовать.

Если вы используете PySpark, смотрите эту статью о цепочке пользовательских преобразований PySpark DataFrame.

Метод transform (преобразования) набора данных

Метод transform (преобразования) набора данных предоставляет "краткий синтаксис для цепочки пользовательских преобразований".

Предположим, у нас есть метод withGreeting(), который добавляет столбец приветствия к DataFrame, и метод withFarewell(), который добавляет столбец прощания к DataFrame.

def withGreeting(df: DataFrame): DataFrame = {  df.withColumn("greeting", lit("hello world"))}def withFarewell(df: DataFrame): DataFrame = {  df.withColumn("farewell", lit("goodbye"))}

Мы можем использовать метод transform (преобразования) для запуска методов withGreeting() и withFarewell().

val df = Seq(  "funny",  "person").toDF("something")val weirdDf = df  .transform(withGreeting)  .transform(withFarewell)
weirdDf.show()+---------+-----------+--------+|something|   greeting|farewell|+---------+-----------+--------+|    funny|hello world| goodbye||   person|hello world| goodbye|+---------+-----------+--------+

Метод transform (преобразования) можно легко объединить со встроенными методами Spark DataFrame, такими как select.

df  .select("something")  .transform(withGreeting)  .transform(withFarewell)

Если метод transform (преобразования) не используется, то нам придется вложить вызовы методов, и код станет менее читабельным.

withFarewell(withGreeting(df))// even worsewithFarewell(withGreeting(df)).select("something")

Метод transform (преобразования) c аргументами

Пользовательские преобразования DataFrame, использующие аргументы, также могут использовать метод transform (преобразования), используя карринг / списки с несколькими параметрами в Scala.

Давайте воспользуемся тем же методом withGreeting(), что и ранее, и добавим метод withCat(), который принимает в качестве аргумента строку.

def withGreeting(df: DataFrame): DataFrame = {  df.withColumn("greeting", lit("hello world"))}def withCat(name: String)(df: DataFrame): DataFrame = {  df.withColumn("cats", lit(s"$name meow"))}

Мы можем использовать метод transform (преобразования) для запуска методов withGreeting() и withCat().

val df = Seq(  "funny",  "person").toDF("something")val niceDf = df  .transform(withGreeting)  .transform(withCat("puffy"))
niceDf.show()+---------+-----------+----------+|something|   greeting|      cats|+---------+-----------+----------+|    funny|hello world|puffy meow||   person|hello world|puffy meow|+---------+-----------+----------+

Метод transform (преобразования) можно использовать для пользовательских преобразований DataFrame, которые также могут использовать аргументы!

Манкипатчинг с помощью неявных классов (Implicit Classes)

Неявные классы можно использовать для добавления методов в существующие классы. Следующий код добавляет те же методы withGreeting() и withFarewell() к самому классу DataFrame.

object BadImplicit {  implicit class DataFrameTransforms(df: DataFrame) {    def withGreeting(): DataFrame = {      df.withColumn("greeting", lit("hello world"))    }    def withFarewell(): DataFrame = {      df.withColumn("farewell", lit("goodbye"))    }  }}

Методы withGreeting() и withFarewell() можно объединить в цепочку и выполнить следующим образом.

import BadImplicit._val df = Seq(  "funny",  "person").toDF("something")val hiDf = df.withGreeting().withFarewell()

Расширение основных классов работает, но это плохая программистская практика, которой следует избегать.

Избегание неявных классов

Изменение базовых классов известно как манкипатчинг и является восхитительной особенностью Ruby, но может быть рискованным в неопытных руках.
- Санди Метц

Комментарий Санди был адресован языку программирования Ruby, но тот же принцип применим и к неявным классам Scala.

Манкипатчинг обычно не приветствуется в сообществе Ruby, и его следует избегать в Scala.

Spark был достаточно любезен, чтобы предоставить метод transform (преобразования), и вам не потребуется манкипатчинг для класса DataFrame. С помощью некоторых приемов программирования на Scala мы даже можем заставить метод transform работать с пользовательскими преобразованиями, которые могут использовать аргументы. Это делает метод transform явным победителем!


Подробнее о курсе: Экосистема Hadoop, Spark, Hive

Смотреть демо-урок: Тестирование Spark приложений

Подробнее..

Big Data Tools EAP 10 SSH-туннели, фильтрация приложений, пользовательские модули и многое другое

01.09.2020 18:20:28 | Автор: admin

Только что вышла очередная версия плагина Big Data Tools плагина для IntelliJ IDEA Ultimate, DataGrip и PyCharm, который обеспечивает интеграцию с Hadoop и Spark, позволяет редактировать и запускать интерактивные блокноты в Zeppelin.


Основная задача этого релиза поправить как можно больше проблем и улучшить плагин изнутри, но два важных улучшения видно невооруженным глазом:


  • соединяться с Hadoop и Spark теперь можно через SSH-туннели, создающиеся парой щелчков мыши;
  • мониторинг Hadoop может ограничивать объем данных, загружаемых при просмотре списка приложений.


SSH-туннели


Зачастую нужный нам сервер недоступен напрямую, например если он находится внутри защищенного корпоративного контура или закрыт специальными правилами на файерволе. Чтобы пробраться внутрь, можно использовать какой-то туннель или VPN. Самый простой из туннелей, который всегда под руками, это SSH.


Проложить туннель можно одной-единственной консольной командой:


ssh -f -N -L 1005:127.0.0.1:8080 user@spark.server

Немного автоматизировать процесс поможет файл ~/.ssh/config, в который ты один раз сохраняешь параметры соединения и потом используешь:


Host spark    HostName spark.server    IdentityFile ~/.ssh/spark.server.key    LocalForward 1005 127.0.0.1:8080    User user

Теперь достаточно написать в консоли ssh -f -N spark и туннель поднимется сам по себе, без вписывания IP-адресов. Удобно.


Но с этими способами есть две очевидных проблемы.


Во-первых, у кого-то может возникнуть масса вопросов. Что такое -f -N -L? Какой порт писать слева, а какой справа? Как выбирать адреса для соединений? Для всех, кроме профессиональных системных администраторов, такие мучения не кажутся полезными.


Во-вторых, мы программируем не в эмуляторе терминала, а в IDE. Всё, что вы запустили в консоли, работает глобально по отношению ко всей операционной системе, а в IDE хотелось бы для каждого проекта иметь свой собственный набор туннелей.


К счастью, начиная с этой версии в Big Data Tools есть возможность создавать туннели без ручного управления SSH-соединениями.



На скриншоте видно, что можно не только вручную указать все адреса и порты, но и подключить заранее подготовленный файл конфигурации SSH.


Опция Enable tunneling работает для следующих типов соединений:


  • Zeppelin
  • HDFS
  • Hadoop
  • Spark Monitoring

Важно отметить, что под капотом у нее все то же самое, что делает SSH. Не стоит ждать особой магии: например, если ты пытаешься открыть туннель на локальный порт, который уже занят другим приложением или туннелем, то случится ошибка.


Это довольно полезная опция, которая облегчает жизнь в большинстве повседневных ситуаций. Если же тебе нужно сделать что-то действительно сложное и нестандартное, то можно по старинке вручную использовать SSH, VPN или другие способы работы с сетью.


Вся работа велась в рамках задачи BDIDE-1063 на нашем YouTrack.


Управляемые ограничения на отображение приложений


Люди делятся на тех, у кого на странице Spark Monitoring всего парочка приложений, и тех, у кого их сотни.



Загрузка огромного списка приложений может занимать десятки минут, и все это время о состоянии сервера можно только гадать.


В этой версии Big Data Tools вы можете существенно ограничить время ожидания, если вручную выберете диапазон загружаемых данных. Например, можно вызвать диалоговое окно редактирования диапазона дат и вручную выбрать только сегодняшний день.


Эта опция значительно экономит время тех, кто работает с большими продакшенами.


Работа велась в рамках задачи BDIDE-1077.


Подключение модулей в зависимости Zeppelin


У многих в Zeppelin используются зависимости на собственные JAR-файлы. Big Data Tools полезно знать о таких файлах, чтобы в IDE нормально работало автодополнение и другие функции.


При синхронизации с Zeppelin, Big Data Tools пытается получить все такие файлы. Но, по разным причинам, это не всегда возможно. Чтобы Big Data Tools узнал о существовании таких пропущенных файлов, необходимо вручную добавить их в IntelliJ IDEA.


Раньше в качестве зависимостей можно было использовать только артефакты из Maven либо отдельные JAR-файлы. Это не всегда удобно, ведь для получения этих артефактов и файлов нужно или их скачать откуда-то, или собрать весь проект.


Теперь любой модуль текущего проекта тоже можно использовать в качестве зависимости. Такие зависимости попадают в таблицу "User dependencies":



Работа велась в рамках задачи BDIDE-1087.


Множество свежих исправлений


Big Data Tools молодой, активно развивающийся проект. В таких условиях неизбежно появление проблем, которые мы стараемся оперативно устранять. В EAP 10 вошло множество исправлений, значительная часть которых посвящена повышению удобства работы со Spark Monitoring.


  • [BDIDE-1078] Раньше при сворачивании ячеек их заголовки просто не отображались. Теперь они правильно отображаются, но редактировать их из Big Data Tools все еще нельзя это тема для будущих исправлений.
  • [BDIDE-1137] Удаление соединения Spark Monitoring из Hadoop приводило к ошибке IncorrectOperationException.
  • [BDIDE-570] В таблице Jobs в Spark Monitoring у выделенной задачи могло исчезать выделение.
  • [BDIDE-706] При обновлении дерева задач в Spark Monitoring выделенная задача теряла выделение.
  • [BDIDE-737] Если компьютер заснул и вышел из сна, получение информации о приложении в Spark Monitoring требовало перезагрузки IDE.
  • [BDIDE-1049] Перезапуск IDE мог приводить к появлению ошибки DisposalException.
  • [BDIDE-1060] Перезапуск IDE с открытым Variable View (функциональность ZTools) мог привести к ошибке IllegalArgumentException.
  • [BDIDE-1066] Редактирование свойств неактивного соединения в Spark Monitoring приводило к его самопроизвольному включению на панели.
  • [BDIDE-1091] Удаление только что открытого соединения с Zeppelin приводило к ошибке ConcurrentModificationException.
  • [BDIDE-1092] Кнопка Refresh могла не обновлять задачи в Spark Monitoring.
  • [BDIDE-1093] После перезапуска Spark в Spark Monitoring отображалась ошибка подключения.
  • [BDIDE-1094] При отображении ошибки соединения в Spark Monitoring нельзя было изменить размеры окна с ошибкой.
  • [BDIDE-1099] В Spark Monitoring на вкладке SQL вместо сообщения "Loading" могло неверно отображаться сообщение "Empty List".
  • [BDIDE-1119] В Spark Monitoring свойства SQL продолжали отображаться даже при сбросе соединения или перезагрузке интерпретатора.
  • [BDIDE-1130] Если в списке приложений в Spark Monitoring фильтр скрывал вообще все приложения, возникала ошибка IndexOutOfBoundsException.
  • [BDIDE-1133] Таблицы отображали только один диапазон данных, даже если в свойствах таблицы было указано сразу несколько диапазонов.
  • [BDIDE-406] Раньше при соединении с некоторыми экземплярами Zeppelin отображалась ошибка синхронизации. В рамках этого же тикета включена поддержка Zeppelin 0.9, в частности collaborative mode.
  • [BDIDE-746] При отсутствии выбранного приложения или задачи в Spark Monitoring на странице с детализацией отображалась ошибка соединения.
  • [BDIDE-769] При переключении между различными соединениями к Spark Monitoring могла не отображаться информация об этом соединении.
  • [BDIDE-893] Время от времени список задач в Spark Monitoring исчезал, и вместо него отображалось некорректное сообщение о фильтрации.
  • [BDIDE-1010] После запуска ячейки, статус "Ready" отображался со слишком большой задержкой.
  • [BDIDE-1013] Локальные блокноты в Zeppelin раньше имели проблемы с переподключением.
  • [BDIDE-1020] В результате комбинации нескольких факторов могло сбиваться форматирование кода на SQL.
  • [BDIDE-1023] Раньше не отображался промежуточный вывод исполняющихся ячеек, теперь отображается под ними.
  • [BDIDE-1041] Непустые файлы на HDFS отображались как пустые, из-за чего их можно было случайно сохранить и стереть данные.
  • [BDIDE-1061] Исправлен баг в отображении отображением SQL-задач. Раньше было неясно, является ли сервер Spark привязанным к задаче или это History Server.
  • [BDIDE-1068] Временами ссылка на задачу в Spark терялась и появлялась вновь.
  • [BDIDE-1072], [BDIDE-838] Раньше в панели Big Data Tools не отображалась ошибка соединения с Hadoop и Spark.
  • [BDIDE-1083] Если при закрытии IDE работала хоть одна задача с индикатором прогресса, возникала ошибка "Memory leak detected".
  • [BDIDE-1089] В таблицах теперь поддерживается интернационализация.
  • [BDIDE-1103] При внезапном разрыве связи с Zeppelin не отображалось предупреждение о разрыве соединения.
  • [BDIDE-1104] Горизонтальные полосы прокрутки перекрывали текст.
  • [BDIDE-1120] При потере соединения к Spark Monitoring возникала ошибка RuntimeExceptionWithAttachments.
  • [BDIDE-1122] Перезапуск интерпретатора приводил к ошибке KotlinNullPointerException.
  • [BDIDE-1124] Подключение к Hadoop не могло использовать SOCKS-прокси.
Подробнее..

Big Data Tools EAP 12 экспериментальная поддержка Python, поиск по ноутбукам в Zeppelin

16.12.2020 18:10:53 | Автор: admin

Только что вышло очередное обновление EAP 12 для плагина под названием Big Data Tools, доступного для установки в IntelliJ IDEA Ultimate, PyCharm Professional и DataGrip. Можно установить его через страницу плагина или внутри IDE. Плагин позволяет работать с Zeppelin, загружать файлы в облачные хранилища и проводить мониторинг кластеров Hadoop и Spark.


В этом релизе мы добавили экспериментальную поддержку Python и поиск по ноутбукам Zeppelin. Если вы страдали от каких-то багов, их тоже починено множество. Давайте поговорим об этих изменениях более подробно.



Экспериментальная поддержка Python в Zeppelin


Поддержку Python хотелось добавить давно. Несмотря на то, что PySpark в Zeppelin сейчас на волне хайпа, всё что предоставляет нам веб-интерфейс Zeppelin простейшее автодополнение, в котором содержится какой-то наполовину случайный набор переменных и функций. Вряд ли это можно ставить в вину Zeppelin, он никогда не обещал нам умного анализа кода. В IDE хочется видеть что-то намного большее.


Добавить целый новый язык звучит как очень сложная задача. К счастью, в наших IDE уже есть отличная поддержка Python: либо в PyCharm, либо в Python-плагине для IntelliJ IDEA. Нам нужно было взять эту готовую функциональность и интегрировать внутрь Zeppelin. Вместе с этим возникает много нюансов, специфичных для Zeppelin: как нам проанализировать список зависимостей, как найти правильную версию Python, и тому подобное.



Начиная с EAP 12, код на Python нормально подсвечивается в нашем редакторе ноутбуков Zeppelin, отображаются грубые синтаксические ошибки. Можно перейти на определение переменной или функции, если они объявлены внутри ноутбука. Можно сделать привычные рефакторинги вроде rename или change signature. Работают Zeppelin-специфичные таблицы и графики в конце концов, зачастую ради них люди и используют Zeppelin.



Конечно, многие вещи ещё предстоит сделать. Например, очень хотелось бы видеть умное автодополнение по функциям Spark API и другому внешнему коду. Сейчас мы нормально автодополняем только то, что написано внутри ноутбука. У нас есть хорошие идеи, как это реализовать в следующих релизах. Иначе говоря, не надо ждать какого-то чуда: теперь у вас есть полнофункциональный редактор Python, но это всё. Поддержка Python получилась довольно экспериментальной, но, как говорится, путь в тысячу ли начинается с первого шага. А ещё, даже имеющейся функциональности может оказаться достаточно, чтобы писать код в вашем любимом IDE и не переключаться на веб-интерфейс Zeppelin.


Смешиваем Scala и Python


Иногда, в одном и том же ноутбуке вам хочется одновременно использовать и Python, и Scala. Например, это бывает полезно из соображений производительности в вычислительных задачах.


Смешивать разные языки вполне возможно. Но не забывайте, что для полноценной поддержки Scala вам понадобится IntelliJ IDEA с плагинами Scala и Python. В PyCharm этот Scala-код хоть и будет выполняться, но его поддержка в редакторе останется на уровне plain text.



Поиск по ноутбукам Zeppelin


У нас всегда была возможность найти, в каком же файле на диске находится нужный нам текст (например, с помощью Find in Path, Ctrl+Shift+F). Но этот стандартный интерфейс поиска не работает с ноутбуками, ведь они не файлы!


Начиная с EAP 12 мы добавили отдельную панель поиска по ноутбукам. Откройте панель Big Data Tools, выделите какое-нибудь из подключений к Zeppelin и нажмите на кнопку с изображением лупы (или используйте сочетание клавиш Ctrl+F на клавиатуре). В результате, вы попадёте в окно под названием Find in Zeppelin Connections. Активация одного из результатов поиска приведет к открытию этого ноутбука и переходу на нужный параграф.



Похожий поиск есть и в веб-интерфейсе Zeppelin. Для получения результатов поиска мы используем стандартный HTTP API, поэтому результаты должны совпадать с тем, что вы видите в интерфейсе Zeppeliln по аналогичному поисковому запросу. Если вы раньше пользовались веб-интерфейсом и привыкли к поиску по ноутбукам, теперь он имеется и в Big Data Tools.


Функция небольшая, но очень полезная. Непонятно, как мы без неё жили раньше.


Исправления ошибок


Плагин Big Data Tools активно развивается, и при бурном росте неизбежны некоторые проблемы. В этом релизе мы провели много работы над правильной работой с удаленными хранилищами, отображением графиков и параграфов, переработали часть интерефейсов (например, SSH-туннели). Переработаны кое-какие системные вещи (например, несколько проектов теперь используют общее подключение к Zeppelin), вывезли множество ошибок в неожиданных местах. В целом, теперь пользоваться плагином намного приятней.


Если вам интересен обзор основных улучшений, то их можно найти в разделе What's New на странице плагина. Если вы ищете какую-то конкретную проблему, вам может подойти полный отчет из YouTrack.


Спасибо, что пользуетесь нашим плагином! Напоминаю, что установить свежую версию можно либо в браузере, на странице плагина, или внутри IDE по названию Big Data Tools. На странице плагина можно оставлять ваши отзывы и предложения (которые мы всегда читаем), и ставить оценки в виде звёздочек.


Документация и социальные сети


Ну и наконец, если вам нужно разобраться функциональностью Big Data Tools, у нас есть подробная документация в вариантах для IntelliJ IDEA и PyCharm. Если хочется задать вопрос, можно сделать это прямо в комментариях на Хабре или перейти в наш Twitter.


Надеемся, что все эти улучшения окажутся полезными, позволят вам делать более интересыне вещи, и более приятным способом.


Команда Big Data Tools

Подробнее..

Перевод Масштабирование итеративных алгоритмов в Spark

26.01.2021 14:17:08 | Автор: admin

Итеративные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т.д. Эти алгоритмы усложняются итерациями, размеры данных на каждой итерации увеличивается, и сделать их отказоустойчивыми на каждой итерации непросто.

В этой статье я бы подробно остановился на некоторых моментах, которые необходимо учитывать при работе с этими задачами. Мы использовали Spark для реализации нескольких итерационных алгоритмов, таких как построение связанных компонентов, обход больших связанных компонентов и т.д. Ниже приведен мой опыт работы в лабораториях Walmart по построению связанных компонентов для 60 миллиардов узлов клиентской идентификации.

Количество итераций никогда не предопределено, всегда есть условие завершения ?.Количество итераций никогда не предопределено, всегда есть условие завершения ?.

Типы итеративных алгоритмов

Конвергентные данные: Здесь мы видим, что с каждой итерацией количество данных уменьшается, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основной задачей будет работа с огромными наборами данных в первых нескольких итерациях, и после того, как набор данных значительно уменьшится, можно будет легко справляться с дальнейшими итерациями до их завершения.

Расходящиеся данные: Количество данных увеличивается при каждой итерации, и иногда они могут появляться быстрее и сделать невозможной дальнейшую работу. Для работы этих алгоритмов необходимы такие ограничения, как ограничения по количеству итераций, начальному размеру данных, вычислительной мощности и т.д.

Аналогичные данные: На каждой итерации у нас были бы более или менее одинаковые данные, и с таким алгоритмом было бы очень легко работать.

Инкрементные данные: На каждой итерации у нас могут появляться новые данные, особенно в ML у нас могут появляться новые обучающие наборы данных с периодическими интервалами.

Препятствия

  1. RDD линии: Одним из распространенных способов сохранения отказоустойчивости системы является хранение копий данных в разных местах, чтобы в случае падения одного узла у нас была копия, которая помогала бы до тех пор, пока узел не восстановится. Но Spark не поддерживает дубликаты данных, а поддерживает линейный график преобразований, выполненных на данных в драйвере. Поэтому такой линейный график был бы полезен, если какой-либо фрагмент данных отсутствует, он может построить его обратно с помощью линейного графика, следовательно, Spark является отказоустойчивым. По мере того, как линейный график становится большим, становится трудно строить данные обратно, так как количество итераций увеличивается.

  2. Память и дисковый ввод/вывод: В Spark RDD являются непреложными, поэтому на каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование Памяти и Диска. По мере того, как итерации будут увеличивать использование диска/памяти исполнителями, это может привести к замедлению работы из-за нехватки памяти и ожиданию, пока GC выполнит очистку. В некоторых случаях куча памяти будет недостаточной и может привести к невозможности выполнения задачи.

  3. Размер задачи: В некоторых случаях может быть несколько задач, которые могут не подходить для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к препятствию.

Советы по преодолению вышеуказанных проблем

  1. Хранение большого линейного графика в памяти, и, в случае сбоя узла, восстановление потерянных наборов данных займет много времени. В таких случаях можно использовать кэширование или запись данных о состоянии в контрольной точке на каждой N итерации. Это сохранит рассчитанный RDD на каждой N итерации (кэширование будет храниться в памяти или на диске исполнителей, запись данных о состоянии в контрольной точке использует HDFS, мы должны принять решение исходя из нашей потребности, так как скорость будет различаться для каждой из них). В случае неудачи RDD вычисляется обратно от последней контрольной точки/кэширования. Вместо использования двух вышеупомянутых методов можно также создать временную таблицу и сохранить вычисленный набор данных, разделенный итерацией. В случае неудачи задания Spark, можно сделать перезапуск с последней N-ой итерации, а преимущество сохранения во временную таблицу состоит в том, что можно избавиться от линейного графика RDD до этой итерации и запустить свежий линейный график с этой точки. По мере того, как линейный график RDD растет в итерационных алгоритмах, нам необходимо строить гибридные решения с использованием кэширования, контрольных точек (см. ссылку [2]) и временных таблиц для различных вариантов использования.

  2. Как и выше, сохранение во временную таблицу и чтение из временной таблицы может помочь избавиться от линейного графика и очистить память и диск предыдущих RDD. Такая запись и чтение замедляет процесс, но это даст огромное преимущество при работе с большими наборами данных. Особенно при конвергировании наборов данных, нам может понадобиться выполнять этот процесс только в течение первых нескольких итераций и использовать кэширование, когда наборы данных становятся маленькими при работе с итерациями. Экономия во временной таблице в качестве контрольной точки выглядит тривиально, но она не просто действует как контрольная точка. Так как мы избавляемся от истории линейных графов, делая это на периодических итерациях, это уменьшит риск сбоя в работе и сократит время на построение ее обратной копии из потерянных данных.

  3. Работа с расходящимися данными сложна, так как размер каждой задачи будет увеличиваться с увеличением количества итераций и займет намного больше времени для каждого исполнителя. Поэтому нам нужен фактор для вычисления количества задач в ( i + 1) итерации по сравнению с i-й итерацией таким образом, чтобы размер задачи остался прежним. Например, скажем, количество задач в i-й итерации 100, и каждая задача обрабатывает около 100 МБ данных. В i+1 итерации размер каждой задачи увеличивается до 150 МБ, и мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ на каждую задачу. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач, переструктурировав и изменив перетасованные разделы на основе итерации.

  4. В случаях, когда размер spark задачи огромен, попробуйте увеличить память исполнителя в соответствии с размером задачи. А если нужно выполнить соединения на искаженных наборах данных, где 10% задач занимает 90% времени исполнения и 90% задач выполняются за 10% времени, эти задачи предлагается обрабатывать отдельно, выполняя их в виде двух разных запросов. Нужно определить причину больших задач, и можем ли мы разделить их на две группы, т.е. маленькие и большие задачи. В 1-м запросе мы бы обработали 90% задач, т.к. нет никаких препятствий для их обработки, и это заняло бы 10% времени, как и раньше. В другом запросе мы бы обрабатывали большие задачи (10% задач) с помощью всенаправленного соединения, так как количество таких задач меньше, а также избегали бы перетасовки данных.

Пример: Допустим, у нас есть таблица А и таблица Б. Таблица А это данные о населении со столбцами user_id, имя, город, штат. Таблица B это то, что группирует данные со столбцами user_id, group_id. Например, мы пытаемся найти 5 крупнейших городов с наибольшим количеством используемых групп. В этом примере могут быть тупиковые ситуации, как города с большим количеством населения могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Для решения этих тупиковых ситуаций, объединение между этими таблицами может быть сделано в двух запросах. Мы можем отфильтровать больших пользователей с большим количеством групп (скажем, порог в 1000 групп на пользователя) и относиться к ним как к большим задачам. И выполнять соединения отдельно для больших пользователей, используя всенаправленное объединение, так как количество больших пользователей будет мало по сравнению с общими данными. Аналогичным образом, для остальных пользователей выполняйте тасовку объединения и объединяйте результаты и агрегируйте по городам, чтобы найти 5 лучших городов.


А прямо сейчас в OTUS открыт набор на курс Экосистема Hadoop, Spark, Hive.

Всех желающих приглашаем записаться на бесплатный демо-урок по теме Spark Streaming.

ЗАБРАТЬ СКИДКУ

Подробнее..

Перевод Сеть в bitly Linux tc для минимизации издержек и забавы ради

02.06.2021 20:09:34 | Автор: admin

Представьте, что вы, например, bitly то есть очень большой сервис сокращения ссылок. И вот, вы хотите скопировать свои 150 ТБ сжатых данных с одного физического кластера на другой, новый. Чтобы сделать это, вы запускаете distcp из набора инструментов hadoop и рады тому, насколько быстро он работает. Но, несколько позже, вы уже совсем не радуетесь жалобам обычных пользователей веб-сайта и API-клиентов случаются ошибки, задерживаются ответы, а данные их дата-центра только запутывают. К старту курса о DevOps мы перевели материал о том, что делать, если вы, как и bitly, оказались в подобной ситуации.


Мы в bitly очень любим работать с данными и устройствами, а если говорить точнее, то с устройствами для передачи данных. Сегодня расскажем историю о том, как мы пытались перекачать через устройства очень и очень много данных.

Важной частью инфраструктуры bitly и основным инструментом команды Data Science и рабочей группы обслуживания операций и инфраструктуры (Ops/Infra) в течение довольно долгого времени был физический кластер hadoop набор утилит, библиотек и фреймворк для разработки и выполнения распределённых программ, работающих на кластерах из сотен и тысяч узлов. Мы уже давно вынашивали идею подключения нового кластера, копирования и объединения данных старого кластера с данными нового.

И вот, были созданы ветки, проведена вся подготовительная работа, сгенерированы серверы и запущен новый кластер. Но, прежде чем приступить к изложению истории, приведём некоторые технические детали:

  • bitly работает с огромными объёмами данных: на момент миграции кластер hadoop занимал более 150 ТБ дискового пространства сжатых потоковых данных, то есть данных, полученных в результате работы наших различных приложений. Объёмы таких данных продолжали увеличиваться в результате дополнительной обработки другими приложениями;

  • физически инфраструктура bitly располагается в центре обработки данных нашего партнёра. Она состоит из трёх физических видов шасси (приложения, хранилище и база данных), расположенных в ряд в смежных стойках. На момент написания этой статьи каждое шасси имело три физических гигабитных Ethernet-канала (каждый логически изолирован посредством организации сети VLAN) прямой канал, обратный канал и схему удалённого управления (для внеполосного управления серверными шасси). Каждое соединение после прохождения через ряд патч-панелей и коммутаторов подключается к нашим главным коммутаторам через стеклянное оптоволокно 10 Gb по звездообразной схеме сети;

  • инфраструктура bitly занимает значительные физические ресурсы (сотни физических серверных шасси), однако управлением такой сетевой инфраструктурой и топологией занимается наш партнёр в центре обработки данных. Другими словами, на большинстве уровней физического сетевого стека мы имеем крайне ограниченную видимость сетевых операций, не говоря уже об их контроле.

Вернёмся к нашей истории.

Для быстрого копирования данных с одного кластера на другой мы воспользовались инструментом distcp, поставляемом в комплекте с кластером hadoop. Говоря просто, инструмент distcp выдаёт задание программному фреймворку mapreduce (используемому для параллельных вычислений над очень большими наборами данных в компьютерных кластерах) на перемещение данных из одного кластера hdfs в другой при копировании узлов в режиме "многие ко многим". Инструмент distcp сработал быстро, и это нас порадовало.

Но тут сломался сервис bitly, и это не привело в восторг команду Ops/Infra.

Пользователи веб-сайта и API-клиенты стали жаловаться на ошибки и задержки в получении ответов. Нам удалось выяснить, что причиной ошибок было получение нашим сервисом ошибок от других сервисов, что приводило к превышению времени ожидания при вызове баз данных и даже сбоям при обработке DNS-запросов внутри нашей собственной сети. Мы выяснили, что копирование данных привело к необъяснимому увеличению нагрузки на сетевые устройства, особенно на устройства, передающие трафик через физические стойки. Мы надеялись решить проблему, получив информацию от нашего партнёра из центра обработки данных, но эта информация не только не помогла, но и ещё больше нас запутала: ни одно соединение и ни одна стойка не проявляли никаких признаков насыщения, перегруженности или ошибок.

Теперь нам нужно было решать сразу две противоречивые задачи: с одной стороны, надо было продолжать миграцию hadoop, а с другой устранить неполадки и понять, в чём же кроется наша сетевая проблема.

Мы ограничили количество диспетчеров mapreduce, которые инструмент distcp использовал для копирования данных между кластерами, что позволило искусственно снизить пропускную способность при копировании и кое-как возобновить миграцию. После завершения копирования нам, наконец, удалось установить новый кластер взамен старого.

В новом кластере было больше узлов, значит, по идее, hadoop должен был работать быстрее.

Мы, кажется, осчастливили команду Data Science.

Но, к сожалению, поскольку кластер hadoop стал крупнее и быстрее, во время работы mapreduce на большее количество узлов стало передаваться большее количество данных, и это привело к непредвиденному эффекту:

Сервис bitly опять сломался, на этот раз очень серьёзно. Команда Ops/Infra была от этого не в восторге.

Первым импульсивным действием было вообще отрубить hadoop.

Но такое решение очень не понравилось команде Data Science.

Отключение кластера hadoop самое плохое из возможных решений (по последствиям может сравниться разве что с поломкой bitly), поэтому мы вернули кластер в состояние 1995 года, заставив все сетевые карты перейти на 100 Мбит/с (с 1 Гбит/с) с помощью команды ethtool -s eth1 speed 100 duplex full autoneg on. Теперь можно было спокойно подключить hadoop, но какой же медленной стала его работа!

Команда Data Science по-прежнему не выказывала признаков восторга.

И действительно, работа кластера была настолько "тормозной", что при вводе данных, выполнении запланированных заданий ETL (извлечения, преобразования и загрузки) и выдаче отчётов стали часто возникать сбои, постоянно срабатывали аварийные сигналы, будившие среди ночи членов команды Ops/Infra.

Надо ли говорить, как это не нравилось команде Ops/Infra!

Поскольку мы были лишены возможности удовлетворительно контролировать состояние сети, работа по поиску и устранению неисправностей вместе с партнёром из центра обработки данных предстояла сложная и длительная. Нужно было что-то сделать, чтобы привести hadoop в пригодное для использования состояние, и одновременно сделать так, чтобы сервис bitly перестал выходить из строя.

Сделаем ещё одно небольшое отступление:

Что нам было доступно в bitly?

  • roles.json : список серверов (app01, app02, userdb01, hadoop01 и т. д.), ролей (userdb, app, web, monitoring, hadoop_node и т.д.), а также сведения об отображении серверов на роли (app01,02 -> app, hadoop01,02 -> hadoop_node и т. д.);

  • $datacenter/jsons/* : каталог, содержащий json-файл для каждого логического сервера с атрибутами, описывающими сервер, IP-адресами, именами, информацией конфигурирования и, что наиболее важно в нашей истории, расположением стоек.;

  • Linux : Linux.

Поскольку мы могли легко определить, какие серверы за что отвечают и где они расположены, мы могли воспользоваться мощными функциями Linux. Проблему, в принципе, можно было решить, и мы приступили к работе.

Но команда Ops/Infra не проявляла признаков радости.

Её не устраивал синтаксис системы контроля сетевого трафика (Traffic Control, tc) в Linux, не говоря уже о совершенно неудобочитаемой документации. После напряжённого периода работы (с многочисленными проклятиями и разбиванием о стену клавиатур) мы смогли, наконец, создать не вызывающие отторжения работающие сценарии в tc. Были открыты ветви, написаны скрипты, выполнены развёртывания, проведены эталонные тестирования, и в результате было создано несколько тестовых узлов с таким кодом:

$ tc class show dev eth1class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b    cburst 1561bclass htb 1:10 root prio 0 rate 819200Kbit ceil 819200Kbit burst 1433b     cburst 1433bclass htb 1:20 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b     cburst 1561b$ tc filter show dev eth1filter parent 1: protocol ip pref 49128 u32 filter parent 1: protocol ip pref 49128 u32 fh 818: ht divisor 1 filter parent 1: protocol ip pref 49128 u32 fh 818::800 order 2048 key     ht 818 bkt 0 flowid 1:20     match 7f000001/ffffffff at 16filter parent 1: protocol ip pref 49129 u32 filter parent 1: protocol ip pref 49129 u32 fh 817: ht divisor 1 filter parent 1: protocol ip pref 49129 u32 fh 817::800 order 2048 key     ht 817 bkt 0 flowid 1:10     match 7f000002/ffffffff at 16filter parent 1: protocol ip pref 49130 u32 filter parent 1: protocol ip pref 49130 u32 fh 816: ht divisor 1 filter parent 1: protocol ip pref 49130 u32 fh 816::800 order 2048 key     ht 816 bkt 0 flowid 1:20     match 7f000003/ffffffff at 16<snipped>$ tc qdisc showqdisc mq 0: dev eth2 root qdisc mq 0: dev eth0 root qdisc htb 1: dev eth1 root refcnt 9 r2q 10 default 100     direct_packets_stat 24

Говоря простым языком, есть три класса управления траффиком. Каждый класс это логическая группа, на которую может быть подписан фильтр, например:

class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b cburst 1561b

Каждый класс это потолок пропускной способности исходящего трафика, агрегированного по всем подписанным на этот класс фильтрам.

Каждый фильтр это конкретное правило для конкретного IP (к сожалению, каждый IP выводится в шестнадцатеричном формате), поэтому фильтр:

filter parent 1: protocol ip pref 49128 u32 filter parent 1: protocol ip pref 49128 u32 fh 818: ht divisor 1 filter parent 1: protocol ip pref 49128 u32 fh 818::800 order 2048 key     ht 818 bkt 0 flowid 1:20     match 7f000001/ffffffff at 16

можно интерпретировать как "subscribe hadoop14 to the class 1:20", где "7f000001" можно интерпретировать как IP для hadoop14, а "flowid 1:20" класс для подписки. Затем запускаем команду qdisc, формирующую более или менее активную очередь для устройства eth1. Данная очередь по умолчанию помещает любой хост, не определённый в фильтре класса, в класс 1:100:

qdisc htb 1: dev eth1 root refcnt 9 r2q 10 default 100 direct_packets_stat 24

В такой конфигурации любой хост (hadoop или другой), находящийся в одной стойке с конфигурируемым хостом, получает фильтр, назначенный классу "1:10", разрешающий скорость передачи до ~800 Мбит/с для класса в целом. Аналогичным образом для предопределённого списка ролей, считающихся "ролями приоритетных узлов", создаётся фильтр по тому же правилу "1:100". Такие узлы выполняют довольно важные задачи, например запускают сервисы hadoop namenode или jobtracker, а также наши узлы мониторинга. Любой другой хост hadoop, не находящийся в той же стойке, подключается к классу "1:20", ограниченному более консервативным классом ~200 Мбит/с.

Как было сказано выше, любой хост, не определённый в фильтре, попадает в класс по умолчанию для eth1 qdisc, то есть в класс "1:100". Как это выглядит на практике? Вот хост, подпадающий под действие правила "1:100":

[root@hadoop27 ~]# iperf -t 30 -c NONHADOOPHOST------------------------------------------------------------Client connecting to NONHADOOPHOST, TCP port 5001TCP window size: 23.2 KByte (default)------------------------------------------------------------[  3] local hadoop27 port 35897 connected with NONHADOOPHOST port 5001[ ID] Interval       Transfer     Bandwidth[  3]  0.0-30.1 sec   735 MBytes   205 Mbits/sec

Теперь при подключении к другому хосту, расположенному в той же стойке или подпадающему под правило "1:10":

[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER------------------------------------------------------------Client connecting to CABINETPEER, TCP port 5001TCP window size: 23.2 KByte (default)------------------------------------------------------------[  3] local hadoop27 port 39016 connected with CABINETPEER port 5001[ ID] Interval       Transfer     Bandwidth[  3]  0.0-30.0 sec  2.86 GBytes   820 Mbits/sec

Что произойдёт при подключении к двум серверам, подпадающим под правило "1:10"?

[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER1------------------------------------------------------------Client connecting to CABINETPEER1, TCP port 5001TCP window size: 23.2 KByte (default)------------------------------------------------------------[  3] local hadoop27 port 39648 connected with CABINETPEER1 port 5001[ ID] Interval       Transfer     Bandwidth[  3]  0.0-30.0 sec  1.47 GBytes   421 Mbits/sec[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER2------------------------------------------------------------Client connecting to 10.241.28.160, TCP port 5001TCP window size: 23.2 KByte (default)------------------------------------------------------------[  3] local hadoop27 port 38218 connected with CABINETPEER2 port 5001[ ID] Interval       Transfer     Bandwidth[  3]  0.0-30.0 sec  1.43 GBytes   408 Mbits/sec

Трафик уменьшится вдвое? Похоже на правду. Даже лучше стало относительно проще отслеживать тренды данных, анализируя статистические данные, выводимые на наши сервисы трендов:

$ /sbin/tc -s class show dev eth1 classid 1:100class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit     burst 1561b cburst 1561b Sent 5876292240 bytes 41184081 pkt (dropped 0, overlimits 0 requeues 0) rate 3456bit 2pps backlog 0b 0p requeues 0 lended: 40130273 borrowed: 0 giants: 0tokens: 906 ctokens: 906

После тестирования мы проверили хосты hadoop, подняв их скорости до первоначальных 1Gb после применения ролей traffic control. После всех описанных действий кластер hadoop вновь обрёл достаточно высокую производительность.

Мы осчастливили команду Data Science.

Команда Ops/Infra смогла приступить к устранению неполадок и поиску решений, при этом спокойно спать по ночам, зная, что сервис bitly будет вести себя нормально.

Мы осчастливили и команду Ops/Infra.

Выводы:

  • Попав в трудное положение, помните: ваш набор инструментов для управления средой так же важен, как и сама среда. Поскольку у нас уже имелся набор инструментов для комплексного контроля среды, мы сумели выбраться из ямы почти так же быстро, как попали в неё.

  • Не загоняйте себя в трудные ситуации: изучите все нюансы среды, в которой будете работать. В данном случае нам следовало лучше понять и оценить масштабы миграции hadoop и её возможные последствия.

  • Linux TC это дорогой инструмент, но и отдача от него очень большая. Этот инструмент почти наверняка создавали бородатые люди с самыми мохнатыми свитерами на свете, и для его применения требуются время и терпение. В любом случае это невероятно мощный инструмент он помог нам исправить наши собственные ошибки.

  • Linux: Linux

И последнее

Эта история хорошая иллюстрация "закона Мерфи для девопсов":

Закон Мёрфи для девопсов: "Если что-то может пойти не так, значит, что-то уже идёт не так, просто Nagios ещё не предупредил".

Временные решения, подобные тем, о которых идёт речь в этой статье, позволили нам выиграть время для поиска и устранения неисправностей в сети. Мы разогнали кластер hadoop, перенесли его в собственную выделенную сеть, обошли проблему некачественного сетевого оборудования, укрепив тем самым основную сеть, и сделали много чего ещё. Оставайтесь с нами.

Эта статья подчёркивает, что правильная, эффективная организация инфраструктуры не менее важна, чем данные и программное обеспечение. По мере развития информационных технологий, увеличения нагрузок, специалисты DevOps, возможно, станут называться иначе, а область станет ещё более самостоятельной. Если она вам интересна, вы можете обратить внимание на наш курс о DevOps, после которого будете иметь полное представление об этой профессии и сможете работать в DevOps.

Узнайте, как прокачаться и в других специальностях или освоить их с нуля:

Другие профессии и курсы
Подробнее..

Apache Software Foundation опубликовала релиз платформы Apache Hadoop 3.3.0

03.08.2020 18:13:27 | Автор: admin


Apache Software Foundation выпустила свежий релиз своей платформы Apache Hadoop 3.3.0. С момента последнего обновления прошло полтора года. Сама платформа представляет собой инструмент для организации распределенной обработки больших объемов данных с использованием MapReduce. Hadoop включает в себя набор утилит, библиотек и фреймворков для разработки и выполнения распределенных программ, которые способны работать на кластерах из тысяч узлов.

Для Hadoop создана специализированная файловая система Hadoop Distributed File System (HDFS), которая обеспечивает резервирование данных и оптимизацию работы MapReduce-приложений. HDFS предназначена для хранения файлов больших размеров, распределенных между отдельными узлами вычислительного кластера. Благодаря своим возможностям Hadoop используется крупнейшими компаниями и организациями. Google даже предоставила Hadoop право на использование технологий, которые затрагивают патенты, связанные с методом MapReduce.

В общем, встречаем Apache Hadoop 3.3.0.



Вот список самых важных изменений в новой версии:
  • Поддержка платформ на основе ARM-архитектуры (кстати, у Selectel есть ARM-серверы; вот ссылка, если захотите попробовать).
  • Версия формата Protobuf (Protocol buffers) обновлена до 3.7.1. Protobuf используется для сериализации структурированных данных.
  • Для коннектора S3A добавлена функция Delegation Token (аутентификация), улучшена поддержка кэширования ответов с кодом 404, плюс увеличена производительность S3guard и общая надежность работы.
  • Разработчики заявили о решении проблем с автоматическим тюнингом в файловой системе ABFS.
  • Добавлена поддержка Java 11.
  • Появилась поддержка файловой системы Tencent Cloud COS, что необходимо для доступа к объектному хранилищу COS.
  • Добавлен сервис DNS Resolution, что дает возможность клиентам определять серверы через DNS по именам узлов. Соответственно, в настройках нет необходимости добавлять все хосты.
  • Появился каталог приложений YARN (Yet Another Resource Negotiator) с возможностью поиска.
  • Добавлена поддержка планирования запуска OPPORTUNISTIC-контейнеров через Resource Manager.

Благодаря тому, что Hadoop активно развивается, рынок решений на его основе быстро растет. Если в 2019 году объем рынка составлял около $1,7 млрд, то, по прогнозам экспертов, к 2024 году он достигнет $9,4 млрд.

Сейчас Hadoop занимает первое место среди репозиториев Apache по числу вносимых изменений. Размер кодовой базы платформы составляет около 4 млн строк. Наиболее крупные хранилища Netflix, Twitter, Facebook.
Подробнее..

Как увеличить скорость чтения из HBase до 3 раз и с HDFS до 5 раз

13.10.2020 02:20:47 | Автор: admin
Высокая производительность одно из ключевых требований при работе с большими данными. Мы в управлении загрузки данных в Сбере занимаемся прокачкой практически всех транзакций в наше Облако Данных на базе Hadoop и поэтому имеем дело с действительно большими потоками информации. Естественно, что мы все время ищем способы повысить производительность, и теперь хотим рассказать, как удалось пропатчить RegionServer HBase и HDFS-клиент, благодаря чему удалось значительно увеличить скорость операции чтения.


Однако, прежде чем перейти к сути доработок, стоит проговорить про ограничения, которые в принципе невозможно обойти, если сидеть на HDD.

Почему HDD и быстрые Random Access чтения несовместимы
Как известно, HBase, да и многие другие БД, хранят данные блоками, размером в несколько десятков килобайт. По умолчанию это порядка 64 Кб. Теперь представим себе, что нам нужно достать всего 100 байт и мы просим HBase выдать нам эти данные по некоему ключу. Так как размер блока в HFiles равен 64 Кб то запрошено будет в 640 раз больше (на минуточку!) чем нужно.

Далее, так как запрос пойдет через HDFS и его механизм кэширования метаданных ShortCircuitCache (который позволяет осуществлять прямой доступ к файлам), то это приводит к чтению уже 1 Мб с диска. Впрочем это можно регулировать параметром dfs.client.read.shortcircuit.buffer.size и во многих случаях имеет смысл уменьшать это значение, например до 126 Кб.

Допустим мы сделаем это, но кроме того, когда мы начнем читать данные через java api, таким функциями как FileChannel.read и просим операционную систему прочитать указанный объем данных, она вычитывает на всякий случай в 2 раза больше, т.е. в 256 Кб в нашем случае. Это происходит потому, что в java нет простой возможности выставить флаг FADV_RANDOM, предотвращающий такое поведение.

В итоге, чтобы получить наши 100 байт, под капотом вычитывается в 2600 раз больше. Казалось бы выход очевиден, давайте уменьшим размер блока до килобайта, выставим упомянутый флаг и обретем великое просветление ускорение. Но беда в том, что уменьшая размер блока в 2 раза, мы уменьшаем и количество вычитанных байт в единицу времени так же в 2 раза.

Некоторый выигрыш от выставления флага FADV_RANDOM можно получить, но только при большой многопоточности и при размере размер блока от 128 Кб, но это максимум пара десятков процентов:

Тесты проводились на 100 файлах, каждый размером в 1 Гб и размещенных на 10 дисках HDD.

Давайте посчитаем, на что мы с такой скоростью можем в принципе рассчитывать:
Допустим мы читаем с 10 дисков со скоростью 280 МБ/сек, т.е. 3 миллиона раз по 100 байт. Но как мы помним, нужны нам данные встречаются в 2600 раз меньше, чем прочитано. Таким образом 3 млн. делим на 2600 и получаем 1100 записей в секунду.

Удручающе, не так ли? Такова природа Random Access доступа к данным на HDD вне зависимости от размера блока. Это физический предел случайного доступа и большего в таких условиях не сможет выжать ни одна БД.

Как же тогда базам получается достигать гораздо более высокую скорость? Чтобы ответить на этот вопрос давайте посмотрим, что происходит на следующей картинке:


Тут мы видим, что первые несколько минут скорость действительно порядка тысячи записей в секунду. Однако далее, благодаря тому, что вычитывается гораздо больше чем было запрошено, то данные оседают в buff/cache операционной системы (linux) и скорость растет до более приличных 60 тыс. в секунду


Таким образом далее мы будем разбираться с ускорением доступа только к тем данным, которые есть в кэше ОС или находятся в сравнимых по скорости доступа хранилищ типа SSD/NVMe.

В нашем случае мы будем проводить тесты на стенде из 4х серверов, каждый из которых заряжен следующим образом:
CPU: Xeon E5-2680 v4 @ 2.40GHz 64 threads.
Память: 730 Гб.
java version: 1.8.0_111


И тут собственно ключевой момент объем данных в таблицах, которые требуется вычитывать. Дело в том, что если читать данные из таблицы, которая целиком помещаются в кэш HBase, то до чтения из buff/cache операционки дело даже не дойдет. Потому что HBase по умолчанию выделяет 40% памяти под структуру которая называется BlockCache. По сути это ConcurrentHashMap, где ключ это имя файла+offset блока, а value собственно данные по этому смещению.

Таким образом, когда чтение идет только из этой структуры, мы видим великолепную скорость, вроде миллиона запросов в секунду. Но давайте представим себе, что мы не можем отдавать сотни гигабайт памяти только под нужды БД, потому что на этих серверах крутится много чего еще полезного.

Например в нашем случае объем BlockCache на одном RS это порядка 12 Гб. Мы высадили два RS на одну ноду, т.е. под BlockCache выделено 96 Гб на всех нодах. А данных при этом во много раз больше, например пусть это будет 4 таблицы, по 130 регионов, в которых файлы размером по 800 Мб, пожатые FAST_DIFF, т.е. в сумме 410 Гб (это чистые данные, т.е. без учета фактора репликации).

Таким образом, BlockCache составляет лишь около 23% от общего объема данных и это гораздо ближе к реальным условиям того, что называется BigData. И вот тут начинается самое интересное ведь очевидно, чем меньше попаданий в кэш, тем хуже производительность. Ведь в случае промаха придется выполнить кучу работы т.е. спуститься до вызова системных функций. Однако этого не избежать и поэтому давайте рассмотрим совсем другой аспект а что происходит с данными внутри кэша?

Упростим ситуацию и допустим, что у нас есть кэш в который помещается только 1 объект. Вот пример того что произойдет при попытке работы с объемом данных в 3 раза больше чем кэш, нам придется:
1. Поместить блок 1 в кэш
2. Удалить блок 1 из кэша
3. Поместить блок 2 в кэш
4. Удалить блок 2 из кэша
5. Поместить блок 3 в кэш


Проделано 5 действий! Однако нормальной этой ситуацию называть никак нельзя, по сути мы заставляем HBase проделывать кучу совершенно бесполезной работы. Он постоянно вычитывает данные из кэша ОС, помещает его себе в BlockCache, для того чтобы почти тут же выкинуть его, потому что приехала новая порция данных. Анимация в начале поста показывает суть проблемы Garbage Collector зашкаливает, атмосфера греется, маленькая Грета в далекой и жаркой Швеции расстраивается. А мы айтишники очень не любим, когда грустят дети, поэтому начинаем думать, что с этим можно поделать.

А что если помещать в кэш не все блоки, а только определенный процент из них, так чтобы кэш не переполнялся? Давайте для начала просто добавим всего несколько строк кода в начало функции помещения данных в BlockCache:

  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {    if (cacheDataBlockPercent != 100 && buf.getBlockType().isData()) {      if (cacheKey.getOffset() % 100 >= cacheDataBlockPercent) {        return;      }    }...


Смысл тут в следующем, оффсет это положение блока в файле и последние цифры его случайно и равномерно распределены от 00 до 99. Поэтому мы будем пропускать только те, которые попадают в нужный нам диапазон.

Например выставим cacheDataBlockPercent = 20 и посмотрим что будет:


Результат налицо. На графиках ниже становится понятно, за счет чего произошло такое ускорение мы экономим кучу ресурсов GC не занимаясь сизифовым трудом размещения данных к кэше лишь для того, чтобы тут же выбросить их в марсианским псам под хвост:


Утилизация CPU при этом растет, однако сильно меньше чем производительность:


Тут еще стоит отметить, что блоки которые хранятся в BlockCache бывают разные. Большая часть, порядка 95% это собственно данные. А остальное это метаданных, типа Bloom фильтров или LEAF_INDEX и т.д.. Этих данных мало, но они очень полезные, так как прежде чем обратиться непосредственно к данным, HBase обращается к мете, чтобы понять нужно ли искать тут дальше и если да, то где именно находится интересующий его блок.

Поэтому в коде мы видим условие проверку buf.getBlockType().isData() и благодаря этому мету мы будем оставлять в кэше в любом случае.

Теперь давайте увеличим нагрузку и за одно слегка затюним фичу. В первом тесте мы сделали процент отсечения = 20 и BlockCache был немного недозагружен. Теперь поставим 23% и будем добавлять по 100 потоков каждые 5 минут, чтобы увидеть, в какой момент происходит насыщение:


Тут мы видим, что исходная версия практически сразу упирается в потолок на уровне около 100 тыс запросов в секунду. Тогда как патч дает ускорение до 300 тысяч. При этом понятно, что дальнейшее ускорение уже не такое бесплатное, утилизация CPU при этом тоже растет.

Однако это не очень изящное решение, так как мы заранее не знаем, какой процентов блоков нужно кешировать, это зависит от профиля нагрузки. Поэтому был реализован механизм автоматической подстройки этого параметра в зависимости от активности операций чтения.

Для управления этим было добавлено три параметра:

hbase.lru.cache.heavy.eviction.count.limit устанавливает, сколько раз должен запуститься процесс выселения данных из кеша, прежде чем мы начнем использовать оптимизацию (т.е. пропускать блоки). По умолчанию оно равно MAX_INT = 2147483647 и фактически означает, что фича никогда не начнет работать при таком значении. Потому что процесс выселения запускается каждые 5 10 секунд (это зависит от нагрузки) и 2147483647 * 10 / 60 / 60 / 24 / 365 = 680 лет. Однако мы можем установить этот параметр равным 0 и заставить фичу работать сразу же после старта.

Однако есть и полезная нагрузка в этом параметре. Если у нас характер нагрузки такой, что постоянно перемежаются краткосрочные чтения (допустим днем) и долгосрочные (по ночам), то мы можем сделать так, что фича будет включаться только когда идут продолжительные операции чтения.

Например мы знаем, что краткосрочные чтения длятся обычно около 1 минуты. На не надо начинать выкидывать блоки, кеш не успеет устареть и тогда мы можем установить этот параметр равным например 10. Это приведет к тому, что оптимизация начнет работать только когда началось длительное активное чтение, т.е. через 100 секунд. Таким образом если мы имеем краткосрочное чтение, то все блоки попадут в кеш и будут доступны (за исключением тех что будут выселены стандартным алгоритмом). А когда мы делаем долгосрочные чтения, фича включается и бы имеем намного более высокую производительность.

hbase.lru.cache.heavy.eviction.mb.size.limit устанавливает, как много мегабайт нам хотелось бы помещать в кеш (и естественно выселять) за 10 секунд. Фича будет пытаться достигнуть этого значения и поддерживать его. Смысл в следующем, если мы пихаем в кеш гигабайты, то и выселять придется гигабайты, а это, как мы видели выше, весьма накладно. Однако не нужно пытаться выставить его слишком маленьким, так как это приведет к преждевременному выходу из режима пропуска блоков. Для мощных серверов (порядка 20-40 физических ядер) оптимально выставлять около 300-400 МБ. Для среднего класса (~10 ядер) 200-300 МБ. Для слабых систем (2-5 ядра) может быть нормально 50-100 МБ (на таких не тестировалось).

Рассмотрим, как это работает: допустим мы выставили hbase.lru.cache.heavy.eviction.mb.size.limit = 500, идет какая-то нагрузка (чтения) и тогда каждые ~10 секунд мы вычисляем, сколько байт было выселено по формуле:

Overhead = Freed Bytes Sum (MB) * 100 / Limit (MB) 100;

Если по факту было выселено 2000 MB, то Overhead получается равным:

2000 * 100 / 500 100 = 300%

Алгоритмы же стараются поддерживать не больше чем несколько десятков процентов, так что фича будет уменьшать процент кешируемых блоков, тем самым реализуя механизм авто-тюнинга.

Однако если нагрузка упала, допустим выселено всего 200 МБ и Overhead стал отрицательным (так называемый overshooting):

200 * 100 / 500 100 = -60%

То фича наоборот, будет увеличивать процент кешируемых блоков до тех пор, пока Overhead не станет положительным.

Ниже будет пример как это выглядит на реальных данных. Не нужно пытаться достигнуть 0%, это невозможно. Весьма хорошо когда когда около 30 100%, это помогает избежать преждевременного выхода из режима оптимизации при краткосрочных всплесках.

hbase.lru.cache.heavy.eviction.overhead.coefficient устанавливает, как быстро мы хотели бы получить результат. Если мы твердо знаем, что наши чтения в основном длительные и не хотим ждать, мы можем увеличить этот коэффициент и получить высокую производительность быстрее.

Например, мы установили этот коэффициент = 0.01. Это означает что Overhead (см. выше) будет умножен на это число на на полученный результат и будет уменьшен процент кешируемых блоков. Допустим, что Overhead = 300%, а коэффициент = 0.01, то процент кешируемых блоков будет уменьшен на 3%.

Подобная логика Backpressure реализована и для отрицательных значений Overhead (overshooting). Так как всегда возможны краткосрочные колебания объема чтений-выселений, то этот механизм позволяет избегать преждевременный выход из режима оптимизации. Backpressure имеет перевернутую логику: чем сильнее overshooting, тем тем больше кешируется блоков.



Код реализации
        LruBlockCache cache = this.cache.get();        if (cache == null) {          break;        }        freedSumMb += cache.evict()/1024/1024;        /*        * Sometimes we are reading more data than can fit into BlockCache        * and it is the cause a high rate of evictions.        * This in turn leads to heavy Garbage Collector works.        * So a lot of blocks put into BlockCache but never read,        * but spending a lot of CPU resources.        * Here we will analyze how many bytes were freed and decide        * decide whether the time has come to reduce amount of caching blocks.        * It help avoid put too many blocks into BlockCache        * when evict() works very active and save CPU for other jobs.        * More delails: https://issues.apache.org/jira/browse/HBASE-23887        */        // First of all we have to control how much time        // has passed since previuos evict() was launched        // This is should be almost the same time (+/- 10s)        // because we get comparable volumes of freed bytes each time.        // 10s because this is default period to run evict() (see above this.wait)        long stopTime = System.currentTimeMillis();        if ((stopTime - startTime) > 1000 * 10 - 1) {          // Here we have to calc what situation we have got.          // We have the limit "hbase.lru.cache.heavy.eviction.bytes.size.limit"          // and can calculte overhead on it.          // We will use this information to decide,          // how to change percent of caching blocks.          freedDataOverheadPercent =            (int) (freedSumMb * 100 / cache.heavyEvictionMbSizeLimit) - 100;          if (freedSumMb > cache.heavyEvictionMbSizeLimit) {            // Now we are in the situation when we are above the limit            // But maybe we are going to ignore it because it will end quite soon            heavyEvictionCount++;            if (heavyEvictionCount > cache.heavyEvictionCountLimit) {              // It is going for a long time and we have to reduce of caching              // blocks now. So we calculate here how many blocks we want to skip.              // It depends on:             // 1. Overhead - if overhead is big we could more aggressive              // reducing amount of caching blocks.              // 2. How fast we want to get the result. If we know that our              // heavy reading for a long time, we don't want to wait and can              // increase the coefficient and get good performance quite soon.              // But if we don't sure we can do it slowly and it could prevent              // premature exit from this mode. So, when the coefficient is              // higher we can get better performance when heavy reading is stable.              // But when reading is changing we can adjust to it and set              // the coefficient to lower value.              int change =                (int) (freedDataOverheadPercent * cache.heavyEvictionOverheadCoefficient);              // But practice shows that 15% of reducing is quite enough.              // We are not greedy (it could lead to premature exit).              change = Math.min(15, change);              change = Math.max(0, change); // I think it will never happen but check for sure              // So this is the key point, here we are reducing % of caching blocks              cache.cacheDataBlockPercent -= change;              // If we go down too deep we have to stop here, 1% any way should be.              cache.cacheDataBlockPercent = Math.max(1, cache.cacheDataBlockPercent);            }          } else {            // Well, we have got overshooting.            // Mayby it is just short-term fluctuation and we can stay in this mode.            // It help avoid permature exit during short-term fluctuation.            // If overshooting less than 90%, we will try to increase the percent of            // caching blocks and hope it is enough.            if (freedSumMb >= cache.heavyEvictionMbSizeLimit * 0.1) {              // Simple logic: more overshooting - more caching blocks (backpressure)              int change = (int) (-freedDataOverheadPercent * 0.1 + 1);              cache.cacheDataBlockPercent += change;              // But it can't be more then 100%, so check it.              cache.cacheDataBlockPercent = Math.min(100, cache.cacheDataBlockPercent);            } else {              // Looks like heavy reading is over.              // Just exit form this mode.              heavyEvictionCount = 0;              cache.cacheDataBlockPercent = 100;            }          }          LOG.info("BlockCache evicted (MB): {}, overhead (%): {}, " +            "heavy eviction counter: {}, " +            "current caching DataBlock (%): {}",            freedSumMb, freedDataOverheadPercent,            heavyEvictionCount, cache.cacheDataBlockPercent);          freedSumMb = 0;          startTime = stopTime;       }



Рассмотрим теперь все это на реальном примере. Имеем следующий тестовый сценарий:

1. Начинаем делать Scan (25 threads, batch = 100)

2. Через 5 минут добавляем multi-gets (25 threads, batch = 100)

3. Через 5 минут выключаем multi-gets (остается опять только scan)

Делаем два прогона, сначала hbase.lru.cache.heavy.eviction.count.limit = 10000 (что фактически выключает фичу), а затем ставим limit = 0 (включает).

В логах ниже мы видим, как включается фича, сбрасывает Overshooting до 14-71%. Время от времени нагрузка снижается, что включает Backpressure и HBase вновь кеширует больше блоков.

Лог RegionServer
evicted (MB): 0, ratio 0.0, overhead (%): -100, heavy eviction counter: 0, current caching DataBlock (%): 100
evicted (MB): 0, ratio 0.0, overhead (%): -100, heavy eviction counter: 0, current caching DataBlock (%): 100
evicted (MB): 2170, ratio 1.09, overhead (%): 985, heavy eviction counter: 1, current caching DataBlock (%): 91 < start
evicted (MB): 3763, ratio 1.08, overhead (%): 1781, heavy eviction counter: 2, current caching DataBlock (%): 76
evicted (MB): 3306, ratio 1.07, overhead (%): 1553, heavy eviction counter: 3, current caching DataBlock (%): 61
evicted (MB): 2508, ratio 1.06, overhead (%): 1154, heavy eviction counter: 4, current caching DataBlock (%): 50
evicted (MB): 1824, ratio 1.04, overhead (%): 812, heavy eviction counter: 5, current caching DataBlock (%): 42
evicted (MB): 1482, ratio 1.03, overhead (%): 641, heavy eviction counter: 6, current caching DataBlock (%): 36
evicted (MB): 1140, ratio 1.01, overhead (%): 470, heavy eviction counter: 7, current caching DataBlock (%): 32
evicted (MB): 913, ratio 1.0, overhead (%): 356, heavy eviction counter: 8, current caching DataBlock (%): 29
evicted (MB): 912, ratio 0.89, overhead (%): 356, heavy eviction counter: 9, current caching DataBlock (%): 26
evicted (MB): 684, ratio 0.76, overhead (%): 242, heavy eviction counter: 10, current caching DataBlock (%): 24
evicted (MB): 684, ratio 0.61, overhead (%): 242, heavy eviction counter: 11, current caching DataBlock (%): 22
evicted (MB): 456, ratio 0.51, overhead (%): 128, heavy eviction counter: 12, current caching DataBlock (%): 21
evicted (MB): 456, ratio 0.42, overhead (%): 128, heavy eviction counter: 13, current caching DataBlock (%): 20
evicted (MB): 456, ratio 0.33, overhead (%): 128, heavy eviction counter: 14, current caching DataBlock (%): 19
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 15, current caching DataBlock (%): 19
evicted (MB): 342, ratio 0.32, overhead (%): 71, heavy eviction counter: 16, current caching DataBlock (%): 19
evicted (MB): 342, ratio 0.31, overhead (%): 71, heavy eviction counter: 17, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.3, overhead (%): 14, heavy eviction counter: 18, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.29, overhead (%): 14, heavy eviction counter: 19, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.27, overhead (%): 14, heavy eviction counter: 20, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.25, overhead (%): 14, heavy eviction counter: 21, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.24, overhead (%): 14, heavy eviction counter: 22, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.22, overhead (%): 14, heavy eviction counter: 23, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.21, overhead (%): 14, heavy eviction counter: 24, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.2, overhead (%): 14, heavy eviction counter: 25, current caching DataBlock (%): 19
evicted (MB): 228, ratio 0.17, overhead (%): 14, heavy eviction counter: 26, current caching DataBlock (%): 19
evicted (MB): 456, ratio 0.17, overhead (%): 128, heavy eviction counter: 27, current caching DataBlock (%): 18 < added gets (but table the same)
evicted (MB): 456, ratio 0.15, overhead (%): 128, heavy eviction counter: 28, current caching DataBlock (%): 17
evicted (MB): 342, ratio 0.13, overhead (%): 71, heavy eviction counter: 29, current caching DataBlock (%): 17
evicted (MB): 342, ratio 0.11, overhead (%): 71, heavy eviction counter: 30, current caching DataBlock (%): 17
evicted (MB): 342, ratio 0.09, overhead (%): 71, heavy eviction counter: 31, current caching DataBlock (%): 17
evicted (MB): 228, ratio 0.08, overhead (%): 14, heavy eviction counter: 32, current caching DataBlock (%): 17
evicted (MB): 228, ratio 0.07, overhead (%): 14, heavy eviction counter: 33, current caching DataBlock (%): 17
evicted (MB): 228, ratio 0.06, overhead (%): 14, heavy eviction counter: 34, current caching DataBlock (%): 17
evicted (MB): 228, ratio 0.05, overhead (%): 14, heavy eviction counter: 35, current caching DataBlock (%): 17
evicted (MB): 228, ratio 0.05, overhead (%): 14, heavy eviction counter: 36, current caching DataBlock (%): 17
evicted (MB): 228, ratio 0.04, overhead (%): 14, heavy eviction counter: 37, current caching DataBlock (%): 17
evicted (MB): 109, ratio 0.04, overhead (%): -46, heavy eviction counter: 37, current caching DataBlock (%): 22 < back pressure
evicted (MB): 798, ratio 0.24, overhead (%): 299, heavy eviction counter: 38, current caching DataBlock (%): 20
evicted (MB): 798, ratio 0.29, overhead (%): 299, heavy eviction counter: 39, current caching DataBlock (%): 18
evicted (MB): 570, ratio 0.27, overhead (%): 185, heavy eviction counter: 40, current caching DataBlock (%): 17
evicted (MB): 456, ratio 0.22, overhead (%): 128, heavy eviction counter: 41, current caching DataBlock (%): 16
evicted (MB): 342, ratio 0.16, overhead (%): 71, heavy eviction counter: 42, current caching DataBlock (%): 16
evicted (MB): 342, ratio 0.11, overhead (%): 71, heavy eviction counter: 43, current caching DataBlock (%): 16
evicted (MB): 228, ratio 0.09, overhead (%): 14, heavy eviction counter: 44, current caching DataBlock (%): 16
evicted (MB): 228, ratio 0.07, overhead (%): 14, heavy eviction counter: 45, current caching DataBlock (%): 16
evicted (MB): 228, ratio 0.05, overhead (%): 14, heavy eviction counter: 46, current caching DataBlock (%): 16
evicted (MB): 222, ratio 0.04, overhead (%): 11, heavy eviction counter: 47, current caching DataBlock (%): 16
evicted (MB): 104, ratio 0.03, overhead (%): -48, heavy eviction counter: 47, current caching DataBlock (%): 21 < interrupt gets
evicted (MB): 684, ratio 0.2, overhead (%): 242, heavy eviction counter: 48, current caching DataBlock (%): 19
evicted (MB): 570, ratio 0.23, overhead (%): 185, heavy eviction counter: 49, current caching DataBlock (%): 18
evicted (MB): 342, ratio 0.22, overhead (%): 71, heavy eviction counter: 50, current caching DataBlock (%): 18
evicted (MB): 228, ratio 0.21, overhead (%): 14, heavy eviction counter: 51, current caching DataBlock (%): 18
evicted (MB): 228, ratio 0.2, overhead (%): 14, heavy eviction counter: 52, current caching DataBlock (%): 18
evicted (MB): 228, ratio 0.18, overhead (%): 14, heavy eviction counter: 53, current caching DataBlock (%): 18
evicted (MB): 228, ratio 0.16, overhead (%): 14, heavy eviction counter: 54, current caching DataBlock (%): 18
evicted (MB): 228, ratio 0.14, overhead (%): 14, heavy eviction counter: 55, current caching DataBlock (%): 18
evicted (MB): 112, ratio 0.14, overhead (%): -44, heavy eviction counter: 55, current caching DataBlock (%): 23 < back pressure
evicted (MB): 456, ratio 0.26, overhead (%): 128, heavy eviction counter: 56, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.31, overhead (%): 71, heavy eviction counter: 57, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 58, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 59, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 60, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 61, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 62, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 63, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.32, overhead (%): 71, heavy eviction counter: 64, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 65, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 66, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.32, overhead (%): 71, heavy eviction counter: 67, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 68, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.32, overhead (%): 71, heavy eviction counter: 69, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.32, overhead (%): 71, heavy eviction counter: 70, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 71, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 72, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 73, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 74, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 75, current caching DataBlock (%): 22
evicted (MB): 342, ratio 0.33, overhead (%): 71, heavy eviction counter: 76, current caching DataBlock (%): 22
evicted (MB): 21, ratio 0.33, overhead (%): -90, heavy eviction counter: 76, current caching DataBlock (%): 32
evicted (MB): 0, ratio 0.0, overhead (%): -100, heavy eviction counter: 0, current caching DataBlock (%): 100
evicted (MB): 0, ratio 0.0, overhead (%): -100, heavy eviction counter: 0, current caching DataBlock (%): 100


Сканы нужны были для того, чтобы показать этот же процесс в виде графика соотношения между двумя разделами кеша single (куда попадают блоки которые еще никто ни разу не запрашивал) и multi (тут хранятся востребованные хотя бы раз данные):



Ну и наконец как выглядит работа параметров в виде графика. Для сравнения кеш был совсем выключен в начале, затем был запуск HBase с кешированием и отсрочкой начала работы оптимизации на 5 минут (30 циклов выселения)

Полный код можно найти в Pull Request HBASE 23887 на github.

Однако 300 тыс. чтений в секунду это не все, что можно выжать на данном железе в этих условиях. Дело в том, что когда нужно обратиться к данным через HDFS, то используется механизм ShortCircuitCache (далее SSC), который позволяет получить доступ к данным напрямую, избегая сетевых взаимодействий.

Профилировка показала, что этот механизм хоть и дает большой выигрыш, но сам также в какой-то момент становится узким горлышком, потому что практически все тяжелые операции происходят внутри lock, что приводит к блокировкам большую часть времени.


Осознав это мы поняли, что проблему можно обойти, если создать массив независимых SSC:
private final ShortCircuitCache[] shortCircuitCache;...shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];for (int i = 0; i < this.clientShortCircuitNum; i++)  this.shortCircuitCache[i] = new ShortCircuitCache();


И далее работать с ними, исключая пересечения так же по последней цифре оффсета:
public ShortCircuitCache getShortCircuitCache(long idx) {    return shortCircuitCache[(int) (idx % clientShortCircuitNum)];}


Теперь можно приступать к тестам. Для этого будем читать файлы из HDFS простым многопоточным приложением. Выставляем параметры:

conf.set("dfs.client.read.shortcircuit", "true");conf.set("dfs.client.read.shortcircuit.buffer.size", "65536"); // по дефолту = 1 МБ и это сильно замедляет чтение, поэтому лучше привести в соответствие к реальным нуждамconf.set("dfs.client.short.circuit.num", num); // от 1 до 10


И просто читаем файлы:
FSDataInputStream in = fileSystem.open(path);for (int i = 0; i < count; i++) {    position += 65536;    if (position > 900000000)        position = 0L;    int res = in.read(position, byteBuffer, 0, 65536);}


Этот код выполняется в отдельных потоках и мы будем наращивать количество одновременно читаемых файлов (от 10 до 200 горизонтальная ось) и количество кэшей (от 1 до 10 графики). Вертикальная оси показывает ускорение которое дает увеличение SSC относительно случая когда кеш только один.


Как читать график: время выполнения 100 тысяч чтений блоками по 64 КБ с одним кэшом требует 78 секунд. Тогда как с 5 кэшами это выполняется за 16 секунд. Т.е. имеет место ускорение ~5 раз. Как видно из графика, на маленьком числе параллельных чтений эффект не очень заметный, это начинает играть заметную роль когда чтения потоков больше 50. Также заметно, что увеличение количества SSC от 6 и выше дает существенно меньше прироста производительности.

Примечание 1: так как результаты тестирования достаточно волатильны (см. ниже), было осуществлено 3 запуска и полученные значения были усреднены.
Примечание 2: Прирост производительности от настройки для случайного доступа такой же, хотя сам доступ чуть медленнее.


Однако необходимо уточнить, что в отличие от случая с HBase это ускорение не всегда бесплатное. Тут мы больше разблокируем возможности CPU делать работу, вместо того чтобы отвисать на локах.


Тут можно наблюдать, что в целом увеличение количества кэшей дает примерно пропорциональный рост утилизации ЦПУ. Однако есть несколько более выигрышные комбинации.

Например присмотримся внимательнее к настройке SSC = 3. Рост производительности на диапазоне составляет около 3.3 раз. Ниже результаты всех трех отдельных запусков.


Тогда как потребление CPU растет примерно в 2.8 раз. Разница не очень большая, но маленькой Грете уже радость.

Таким образом это будет иметь позитивный эффект для любого инструмента использующего массовый доступ к HDFS (например Spark и т.д.), при условии что прикладной код легкий (т.е. затык именно на стороне клиента HDFS) и есть свободные мощности CPU. Для проверки давайте протестируем какой эффект даст совместное применение оптимизации BlockCache и тюнинга SSC для чтения из HBase.


Тут видно, что в таких условиях эффект не такой большой, как в рафинированных тестах (чтение без всякой обработки), однако выжать дополнительные 80К тут вполне себе получается. Совместно обе оптимизации дают ускорение до 4х раз.

Так же по этой оптимизации был сделан PR [HDFS-15202], который был вмержен и данный функционал будет доступен в следующих релизах.

Ну и наконец было интересно сравнить производительность чтения подобной wide-column БД Cassandra и HBase.

Для этого запускались экземпляры стандартной утилиты нагрузочного тестирования YCSB с двух хостов (800 threads суммарно). На серверной стороне по 4 экземпляра RegionServer и Cassandra на 4 хостах (не тех, где запущены клиенты, чтобы избежать их влияния). Чтения шли из таблиц размером:

HBase 300 GB on HDFS (100 GB чистых данных)

Cassandra 250 GB (replication factor = 3)

Т.е. объем был примерно одинаковый (в HBase немножко больше).

Параметры HBase:

dfs.client.short.circuit.num = 5 (оптимизация клиента HDFS)

hbase.lru.cache.heavy.eviction.count.limit = 30 это означает то патч начнет работать через 30 выселений (~5 минут)

hbase.lru.cache.heavy.eviction.mb.size.limit = 300 целевой объем кеширования и выселения

Логи YCSB были распарсены и сведены в графики Excel:



Как видно, данные оптимизации позволяют сравнять производительность этих БД в этих условиях и достигнуть 450 тыс. чтений в секунду.

Надеемся эта информация может быть кому-нибудь полезной в ходе увлекательной борьбы за производительность.
Подробнее..

Новая схватка двух якодзун или Scylla vs Aerospike ( HBase для массовки)

08.04.2021 18:23:51 | Автор: admin
В прошлый раз обсуждение битвы тяжеловесов Cassandra VS HBase вызвало весьма бурную дискуссию, в ходе которой было много раз упомянута Scylla которая позиционируется как более быстрый аналог Cassandra (далее CS). Также меня заинтересовал весьма любопытный Aerospike (далее AS), который в своих тестах предсказуемо побеждает CS с разгромным счетом.
image

По удивительному совпадению Scylla (далее SC) также легко бьет CS, о чем гордо сообщает прямо на своей заглавной странице:



Таким образом естественным образом возникает вопрос, кто кого заборет, кит или слон?

В моем тесте оптимизированная версия HBase (далее HB) работает с CS на равных, так что он тут будет не в качестве претендента на победу, а лишь постольку, что весь наш процессинг построен на HB и хочется понимать его возможности в сравнении с лидерами.

Понятно, что бесплатность HB и CS это огромный плюс, однако с другой стороны если для достижения одинаковой производительности нужно в х раз больше железа, выгоднее бывает заплатить за софт, чем выделять этаж в ЦОД под дорогие грелки. Особенно учитывая, что если уж речь зашла про производительность, то так как HDD в принципе не способны дать хоть сколько-нибудь приемлемую скорость Random Access чтений (см. "Почему HDD и быстрые Random Access чтения несовместимы"). Что в свою очередь означает покупку SSD, который в объемах нужных для настоящей BigData весьма недешевое удовольствие.

Таким образом, было сделано следующее. Я арендовал 4 сервера в облаке AWS в конфигурации i3en.6xlarge где на борту каждого:
CPU 24 vcpu
MEM 192 GB
SSD 2 x 7500 GB


Если кто-то захочет повторить, то сразу отметим, что очень важно для воспроизводимости брать конфигурации, где полный объем дисков (7500 GB). Иначе диски придется делить с непредсказуемыми соседями, которые обязательно испортят ваши тесты, как им наверняка кажется весьма ценной нагрузкой.

Далее, раскатал SC при помощи конструктора, который любезно предоставил производитель на собственном сайте. После чего залил утилиту YCSB (которая уже практически стандарт для сравнительного тестирования БД) на каждую ноду кластера.

Есть только один важный нюанс, мы практически во всех случаях используем следующий паттерн: прочитать запись до изменения + записать новое значение.

Поэтому я модифицировал update следующим образом:

  @Override  public Status update(String table, String key,                       Map<String, ByteIterator> values) {    read(table, key, null, null); // << added read before write    return write(table, key, updatePolicy, values);  }


Далее я запускал нагрузку одновременно со всех 4х хостов (тех же самых где расположены сервера БД). Это сделано сознательно, потому что бывает клиенты одних БД больше потребляют ЦПУ чем другие. Учитывая, что размеры кластера ограничены, то хочется понимать совокупную эффективность реализации как серверной, так и клиентской части.

Результаты тестирования будут представлены ниже, но прежде чем мы перейдем к ним стоит рассмотреть также еще несколько важных нюансов.

Насчет AS это весьма привлекательная БД, лидер в номинации удовлетворенности клиентов по версии ресурса g2.


Признаться, мне она тоже как-то приглянулась. Ставится легко, вот этим скриптом достаточно легко раскатывается в облако. Стабильная, конфигурировать одно удовольствие. Однако есть у ней один очень большой недостаток. На каждый ключ она выделяет 64 байта оперативной памяти. Кажется немного, но в промышленных объемах это становится проблемой. Типичная запись в наших таблицах весит 500 байт. Именно такой объем value я использовал почти* во всех тестах (*почему почти будет ниже).

Так как мы храним по 3 копии каждой записи, то получается что для хранения 1 PB чистых данных (3 PB грязных) мы должны будем выделить всего-то 400 TB оперативки. Идем дальше нет чтооо?! Секундочку, а нельзя ли с этим что-нибудь сделать? спросили мы у вендора.

Ха, конечно можно много чего, загибаем пальцы:
1. Упаковать несколько записей в одну (хопа).
2. Тоже самое что в п.1, только за счет расширения числа полей.
3. Включить режим all-flush. Суть хранить индекс не в памяти, а на диске. Правда есть нюанс, Ватсон, опция доступна только в entreprise версии (в моем случае в рамках trial-периода)

Хорошо, теперь разберемся с HB и можно уже будет рассмотреть результаты тестов. Для установки Hadoop у Амазона предусмотрена платформа EMR, которая позволяет легко раскатать необходимый вам кластер. Мне пришлось только поднять лимиты на число процессов и открытых файлов, иначе падало под нагрузкой и заменил hbase-server на свою оптимизированную сборку (подробности тут). Второй момент, HB безбожно тормозит при работе с одиночными запросами, это факт. Поэтому мы работаем только батчами. В данном тесте батч = 100. Регионов в таблице 100.

Ну и последний момент, все базы тестировались в режиме strong consistency. Для HB это из коробки. AS доступно только в enterprise версии. SC гонялась в режиме consistency=all.

Итак, поехали. Insert AS:

10 sec: 360554 operations; 36055,4 current ops/sec;
20 sec: 698872 operations; 33831,8 current ops/sec;

230 sec: 7412626 operations; 22938,8 current ops/sec;
240 sec: 7542091 operations; 12946,5 current ops/sec;
250 sec: 7589682 operations; 4759,1 current ops/sec;
260 sec: 7599525 operations; 984,3 current ops/sec;
270 sec: 7602150 operations; 262,5 current ops/sec;
280 sec: 7602752 operations; 60,2 current ops/sec;
290 sec: 7602918 operations; 16,6 current ops/sec;
300 sec: 7603269 operations; 35,1 current ops/sec;
310 sec: 7603674 operations; 40,5 current ops/sec;
Error while writing key user4809083164780879263: com.aerospike.client.AerospikeException$Timeout: Client timeout: timeout=10000 iterations=1 failedNodes=0 failedConns=0 lastNode=5600000A 127.0.0.1:3000
Error inserting, not retrying any more. number of attempts: 1Insertion Retry Limit: 0


Упс, а вы точно продюссер промышленная база? Можно подумать так на первый взгляд. Однако оказалось, что проблема в ядре амазонской версии линукса. На них завели тикет и в версии amzn2-ami-hvm-2.0.20210326.0-x86_64-gp2 проблему исправили. Но для этих тестов вендор предложил использовать скрипты ансибла под ubuntu, где эта проблема не возникала (для раскатки нужно выбрать соответствующую ветку в гите).

Ладно, продолжаем. Запускаем загрузку 200 млн. записей (INSERT), потом UPDATE, потом GET. Вот что получилось (ops операций в секунду):


ВАЖНО! Это скорость одной ноды! Всего их 4, т.е. чтобы получить суммарную скорость нужно умножать на 4.

Первая колонка 10 полей, это не совсем честный тест. Т.е. это когда индекс в памяти, чего в реальной ситуации BigData недостижимо.

Вторая колонка это упаковка 10 записей в 1. Т.е. тут уже реально идет экономия памяти, ровно в 10 раз. Как отлично видно из теста, такой фокус не проходит даром, производительность существенно падает.

Ну и наконец all-flush, тут примерно такая же картина. Чистые вставки хуже, но ключевая операция Update быстрее, так что дальше будем сравнивать только с all-flush.

Собственно не будем тянуть кота, сразу вот:


Все в общем-то понятно, но что тут стоит добавить.
1. Вендор AS подтвердил, что результаты выше по их БД релевантные.
2. У SC вставки были какие-то не очень правильные, вот более подробный график в разрезе по серверам:


Возможно где-то косяк с настройками или всплыл тот баг с ядром, не знаю. Но настраивал все от и до скрипт от вендора, так что мопед не мой, все вопросы к нему.

Еще нужно понимать, что это весьма скромный объем данных и на больших объемах ситуация может измениться. В ходе экспериментов я спалил несколько сотен баксов, так что энтузиазма хватило только на long-run тест лидера и в ограниченном одним сервером режиме.


Почему оно так просело и что за оживление в последней трети загадка природы. Можно также заметить, что скорость радикально выше, чем в тестах чуть выше. Полагаю это потому что выключен режим strong consistency (т.к. сервер всего один).

Ну и наконец GET+WRITE (поверх залитых тестом выше пары миллиардов записей):


Что за просадка такая, в душе не догадываюсь. Никаких посторонних процессов не запускалось. Возможно как-то связано с кешом SSD, потому что утилизация во время всего хода тестирования AS в режиме all-flush была 100%.


На этом собственно все. Выводы в целом очевидны, нужно больше тестов. Желательно всех самых популярных БД в одинаковых условиях. В инете этого как-то этого жанра как-то не особо много. А хорошо бы, тогда вендоры баз будут мотивированы оптимизироваться, мы осознанно выбирать лучших.
Подробнее..

Перевод Как дебажить запросы, используя только Spark UI

08.11.2020 12:22:14 | Автор: admin

Егор Матешук (CDO AdTech-компании Квант и преподаватель в OTUS) приглашает Data Engineer'ов принять участие в бесплатном Demo-уроке Spark 3.0: что нового?. Узнаете, за счет чего Spark 3.0 добивается высокой производительности, а также рассмотрите другие нововведения.

Также приглашаем посмотреть запись трансляции Demo-урока Написание эффективных пользовательских функций в Spark и пройти вступительное тестирование по курсу Экосистема Hadoop, Spark, Hive!

У вас уже есть все, что вам нужно для дебаггинга запросов

Spark - самый широко используемый фреймворк для big data вычислений, способный выполнять задачи на петабайтах данных. Spark предоставляет набор веб-UI, которые можно использовать для отслеживания потребления ресурсов и состояния кластера Spark. Большинство проблем, с которыми мы сталкиваемся при выполнении задачи (job), можно отладить, перейдя в UI Spark.

spark2-shell --queue=P0 --num-executors 20Spark context Web UI available at http://<hostname>:<port>Spark context available as 'sc'Spark session available as 'spark'

В этой статье я попытаюсь продемонстрировать, как дебажить задачу Spark, используя только Spark UI. Я запущу несколько задач Spark и покажу, как Spark UI отражает выполнение задачи. Также я поделюсь с вами несколькими советами и хитростями.

Вот как выглядит Spark UI.

Мы начнем с вкладки SQL, которая включает в себя достаточно много информации для первоначального обзора. При использовании RDD в некоторых случаях вкладки SQL может и не быть.

А вот запрос, который запускаю в качестве примера

spark.sql("select id, count(1) from table1 group by id).show(10, false)

Перевод разъяснений в правой части:

<-- В рамках запроса было запущено 3 задачи, а сам запрос был выполнен за 21с.

<-- Файлы Parquet отсканированы, они содержат в сумме 23.7М строк

<-- Это работа выполненная каждой партицией

1. генерирует хеш id, count

2. группирует id и суммирует count. Вот как это выглядит

1. id = hash(125), count=1000

2. id = hash(124), count=900

<-- Происходит обмен данных, приведенных выше, на основе хеша id колонки, чтобы в результате каждая партиция имела один хеш

<-- Данные каждой партиции суммируются и возвращается count

Теперь давайте сопоставим это с физическим планом запроса. Физический план можно найти под SQL DAG, когда вы раскрываете вкладку details. Мы должны читать план снизу вверх

== Physical Plan ==CollectLimit 11+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#79])+- Exchange hashpartitioning(id#1, 200)+- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#83L])+- *(1) FileScan parquet [id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://<node>:<port><location>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string>

Вот как следует читать план:

  1. Сканирование файла parquet. Обратите внимание на PushedFilters. Я продемонстрирую, что это означает позже

  2. Создание HashAggregate с ключами. Обратите внимание на partial_count. Это означает, что агрегированный count является частичным, поскольку агрегирование было выполнено в каждой отдельной задаче и не было смешанно для получения полного набора значений.

  3. Теперь сгенерированные данные агрегируются на основе ключа, в данном случае id.

  4. Теперь вычисляется вообще весь count.

  5. Полученный результат

Теперь, когда с этим мы разобрались, давайте посмотрим на данные PuedFilters. Spark оптимизирован для предикатов, и любые применяемые фильтры пушатся к источнику. Чтобы продемонстрировать это, давайте рассмотрим другую версию этого запроса

spark.sql("select id, count(1) from table1 where status = 'false' group by id).show(10, false)

А это его план

+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#224])+- Exchange hashpartitioning(id#1, 200)+- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#228L])+- *(1) Project [id#1]+- *(1) Filter (isnotnull(status#3) && (status#3 = false))+- *(1) FileScan parquet [id#1,status#3] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://mr25p01if-ingx03010101.mr.if.apple.com:50001/home/hadoop/work/news/20200..., PartitionFilters: [], PushedFilters: [IsNotNull(status), EqualTo(status,false)], ReadSchema: struct<id:string,status:string>

Обратите внимание на изменения по сравнению с предыдущим планом.

Мы видим в PushedFilters уже кое-что другое проверка на null и проверка на равенство. Столбец, к которому мы применяем фильтр пушится к источнику, т.е. при чтении данных эти строки игнорируются. Результат этого переносится на следующие этапы.

Можем ли мы, применяя фильтры, уменьшить общее количество прочитанных данных (или файлов)?

Да мы можем. В обоих приведенных выше примерах общее количество прочитанных данных составляет ~ 23,8M. Чтобы уменьшить его, мы можем использовать магию файлов parquet. В Parquet есть группа строк, в которой есть статистика, которую можно использовать для игнорирования нескольких групп/файлов строк. Это приводит к тому, что эти файлы вообще не читаются. Вы можете прочитать о том, как это сделать, в другой моей статье на medium Insights Into Parquet Storage.

Вкладка Executor

Эта вкладка дает нам представление о количестве активных в настоящее время исполнителей в вашей сессии spark.

spark2-shell  queue=P0  driver-memory 20g  executor-memory 20g  num-executors 40

Я запросил 40 исполнителей для сессии, однако при запуске вы можете увидеть, что он предоставил мне всего 10 активных исполнителей. Это может быть связано с тем, что не работают хосты или Spark не нуждается в таком большом количестве исполнителей. Это также может вызвать задержку в планировании задач, поскольку у вас всего 10 исполнителей, а вам нужно 40, что скажется на параллелизме.

Вкладка Environment

Вкладка Environment содержит подробную информацию обо всех параметрах конфигурации, которые в данный момент использует сессия spark.

Посмотрите, как здесь отражены параметры, отраженные мной ранее. Это полезно хотя бы просто для того, чтобы убедиться, что предоставленная вами конфигурация принята.

Вкладка Storage

Здесь отображается информация об одной из наиболее обсуждаемых функций Spark - кэшировании. В Интернете доступно множество статей с разными мнениями относительно того, стоит ли кэшировать или нет. К счастью, эта статья не о том, когда стоит кэшировать и т. д. Он больше о том, что происходит, когда мы кэшируем.

Но перед этим давайте вернемся немного назад и потратим несколько минут на некоторые основы кэширования.

Есть два способа кэширования Dataframe:

df.persist

Для кэширования набора данных требуется несколько свойств.

df.cache

Под капотом это вызывает метод persist. Обратимся к исходному коду

def cache(): this.type = persist()/*** Persist this Dataset with the given storage level.* @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,`MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,`MEMORY_AND_DISK_2`, etc.* @group basic* @since 1.6.0*/
  • DISK_ONLY: хранить (persist) данные на диске только в сериализованном формате.

  • MEMORY_ONLY: [хранить данные в памяти только в десериализованном формате.

  • MEMORY_AND_DISK: хранить данные в памяти, а если памяти недостаточно, вытесненные блоки будут сохранены на диске.

  • MEMORY_ONLY_SER: этот уровень Spark хранит RDD как сериализованный объект Java (однобайтовый массив на партицию). Это более компактно по сравнению с десериализованными объектами. Но это увеличивает накладные расходы на CPU.

  • MEMORY_AND_DISK_SER: аналогично MEMORY_ONLY_SER, но с записью на диск, когда данные не помещаются в памяти.

  • Давайте воспользуемся df.cache в нашем примере и посмотрим, что произойдет a.cache() -> На вкладке Storage ничего не видно. Как вы можете догадаться, это из-за ленивого вычисления

Давайте воспользуемся df.cache в нашем примере и посмотрим, что произойдет

a.cache()

> На вкладке Storage ничего не видно. Как вы можете догадаться, это из-за ленивого вычисления

a.groupBy(id).count().show(10,false)

Мы видим какой-то кэш данных. Размер в памяти составляет 5,2 ГБ, а размер моего файла - 2 ГБ хммм что здесь произошло

hadoop dfs -dus <dirName>2,134,751,429 6,404,254,287 <dirName>

Это потому, что данные в памяти десериализованы и несжаты. Это результирует в большем объеме памяти по сравнению с диском.

Так что, когда вы хотите принимаете решение о том, кэшировать или нет, помните об этом.

Я видел несколько толковых статей о том, следует ли кэшировать или нет. Ознакомиться с ними - хорошая идея

Далее мы рассмотрим вкладки Jobs и Stages, причины множества проблем можно отдебажить с помощью этих вкладок.

spark.sql("select is_new_user,count(1) from table1 group by is_new_user").show(10,false)

Я вижу, что для указанного выше запроса запускаются 3 задачи. Но 2 из них пропущены. Обычно это означает, что данные были извлечены из кэша и не было необходимости повторно выполнять данный этап. Кроме того, Spark выполняет множество фиктивных задач для оценки данных. Пропуск задач мог быть связан и с этим.

Давайте же глубоко погрузимся в задачу, которая не была пропущена. Это визуализация DAG для задачи

Мы ясно видим, что эта задача состоит из двух этапов, разделенных операцией перемешивания/обмена. Stages означают, что данные были записаны на диск для использования в следующем процессе.

Давайте углубимся во вкладку stages.

Первое, что всегда нужно проверять, - это сводные метрики для задач. Вы можете нажать show additional metrics для получения дополнительных фактов. Это покажет множество необходимых параметров по минимуму, медиане и максимуму. В идеальном мире минимальное значение должно быть близко к максимальному.

Вот несколько моментов, которые следует отметить:

Продолжительность (duration): В нашем примере минимальная и максимальная продолжительность составляет 0,4 и 4 секунды соответственно. Это может быть связано с несколькими причинами, и мы постараемся отдебажить их в пунктах ниже.

Время десериализации задачи (Task deserialization time):

В нашем примере в рамках десериализации задачи некоторое время тратится и на другие задачи. Одной из основных причин было выполнение процессов сборки мусора в исполнителях. У меня выполнялись другие процессы, в которых были кэшированы некоторые данные, что приводило к сборке мусора. Процессам сборки мусора предоставляется наивысший приоритет, и они останавливают все запущенные процессы в угоду обслуживания процесса сборки мусора. Если вы видите, что ваш процесс не потребляет много памяти, первым шагом для решения такой проблемы может быть разговор с администратором/OPS.

Задержка планировщика (Scheduler delay): максимальная задержка планировщика составляет 0,4 секунды. Это означает, что одна из задач должна была ждать отправки еще 0,4 секунды. Большое это значение или маленькое, зависит от вашего конкретного юзкейса.

Размер ввода очень сильно распределен. Это очень хорошо, поскольку все задачи читают одинаковый объем данных. Это одна из самых важных вещей при поиске неверного/искаженного запроса. Это можно увидеть в столбце shuffle read в разделе Summary metrics for tasks. Самая простая логика для решения таких проблем - это добавление соли к группе, которая может распараллеливать данные, а затем, наконец, агрегирование данных без соли. Этот принцип может применяться во многих формах для решения проблемы асимметрии данных.

Еще одна вещь, на которую стоит обратить внимание, - это уровень локальности.

* PROCESSLOCAL Эта задача будет запущена в том же процессе, что и исходные данные

* NODELOCAL Эта задача будет запущена на том же компьютере, что и исходные данные

* RACKLOCAL Эта задача будет запущена в том же блоке, что и исходные данные

* NOPREF (Отображается как ANY) Эта задача не может быть запущена в том же процессе, что и исходные данные, или это не имеет значения.

Предположим, мы потребляем данные из узла Cassandra в кластере Spark, состоящем из трех узлов. Cassandra работает на машине X узлов Spark X, Y и Z. Для узла X все данные будут помечены как NODELOCAL. Это означает, что после того, как каждое ядро на X будет занято, мы останемся с задачами, предпочтительное расположение которых - X, но у нас есть пространство для выполнения только на Y и Z. У Spark есть только два варианта: дождаться, пока ядра станут доступны на X, или понизить уровень локальности задачи и попытаться найти место для них и принять любые штрафы за нелокальное выполнение.

Параметр spark.locality.wait описывает, как долго ждать перед понижением уровня задач, которые потенциально могут выполняться с более высокого уровня локальности до более низкого уровня. Этот параметр, по сути, является нашей оценкой того, сколько стоит ожидание локального места. Значение по умолчанию - 3 секунды, что означает, что в нашем примере с Cassandra, как только наш совместно расположенный узел X будет забит задачами, другие наши машины Y и Z будут простаивать в течение 3 секунд, прежде чем задачи, которые могли быть NODELOCAL, будут понижены до ANY* и запущены.

Вот пример кода для этого.

Я надеюсь, что эта статья послужит вам в качестве руководства по дебаггингу на Spark UI с целью устранения проблем с производительностью Spark. В Spark 3 есть много дополнительных функций, которые тоже стоит посмотреть.

Также хорошая идея почитать документацию Spark UI.

Вы также можете связаться со мной в Linkedin.

Хотите узнать, как Apache Druid индексирует данные для сверхбыстрых запросов? Узнайте об этом здесь:

Insights into Indexing using Bitmap Index


Интересно развиваться в данном направлении? Участвуйте в трансляции мастер-класса Spark 3.0: что нового? и оцените программу курса Экосистема Hadoop, Spark, Hive!

Подробнее..

Перевод Стриминг Edge2AI на NVIDIA JETSON Nano 2Гб с использованием агентов MiNiFi в приложениях FLaNK

03.03.2021 10:13:29 | Автор: admin

Мне некогда было снимать своё видео распаковки - не терпелось запустить это превосходное устройство. NVIDIA Jetson Nano 2GB теперь можно купить всего за 59 долларов!!!
Я провел пробный запуск, у него есть все функции, которые вам нравятся для Jetson, всего на 2 Гб оперативной памяти и 2 USB порта меньше. Это очень доступное устройство для создания классных кейсов.

Распаковка: https://www.youtube.com/watch?v=dVGEtWYkP2c&feature=youtu.be

Подключение к приложениям ИИ-сообщества: https://youtu.be/2T8CG7lDkcU

Образовательные проекты: https://www.nvidia.com/en-us/autonomous-machines/embedded-systems/jetson-nano/education-projects/

Технические характеристики:
Графический процессор: NVIDIA Maxwell со 128 ядрами CUDA
Процессор: Четырехъядерный ARM A57 с частотой 1,43 ГГц
Память: 2 Гб LPDDR4, 64-bit 25,6 Гбит/с
Накопитель: microSD (не входит в комплект)
Кодирование видео: 4K с частотой 30 Гц | 4 потока в разрешении 1080p с частотой 30 Гц | 9 потоков в разрешении 720p с частотой 30 Гц (H.264/H.265)
Декодирование видео: 4K с частотой 60 Гц | 2 потока в разрешении 4K с частотой 30 Гц | 8 потоков в разрешении 1080p с частотой 30 Гц | 18 потоков в разрешении 720р с частотой 30 Гц (H.264/H.265)
Соединение: Беспроводное подключение Gigabit Ethernet, 802.11ac*
Камера: 1 разъем MIPI CSI-2
Разъемы дисплея: Разъем HDMI
USB: 1x USB 3.0 Type A, 2x USB 2.0 Type A, 1x USB 2.0 Micro-B
Прочие разъемы: Разъем 40-пин (GPIO, I2C, I2S, SPI, UART)
Разъем: 12-пин (питание и связанные сигналы, UART)
Разъем вентилятора: 40-пин *
Размеры: 100 x 80 x 29 мм

В зависимости от того, где или как вы покупаете устройство, вам, возможно, придется купить блок питания и USB WiFi.

Все мои существующие рабочие нагрузки отлично работают в версии на 2 ГБ, но с очень хорошей экономией средств. Настройка проста, система быстрая, я настоятельно рекомендую всем, кто ищет быстрый способ поэкспериментировать с ИИ на периферии и другими пограничными рабочими нагрузками. Это может быть отличный вариант для самостоятельного обучения. Я также подключил свой Jetson к монитору, клавиатуре и мыши, и я могу использовать его сразу же как для вычислений на периферии, так и в качестве основного рабочего ПК. С большим количеством таких устройств можно будет легко запустить MiNiFi агентов, модель классификации на Python и модели Deep Learning.

NVIDIA не остановилась на самом дешевом пограничном устройстве, у них также есть несколько серьезных корпоративных обновлений: Cloudera превозносит на новый уровень корпоративное озеро данных благодаря сотрудничеству с NVIDIA!

Пример запуска модели глубокого обучения

Исходники и настройки: https://github.com/tspannhw/SettingUpAJetsonNano2GB/blob/main/README.md

Устройство NVIDIA Jetson Nano 2GB великолепно - ничего лишнего. Я скопировал своего агента MiNiFi и код из других Jetson Nanos, Xavier NX и TX1, и все работает нормально. Скорость вполне подходит для большинства потребностей, особенно для задач разработки и прототипирования. Я предпочитаю Xavier, но за эту цену выбор достойный. Для большинства сценариев использования IoT/ИИ на периферии я собираюсь задействовать несколько Jetson Nano. Я уже использовал NVidia Jetson 2GB для демонстрации на некоторых мероприятиях, включая ApacheCon, BeamSummit, Open Source Summit и AI Dev World:
https://www.linkedin.com/pulse/2020-streaming-edge-ai-events-tim-spann/

Для захвата неподвижных изображений и создания их каталога для обработки я установил fswebcam.

Обычно я использую этот гайд: https://github.com/dusty-nv/jetson-inference.
Вы получите отличные библиотеки, руководства, документацию и примеры. Обычно я создаю свои приложения, пользуясь одним из этих примеров, а также использую одну из превосходных готовых моделей NVIDIA. Это значительно ускоряет разработку и развертывание Edge2AI-приложений, будь то задачи IoT или другие цели. Это все работает со стандартными подключаемыми камерами Raspberry Pi и отличными USB веб-камерами Logitech, которые я использовал со всеми другими устройствами NVIDIA.

При такой цене, кажется, нет причин, по которым любому разработчику в любой компании не стоило бы иметь такое устройство. Это отличное решение для тестирования приложений Edge AI и классификации с очень приличной скоростью.

Во время проведения презентации на NetHope Global Summit, и подумал, что эти устройства за 59 долларов вполне способны стать отличным вариантом для некоммерческих организаций, которые могут использовать их для сбора и анализа данных в поле. https://www.nethopeglobalsummit.org/agenda-2020#sz-tab-44134

Я исследую некоторые сценарии использования, чтобы посмотреть, можно ли заранее создать несколько простых приложений, которые такая организация могла бы просто взять и запустить. Посмотрим, что получится. Периферийное устройство с графическим процессором за 59 долларов позволяет использовать некоторые новые приложения по очень доступной цене. За 59 долларов я не получу много ресурсов облака, но смогу получить небольшое и мощное устройство для сбора данных, которое справится с нагрузками МО, DL, обработкой видео с камеры, работой с агентами MiNiFi, Python и Java. Воображение здесь ограничивается только 2-мя гигабайтами быстрой оперативной памяти и однимграфическим процессором.

Пример приложения

Пример выходных данных:

{"uuid": "nano_uuid_cmq_20201026202757", "ipaddress": "192.168.1.169",
"networktime": 47.7275505065918, "detectleft": 1.96746826171875,
"detectconfidence": 52.8866550521850pidence ": 52.8866550521850pidence":
52.8866550521850p , "gputemp": "30.0", "gputempf": "86", "cputempf": "93",
"runtime": "169", "host": "nano5",
"filename": "/ opt / demo /images/out_iue_20201026202757.jpg ",
" host_name ":" nano5 "," macaddress ":" 00: e0: 4c: 49: d8: b7 ",
" end ":" 1603744246.924455 "," te ":" 169.4200084209442 ",
systemtime: 26.10.2020 16:30:46, cpu: 9,9, diskusage: 37100,4 МB,
Memory: 91,5, id: 20201026202757_64d69a82-88d8-45f8-be06 -1b836cb6cc84 "}


Ниже приведен пример вывода при запуске скрипта на Python для классификации изображения с веб-камеры (это Logi веб-камера низкого уровня, но вы можете использовать камеру Raspberry Pi). Еще лучше, если бы мы запустили этот непрерывный вывод сообщений журнала и изображений для агентов MiNiFi. Их можно было бы собрать и отправить на сервер для маршрутизации, преобразования и обработки.

root@nano5:/opt/demo/minifi-jetson-nano# jetson_clocks

root@nano5:/opt/demo/minifi-jetson-nano# python3 detect.py

[gstreamer] initialized gstreamer, version 1.14.5.0

[gstreamer] gstCamera -- attempting to create device v4l2:///dev/video0

[gstreamer] gstCamera -- found v4l2 device: HD Webcam C615

[gstreamer] v4l2-proplist, device.path=(string)/dev/video0, udev-probed=(boolean)false, device.api=(string)v4l2, v4l2.device.driver=(string)uvcvideo, v4l2.device.card=(string)"HD\ Webcam\ C615", v4l2.device.bus_info=(string)usb-70090000.xusb-3.2, v4l2.device.version=(uint)264588, v4l2.device.capabilities=(uint)2216689665, v4l2.device.device_caps=(uint)69206017;

[gstreamer] gstCamera -- found 30 caps for v4l2 device /dev/video0

[gstreamer] [0] video/x-raw, format=(string)YUY2, width=(int)1920, height=(int)1080, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction)5/1;

[gstreamer] [1] video/x-raw, format=(string)YUY2, width=(int)1600, height=(int)896, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 15/2, 5/1 };

[gstreamer] [2] video/x-raw, format=(string)YUY2, width=(int)1280, height=(int)720, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 10/1, 15/2, 5/1 };

[gstreamer] [3] video/x-raw, format=(string)YUY2, width=(int)960, height=(int)720, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [4] video/x-raw, format=(string)YUY2, width=(int)1024, height=(int)576, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [5] video/x-raw, format=(string)YUY2, width=(int)800, height=(int)600, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [6] video/x-raw, format=(string)YUY2, width=(int)864, height=(int)480, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [7] video/x-raw, format=(string)YUY2, width=(int)800, height=(int)448, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [8] video/x-raw, format=(string)YUY2, width=(int)640, height=(int)480, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [9] video/x-raw, format=(string)YUY2, width=(int)640, height=(int)360, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [10] video/x-raw, format=(string)YUY2, width=(int)432, height=(int)240, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [11] video/x-raw, format=(string)YUY2, width=(int)352, height=(int)288, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [12] video/x-raw, format=(string)YUY2, width=(int)320, height=(int)240, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [13] video/x-raw, format=(string)YUY2, width=(int)176, height=(int)144, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [14] video/x-raw, format=(string)YUY2, width=(int)160, height=(int)120, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [15] image/jpeg, width=(int)1920, height=(int)1080, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [16] image/jpeg, width=(int)1600, height=(int)896, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [17] image/jpeg, width=(int)1280, height=(int)720, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [18] image/jpeg, width=(int)960, height=(int)720, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [19] image/jpeg, width=(int)1024, height=(int)576, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [20] image/jpeg, width=(int)800, height=(int)600, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [21] image/jpeg, width=(int)864, height=(int)480, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [22] image/jpeg, width=(int)800, height=(int)448, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [23] image/jpeg, width=(int)640, height=(int)480, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [24] image/jpeg, width=(int)640, height=(int)360, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [25] image/jpeg, width=(int)432, height=(int)240, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [26] image/jpeg, width=(int)352, height=(int)288, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [27] image/jpeg, width=(int)320, height=(int)240, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [28] image/jpeg, width=(int)176, height=(int)144, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] [29] image/jpeg, width=(int)160, height=(int)120, pixel-aspect-ratio=(fraction)1/1, framerate=(fraction){ 30/1, 24/1, 20/1, 15/1, 10/1, 15/2, 5/1 };

[gstreamer] gstCamera -- selected device profile: codec=mjpeg format=unknown width=1280 height=720

[gstreamer] gstCamera pipeline string:

[gstreamer] v4l2src device=/dev/video0 ! image/jpeg, width=(int)1280, height=(int)720 ! jpegdec ! video/x-raw ! appsink name=mysink

[gstreamer] gstCamera successfully created device v4l2:///dev/video0

[gstreamer] opening gstCamera for streaming, transitioning pipeline to GST_STATE_PLAYING

[gstreamer] gstreamer changed state from NULL to READY ==> mysink

[gstreamer] gstreamer changed state from NULL to READY ==> capsfilter1

[gstreamer] gstreamer changed state from NULL to READY ==> jpegdec0

[gstreamer] gstreamer changed state from NULL to READY ==> capsfilter0

[gstreamer] gstreamer changed state from NULL to READY ==> v4l2src0

[gstreamer] gstreamer changed state from NULL to READY ==> pipeline0

[gstreamer] gstreamer changed state from READY to PAUSED ==> capsfilter1

[gstreamer] gstreamer changed state from READY to PAUSED ==> jpegdec0

[gstreamer] gstreamer changed state from READY to PAUSED ==> capsfilter0

[gstreamer] gstreamer stream status CREATE ==> src

[gstreamer] gstreamer changed state from READY to PAUSED ==> v4l2src0

[gstreamer] gstreamer changed state from READY to PAUSED ==> pipeline0

[gstreamer] gstreamer stream status ENTER ==> src

[gstreamer] gstreamer message new-clock ==> pipeline0

[gstreamer] gstreamer message stream-start ==> pipeline0

[gstreamer] gstreamer changed state from PAUSED to PLAYING ==> capsfilter1

[gstreamer] gstreamer changed state from PAUSED to PLAYING ==> jpegdec0

[gstreamer] gstreamer changed state from PAUSED to PLAYING ==> capsfilter0

[gstreamer] gstreamer changed state from PAUSED to PLAYING ==> v4l2src0

[gstreamer] gstCamera -- onPreroll

[gstreamer] gstCamera -- map buffer size was less than max size (1382400 vs 1382407)

[gstreamer] gstCamera recieve caps: video/x-raw, format=(string)I420, width=(int)1280, height=(int)720, interlace-mode=(string)progressive, multiview-mode=(string)mono, multiview-flags=(GstVideoMultiviewFlagsSet)0:ffffffff:/right-view-first/left-flipped/left-flopped/right-flipped/right-flopped/half-aspect/mixed-mono, pixel-aspect-ratio=(fraction)1/1, chroma-site=(string)mpeg2, colorimetry=(string)1:4:0:0, framerate=(fraction)30/1

[gstreamer] gstCamera -- recieved first frame, codec=mjpeg format=i420 width=1280 height=720 size=1382407

RingBuffer -- allocated 4 buffers (1382407 bytes each, 5529628 bytes total)

[gstreamer] gstreamer changed state from READY to PAUSED ==> mysink

[gstreamer] gstreamer message async-done ==> pipeline0

[gstreamer] gstreamer changed state from PAUSED to PLAYING ==> mysink

[gstreamer] gstreamer changed state from PAUSED to PLAYING ==> pipeline0

RingBuffer -- allocated 4 buffers (14745600 bytes each, 58982400 bytes total)

jetson.inference -- detectNet loading build-in network 'ssd-mobilenet-v2'

detectNet -- loading detection network model from:

-- model networks/SSD-Mobilenet-v2/ssd_mobilenet_v2_coco.uff

-- input_blob 'Input'

-- output_blob 'NMS'

-- output_count 'NMS_1'

-- class_labels networks/SSD-Mobilenet-v2/ssd_coco_labels.txt

-- threshold 0.500000

-- batch_size 1

[TRT] TensorRT version 7.1.3

[TRT] loading NVIDIA plugins

[TRT] Registered plugin creator - ::GridAnchor_TRT version 1

[TRT] Registered plugin creator - ::NMS_TRT version 1

[TRT] Registered plugin creator - ::Reorg_TRT version 1

[TRT] Registered plugin creator - ::Region_TRT version 1

[TRT] Registered plugin creator - ::Clip_TRT version 1

[TRT] Registered plugin creator - ::LReLU_TRT version 1

[TRT] Registered plugin creator - ::PriorBox_TRT version 1

[TRT] Registered plugin creator - ::Normalize_TRT version 1

[TRT] Registered plugin creator - ::RPROI_TRT version 1

[TRT] Registered plugin creator - ::BatchedNMS_TRT version 1

[TRT] Could not register plugin creator - ::FlattenConcat_TRT version 1

[TRT] Registered plugin creator - ::CropAndResize version 1

[TRT] Registered plugin creator - ::DetectionLayer_TRT version 1

[TRT] Registered plugin creator - ::Proposal version 1

[TRT] Registered plugin creator - ::ProposalLayer_TRT version 1

[TRT] Registered plugin creator - ::PyramidROIAlign_TRT version 1

[TRT] Registered plugin creator - ::ResizeNearest_TRT version 1

[TRT] Registered plugin creator - ::Split version 1

[TRT] Registered plugin creator - ::SpecialSlice_TRT version 1

[TRT] Registered plugin creator - ::InstanceNormalization_TRT version 1

[TRT] detected model format - UFF (extension '.uff')

[TRT] desired precision specified for GPU: FASTEST

[TRT] requested fasted precision for device GPU without providing valid calibrator, disabling INT8

[TRT] native precisions detected for GPU: FP32, FP16

[TRT] selecting fastest native precision for GPU: FP16

[TRT] attempting to open engine cache file /usr/local/bin/networks/SSD-Mobilenet-v2/ssd_mobilenet_v2_coco.uff.1.1.7103.GPU.FP16.engine

[TRT] loading network plan from engine cache /usr/local/bin/networks/SSD-Mobilenet-v2/ssd_mobilenet_v2_coco.uff.1.1.7103.GPU.FP16.engine

[TRT] device GPU, loaded /usr/local/bin/networks/SSD-Mobilenet-v2/ssd_mobilenet_v2_coco.uff

[TRT] Deserialize required 2384046 microseconds.

[TRT]

[TRT] CUDA engine context initialized on device GPU:

[TRT] -- layers 117

[TRT] -- maxBatchSize 1

[TRT] -- workspace 0

[TRT] -- deviceMemory 35449344

[TRT] -- bindings 3

[TRT] binding 0

-- index 0

-- name 'Input'

-- type FP32

-- in/out INPUT

-- # dims 3

-- dim #0 3 (SPATIAL)

-- dim #1 300 (SPATIAL)

-- dim #2 300 (SPATIAL)

[TRT] binding 1

-- index 1

-- name 'NMS'

-- type FP32

-- in/out OUTPUT

-- # dims 3

-- dim #0 1 (SPATIAL)

-- dim #1 100 (SPATIAL)

-- dim #2 7 (SPATIAL)

[TRT] binding 2

-- index 2

-- name 'NMS_1'

-- type FP32

-- in/out OUTPUT

-- # dims 3

-- dim #0 1 (SPATIAL)

-- dim #1 1 (SPATIAL)

-- dim #2 1 (SPATIAL)

[TRT]

[TRT] binding to input 0 Input binding index: 0

[TRT] binding to input 0 Input dims (b=1 c=3 h=300 w=300) size=1080000

[TRT] binding to output 0 NMS binding index: 1

[TRT] binding to output 0 NMS dims (b=1 c=1 h=100 w=7) size=2800

[TRT] binding to output 1 NMS_1 binding index: 2

[TRT] binding to output 1 NMS_1 dims (b=1 c=1 h=1 w=1) size=4

[TRT]

[TRT] device GPU, /usr/local/bin/networks/SSD-Mobilenet-v2/ssd_mobilenet_v2_coco.uff initialized.

[TRT] W = 7 H = 100 C = 1

[TRT] detectNet -- maximum bounding boxes: 100

[TRT] detectNet -- loaded 91 class info entries

[TRT] detectNet -- number of object classes: 91

detected 0 objects in image

[TRT] ------------------------------------------------

[TRT] Timing Report /usr/local/bin/networks/SSD-Mobilenet-v2/ssd_mobilenet_v2_coco.uff

[TRT] ------------------------------------------------

[TRT] Pre-Process CPU 0.07802ms CUDA 0.48875ms

[TRT] Network CPU 45.52254ms CUDA 44.93750ms

[TRT] Post-Process CPU 0.03193ms CUDA 0.03177ms

[TRT] Total CPU 45.63248ms CUDA 45.45802ms

[TRT] ------------------------------------------------

[TRT] note -- when processing a single image, run 'sudo jetson_clocks' before

to disable DVFS for more accurate profiling/timing measurements

[image] saved '/opt/demo/images/out_kfy_20201030195943.jpg' (1280x720, 4 channels)

[TRT] ------------------------------------------------

[TRT] Timing Report /usr/local/bin/networks/SSD-Mobilenet-v2/ssd_mobilenet_v2_coco.uff

[TRT] ------------------------------------------------

[TRT] Pre-Process CPU 0.07802ms CUDA 0.48875ms

[TRT] Network CPU 45.52254ms CUDA 44.93750ms

[TRT] Post-Process CPU 0.03193ms CUDA 0.03177ms

[TRT] Total CPU 45.63248ms CUDA 45.45802ms

[TRT] ------------------------------------------------

[gstreamer] gstCamera -- stopping pipeline, transitioning to GST_STATE_NULL

[gstreamer] gstCamera -- pipeline stopped


Мы используем расширенный пример сценария, скрипт detect.py. Чтобы сделать снимок веб-камеры и классифицировать его: camera = jetson.utils.gstCamera(width, height, camera)

Модель работает достаточно быстро и дает нам те результаты и данные, которые нужно.

классификация изображения моделью на Jetson Nano 2GBклассификация изображения моделью на Jetson Nano 2GB

Ссылки:

https://github.com/tspannhw/minifi-jetson-nano

https://youtu.be/fIESu365Sb0

https://developer.nvidia.com/blog/ultimate-starter-ai-computer-jetson-nano-2gb-developer-kit/

Подробнее..

Перевод - recovery mode Инструменты обработки OLAP-запросов для Big Data

14.09.2020 22:10:58 | Автор: admin


Введение


Эта статья является компиляцией другой статьи. В ней я намерен сконцентрироваться на инструментах для работы с Big data, ориентированных на анализ данных.

Итак, предположим, вы приняли необработанные данные, обработали их, и теперь они готовы к дальнейшему использованию.

Существует множество инструментов, используемых для работы с данными, каждый из которых имеет свои преимущества и недостатки. Большинство из них ориентировано на OLAP, но некоторые также оптимизированы для OLTP. Часть из них использует стандартные форматы и сосредоточена только на выполнении запросов, другие используют свой собственный формат или хранилище для передачи обработанных данных в источник в целях повышения производительности. Некоторые из них оптимизированы для хранения данных с использованием определенных схем, например звезда или снежинка, но есть и более гибкие. Подводя итог, имеем следующие противопоставления:

  • Хранилище данных против Озера
  • Hadoop против Автономного хранилища
  • OLAP против OLTP
  • Движок запросов против OLAP механизмов


Мы также рассмотрим инструменты для обработки данных с возможностью выполнения запросов.


Инструменты для обработки данных


Большинство упомянутых инструментов могут подключаться к серверу метаданных, например Hive, и выполнять запросы, создавать представления и т.п. Это часто используется для создания дополнительных (улучшенных) уровней отчетности.

Spark SQL предоставляет способ беспрепятственно смешивать запросы SQL с программами Spark, поэтому вы можете смешивать API DataFrame с SQL. Он имеет интеграцию с Hive и стандартное подключение через JDBC или ODBC, так что вы можете подключить Tableau, Looker или любой инструмент бизнес-аналитики к своим данным через Spark.



Apache Flink также предоставляет SQL API. Поддержка SQL в Flink основана на Apache Calcite, который реализует стандарт SQL. Он также интегрируется с Hive через HiveCatalog. Например, пользователи могут хранить свои таблицы Kafka или ElasticSearch в Hive Metastore с помощью HiveCatalog и повторно использовать их позже в запросах SQL.

Kafka тоже предоставляет возможности SQL. В целом большинство инструментов обработки данных предоставляют SQL интерфейсы.

Инструменты выполнения запросов


Этот тип инструментов ориентирован на унифицированный запрос к различным источникам данных в разных форматах. Идея состоит в том, чтобы направлять запросы к вашему озеру данных с помощью SQL, как если бы это была обычная реляционная база данных, хотя у нее есть некоторые ограничения. Некоторые из этих инструментов могут также выполнять запросы к NoSQL базам и многое другое. Эти инструменты предоставляют интерфейс JDBC для внешних инструментов, таких как Tableau или Looker, для безопасного подключения к вашему озеру данных. Инструменты запросов самый медленный вариант, но обеспечивают максимальную гибкость.

Apache Pig: один из первых инструментов наряду с Hive. Имеет собственный язык, отличный от SQL. Отличительным свойством программ созданных Pig является то, что их структура поддается существенному распараллеливанию, что, в свою очередь, позволяет им обрабатывать очень большие наборы данных. Это работает не в пользу более новых движков на базе SQL.

Presto: платформа от Facebook с открытым исходным кодом. Это распределенный механизм SQL-запросов для выполнения интерактивных аналитических запросов к источникам данных любого размера. Presto позволяет запрашивать данные там, где они находятся, включая Hive, Cassandra, реляционные базы данных и файловые системы. Она может выполнять запросы к большим наборам данных за секунды. Presto не зависит от Hadoop, но интегрируется с большинством его инструментов, особенно с Hive, для выполнения SQL-запросов.

Apache Drill: предоставляет механизм SQL запросов без схем для Hadoop, NoSQL и даже облачного хранилища. Он не зависит от Hadoop, но имеет множество интеграций с инструментами экосистемы, такими как Hive. Один запрос может объединять данные из нескольких хранилищ, выполняя оптимизацию, специфичную для каждого из них. Это очень хорошо, т.к. позволяет аналитикам обрабатывать любые данные как таблицу, даже если фактически они читают файл. Drill полностью поддерживает стандартный SQL. Бизнес-пользователи, аналитики и специалисты по обработке данных могут использовать стандартные инструменты бизнес-аналитики, такие как Tableau, Qlik и Excel, для взаимодействия с нереляционными хранилищами данных, используя драйверы Drill JDBC и ODBC. Кроме того, разработчики могут использовать простой REST API Drill в своих пользовательских приложениях для создания красивых визуализаций.

OLTP базы данных


Хотя Hadoop оптимизирован для OLAP, все же бывают ситуации, когда вы хотите выполнять запросы OLTP для интерактивного приложения.

HBase имеет очень ограниченные свойства ACID по дизайну, поскольку он был построен для масштабирования и не предоставляет возможностей ACID из коробки, но его можно использовать для некоторых сценариев OLTP.

Apache Phoenix построен на основе HBase и обеспечивает способ выполнения запросов OTLP в экосистеме Hadoop. Apache Phoenix полностью интегрирован с другими продуктами Hadoop, такими как Spark, Hive, Pig, Flume и Map Reduce. Он также может хранить метаданные, поддерживает создание таблиц и добавочные изменения с управлением версиями при помощи команд DDL. Это работает довольно быстро, быстрее, чем при использовании Drill или другого
механизма запросов.

Вы можете использовать любую крупномасштабную базу данных за пределами экосистемы Hadoop, такую как Cassandra, YugaByteDB, ScyllaDB для OTLP.

Наконец, очень часто бывает, что в быстрых базах данных любого типа, таких как MongoDB или MySQL, есть более медленное подмножество данных, обычно самые свежие. Упомянутые выше механизмы запросов могут объединять данные между медленным и быстрым хранилищами за один запрос.

Распределенное индексирование


Эти инструменты обеспечивают способы хранения и поиска неструктурированных текстовых данных, и они живут за пределами экосистемы Hadoop, поскольку им требуются специальные структуры для хранения данных. Идея состоит в том, чтобы использовать инвертированный индекс для выполнения быстрого поиска. Помимо текстового поиска, эту технологию можно использовать для самых разных целей, таких как хранение журналов, событий и т.п. Есть два основных варианта:

Solr: это популярная, очень быстрая платформа корпоративного поиска с открытым исходным кодом, построенная на Apache Lucene. Solr является надежным, масштабируемым и отказоустойчивым инструментом, обеспечивая распределенное индексирование, репликацию и запросы с балансировкой нагрузки, автоматическое переключение при отказе и восстановление, централизованную настройку и многое другое. Он отлично подходит для текстового поиска, но его варианты использования ограничены по сравнению с ElasticSearch.

ElasticSearch: это также очень популярный распределенный индекс, но он превратился в собственную экосистему, которая охватывает множество вариантов использования, таких как APM, поиск, хранение текста, аналитика, информационные панели, машинное обучение и многое другое. Это определенно инструмент, который нужно иметь в своем наборе инструментов либо для DevOps, либо для конвейера данных, поскольку он очень универсален. Он также может хранить и искать видео и изображения.

ElasticSearch можно использовать в качестве слоя быстрого хранения для вашего озера данных для расширенных функций поиска. Если вы храните свои данные в большой базе данных типа ключ-значение, такой как HBase или Cassandra, которые предоставляют очень ограниченные возможности поиска из-за отсутствия соединений, вы можете поставить ElasticSearch перед ними, чтобы выполнять запросы, возвращать идентификаторы, а затем выполнять быстрый поиск в своей базе данных.

Его также можно использовать для аналитики. Вы можете экспортировать свои данные, индексировать их, а затем запрашивать их с помощью Kibana, создавая информационные панели, отчеты и многое другое, вы можете добавлять гистограммы, сложные агрегации и даже запускать алгоритмы машинного обучения поверх ваших данных. Экосистема ElasticSearch огромна, и ее стоит изучить.

OLAP базы данных


Тут мы рассмотрим базы данных, которые также могут предоставлять хранилище метаданных для схем запросов. Если сравнивать эти инструменты с системами выполнения запросов, эти инструменты также предоставляют хранилище данных и могут применяться для конкретных схем хранения данных (звездообразная схема). Эти инструменты используют синтаксис SQL. Spark, либо другие платформы могут с ними взаимодействовать.

Apache Hive: мы уже обсуждали Hive, как центральный репозиторий схем для Spark и других инструментов, чтобы они могли использовать SQL, но Hive также может хранить данные, так что вы можете использовать его в качестве хранилища. Он может получить доступ к HDFS или HBase. При запросе Hive он использует Apache Tez, Apache Spark или MapReduce, будучи намного быстрее Tez или Spark. Он также имеет процедурный язык под названием HPL-SQL. Hive это чрезвычайно популярное хранилище мета-данных для Spark SQL.

Apache Impala: это собственная аналитическая база данных для Hadoop, которую вы можете использовать для хранения данных и эффективных запросов к ним. Она может подключаться к Hive для получения метаданных с помощью Hcatalog. Impala обеспечивает низкую задержку и высокий уровень параллелизма для запросов бизнес-аналитики и аналитики в Hadoop (что не предоставляется пакетными платформами, такими как Apache Hive). Impala также масштабируется линейно, даже в многопользовательских средах, что является лучшей альтернативой для запросов, чем Hive. Impala интегрирована с собственной системой безопасности Hadoop и Kerberos для аутентификации, поэтому вы можете безопасно управлять доступом к данным. Она использует HBase и HDFS для хранения данных.



Apache Tajo: это еще одно хранилище данных для Hadoop. Tajo разработан для выполнения специальных запросов с малыми задержкой и масштабируемостью, онлайн-агрегирования и ETL для больших наборов данных, хранящихся в HDFS и других источниках данных. Он поддерживает интеграцию с Hive Metastore для доступа к общим схемам. Он также имеет множество оптимизаций запросов, он масштабируемый, отказоустойчивый и предоставляет интерфейс JDBC.

Apache Kylin: это новое распределенное хранилище аналитических данных. Kylin чрезвычайно быстр, поэтому его можно использовать для дополнения некоторых других баз данных, таких как Hive, для случаев использования, где важна производительность, таких как панели мониторинга или интерактивные отчеты. Это, вероятно, лучшее хранилище данных OLAP, но его сложно использовать. Другая проблема заключается в том, что из-за большой растягиваемости требуется больше места для хранения. Идея состоит в том, что если механизмы запросов или Hive недостаточно быстры, вы можете создать Куб в Kylin, который представляет собой многомерную таблицу, оптимизированную для OLAP, с предварительно вычисленными
значениями, которые вы можете запрашивать из панелей мониторинга или интерактивных отчетов. Он может создавать кубики прямо из Spark и даже почти в реальном времени из Kafka.



Инструменты OLAP


В эту категорию я включаю более новые механизмы, которые представляют собой эволюцию предыдущих баз данных OLAP, которые предоставляют больше функций, создавая комплексную аналитическую платформу. Фактически, они представляют собой гибрид двух предыдущих категорий, добавляющих индексацию в ваши базы данных OLAP. Они живут за пределами платформы Hadoop, но тесно интегрированы. В этом случае вы обычно пропускаете этап обработки и непосредственно используете эти инструменты.

Они пытаются решить проблему запросов к данным в реальном времени и к историческим данным единообразно, поэтому вы можете немедленно запрашивать данные в реальном времени, как только они становятся доступными, вместе с историческими данными с низкой задержкой, чтобы вы могли создавать интерактивные приложения и информационные панели. Эти инструменты позволяют во многих случаях запрашивать необработанные данные практически без преобразования в стиле ELT, но с высокой производительностью, лучше, чем у обычных баз данных OLAP.

Их объединяет то, что они обеспечивают унифицированное представление данных, прием данных в реальном времени и пакетных данных, распределенную индексацию, собственный формат данных, поддержку SQL, интерфейс JDBC, поддержку горячих и холодных данных, множественные интеграции и хранилище метаданных.

Apache Druid: это самый известный движок OLAP в реальном времени. Он ориентирован на данные временных рядов, но может использоваться для любых данных. Он использует свой собственный столбчатый формат, который может сильно сжимать данные, и он имеет множество встроенных оптимизаций, таких как инвертированные индексы, кодирование текста, автоматическое сворачивание данных и многое другое. Данные загружаются в реальном времени с использованием Tranquility или Kafka, которые имеют очень низкую задержку, хранятся в памяти в формате строки, оптимизированном для записи, но как только они поступают, они становятся доступными для запроса, как и предыдущие загруженные данные. Фоновая процесс отвечает за асинхронное перемещение данных в систему глубокого хранения, такую как HDFS. Когда данные перемещаются в глубокое хранилище, они разделяются на более мелкие фрагменты, сегрегированные по времени, называемые сегментами, которые хорошо оптимизированы для запросов с низкой задержкой. У такого сегмента есть отметка времени для нескольких измерений, которые вы можете использовать для фильтрации и агрегирования, и метрики, которые представляют собой предварительно вычисленные состояния. При пакетном приеме данные сохраняются непосредственно в сегменты. Apache Druid поддерживает проглатывание по принципу push и pull, имеет интеграцию с Hive, Spark и даже NiFi. Он может использовать хранилище метаданных Hive и поддерживает запросы Hive SQL, которые затем преобразуются в запросы JSON, используемые Druid. Интеграция Hive поддерживает JDBC, поэтому вы можете подключить любой инструмент бизнес-аналитики. Он также имеет собственное хранилище метаданных, обычно для этого используется MySQL. Он может принимать огромные объемы данных и очень хорошо масштабируется. Основная проблема в том, что в нем много компонентов, и им сложно управлять и развертывать.



Apache Pinot: это более новая альтернатива Druid с открытым исходным кодом от LinkedIn. По сравнению с Druid, он предлагает меньшую задержку благодаря индексу Startree, который выполняет частичное предварительное вычисление, поэтому его можно использовать для приложений, ориентированных на пользователя (он использовался для получения лент LinkedIn). Он использует отсортированный индекс вместо инвертированного, который работает быстрее. Он имеет расширяемую архитектуру плагинов, а также имеет множество интеграций, но не поддерживает Hive. Он также объединяет пакетную обработку и режим реального времени, обеспечивает быструю загрузку, интеллектуальный индекс и сохраняет данные в сегментах. Его легче и быстрее развернуть по сравнению с Druid, но на данный момент он выглядит немного недозрелым.

ClickHouse: будучи написанным на C++, этот движок обеспечивает невероятную производительность для запросов OLAP, особенно для агрегатов. Это похоже на реляционную базу данных, поэтому вы можете легко моделировать данные. Он очень прост в настройке и имеет множество интеграций.

Прочитайте эту статью, которая сравнивает 3 движка детально.

Начните с малого, изучив свои данные, прежде чем принимать решение. Эти новые механизмы очень мощные, но их сложно использовать. Если вы можете ждать по несколько часов, используйте пакетную обработку и базу данных, такую как Hive или Tajo; затем используйте Kylin, чтобы ускорить запросы OLAP и сделать их более интерактивными. Если этого недостаточно и вам нужно еще меньше задержки и данные в реальном времени, подумайте о механизмах OLAP. Druid больше подходит для анализа в реальном времени. Кайлин больше ориентирован на дела OLAP. Druid имеет хорошую интеграцию с Kafka в качестве потоковой передачи в реальном времени. Kylin получает данные из Hive или Kafka партиями, хотя планируется прием в реальном времени.

Наконец, Greenplum еще один механизм OLAP, больше ориентированный на искусственный интеллект.

Визуализация данных


Для визуализации есть несколько коммерческих инструментов, таких как Qlik, Looker или Tableau.

Если вы предпочитаете Open Source, посмотрите в сторону SuperSet. Это замечательный инструмент, который поддерживает все упомянутые нами инструменты, имеет отличный редактор и действительно быстрый, он использует SQLAlchemy, что обеспечивает поддержку многих баз данных.

Другие интересные инструменты: Metabase или Falcon.

Заключение


Есть широкий спектр инструментов, которые можно использовать для обработки данных, от гибких механизмов запросов, таких как Presto, до высокопроизводительных хранилищ, таких как Kylin. Единого решения не существует, я советую изучить имеющиеся данные и начать с малого. Механизмы запросов хорошая отправная точка из-за их гибкости. Затем для различных случаев использования вам может потребоваться добавить дополнительные инструменты чтобы достигнуть необходимого вам уровня обслуживания.

Обратите особое внимание на новые инструменты, такие как Druid или Pinot, которые обеспечивают простой способ анализа огромных объемов данных с очень низкой задержкой, сокращая разрыв между OLTP и OLAP с точки зрения производительности. У вас может возникнуть соблазн подумать об обработке, предварительном вычислении агрегатов и т.п., но подумайте об этих инструментах, если вы хотите упростить свою работу.
Подробнее..

Применение low-code в аналитических платформах

24.09.2020 18:11:15 | Автор: admin
Уважаемые читатели, доброго дня!

Задача построения программных платформ для накопления и проведения аналитики над данными рано или поздно возникает у любой компании, в основе бизнеса которой заложена интеллектуально нагруженная модель оказания услуг или создания технически сложно изготавливаемых продуктов. Построение аналитических платформ сложная и трудозатратная задача. Однако любую задачу можно упростить. В этой статье я хочу поделиться опытом применения low-code-инструментов, помогающих в создании аналитических решений. Данный опыт был приобретён при реализации ряда проектов направления Big Data Solutions компании Неофлекс. Направление Big Data Solutions компании Неофлекс с 2005 года занимается вопросами построения хранилищ и озёр данных, решает задачи оптимизации скорости обработки информации и работает над методологией управления качеством данных.



Избежать осознанного накопления слабо и/или сильно структурированных данных не удастся никому. Пожалуй, даже если речь будет идти о малом бизнесе. Ведь при масштабировании бизнеса перспективный предприниматель столкнётся с вопросами разработки программы лояльности, захочет провести анализ эффективности точек продаж, подумает о таргетированной рекламе, озадачится спросом на сопроводительную продукцию. В первом приближении задача может быть решена на коленке. Но при росте бизнеса приход к аналитической платформе все же неизбежен.

Однако в каком случае задачи аналитики данных могут перерасти в задачи класса Rocket Science? Пожалуй, в тот момент, когда речь идёт о действительно больших данных.
Чтобы упростить задачу Rocket Science, можно есть слона по частям.



Чем большая дискретность и автономность будет у ваших приложений/сервисов/микросервисов, тем проще вам, вашим коллегам и всему бизнесу будет переваривать слона.

К этому постулату пришли практически все наши клиенты, перестроив ландшафт, основываясь на инженерных практиках DevOps-команд.

Но даже при раздельной, слоновьей диете мы имеем неплохие шансы на перенасыщение IT-ландшафта. В этот момент стоит остановиться, выдохнуть и посмотреть в сторону low-code engineering platform.

Многих разработчиков пугает перспектива появления тупика в карьере при уходе от непосредственного написания кода в сторону перетаскивания стрелочек в UI-интерфейсах low-code систем. Но появление станков не привело к исчезновению инженеров, а вывело их работу на новый уровень!

Давайте разбираться почему.

Анализ данных таких сфер, как: логистика, телеком-индустрия, медиаисследования, финансовый сектора бизнеса, всегда сопряжён со следующими вопросами:

  • Скорость проведения автоматизированного анализа;
  • Возможность проведения экспериментов без воздействия на основной поток производства данных;
  • Достоверность подготовленных данных;
  • Отслеживание изменений и версионирование;
  • Data proveance, Data lineage, CDC;
  • Быстрота доставки новых фич на продукционное окружение;
  • И пресловутое: стоимость разработки и поддержки.

То есть у инженеров имеется огромное количество высокоуровневых задач, выполнить которые с достаточной эффективностью можно, лишь очистив сознание от задач низкоуровневой разработки.

Предпосылками перехода разработчиков на новый уровень стали эволюция и цифровизация бизнеса. Ценность разработчика также изменяется: в значительном дефиците находятся разработчики, способные погрузиться в суть концепций автоматизируемого бизнеса.

Давайте проведём аналогию с низкоуровневыми и высокоуровневыми языками программирования. Переход от низкоуровневых языков в сторону высокоуровневых это переход от написания прямых директив на языке железа в сторону директив на языке людей. То есть добавление некоторого слоя абстракции. В таком случае переход на low-code-платформы с высокоуровневых языков программирования это переход от директив на языке людей в сторону директив на языке бизнеса. Если найдутся разработчики, которых этот факт опечалит, тогда опечалены они, возможно, ещё с того момента, как на свет появился Java Script, в котором используются функции сортировки массива. И эти функции, разумеется, имеют под капотом программную имплементацию другими средствами того же самого высокоуровнего программирования.

Следовательно, low-code это всего лишь появление ещё одного уровня абстракции.

Прикладной опыт использования low-code


Тема low-code достаточно широка, но сейчас я хотел бы рассказать о прикладном применении малокодовых концепций на примере одного из наших проектов.

Подразделение Big Data Solutions компании Неофлекс в большей степени специализируется на финансовом секторе бизнеса, cтроя хранилища и озёра данных и автоматизируя различную отчётность. В данной нише применение low-code давно стало стандартом. Среди прочих low-code-инструментов можно упомянуть средства для организации ETL-процессов: Informatica Power Center, IBM Datastage, Pentaho Data Integration. Или же Oracle Apex, выступающий средой быстрой разработки интерфейсов доступа и редактирования данных. Однако применение малокодовых средств разработки не всегда сопряжено с построением узконаправленных приложений на коммерческом стеке технологий с явно выраженной зависимостью от вендора.

С помощью low-code-платформ можно также организовывать оркестрацию потоков данных, создать data-science-площадки или, например, модули проверки качества данных.

Одним из прикладных примеров опыта использования малокодовых средств разработки является коллаборация Неофлекс c компанией Mediascope, одним из лидеров российского рынка исследований медиа. Одна из задач бизнеса данной компании производство данных, на основе которых рекламодатели, интернет-площадки, телеканалы, радиостанции, рекламные агентства и бренды принимают решение о покупке рекламы и планируют свои маркетинговые коммуникации.



Медиаисследования технологически нагруженная сфера бизнеса. Распознавание видеоряда, сбор данных с устройств, анализирующих просмотр, измерение активности на веб-ресурсах всё это подразумевает наличие у компании большого IT-штата и колоссального опыта в построении аналитических решений. Но экспоненциальный рост количества информации, числа и разнообразия ее источников заставляет постоянно прогрессировать IT-индустрию данных. Самым простым решением масштабирования уже функционирующей аналитической платформы Mediascope могло стать увеличение штата IT. Но гораздо более эффективное решение это ускорение процесса разработки. Одним из шагов, ведущих в эту сторону, может являться применение low-code-платформ.

На момент старта проекта у компании уже имелось функционирующее продуктовое решение. Однако реализация решения на MSSQL не могла в полной мере соответствовать ожиданиям по масштабированию функционала с сохранением приемлемой стоимости доработки.

Стоявшая перед нами задача была поистине амбициозной Неофлекс и Mediascope предстояло создать промышленное решение менее чем за год, при условии выхода MVP уже в течение первого квартала от даты начала работ.

В качестве фундамента для построения новой платформы данных, основанной на low-code-вычислениях, был выбран стек технологий Hadoop. Стандартом хранения данных стал HDFS с использованием файлов формата parquet. Для доступа к данным, находящимся в платформе, использован Hive, в котором все доступные витрины представлены в виде внешних таблиц. Загрузка данных в хранилище реализовывалась с помощь Kafka и Apache NiFi.

Lowe-code-инструмент в данной концепции был применён для оптимизации самой трудозатратной задачи в построении аналитической платформы задачи расчёта данных.



Основным механизмом для маппирования данных был выбран low-code-инструмент Datagram. Neoflex Datagram это средство для разработки трансформаций и потоков данных.
Применяя данный инструмент, можно обойтись без написания кода на Scala вручную. Scala-код генерируется автоматически с использованием подхода Model Driven Architecture.

Очевидный плюс такого подхода ускорение процесса разработки. Однако помимо скорости есть ещё и следующие достоинства:

  • Просмотр содержимого и структуры источников/приемников;
  • Отслеживание происхождения объектов потока данных до отдельных полей (lineage);
  • Частичное выполнение преобразований с просмотром промежуточных результатов;
  • Просмотр исходного кода и его корректировка перед выполнением;
  • Автоматическая валидация трансформаций;
  • Автоматическая загрузка данных 1 в 1.

Порог вхождения в low-code-решения для генерации трансформаций достаточно невысок: разработчику необходимо знать SQL и иметь опыт работы с ETL-инструментами. При этом стоит оговориться, что code-driven-генераторы трансформаций это не ETL-инструменты в широком понимании этого слова. Low-code-инструменты могут не иметь собственного окружения для выполнения кода. То есть сгенерированный код будет выполняться на том окружении, которое имелось на кластере ещё до инсталляции low-code-решения. И это, пожалуй, ещё один плюс в карму low-code. Так как в параллель с low-code-командой может работать классическая команда, реализующая функционал, например, на чистом Scala-коде. Втягивание доработок обеих команд в продуктив будет простым и бесшовным.

Пожалуй, стоит ещё отметить, что помимо low-code есть ещё и no-code решения. И по своей сути это разные вещи. Low-code в большей степени позволяет разработчику вмешиваться в генерируемый код. В случае с Datagram возможен просмотр и редактирование генерируемого кода Scala, no-code такой возможности может не предоставлять. Эта разница весьма существенна не только в плане гибкости решения, но и в плане комфорта и мотивации в работе дата-инженеров.

Архитектура решения


Давайте попробуем разобраться, как именно low-code-инструмент помогает решить задачу оптимизации скорости разработки функционала расчёта данных. Для начала разберём функциональную архитектуру системы. В данном случае примером выступает модель производства данных для медиаисследований.



Источники данных в нашем случае весьма разнородны и многообразны:

  • Пиплметры (ТВ-метры) программно-аппаратные устройства, считывающие пользовательское поведение у респондентов телевизионной панели кто, когда и какой телеканал смотрел в домохозяйстве, которое участвует в исследовании. Поставляемая информация это поток интервалов смотрения эфира с привязкой к медиапакету и медиапродукту. Данные на этапе загрузки в Data Lake могут быть обогащены демографическими атрибутами, привязкой к геострате, таймзоне и другими сведениями, необходимыми для проведения анализа телепросмотра того или иного медиа продукта. Произведённые измерения могут быть использованы для анализа или планирования рекламных компаний, оценки активности и предпочтений аудитории, составления эфирной сетки;
  • Данные могут поступать из систем мониторинга потокового телевещания и замера просмотра контента видеоресурсов в интернете;
  • Измерительные инструменты в web-среде, среди которых как site-centric, так и user-centric счётчики. Поставщиком данных для Data Lake может служить надстройка браузера research bar и мобильное приложение со встроенным VPN.
  • Данные также могут поступать с площадок, консолидирующих результаты заполнения онлайн-анкет и итоги проведения телефонных интервью в опросных исследованиях компании;
  • Дополнительное обогащение озера данных может происходить за счёт загрузки сведений из логов компаний-партнёров.

Имплементация as is загрузки из систем-источников в первичный staging сырых данных может быть организована различными способами. В случае использования для этих целей low-code возможна автоматическая генерация сценариев загрузки на основе метаданных. При этом нет необходимости спускаться на уровень разработки source to target мэппингов. Для реализации автоматической загрузки нам необходимо установить соединение с источником, после чего определить в интерфейсе загрузки перечень сущностей, подлежащих загрузке. Создание структуры каталогов в HDFS произойдёт автоматически и будет соответствовать структуре хранения данных в системе-источнике.

Однако в контексте данного проекта эту возможность low-code-платформы мы решили не использовать в силу того, что компания Mediascope уже самостоятельно начала работу по изготовлению аналогичного сервиса на связке Nifi + Kafka.

Стоит сразу обозначить, что данные инструменты являются не взаимозаменяющими, а скорее дополняющими друг друга. Nifi и Kafka способны работать как в прямой (Nifi -> Kafka), так и в обратной (Kafka -> Nifi) связке. Для платформы медиаисследований использовался первый вариант связки.



В нашем случае найфаю требовалось обрабатывать различные типы данных из систем-источников и пересылать их брокеру Kafka. При этом направление сообщений в определённый топик Kafka производилось посредством применения Nifi-процессоров PublishKafka. Оркестрация и обслуживание этих pipeline`ов производится в визуальном интерфейсе. Инструмент Nifi и использование связки Nifi + Kafka также можно назвать low-code-подходом к разработке, обладающим низким порогом вхождения в технологии Big Data и ускоряющим процесс разработки приложений.

Следующим этапом в реализации проекта являлось приведение к формату единого семантического слоя детальных данных. В случае наличия у сущности исторических атрибутов расчёт производится в контексте рассматриваемой партиции. Если же сущность не является исторической, то опционально возможен либо пересчёт всего содержимого объекта, либо вовсе отказ от пересчёта этого объекта (вследствие отсутствия изменений). На данном этапе происходит генерация ключей для всех сущностей. Ключи сохраняются в соответствующие мастер-объектам справочники Hbase, содержащие соответствие между ключами в аналитической платформе и ключами из систем-источников. Консолидация атомарных сущностей сопровождается обогащением результатами предварительного расчёта аналитических данных. Framework`ом для расчёта данных являлся Spark. Описанный функционал приведения данных к единой семантике был реализован также на основе маппингов low-code-инструмента Datagram.

В целевой архитектуре требовалось обеспечить наличие SQL-доступа к данным для бизнес-пользователей. Для данной опции был использован Hive. Регистрация объектов в Hive производится автоматически при включении опции Registr Hive Table в low-code-инструменте.



Управление потоком расчёта


Datagram имеет интерфейс для построения дизайна потоков workflow. Запуск маппингов может осуществляться с использованием планировщика Oozie. В интерфейсе разработчика потоков возможно создание схем параллельного, последовательного или зависящего от заданных условий исполнения преобразований данных. Имеется поддержка shell scripts и java-программ. Также возможно использование сервера Apache Livy. Apache Livy используется для запуска приложений непосредственно из среды разработки.

В случае, если у компании уже есть собственный оркестратор процессов, возможно использование REST API для встраивания маппингов в уже имеющийся поток. Например, у нас имелся достаточно успешный опыт встраивания маппингов на Scala в оркестраторы, написанные на PLSQL и Kotlin. REST API малокодового инструмента подразумевает наличие таких операций, как генерация исполняемого года на основе дизайна маппинга, вызов маппинга, вызов последовательности маппингов и, разумеется, передача в URL параметров для запуска маппингов.

Наравне с Oozie возможно организовать поток расчёта средствами Airflow. Пожалуй, не буду долго останавливаться на сравнении Oozie и Airflow, а просто скажу, что в контексте работ по проекту медиаисследований выбор пал в сторону Airflow. Главными аргументами на этот раз оказались более активное сообщество, развивающее продукт, и более развитый интерфейс + API.



Airflow также хорош потому, что для описания процессов расчёта в нём используется многими любимый Python. Да и вообще, платформ управления рабочими процессами с открытым исходным кодом не так уж и много. Запуск и мониторинг выполнения процессов (в том числе с диаграммой Ганта) лишь добавляют очков в карму Airflow.

Форматом конфигурационного файла для запуска маппингов low-code-решения стал spark-submit. Произошло это по двум причинам. Во-первых, spark-submit позволяет напрямую запустить jar-файл из консоли. Во-вторых, он может содержать всю необходимую информацию для конфигурирования рабочего потока (что облегчает написание скриптов, формирующих Dag).
Наиболее часто встречающимся элементом рабочего потока Airflow в нашем случае стал SparkSubmitOperator.

SparkSubmitOperator позволяет запускать jar`ники упакованные маппинги Datagram с предварительно сформированными для них входными параметрами.

Следует упомянуть, что каждая задача Airflow выполняется в отдельном потоке и ничего не знает о других задачах. В связи с чем взаимодействие между задачами осуществляется с помощью управляющих операторов, таких как DummyOperator или BranchPythonOperator.

В совокупности использования low-code-решения Datagram в связке с универсализацией конфигурационных файлов (формирующих Dag) привело к существенному ускорению и упрощению процесса разработки потоков загрузки данных.

Расчёт витрин


Пожалуй, самый интеллектуально нагруженный этап в производстве аналитических данных это шаг построения витрин. В контексте одного из потоков расчёта данных исследовательской компании на данном этапе происходит приведение к эталонной трансляции с учётом поправки на часовые пояса с привязкой к сетке вещания. Также возможна поправка на локальную эфирную сетку (местные новости и реклама). Среди прочего на данном шаге выполняется разбивка интервалов непрерывного смотрения медиапродуктов на основе анализа интервалов смотрения. Тут же происходит взвешивание значений просмотра на основе сведений об их значимости (вычисление поправочного коэффициента).



Отдельным шагом подготовки витрин является валидация данных. Алгоритм валидации сопряжён с применением ряда математических science-моделей. Однако использование low-code-платформы позволяет разбить сложный алгоритм на ряд отдельных визуально считываемых маппингов. Каждый из маппингов выполняет узкую задачу. Вследствие чего возможен промежуточный дебаг, логирование и визуализация этапов подготовки данных.

Алгоритм валидации было решено дискретизировать на следующие подэтапы:

  • Построение регрессий зависимостей смотрения телесети в регионе со смотрением всех сетей в регионе за 60 дней.
  • Расчёт стьюдентизированных остатков (отклонения фактических значений от предсказанных регрессионной моделью) для всех точек регрессии и для расчетного дня.
  • Выборка аномальных пар регион-телесеть, где стьюдентизированный остаток расчетного дня превышает норму (заданную настройкой операции).
  • Пересчёт поправленного стьюдентизированного остатка по аномальным парам регион-телесеть для каждого респондента, смотревшего сеть в регионе с определением вклада данного респондента (величина изменения стьюдентизированного остатка) при исключении смотрения данного респондента из выборки.
  • Поиск кандидатов, исключение которых приводит стьюдентизированный остаток расчетного дня в норму;

Приведённый выше пример является подтверждением гипотезы о том, что у дата-инженера и так слишком много чего должно быть в голове И, если это действительно инженер, а не кодер, то страх профессиональной деградации при использовании low-code-инструментов у него должен окончательно отступить.

Что ещё может low-code?


Область применения low-code инструмента для пакетной и потоковой обработки данных без необходимости написания кода на Scala вручную не заканчивается.

Применение low-code в разработке datalake`ов для нас стало уже некоторым стандартом. Наверное, можно сказать, что решения на стеке Hadoop повторяют путь развития классических DWH, основанных на РСУБД. Малокодовые инструменты на стеке Hadoop могут решать, как задачи обработки данных, так и задачи построения конечных BI-интерфейсов. Причём нужно заметить, что под BI может пониматься не только репрезентация данных, но и их редактирование силами бизнес-пользователей. Данный функционал нами часто применяется при построении аналитических платформ для финансового сектора.



Среди прочего, с помощью low-code и, в частности, Datagram возможно решить задачу отслеживания происхождения объектов потока данных с атомарностью до отдельных полей (lineage). Для этого в low-code-инструменте имплементировано сопряжение с Apache Atlas и Cloudera Navigator. По сути, разработчику необходимо зарегистрировать набор объектов в словарях Atlas и ссылаться на зарегистрированные объекты при построении маппингов. Механизм отслеживания происхождения данных или анализ зависимостей объектов экономит большое количество времени при необходимости внесения доработок в алгоритмы расчёта. Например, при построении финансовой отчётности эта фишка позволяет комфортнее пережить период изменений законодательства. Ведь, чем качественнее мы осознаём межформенную зависимость в разрезе объектов детального слоя, тем меньше мы столкнёмся с внезапными дефектами и сократим количество реворков.



Data Quality & Low-code


Ещё одной задачей, реализованной low-code-инструментом на проекте компании Mediascope, стала задача класса Data Quality. Особенностью реализации конвейера проверки данных для проекта исследовательской компании было отсутствие влияния на работоспособность и скорость работы основного потока расчёта данных. Для возможности оркестрирования независимыми потоками проверки данных применялся уже знакомый Apache Airflow. По мере готовности каждого шага производства данных параллельно происходил запуск обособленной части DQ-конвейера.

Хорошей практикой считается наблюдение за качеством данных с момента их зарождения в аналитической платформе. Имея информацию о метаданных, мы можем уже с момента попадания информации в первичный слой проверять соблюдение базовых условий not null, constraints, foreign keys. Этот функционал реализован на основе автоматически генерируемых мэппингов семейства data quality в Datagram. Кодогенерация в данном случае также основывается на метаданных модели. На проекте компании Mediascope сопряжение происходило с метаданными продукта Enterprise Architect.

Благодаря сопряжению low-code-инструмента и Enterprise Architect автоматически были сгенерированы следующие проверки:

  • Проверка присутствия значений null в полях с модификатором not null;
  • Проверка присутствия дублей первичного ключа;
  • Проверка внешнего ключа сущности;
  • Проверка уникальности строки по набору полей.

Для более сложных проверок доступности и достоверности данных был создан мэппинг с Scala Expression, принимающий на вход внешний Spark SQL-код проверки, подготовленной силами аналитиков в Zeppelin.



Разумеется, к автогенерации проверок необходимо приходить постепенно. В рамках описываемого проекта этому предшествовали следующие шаги:
  • DQ, реализованные в блокнотах Zeppelin;
  • DQ, встроенные в мэппинг;
  • DQ в виде отдельных массивных мэппингов, содержащих целый набор проверок под отдельную сущность;
  • Универсальные параметризованные DQ-мэппинги, принимающие на вход информацию о метаданных и бизнес-проверках.

Пожалуй, основным преимуществом создания сервиса параметризированных проверок является сокращение времени доставки функционала на продукционное окружение. Новые проверки качества могут миновать классический паттерн доставки кода опосредованно через среды разработки и тестирования:

  • Все проверки метаданных генерируются автоматически при изменении модели в EA;
  • Проверки доступности данных (определение наличия каких-либо данных на момент времени) могут быть сгенерированы на основе справочника, хранящего ожидаемый тайминг появления очередной порции данных в разрезе объектов;
  • Бизнес-проверки достоверности данных создаются силами аналитиков в notebook`ах Zeppelin. Откуда направляются прямиком в настроечные таблицы модуля DQ на продукционном окружении.

Риски прямой отгрузки скриптов на прод отсутствуют как таковые. Даже при синтаксической ошибке максимум, что нам грозит, невыполнение одной проверки, ведь поток расчёта данных и поток запуска проверок качества разведены между собой.

По сути, сервис DQ перманентно запущен на продукционном окружении и готов начать свою работу в момент появления очередной порции данных.

Вместо заключения


Преимущество применения low-code очевидно. Разработчикам не нужно разрабатывать приложение с нуля. А освобождённый от дополнительных задач программист даёт результат быстрее. Скорость, в свою очередь, высвобождает дополнительный ресурс времени на разрешение вопросов оптимизации. Следовательно, в данном случае можно рассчитывать на наличие более качественного и быстрого решения.

Разумеется, low-code не панацея, и волшебство само по себе не случится:

  • Малокодовая индустрия проходит стадию крепчания, и пока в ней нет однородных индустриальных стандартов;
  • Многие low-code-решения не бесплатны, и их приобретение должно быть осознанным шагом, сделать который следует при полной уверенности финансовой выгоды от их использования;
  • Многие малокодовые решения не всегда хорошо дружат с GIT / SVN. Либо неудобны в использовании в случае сокрытия генерируемого кода;
  • При расширении архитектуры может потребоваться доработка малокодового решения что, в свою очередь, провоцирует эффект привязанности и зависимости от поставщика low-code-решения.
  • Должный уровень обеспечения безопасности возможен, но весьма трудозатратен и сложен в реализации движков low-code-систем. Малокодовые платформы должны выбираться не только по принципу поиска выгоды от их использования. При выборе стоит задаться вопросами наличия функционала управлением доступа и делегированием/эскалацией идентификационных данных на уровень всего IT-ландшафта организации.



Однако если все недостатки выбранной системы вам известны, и бенефиты от её использования, тем не менее, находятся в доминирующем большинстве, то переходите к малому коду без боязни. Тем более, что переход на него неизбежен как неизбежна любая эволюция.

Если один разработчик на low-code-платформе будет выполнять свою работу быстрее, чем два разработчика без low-code, то это даёт компании фору во всех отношениях. Порог вхождения в low-code-решения более низкий, чем в традиционные технологии, и это положительным образом сказывается на вопросе кадрового дефицита. При использовании малокодовых инструментов возможно ускорение взаимодействия между функциональными командами и более быстрое принятие решений о корректности выбранного пути data-science-исследований. Низкоуровневые платформы могут выступить причиной цифровой трансформации организации, поскольку производимые решения могут быть доступны к пониманию нетехническим специалистам (в частности, бизнес-пользователям).

Если у вас сжатые сроки, нагруженная бизнес-логика, дефицит технологической экспертизы, и вам требуется ускорить time to market, то low-code это один из способов удовлетворения ваших потребностей.

Не стоит отрицать значимость традиционных инструментов разработки, однако во многих случаях применение малокодовых решений лучший способ повысить эффективность решаемых задач.
Подробнее..

Spark schemaEvolution на практике

19.10.2020 16:12:07 | Автор: admin
Уважаемые читатели, доброго дня!

В данной статье ведущий консультант бизнес-направления Big Data Solutions компании Неофлекс, подробно описывает варианты построения витрин переменной структуры с использованием Apache Spark.

В рамках проекта по анализу данных, часто возникает задача построения витрин на основе слабо структурированных данных.

Обычно это логи, или ответы различных систем, сохраняемые в виде JSON или XML. Данные выгружаются в Hadoop, далее из них нужно построить витрину. Организовать доступ к созданной витрине можем, например, через Impala.

В этом случае схема целевой витрины предварительно неизвестна. Более того, схема еще и не может быть составлена заранее, так как зависит от данных, а мы имеем дело с этими самыми слабо структурированными данными.

Например, сегодня логируется такой ответ:

{source: "app1", error_code: ""}

а завтра от этой же системы приходит такой ответ:

{source: "app1", error_code: "error", description: "Network error"}

В результате в витрину должно добавиться еще одно поле description, и придет оно или нет, никто не знает.

Задача создания витрины на таких данных довольно стандартная, и у Spark для этого есть ряд инструментов. Для парсинга исходных данных есть поддержка и JSON, и XML, а для неизвестной заранее схемы предусмотрена поддержка schemaEvolution.

С первого взгляда решение выглядит просто. Надо взять папку с JSON и прочитать в dataframe. Spark создаст схему, вложенные данные превратит в структуры. Далее все нужно сохранить в parquet, который поддерживается в том числе и в Impala, зарегистрировав витрину в Hive metastore.

Вроде бы все просто.

Однако, из коротких примеров в документации не ясно, что делать с рядом проблем на практике.

В документации описывается подход не для создания витрины, а для чтения JSON или XML в dataframe.

А именно, просто приводится как прочитать и распарсить JSON:

df = spark.read.json(path...)

Этого достаточно, чтобы сделать данные доступными для Spark.

На практике сценарий гораздо сложнее, чем просто прочитать JSON файлы из папки, и создать dataframe. Ситуация выглядит так: уже есть определенная витрина, каждый день приходят новые данные, их нужно добавить в витрину, не забывая, что схема может отличаться.

Обычная схема построения витрины такая:

Шаг 1. Данные загружаются в Hadoop с последующей ежедневной дозагрузкой и складываются в новую партицию. Получается партиционированная по дням папка с исходными данными.

Шаг 2. В ходе инициализирующей загрузки эта папка читается и парсится средствами Spark. Полученный dataframe сохраняется в формат, доступный для анализа, например, в parquet, который потом можно импортировать в Impala. Так создается целевая витрина со всеми данными, которые накопились к этому моменту.

Шаг 3. Создается загрузка, которая будет каждый день обновлять витрину.
Возникает вопрос инкрементальной загрузки, необходимость партиционирования витрины, и вопрос поддержки общей схемы витрины.

Приведем пример. Допустим, реализован первый шаг построения хранилища, и настроена выгрузка JSON файлов в папку.

Создать из них dataframe, чтобы потом сохранить как витрину, проблем не составляет. Это тот самый первый шаг, который легко можно найти в документации Spark:

df = spark.read.option("mergeSchema", True).json(".../*") df.printSchema()root |-- a: long (nullable = true) |-- b: string (nullable = true) |-- c: struct (nullable = true) |    |-- d: long (nullable = true)

Вроде бы все хорошо.

Мы прочитали и распарсили JSON, далее сохраняем dataframe как parquet, регистрируя в Hive любым удобным способом:

df.write.format(parquet).option('path','<External Table Path>').saveAsTable('<Table Name>')

Получаем витрину.

Но, на другой день добавились новые данные из источника. У нас есть папка с JSON, и витрина, созданная на основе данной папки. После загрузки следующей порции данных из источника в витрине не хватает данных за один день.

Логичным решением будет партиционировать витрину по дням, что позволит каждый следующий день добавлять новую партицию. Механизм этого тоже хорошо известен, Spark позволяет записывать партиции отдельно.

Сначала делаем инициализирующую загрузку, сохраняя данные как было описано выше, добавив только партиционирование. Это действие называется инициализация витрины и делается только один раз:

df.write.<b>partitionBy("date_load")</b><u></u>.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

На следующий день загружаем только новую партицию:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Остается только перерегистрировать в Hive, чтобы обновить схему.
Однако, здесь и возникают проблемы.

Первая проблема. Рано или поздно получившийся parquet нельзя будет прочитать. Это связано с тем, как по-разному подходят parquet и JSON к пустым полям.

Рассмотрим типичную ситуацию. Например, вчера приходит JSON:

День 1: {"a": {"b": 1}},

а сегодня этот же JSON выглядит так:

День 2: {"a": null}

Допустим, у нас две разных партиции, в которых по одной строке.
Когда мы читаем исходные данные целиком, Spark сумеет определить тип, и поймет, что a это поле типа структура, со вложенным полем b типа INT. Но, если каждая партиция была сохранена по отдельности, то получается parquet с несовместимыми схемами партиций:

df1 (a: <struct<"b": INT>>)df2 (a: STRING NULLABLE)

Данная ситуация хорошо известна, поэтому специально добавлена опция при парсинге исходных данных удалять пустые поля:

df = spark.read.json("...", dropFieldIfAllNull=True)

В этом случае parquet будет состоять из партиций, которые можно будет прочитать вместе.
Хотя те, кто делал это на практике, тут горько усмехнутся. Почему? Да потому, что скорее всего возникнут еще две ситуации. Или три. Или четыре. Первая, которая возникнет почти наверняка, числовые типы будут по-разному выглядеть в разных JSON файлах. Например, {intField: 1} и {intField: 1.1}. Если такие поля попадутся в одной партции, то мерж схемы прочитает все правильно, приведя к самому точному типу. А вот если в разных, то в одной будет intField: int, а в другой intField: double.

Для обработки этой ситуации есть следующий флаг:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Теперь у нас есть папка, где находятся партиции, которые можно прочитать в единый dataframe и валидный parquet всей витрины. Да? Нет.

Надо вспомнить, что мы регистрировали таблицу в Hive. Hive не чувствителен к регистру в названиях полей, а parquet чувствителен. Поэтому партиции со схемами: field1: int, и Field1: int для Hive одинаковые, а для Spark нет. Надо не забыть привести названия полей к нижнему регистру.

Вот после этого, кажется, все хорошо.

Однако, не все так просто. Возникает вторая, тоже хорошо известная проблема. Так как каждая новая партиция сохраняется отдельно, то в папке партиции будут лежать служебные файлы Spark, например, флаг успешности операции _SUCCESS. Это приведет к ошибке при попытке parquet. Чтобы этого избежать, надо настроить конфигурацию, запретив Spark дописывать в папку служебные файлы:

hadoopConf = sc._jsc.hadoopConfiguration()hadoopConf.set("parquet.enable.summary-metadata", "false")hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Кажется, теперь каждый день в папку целевой витрины добавляется новая parquet партиция, где лежат распарсенные данные за день. Мы заранее озаботились тем, чтобы не было партиций с конфликтом типов данных.

Но, перед нами третья проблема. Теперь общая схема не известна, более того, в Hive таблица с неправильной схемой, так как каждая новая партиция, скорее всего, внесла искажение в схему.

Нужно перерегистрировать таблицу. Это можно сделать просто: снова прочитать parquet витрины, взять схему и создать на ее основе DDL, с которым заново зарегистрировать папку в Hive как внешнюю таблицу, обновив схему целевой витрины.

Перед нами возникает четвертая проблема. Когда мы регистрировали таблицу первый раз, мы опирались на Spark. Теперь делаем это сами, и нужно помнить, что поля parquet могут начинаться с символов, недопустимых для Hive. Например, Spark выкидывает строки, которые не смог распарсить в поле corrupt_record. Такое поле нельзя будет зарегистрировать в Hive без того, чтобы экранировать.

Зная это, получаем схему:

f_def = ""for f in pf.dtypes:  if f[0] != "date_load":    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"hc.sql("drop table if exists jsonevolvtable")hc.sql(table_define)

Код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace(array<`, array<) делает безопасный DDL, то есть вместо:

create table tname (_field1 string, 1field string)

С такими названиями полей как _field1, 1field, делается безопасный DDL, где названия полей экранированы: create table `tname` (`_field1` string, `1field` string).

Возникает вопрос: как правильно получить dataframe с полной схемой (в коде pf)? Как получить этот pf? Это пятая проблема. Перечитывать схему всех партиций из папки с parquet файлами целевой витрины? Это метод самый безопасный, но тяжелый.

Схема уже есть в Hive. Получить новую схему можно, объединив схему всей таблицы и новой партиции. Значит надо схему таблицы брать из Hive и объединить ее со схемой новой партиции. Это можно сделать, прочитав тестовые метаданные из Hive, сохранив их во временную папку и прочитав с помощью Spark обе партиции сразу.

По сути, есть все, что нужно: исходная схема таблицы в Hive и новая партиция. Данные у нас тоже есть. Остается лишь получить новую схему, в которой объединяется схема витрины и новые поля из созданной партиции:

from pyspark.sql import HiveContextfrom pyspark.sql.functions import lithc = HiveContext(spark)df = spark.read.json("...", dropFieldIfAllNull=True)df.write.mode("overwrite").parquet(".../date_load=12-12-2019")pe = hc.sql("select * from jsonevolvtable limit 1")pe.write.mode("overwrite").parquet(".../fakePartiton/")pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Далее создаем DDL регистрации таблицы, как в предыдущем фрагменте.
Если вся цепочка работает правильно, а именно была инициализирующая загрузка, и в Hive правильно созданная таблица, то мы получаем обновленную схему таблицы.

И последняя, проблема заключается в том, что нельзя так просто добавить партицию в таблицу Hive, так как она будет сломана. Необходимо заставить Hive починить у себя структуру партиций:

from pyspark.sql import HiveContexthc = HiveContext(spark) hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Простая задача чтения JSON и создания на его основе витрины выливается в преодоление ряда неявных трудностей, решения для которых приходится искать по отдельности. И хотя эти решения простые, но на их поиск уходит много времени.

Чтобы реализовать построение витрины пришлось:

Добавлять партиции в витрину, избавившись от служебных файлов
Разобраться с пустыми полями в исходных данных, которые Spark типизировал
Привести простые типы к строке
Привести названия полей к нижнему регистру
Разделить выгрузку данных и регистрацию таблицы в Hive (создание DDL)
Не забыть экранировать названия полей, которые могут быть несовместимы с Hive
Научиться обновлять регистрацию таблицы в Hive

Подводя итог, отметим, что решение по построению витрин таит много подводных камней. Поэтому при возникновении сложностей в реализации лучше обратиться к опытному партнеру с успешной экспертизой.

Благодарим за чтение данной статьи, надеемся, что информация окажется полезной.
Подробнее..

Перевод Как Apache Spark 3.0 увеличивает производительность ваших SQL рабочих нагрузок

01.06.2021 12:20:22 | Автор: admin

Практически в каждом секторе, работающем со сложными данными, Spark "де-факто" быстро стал средой распределенных вычислений для команд на всех этапах жизненного цикла данных и аналитики. Одна из наиболее ожидаемых функций Spark 3.0 - это новая платформа Adaptive Query Execution (AQE), устраняющая проблемы, которые возникают при многих рабочих нагрузках Spark SQL. Они были задокументированы в начале 2018 года командой специалистов Intel и Baidu. Для более глубокого изучения фреймворка можно пройти наш обновленный курс по тюнингу производительности Apache Spark (Apache Spark Performance Tuning).

Наш опыт работы с Workload XM, безусловно, подтверждает реальность и серьезность этих проблем.

AQE был впервые представлен в Spark 2.4, но в Spark 3.0 и 3.1 он стал намного более развитым. Для начала, давайте посмотрим, какие проблемы решает AQE.

Недостаток первоначальной архитектуры Catalyst

На диаграмме ниже представлен вид распределенной обработки, которая происходит, когда вы выполняете простой group-by-count запрос с использованием DataFrames.

Spark определяет подходящее количество партиций для первого этапа, но для второго этапа использует по умолчанию "магическое число" - 200.

И это плохо по трем причинам:

1. 200 вряд ли будет идеальным количеством партиций, а именно их количество является одним из критических факторов, влияющих на производительность;

2. Если вы запишете результат этого второго этапа на диск, у вас может получиться 200 маленьких файлов;

3. Оптимизация и ее отсутствие имеют косвенный эффект: если обработка должна продолжаться после второго этапа, то вы можете упустить потенциальные возможности дополнительной оптимизации.

Что можно сделать? Вручную установить значение этого свойства перед выполнением запроса с помощью такого оператора:

spark.conf.set(spark.sql.shuffle.partitions,2)

Но это также создает некоторые проблемы:

  • Задавать данный параметр перед каждым запросом утомительно.

  • Эти значения станут устаревшими по мере эволюции ваших данных.

  • Этот параметр будет применяться ко всем шаффлингах в вашем запросе.

Перед первым этапом в предыдущем примере известно распределение и объем данных, и Spark может предложить разумное значение для количества партиций. Однако для второго этапа эта информация еще не известна, так как цена, которую нужно заплатить за ее получение, - это выполнение фактической обработки на первом этапе: отсюда и обращение к магическому числу.

Принцип работы Adaptive Query Execution

Основная идея AQE состоит в том, чтобы сделать план выполнения не окончательным и перепроверять статус после каждого этапа. Таким образом, план выполнения разбивается на новые абстракции этапов запроса, разделенных этапами.

Catalyst теперь останавливается на границе каждого этапа, чтобы попытаться применить дополнительную оптимизацию с учетом информации, доступной на основе промежуточных данных.

Поэтому AQE можно определить как слой поверх Spark Catalyst, который будет изменять план Spark "на лету".

Есть недостатки? Некоторые есть, но они второстепенные:

  • Выполнение останавливается на границе каждого этапа, чтобы Spark проверил свой план, но это компенсируется увеличением производительности.

  • Пользовательский интерфейс Spark труднее читать, потому что Spark создает для данного приложения больше заданий, и эти задания не подхватывают группу заданий и описание, которое вы задали.

Адаптивное количество перемешиваемых партиций

Эта функция AQE доступна, начиная с версии Spark 2.4.

Чтобы включить ее, вам нужно установить для spark.sql.adaptive.enabled значение true, значение по умолчанию - false. Когда AQE включено, количество партиций в случайном порядке регулируется автоматически и больше не равно 200 по умолчанию или заданному вручную значению.

Вот как выглядит выполнение первого запроса TPC-DS до и после включения AQE:

Динамическая конвертация Sort Merge Joins в Broadcast Joins

AQE преобразует соединения sort-merge в broadcast хэш-соединения, если статистика времени выполнения любой из сторон соединения меньше порога broadcast хэш-соединения.

Вот как выглядят последние этапы выполнения второго запроса TPC-DS до и после включения AQE:

Динамическое объединение shuffle партиций

Если количество разделов в случайном порядке больше, чем количество групп по ключам, то много циклов ЦП теряется из-за несбалансированного распределения ключей.

Когда оба:

spark.sql.adaptive.enabled и

spark.sql.adaptive.coalescePartitions.enabled

установлены на true, Spark объединит смежные перемешанные разделы в соответствии с целевым размером, указанным в spark.sql.adaptive.advisoryPartitionSizeInBytes. Это делается, чтобы избежать слишком большого количества мелких задач.

Динамическая оптимизация обьединений с перекосом

Skew (перекос) - это камень преткновения распределенной обработки. Это может задержать обработку буквально на несколько часов:

Без оптимизации время, необходимое для выполнения объединения, будет определяться самым большим разделом.

Оптимизация skew join, таким образом, разобъет раздел A0 на подразделы, используя значение, указанное park.sql.adaptive.advisoryPartitionSizeInBytes, и присоединит каждый из них к соответствующему разделу B0 таблицы B.

Следовательно, вам необходимо предоставить AQE свое определение перекоса.

Это включает в себя два параметра:

1. spark.sql.adaptive.skewJoin.skewedPartitionFactor является относительным: партиция считается с пересом, если ее размер больше, чем этот коэффициент, умноженный на средний размер партиции, а также, если он больше, чем

2. spark.sql.adaptive.skewedPartitionThresholdInBytes, который является абсолютным: это порог, ниже которого перекос будет игнорироваться.

Динамическое сокращение разделов

Идея динамического сокращения разделов (dynamic partition pruning, DPP) - один из наиболее эффективных методов оптимизации: считываются только те данные, которые вам нужны. Если в вашем запросе есть DPP, то AQE не запускается. DPP было перенесено в Spark 2.4 для CDP.

Эта оптимизация реализована как на логическом, так и на физическом уровне.

1. На логическом уровне фильтр размера идентифицируется и распространяется через обьединение на другую часть сканирования.

2. Затем на физическом уровне фильтр выполняется один раз в части измерения, и результат транслируется в основную таблицу, где также применяется фильтр.

DPP в действительности может работать с другими типами обьединений (например, SortMergeJoin), если вы отключите spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly.

В этом случае Spark оценит, действительно ли фильтр DPP улучшает производительность запроса.

DPP может привести к значительному увеличению производительности высокоселективных запросов, например, если ваш запрос фильтрует данные за один месяц из выборки за 5 лет.

Не все запросы получают такой впечатляющий прирост производительности, но 72 из 99 запросов TPC-DS положительно влияют на DPP.

Заключение

Spark прошел долгий путь от своей первоначальной базовой парадигмы: неспешного выполнения оптимизированного статического плана для статического набора данных.

Анализ статического набора данных был пересмотрен из-за потоковой передачи: команда Spark сначала создала довольно неуклюжий дизайн на основе RDD, прежде чем придумать лучшее решение с использованием DataFrames.

Часть статического плана подверглась сомнению со стороны SQL, а структура адаптивного выполнения запросов в некотором роде - это то, чем является структурированная потоковая передача для исходной потоковой библиотеки: элегантное решение, которым она должна была быть с самого начала.

Благодаря фреймворку AQE, DPP, усиленной поддержке графических процессоров и Kubernetes перспективы увеличения производительности теперь весьма многообещающие, поэтому мы и наблюдаем повсеместный переход на Spark 3.1

Подробнее..

Перевод Руководство по столбчатым форматам файлов в Spark и Hadoop для начинающих

25.01.2021 18:10:11 | Автор: admin

Эта статья является продолжением руководства по форматам файлов в Spark и Hadoop, которое послужит хорошей отправной точкой, если вы еще ничего не знаете о форматах файлов big data.

Что из себя представляет столбчатый формат файла?

Этот термин часто используется, но я не уверен, что всем до конца ясно, что он означает на практике.

Определение из учебника гласит, что столбчатые (колоночные, многоколоночные, columnar) форматы файлов хранят данные по столбцам, а не по строкам. CSV, TSV, JSON и Avro традиционные строковые форматы файлов. Файл Parquet и ORC это столбчатые форматы файлов.

Давайте проиллюстрируем различия между этими двумя концепциями, используя примеры некоторых данных и простой наглядный столбчатый формат файла, который я только что придумал.

Вот схема данных набора люди. Она довольно проста:

{  id: Integer,  first_name: String,  last_name: String,  age: Integer,  cool: Boolean,  favorite_fruit: Array[String]}

Данные в строковом формате строки

Мы могли бы представить этот набор данных в нескольких форматах, например, вот неструктурированный JSON-файл:

{"id": 1, "first_name": "Matthew", "last_name": "Rathbone", "age": 19, "cool": true, "favorite_fruit": ["bananas", "apples"]}{"id": 2, "first_name": "Joe", "last_name": "Bloggs", "age": 102, "cool": true, "favorite_fruit": null}

Вот те же данные во всеми любимом формате CSV

1, Matthew, Rathbone, 19, True, ['bananas', 'apples']2, Joe, Bloggs, 102, True,

В обоих этих форматах, каждая строчка файла содержит полную строку данных. Именно так люди представляют себе данные: аккуратные строки выстроены в линию, которые легко сканировать, легко редактировать в Excel, легко читать в терминале или текстовом редакторе.

Данные в столбчатом формате

В целях демонстрации я собираюсь ввести новый столбчатый (псевдо) формат файла. Назовем его CCSV (Столбчатый CSV).

Каждая строка нашего CCSV-файла имеет следующее содержимое:

Field Name/Field Type/Number of Characters:[data in csv format]

Вот те же данные представленные в CCSV:

ID/INT/3:1,2FIRST_NAME/STRING/11:Matthew,JoeLAST_NAME/STRING/15:Rathbone,BloggsAGE/INT/6:19,102COOL/BOOL/3:1,1FAVORITE_FRUIT/ARRAY[STRING]/19:[bananas,apples],[]

Обратите внимание, что все имена, возраст и любимые фрукты хранятся рядом друг с другом, а не вместе с другими данными из той же записи. Чтобы каждый раздел столбца не становился слишком большим, CCSV будет повторять этот паттерн каждые 1000 записей. Таким образом, файл из 10 000 записей будет содержать 10 разделов сгруппированных столбцовых данных.

Могут ли люди легко читать CCSV-файлы? В принципе да, но вам будет сложно составить целостное представление о данных, если вы откроете их в Excel. Однако CCSV имеет несколько полезных свойств, которые делают его предпочтительнее для компьютеров, и сейчас я о них расскажу.

Преимущества столбчатых форматов

Оптимизация чтения

Представим, что я хочу выполнить SQL-запрос к этим данным, например:

SELECT COUNT(1) from people where last_name = "Rathbone"

С обычным CSV-файлом SQL-движку пришлось бы сканировать каждую строку, анализировать каждый столбец, извлекать значение last_name, а затем пересчитать все значения Rathbone, которые он увидит.

В CCSV SQL-движок может смело пропустить первые два поля и просто просканировать третью строку, которая содержит все доступные значения фамилий.

Почему это хорошо? Теперь SQL-движок обрабатывает только около 1/6 данных, т.е. CCSV только что обеспечил (теоретическое и совершенно необоснованное) увеличение производительности на 600% по сравнению с обычными CSV-файлами.

Представьте себе такой же выигрыш на наборе данных петабайтного масштаба. Нетрудно оценить оптимизацию столбчатых форматов файлов, позволяющую сэкономить массу вычислительной мощности (и денег) по сравнению с обычными наборами данных в JSON. Это основная ценность столбчатых форматов файлов.

Конечно, на самом деле CCSV необходимо проделать еще немалый путь, чтобы стать жизнеспособным файловым форматом, но это немного уводит нас от темы, поэтому я не буду здесь на этом заостряться.

Улучшения сжатия

Совместное хранение однотипных данных также сулит преимущества для кодеков сжатия. Многие кодеки сжатия (включая GZIP и Snappy) имеют более высокий коэффициент сжатия при сжатии последовательностей схожих данных. Сохраняя записи столбец за столбцом, во многих случаях каждый раздел данных столбца будет содержать похожие значения, что делает крайне пригодным для сжатия. Фактически, каждый столбец можно сжать независимо от других для дальнейшей оптимизации.

Недостатки столбчатых форматов

Самый большой недостаток столбчатых форматов состоит в том, что восстановление полной записи происходит медленнее и требует чтения сегментов из каждой строки, один за другим. Именно по этой причине столбчатые форматы файлов изначально подходят для рабочих процессов аналитики, а не для рабочих процессов в стиле Map/Reduce, которые по умолчанию работают с целыми строками данных за раз.

Для реальных столбчатых форматов файлов (например, Parquet) этот недостаток сводится к минимуму с помощью некоторых хитрых приемов, таких как разбиение файла на группы строк и создание обширных метаданных, хотя для особенно широких наборов данных (например, 200+ столбцов) влияние на скорость может быть значительным.

Другой недостаток состоит в том, что они требуют больше ресурсов ЦП и оперативной памяти для записи, поскольку средство записи файлов должно собрать целую кучу метаданных и реорганизовать строки, прежде чем он сможет записать файл.

Попутно замечу, что я почти всегда рекомендую использовать столбчатый формат файла, если это возможно, чтобы иметь возможность быстро заглянуть в файл и собрать некоторые простые показатели.

Реальные столбчатые форматы файлов

Теперь, когда вы знаете, чего ожидать, давайте быстро рассмотрим реальный формат файла Parquet. Есть гораздо более подробные руководства по parquet, и я рекомендую вам прочитать официальную документацию parquet, в частности, чтобы понять, как все это работает.

Вот диаграмма из документации Parquet, показывающая структуру Parquet-файла. Она немного ошеломляет, но я думаю, что ключевым выводом является важность организации данных и метаданных. Я приведу здесь только общую схему, ведь документация являются исчерпывающим и в то же время достаточно доступными источником, если хотя бы в какой-то степени понимаете, что происходит (а вы, вероятно, к этому моменту уже понимаете!).

Выводы

Для Spark, Spark SQL, Hive, Impala и других подобных технологий столбчатое хранение данных может дать 100-кратное, а иногда и 1000-кратное повышение производительности, особенно для разреженных запросов к очень широким наборам данных. Несмотря на свою сложность, файлы оптимизированы для компьютеров, и хотя это затрудняет их чтение человеком, они действительно кажутся большим шагом вперед по сравнению с такими форматами, как JSON или Avro.

Ого, вы дочитали аж до сюда?

Спасибо, что дочитали до самого конца! Полагаю, вы глубоко увлечены работой с big data, если добрались до конца. Если вам когда-нибудь понадобится поделиться идеями или получить совет по поводу того, над чем вы работаете, не стесняйтесь отправить мне электронное письмо matthew (at) rathbonelabs (dot com). Я люблю говорить о big data с умными людьми.


Всех, кому интересен курс Data Engineer приглашаем посетить бесплатный демо-урок курса на тему ML в Spark. На уроке участники узнают об особенностях ML в Spark, рассмотрят процесс разработки моделей и научатся переводить обученные модели в production.


ЗАБРАТЬ СКИДКУ

Подробнее..

Перевод Почему ваши Spark приложения медленно работаютили не работают вообще. Часть 1 Управление памятью

02.02.2021 02:11:38 | Автор: admin

Будущих учащихся на курсе Экосистема Hadoop, Spark, Hive приглашаем на открытый вебинар по теме Spark Streaming. На вебинаре участники вместе с экспертом познакомятся со Spark Streaming и Structured Streaming, изучат их особенности и напишут простое приложение обработки потоков.

А сейчас делимся с вами традиционным переводом полезного материала.


Spark приложения легко писать и легко понять, когда все идет по плану. Однако, это становится очень сложно, когда приложения Spark начинают медленно запускаться или выходить из строя. Порой хорошо настроенное приложение может выйти из строя из-за изменения данных или изменения компоновки данных. Иногда приложение, которое до сих пор работало хорошо, начинает вести себя плохо из-за нехватки ресурсов. Список можно продолжать и продолжать.

Важно понимать не только приложение Spark, но также и его базовые компоненты среды выполнения, такие как использование диска, сети, конфликт доступа и т.д., чтобы мы могли принимать обоснованные решения, когда дела идут плохо.

В этой серии статей я хочу рассказать о некоторых наиболее распространенных причинах, по которым приложение Spark выходит из строя или замедляется. Первая и наиболее распространенная это управление памятью.

Если бы мы заставили всех разработчиков Spark проголосовать, то условия отсутствия памяти (OOM) наверняка стали бы проблемой номер один, с которой все столкнулись.Это неудивительно, так как архитектура Spark ориентирована на память. Некоторые из наиболее распространенных причин OOM:

  • неправильное использование Spark

  • высокая степень многопоточности (high concurrency)

  • неэффективные запросы

  • неправильная конфигурация

Чтобы избежать этих проблем, нам необходимо базовое понимание Spark и наших данных. Есть определенные вещи, которые могут быть сделаны, чтобы либо предотвратить OOM, либо настроить приложение, которое вышло из строя из-за OOM.Стандартная конфигурация Spark может быть достаточной или не подходящей для ваших приложений. Иногда даже хорошо настроенное приложение может выйти из строя по причине OOM, когда происходят изменения базовых данных.

Переполнение памяти может быть в узлах драйвера, исполнителя и управления. Рассмотрим каждый случай.

НЕДОСТАТОЧНО ПАМЯТИ ПРИ РАБОТЕ ДРАЙВЕРА

Драйвер в Spark это JVM (Java Virtual Machine) процесс, в котором работает основной поток управления приложения. Чаще всего драйвер выходит из строя с ошибкой OutOfMemory OOM (недостаточно памяти из-за неправильного использования Spark. Spark это механизм распределения нагрузки между рабочим оборудованием. Драйвер должен рассматриваться только как дирижер. В типовых установках драйверу предоставляется меньше памяти, чем исполнителям. Поэтому мы должны быть осторожны с тем, что мы делаем с драйвером.

Обычными причинами, приводящими к OutOfMemory OOM (недостаточно памяти) драйвера, являются:

  • rdd.collect()

  • sparkContext.broadcast

  • Низкий уровень памяти драйвера, настроенный в соответствии с требованиями приложения

  • Неправильная настройка Spark.sql.autoBroadcastJoinThreshold.

Spark использует этот лимит для распределения связей ко всем узлам в случае операции соединения. При самом первом использовании, все связи реализуются на узле драйвера. Иногда многочисленные таблицы также транслируются как часть осуществления запроса.

Попробуйте написать свое приложение таким образом, чтобы в драйвере можно было избежать полного сбора всех результатов. Вы вполне можете делегировать эту задачу одной из управляющих программ. Например, если вы хотите сохранить результаты в определенном файле, вы можете либо собрать их в драйвере, или назначить программу, которая сделает это за вас.

Если вы используете SQL (Structured Query Language) от Spark, а драйвер находится в состоянии OOM из-за распределения связей, то вы можете либо увеличить память драйвера, если это возможно; либо уменьшить значение "spark.sql.autoBroadcastJoinThreshold" (неправильная настройка порога подключения) так, чтобы ваши операции по объединению использовали более удобные для памяти операции слияния соединений.

НЕДОСТАТОЧНО ПАМЯТИ ПРИ РАБОТЕ УПРАВЛЯЮЩЕЙ ПРОГРАММ

Это очень распространенная проблема с приложениями Spark, которая может быть вызвана различными причинами. Некоторые из наиболее распространенных причин высокая степень многопоточности, неэффективные запросы и неправильная конфигурация. Рассмотрим каждую по очереди.

ВСОКАЯ СТЕПЕНЬ МНОГОПОТОЧНОСТИ

Прежде чем понять, почему высокая степень многопоточности может быть причиной OOM, давайте попробуем понять, как Spark выполняет запрос или задание и какие компоненты способствуют потреблению памяти.

Spark задания или запросы разбиваются на несколько этапов, и каждый этап далее делится на задачи. Количество задач зависит от различных факторов, например, на какой стадии выполняется, какой источник данных читается и т.д. Если это этап map-stage (фаза сканирования в SQL), то, как правило, соблюдаются базовые разделы источника данных.

Например, если реестр таблицы ORC (Optimized Row Columnar) имеет 2000 разделов, то для этапа map-stage создается 2000 заданий для чтения таблицы, предполагая, что обработка разделовещё не началась. Если это этап reduce-stage (стадия Shuffle), то для определения количества задач Spark будет использовать либо настройку "spark.default.parallelism" для RDD (Resilient Distributed Dataset), либо "spark.sql.shuffle.partitions" для DataSet (набор данных). Сколько задач будет выполняться параллельно каждой управляющей программе, будет зависеть от свойства "spark.executor.cores". Если это значение установить больше без учета памяти, то программы могут отказать и привести к ситуации OOM (недостаточно памяти). Теперь посмотрим на то, что происходит, как говорится, за кадром, при выполнении задачи и на некоторые вероятные причины OOM.

Допустим, мы реализуем задачу создания схемы (map) или этап сканирования SQL из файла HDFS (распределенная файловая система Hadoop distributed file system) или таблицы Parquet/ORC. Для файлов HDFS каждая задача Spark будет считывать блок данных размером 128 МБ.Таким образом, если выполняется 10 параллельных задач, то потребность в памяти составляет не менее 128*10 только для хранения разбитых на разделы данных. При этом опять же игнорируется любое сжатие данных, которое может привести к резкому скачку данных в зависимости от алгоритмов сжатия.

Spark читает Parquet (формат файлов с открытым исходным кодом) в векторном формате. Проще говоря, каждая задача Spark считывает данные из файла Parquet пакет за пакетом. Так как Parquet является столбцом, то эти пакеты строятся для каждого из столбцов.Она накапливает определенный объем данных по столбцам в памяти перед выполнением любой операции над этим столбцом. Это означает, что для хранения такого количества данных Spark необходимы некоторые структуры данных и учет. Кроме того, такие методы кодирования, как словарное кодирование, имеют некоторое состояние, сохраненное в памяти. Все они требуют памяти.

Spark задачи и компоненты памяти во время сканирования таблицыSpark задачи и компоненты памяти во время сканирования таблицы

Так что, при большем количестве параллелей, потребление ресурсов увеличивается. Кроме того, если речь идет о широковещательное соединении (broadcast join), то широковещательные переменные (broadcast variables) также займут некоторое количество памяти. На приведенной выше диаграмме показан простой случай, когда каждый исполнитель выполняет две задачи параллельно.

НЕЭФФЕКТИВНЕ ЗАПРОС

Хотя программа Spark's Catalyst пытается максимально оптимизировать запрос, она не может помочь, если сам запрос плохо написан. Например, выбор всех столбцов таблицы Parquet/ORC. Как видно из предыдущего раздела, каждый столбец нуждается в некотором пакетном состоянии в памяти. Если выбрано больше столбцов, то больше будет потребляться ресурсов.

Постарайтесь считывать как можно меньше столбцов. Попробуйте использовать фильтры везде, где это возможно, чтобы меньше данных попадало к управляющим программам. Некоторые источники данных поддерживают обрезку разделов. Если ваш запрос может быть преобразован в столбец(ы) раздела, то это в значительной степени уменьшит перемещение данных.

НЕПРАВИЛЬНАЯ КОНФИГУРАЦИЯ

Неправильная конфигурация памяти и кэширования также может привести к сбоям и замедлению работы приложений Spark. Рассмотрим некоторые примеры.

ПАМЯТЬ ИСПОЛНИТЕЛЯ И ДРАЙВЕРА

Требования к памяти каждого приложения разные. В зависимости от требований, каждое приложение должно быть настроено по-разному. Вы должны обеспечить правильные значения памяти spark.executor.memory или spark.driver.memory в зависимости от загруженности. Как бы очевидно это ни казалось, это одна из самых трудных задач. Нам нужна помощь средств для мониторинга фактического использования памяти приложения. Unravel (Unravel Data Operations Platform) делает это довольно хорошо.

ПЕРЕГРУЗКА ПАМЯТИ

Иногда это не память управляющей программы, а перегруженная память модуля YARN (Yet Another Resource Negotiator еще один ресурсный посредник), которая вызывает OOM или узел перестает функционировать (killed) из-за YARN. Сообщения "YARN kill" обычно выглядят так:

YARN запускает каждый компонент Spark, как управляющие программы и драйвера внутри модулей. Переполненная память это off-heap память, используемая для JVM в режиме перегрузки, интернированных строк и других метаданных JVM. В этом случае необходимо настроить spark.yarn.executor.memoryOverhead на нужное значение. Обычно 10% общей памяти управляющей программы должно быть выделено под неизбежное потребление ресурсов.

КЭШИРОВАННАЯ ПАМЯТЬ

Если ваше приложение использует кэширование Spark для хранения некоторых наборов данных, то стоит обратить внимание на настройки менеджера памяти Spark. Менеджер памяти Spark разработан в очень общем стиле, чтобы удовлетворить основные рабочие нагрузки. Следовательно, есть несколько настроек, чтобы установить его правильно для определенной внеплановой нагрузки.

Spark определила требования к памяти как два типа: исполнение и хранение. Память хранения используется для кэширования, а память исполнения выделяется для временных структур, таких как хэш-таблицы для агрегирования, объединения и т. д.

Как память исполнения, так и память хранения можно получить из настраиваемой части (общий объем памяти 300МБ). Эта настройка называется "spark.memory.fraction". По умолчанию 60%. Из них по умолчанию 50% (настраивается параметром "spark.memory.storageFraction") выделяется на хранение и остаток выделяется на исполнение.

Бывают ситуации, когда каждый из вышеперечисленных резервов памяти, а именно исполнение и хранение, могут занимать друг у друга, если другой свободен. Кроме того, память в хранилище может быть уменьшена до предела, если она заимствовала память из исполнения. Однако, не вдаваясь в эти сложности, мы можем настроить нашу программу таким образом, чтобы наши кэшированные данные, которые помещаются в память хранилища, не создавали проблем для выполнения.

Если мы не хотим, чтобы все наши кэшированные данные оставались в памяти, то мы можем настроить "spark.memory.storageFraction" на меньшее значение, чтобы лишние данные были исключены и выполнение не столкнулось бы с нехваткой памяти.

ПЕРЕГРУЗКА ПАМЯТИ В МЕНЕДЖЕРЕ УЗЛА

Spark приложения, которые осуществляют перетасовку данных в рамках групповых операций или присоединяются к подобным операциям, испытывают значительные перегрузки. Обычно процесс перетасовки выполняется управляющей программой. Если управляющая программа (исполнитель) занята или завалена большим количеством (мусора) GC (Garbage Collector), то она не может обслуживать перетасовки запросов. Эта проблема в некоторой степени решается за счет использования внешнего сервиса обмена.

Внешний сервис обмена работает на каждом рабочем узле и обрабатывает поступающие от исполнителей запросы на переключение. Исполнители могут читать перемешанные файлы с этого сервиса, вместо того, чтобы не считывать файлы между собой. Это помогает запрашивающим исполнителям читать перемешанные файлы, даже если производящие их исполнители не работают или работают медленно. Также, когда включено динамическое распределение, его обязательным условием является включение внешнего сортировочного сервиса.

Когда внешний сервис обмена данными Spark настроен с помощью YARN, NodeManager (управляющий узел) запускает вспомогательный сервис, который действует как внешний провайдер обмена данными. По умолчанию память NodeManager составляет около 1 ГБ. Однако приложения, выполняющие значительную перестановку данных, могут выйти из строя из-за того, что память NodeManager исчерпана. Крайне важно правильно настроить NodeManager, если ваши приложения попадают в вышеуказанную категорию.

КОНЕЦ ЧАСТИ 1, СПАСИБО ЗА ВНИМАНИЕ

Процессинг внутренней памяти Spark ключевая часть ее мощности. Поэтому эффективное управление памятью является критически важным фактором для получения наилучшей производительности, расширяемости и стабильности ваших приложений Spark и каналов передачи данных. Однако настройки по умолчанию в Spark часто бывают недостаточными. В зависимости от приложения и среды, некоторые ключевые параметры конфигурации должны быть установлены правильно для достижения ваших целей производительности. Если иметь базовое представление о них и о том, как они могут повлиять на общее приложение, то это поможет в работе.

Я поделился некоторыми соображениями о том, на что следует обратить внимание при рассмотрении вопроса об управлении памятью Spark. Это область, которую платформа Unravel понимает и оптимизирует очень хорошо, с небольшим количеством, если таковое вообще потребуется, человеческого вмешательства. Я рекомендую вам заказать демо-версию, чтобы увидеть Unravel в действии. Мы видим довольно значительное ускорение работы приложений Spark.

Во второй части этой серии статьи напишу о том, почему ваши приложения Spark медленно работают или не работают: Во второй части цикла, посвященной искажению данных и сбору мусора, я расскажу о том, как структура данных, искажение данных и сбор мусора влияют на производительность Spark.


Узнать подробнее о курсе Экосистема Hadoop, Spark, Hive.

Записаться на открытый вебинар по теме Spark Streaming.

Подробнее..

Как построить современное аналитическое хранилище данных на базе Cloudera Hadoop

28.04.2021 12:07:29 | Автор: admin

Привет.

В конце прошлого года GlowByte и Газпромбанк сделали большой совместный доклад на конференции Big Data Days, посвященный созданию современного аналитического хранилища данных на базе экосистемы Cloudera Hadoop. В статье мы детальнее расскажем об опыте построения системы, о сложностях и вызовах с которыми пришлось столкнуться и преодолеть и о тех успехах и результатах, которых мы достигли..


Появление технологии Hadoop десятилетние назад вызвало на рынке интеграции данных небывалый ажиотаж и оптимизм. Индустрия задалась вопросом а готова ли технология вытеснить традиционные системы обработки данных?. За прошедшую декаду было сломано немало копий в этой битве. Кто-то успел разочароваться, кто-то добился локальных успехов, а тем временем сама экосистема прошла короткий, но стремительный эволюционный путь, который позволяет уверенно сказать, что в настоящий момент не существует задачи и вызова в области обработки и интеграции данных, которую не способен решить Hadoop.

В этой статье мы попытаемся дать ответ на главный вопрос как создать современное аналитическое хранилище данных на базе экосистемы Cloudera на примере проекта, реализованного нами в Газпромбанк АО. Попутно расскажем как мы справились с основными вызовами при решении задачи.

Газпромбанк АО один их ведущих системообразующих финансовых институтов РФ. Он входит в топ-3 банков по активам России и всей Восточной Европы и имеет разветвленную сеть дочерних филиалов.

Банк традиционно на рынке финансовых услуг был консервативным и ориентировался на корпоративный сектор, но в 2017 году принял стратегию Цифровой трансформации с целью развития направления розничного бизнеса.

Розничный банковский сектор является высококонкурентным в РФ и для реализации стратегии Газпромбанку потребовалось создание новой технологической платформы, которая должна удовлетворять современным требованиям, так как основой интенсивного роста на конкурентном рынке могут быть только data driven процессы.

На тот момент в Банке уже было несколько платформ интеграции данных. Основная платформа КХД занята классическими, но критичными с точки зрения бизнеса задачами: управленческой, финансовой и регуляторной отчетности. Внесение изменения в текущую архитектуру КХД несло серьезные риски и финансовые затраты. Поэтому было принято решение разделить задачи и создавать аналитическую платформу с нуля.

Верхнеуровнево задачи ставились следующие:

  • Создание озера данных (как единой среды, в которой располагаются все необходимые для анализа данные);

  • Консолидации данных из озера в единую модель;

  • Создание аналитический инфраструктуры;

  • Интеграция с бизнес-приложениями;

  • Создание витрин данных;

  • Внедрение Self-service инструментов;

  • Создание Data Science окружения.

Этап проработки архитектуры важно начинать после консолидации и уточнения всех ключевых требований к системе. Требования были разделили на два больших блока:

Бизнес-требования

  • Обеспечение данными бизнес-приложений: аналитический CRM, Real Time Offer, Next Best Offer, розничный кредитный конвейер;

  • Возможность работы с сырыми данными из систем-источников as is (функция Data Lake);

  • Среда статистического моделирования;

  • Быстрое подключение новых систем источников к ландшафту;

  • Возможность обработки данных за всю историю хранения;

  • Единая модель консолидированных данных (аналитическое ядро);

  • Графовая аналитика;

  • Текстовая аналитика;

  • Обеспечение качества данных.

Требования ИТ

  • Высокая производительность при дешевом горизонтальном масштабировании;

  • Отказоустойчивость и высокая доступность;

  • Разделяемая нагрузка и гарантированный SLA;

  • ELT обработка и трансформация данных;

  • Совместимость с имеющимися Enterprise решениями (например, SAP Business Objects, SAS);

  • Ролевая модель доступа и полное обеспечение требований информационной безопасности.

Кроме этого, система должна быть линейно масштабируемой, основываться на open source технологиях, и самое главное соотношение стоимость\производительность должно быть самым конкурентным из всех предложений на рынке.

Для создания единой аналитической платформы розничного бизнеса мы выбрали стек Hadoop на базе дистрибутива Cloudera Data Hub

Архитектура решения

Рассмотрим архитектуру решения.

Рис. Архитектура

Система разделена на два кластера Cloudera Data Hub. Кластер регламентных процессов и Лаборатория данных

1. Кластер регламентных процессов

Все регламентные источники данных подключаются к данному кластеру. Все регламентные ETL расчеты также работают на этом контуре. Все системы потребители данных запитываются из регламентного кластера. Таким образом выполняется жесткая изоляция непредсказуемой пользовательской нагрузки от критичных бизнес процессов.

В настоящий момент к Hadoop подключено свыше 40-ка систем-источников с регламентом от t-1 день до t-15 минут для batch загрузки, а также real-time интеграция с процессинговым центром. Регламентный контур поставляет данные во все системы розничного бизнеса:

  • Аналитический CRM;

  • Розничный кредитный конвейер;

  • Антифрод система;

  • Система принятия решений;

  • Collection;

  • MDM;

  • Система графовой аналитики;

  • Система текстовой аналитики;

  • BI отчетность

2. Кластер пользовательских экспериментов Лаборатория данных

В то же время, все данные которые загружаются на регламентный контур в режиме онлайн реплицируются на контур пользовательских экспериментов. Задержка по времени минимальная и зависит только от пропускной способности сетевого канала тк контур лаборатории данных находится в другом ЦОДе. Те пользовательский контур одновременно выполняет роль Disaster Recovery плеча в случае выхода из строя основного ЦОДа.

Дата инженеры и дата science специалисты получают все необходимые данные для проведения своих исследований и проверки гипотез без задержки и без ожидания днями и неделями, когда нужные им данные для расчетов или тренировки моделей куда то выгрузят. Они доступны все в одном месте и всегда свежие. Дополнительно на кластере лаборатории данных создаются пользовательские песочницы, где можно создавать и свои объекты. Также ресурсы кластера распределены именно для высококонкурентной пользовательской нагрузки. На регламентный кластер у пользовательского доступа нет.

После проверки гипотез, подготовки требований для регламентных расчетов либо тренировки моделей, результаты передаются для постановки на регламентный контур и сопровождения.

Дополнительно на контуре лаборатории создано окружение управления жизненным циклом моделей, окружение пользовательских аналитических приложений с управлением ресурсами на K8S, подключены два специализированных узла с GPU ускорением для обучения моделей.

Система мониторинга и управления кластерами, загрузками, ETL, реализована на дополнительных виртуальных машинах, не включенных напрямую в кластера Cloudera.

Сейчас версия дистрибутива CDH 5.16.1. В архитектурный подход закладывалась ситуация выхода из строя двух любых узлов без последующей остановки системы.

Характеристики Data узлов следующие: CPU 2x22 Cores 768Gb RAM SAS HDD 12x4Tb. Все собрано в HPE DL380 в соответствии с рекомендациями Cloudera Enterprise Reference Architecture for Bare Metal Deployments. Такой необычный, как кому-то может показаться, сайзинг связан с выбором подхода по ETL и процессингового движка для работы с данными. Об этом немного ниже. Необычность его в том, что вместо 100500 маленьких узлов, мы выбираем меньше узлов, но сами узлы жирнее.

Основные технические вызовы

В процессе проработки и внедрения мы столкнулись с рядом технических вызовов, которые необходимо было решить, для того чтобы система удовлетворяла выше заявленным высоким требованиям.

  • Выбор основного процессингового движка в Hadoop;

  • Подход по трансформации данных (ETL);

  • Репликация данных Система-источник > Hadoop и Hadoop > Hadoop;

  • Изоляция изменений и консистентность данных;

  • Управление конкурентной нагрузкой;

  • Обеспечение требований информационной безопасности

Далее рассмотрим каждый из этих пунктов детально.

Выбор основного процессингового движка

Горький опыт первых попыток некоторых игроков реализовать ХД в Hadoop 1.0 показал, что нельзя построить систему обработки данных руками java программистов, не имеющих опыта построения классических ХД за плечами, не понимающих базовых понятий жизненного цикла данных, не способных отличить дебит от кредита или рассчитать просрочку. Следовательно, для успеха нам надо сформировать команду специалистов по данным, понимающих нашу предметную область и использовать язык структурированных запросов SQL.

В целом, базовый принцип работы с данными которого стоит придерживаться если задачу можно решить на SQL то ее нужно решать только на SQL. А большинство задач с данными решаются именно с помощью языка структурированных запросов. Да и нанять и подготовить команду SQL-щиков для проектной работы быстрее и дешевле чем специалистов по данным, окончивших курсы на диване из рекламы в инстаграм.

Для нас это означало что необходимо выбрать правильный SQL движок для работы с данными в Hadoop. Остановили свой выбор на движке Impala так как он имеет ряд конкурентных преимуществ. Ну и собственно ориентация на Impala во многом и предопределила выбор в пользу Cloudera как дистрибутива Hadoop для построения аналитического хранилища.

Чем же Impala так хороша?

Impala движок распределенных вычислений, работающий напрямую с данными HDFS, а не транслирующий команды в другой фреймворк вроде MapReduce, TEZ или SPARK.

Impala движок который большинство всех операций выполняет в памяти.

Impala читает только те блоки Parquet, которые удовлетворяют условиям выборки и соединений (bloom фильтрация, динамическая фильтрация), а не поднимает для обработки весь массив данных. Поэтому в большинстве аналитических задач на практике Impala быстрее чем другие традиционные MPP движки вроде Teradata или GreenPlum.

Impala имеет хинты, позволяющие очень легко управлять планом запроса, что весьма важный критерий при разработке и оптимизации сложных ETL преобразований без переписывания запроса.

Движок не разделяет общие ресурсы Hadoop с другими сервисами так как не использует YARN и имеет свой ресурсный менеджмент. Это обеспечивает предсказуемую высоко конкурентную нагрузку.

Синтаксис SQL настолько близок к традиционным движкам, что на подготовку разработчика или аналитика, имеющего опыт другой SQL системы, уходит не больше 3-4х часов.

Вот как работа с Hadoop выглядит глазами аналитика:

Рис. Работа с Impala SQL в Hue

Это работа в веб-ноутбуке Hue, который идет вместе с Cloudera. Не обделены и те пользователи, кто предпочитает работать с классическими толстыми SQL клиентами или сводными таблицами Excel.

Рис. SQL доступ к Hadoop в локальном толстом клиенте.

Многие кто читал рекомендации Cloudera могут задаться вопросом а почему Impala не рекомендована как ETL движок, а только как движок пользовательского ad-hoc или BI доступа? Ответ на самом деле прост - Impala не имеет гарантии исполнения запроса чтобы не стало в отличие от Hive. Eсли падает запрос или узел, то запрос автоматически не перезапустится и поднимать его надо вручную.

Это проблема легко решаема ETL поток или запрос в приложении должны уметь перезапускаться в таких ситуациях.

ETL потоки в нашем решении перезапускаются без вмешательства администратора автоматически:

  • При падении запроса происходит автоматический анализ причины;

  • При необходимости автоматически подбираются параметры конкретного запроса или параметры сессии чтобы повторный перезапуск отработал без ошибок;

  • Выполняется сбор статистической информации по ошибкам для дальнейшего анализа и настройки потока чтобы в будущем по данному запросу или jobу таких ситуаций не возникало.

У нас на проекте сложилась парадоксальная ситуация - команда аналитиков и инженеров по данным, работающих над проектом, знала про Hadoop только то, что на логотипе есть желтый слоник. Для них Hadoop - это привычный SQL. Уже после уборки урожая (завершения разработки аналитического слоя, о котором речь пойдет ниже), ребята попросили провести для них обучение по Hadoop чтобы быть в теме.

Подход по трансформации данных

В разработке трансформации данных важно не только выбрать правильный движок, но и принять правильные стандарты разработки. У нас давно сформировался подход к таким задачам как metadata driven E-L-T при котором трансформация данных отрисовывается в диаграмме ETL инструмента, который в свою очередь генерирует SQL и запускает его в среде исполнения. При этом SQL должен быть максимально оптимальным с точки зрения конкретной среды исполнения. На рынке не так много ETL инструментов, позволяющих управлять генерацией SQL. В данном внедрении использовался инструмент SAS Data Integration.

Весь регламентный ETL выполнен в подходе metadata driven ELT. Никаких ручных скриптов с планировкой на airflow!

Такой подход позволяет

  • Автоматизировать процессы управления метаданными;

  • Автоматизировать процесс построения lineage данных как средствами самого ETL инструмента, так и средствами доступа к API;

  • Повысить качество процессов внесения изменений и управления данными т.к. вся информация о зависимостях всех объектов и всех jobв хранится в метаданных ETL инструмента.

  • Использовать CI/CD процессы в разработке

Рис. Примеры диаграмм ETL процессов

SAS DI позволяет визуализировать граф зависимостей в штатном функционале или можно выгрузить метаданные через API и использовать их для анализа в других средах.

Рис. Граф зависимостей объектов.

Репликация данных

Загрузка данных в систему ключевая отправная точка реализации функциональных бизнес требований системы.

Для этой функции был разработан специализированный инструмент Data Replicator. Инструмент позволяет в очень короткие сроки подключать системы источники и настраивать загрузку данных в Hadoop.

Из возможностей

  • Синхронизация метаданных с источника;

  • Встроенные механизмы контроля качества загруженных данных;

  • Загрузка в различных режимах работы в т.ч. полная копия, извлечение и загрузка инкремента (по любой скалярной детерминированной функции), архивация данных источника и т.д.

Решение имеет гибкие настройки позволяющие приоритизировать задания загрузки, балансировку, контроль многопоточности. Это позволяет бережно относится к источнику при извлечении данных, но в то же время гарантировать SLA доступности данных в Hadoop.

Другая очень важная функция Data Replicatorа - автоматическая репликация данных с регламентного кластера Hadoop на DR кластер. Данные, загружаемые из систем-источников реплицируются автоматически, для деривативных данных существует API. Все регламентные ETL процессы, при обновлении целевой таблицы вызывают API которое запускает процесс мгновенного копирования изменений на резервный контур. Таким образом, DR кластер, который так же выполняет роль пользовательской песочницы, всегда имеет свежие данные.

Нами реализовано множество конфигураций для различных СУБД используемых как источники в ГПБ, также для других процессинговых движков Hadoop (для случаев когда другой кластер Hadoop является источником данных для системы) и есть возможность обрабатывать данные, загруженные в систему другими инструментами, например kafka, flume, или промышленный ETL tool.

Изоляция изменений и консистентность

Любой кто работал в Hadoop сталкивался с проблемой конкурентного доступа к данным. Когда пользователь читает таблицу, а другая сессия пытается туда записать данные, то происходит блокировка таблицы (в случае Hive) либо пользовательский запрос падает (в случае Impala).

Самое распространенное решение на практике выделение регламентных окон на загрузку во время которых не допускается работа пользователей, либо каждая новая порция загрузки записывается в новую партицию. Для нас первый подход неприемлем тк мы должны гарантировать доступность данных 24х7 как по загрузке так и по доступу. Второй подход не применим т.к. он предполагает секционирование данных только по дате\порции загрузке, что неприемлемо если требуется отличное секционирование (по первичному ключу, по системе источнику и т.д.). Так же второй метод приводит к избыточному хранению данных.

Забегая вперед хочется отметить, что в настоящее время в HIVE 3 проблемы решена путем добавления поддержки ACID транзакционности, но, в нашей версии дистрибутива у нас далеко не третий Hive (да еще и на Map Reduce), а хотим получить высокую производительность и конкурентную нагрузку и поэтому нам пришлось реализовать ACID для Impala в Hadoop самостоятельно.

В нашем решении изоляция выполнена с применением подхода HDFS snapshot и разделения слоя хранения и доступа к данным через VIEW.

Когда данные записываются в HDFS, сразу, мгновенно создается снапшот на который переключается VIEW.

Пользователь читает данные с VIEW, а не напрямую с таблицы, поэтому следующая сессия записи никак не влияет на его текущий запрос.

Все что остается делать это переключать VIEW на новые HDFS снапшоты, число которых определяется максимальной длительностью пользовательских запросов и частотой обновления данных в Hadoop. Те в сухом остатке мы получаем аналог UNDO в Oracle, retention период которого зависит от количества снапшотов и регламента загрузки данных.

Основной секрет в том, что как только процессинговый движок определил какие данные из HDFS он должен прочитать, после этого DDL VIEW или таблицы может быть изменен т.к. оптимизатор больше не будет обращаться к словарю metastore. Т.е. можно выполнить переключение VIEW на другую директорию.

Функционал HDFS Snapshot настолько легковесный и быстрый что позволяет создавать сотни снапшотов в минуту и никак не влияет на производительность системы.

Изоляции изменений в нашем решении также является функцией DataReplictorа. Все загружаемые данные изолируются автоматически, причем на обеих контурах системы, а производные ETL данные изолируются через вызов API. Каждое изменение целевого объекта, которое происходит в рамках ETL процесса завершается вызовом API по созданию снапшота и переключению VIEW.

Благодаря такому решению, все загрузки и все данные доступны в режиме 24х7 без регламентных окон. HDFS снапшоты не приводят к большому избыточному хранению данных в HDFS. Наш опыт показал, что для часто меняющихся регламентных данных хранение снапшотов за трое суток приводит к увеличению размера максимум на 25%.

Управление конкурентной нагрузкой

Следующий большой блок требований управление конкурентной нагрузкой.

На практике это означает что нужно обеспечить

  • Предсказуемую работу регламентных процессов;

  • Приоритизация пользователей в зависимости от принадлежности к ресурсной группе;

  • Отсутствие, минимизация или управление отказами в обслуживании;

Как это обеспечено на практике

  • Настроено разделение ресурсов между сервисами Hadoop на уровне ОС через cgroups;

  • Правильное распределение памяти между нуждами ОС и Hadoop;

  • Правильное распределение памяти внутри кластера между служебными сервисами Hadoop, YARN приложениями и Impala;

  • Выделение ресурсных пулов Impala отдельным пользовательским группам для гарантии обслуживания и приоритизация запросов

Результат предсказуемая высококонкурентная нагрузка десятков пользователей одновременно и десятков тысяч ETL запросов в сутки без влияния на другие составляющие экосистемы Cloudera.

Ри. Количество SQL запросов, завершающихся каждую секунду.

В настоящий момент на кластере регламентных расчетов в сутки регистрируется и успешно выполняется в среднем 900 тыс SQL запросов по трансформации и загрузке данных. В дни массовых загрузок и расчетов эта цифра поднимается до полутора миллионов.

Рис. Средняя утилизация CPU за сутки

При этом мы видим, что остается внушительный запас по производительности с тз возможностей повышения конкурентной работы. Есть понимание что это может быть и 1,5 млн и 2 млн запросов. Это означает что выбранный подход оказался верным и пропускная способность системы как и ее предсказуемость под нагрузкой показывает выдающиеся результаты.

Информационная безопасность

В финансовом секторе традиционно вопросы информационной безопасности являются одними из самых ключевых тк приходится работать с данными, которые не только подлежат защите с тз федерального законодательства, но и с требованиями, которые периодически ужесточаются госрегулятором. При выборе дистрибутива Hadoop стоит особое внимание уделять этим требованиям, так как большинство не вендорских сборок, либо сборок, спроектированных на базе популярных open source дистрибутивов (например Apache Big Top) не позволяют закрывать часть требований и при выводе системы в промышленную эксплуатацию можно столкнуться с неприятными сюрпризами недопуска системы от службы ИБ.

В кластере Cloudera нами были реализованные следующие требования:

  • Ролевая модель доступа к данным

    • Все пользователи включены в группы Active Directory (AD) каталога;

    • Группы AD зарегистрированы в Sentry;

    • В Sentry выполнено разграничение доступа для баз Impala и директорий HDFS;

    • Каждый Target слой данных имеет ролевые слои VIEW с ограничениями на чувствительные данные в соответствии с ролевой моделью доступа;

  • Кластеры керберизированы;

  • Подключение клиентских приложений только с применением SSL шифрования. Также шифрование используется при передачи данных внутри кластера.

  • Выполняется парсинг и приведение всех журналов сервисов Hadoop к единому реляционному формату стандартного журнала ИБ (единая точка интеграции для системы сбора данных ИБ)

    • Пользовательские запросы;

    • Запросы ETL;

    • Точки интеграции Hadoop с другими системами;

  • Все серверы, ОС, компоненты и прикладное ПО настроены в соответствии с согласованными профилями информационной безопасности и периодически проходят проверку на предмет известных уязвимостей.

Единый аналитический слой данных

Наличие общего слоя консолидированных данных основное требование аналитического ХД.

Без этого Hadoop (как и любое другое ХД) озеро данных, которое пользователи начинают превращать со временем в неуправляемое болото. Поэтому важно иметь общую версию правды над этим озером чтобы все задачи решались в единой системе координат.

Был разработан единый аналитический слой консолидированных данных. Источником для него является копия детального слой КХД, которая регулярно реплицируется в среду Hadoop, а также дополнительные источники, подключаемые напрямую, минуя КХД.

Модель ориентирована на пользовательский ad-hoc доступ и проектировалась с учетом требований типовых задач клиентской аналитики, риск моделей, скоринга.

Реализованы все области данных, необходимые для решения задач розничного бизнеса и моделирования такие как:

  • Аккредитивы

  • Депозиты

  • Залоги

  • Заявки

  • Карты

  • Контрагенты

  • MDM

  • Кредиты

  • Сегмент клиента

  • Рейтинги

  • Агрегаты

  • Справочники

  • Счета

  • Эквайринг

  • Векселя

  • РЕПО

  • Резервы

В настоящий момент слой состоит из 177 целевых объектов и порядка 2350 бизнес-атрибутов. В snappy сжатии объем данных порядка 20 Тб (не менее 100 Тб в RAW).

В модель загружена история с 2010 года. Ведь точность моделей зависит от глубины истории данных, на которых она обучается. Более того, история очищалась аналитическими алгоритмами. Дето в том, что в банке разветвленная филиальная сеть и часть филиалов мигрировали друг в друга, клиенты переходили из одного филиала в другой, производили пролонгацию сделок и тд. Все это составляет определенные сложности для анализа данных. Но в конечном целевом слое вся история отношений с каждым клиентом, все сделки, имеют непрерывную историю в рамках одного суррогатного ключа без пересекающихся интервалов историчности.

Реализованный единый слой - источник данных для производных прикладных витрин под бизнес-приложения, отчетность и модели. Сейчас у нас около 40 производных регламентных витрин, состоящих из 550 целевых таблиц и примерно 13200 атрибутов.

Надежность

Часто приходится слушать о ненадежности решений, спроектированных на Hadoop. За два года эксплуатации Cloudera Data Hub у нас практически не было каких-либо проблем, связанных с простоем системы. Случилось буквально пара инцидентов, повлиявших не регламентные процессы.

Один раз у нас забилось место, выделенное под БД metastore (недостатки мониторинга).

В другой раз была попытка выгрузить несколько сотен миллионов транзакций через Impala. В результате прилег координатор и другие пользователи и процессы не могли подключиться на этот координатор. Как результат выработали правило каждый отдельный вид процессов (загрузка данных, ETL, пользователи, приложения) подключается к своему координатору, который еще имеет дублера для балансировки. Ну и конечно большие выгрузки данных в системы потребители лучше делать через sqoop export. Ну и в последних релизах Impala уже без проблем может отдавать десятки миллионов записей на подключение.

Да, случаются выходы из строя дисков, приходится иногда делать decommission узлов для их замены, но все это проходит прозрачно для пользователей без остановки работы, ведь наш архитектурный подход сразу подразумевал устойчивость к выходу из строя как минимум двух любых узлов.

Итоги

В настоящий момент система является фабрикой данных всех розничных процессов Банка и аналитических приложений. Платформой ежедневно пользуется 36 департаментов и примерно 500 пользователей для самостоятельного решения задач по аналитике и моделированию.

Реализованный нами проект стал финалистом номинации Cloudera Data Impact 2020 в категории Data For Enterprise AI.

Выводы

После двух лет промышленной эксплуатации нашей Системы мы сегодня с уверенностью можем сказать, то экосистема Hadoop полностью позволяет реализовать все современные требования к аналитической платформе при использовании дистрибутива Cloudera и при правильных архитектурных подходах. Система может полностью вытеснить все традиционные аналитические СУБД без какого-либо ущерба к накопленному опыту разработчиков и аналитиков. Нужно всего лишь принять правильные решения и сделать прыжок веры. Традиционно консервативный Газпромбанк сделал с нами этот прыжок веры и смог построить современную аналитическую платформу, ввязавшись в гонку на розничном рынке в кратчайшие сроки.

Об успехах в цифрах можно посмотреть в записи нашего совместно доклада.

Для проектирования современной аналитической системы не требуется гетерогенная архитектура слоеного пирога с пропитками из гринпламов, тарантулов, игнайтов и так далее. Все данные и сервисы работы с данными должны находится под управлением одной целостной системы. Такой подход снижает наличие дополнительных точек интеграции, а следовательно, и потенциальные отказы. Не требуются дополнительные работы и длительные сроки по интеграции и пропитке этих слоев данными.

Наш архитектурный подход позволяет ускорить внедрение нового функционала и как следствие улучшить time to market новых продуктов, основанных на data driven процессах.

В современных аналитических задачах не существует понятий горячих и холодных данных. Ситуация прилета пачки проводок, за диапазон t - 3-5 лет - это каждодневная регламентная ситуация. И для такого случая вы должны пересчитать остатки, обороты, просрочки и предоставить данные для модели или определения сегмента клиента в аналитическом CRM. Как я уже писал выше, чем глубже в истории данные, тем точнее ваши модели. Такие задачи можно решить только если все данные в одном месте и в одной системе. Наш принцип - все данные горячие!

Для успешной реализации проектной команде недостаточно опыта знания технологии Hadoop. Hadoop это всего лишь инструмент. Необходимо применять подходы проектирования классического ХД на базе SQL MPP, иначе ваша система навсегда останется помойкой под архивные данные, нарисованной внизу слоеного пирога как хранилище неструктурированных и холодных данных на архитектурной картинке.

Наши ближайшие планы

В настоящий момент мы находимся в завершающей стадии миграции на новую платформу Cloudera Data Platform 7.1. Вполне вероятно, что на момент публикации мы уже на CDP и в ближайшее время тут будут опубликованы результаты. Пока, можно с уверенностью сказать, что после проведенных тестов, мы ожидаем ряд оптимизационных улучшений, связанных с Impala 3.4, появлением страничных индексов в parquet, наличием Zstd компрессии. Новые сервисы вроде Atlas и Cloudera Data Flow позволят закрывать функции управления данными и потоковой аналитики из коробки. В ближашее время мы также планируем пилотировать родной для Cloudera BI инструмент - Cloudera Data Visualization.

Что еще мы еще сделали в нашем ландшафте Hadoop

  • Real-time интеграция системы с процессинговым центром с использованием Kudu (real-time клиентские данные, доступные для работы с минимальной задержкой наступления события). Горячие данные в Kudu, холодные в Parquet, общий склеивающий интерфейс доступа для пользователей через SQL Impala. Результат - данные в реальном времени о состоянии карточных транзакций и остатков по карточному счету открывают для бизнеса новые возможности.

  • Историзируемый слой ODS

Построение слоя ODS с использованием Oracle Golden Gate с сохранением истории изменения источника с возможностью задания гранулярности истории по каждому объекту репликации, а также архивированием в Hadoop с возможностью схлопывания интервалов холодных данных.

  • Графовая аналитика

    • Построение витрины property графа в Hadoop;

    • Загрузка в графовую БД Arango;

    • Интерфейс работы с графом для андерайтеров над Arango;

    • Графовые модели (анализ окружения клиента при скоринге);

  • Текстовая аналитика

    • Работа моделей по распознаванию первичных документов клиента и поиска в них аномалий (контроль фронта, антифрод, автоматизация работы с заявкой);

    • Анализ новостных лент, тематических форумов

  • Геоаналитика

    • Анализ удаленности и проходимости офисов от основных пешеходных маршрутов, автомобильных проездов и парковок;

    • Оптимизация курьерских маршрутов

  • Система управления качеством данных, позволяющая оценить качество всех загружаемых и производных данных для принятия решений об использовании этих данных на прикладном уровне. Результат - мониторинг через визуальные дашборды и почтовые рассылки состояния качества данных аналитического слоя, поставка данных в системы потребители вместе с паспортом качества.

  • Контейнеризация пользовательских приложений и моделей с использованием окружения K8S

Каждый пункт из этого списка достоин отдельной развернутой статьи, которые обязательно появятся в будущем. Следите за обновлениями, задавайте ваши вопросы и делитесь своим опытом.

Авторы:

Евгений Вилков, Глоубайт.

Колесникова Елена, Газпромбанк (АО).

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru