Русский
Русский
English
Статистика
Реклама

Hive

Когда у вас сберовские масштабы. Использование Ab Initio при работе с Hive и GreenPlum

07.07.2020 18:11:11 | Автор: admin
Некоторое время назад перед нами встал вопрос выбора ETL-средства для работы с BigData. Ранее использовавшееся решение Informatica BDM не устраивало нас из-за ограниченной функциональности. Её использование свелось к фреймворку по запуску команд spark-submit. На рынке имелось не так много аналогов, в принципе способных работать с тем объёмом данных, с которым мы имеем дело каждый день. В итоге мы выбрали Ab Initio. В ходе пилотных демонстраций продукт показал очень высокую скорость обработки данных. Информации об Ab Initio на русском языке почти нет, поэтому мы решили рассказать о своём опыте на Хабре.

Ab Initio обладает множеством классических и необычных трансформаций, код которых может быть расширен с помощью собственного языка PDL. Для мелкого бизнеса такой мощный инструмент, вероятно, будет избыточным, и большинство его возможностей могут оказаться дорогими и невостребованными. Но если ваши масштабы приближаются к сберовским, то вам Ab Initio может быть интересен.

Он помогает бизнесу глобально копить знания и развивать экосистему, а разработчику прокачивать свои навыки в ETL, подтягивать знания в shell, предоставляет возможность освоения языка PDL, даёт визуальную картину процессов загрузки, упрощает разработку благодаря обилию функциональных компонентов.

В посте я расскажу о возможностях Ab Initio и приведу сравнительные характеристики по его работе с Hive и GreenPlum.

  • Описание фреймворка MDW и работ по его донастройке под GreenPlum
  • Сравнительные характеристики производительности Ab Initio по работе с Hive и GreenPlum
  • Работа Ab Initio с GreenPlum в режиме Near Real Time


Функционал этого продукта очень широк и требует немало времени на своё изучение. Однако, при должных навыках работы и правильных настройках производительности результаты обработки данных получаются весьма впечатляющие. Использование Ab Initio для разработчика может дать ему интересный опыт. Это новый взгляд на ETL-разработку, гибрид между визуальной средой и разработкой загрузок на скрипто-подобном языке.
Бизнес развивает свои экосистемы и этот инструмент оказывается ему как никогда кстати. С помощью Ab Initio можно копить знания о текущем бизнесе и использовать эти знания для расширения старых и открытия новых бизнесов. Альтернативами Ab Initio можно назвать из визуальных сред разработки Informatica BDM и из невизуальных сред Apache Spark.

Описание Ab Initio


Ab Initio, как и другие ETL-средства, представляет собой набор продуктов.

Ab Initio GDE (Graphical Development Environment) это среда для разработчика, в которой он настраивает трансформации данных и соединяет их потоками данных в виде стрелочек. При этом такой набор трансформаций называется графом:

Входные и выходные соединения функциональных компонентов являются портами и содержат поля, вычисленные внутри преобразований. Несколько графов, соединённых потоками в виде стрелочек в порядке их выполнения называются планом.
Имеется несколько сотен функциональных компонентов, что очень много. Многие из них узкоспециализированные. Возможности классических трансформаций в Ab Initio шире, чем в других ETL-средствах. Например, Join имеет несколько выходов. Помимо результата соединения датасетов можно получить на выходе записи входных датасетов, по ключам которых не удалось соединиться. Также можно получить rejects, errors и лог работы трансформации, который можно в этом же графе прочитать как текстовый файл и обработать другими трансформациями:

Или, например, можно материализовать приёмник данных в виде таблицы и в этом же графе считать из него данные.
Есть оригинальные трансформации. Например, трансформация Scan имеет функционал, как у аналитических функций. Есть трансформации с говорящими названиями: Create Data, Read Excel, Normalize, Sort within Groups, Run Program, Run SQL, Join with DB и др. Графы могут использовать параметры времени выполнения, в том числе возможна передача параметров из операционной системы или в операционную систему. Файлы с готовым набором передаваемых графу параметров называются parameter sets (psets).
Как и полагается, Ab Initio GDE имеет свой репозиторий, именуемый EME (Enterprise Meta Environment). Разработчики имеют возможность работать с локальными версиями кода и делать check in своих разработок в центральный репозиторий.
Имеется возможность во время выполнения или после выполнения графа кликнуть по любому соединяющему трансформации потоку и посмотреть на данные, прошедшие между этими трансформациями:

Также есть возможность кликнуть по любому потоку и посмотреть tracking details в сколько параллелей работала трансформация, сколько строк и байт в какой из параллелей загрузилось:

Есть возможность разбить выполнение графа на фазы и пометить, что одни трансформации нужно выполнять первым делом (в нулевой фазе), следующие в первой фазе, следующие во второй фазе и т.д.
У каждой трансформации можно выбрать так называемый layout (где она будет выполняться): без параллелей или в параллельных потоках, число которых можно задать. При этом временные файлы, которые создаёт Ab Initio при работе трансформаций, можно размещать как в файловой системе сервера, так и в HDFS.
В каждой трансформации на базе шаблона по умолчанию можно создать свой скрипт на языке PDL, который немного напоминает shell.
С помощью языка PDL вы можете расширять функционал трансформаций и, в частности, вы можете динамически (во время выполнения) генерировать произвольные фрагменты кода в зависимости от параметров времени выполнения.
Также в Ab Initio хорошо развита интеграция с ОС через shell. Конкретно в Сбербанке используется linux ksh. Можно обмениваться с shell переменными и использовать их в качестве параметров графов. Можно из shell вызывать выполнение графов Ab Initio и администрировать Ab Initio.
Помимо Ab Initio GDE в поставку входит много других продуктов. Есть своя Co>Operation System с претензией называться операционной системой. Есть Control>Center, в котором можно ставить на расписание и мониторить потоки загрузки. Есть продукты для осуществления разработки на более примитивном уровне, чем позволяет Ab Initio GDE.

Описание фреймворка MDW и работ по его донастройке под GreenPlum


Вместе со своими продуктами вендор поставляет продукт MDW (Metadata Driven Warehouse), который представляет собой конфигуратор графов, предназначенный для помощи в типичных задачах по наполнению хранилищ данных или data vaults.
Он содержит пользовательские (специфичные для проекта) парсеры метаданных и готовые генераторы кода из коробки.

На входе MDW получает модель данных, конфигурационный файл по настройке соединения с базой данных (Oracle, Teradata или Hive) и некоторые другие настройки. Специфическая для проекта часть, например, разворачивает модель в базе данных. Коробочная часть продукта генерирует графы и настроечные файлы к ним по загрузке данных в таблицы модели. При этом создаются графы (и psets) для нескольких режимов инициализирующей и инкрементальной работы по обновлению сущностей.
В случаях Hive и RDBMS генерируются различающиеся графы по инициализирующему и инкрементальному обновлению данных.
В случае Hive поступившие данные дельты соединяется посредством Ab Initio Join с данными, которые были в таблице до обновления. Загрузчики данных в MDW (как в Hive, так и в RDBMS) не только вставляют новые данные из дельты, но и закрывают периоды актуальности данных, по первичным ключам которых поступила дельта. Кроме того, приходится переписать заново неизменившуюся часть данных. Но так приходится делать, поскольку в Hive нет операций delete или update.

В случае же RDBMS графы по инкрементальному обновлению данных выглядят более оптимально, потому что RDBMS имеют реальные возможности обновления.

Поступившая дельта загружается в промежуточную таблицу в базу данных. После этого происходит соединение дельты с данными, которые были в таблице до обновления. И делается это силами SQL посредством сгенерированного SQL-запроса. Далее с помощью SQL-команд delete+insert в целевую таблицу происходит вставка новых данных из дельты и закрытие периодов актуальности данных, по первичным ключам которых поступила дельта. Неизменившиеся данные переписывать нет нужды.
Таким образом, мы пришли к выводу, что в случае Hive MDW должен пойти на переписывание всей таблицы, потому что Hive не имеет функции обновления. И ничего лучше полного переписывания данных при обновлении не придумано. В случае же RDBMS, наоборот, создатели продукта сочли нужным доверить соединение и обновление таблиц использованию SQL.
Для проекта в Сбербанке мы создали новую многократно используемую реализацию загрузчика базы данных для GreenPlum. Сделано это было на основе версии, которую MDW генерирует для Teradata. Именно Teradata, а не Oracle подошла для этого лучше и ближе всего, т.к. тоже является MPP-системой. Способы работы, а также синтаксис Teradata и GreenPlum оказались близки.
Примеры критичных для MDW различий между разными RDBMS таковы. В GreenPlum в отличии от Teradata при создании таблиц нужно писать клаузу
distributed by

В Teradata пишут
delete <table> all

, а в GreеnPlum пишут
delete from <table>

В Oracle в целях оптимизации пишут
delete from t where rowid in (<соединение t с дельтой>)

, а в Teradata и GreenPlum пишут
delete from t where exists (select * from delta where delta.pk=t.pk)

Ещё отметим, что для работы Ab Initio с GreenPlum потребовалось установить клиент GreenPlum на все ноды кластера Ab Initio. Это потому, что мы подключились к GreenPlum одновременно со всех узлов нашего кластера. А для того, чтобы чтение из GreenPlum было параллельным и каждый параллельный поток Ab Initio читал свою порцию данных из GreenPlum, пришлось в секцию where SQL-запросов поместить понимаемую Ab Initio конструкцию
where ABLOCAL()

и определить значение этой конструкции, указав читающей из БД трансформации параметр
ablocal_expr=string_concat("mod(t.", string_filter_out("{$TABLE_KEY}","{}"), ",", (decimal(3))(number_of_partitions()),")=", (decimal(3))(this_partition()))

, которая компилируется в что-то типа
mod(sk,10)=3

, т.е. приходится подсказывать GreenPlum явный фильтр для каждой партиции. Для других баз данных (Teradata, Oracle) Ab Initio может выполнить это распараллеливание автоматически.

Сравнительные характеристики производительности Ab Initio по работе с Hive и GreenPlum


В Сбербанке был проведён эксперимент по сравнению производительности сгенерированных MDW графов применительно к Hive и применительно к GreenPlum. В рамках эксперимента в случае Hive имелось 5 нод на том же кластере, что и Ab Initio, а в случае GreenPlum имелось 4 ноды на отдельном кластере. Т.е. Hive имел некоторое преимущество над GreenPlum по железу.
Было рассмотрено две пары графов, выполняющих одну и ту же задачу обновления данных в Hive и в GreenPlum. При этом запускали графы, сгенерированные конфигуратором MDW:

  • инициализирующая загрузка + инкрементальная загрузка случайно сгенерированных данных в таблицу Hive
  • инициализирующая загрузка + инкрементальная загрузка случайно сгенерированных данных в такую же таблицу GreenPlum

В обоих случаях (Hive и GreenPlum) запускали загрузки в 10 параллельных потоков на одном и том же кластере Ab Initio. Промежуточные данные для расчётов Ab Initio сохранял в HDFS (в терминах Ab Initio был использован MFS layout using HDFS). Одна строка случайно сгенерированных данных занимала в обоих случаях по 200 байт.
Результат получился такой:

Hive:
Инициализирующая загрузка в Hive
Вставлено строк 6 000 000 60 000 000 600 000 000
Продолжительность инициализирующей
загрузки в секундах
41 203 1 601
Инкрементальная загрузка в Hive
Количество строк, имевшихся в
целевой таблице на начало эксперимента
6 000 000 60 000 000 600 000 000
Количество строк дельты, применённых к
целевой таблице в ходе эксперимента
6 000 000 6 000 000 6 000 000
Продолжительность инкрементальной
загрузки в секундах
88 299 2 541

GreenPlum:
Инициализирующая загрузка в GreenPlum
Вставлено строк 6 000 000 60 000 000 600 000 000
Продолжительность инициализирующей
загрузки в секундах
72 360 3 631
Инкрементальная загрузка в GreenPlum
Количество строк, имевшихся в
целевой таблице на начало эксперимента
6 000 000 60 000 000 600 000 000
Количество строк дельты, применённых к
целевой таблице в ходе эксперимента
6 000 000 6 000 000 6 000 000
Продолжительность инкрементальной
загрузки в секундах
159 199 321

Видим, что скорость инициализирующей загрузки как в Hive, так и в GreenPlum линейно зависит от объёма данных и по причинам лучшего железа она несколько быстрее для Hive, чем для GreenPlum.
Инкрементальная загрузка в Hive также линейно зависит от объёма имеющихся в целевой таблице ранее загруженных данных и проходит достаточно медленно с ростом объёма. Вызвано это необходимостью перезаписывать целевую таблицу полностью. Это означает, что применение маленьких изменений к огромным таблицам не очень хороший вариант использования для Hive.
Инкрементальная же загрузка в GreenPlum слабо зависит от объёма имеющихся в целевой таблице ранее загруженных данных и проходит достаточно быстро. Получилось это благодаря SQL Joins и архитектуре GreenPlum, допускающей операцию delete.

Итак, GreenPlum вливает дельту методом delete+insert, а в Hive нету операций delete либо update, поэтому весь массив данных при инкрементальном обновлении были вынуждены переписывать целиком. Наиболее показательно сравнение выделенных жирным ячеек, так как оно соответствует наиболее частому варианту эксплуатации ресурсоёмких загрузок. Видим, что GreenPlum выиграл у Hive в этом тесте в 8 раз.

Работа Ab Initio с GreenPlum в режиме Near Real Time


В этом эксперименте проверим возможность Ab Initio производить обновление таблицы GreenPlum случайно формируемыми порциями данных в режиме, близком к реальному времени. Рассмотрим таблицу GreenPlum dev42_1_db_usl.TESTING_SUBJ_org_finval, с которой будет вестись работа.
Будем использовать три графа Ab Initio по работе с ней:

1) Граф Create_test_data.mp создаёт в 10 параллельных потоков файлы с данными в HDFS на 6 000 000 строк. Данные случайные, структура их организована для вставки в нашу таблицу



2) Граф mdw_load.day_one.current.dev42_1_db_usl_testing_subj_org_finval.pset сгенерированный MDW граф по инициализирующей вставке данных в нашу таблицу в 10 параллельных потоков (используются тестовые данные, сгенерированные графом (1))


3) Граф mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset сгенерированный MDW граф по инкрементальному обновлению нашей таблицы в 10 параллельных потоков с использованием порции свежих поступивших данных (дельты), сгенерированных графом (1)


Выполним нижеприведённый сценарий в режиме NRT:

  • сгенерировать 6 000 000 тестовых строк
  • произвести инициализирующую загрузку вставить 6 000 000 тестовых строк в пустую таблицу
  • повторить 5 раз инкрементальную загрузку
    • сгенерировать 6 000 000 тестовых строк
    • произвести инкрементальную вставку 6 000 000 тестовых строк в таблицу (при этом старым данным проставляется время истечения актуальности valid_to_ts и вставляются более свежие данные с тем же первичным ключом)

Такой сценарий эмулирует режим реальной работы некой бизнес-системы в режиме реального времени появляется достаточно объёмная порция новых данных и тут же вливается в GreenPlum.

Теперь посмотрим лог работы сценария:
Start Create_test_data.input.pset at 2020-06-04 11:49:11
Finish Create_test_data.input.pset at 2020-06-04 11:49:37
Start mdw_load.day_one.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:49:37
Finish mdw_load.day_one.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:50:42
Start Create_test_data.input.pset at 2020-06-04 11:50:42
Finish Create_test_data.input.pset at 2020-06-04 11:51:06
Start mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:51:06
Finish mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:53:41
Start Create_test_data.input.pset at 2020-06-04 11:53:41
Finish Create_test_data.input.pset at 2020-06-04 11:54:04
Start mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:54:04
Finish mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:56:51
Start Create_test_data.input.pset at 2020-06-04 11:56:51
Finish Create_test_data.input.pset at 2020-06-04 11:57:14
Start mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:57:14
Finish mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 11:59:55
Start Create_test_data.input.pset at 2020-06-04 11:59:55
Finish Create_test_data.input.pset at 2020-06-04 12:00:23
Start mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 12:00:23
Finish mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 12:03:23
Start Create_test_data.input.pset at 2020-06-04 12:03:23
Finish Create_test_data.input.pset at 2020-06-04 12:03:49
Start mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 12:03:49
Finish mdw_load.regular.current.dev42_1_db_usl_testing_subj_org_finval.pset at 2020-06-04 12:06:46


Получается такая картина:
Graph Start time Finish time Length
Create_test_data.input.pset 04.06.2020 11:49:11 04.06.2020 11:49:37 00:00:26
mdw_load.day_one.current.
dev42_1_db_usl_testing_subj_org_finval.pset
04.06.2020 11:49:37 04.06.2020 11:50:42 00:01:05
Create_test_data.input.pset 04.06.2020 11:50:42 04.06.2020 11:51:06 00:00:24
mdw_load.regular.current.
dev42_1_db_usl_testing_subj_org_finval.pset
04.06.2020 11:51:06 04.06.2020 11:53:41 00:02:35
Create_test_data.input.pset 04.06.2020 11:53:41 04.06.2020 11:54:04 00:00:23
mdw_load.regular.current.
dev42_1_db_usl_testing_subj_org_finval.pset
04.06.2020 11:54:04 04.06.2020 11:56:51 00:02:47
Create_test_data.input.pset 04.06.2020 11:56:51 04.06.2020 11:57:14 00:00:23
mdw_load.regular.current.
dev42_1_db_usl_testing_subj_org_finval.pset
04.06.2020 11:57:14 04.06.2020 11:59:55 00:02:41
Create_test_data.input.pset 04.06.2020 11:59:55 04.06.2020 12:00:23 00:00:28
mdw_load.regular.current.
dev42_1_db_usl_testing_subj_org_finval.pset
04.06.2020 12:00:23 04.06.2020 12:03:23 00:03:00
Create_test_data.input.pset 04.06.2020 12:03:23 04.06.2020 12:03:49 00:00:26
mdw_load.regular.current.
dev42_1_db_usl_testing_subj_org_finval.pset
04.06.2020 12:03:49 04.06.2020 12:06:46 00:02:57

Видим, что 6 000 000 строк инкремента обрабатываются за 3 минуты, что достаточно быстро.
Данные в целевой таблице получились распределёнными следующим образом:
select valid_from_ts, valid_to_ts, count(1), min(sk), max(sk) from dev42_1_db_usl.TESTING_SUBJ_org_finval group by valid_from_ts, valid_to_ts order by 1,2;


Можно разглядеть соответствие вставленных данных моментам запуска графов.
Значит можно запускать в Ab Initio инкрементальную загрузку данных в GreenPlum с очень высокой частотой и наблюдать высокую скорость вставки этих данных в GreenPlum. Конечно, раз в секунду запускаться не получится, так как Ab Initio, как и любое ETL-средство, при запуске требует времени на раскачку.

Заключение


Сейчас Ab Initio используется в Сбербанке для построения Единого семантического слоя данных (ЕСС). Этот проект подразумевает построение единой версии состояния различных банковских бизнес-сущностей. Информация приходит из различных источников, реплики которых готовятся на Hadoop. Исходя из потребностей бизнеса, готовится модель данных и описываются трансформации данных. Ab Initio загружает информацию в ЕСС и загруженные данные не только представляют интерес для бизнеса сами по себе, но и служат источником для построения витрин данных. При этом функционал продукта позволяет использовать в качестве приёмника различные системы (Hive, Greenplum, Teradata, Oracle), что даёт возможность без особых усилий подготавливать данные для бизнеса в различных требуемых ему форматах.

Возможности Ab Initio широки, например, прилагающийся фреймворк MDW даёт возможность строить техническую и бизнес-историчность данных из коробки. Для разработчиков Ab Initio даёт возможность не изобретать велосипед, а пользоваться множеством имеющихся функциональных компонентов, по сути являющихся библиотеками, нужными при работе с данными.

Автор эксперт профессионального сообщества Сбербанка SberProfi DWH/BigData. Профессиональное сообщество SberProfi DWH/BigData отвечает за развитие компетенций в таких направлениях, как экосистема Hadoop, Teradata, Oracle DB, GreenPlum, а также BI инструментах Qlik, SAP BO, Tableau и др.
Подробнее..

Перевод Экономичная конфигурация исполнителей Apache Spark

20.11.2020 16:08:20 | Автор: admin

Привет, Хабр! В преддверии старта курса "Экосистема Hadoop, Spark, Hive" подготовили для вас перевод полезной статьи. А также предлагаем посмотреть бесплатную запись демо-урока по теме: "Spark 3.0: Что нового?".


Ищем наиболее оптимальную конфигурацию исполнителей для вашего узла

Количество ЦП на узел

Первый этап в определении оптимальной конфигурации исполнителей (executor) - это выяснить, сколько фактических ЦП (т.е. не виртуальных ЦП) доступно на узлах (node) в вашем кластер. Для этого вам необходимо выяснить, какой тип инстанса EC2 использует ваш кластер. В этой статье мы будем использовать r5.4xlarge, который, согласно прейскуранту на инстансы AWS EC2, насчитывает 16 процессоров.

Когда мы запускаем наши задачи (job), нам нужно зарезервировать один процессор для операционной системы и системы управления кластерами (Cluster Manager). Поэтому мы не хотели бы задействовать под задачу сразу все 16 ЦП. Таким образом, когда Spark производит вычисления, на каждом узле у нас остается только 15 доступных для аллоцирования ЦП.

Количество ЦП на исполнителя

Теперь, когда мы узнали, сколько ЦП доступно для использования на каждом узле, нам нужно определить, сколько ядер (core) Spark мы хотим назначить каждому исполнителю. С помощью базовой математики (X * Y = 15), мы можем посчитать, что существует четыре различных комбинации ядер и исполнителей, которые могут подойти для нашего случая с 15 ядрам Spark на узел:

Возможные конфигурации исполнителейВозможные конфигурации исполнителей

Давайте исследуем целесообразность каждой из этих конфигураций.

Один исполнитель с пятнадцатью ядрами

Самое очевидное решение, которое приходит на ум, - создать одного исполнителя с 15 ядрами. Проблема с большими жирными исполнителями, подобными этому, заключается в том, что исполнитель, поддерживающий такое количество ядер, обычно будет иметь настолько большой пул памяти (64 ГБ+), что задержки на сборку мусора будут неоправданно замедлять вашу работу. Поэтому мы сразу исключаем эту конфигурацию.

Пятнадцать одноядерных исполнителей

Следующее очевидное решение, которое приходит на ум создать 15 исполнителей, каждый из которых имеет только одно ядро. Проблема здесь в том, что одноядерные исполнители неэффективны, потому что они не используют преимуществ параллелизма, которые обеспечивают несколько ядер внутри одного исполнителя. Кроме того, найти оптимальный объем служебной памяти для одноядерных исполнителей может быть достаточно сложно. Давайте немного поговорим о накладных расходах памяти.

Накладные расходы памяти для исполнителя по умолчанию составляют 10% от размера выделенной вашему исполнителю памяти или 384 MB (в зависимости от того, что больше). Однако на некоторых big data платформах, таких как Qubole, накладные расходы зафиксированы на определенном значении по умолчанию, вне зависимости от размера вашего исполнителя. Вы можете проверить ваш показатель накладных расходов, перейдя во вкладку Environments в логе Spark и выполнив поиск параметра spark.executor.memoryOverhead.

Накладные расходы памяти по умолчанию в Spark будут очень маленьким, что результирует в проблемах с вашими задачами. С другой стороны, фиксированное значение накладных расходов для всех исполнителей приведет к слишком большому объему служебной памяти и, следовательно, оставит меньше места самим исполнителям. Выверить идеальный размер служебной памяти сложно, поэтому это еще одна причина, по которой одноядерный исполнитель нам не подходит.

Пять исполнителей с тремя ядрами или три исполнителя с пятью ядрами

Итак, у нас осталось два варианта. Большинство руководств по настройке Spark сходятся во мнении, что 5 ядер на исполнителя это оптимальное количество ядер с точки зрения параллельной обработки. И я тоже пришел к выводу, что это правда, в том числе, на основании моих собственных изысканий в оптимизации. Еще одно преимущество использования пятиядерных исполнителей по сравнению с трехъядерными заключается в том, что меньшее количество исполнителей на узел требует меньшее количество служебной памяти. Поэтому мы остановимся на пятиядерных исполнителях, чтобы минимизировать накладные расходы памяти на узле и максимизировать параллелизм внутри каждого исполнителя.

--executor-cores 5

Объем памяти на узел

Наш следующий шаг определить, сколько памяти назначить каждому исполнителю. Прежде чем мы сможем это сделать, мы должны определить, сколько физической памяти на нашем узле нам вообще доступно. Это важно, потому что физическая память это жесткое ограничение для ваших исполнителей. Если вы знаете, какой инстанс EC2 используете, значит, вы знаете и общий объем памяти, доступной на узле. Про наш инстанс r5.4xlarge AWS сообщает, что у него 128 ГБ доступной памяти.

Однако доступными для использования вашими исполнителями будут не все 128 ГБ, так как память нужно будет выделить также и для вашей системы управления кластерами. На рисунке ниже показано, где в YARN искать сколько памяти доступно для использования после того, как память была выделена для системы управления кластерами.

Мы видим, что на узлах этого кластера исполнителям доступно 112 ГБ.

Объем памяти на исполнителя

Если мы хотим, чтобы три исполнителя использовали 112 ГБ доступной памяти, то нам следует определить оптимальный размер памяти для каждого исполнителя. Чтобы вычислить объем памяти доступной исполнителю, мы попросту делим доступную память на 3. Затем мы вычитаем накладные расходы на память и округляем до ближайшего целого числа.

Если служебная память у вас фиксированная (как в случае с Qubole), вы будете использовать эту формулу. (112/3) = 372,3 = 34,7 = 34.

Если вы используете дефолтный метод Spark для расчета накладных расходов на память, вы будете использовать эту формулу. (112/3) = 37 / 1,1 = 33,6 = 33.

В оставшейся части этой статьи мы будем использовать фиксированный объем накладных расходов памяти для Qubole.

--executor-memory 34G

Чтобы по настоящему начать экономить, опытным тюнерам Spark необходим следующий сдвиг в парадигме. Я рекомендую вам в ваших исполнителях для всех задач использовать фиксированные размер памяти и количество ядер. Понимаю, что использование фиксированной конфигурации исполнителей для большинства задач Spark кажется противоречащим надлежащей практике тюнинга Spark. Даже если вы настроены скептически, я прошу вас опробовать эту стратегию, чтобы убедиться, что она работает. Выясните, как рассчитать затраты на выполнение вашей задачи, как описано в Части 2, а затем проверьте это на практике. Считаю, что если вы это сделаете, вы обнаружите, что единственный способ добиться эффективной экономичности облачных затрат это использовать фиксированные размеры памяти для ваших исполнителей, которые оптимально используют ЦП.

С учетом вышесказанного, если при использовании эффективного объема памяти для ваших исполнителей у вас остается много неиспользуемой памяти, подумайте о переносе вашего процесса на другой тип инстанса EC2, у которого меньше памяти на ЦП узла. Этот инстанс обойдется вам дешевле и, следовательно, поможет снизить стоимость выполнения вашей задачи.

Наконец, будут моменты, когда эта экономичная конфигурация не будет обеспечивать достаточную пропускную способность для ваших данных в вашем исполнителе. В примерах, приведенных во второй части, было несколько задач, в которых мне приходилось уходить от использования оптимального размера памяти, потому что нагрузка на память была максимальной во время всего выполнения задачи.

В этом руководстве я все же рекомендую вам начинать с оптимального размера памяти при переносе ваших задач. Если при оптимальной конфигурации исполнителей у вас возникают ошибки памяти, я поделюсь конфигурациями, которые избегают этих ошибок позже в Части 5.

Количество исполнителей на задачу

Теперь, когда мы определились с конфигурацией исполнителей, мы готовы настроить количество исполнителей, которые мы хотим использовать для нашей задачи. Помните, что наша цель - убедиться, что используются все 15 доступных ЦП на узел, что означает, что мы хотим, чтобы каждому узлу было назначено по три исполнителя. Если мы установим количество наших исполнителей кратное 3, мы добьемся своей цели.

Однако с такой конфигурацией есть одна проблема. Нам также нужно назначить драйвер для обработки всех исполнителей в узле. Если мы используем количество исполнителей, кратное 3, то наш одноядерный драйвер будет размещен в своем собственном 16-ядерном узле, что означает, что аж 14 ядер на этом последнем узле не будут использоваться в течение всего выполнения задачи. Это не очень хорошая практика использования облака!

Мысль, которую я хочу здесь донести, заключается в том, что идеальное количество исполнителей должно быть кратным 3 минус один исполнитель, чтобы освободить место для нашего драйвера.

--num-executors (3x - 1)

В Части 4 я дам вам рекомендации о том, сколько исполнителей вы должны использовать при переносе существующей задачи в экономичную конфигурацию исполнителей.

Объем памяти на драйвер

Обычной практикой для data-инженеров является выделение относительно небольшого размера памяти под драйвер по сравнению с исполнителями. Однако AWS на самом деле рекомендует устанавливать размер памяти вашего драйвера таким же, как и у ваших исполнителей. Я обнаружил, что это также очень помогает при оптимизации затрат.

--driver-memory 34G

В редких случаях могут возникать ситуации, когда вам нужен драйвер, память которого больше, чем у исполнителя. В таких случаях устанавливайте размер памяти драйвера в 2 раза больше памяти исполнителя, а затем используйте формулу (3x - 2), чтобы определить количество исполнителей для вашей задачи.

Количество ядер на драйвер

По умолчанию количество ядер на драйвер равно одному. Однако я обнаружил, что задачи, использующие более 500 ядер Spark, могут повысить производительность, если количество ядер на драйвер установлено в соответствии с количеством ядер на исполнителя. Однако не стоит сразу менять дефолтное количество ядер в драйвере. Просто протестируйте это на своих наиболее крупных задачах, чтобы увидеть, ощутите ли вы прирост производительности.

--driver-cores 5

Конфигурация универсальна?

Таким образом, конфигурация исполнителей, которую я рекомендую для узла с 16 процессорами и 128 ГБ памяти, будет выглядеть следующим образом.

--driver-memory 34G --executor-memory 34G --num-executors (3x - 1) --executor-cores 5

Но помните:

Не существует универсальных конфигураций просто продолжайте экспериментировать и рано или поздно вы найдете конфигурацию, идеально подходящую для ваших задач.

Как я уже упоминал выше, эта конфигурация может выглядеть не подходящей для ваших конкретных нужд. Я рекомендую вам использовать эту конфигурацию в качестве отправной точки в процессе оптимизации затрат. Если в этой конфигурации у вас возникают проблемы с памятью, в следующих частях этой серии я порекомендую вам конфигурации, которые позволят вам решить большинство проблем с памятью, возникающих при переходе на экономичные конфигурации.

Поскольку конфигурация узла, используемая в этой статье, достаточно распространена в Expedia Group , я буду ссылаться на нее в остальной части серии. Если ваши узлы имеют другой размер, вам следует использовать метод, который я изложил здесь, чтобы определить идеальную конфигурацию.

Теперь, когда у вас есть оптимальная экономичная конфигурация исполнителей, вы можете попробовать перенести на нее текущие задачи. Но какие задачи вам следует перенести в первую очередь? И сколько исполнителей вы должны запустить с этой новой конфигурацией? А что произойдет, если задача с оптимизированной стоимостью выполняется дольше, чем неоптимизированная задача? И уместно ли когда-либо избыточное использование ЦП? Я отвечу на эти вопросы в Части 4: Как перенести существующие задачи Apache Spark на экономичные конфигурации исполнителей.


Подробнее о курсе "Экосистема Hadoop, Spark, Hive" можно узнать здесь. Также можно посмотреть запись открытого урока "Spark 3.0: что нового?".

Читать ещё:

Подробнее..

Перевод Масштабирование итеративных алгоритмов в Spark

26.01.2021 14:17:08 | Автор: admin

Итеративные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т.д. Эти алгоритмы усложняются итерациями, размеры данных на каждой итерации увеличивается, и сделать их отказоустойчивыми на каждой итерации непросто.

В этой статье я бы подробно остановился на некоторых моментах, которые необходимо учитывать при работе с этими задачами. Мы использовали Spark для реализации нескольких итерационных алгоритмов, таких как построение связанных компонентов, обход больших связанных компонентов и т.д. Ниже приведен мой опыт работы в лабораториях Walmart по построению связанных компонентов для 60 миллиардов узлов клиентской идентификации.

Количество итераций никогда не предопределено, всегда есть условие завершения ?.Количество итераций никогда не предопределено, всегда есть условие завершения ?.

Типы итеративных алгоритмов

Конвергентные данные: Здесь мы видим, что с каждой итерацией количество данных уменьшается, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основной задачей будет работа с огромными наборами данных в первых нескольких итерациях, и после того, как набор данных значительно уменьшится, можно будет легко справляться с дальнейшими итерациями до их завершения.

Расходящиеся данные: Количество данных увеличивается при каждой итерации, и иногда они могут появляться быстрее и сделать невозможной дальнейшую работу. Для работы этих алгоритмов необходимы такие ограничения, как ограничения по количеству итераций, начальному размеру данных, вычислительной мощности и т.д.

Аналогичные данные: На каждой итерации у нас были бы более или менее одинаковые данные, и с таким алгоритмом было бы очень легко работать.

Инкрементные данные: На каждой итерации у нас могут появляться новые данные, особенно в ML у нас могут появляться новые обучающие наборы данных с периодическими интервалами.

Препятствия

  1. RDD линии: Одним из распространенных способов сохранения отказоустойчивости системы является хранение копий данных в разных местах, чтобы в случае падения одного узла у нас была копия, которая помогала бы до тех пор, пока узел не восстановится. Но Spark не поддерживает дубликаты данных, а поддерживает линейный график преобразований, выполненных на данных в драйвере. Поэтому такой линейный график был бы полезен, если какой-либо фрагмент данных отсутствует, он может построить его обратно с помощью линейного графика, следовательно, Spark является отказоустойчивым. По мере того, как линейный график становится большим, становится трудно строить данные обратно, так как количество итераций увеличивается.

  2. Память и дисковый ввод/вывод: В Spark RDD являются непреложными, поэтому на каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование Памяти и Диска. По мере того, как итерации будут увеличивать использование диска/памяти исполнителями, это может привести к замедлению работы из-за нехватки памяти и ожиданию, пока GC выполнит очистку. В некоторых случаях куча памяти будет недостаточной и может привести к невозможности выполнения задачи.

  3. Размер задачи: В некоторых случаях может быть несколько задач, которые могут не подходить для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к препятствию.

Советы по преодолению вышеуказанных проблем

  1. Хранение большого линейного графика в памяти, и, в случае сбоя узла, восстановление потерянных наборов данных займет много времени. В таких случаях можно использовать кэширование или запись данных о состоянии в контрольной точке на каждой N итерации. Это сохранит рассчитанный RDD на каждой N итерации (кэширование будет храниться в памяти или на диске исполнителей, запись данных о состоянии в контрольной точке использует HDFS, мы должны принять решение исходя из нашей потребности, так как скорость будет различаться для каждой из них). В случае неудачи RDD вычисляется обратно от последней контрольной точки/кэширования. Вместо использования двух вышеупомянутых методов можно также создать временную таблицу и сохранить вычисленный набор данных, разделенный итерацией. В случае неудачи задания Spark, можно сделать перезапуск с последней N-ой итерации, а преимущество сохранения во временную таблицу состоит в том, что можно избавиться от линейного графика RDD до этой итерации и запустить свежий линейный график с этой точки. По мере того, как линейный график RDD растет в итерационных алгоритмах, нам необходимо строить гибридные решения с использованием кэширования, контрольных точек (см. ссылку [2]) и временных таблиц для различных вариантов использования.

  2. Как и выше, сохранение во временную таблицу и чтение из временной таблицы может помочь избавиться от линейного графика и очистить память и диск предыдущих RDD. Такая запись и чтение замедляет процесс, но это даст огромное преимущество при работе с большими наборами данных. Особенно при конвергировании наборов данных, нам может понадобиться выполнять этот процесс только в течение первых нескольких итераций и использовать кэширование, когда наборы данных становятся маленькими при работе с итерациями. Экономия во временной таблице в качестве контрольной точки выглядит тривиально, но она не просто действует как контрольная точка. Так как мы избавляемся от истории линейных графов, делая это на периодических итерациях, это уменьшит риск сбоя в работе и сократит время на построение ее обратной копии из потерянных данных.

  3. Работа с расходящимися данными сложна, так как размер каждой задачи будет увеличиваться с увеличением количества итераций и займет намного больше времени для каждого исполнителя. Поэтому нам нужен фактор для вычисления количества задач в ( i + 1) итерации по сравнению с i-й итерацией таким образом, чтобы размер задачи остался прежним. Например, скажем, количество задач в i-й итерации 100, и каждая задача обрабатывает около 100 МБ данных. В i+1 итерации размер каждой задачи увеличивается до 150 МБ, и мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ на каждую задачу. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач, переструктурировав и изменив перетасованные разделы на основе итерации.

  4. В случаях, когда размер spark задачи огромен, попробуйте увеличить память исполнителя в соответствии с размером задачи. А если нужно выполнить соединения на искаженных наборах данных, где 10% задач занимает 90% времени исполнения и 90% задач выполняются за 10% времени, эти задачи предлагается обрабатывать отдельно, выполняя их в виде двух разных запросов. Нужно определить причину больших задач, и можем ли мы разделить их на две группы, т.е. маленькие и большие задачи. В 1-м запросе мы бы обработали 90% задач, т.к. нет никаких препятствий для их обработки, и это заняло бы 10% времени, как и раньше. В другом запросе мы бы обрабатывали большие задачи (10% задач) с помощью всенаправленного соединения, так как количество таких задач меньше, а также избегали бы перетасовки данных.

Пример: Допустим, у нас есть таблица А и таблица Б. Таблица А это данные о населении со столбцами user_id, имя, город, штат. Таблица B это то, что группирует данные со столбцами user_id, group_id. Например, мы пытаемся найти 5 крупнейших городов с наибольшим количеством используемых групп. В этом примере могут быть тупиковые ситуации, как города с большим количеством населения могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Для решения этих тупиковых ситуаций, объединение между этими таблицами может быть сделано в двух запросах. Мы можем отфильтровать больших пользователей с большим количеством групп (скажем, порог в 1000 групп на пользователя) и относиться к ним как к большим задачам. И выполнять соединения отдельно для больших пользователей, используя всенаправленное объединение, так как количество больших пользователей будет мало по сравнению с общими данными. Аналогичным образом, для остальных пользователей выполняйте тасовку объединения и объединяйте результаты и агрегируйте по городам, чтобы найти 5 лучших городов.


А прямо сейчас в OTUS открыт набор на курс Экосистема Hadoop, Spark, Hive.

Всех желающих приглашаем записаться на бесплатный демо-урок по теме Spark Streaming.

ЗАБРАТЬ СКИДКУ

Подробнее..

Перевод Как дебажить запросы, используя только Spark UI

08.11.2020 12:22:14 | Автор: admin

Егор Матешук (CDO AdTech-компании Квант и преподаватель в OTUS) приглашает Data Engineer'ов принять участие в бесплатном Demo-уроке Spark 3.0: что нового?. Узнаете, за счет чего Spark 3.0 добивается высокой производительности, а также рассмотрите другие нововведения.

Также приглашаем посмотреть запись трансляции Demo-урока Написание эффективных пользовательских функций в Spark и пройти вступительное тестирование по курсу Экосистема Hadoop, Spark, Hive!

У вас уже есть все, что вам нужно для дебаггинга запросов

Spark - самый широко используемый фреймворк для big data вычислений, способный выполнять задачи на петабайтах данных. Spark предоставляет набор веб-UI, которые можно использовать для отслеживания потребления ресурсов и состояния кластера Spark. Большинство проблем, с которыми мы сталкиваемся при выполнении задачи (job), можно отладить, перейдя в UI Spark.

spark2-shell --queue=P0 --num-executors 20Spark context Web UI available at http://<hostname>:<port>Spark context available as 'sc'Spark session available as 'spark'

В этой статье я попытаюсь продемонстрировать, как дебажить задачу Spark, используя только Spark UI. Я запущу несколько задач Spark и покажу, как Spark UI отражает выполнение задачи. Также я поделюсь с вами несколькими советами и хитростями.

Вот как выглядит Spark UI.

Мы начнем с вкладки SQL, которая включает в себя достаточно много информации для первоначального обзора. При использовании RDD в некоторых случаях вкладки SQL может и не быть.

А вот запрос, который запускаю в качестве примера

spark.sql("select id, count(1) from table1 group by id).show(10, false)

Перевод разъяснений в правой части:

<-- В рамках запроса было запущено 3 задачи, а сам запрос был выполнен за 21с.

<-- Файлы Parquet отсканированы, они содержат в сумме 23.7М строк

<-- Это работа выполненная каждой партицией

1. генерирует хеш id, count

2. группирует id и суммирует count. Вот как это выглядит

1. id = hash(125), count=1000

2. id = hash(124), count=900

<-- Происходит обмен данных, приведенных выше, на основе хеша id колонки, чтобы в результате каждая партиция имела один хеш

<-- Данные каждой партиции суммируются и возвращается count

Теперь давайте сопоставим это с физическим планом запроса. Физический план можно найти под SQL DAG, когда вы раскрываете вкладку details. Мы должны читать план снизу вверх

== Physical Plan ==CollectLimit 11+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#79])+- Exchange hashpartitioning(id#1, 200)+- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#83L])+- *(1) FileScan parquet [id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://<node>:<port><location>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string>

Вот как следует читать план:

  1. Сканирование файла parquet. Обратите внимание на PushedFilters. Я продемонстрирую, что это означает позже

  2. Создание HashAggregate с ключами. Обратите внимание на partial_count. Это означает, что агрегированный count является частичным, поскольку агрегирование было выполнено в каждой отдельной задаче и не было смешанно для получения полного набора значений.

  3. Теперь сгенерированные данные агрегируются на основе ключа, в данном случае id.

  4. Теперь вычисляется вообще весь count.

  5. Полученный результат

Теперь, когда с этим мы разобрались, давайте посмотрим на данные PuedFilters. Spark оптимизирован для предикатов, и любые применяемые фильтры пушатся к источнику. Чтобы продемонстрировать это, давайте рассмотрим другую версию этого запроса

spark.sql("select id, count(1) from table1 where status = 'false' group by id).show(10, false)

А это его план

+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#224])+- Exchange hashpartitioning(id#1, 200)+- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#228L])+- *(1) Project [id#1]+- *(1) Filter (isnotnull(status#3) && (status#3 = false))+- *(1) FileScan parquet [id#1,status#3] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://mr25p01if-ingx03010101.mr.if.apple.com:50001/home/hadoop/work/news/20200..., PartitionFilters: [], PushedFilters: [IsNotNull(status), EqualTo(status,false)], ReadSchema: struct<id:string,status:string>

Обратите внимание на изменения по сравнению с предыдущим планом.

Мы видим в PushedFilters уже кое-что другое проверка на null и проверка на равенство. Столбец, к которому мы применяем фильтр пушится к источнику, т.е. при чтении данных эти строки игнорируются. Результат этого переносится на следующие этапы.

Можем ли мы, применяя фильтры, уменьшить общее количество прочитанных данных (или файлов)?

Да мы можем. В обоих приведенных выше примерах общее количество прочитанных данных составляет ~ 23,8M. Чтобы уменьшить его, мы можем использовать магию файлов parquet. В Parquet есть группа строк, в которой есть статистика, которую можно использовать для игнорирования нескольких групп/файлов строк. Это приводит к тому, что эти файлы вообще не читаются. Вы можете прочитать о том, как это сделать, в другой моей статье на medium Insights Into Parquet Storage.

Вкладка Executor

Эта вкладка дает нам представление о количестве активных в настоящее время исполнителей в вашей сессии spark.

spark2-shell  queue=P0  driver-memory 20g  executor-memory 20g  num-executors 40

Я запросил 40 исполнителей для сессии, однако при запуске вы можете увидеть, что он предоставил мне всего 10 активных исполнителей. Это может быть связано с тем, что не работают хосты или Spark не нуждается в таком большом количестве исполнителей. Это также может вызвать задержку в планировании задач, поскольку у вас всего 10 исполнителей, а вам нужно 40, что скажется на параллелизме.

Вкладка Environment

Вкладка Environment содержит подробную информацию обо всех параметрах конфигурации, которые в данный момент использует сессия spark.

Посмотрите, как здесь отражены параметры, отраженные мной ранее. Это полезно хотя бы просто для того, чтобы убедиться, что предоставленная вами конфигурация принята.

Вкладка Storage

Здесь отображается информация об одной из наиболее обсуждаемых функций Spark - кэшировании. В Интернете доступно множество статей с разными мнениями относительно того, стоит ли кэшировать или нет. К счастью, эта статья не о том, когда стоит кэшировать и т. д. Он больше о том, что происходит, когда мы кэшируем.

Но перед этим давайте вернемся немного назад и потратим несколько минут на некоторые основы кэширования.

Есть два способа кэширования Dataframe:

df.persist

Для кэширования набора данных требуется несколько свойств.

df.cache

Под капотом это вызывает метод persist. Обратимся к исходному коду

def cache(): this.type = persist()/*** Persist this Dataset with the given storage level.* @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,`MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,`MEMORY_AND_DISK_2`, etc.* @group basic* @since 1.6.0*/
  • DISK_ONLY: хранить (persist) данные на диске только в сериализованном формате.

  • MEMORY_ONLY: [хранить данные в памяти только в десериализованном формате.

  • MEMORY_AND_DISK: хранить данные в памяти, а если памяти недостаточно, вытесненные блоки будут сохранены на диске.

  • MEMORY_ONLY_SER: этот уровень Spark хранит RDD как сериализованный объект Java (однобайтовый массив на партицию). Это более компактно по сравнению с десериализованными объектами. Но это увеличивает накладные расходы на CPU.

  • MEMORY_AND_DISK_SER: аналогично MEMORY_ONLY_SER, но с записью на диск, когда данные не помещаются в памяти.

  • Давайте воспользуемся df.cache в нашем примере и посмотрим, что произойдет a.cache() -> На вкладке Storage ничего не видно. Как вы можете догадаться, это из-за ленивого вычисления

Давайте воспользуемся df.cache в нашем примере и посмотрим, что произойдет

a.cache()

> На вкладке Storage ничего не видно. Как вы можете догадаться, это из-за ленивого вычисления

a.groupBy(id).count().show(10,false)

Мы видим какой-то кэш данных. Размер в памяти составляет 5,2 ГБ, а размер моего файла - 2 ГБ хммм что здесь произошло

hadoop dfs -dus <dirName>2,134,751,429 6,404,254,287 <dirName>

Это потому, что данные в памяти десериализованы и несжаты. Это результирует в большем объеме памяти по сравнению с диском.

Так что, когда вы хотите принимаете решение о том, кэшировать или нет, помните об этом.

Я видел несколько толковых статей о том, следует ли кэшировать или нет. Ознакомиться с ними - хорошая идея

Далее мы рассмотрим вкладки Jobs и Stages, причины множества проблем можно отдебажить с помощью этих вкладок.

spark.sql("select is_new_user,count(1) from table1 group by is_new_user").show(10,false)

Я вижу, что для указанного выше запроса запускаются 3 задачи. Но 2 из них пропущены. Обычно это означает, что данные были извлечены из кэша и не было необходимости повторно выполнять данный этап. Кроме того, Spark выполняет множество фиктивных задач для оценки данных. Пропуск задач мог быть связан и с этим.

Давайте же глубоко погрузимся в задачу, которая не была пропущена. Это визуализация DAG для задачи

Мы ясно видим, что эта задача состоит из двух этапов, разделенных операцией перемешивания/обмена. Stages означают, что данные были записаны на диск для использования в следующем процессе.

Давайте углубимся во вкладку stages.

Первое, что всегда нужно проверять, - это сводные метрики для задач. Вы можете нажать show additional metrics для получения дополнительных фактов. Это покажет множество необходимых параметров по минимуму, медиане и максимуму. В идеальном мире минимальное значение должно быть близко к максимальному.

Вот несколько моментов, которые следует отметить:

Продолжительность (duration): В нашем примере минимальная и максимальная продолжительность составляет 0,4 и 4 секунды соответственно. Это может быть связано с несколькими причинами, и мы постараемся отдебажить их в пунктах ниже.

Время десериализации задачи (Task deserialization time):

В нашем примере в рамках десериализации задачи некоторое время тратится и на другие задачи. Одной из основных причин было выполнение процессов сборки мусора в исполнителях. У меня выполнялись другие процессы, в которых были кэшированы некоторые данные, что приводило к сборке мусора. Процессам сборки мусора предоставляется наивысший приоритет, и они останавливают все запущенные процессы в угоду обслуживания процесса сборки мусора. Если вы видите, что ваш процесс не потребляет много памяти, первым шагом для решения такой проблемы может быть разговор с администратором/OPS.

Задержка планировщика (Scheduler delay): максимальная задержка планировщика составляет 0,4 секунды. Это означает, что одна из задач должна была ждать отправки еще 0,4 секунды. Большое это значение или маленькое, зависит от вашего конкретного юзкейса.

Размер ввода очень сильно распределен. Это очень хорошо, поскольку все задачи читают одинаковый объем данных. Это одна из самых важных вещей при поиске неверного/искаженного запроса. Это можно увидеть в столбце shuffle read в разделе Summary metrics for tasks. Самая простая логика для решения таких проблем - это добавление соли к группе, которая может распараллеливать данные, а затем, наконец, агрегирование данных без соли. Этот принцип может применяться во многих формах для решения проблемы асимметрии данных.

Еще одна вещь, на которую стоит обратить внимание, - это уровень локальности.

* PROCESSLOCAL Эта задача будет запущена в том же процессе, что и исходные данные

* NODELOCAL Эта задача будет запущена на том же компьютере, что и исходные данные

* RACKLOCAL Эта задача будет запущена в том же блоке, что и исходные данные

* NOPREF (Отображается как ANY) Эта задача не может быть запущена в том же процессе, что и исходные данные, или это не имеет значения.

Предположим, мы потребляем данные из узла Cassandra в кластере Spark, состоящем из трех узлов. Cassandra работает на машине X узлов Spark X, Y и Z. Для узла X все данные будут помечены как NODELOCAL. Это означает, что после того, как каждое ядро на X будет занято, мы останемся с задачами, предпочтительное расположение которых - X, но у нас есть пространство для выполнения только на Y и Z. У Spark есть только два варианта: дождаться, пока ядра станут доступны на X, или понизить уровень локальности задачи и попытаться найти место для них и принять любые штрафы за нелокальное выполнение.

Параметр spark.locality.wait описывает, как долго ждать перед понижением уровня задач, которые потенциально могут выполняться с более высокого уровня локальности до более низкого уровня. Этот параметр, по сути, является нашей оценкой того, сколько стоит ожидание локального места. Значение по умолчанию - 3 секунды, что означает, что в нашем примере с Cassandra, как только наш совместно расположенный узел X будет забит задачами, другие наши машины Y и Z будут простаивать в течение 3 секунд, прежде чем задачи, которые могли быть NODELOCAL, будут понижены до ANY* и запущены.

Вот пример кода для этого.

Я надеюсь, что эта статья послужит вам в качестве руководства по дебаггингу на Spark UI с целью устранения проблем с производительностью Spark. В Spark 3 есть много дополнительных функций, которые тоже стоит посмотреть.

Также хорошая идея почитать документацию Spark UI.

Вы также можете связаться со мной в Linkedin.

Хотите узнать, как Apache Druid индексирует данные для сверхбыстрых запросов? Узнайте об этом здесь:

Insights into Indexing using Bitmap Index


Интересно развиваться в данном направлении? Участвуйте в трансляции мастер-класса Spark 3.0: что нового? и оцените программу курса Экосистема Hadoop, Spark, Hive!

Подробнее..

Перевод Почему ваши Spark приложения медленно работаютили не работают вообще. Часть 1 Управление памятью

02.02.2021 02:11:38 | Автор: admin

Будущих учащихся на курсе Экосистема Hadoop, Spark, Hive приглашаем на открытый вебинар по теме Spark Streaming. На вебинаре участники вместе с экспертом познакомятся со Spark Streaming и Structured Streaming, изучат их особенности и напишут простое приложение обработки потоков.

А сейчас делимся с вами традиционным переводом полезного материала.


Spark приложения легко писать и легко понять, когда все идет по плану. Однако, это становится очень сложно, когда приложения Spark начинают медленно запускаться или выходить из строя. Порой хорошо настроенное приложение может выйти из строя из-за изменения данных или изменения компоновки данных. Иногда приложение, которое до сих пор работало хорошо, начинает вести себя плохо из-за нехватки ресурсов. Список можно продолжать и продолжать.

Важно понимать не только приложение Spark, но также и его базовые компоненты среды выполнения, такие как использование диска, сети, конфликт доступа и т.д., чтобы мы могли принимать обоснованные решения, когда дела идут плохо.

В этой серии статей я хочу рассказать о некоторых наиболее распространенных причинах, по которым приложение Spark выходит из строя или замедляется. Первая и наиболее распространенная это управление памятью.

Если бы мы заставили всех разработчиков Spark проголосовать, то условия отсутствия памяти (OOM) наверняка стали бы проблемой номер один, с которой все столкнулись.Это неудивительно, так как архитектура Spark ориентирована на память. Некоторые из наиболее распространенных причин OOM:

  • неправильное использование Spark

  • высокая степень многопоточности (high concurrency)

  • неэффективные запросы

  • неправильная конфигурация

Чтобы избежать этих проблем, нам необходимо базовое понимание Spark и наших данных. Есть определенные вещи, которые могут быть сделаны, чтобы либо предотвратить OOM, либо настроить приложение, которое вышло из строя из-за OOM.Стандартная конфигурация Spark может быть достаточной или не подходящей для ваших приложений. Иногда даже хорошо настроенное приложение может выйти из строя по причине OOM, когда происходят изменения базовых данных.

Переполнение памяти может быть в узлах драйвера, исполнителя и управления. Рассмотрим каждый случай.

НЕДОСТАТОЧНО ПАМЯТИ ПРИ РАБОТЕ ДРАЙВЕРА

Драйвер в Spark это JVM (Java Virtual Machine) процесс, в котором работает основной поток управления приложения. Чаще всего драйвер выходит из строя с ошибкой OutOfMemory OOM (недостаточно памяти из-за неправильного использования Spark. Spark это механизм распределения нагрузки между рабочим оборудованием. Драйвер должен рассматриваться только как дирижер. В типовых установках драйверу предоставляется меньше памяти, чем исполнителям. Поэтому мы должны быть осторожны с тем, что мы делаем с драйвером.

Обычными причинами, приводящими к OutOfMemory OOM (недостаточно памяти) драйвера, являются:

  • rdd.collect()

  • sparkContext.broadcast

  • Низкий уровень памяти драйвера, настроенный в соответствии с требованиями приложения

  • Неправильная настройка Spark.sql.autoBroadcastJoinThreshold.

Spark использует этот лимит для распределения связей ко всем узлам в случае операции соединения. При самом первом использовании, все связи реализуются на узле драйвера. Иногда многочисленные таблицы также транслируются как часть осуществления запроса.

Попробуйте написать свое приложение таким образом, чтобы в драйвере можно было избежать полного сбора всех результатов. Вы вполне можете делегировать эту задачу одной из управляющих программ. Например, если вы хотите сохранить результаты в определенном файле, вы можете либо собрать их в драйвере, или назначить программу, которая сделает это за вас.

Если вы используете SQL (Structured Query Language) от Spark, а драйвер находится в состоянии OOM из-за распределения связей, то вы можете либо увеличить память драйвера, если это возможно; либо уменьшить значение "spark.sql.autoBroadcastJoinThreshold" (неправильная настройка порога подключения) так, чтобы ваши операции по объединению использовали более удобные для памяти операции слияния соединений.

НЕДОСТАТОЧНО ПАМЯТИ ПРИ РАБОТЕ УПРАВЛЯЮЩЕЙ ПРОГРАММ

Это очень распространенная проблема с приложениями Spark, которая может быть вызвана различными причинами. Некоторые из наиболее распространенных причин высокая степень многопоточности, неэффективные запросы и неправильная конфигурация. Рассмотрим каждую по очереди.

ВСОКАЯ СТЕПЕНЬ МНОГОПОТОЧНОСТИ

Прежде чем понять, почему высокая степень многопоточности может быть причиной OOM, давайте попробуем понять, как Spark выполняет запрос или задание и какие компоненты способствуют потреблению памяти.

Spark задания или запросы разбиваются на несколько этапов, и каждый этап далее делится на задачи. Количество задач зависит от различных факторов, например, на какой стадии выполняется, какой источник данных читается и т.д. Если это этап map-stage (фаза сканирования в SQL), то, как правило, соблюдаются базовые разделы источника данных.

Например, если реестр таблицы ORC (Optimized Row Columnar) имеет 2000 разделов, то для этапа map-stage создается 2000 заданий для чтения таблицы, предполагая, что обработка разделовещё не началась. Если это этап reduce-stage (стадия Shuffle), то для определения количества задач Spark будет использовать либо настройку "spark.default.parallelism" для RDD (Resilient Distributed Dataset), либо "spark.sql.shuffle.partitions" для DataSet (набор данных). Сколько задач будет выполняться параллельно каждой управляющей программе, будет зависеть от свойства "spark.executor.cores". Если это значение установить больше без учета памяти, то программы могут отказать и привести к ситуации OOM (недостаточно памяти). Теперь посмотрим на то, что происходит, как говорится, за кадром, при выполнении задачи и на некоторые вероятные причины OOM.

Допустим, мы реализуем задачу создания схемы (map) или этап сканирования SQL из файла HDFS (распределенная файловая система Hadoop distributed file system) или таблицы Parquet/ORC. Для файлов HDFS каждая задача Spark будет считывать блок данных размером 128 МБ.Таким образом, если выполняется 10 параллельных задач, то потребность в памяти составляет не менее 128*10 только для хранения разбитых на разделы данных. При этом опять же игнорируется любое сжатие данных, которое может привести к резкому скачку данных в зависимости от алгоритмов сжатия.

Spark читает Parquet (формат файлов с открытым исходным кодом) в векторном формате. Проще говоря, каждая задача Spark считывает данные из файла Parquet пакет за пакетом. Так как Parquet является столбцом, то эти пакеты строятся для каждого из столбцов.Она накапливает определенный объем данных по столбцам в памяти перед выполнением любой операции над этим столбцом. Это означает, что для хранения такого количества данных Spark необходимы некоторые структуры данных и учет. Кроме того, такие методы кодирования, как словарное кодирование, имеют некоторое состояние, сохраненное в памяти. Все они требуют памяти.

Spark задачи и компоненты памяти во время сканирования таблицыSpark задачи и компоненты памяти во время сканирования таблицы

Так что, при большем количестве параллелей, потребление ресурсов увеличивается. Кроме того, если речь идет о широковещательное соединении (broadcast join), то широковещательные переменные (broadcast variables) также займут некоторое количество памяти. На приведенной выше диаграмме показан простой случай, когда каждый исполнитель выполняет две задачи параллельно.

НЕЭФФЕКТИВНЕ ЗАПРОС

Хотя программа Spark's Catalyst пытается максимально оптимизировать запрос, она не может помочь, если сам запрос плохо написан. Например, выбор всех столбцов таблицы Parquet/ORC. Как видно из предыдущего раздела, каждый столбец нуждается в некотором пакетном состоянии в памяти. Если выбрано больше столбцов, то больше будет потребляться ресурсов.

Постарайтесь считывать как можно меньше столбцов. Попробуйте использовать фильтры везде, где это возможно, чтобы меньше данных попадало к управляющим программам. Некоторые источники данных поддерживают обрезку разделов. Если ваш запрос может быть преобразован в столбец(ы) раздела, то это в значительной степени уменьшит перемещение данных.

НЕПРАВИЛЬНАЯ КОНФИГУРАЦИЯ

Неправильная конфигурация памяти и кэширования также может привести к сбоям и замедлению работы приложений Spark. Рассмотрим некоторые примеры.

ПАМЯТЬ ИСПОЛНИТЕЛЯ И ДРАЙВЕРА

Требования к памяти каждого приложения разные. В зависимости от требований, каждое приложение должно быть настроено по-разному. Вы должны обеспечить правильные значения памяти spark.executor.memory или spark.driver.memory в зависимости от загруженности. Как бы очевидно это ни казалось, это одна из самых трудных задач. Нам нужна помощь средств для мониторинга фактического использования памяти приложения. Unravel (Unravel Data Operations Platform) делает это довольно хорошо.

ПЕРЕГРУЗКА ПАМЯТИ

Иногда это не память управляющей программы, а перегруженная память модуля YARN (Yet Another Resource Negotiator еще один ресурсный посредник), которая вызывает OOM или узел перестает функционировать (killed) из-за YARN. Сообщения "YARN kill" обычно выглядят так:

YARN запускает каждый компонент Spark, как управляющие программы и драйвера внутри модулей. Переполненная память это off-heap память, используемая для JVM в режиме перегрузки, интернированных строк и других метаданных JVM. В этом случае необходимо настроить spark.yarn.executor.memoryOverhead на нужное значение. Обычно 10% общей памяти управляющей программы должно быть выделено под неизбежное потребление ресурсов.

КЭШИРОВАННАЯ ПАМЯТЬ

Если ваше приложение использует кэширование Spark для хранения некоторых наборов данных, то стоит обратить внимание на настройки менеджера памяти Spark. Менеджер памяти Spark разработан в очень общем стиле, чтобы удовлетворить основные рабочие нагрузки. Следовательно, есть несколько настроек, чтобы установить его правильно для определенной внеплановой нагрузки.

Spark определила требования к памяти как два типа: исполнение и хранение. Память хранения используется для кэширования, а память исполнения выделяется для временных структур, таких как хэш-таблицы для агрегирования, объединения и т. д.

Как память исполнения, так и память хранения можно получить из настраиваемой части (общий объем памяти 300МБ). Эта настройка называется "spark.memory.fraction". По умолчанию 60%. Из них по умолчанию 50% (настраивается параметром "spark.memory.storageFraction") выделяется на хранение и остаток выделяется на исполнение.

Бывают ситуации, когда каждый из вышеперечисленных резервов памяти, а именно исполнение и хранение, могут занимать друг у друга, если другой свободен. Кроме того, память в хранилище может быть уменьшена до предела, если она заимствовала память из исполнения. Однако, не вдаваясь в эти сложности, мы можем настроить нашу программу таким образом, чтобы наши кэшированные данные, которые помещаются в память хранилища, не создавали проблем для выполнения.

Если мы не хотим, чтобы все наши кэшированные данные оставались в памяти, то мы можем настроить "spark.memory.storageFraction" на меньшее значение, чтобы лишние данные были исключены и выполнение не столкнулось бы с нехваткой памяти.

ПЕРЕГРУЗКА ПАМЯТИ В МЕНЕДЖЕРЕ УЗЛА

Spark приложения, которые осуществляют перетасовку данных в рамках групповых операций или присоединяются к подобным операциям, испытывают значительные перегрузки. Обычно процесс перетасовки выполняется управляющей программой. Если управляющая программа (исполнитель) занята или завалена большим количеством (мусора) GC (Garbage Collector), то она не может обслуживать перетасовки запросов. Эта проблема в некоторой степени решается за счет использования внешнего сервиса обмена.

Внешний сервис обмена работает на каждом рабочем узле и обрабатывает поступающие от исполнителей запросы на переключение. Исполнители могут читать перемешанные файлы с этого сервиса, вместо того, чтобы не считывать файлы между собой. Это помогает запрашивающим исполнителям читать перемешанные файлы, даже если производящие их исполнители не работают или работают медленно. Также, когда включено динамическое распределение, его обязательным условием является включение внешнего сортировочного сервиса.

Когда внешний сервис обмена данными Spark настроен с помощью YARN, NodeManager (управляющий узел) запускает вспомогательный сервис, который действует как внешний провайдер обмена данными. По умолчанию память NodeManager составляет около 1 ГБ. Однако приложения, выполняющие значительную перестановку данных, могут выйти из строя из-за того, что память NodeManager исчерпана. Крайне важно правильно настроить NodeManager, если ваши приложения попадают в вышеуказанную категорию.

КОНЕЦ ЧАСТИ 1, СПАСИБО ЗА ВНИМАНИЕ

Процессинг внутренней памяти Spark ключевая часть ее мощности. Поэтому эффективное управление памятью является критически важным фактором для получения наилучшей производительности, расширяемости и стабильности ваших приложений Spark и каналов передачи данных. Однако настройки по умолчанию в Spark часто бывают недостаточными. В зависимости от приложения и среды, некоторые ключевые параметры конфигурации должны быть установлены правильно для достижения ваших целей производительности. Если иметь базовое представление о них и о том, как они могут повлиять на общее приложение, то это поможет в работе.

Я поделился некоторыми соображениями о том, на что следует обратить внимание при рассмотрении вопроса об управлении памятью Spark. Это область, которую платформа Unravel понимает и оптимизирует очень хорошо, с небольшим количеством, если таковое вообще потребуется, человеческого вмешательства. Я рекомендую вам заказать демо-версию, чтобы увидеть Unravel в действии. Мы видим довольно значительное ускорение работы приложений Spark.

Во второй части этой серии статьи напишу о том, почему ваши приложения Spark медленно работают или не работают: Во второй части цикла, посвященной искажению данных и сбору мусора, я расскажу о том, как структура данных, искажение данных и сбор мусора влияют на производительность Spark.


Узнать подробнее о курсе Экосистема Hadoop, Spark, Hive.

Записаться на открытый вебинар по теме Spark Streaming.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru