Русский
Русский
English
Статистика
Реклама

Spark core api

Перевод Секреты производительности Spark, или Почему важна компиляция запросов

24.11.2020 18:14:36 | Автор: admin

Для будущих студентов курсов "Data Engineer" и "Экосистема Hadoop, Spark, Hive" подготовили еще один перевод полезной статьи.


Criteo это компания, работа которой основана на данных. Каждый день через наши системы проходят десятки терабайт новых данных для обучения моделей рекомендаций, обрабатывающих запросы в масштабах всего Интернета. Spark наше основное средство обработки больших данных. Это мощный и гибкий инструмент, однако он отличается довольно высокой сложностью в освоении, а чтобы пользоваться им эффективно, зачастую требуется читать исходный код платформы.

Быстрая обработка больших данных имеет критическое значение для нашего бизнеса:

  • мы часто обновляем наши модели, повышая их производительность для наших клиентов;

  • мы быстро выводим новые продукты на базе машинного обучения на рынок за счет того, что можем быстро выполнять итерации;

  • от скорости обработки данных зависят затраты на инфраструктуру.

В этой статье я расскажу о написании эффективного кода Spark и на примерах продемонстрирую распространенные подводные камни. Я покажу, что в большинстве случаев Spark SQL (Datasets) следует отдавать предпочтение перед Spark Core API (RDD), и если сделать правильный выбор, можно повысить производительность обработки больших данных в 210 раз, а это очень значимо.

Конфигурация для экспериментов

Spark 2.4.6, Macbook Pro 2017 с процессором Intel Core i7 с частотой 3,5ГГц

Измерения всегда производятся на разогретой виртуальной Java-машине (выполняется 100 прогонов кода, и берется среднее значение за последние 90 прогонов). Приведенный в этой статье код написан на Scala, но ее выводы должны быть справедливыми и для Python.

Заблуждения, связанные с обработкой больших данных

Существует распространенное мнение, что в процессах обработки больших данных есть два основных узких места, влияющих на производительность:

  • перетасовка данных, поскольку для ее выполнения требуется отправлять данные по сети;

  • дисковый ввод-вывод, поскольку доступ к данным на диске всегда намного медленнее, чем доступ к данным в ОЗУ.

Эти представления имеют под собой исторические основания в 2006 году, когда впервые появилась библиотека Hadoop, обычные жесткие диски были медленными и ненадежными, а основной платформой для обработки больших данных была MapReduce. Именно медленная работа жестких дисков и подстегнула разработку таких средств обработки в памяти, как Spark. С того времени характеристики аппаратного обеспечения значительно улучшились.

В 2015 году висследовании Кей Остерхаут (Kay Ousterhout) и др. были проанализированы узкие места в заданиях Spark, и в результате выяснилось, что скорость их выполнения в большей степени определяется операциями, загружающими ЦП, а не вводом-выводом и передачей данных по сети. В частности, авторами этой научной работы был выполнен широкий спектр запросов к трем тестовым наборам данных, включаяTPC-DS, и было определено, что:

  • если бы пропускная способность сети была безграничной, время выполнения заданий можно было бы сократить на 2% (медианное значение);

  • если бы пропускная способность дискового ввода-вывода была безграничной, время выполнения стандартного аналитического процесса можно было бы сократить на 19% (медианное значение).

Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети. Этому есть несколько причин:

  • Spark использует дисковый ввод-вывод не только при считывании входного набора данных и записи результата, но и в ходе выполнения заданий для кэширования и переноса на диск данных, которые не умещаются в ОЗУ.

  • При выполнении аналитических заданий часто требуется производить агрегацию, поэтому объем данных, передаваемых по сети, обычно меньше, чем объем данных, которые первоначально считываются с диска.

Интересно, что специалисты Databricks примерно в 2016 году пришли к таким же заключениям, что заставило их переориентировать вектор развития Spark на оптимизацию использования процессора. Результатом стало внедрение поддержки SQL, а также API DataFrames и позднее Datasets.

Насколько быстро работает Spark?

Давайте рассмотрим простую задачу посчитаем наивным методом четные числа от 0 до 10. Для выполнения такого задания Spark, в принципе, не требуется, поэтому для начала напишем простую программу на Scala:

var res: Long = 0Lvar i: Long  = 0Lwhile (i < 1000L * 1000 * 1000) {  if (i % 2 == 0) res += 1  i += 1L}

Листинг1. Наивный подсчет

А теперь давайте также вычислим этот же результат с помощью Spark RDD и Spark Datasets. Чтобы эксперимент был честным, я запускаю Spark в локальном[1] режиме:

val res = spark.sparkContext  .range(0L, 1000L * 1000 * 1000)  .filter(_ % 2 == 0)  .count()

Листинг2. Подсчет с помощью RDD

val res = spark.range(1000L * 1000 * 1000)  .filter(col("id") % 2 === 0)  .select(count(col("id")))  .first().getAs[Long](0)

Листинг3. Подсчет с помощью Datasets

Время выполнения всех фрагментов кода приведено ниже. Неудивительно, что написанный вручную код является самым эффективным решением. Удивительно же то, что RDD в пять раз медленнее, тогда как у Datasets время вычисления почти такое же, как у написанного вручную кода.

Парадокс Datasets

Парадокс: API-интерфейс Datasets построен на основе RDD, однако работает намного быстрее, почти так же быстро, как код, написанный вручную для конкретной задачи. Как такое вообще возможно? Дело в новой модели выполнения.

Прошлое модель Volcano

Код, написанный с использованием RDD, выполняется с помощью модели выполнения Volcano. На практике это означает, что каждый RDD следует стандартному интерфейсу:

  • знает свой родительский RDD;

  • предоставляет посредством методаcomputeдоступ к итератору Iterator[T], который перебирает элементы данного RDD (он является private и должен использоваться только разработчиками Spark).

abstract class RDD[T: ClassTag]def compute(): Iterator[T]

Листинг4. RDD.scala

С учетом этих свойств упрощенная версия реализации функции подсчета для RDD, которая игнорирует разбиение, выглядит вот так:

def pseudo_rdd_count(rdd: RDD[T]): Long = {  val iter = rdd.compute  var result = 0  while (iter.hasNext) result += 1  result}

Листинг5. Псевдокод для действия подсчета на основе RDD

Почему этот код работает значительно медленнее, чем написанный вручную код, который приведен в листинге 1? Есть несколько причин:

  • Вызовы итераторов виртуальной функцией: вызовы Iterator.next() несут дополнительную нагрузку по сравнению с функциями, не являющимися виртуальными, которые могут выполняться компилятором илиJIT как встроенные (inline).

  • Отсутствие оптимизации на уровне ЦП: виртуальная Java-машина и JIT не могут оптимизировать байт-код, образуемый листингом 5, так же хорошо, как байт-код, получаемый при использовании листинга 1. В частности, написанный вручную код позволяет виртуальной Java-машине и JIT хранить промежуточные результаты вычислений в регистре ЦП, а не помещать их в основную память.

Настоящее формирование кода всего этапа

Код, написанный с помощью Spark SQL, выполняется не так, как код, написанный с использованием RDD. Когда запускается действие, Spark генерирует код, который сворачивает несколько трансформаций данных в одну функцию. Этот процесс называется формированием кода всего этапа (Whole-Stage Code Generation). Spark пытается воспроизвести процесс написания специального кода для конкретной задачи, в котором не используются вызовы виртуальных функций. Такой код может выполняться JVM/JIT более эффективно. На самом деле Spark генерирует довольно много кода, см., например, код Spark для листинга 3.

Технически Spark только формирует высокоуровневый код, а генерация байт-кода выполняется компилятором Janino. Именно это и делает Spark SQL настолько быстрым по сравнению с RDD.

Эффективное использование Spark

Сегодня в Spark есть 3 API-интерфейса Scala/Java: RDD, Datasets и DataFrames (который теперь объединен с Datasets). RDD все еще широко применяется в Spark в частности, из-за того, что этот API используется большинством созданных ранее заданий, и перспектива продолжать в том же духе весьма заманчива. Однако, как показывают тесты, переход на API-интерфейс Datasets может дать громадный прирост производительности за счет оптимизированного использования ЦП.

Неправильный подход классический способ

Самая распространенная проблема, с которой я сталкивался при использовании Spark SQL, это явное переключение на API RDD. Причина состоит в том, что программисту зачастую проще сформулировать вычисление в терминах объектов Java, чем с помощью ограниченного языка Spark SQL:

val res = spark.range(1000L * 1000 * 1000)    .rdd    .filter(_ %2 == 0)    .count()

Листинг6. Переключение с Dataset на RDD

Этот код выполняется в течение 43 секунд вместо исходных 2,1 секунды, при этом делая абсолютно то же самое. Явное переключение на RDD останавливает формирование кода всего этапа и запускает преобразование элементов наборов данных из примитивных типов в объекты Java, что оказывается очень затратным. Если мы сравним схемы этапов выполнения кода из листингов 3 и 6 (см. ниже), то увидим, что во втором случае появляется дополнительный этап.

Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)

Неправильный подход изысканный способ

Производительность Spark SQL является на удивление хрупкой. Это незначительное изменение приводит к увеличению времени выполнения запроса в три раза (до 6 секунд):

val res = spark  .range(1000L * 1000 * 1000)  .filter(x => x % 2 == 0) // note that the condition changed  .select(count(col("id")))   .first()  .getAs[Long](0)

Листинг7. Замена выражения Spark SQL функцией Scala

Spark не способен генерировать эффективный код для условия в фильтре. Условие является анонимной функцией Scala, а не выражением Spark SQL, и Spark выполнит десериализацию каждой записи из оптимизированного внутреннего представления, чтобы вызвать эту функцию. Причем вот что примечательно это изменение никак не сказывается на визуальном представлении этапов (рис. 1a), поэтому его невозможно обнаружить, анализируя направленный ациклический граф (DAG) задания в пользовательском интерфейсе Spark.

Высокая производительность Spark SQL обеспечивается за счет ограничения круга доступных операций чем-то все равно приходится жертвовать! Чтобы получить максимальную производительность, нужно использовать преобразования, которые работают со столбцами: используйтеfilter(condition: Column)вместоfilter(T => Boolean) иselect()вместоmap(). При этом Spark не придется перестраивать объект, представленный одной строкой набора данных (Dataset). И, разумеется, избегайте переключения на RDD.

Заключение и итоговые замечания

Приведенные в этой статье простые примеры демонстрируют, что большая часть времени выполнения заданий обработки больших данных не тратится на полезную работу. Хорошим решением этой проблемы является компиляция запросов, которая возможна с использованием Spark SQL и обеспечивает более эффективное использование современного аппаратного обеспечения. Последние исследования свидетельствуют, что использование эффективных запросов для стандартных процессов обработки больших данных важнее, чем оптимизация использования сети и дискового ввода-вывода.

Правильное применение компиляции запросов может сократить время обработки в 210 раз, а это означает ускорение экспериментов, снижение затрат на инфраструктуру и громадное удовольствие от элегантного выполнения своей работы!

Образцы кода из этой статьи можно найти здесь. С помощью этого репозитория можно анализировать производительность разных запросов Spark.

Использованные материалы

  1. Ousterhout, Kay, et al. Making sense of performance in data analytics frameworks (Анализ производительности платформ анализа данных).12-й симпозиум {USENIX} по проектированию и реализации сетевых систем ({NSDI} 15). 2015.

  2. http://www.tpc.org/tpcds/

  3. https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

4.https://janino-compiler.github.io/janino/

5.http://people.csail.mit.edu/matei/papers/2015/sigmodsparksql.pdf

6.https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html


Узнать подробнее о курсах "Data Engineer" и "Экосистема Hadoop, Spark, Hive".

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru