Русский
Русский
English
Статистика
Реклама

Apache spark

Перевод Экономичная конфигурация исполнителей Apache Spark

20.11.2020 16:08:20 | Автор: admin

Привет, Хабр! В преддверии старта курса "Экосистема Hadoop, Spark, Hive" подготовили для вас перевод полезной статьи. А также предлагаем посмотреть бесплатную запись демо-урока по теме: "Spark 3.0: Что нового?".


Ищем наиболее оптимальную конфигурацию исполнителей для вашего узла

Количество ЦП на узел

Первый этап в определении оптимальной конфигурации исполнителей (executor) - это выяснить, сколько фактических ЦП (т.е. не виртуальных ЦП) доступно на узлах (node) в вашем кластер. Для этого вам необходимо выяснить, какой тип инстанса EC2 использует ваш кластер. В этой статье мы будем использовать r5.4xlarge, который, согласно прейскуранту на инстансы AWS EC2, насчитывает 16 процессоров.

Когда мы запускаем наши задачи (job), нам нужно зарезервировать один процессор для операционной системы и системы управления кластерами (Cluster Manager). Поэтому мы не хотели бы задействовать под задачу сразу все 16 ЦП. Таким образом, когда Spark производит вычисления, на каждом узле у нас остается только 15 доступных для аллоцирования ЦП.

Количество ЦП на исполнителя

Теперь, когда мы узнали, сколько ЦП доступно для использования на каждом узле, нам нужно определить, сколько ядер (core) Spark мы хотим назначить каждому исполнителю. С помощью базовой математики (X * Y = 15), мы можем посчитать, что существует четыре различных комбинации ядер и исполнителей, которые могут подойти для нашего случая с 15 ядрам Spark на узел:

Возможные конфигурации исполнителейВозможные конфигурации исполнителей

Давайте исследуем целесообразность каждой из этих конфигураций.

Один исполнитель с пятнадцатью ядрами

Самое очевидное решение, которое приходит на ум, - создать одного исполнителя с 15 ядрами. Проблема с большими жирными исполнителями, подобными этому, заключается в том, что исполнитель, поддерживающий такое количество ядер, обычно будет иметь настолько большой пул памяти (64 ГБ+), что задержки на сборку мусора будут неоправданно замедлять вашу работу. Поэтому мы сразу исключаем эту конфигурацию.

Пятнадцать одноядерных исполнителей

Следующее очевидное решение, которое приходит на ум создать 15 исполнителей, каждый из которых имеет только одно ядро. Проблема здесь в том, что одноядерные исполнители неэффективны, потому что они не используют преимуществ параллелизма, которые обеспечивают несколько ядер внутри одного исполнителя. Кроме того, найти оптимальный объем служебной памяти для одноядерных исполнителей может быть достаточно сложно. Давайте немного поговорим о накладных расходах памяти.

Накладные расходы памяти для исполнителя по умолчанию составляют 10% от размера выделенной вашему исполнителю памяти или 384 MB (в зависимости от того, что больше). Однако на некоторых big data платформах, таких как Qubole, накладные расходы зафиксированы на определенном значении по умолчанию, вне зависимости от размера вашего исполнителя. Вы можете проверить ваш показатель накладных расходов, перейдя во вкладку Environments в логе Spark и выполнив поиск параметра spark.executor.memoryOverhead.

Накладные расходы памяти по умолчанию в Spark будут очень маленьким, что результирует в проблемах с вашими задачами. С другой стороны, фиксированное значение накладных расходов для всех исполнителей приведет к слишком большому объему служебной памяти и, следовательно, оставит меньше места самим исполнителям. Выверить идеальный размер служебной памяти сложно, поэтому это еще одна причина, по которой одноядерный исполнитель нам не подходит.

Пять исполнителей с тремя ядрами или три исполнителя с пятью ядрами

Итак, у нас осталось два варианта. Большинство руководств по настройке Spark сходятся во мнении, что 5 ядер на исполнителя это оптимальное количество ядер с точки зрения параллельной обработки. И я тоже пришел к выводу, что это правда, в том числе, на основании моих собственных изысканий в оптимизации. Еще одно преимущество использования пятиядерных исполнителей по сравнению с трехъядерными заключается в том, что меньшее количество исполнителей на узел требует меньшее количество служебной памяти. Поэтому мы остановимся на пятиядерных исполнителях, чтобы минимизировать накладные расходы памяти на узле и максимизировать параллелизм внутри каждого исполнителя.

--executor-cores 5

Объем памяти на узел

Наш следующий шаг определить, сколько памяти назначить каждому исполнителю. Прежде чем мы сможем это сделать, мы должны определить, сколько физической памяти на нашем узле нам вообще доступно. Это важно, потому что физическая память это жесткое ограничение для ваших исполнителей. Если вы знаете, какой инстанс EC2 используете, значит, вы знаете и общий объем памяти, доступной на узле. Про наш инстанс r5.4xlarge AWS сообщает, что у него 128 ГБ доступной памяти.

Однако доступными для использования вашими исполнителями будут не все 128 ГБ, так как память нужно будет выделить также и для вашей системы управления кластерами. На рисунке ниже показано, где в YARN искать сколько памяти доступно для использования после того, как память была выделена для системы управления кластерами.

Мы видим, что на узлах этого кластера исполнителям доступно 112 ГБ.

Объем памяти на исполнителя

Если мы хотим, чтобы три исполнителя использовали 112 ГБ доступной памяти, то нам следует определить оптимальный размер памяти для каждого исполнителя. Чтобы вычислить объем памяти доступной исполнителю, мы попросту делим доступную память на 3. Затем мы вычитаем накладные расходы на память и округляем до ближайшего целого числа.

Если служебная память у вас фиксированная (как в случае с Qubole), вы будете использовать эту формулу. (112/3) = 372,3 = 34,7 = 34.

Если вы используете дефолтный метод Spark для расчета накладных расходов на память, вы будете использовать эту формулу. (112/3) = 37 / 1,1 = 33,6 = 33.

В оставшейся части этой статьи мы будем использовать фиксированный объем накладных расходов памяти для Qubole.

--executor-memory 34G

Чтобы по настоящему начать экономить, опытным тюнерам Spark необходим следующий сдвиг в парадигме. Я рекомендую вам в ваших исполнителях для всех задач использовать фиксированные размер памяти и количество ядер. Понимаю, что использование фиксированной конфигурации исполнителей для большинства задач Spark кажется противоречащим надлежащей практике тюнинга Spark. Даже если вы настроены скептически, я прошу вас опробовать эту стратегию, чтобы убедиться, что она работает. Выясните, как рассчитать затраты на выполнение вашей задачи, как описано в Части 2, а затем проверьте это на практике. Считаю, что если вы это сделаете, вы обнаружите, что единственный способ добиться эффективной экономичности облачных затрат это использовать фиксированные размеры памяти для ваших исполнителей, которые оптимально используют ЦП.

С учетом вышесказанного, если при использовании эффективного объема памяти для ваших исполнителей у вас остается много неиспользуемой памяти, подумайте о переносе вашего процесса на другой тип инстанса EC2, у которого меньше памяти на ЦП узла. Этот инстанс обойдется вам дешевле и, следовательно, поможет снизить стоимость выполнения вашей задачи.

Наконец, будут моменты, когда эта экономичная конфигурация не будет обеспечивать достаточную пропускную способность для ваших данных в вашем исполнителе. В примерах, приведенных во второй части, было несколько задач, в которых мне приходилось уходить от использования оптимального размера памяти, потому что нагрузка на память была максимальной во время всего выполнения задачи.

В этом руководстве я все же рекомендую вам начинать с оптимального размера памяти при переносе ваших задач. Если при оптимальной конфигурации исполнителей у вас возникают ошибки памяти, я поделюсь конфигурациями, которые избегают этих ошибок позже в Части 5.

Количество исполнителей на задачу

Теперь, когда мы определились с конфигурацией исполнителей, мы готовы настроить количество исполнителей, которые мы хотим использовать для нашей задачи. Помните, что наша цель - убедиться, что используются все 15 доступных ЦП на узел, что означает, что мы хотим, чтобы каждому узлу было назначено по три исполнителя. Если мы установим количество наших исполнителей кратное 3, мы добьемся своей цели.

Однако с такой конфигурацией есть одна проблема. Нам также нужно назначить драйвер для обработки всех исполнителей в узле. Если мы используем количество исполнителей, кратное 3, то наш одноядерный драйвер будет размещен в своем собственном 16-ядерном узле, что означает, что аж 14 ядер на этом последнем узле не будут использоваться в течение всего выполнения задачи. Это не очень хорошая практика использования облака!

Мысль, которую я хочу здесь донести, заключается в том, что идеальное количество исполнителей должно быть кратным 3 минус один исполнитель, чтобы освободить место для нашего драйвера.

--num-executors (3x - 1)

В Части 4 я дам вам рекомендации о том, сколько исполнителей вы должны использовать при переносе существующей задачи в экономичную конфигурацию исполнителей.

Объем памяти на драйвер

Обычной практикой для data-инженеров является выделение относительно небольшого размера памяти под драйвер по сравнению с исполнителями. Однако AWS на самом деле рекомендует устанавливать размер памяти вашего драйвера таким же, как и у ваших исполнителей. Я обнаружил, что это также очень помогает при оптимизации затрат.

--driver-memory 34G

В редких случаях могут возникать ситуации, когда вам нужен драйвер, память которого больше, чем у исполнителя. В таких случаях устанавливайте размер памяти драйвера в 2 раза больше памяти исполнителя, а затем используйте формулу (3x - 2), чтобы определить количество исполнителей для вашей задачи.

Количество ядер на драйвер

По умолчанию количество ядер на драйвер равно одному. Однако я обнаружил, что задачи, использующие более 500 ядер Spark, могут повысить производительность, если количество ядер на драйвер установлено в соответствии с количеством ядер на исполнителя. Однако не стоит сразу менять дефолтное количество ядер в драйвере. Просто протестируйте это на своих наиболее крупных задачах, чтобы увидеть, ощутите ли вы прирост производительности.

--driver-cores 5

Конфигурация универсальна?

Таким образом, конфигурация исполнителей, которую я рекомендую для узла с 16 процессорами и 128 ГБ памяти, будет выглядеть следующим образом.

--driver-memory 34G --executor-memory 34G --num-executors (3x - 1) --executor-cores 5

Но помните:

Не существует универсальных конфигураций просто продолжайте экспериментировать и рано или поздно вы найдете конфигурацию, идеально подходящую для ваших задач.

Как я уже упоминал выше, эта конфигурация может выглядеть не подходящей для ваших конкретных нужд. Я рекомендую вам использовать эту конфигурацию в качестве отправной точки в процессе оптимизации затрат. Если в этой конфигурации у вас возникают проблемы с памятью, в следующих частях этой серии я порекомендую вам конфигурации, которые позволят вам решить большинство проблем с памятью, возникающих при переходе на экономичные конфигурации.

Поскольку конфигурация узла, используемая в этой статье, достаточно распространена в Expedia Group , я буду ссылаться на нее в остальной части серии. Если ваши узлы имеют другой размер, вам следует использовать метод, который я изложил здесь, чтобы определить идеальную конфигурацию.

Теперь, когда у вас есть оптимальная экономичная конфигурация исполнителей, вы можете попробовать перенести на нее текущие задачи. Но какие задачи вам следует перенести в первую очередь? И сколько исполнителей вы должны запустить с этой новой конфигурацией? А что произойдет, если задача с оптимизированной стоимостью выполняется дольше, чем неоптимизированная задача? И уместно ли когда-либо избыточное использование ЦП? Я отвечу на эти вопросы в Части 4: Как перенести существующие задачи Apache Spark на экономичные конфигурации исполнителей.


Подробнее о курсе "Экосистема Hadoop, Spark, Hive" можно узнать здесь. Также можно посмотреть запись открытого урока "Spark 3.0: что нового?".

Читать ещё:

Подробнее..

Перевод Цепочка пользовательских преобразований DataFrame в Spark

13.05.2021 16:14:30 | Автор: admin

Перевод материала подготовлен в рамках набора студентов на онлайн-курс Экосистема Hadoop, Spark, Hive.

Всех желающих приглашаем на открытый вебинар Тестирование Spark приложений. На этом открытом уроке рассмотрим проблемы в тестировании Spark приложений: стат данные, частичную проверку и запуск/остановку тяжелых систем. Изучим библиотеки для решения и напишем тесты. Присоединяйтесь!


Для цепочки преобразований DataFrame в Spark можно использовать implicit classes или метод Dataset#transform. В этой статье блога будет продемонстрировано, как выстраивать цепочки преобразований DataFrame, и объяснено, почему метод Dataset#transform предпочтительнее, чем implicit classes.

Структурирование кода Spark в виде преобразований DataFrame отличает сильных программистов Spark от "спагетти-хакеров", как подробно описано в статье "Написание идеального кода Spark (Writing Beautiful Spark Code)". После публикации в блоге, ваш код Spark будет намного проще тестировать и повторно использовать.

Если вы используете PySpark, смотрите эту статью о цепочке пользовательских преобразований PySpark DataFrame.

Метод transform (преобразования) набора данных

Метод transform (преобразования) набора данных предоставляет "краткий синтаксис для цепочки пользовательских преобразований".

Предположим, у нас есть метод withGreeting(), который добавляет столбец приветствия к DataFrame, и метод withFarewell(), который добавляет столбец прощания к DataFrame.

def withGreeting(df: DataFrame): DataFrame = {  df.withColumn("greeting", lit("hello world"))}def withFarewell(df: DataFrame): DataFrame = {  df.withColumn("farewell", lit("goodbye"))}

Мы можем использовать метод transform (преобразования) для запуска методов withGreeting() и withFarewell().

val df = Seq(  "funny",  "person").toDF("something")val weirdDf = df  .transform(withGreeting)  .transform(withFarewell)
weirdDf.show()+---------+-----------+--------+|something|   greeting|farewell|+---------+-----------+--------+|    funny|hello world| goodbye||   person|hello world| goodbye|+---------+-----------+--------+

Метод transform (преобразования) можно легко объединить со встроенными методами Spark DataFrame, такими как select.

df  .select("something")  .transform(withGreeting)  .transform(withFarewell)

Если метод transform (преобразования) не используется, то нам придется вложить вызовы методов, и код станет менее читабельным.

withFarewell(withGreeting(df))// even worsewithFarewell(withGreeting(df)).select("something")

Метод transform (преобразования) c аргументами

Пользовательские преобразования DataFrame, использующие аргументы, также могут использовать метод transform (преобразования), используя карринг / списки с несколькими параметрами в Scala.

Давайте воспользуемся тем же методом withGreeting(), что и ранее, и добавим метод withCat(), который принимает в качестве аргумента строку.

def withGreeting(df: DataFrame): DataFrame = {  df.withColumn("greeting", lit("hello world"))}def withCat(name: String)(df: DataFrame): DataFrame = {  df.withColumn("cats", lit(s"$name meow"))}

Мы можем использовать метод transform (преобразования) для запуска методов withGreeting() и withCat().

val df = Seq(  "funny",  "person").toDF("something")val niceDf = df  .transform(withGreeting)  .transform(withCat("puffy"))
niceDf.show()+---------+-----------+----------+|something|   greeting|      cats|+---------+-----------+----------+|    funny|hello world|puffy meow||   person|hello world|puffy meow|+---------+-----------+----------+

Метод transform (преобразования) можно использовать для пользовательских преобразований DataFrame, которые также могут использовать аргументы!

Манкипатчинг с помощью неявных классов (Implicit Classes)

Неявные классы можно использовать для добавления методов в существующие классы. Следующий код добавляет те же методы withGreeting() и withFarewell() к самому классу DataFrame.

object BadImplicit {  implicit class DataFrameTransforms(df: DataFrame) {    def withGreeting(): DataFrame = {      df.withColumn("greeting", lit("hello world"))    }    def withFarewell(): DataFrame = {      df.withColumn("farewell", lit("goodbye"))    }  }}

Методы withGreeting() и withFarewell() можно объединить в цепочку и выполнить следующим образом.

import BadImplicit._val df = Seq(  "funny",  "person").toDF("something")val hiDf = df.withGreeting().withFarewell()

Расширение основных классов работает, но это плохая программистская практика, которой следует избегать.

Избегание неявных классов

Изменение базовых классов известно как манкипатчинг и является восхитительной особенностью Ruby, но может быть рискованным в неопытных руках.
- Санди Метц

Комментарий Санди был адресован языку программирования Ruby, но тот же принцип применим и к неявным классам Scala.

Манкипатчинг обычно не приветствуется в сообществе Ruby, и его следует избегать в Scala.

Spark был достаточно любезен, чтобы предоставить метод transform (преобразования), и вам не потребуется манкипатчинг для класса DataFrame. С помощью некоторых приемов программирования на Scala мы даже можем заставить метод transform работать с пользовательскими преобразованиями, которые могут использовать аргументы. Это делает метод transform явным победителем!


Подробнее о курсе: Экосистема Hadoop, Spark, Hive

Смотреть демо-урок: Тестирование Spark приложений

Подробнее..

Тестирование в Apache Spark Structured Streaming

02.01.2021 20:04:09 | Автор: admin

Введение


На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.


Все примеры используют: Apache Spark 3.0.1.


Подготовка


Необходимо установить:


  • Apache Spark 3.0.x
  • Python 3.7 и виртуальное окружение для него
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • В примерах для Scala используется версия 2.12.10.

  1. Загрузить Apache Spark
  2. Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7

Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.


SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;

Тесты


Пример с scikit-learn


При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.


Для написания тестов будет использоваться следующий пример: LinearRegression.


Итак, пусть код для тестирования использует следующий "шаблон" для Python:


class XService:    def __init__(self):        # Инициализация    def train(self, ds):        # Обучение    def predict(self, ds):        # Предсказание и вывод результатов

Для Scala шаблон выглядит соответственно.


Полный пример:


from sklearn import linear_modelclass LocalService:    def __init__(self):        self.model = linear_model.LinearRegression()    def train(self, ds):        X, y = ds        self.model.fit(X, y)    def predict(self, ds):        r = self.model.predict(ds)        print(r)

Тест.


Импорт:


import unittestimport numpy as np

Основной класс:


class RunTest(unittest.TestCase):

Запуск тестов:


if __name__ == "__main__":    unittest.main()

Подготовка данных:


X = np.array([    [1, 1],  # 6    [1, 2],  # 8    [2, 2],  # 9    [2, 3]  # 11])y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3

Создание модели и обучение:


service = local_service.LocalService()service.train((X, y))

Получение результатов:


service.predict(np.array([[3, 5]]))service.predict(np.array([[4, 6]]))

Ответ:


[16.][19.]

Все вместе:


import unittestimport numpy as npfrom spark_streaming_pp import local_serviceclass RunTest(unittest.TestCase):    def test_run(self):        # Prepare data.        X = np.array([            [1, 1],  # 6            [1, 2],  # 8            [2, 2],  # 9            [2, 3]  # 11        ])        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3        # Create model and train.        service = local_service.LocalService()        service.train((X, y))        # Predict and results.        service.predict(np.array([[3, 5]]))        service.predict(np.array([[4, 6]]))        # [16.]        # [19.]if __name__ == "__main__":    unittest.main()

Пример с Spark и Python


Будет использован аналогичный алгоритм LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.


Инициализация:


self.service = LinearRegression(maxIter=10, regParam=0.01)self.model = None

Обучение:


self.model = self.service.fit(ds)

Получение результатов:


transformed_ds = self.model.transform(ds)q = transformed_ds.select("label", "prediction").writeStream.format("console").start()return q

Все вместе:


from pyspark.ml.regression import LinearRegressionclass StructuredStreamingService:    def __init__(self):        self.service = LinearRegression(maxIter=10, regParam=0.01)        self.model = None    def train(self, ds):        self.model = self.service.fit(ds)    def predict(self, ds):        transformed_ds = self.model.transform(ds)        q = transformed_ds.select("label", "prediction").writeStream.format("console").start()        return q

Сам тест.


Обычно в тестах можно использовать данные, которые создаются прямо в тестах.


train_ds = spark.createDataFrame([    (6.0, Vectors.dense([1.0, 1.0])),    (8.0, Vectors.dense([1.0, 2.0])),    (9.0, Vectors.dense([2.0, 2.0])),    (11.0, Vectors.dense([2.0, 3.0]))],    ["label", "features"])

Это очень удобно и код получается компактным.


Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.


def test_stream_read_options_overwrite(self):    bad_schema = StructType([StructField("test", IntegerType(), False)])    schema = StructType([StructField("data", StringType(), False)])    df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \        .schema(bad_schema)\        .load(path='python/test_support/sql/streaming', schema=schema, format='text')    self.assertTrue(df.isStreaming)    self.assertEqual(df.schema.simpleString(), "struct<data:string>")

И так.


Создается контекст для работы:


spark = SparkSession.builder.enableHiveSupport().getOrCreate()spark.sparkContext.setLogLevel("ERROR")

Подготовка данных для обучения (можно сделать обычным способом):


train_ds = spark.createDataFrame([    (6.0, Vectors.dense([1.0, 1.0])),    (8.0, Vectors.dense([1.0, 2.0])),    (9.0, Vectors.dense([2.0, 2.0])),    (11.0, Vectors.dense([2.0, 3.0]))],    ["label", "features"])

Обучение:


service = structure_streaming_service.StructuredStreamingService()service.train(train_ds)

Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.


def extract_features(x):    values = x.split(",")    features_ = []    for i in values[1:]:        features_.append(float(i))    features = Vectors.dense(features_)    return featuresextract_features_udf = udf(extract_features, VectorUDT())def extract_label(x):    values = x.split(",")    label = float(values[0])    return labelextract_label_udf = udf(extract_label, FloatType())predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \    .withColumn("features", extract_features_udf(col("value"))) \    .withColumn("label", extract_label_udf(col("value")))service.predict(predict_ds).awaitTermination(3)

Ответ:


15.9669918.96138

Все вместе:


import unittestimport warningsfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, udffrom pyspark.sql.types import FloatTypefrom pyspark.ml.linalg import Vectors, VectorUDTfrom spark_streaming_pp import structure_streaming_serviceclass RunTest(unittest.TestCase):    def test_run(self):        spark = SparkSession.builder.enableHiveSupport().getOrCreate()        spark.sparkContext.setLogLevel("ERROR")        # Prepare data.        train_ds = spark.createDataFrame([            (6.0, Vectors.dense([1.0, 1.0])),            (8.0, Vectors.dense([1.0, 2.0])),            (9.0, Vectors.dense([2.0, 2.0])),            (11.0, Vectors.dense([2.0, 3.0]))        ],            ["label", "features"]        )        # Create model and train.        service = structure_streaming_service.StructuredStreamingService()        service.train(train_ds)        # Predict and results.        def extract_features(x):            values = x.split(",")            features_ = []            for i in values[1:]:                features_.append(float(i))            features = Vectors.dense(features_)            return features        extract_features_udf = udf(extract_features, VectorUDT())        def extract_label(x):            values = x.split(",")            label = float(values[0])            return label        extract_label_udf = udf(extract_label, FloatType())        predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \            .withColumn("features", extract_features_udf(col("value"))) \            .withColumn("label", extract_label_udf(col("value")))        service.predict(predict_ds).awaitTermination(3)        # +-----+------------------+        # |label|        prediction|        # +-----+------------------+        # |  1.0|15.966990887541273|        # |  2.0|18.961384020443553|        # +-----+------------------+    def setUp(self):        warnings.filterwarnings("ignore", category=ResourceWarning)        warnings.filterwarnings("ignore", category=DeprecationWarning)if __name__ == "__main__":    unittest.main()

Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:


implicit val sqlCtx = spark.sqlContextimport spark.implicits._val source = MemoryStream[Record]source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))val predictDs = source.toDF()service.predict(predictDs).awaitTermination(2000)

Полный пример на Scala (здесь, для разнообразия, не используется sql):


package aaa.abc.dd.spark_streaming_pr.clusterimport org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}import org.apache.spark.sql.DataFrameimport org.apache.spark.sql.functions.udfimport org.apache.spark.sql.streaming.StreamingQueryclass StructuredStreamingService {  var service: LinearRegression = _  var model: LinearRegressionModel = _  def train(ds: DataFrame): Unit = {    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)    model = service.fit(ds)  }  def predict(ds: DataFrame): StreamingQuery = {    val m = ds.sparkSession.sparkContext.broadcast(model)    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {      m.value.predict(features)    }    val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun    val toUpperUdf = udf(transform)    val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))    predictionDs      .writeStream      .foreachBatch((r: DataFrame, i: Long) => {        r.show()        // scalastyle:off println        println(s"$i")        // scalastyle:on println      })      .start()  }}

Тест:


package aaa.abc.dd.spark_streaming_pr.clusterimport org.apache.spark.ml.linalg.Vectorsimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.execution.streaming.MemoryStreamimport org.scalatest.{Matchers, Outcome, fixture}class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {  test("run") { spark =>    // Prepare data.    val trainDs = spark.createDataFrame(Seq(      (6.0, Vectors.dense(1.0, 1.0)),      (8.0, Vectors.dense(1.0, 2.0)),      (9.0, Vectors.dense(2.0, 2.0)),      (11.0, Vectors.dense(2.0, 3.0))    )).toDF("label", "features")    // Create model and train.    val service = new StructuredStreamingService()    service.train(trainDs)    // Predict and results.    implicit val sqlCtx = spark.sqlContext    import spark.implicits._    val source = MemoryStream[Record]    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))    val predictDs = source.toDF()    service.predict(predictDs).awaitTermination(2000)    // +-----+---------+------------------+    // |label| features|        prediction|    // +-----+---------+------------------+    // |  1.0|[3.0,5.0]|15.966990887541273|    // |  2.0|[4.0,6.0]|18.961384020443553|    // +-----+---------+------------------+  }  override protected def withFixture(test: OneArgTest): Outcome = {    val spark = SparkSession.builder().master("local[2]").getOrCreate()    try withFixture(test.toNoArgTest(spark))    finally spark.stop()  }  override type FixtureParam = SparkSession  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)}

Выводы


При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.


Такие абстракции как DataFrame позволяют это сделать легко и просто.


При использовании Python данные придется хранить в файлах.


Ссылки и ресурсы


Подробнее..
Категории: Scala , Python , Testing , Apache , Spark , Apache spark , Kafka , Streaming

Запускаем Apache Spark на Kubernetes

20.07.2020 16:23:27 | Автор: admin

Дорогие читатели, доброго дня. Сегодня поговорим немного про Apache Spark и его перспективы развития.





В современном мире Big Data Apache Spark является де факто стандартом при разработке задач пакетной обработки данных. Помимо этого, он также используется для создания стриминговых приложений, работающих в концепции micro batch, обрабатывающих и отгружающих данные маленькими порциями (Spark Structured Streaming). И традиционно он являлся частью общего стека Hadoop, используя в качестве менеджера ресурсов YARN (или, в некоторых случаях, Apache Mesos). К 2020 году его использование в традиционном виде для большинства компаний находится под большим вопросом в виду отсутствия приличных дистрибутивов Hadoop развитие HDP и CDH остановлено, CDH недостаточно проработан и имеет высокую стоимость, а остальные поставщики Hadoop либо прекратили своё существование, либо имеют туманное будущее. Поэтому всё больший интерес у сообщества и крупных компаний вызывает запуск Apache Spark с помощью Kubernetes став стандартом в оркестрации контейнеров и управлении ресурсами в приватных и публичных облаках, он решает проблему с неудобным планированием ресурсов задач Spark на YARN и предоставляет стабильно развивающуюся платформу с множеством коммерческих и открытых дистрибутивов для компаний всех размеров и мастей. К тому же на волне популярности большинство уже успело обзавестись парой-тройкой своих инсталляций и нарастить экспертизу в его использовании, что упрощает переезд.


Начиная с версии 2.3.0 Apache Spark обзавёлся официальной поддержкой запуска задач в кластере Kubernetes и сегодня, мы поговорим о текущей зрелости данного подхода, различных вариантах его использования и подводных камнях, с которыми предстоит столкнуться при внедрении.


Прежде всего, рассмотрим процесс разработки задач и приложений на базе Apache Spark и выделим типовые случаи, в которых требуется запустить задачу на кластере Kubernetes. При подготовке данного поста в качестве дистрибутива используется OpenShift и будут приведены команды, актуальные для его утилиты командной строки (oc). Для других дистрибутивов Kubernetes могут быть использованы соответствующие команды стандартной утилиты командной строки Kubernetes (kubectl) либо их аналоги (например, для oc adm policy).



Первый вариант использования spark-submit



В процессе разработки задач и приложений разработчику требуется запускать задачи для отладки трансформации данных. Теоретически для этих целей могут быть использованы заглушки, но разработка с участием реальных (пусть и тестовых) экземпляров конечных систем, показала себя в этом классе задач быстрее и качественнее. В том случае, когда мы производим отладку на реальных экземплярах конечных систем, возможны два сценария работы:


  • разработчик запускает задачу Spark локально в режиме standalone;

  • разработчик запускает задачу Spark на кластере Kubernetes в тестовом контуре.


Первый вариант имеет право на существование, но влечёт за собой ряд недостатков:


  • для каждого разработчика требуется обеспечить доступ с рабочего места до всех необходимых ему экземпляров конечных систем;
  • на рабочей машине требуется достаточное количество ресурсов для запуска разрабатываемой задачи.

Второй вариант лишён данных недостатков, поскольку использование кластера Kubernetes позволяет выделить необходимый пул ресурсов для запуска задач и обеспечить для него необходимые доступы к экземплярам конечных систем, гибко предоставляя к нему доступ с помощью ролевой модели Kubernetes для всех членов команды разработки. Выделим его в качестве первого варианта использования запуск задач Spark с локальной машины разработчика на кластере Kubernetes в тестовом контуре.


Расскажем подробнее о процессе настройки Spark для локального запуска. Чтобы начать пользоваться Spark его требуется установить:


mkdir /opt/sparkcd /opt/sparkwget http://mirror.linux-ia64.org/apache/spark/spark-2.4.5/spark-2.4.5.tgztar zxvf spark-2.4.5.tgzrm -f spark-2.4.5.tgz

Собираем необходимые пакеты для работы с Kubernetes:


cd spark-2.4.5/./build/mvn -Pkubernetes -DskipTests clean package

Полная сборка занимает много времени, а для создания образов Docker и их запуска на кластере Kubernetes в реальности нужны только jar файлы из директории assembly/, поэтому можно собрать только данный подпроект:


./build/mvn -f ./assembly/pom.xml -Pkubernetes -DskipTests clean package

Для запуска задач Spark в Kubernetes требуется создать образ Docker, который будет использоваться в качестве базового. Здесь возможны 2 подхода:


  • Созданный образ Docker включает в себя исполняемый код задачи Spark;
  • Созданный образ включает в себя только Spark и необходимые зависимости, исполняемый код размещается удалённо (например, в HDFS).

Для начала соберём образ Docker, содержащий тестовый пример задачи Spark. Для создания образов Docker у Spark есть соответствующая утилита под названием docker-image-tool. Изучим по ней справку:


./bin/docker-image-tool.sh --help

С её помощью можно создавать образы Docker и осуществлять их загрузку в удалённые реестры, но по умолчанию она имеет ряд недостатков:


  • в обязательном порядке создаёт сразу 3 образа Docker под Spark, PySpark и R;
  • не позволяет указать имя образа.

Поэтому мы будем использовать модифицированный вариант данной утилиты, приведённый ниже:


vi bin/docker-image-tool-upd.sh

#!/usr/bin/env bashfunction error {  echo "$@" 1>&2  exit 1}if [ -z "${SPARK_HOME}" ]; then  SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi. "${SPARK_HOME}/bin/load-spark-env.sh"function image_ref {  local image="$1"  local add_repo="${2:-1}"  if [ $add_repo = 1 ] && [ -n "$REPO" ]; then    image="$REPO/$image"  fi  if [ -n "$TAG" ]; then    image="$image:$TAG"  fi  echo "$image"}function build {  local BUILD_ARGS  local IMG_PATH  if [ ! -f "$SPARK_HOME/RELEASE" ]; then    IMG_PATH=$BASEDOCKERFILE    BUILD_ARGS=(      ${BUILD_PARAMS}      --build-arg      img_path=$IMG_PATH      --build-arg      datagram_jars=datagram/runtimelibs      --build-arg      spark_jars=assembly/target/scala-$SPARK_SCALA_VERSION/jars    )  else    IMG_PATH="kubernetes/dockerfiles"    BUILD_ARGS=(${BUILD_PARAMS})  fi  if [ -z "$IMG_PATH" ]; then    error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."  fi  if [ -z "$IMAGE_REF" ]; then    error "Cannot find docker image reference. Please add -i arg."  fi  local BINDING_BUILD_ARGS=(    ${BUILD_PARAMS}    --build-arg    base_img=$(image_ref $IMAGE_REF)  )  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/docker/Dockerfile"}  docker build $NOCACHEARG "${BUILD_ARGS[@]}" \    -t $(image_ref $IMAGE_REF) \    -f "$BASEDOCKERFILE" .}function push {  docker push "$(image_ref $IMAGE_REF)"}function usage {  cat <<EOFUsage: $0 [options] [command]Builds or pushes the built-in Spark Docker image.Commands:  build       Build image. Requires a repository address to be provided if the image will be              pushed to a different registry.  push        Push a pre-built image to a registry. Requires a repository address to be provided.Options:  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.  -r repo               Repository address.  -i name               Image name to apply to the built image, or to identify the image to be pushed.    -t tag                Tag to apply to the built image, or to identify the image to be pushed.  -m                    Use minikube's Docker daemon.  -n                    Build docker image with --no-cache  -b arg      Build arg to build or push the image. For multiple build args, this option needs to              be used separately for each build arg.Using minikube when building images will do so directly into minikube's Docker daemon.There is no need to push the images into minikube in that case, they'll be automaticallyavailable when running applications inside the minikube cluster.Check the following documentation for more information on using the minikube Docker daemon:  https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemonExamples:  - Build image in minikube with tag "testing"    $0 -m -t testing build  - Build and push image with tag "v2.3.0" to docker.io/myrepo    $0 -r docker.io/myrepo -t v2.3.0 build    $0 -r docker.io/myrepo -t v2.3.0 pushEOF}if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then  usage  exit 0fiREPO=TAG=BASEDOCKERFILE=NOCACHEARG=BUILD_PARAMS=IMAGE_REF=while getopts f:mr:t:nb:i: optiondo case "${option}" in f) BASEDOCKERFILE=${OPTARG};; r) REPO=${OPTARG};; t) TAG=${OPTARG};; n) NOCACHEARG="--no-cache";; i) IMAGE_REF=${OPTARG};; b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};; esacdonecase "${@: -1}" in  build)    build    ;;  push)    if [ -z "$REPO" ]; then      usage      exit 1    fi    push    ;;  *)    usage    exit 1    ;;esac

С её помощью собираем базовый образ Spark, содержащий в себе тестовую задачу для вычисления числа Pi с помощью Spark (здесь {docker-registry-url} URL вашего реестра образов Docker, {repo} имя репозитория внутри реестра, совпадающее с проектом в OpenShift, {image-name} имя образа (если используется трёхуровневое разделение образов, например, как в интегрированном реестре образов Red Hat OpenShift), {tag} тег данной версии образа):


./bin/docker-image-tool-upd.sh -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile -r {docker-registry-url}/{repo} -i {image-name} -t {tag} build

Авторизуемся в кластере OKD с помощью консольной утилиты (здесь {OKD-API-URL} URL API кластера OKD):


oc login {OKD-API-URL}

Получим токен текущего пользователя для авторизации в Docker Registry:


oc whoami -t

Авторизуемся во внутреннем Docker Registry кластера OKD (в качестве пароля используем токен, полученный с помощью предыдущей команды):


docker login {docker-registry-url}

Загрузим собранный образ Docker в Docker Registry OKD:


./bin/docker-image-tool-upd.sh -r {docker-registry-url}/{repo} -i {image-name} -t {tag} push

Проверим, что собранный образ доступен в OKD. Для этого откроем в браузере URL со списком образов соответствующего проекта (здесь {project} имя проекта внутри кластера OpenShift, {OKD-WEBUI-URL} URL Web консоли OpenShift) https://{OKD-WEBUI-URL}/console/project/{project}/browse/images/{image-name}.


Для запуска задач должен быть создан сервисный аккаунт с привилегиями запуска подов под root (в дальнейшем обсудим этот момент):


oc create sa spark -n {project}oc adm policy add-scc-to-user anyuid -z spark -n {project}

Выполним команду spark-submit для публикации задачи Spark в кластере OKD, указав созданный сервисный аккаунт и образ Docker:


 /opt/spark/bin/spark-submit --name spark-test --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar

Здесь:
--name имя задачи, которое будет участвовать в формировании имени подов Kubernetes;

--class класс исполняемого файла, вызываемый при запуске задачи;

--conf конфигурационные параметры Spark;

spark.executor.instances количество запускаемых экзекьюторов Spark;

spark.kubernetes.authenticate.driver.serviceAccountName имя служебной учётной записи Kubernetes, используемой при запуске подов (для определения контекста безопасности и возможностей при взаимодействии с API Kubernetes);

spark.kubernetes.namespace пространство имён Kubernetes, в котором будут запускаться поды драйвера и экзекьютеров;

spark.submit.deployMode способ запуска Spark (для стандартного spark-submit используется cluster, для Spark Operator и более поздних версий Spark client);

spark.kubernetes.container.image образ Docker, используемый для запуска подов;

spark.master URL API Kubernetes (указывается внешний так обращение происходит с локальной машины);

local:// путь до исполняемого файла Spark внутри образа Docker.

Переходим в соответствующий проект OKD и изучаем созданные поды https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods.


Для упрощения процесса разработки может быть использован ещё один вариант, при котором создаётся общий базовый образ Spark, используемый всеми задачами для запуска, а снэпшоты исполняемых файлов публикуются во внешнее хранилище (например, Hadoop) и указываются при вызове spark-submit в виде ссылки. В этом случае можно запускать различные версии задач Spark без пересборки образов Docker, используя для публикации образов, например, WebHDFS. Отправляем запрос на создание файла (здесь {host} хост сервиса WebHDFS, {port} порт сервиса WebHDFS, {path-to-file-on-hdfs} желаемый путь к файлу на HDFS):


curl -i -X PUT "http://{host}:{port}/webhdfs/v1/{path-to-file-on-hdfs}?op=CREATE

При этом будет получен ответ вида (здесь {location} это URL, который нужно использовать для загрузки файла):


HTTP/1.1 307 TEMPORARY_REDIRECTLocation: {location}Content-Length: 0

Загружаем исполняемый файл Spark в HDFS (здесь {path-to-local-file} путь к исполняемому файлу Spark на текущем хосте):


curl -i -X PUT -T {path-to-local-file} "{location}"

После этого можем делать spark-submit с использованием файла Spark, загруженного на HDFS (здесь {class-name} имя класса, который требуется запустить для выполнения задачи):


/opt/spark/bin/spark-submit --name spark-test --class {class-name} --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace={project} --conf spark.submit.deployMode=cluster --conf spark.kubernetes.container.image={docker-registry-url}/{repo}/{image-name}:{tag} --conf spark.master=k8s://https://{OKD-API-URL}  hdfs://{host}:{port}/{path-to-file-on-hdfs}

При этом надо заметить, что для доступа к HDFS и обеспечения работы задачи может потребоваться изменить Dockerfile и скрипт entrypoint.sh добавить в Dockerfile директиву для копирования зависимых библиотек в директорию /opt/spark/jars и включить файл конфигурации HDFS в SPARK_CLASSPATH в entrypoint.sh.


Второй вариант использования Apache Livy


Далее, когда задача разработана и требуется протестировать полученный результат, возникает вопрос её запуска в рамках процесса CI/CD и отслеживания статусов её выполнения. Конечно, можно запускать её и с помощью локального вызова spark-submit, но это усложняет инфраструктуру CI/CD поскольку требует установку и конфигурацию Spark на агентах/раннерах CI сервера и настройки доступа к API Kubernetes. Для данного случая целевой реализацией выбрано использование Apache Livy в качестве REST API для запуска задач Spark, размещённого внутри кластера Kubernetes. С его помощью можно запускать задачи Spark на кластере Kubernetes используя обычные cURL запросы, что легко реализуемо на базе любого CI решения, а его размещение внутри кластера Kubernetes решает вопрос аутентификации при взаимодействии с API Kubernetes.



Выделим его в качестве второго варианта использования запуск задач Spark в рамках процесса CI/CD на кластере Kubernetes в тестовом контуре.


Немного про Apache Livy он работает как HTTP сервер, предоставляющий Web интерфейс и RESTful API, позволяющий удалённо запустить spark-submit, передав необходимые параметры. Традиционно он поставлялся в составе дистрибутива HDP, но также может быть развёрнут в OKD или любой другой инсталляции Kubernetes с помощью соответствующего манифеста и набора образов Docker, например, этого github.com/ttauveron/k8s-big-data-experiments/tree/master/livy-spark-2.3. Для нашего случая был собран аналогичный образ Docker, включающий в себя Spark версии 2.4.5 из следующего Dockerfile:


FROM java:8-alpineENV SPARK_HOME=/opt/sparkENV LIVY_HOME=/opt/livyENV HADOOP_CONF_DIR=/etc/hadoop/confENV SPARK_USER=sparkWORKDIR /optRUN apk add --update openssl wget bash && \    wget -P /opt https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && \    tar xvzf spark-2.4.5-bin-hadoop2.7.tgz && \    rm spark-2.4.5-bin-hadoop2.7.tgz && \    ln -s /opt/spark-2.4.5-bin-hadoop2.7 /opt/sparkRUN wget http://mirror.its.dal.ca/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && \    unzip apache-livy-0.7.0-incubating-bin.zip && \    rm apache-livy-0.7.0-incubating-bin.zip && \    ln -s /opt/apache-livy-0.7.0-incubating-bin /opt/livy && \    mkdir /var/log/livy && \    ln -s /var/log/livy /opt/livy/logs && \    cp /opt/livy/conf/log4j.properties.template /opt/livy/conf/log4j.propertiesADD livy.conf /opt/livy/confADD spark-defaults.conf /opt/spark/conf/spark-defaults.confADD entrypoint.sh /entrypoint.shENV PATH="/opt/livy/bin:${PATH}"EXPOSE 8998ENTRYPOINT ["/entrypoint.sh"]CMD ["livy-server"]

Созданный образ может быть собран и загружен в имеющийся у вас репозиторий Docker, например, внутренний репозиторий OKD. Для его развёртывания используется следующий манифест ({registry-url} URL реестра образов Docker, {image-name} имя образа Docker, {tag} тег образа Docker, {livy-url} желаемый URL, по которому будет доступен сервер Livy; манифест Route применяется в случае, если в качестве дистрибутива Kubernetes используется Red Hat OpenShift, в противном случае используется соответствующий манифест Ingress или Service типа NodePort):


---apiVersion: apps/v1kind: Deploymentmetadata:  labels:    component: livy  name: livyspec:  progressDeadlineSeconds: 600  replicas: 1  revisionHistoryLimit: 10  selector:    matchLabels:      component: livy  strategy:    rollingUpdate:      maxSurge: 25%      maxUnavailable: 25%    type: RollingUpdate  template:    metadata:      creationTimestamp: null      labels:        component: livy    spec:      containers:        - command:            - livy-server          env:            - name: K8S_API_HOST              value: localhost            - name: SPARK_KUBERNETES_IMAGE              value: 'gnut3ll4/spark:v1.0.14'          image: '{registry-url}/{image-name}:{tag}'          imagePullPolicy: Always          name: livy          ports:            - containerPort: 8998              name: livy-rest              protocol: TCP          resources: {}          terminationMessagePath: /dev/termination-log          terminationMessagePolicy: File          volumeMounts:            - mountPath: /var/log/livy              name: livy-log            - mountPath: /opt/.livy-sessions/              name: livy-sessions            - mountPath: /opt/livy/conf/livy.conf              name: livy-config              subPath: livy.conf            - mountPath: /opt/spark/conf/spark-defaults.conf              name: spark-config              subPath: spark-defaults.conf        - command:            - /usr/local/bin/kubectl            - proxy            - '--port'            - '8443'          image: 'gnut3ll4/kubectl-sidecar:latest'          imagePullPolicy: Always          name: kubectl          ports:            - containerPort: 8443              name: k8s-api              protocol: TCP          resources: {}          terminationMessagePath: /dev/termination-log          terminationMessagePolicy: File      dnsPolicy: ClusterFirst      restartPolicy: Always      schedulerName: default-scheduler      securityContext: {}      serviceAccount: spark      serviceAccountName: spark      terminationGracePeriodSeconds: 30      volumes:        - emptyDir: {}          name: livy-log        - emptyDir: {}          name: livy-sessions        - configMap:            defaultMode: 420            items:              - key: livy.conf                path: livy.conf            name: livy-config          name: livy-config        - configMap:            defaultMode: 420            items:              - key: spark-defaults.conf                path: spark-defaults.conf            name: livy-config          name: spark-config---apiVersion: v1kind: ConfigMapmetadata:  name: livy-configdata:  livy.conf: |-    livy.spark.deploy-mode=cluster    livy.file.local-dir-whitelist=/opt/.livy-sessions/    livy.spark.master=k8s://http://localhost:8443    livy.server.session.state-retain.sec = 8h  spark-defaults.conf: 'spark.kubernetes.container.image        "gnut3ll4/spark:v1.0.14"'---apiVersion: v1kind: Servicemetadata:  labels:    app: livy  name: livyspec:  ports:    - name: livy-rest      port: 8998      protocol: TCP      targetPort: 8998  selector:    component: livy  sessionAffinity: None  type: ClusterIP---apiVersion: route.openshift.io/v1kind: Routemetadata:  labels:    app: livy  name: livyspec:  host: {livy-url}  port:    targetPort: livy-rest  to:    kind: Service    name: livy    weight: 100  wildcardPolicy: None

После его применения и успешного запуска пода графический интерфейс Livy доступен по ссылке: http://{livy-url}/ui. С помощью Livy мы можем опубликовать нашу задачу Spark, используя REST запрос, например, из Postman. Пример коллекции с запросами представлен ниже (в массиве args могут быть переданы конфигурационные аргументы с переменными, необходимыми для работы запускаемой задачи):


{    "info": {        "_postman_id": "be135198-d2ff-47b6-a33e-0d27b9dba4c8",        "name": "Spark Livy",        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"    },    "item": [        {            "name": "1 Submit job with jar",            "request": {                "method": "POST",                "header": [                    {                        "key": "Content-Type",                        "value": "application/json"                    }                ],                "body": {                    "mode": "raw",                    "raw": "{\n\t\"file\": \"local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar\", \n\t\"className\": \"org.apache.spark.examples.SparkPi\",\n\t\"numExecutors\":1,\n\t\"name\": \"spark-test-1\",\n\t\"conf\": {\n\t\t\"spark.jars.ivy\": \"/tmp/.ivy\",\n\t\t\"spark.kubernetes.authenticate.driver.serviceAccountName\": \"spark\",\n\t\t\"spark.kubernetes.namespace\": \"{project}\",\n\t\t\"spark.kubernetes.container.image\": \"{docker-registry-url}/{repo}/{image-name}:{tag}\"\n\t}\n}"                },                "url": {                    "raw": "http://{livy-url}/batches",                    "protocol": "http",                    "host": [                        "{livy-url}"                    ],                    "path": [                        "batches"                    ]                }            },            "response": []        },        {            "name": "2 Submit job without jar",            "request": {                "method": "POST",                "header": [                    {                        "key": "Content-Type",                        "value": "application/json"                    }                ],                "body": {                    "mode": "raw",                    "raw": "{\n\t\"file\": \"hdfs://{host}:{port}/{path-to-file-on-hdfs}\", \n\t\"className\": \"{class-name}\",\n\t\"numExecutors\":1,\n\t\"name\": \"spark-test-2\",\n\t\"proxyUser\": \"0\",\n\t\"conf\": {\n\t\t\"spark.jars.ivy\": \"/tmp/.ivy\",\n\t\t\"spark.kubernetes.authenticate.driver.serviceAccountName\": \"spark\",\n\t\t\"spark.kubernetes.namespace\": \"{project}\",\n\t\t\"spark.kubernetes.container.image\": \"{docker-registry-url}/{repo}/{image-name}:{tag}\"\n\t},\n\t\"args\": [\n\t\t\"HADOOP_CONF_DIR=/opt/spark/hadoop-conf\",\n\t\t\"MASTER=k8s://https://kubernetes.default.svc:8443\"\n\t]\n}"                },                "url": {                    "raw": "http://{livy-url}/batches",                    "protocol": "http",                    "host": [                        "{livy-url}"                    ],                    "path": [                        "batches"                    ]                }            },            "response": []        }    ],    "event": [        {            "listen": "prerequest",            "script": {                "id": "41bea1d0-278c-40c9-ad42-bf2e6268897d",                "type": "text/javascript",                "exec": [                    ""                ]            }        },        {            "listen": "test",            "script": {                "id": "3cdd7736-a885-4a2d-9668-bd75798f4560",                "type": "text/javascript",                "exec": [                    ""                ]            }        }    ],    "protocolProfileBehavior": {}}

Выполним первый запрос из коллекции, перейдём в интерфейс OKD и проверим, что задача успешно запущена https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods. При этом в интерфейсе Livy (http://personeltest.ru/away/{livy-url}/ui) появится сессия, в рамках которой с помощью API Livy или графического интерфейса можно отслеживать ход выполнения задачи и изучать логи сессии.


Теперь покажем механизм работы Livy. Для этого изучим журналы контейнера Livy внутри пода с сервером Livy https://{OKD-WEBUI-URL}/console/project/{project}/browse/pods/{livy-pod-name}?tab=logs. Из них видно, что при вызове REST API Livy в контейнере с именем livy выполняется spark-submit, аналогичный используемому нами выше (здесь {livy-pod-name} имя созданного пода с сервером Livy). В коллекции также представлен второй запрос, позволяющий запускать задачи с удалённым размещением исполняемого файла Spark с помощью сервера Livy.


Третий вариант использования Spark Operator


Теперь, когда задача протестирована, встаёт вопрос её регулярного запуска. Нативным способом для регулярного запуска задач в кластере Kubernetes является сущность CronJob и можно использовать её, но в данный момент большую популярность имеет использование операторов для управления приложениями в Kubernetes и для Spark существует достаточно зрелый оператор, который, в том числе, используется в решениях Enterprise уровня (например, Lightbend FastData Platform). Мы рекомендуем использовать его текущая стабильная версия Spark (2.4.5) имеет достаточно ограниченные возможности по конфигурации запуска задач Spark в Kubernetes, при этом в следующей мажорной версии (3.0.0) заявлена полноценная поддержка Kubernetes, но дата её выхода остаётся неизвестной. Spark Operator компенсирует этот недостаток, добавляя важные параметры конфигурации (например, монтирование ConfigMap с конфигурацией доступа к Hadoop в поды Spark) и возможность регулярного запуска задачи по расписанию.



Выделим его в качестве третьего варианта использования регулярный запуск задач Spark на кластере Kubernetes в продуктивном контуре.


Spark Operator имеет открытый исходный код и разрабатывается в рамках Google Cloud Platform github.com/GoogleCloudPlatform/spark-on-k8s-operator. Его установка может быть произведена 3 способами:


  1. В рамках установки Lightbend FastData Platform/Cloudflow;
  2. С помощью Helm:
    helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubatorhelm install incubator/sparkoperator --namespace spark-operator
    

  3. Применением манифестов из официального репозитория (http://personeltest.ru/aways/github.com/GoogleCloudPlatform/spark-on-k8s-operator/tree/master/manifest). При этом стоит отметить следующее в состав Cloudflow входит оператор с версией API v1beta1. Если используется данный тип установки, то описания манифестов приложений Spark должны строиться на основе примеров из тегов в Git с соответствующей версией API, например, v1beta1-0.9.0-2.4.0. Версию оператора можно посмотреть в описании CRD, входящего в состав оператора в словаре versions:
    oc get crd sparkapplications.sparkoperator.k8s.io -o yaml
    


Если оператор установлен корректно, то в соответствующем проекте появится активный под с оператором Spark (например, cloudflow-fdp-sparkoperator в пространстве Cloudflow для установки Cloudflow) и появится соответствующий тип ресурсов Kubernetes с именем sparkapplications. Изучить имеющиеся приложений Spark можно следующей командой:


oc get sparkapplications -n {project}

Для запуска задач с помощью Spark Operator требуется сделать 3 вещи:


  • создать образ Docker, включающий в себя все необходимые библиотеки, а также конфигурационные и исполняемые файлы. В целевой картине это образ, созданный на этапе CI/CD и протестированный на тестовом кластере;
  • опубликовать образ Docker в реестр, доступный из кластера Kubernetes;
  • сформировать манифест с типом SparkApplication и описанием запускаемой задачи. Примеры манифестов доступны в официальном репозитории (например, github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta1-0.9.0-2.4.0/examples/spark-pi.yaml). Стоит отметить важные моменты касательно манифеста:
    1. в словаре apiVersion должна быть указана версия API, соответствующая версии оператора;
    2. в словаре metadata.namespace должно быть указано пространство имён, в котором будет запущено приложение;
    3. в словаре spec.image должен быть указан адрес созданного образа Docker в доступном реестре;
    4. в словаре spec.mainClass должен быть указан класс задачи Spark, который требуется запустить при запуске процесса;
    5. в словаре spec.mainApplicationFile должен быть указан путь к исполняемому jar файлу;
    6. в словаре spec.sparkVersion должна быть указана используемая версия Spark;
    7. в словаре spec.driver.serviceAccount должна быть указана сервисная учётная запись внутри соответствующего пространства имён Kubernetes, которая будет использована для запуска приложения;
    8. в словаре spec.executor должно быть указано количество ресурсов, выделяемых приложению;
    9. в словаре spec.volumeMounts должна быть указана локальная директория, в которой будут создаваться локальные файлы задачи Spark.


Пример формирования манифеста (здесь {spark-service-account} сервисный аккаунт внутри кластера Kubernetes для запуска задач Spark):


apiVersion: "sparkoperator.k8s.io/v1beta1"kind: SparkApplicationmetadata:  name: spark-pi  namespace: {project}spec:  type: Scala  mode: cluster  image: "gcr.io/spark-operator/spark:v2.4.0"  imagePullPolicy: Always  mainClass: org.apache.spark.examples.SparkPi  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"  sparkVersion: "2.4.0"  restartPolicy:    type: Never  volumes:    - name: "test-volume"      hostPath:        path: "/tmp"        type: Directory  driver:    cores: 0.1    coreLimit: "200m"    memory: "512m"    labels:      version: 2.4.0    serviceAccount: {spark-service-account}    volumeMounts:      - name: "test-volume"        mountPath: "/tmp"  executor:    cores: 1    instances: 1    memory: "512m"    labels:      version: 2.4.0    volumeMounts:      - name: "test-volume"        mountPath: "/tmp"

В данном манифесте указана сервисная учётная запись, для которой требуется до публикации манифеста создать необходимые привязки ролей, предоставляющие необходимые права доступа для взаимодействия приложения Spark с API Kubernetes (если нужно). В нашем случае приложению нужны права на создание Pod'ов. Создадим необходимую привязку роли:


oc adm policy add-role-to-user edit system:serviceaccount:{project}:{spark-service-account} -n {project}

Также стоит отметить, что в спецификации данного манифеста может быть указан параметр hadoopConfigMap, который позволяет указать ConfigMap с конфигурацией Hadoop без необходимости предварительного помещения соответствующего файла в образ Docker. Также он подходит для регулярного запуска задач с помощью параметра schedule может быть указано расписание запуска данной задачи.


После этого сохраняем наш манифест в файл spark-pi.yaml и применяем его к нашему кластеру Kubernetes:


oc apply -f spark-pi.yaml

При этом создастся объект типа sparkapplications:


oc get sparkapplications -n {project}> NAME       AGE> spark-pi   22h

При этом будет создан под с приложением, статус которого будет отображаться в созданном sparkapplications. Его можно посмотреть следующей командой:


oc get sparkapplications spark-pi -o yaml -n {project}

По завершении задачи POD перейдёт в статус Completed, который также обновится в sparkapplications. Логи приложения можно посмотреть в браузере или с помощью следующей команды (здесь {sparkapplications-pod-name} имя пода запущенной задачи):


oc logs {sparkapplications-pod-name} -n {project}

Также управление задачами Spark может быть осуществлено с помощью специализированной утилиты sparkctl. Для её установки клонируем репозиторий с её исходным кодом, установим Go и соберём данную утилиту:


git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.gitcd spark-on-k8s-operator/wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gztar -xzf go1.13.3.linux-amd64.tar.gzsudo mv go /usr/localmkdir $HOME/Projectsexport GOROOT=/usr/local/goexport GOPATH=$HOME/Projectsexport PATH=$GOPATH/bin:$GOROOT/bin:$PATHgo -versioncd sparkctlgo build -o sparkctlsudo mv sparkctl /usr/local/bin

Изучим список запущенных задач Spark:


sparkctl list -n {project}

Создадим описание для задачи Spark:


vi spark-app.yaml

apiVersion: "sparkoperator.k8s.io/v1beta1"kind: SparkApplicationmetadata:  name: spark-pi  namespace: {project}spec:  type: Scala  mode: cluster  image: "gcr.io/spark-operator/spark:v2.4.0"  imagePullPolicy: Always  mainClass: org.apache.spark.examples.SparkPi  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar"  sparkVersion: "2.4.0"  restartPolicy:    type: Never  volumes:    - name: "test-volume"      hostPath:        path: "/tmp"        type: Directory  driver:    cores: 1    coreLimit: "1000m"    memory: "512m"    labels:      version: 2.4.0    serviceAccount: spark    volumeMounts:      - name: "test-volume"        mountPath: "/tmp"  executor:    cores: 1    instances: 1    memory: "512m"    labels:      version: 2.4.0    volumeMounts:      - name: "test-volume"        mountPath: "/tmp"

Запустим описанную задачу с помощью sparkctl:


sparkctl create spark-app.yaml -n {project}

Изучим список запущенных задач Spark:


sparkctl list -n {project}

Изучим список событий запущенной задачи Spark:


sparkctl event spark-pi -n {project} -f

Изучим статус запущенной задачи Spark:


sparkctl status spark-pi -n {project}

В заключение хотелось бы рассмотреть обнаруженные минусы эксплуатации текущей стабильной версии Spark (2.4.5) в Kubernetes:


  1. Первый и, пожалуй, главный минус это отсутствие Data Locality. Несмотря на все недостатки YARN были и плюсы в его использовании, например, принцип доставки кода к данным (а не данных к коду). Благодаря ему задачи Spark выполнялись на узлах, где располагались данные, участвующие в вычислениях, и заметно уменьшалось время на доставку данных по сети. При использовании Kubernetes мы сталкиваемся с необходимостью перемещения по сети данных, задействованных в работе задачи. В случае, если они достаточно большие, то время выполнения задачи может существенно увеличиться, а также потребоваться достаточно большой объём дискового пространства, выделенного экземплярам задачи Spark для их временного хранения. Данный недостаток может быть снижен за счёт использования специализированных программных средств, обеспечивающих локальность данных в Kubernetes (например, Alluxio), но это фактически означает необходимость хранения полной копии данных на узлах кластера Kubernetes.
  2. Второй важный минус это безопасность. По умолчанию функции, связанные с обеспечением безопасности касательно запуска задач Spark отключены, вариант использования Kerberos в официальной документации не охвачен (хотя соответствующие параметры появились в версии 3.0.0, что потребует дополнительной проработки), а в документации по обеспечению безопасности при использовании Spark (http://personeltest.ru/aways/spark.apache.org/docs/2.4.5/security.html) в качестве хранилищ ключей фигурируют только YARN, Mesos и Standalone Cluster. При этом пользователь, под которым запускаются задачи Spark, не может быть указан напрямую мы лишь задаём сервисную учётную запись, под которой будет работать под, а пользователь выбирается исходя из настроенных политик безопасности. В связи с этим либо используется пользователь root, что не безопасно в продуктивном окружении, либо пользователь с случайным UID, что неудобно при распределении прав доступа к данным (решаемо созданием PodSecurityPolicies и их привязкой к соответствующим служебным учётным записям). На текущий момент решается либо помещением всех необходимых файлов непосредственно в образ Docker, либо модификацией скрипта запуска Spark для использования механизма хранения и получения секретов, принятого в Вашей организации.
  3. Запуск задач Spark с помощью Kubernetes официально до сих пор находится в экспериментальном режиме и в будущем возможны значительные изменения в используемых артефактах (конфигурационных файлах, базовых образов Docker и скриптах запуска). И действительно при подготовке материала тестировались версии 2.3.0 и 2.4.5, поведение существенно отличалось.

Будем ждать обновлений недавно вышла свежая версия Spark (3.0.0), принёсшая ощутимые изменения в работу Spark на Kubernetes, но сохранившая экспериментальный статус поддержки данного менеджера ресурсов. Возможно, следующие обновления действительно позволят в полной мере рекомендовать отказаться от YARN и запускать задачи Spark на Kubernetes, не опасаясь за безопасность Вашей системы и без необходимости самостоятельной доработки функциональных компонентов.


Fin.

Подробнее..

Перевод Apache Spark на Kubernetes чем полезен Apache YuniKorn

25.02.2021 10:21:06 | Автор: admin

Сунил Говиндан, Вэйвэй Янг, Вангда Tан, Уилфред Шпигельбург

Предпосылки

Почему для Apache Spark выбирают K8s

Apache Spark унифицирует в рамках одной платформы пакетную обработку в режиме реального времени, потоковую аналитику, машинное обучение и интерактивные запросы. Хотя Apache Spark предоставляет множество возможностей для разнообразных сценариев применения, его использование сопряжено с дополнительной сложностью и высокими затратами на обслуживание и администрирование кластера. Давайте рассмотрим некоторые высокоуровневые требования к базовому оркестратору ресурсов, чтобы расширить возможности Spark как единой платформы:

Контейнерные вычисления Spark для предоставления общих ресурсов разным заданиям машинного обучения и ETL.

Поддержка нескольких версий Spark, версий Python с контролем версий, контейнеры на общих кластерах K8s для более быстрой итерации и стабильной работы в продуктиве.

Единая унифицированная инфраструктура для большинства пакетных рабочих нагрузок и микросервисов.

Контроль доступа к общим кластерам.

Kubernetes как фактический стандарт для развертывания сервисов предлагает более точный и детализированный контроль всех вышеупомянутых аспектов по сравнению с другими оркестраторами ресурсов. Kubernetes предлагает упрощенный способ управления инфраструктурой и приложениями с практическим подходом к изоляции рабочих нагрузок, ограничению использования ресурсов, развертыванию ресурсов по запросу и возможностям автоматического масштабирования по мере необходимости.

Проблемы планирования задач Apache Spark на K8s

В планировщике Kubernetes по умолчанию есть пробелы с точки зрения эффективного развертывания пакетных рабочих нагрузок на том же кластере, где также планируется длительная работа других сервисов. Для пакетных рабочих нагрузок из-за требуемого параллелизма вычислений в основном должно планироваться совместное и гораздо более частое выполнение. Давайте подробно рассмотрим некоторые из этих пробелов.

Отсутствие первоклассной концепции приложения

В зависимости от типа развертываемого контейнера пакетные задания часто необходимо планировать к выполнению последовательно. Например, поды драйверов Spark нужно планировать раньше, чем рабочие поды. Четкая концепция первоклассного приложения может помочь с упорядочиванием или постановкой в очередь при каждом развертывании контейнера. Кроме того, такая концепция помогает администратору визуализировать задания, запланированные для отладки.

Отсутствие эффективных возможностей управления емкостью/квотами

Для управления ресурсами при выполнении рабочих нагрузок Spark в случае использования ресурсов несколькими арендаторами могут применяться квоты Kubernetes на ресурсы пространства имен (неймспейсов). Однако здесь есть несколько проблем:

1. Задания Apache Spark в отношении использования ресурсов являются динамическими по своей природе. А квоты пространства имен фиксируются и проверяются на этапе допуска. Запрос пода отклоняется, если он не соответствует квоте пространства имен. Это требует, чтобы задание Apache Spark вместо того, чтобы ставить запрос на выполнение в очередь внутри самого Kubernetes, реализовывало механизм повтора запросов пода.

2. Квота ресурсов пространства имен плоская, она не поддерживает иерархии управления квотами ресурсов.

Также часто пользователь может сталкиваться с нехваткой ресурсов для выполнения пакетных рабочих нагрузок, поскольку квоты пространства имен Kubernetes нередко не соответствуют плану распределения ресурсов на основе организационной иерархии. Эластичное и иерархическое управление приоритетами заданий в K8s сегодня отсутствует.

Недостаточно справедливое распределение ресурсов между арендаторами

В производственной, рабочей среде часто обнаруживается, что планировщик Kubernetes по умолчанию не может эффективно управлять диверсифицированными рабочими нагрузками и обеспечивать справедливость распределения ресурсов для рабочих нагрузок арендаторов. Вот некоторые из основных причин:

Управление рабочими нагрузками в производственной среде часто выполняется большим количеством пользователей.

В плотной производственной среде, где выполняются разные типы рабочих нагрузок, весьма вероятно, что поды драйверов Spark могут занять все ресурсы в пространстве имен. Такие сценарии создают большую проблему для эффективного совместного использования ресурсов.

Неправомерные или поврежденные задания могут легко захватить ресурсы и повлиять на производственные рабочие нагрузки.

Строгие требования SLA с задержкой планирования

На большинстве загруженных производственных кластеров, предназначенных для пакетных рабочих нагрузок, часто выполняются тысячи заданий с сотнями тысяч тасков ежедневно. Эти рабочие нагрузки требуют большего количества параллельно развертываемых контейнеров, и часто время жизни таких контейнеров весьма короткое (от нескольких секунд до часов). Обычно это влечет потребность в развертывании тысяч подов или контейнеров, ожидающих выполнения, и использование по умолчанию планировщика Kubernetes может привести к дополнительным задержкам, что, в свою очередь, приведет к невыполнению SLA.

Как может помочь Apache YuniKorn

Обзор Apache YuniKorn

YuniKorn - это улучшенный планировщик Kubernetes, как для сервисов, так и для пакетных рабочих нагрузок. В зависимости от сценариев использования и развертывания, YuniKorn может либо полностью заменить стандартный планировщик Kubernetes, либо работать с ним совместно.

YuniKorn является унифицированным кроссплатформенным планировщиком смешанных рабочих нагрузок, состоящих как из stateless пакетных рабочих нагрузок, так и stateful сервисов.

Сравнение YuniKorn и стандартного планировщика Kubernetes

Фича

Планировщик по умолчанию

YUNIKORN

Примечание

Концепция приложения

x

Приложения в YuniKorn приоритетны. YuniKorn планирует приложения с учетом, например, порядка их запуска, приоритета, использования ресурсов и т. д.

Упорядочение заданий

x

YuniKorn поддерживает правила упорядочения заданийFIFO/FAIR/Приоритеты (WIP)

Детализированное управление ресурсами

x

Управление ресурсами кластера с помощью иерархии очередей. Очереди предоставляют гарантированные ресурсы (минимум) и предельные квоты ресурсов (максимум).

Справедливое распределение ресурсов

x

Справедливое распределение ресурсов между приложениями и очередями для их идеального распределения для всех запущенных приложений.

Нативная поддержка нагрузок Big Data

x

Стандартный планировщик ориентирован на долговременные сервисы. YuniKorn разработан для рабочих нагрузок приложений Big Data и изначально поддерживает эффективное выполнение Spark / Flink / Tensorflow и т. д. на базе K8s.

Масштабирование и производительность

x

YuniKorn оптимизирован по производительности, он подходит для высокопроизводительных и крупномасштабных сред.


Как YuniKorn помогает в работе Spark на K8s

YuniKorn имеет богатый набор функций, которые помогают более эффективно запускать Apache Spark на Kubernetes. Подробности можно найти здесь (инструкции по запуску Spark на K8s с YuniKorn).

Ознакомьтесь с более подробной информацией о том, как YuniKorn расширяет возможности Spark на K8s: Cloud-Native Spark Scheduling with YuniKorn Scheduler на Spark & AI саммите 2020.

Давайте рассмотрим некоторые варианты использования и то, как YuniKorn помогает улучшить планирование ресурсов Spark в этих сценариях.

Несколько пользователей одновременно запускают разные рабочие нагрузки (шумные соседи)

По мере того, как все больше пользователей начинают одновременно запускать джобы, становится все сложнее изолировать их и справедливо предоставлять им необходимые ресурсы. Планировщик YuniKorn предоставляет оптимальное решение для управления квотами ресурсов путем использования очередей ресурсов.

В приведенном выше примере структуры очередей в YuniKorn и пространства имен, определенные в Kubernetes, сопоставляются с очередями неймспейсов родительской очереди с помощью политики размещения. Очереди сред тестирования и разработки имеют фиксированные ограничения по ресурсам. Все остальные очереди ограничены только размером кластера. Ресурсы распределяются между очередями с использованием справедливой политики, а задания в очереди в производственной среде планируются в порядке FIFO.

Вот некоторые из основных преимуществ:

Одна очередь YuniKorn может автоматически сопоставляться с одним пространством имен в Kubernetes.

Емкость очереди эластична по своей природе и может обеспечивать диапазон ресурсов от минимального до максимального заданного значения.

Справедливость распределения ресурсов, которая поможет избежать возможной нехватки ресурсов.

YuniKorn предоставляет простой способ управления квотой ресурсов для кластера Kubernetes, он может работать как замена квоты ресурсов пространства имен. Управление квотами ресурсов в YuniKorn позволяет задействовать очереди запросов подов и совместное использование ограниченных ресурсов между заданиями на основе подключаемых политик планирования. Все это может быть выполнено без каких-либо дополнительных манипуляций, таких как повторная отправка запроса к поду Apache Spark.

Настройка кластера для модели распределения ресурсов на основе существующей в организации иерархии

В большой производственной среде несколько пользователей будут одновременно запускать рабочие нагрузки разных типов. Часто эти пользователи вынуждены потреблять ресурсы в зависимости от бюджетных ограничений в иерархии организации. Такая производственная настройка помогает эффективно использовать ресурсы кластера в пределах выделенной квоты ресурсов.

YuniKorn позволяет управлять ресурсами кластера с иерархией очередей. Детальное управление распределением ресурсов в многопользовательской среде становится возможным за счет использования очередей ресурсов с четкой иерархией (например, соответствующей иерархии организации). Очереди YuniKorn можно настраивать статически или динамически управлять ими, а с помощью функции динамического управления очередями пользователи могут настраивать правила для делегирования функций управления очередью.

Улучшенное SLA заданий Spark в мультиарендном кластере

Обычные рабочие нагрузки ETL, выполняемые в мультиарендном (мультитенантном) кластере, требуют более простых средств определения детализированных политик для выполнения заданий в желаемой для организации иерархии очередей. Часто такие политики помогают определять для выполнения заданий более строгие SLA.

YuniKorn дает администраторам возможность размещать задания в очередях на основе более простых политик, таких как FIFO, FAIR и т. д. Политика сортировки приложений StateAware упорядочивает задания в очереди в порядке FIFO и планирует их выполнение одно за другим согласно условиям. Это позволяет избежать обычного состояния гонки при отправке большого количества пакетных заданий, например Spark, в одно пространство имен (или кластер). Обеспечивая определенный порядок заданий, она также улучшает планирование заданий, делая его более предсказуемым.

Использование различных фич K8s для планирования заданий Apache Spark

YuniKorn полностью совместим с основными выпущенными версиями K8s. Пользователи могут прозрачным образом менять планировщик в существующем кластере K8s. YuniKorn полностью поддерживает всю родную семантику K8s, которая может использоваться во время планирования, такую как селектор меток, афинность/анти-афинность пода, толерантность, PV/PVC и т. д. YuniKorn также совместим с командами управления и утилитами, такими как узлы cordon, получение событий через kubectl и т. д.

Apache YuniKorn в CDP

Платформа CDP от Cloudera предлагает аналитическое приложение Cloudera Data Engineering на базе Apache YuniKorn (в инкубации).

Некоторые из основных вариантов использования YuniKorn в Cloudera включают в себя:

Управление квотами ресурсов для виртуальных кластеров CDE.

Расширенные возможности планирования заданий Spark.

Ответственность за планирование как микросервисных, так и пакетных заданий.

Работа в облаке с поддержкой автоматического масштабирования.

Планы на улучшение поддержки рабочих нагрузок Spark

Сообщество YuniKorn активно изучает определенные улучшения основных функций для поддержки выполнения рабочих нагрузок Spark. Например, для более эффективного выполнения важно выделить минимальное количество подов драйверов и рабочих подов. Параллельное планирование (gang scheduling) помогает обеспечить выделение для выполнения задания Spark необходимого количества подов. Такая функция будет очень полезна в шумном окружении при развертывании многопользовательского кластера. Для получения более подробной информации см. YUNIKORN-2 (Jira отслеживает прогресс функций).

Поддержка приоритетов заданий/задач

Упорядочение по приоритетам на уровне заданий помогает администраторам расставлять приоритеты и YuniKorn выделять необходимые ресурсы заданиям с высоким уровнем SLA. Это также дает большую гибкость для эффективного использования ресурсов кластера. Для получения более подробной информации см. YUNIKORN-1 (Jira отслеживает прогресс функции).

Распределенная трассировка

В YUNIKORN-387 для улучшения общего контроля планировщика используется открытая трассировка. С помощью этой функции можно собирать и сохранять критические трассировки в рамках основного цикла планирования для устранения неполадок, профилирования системы и мониторинга.

Резюме

YuniKorn помогает эффективно осуществлять детальное распределение ресурсов для различных рабочих нагрузок Spark в крупномасштабных многопользовательских средах, с одной стороны, и динамически создаваемых облачных средах - с другой.

Таким образом, благодаря YuniKorn платформа Apache Spark может стать для пользователей важной платформой корпоративного уровня, предлагающей надежную основу для различных приложений, от крупномасштабного преобразования данных до задач аналитики и машинного обучения.

Подробнее..

Перевод Распределенное обучение XGBoost и параллельное прогнозирование с Apache Spark

24.06.2020 14:16:49 | Автор: admin
Привет, хабр! Уже в конце июля Otus запускает новый курс Промышленный ML на больших данных. Традиционно, в преддверии старта нового курса, мы подготовили для вас перевод полезного материала.



Общие сведения


В бустинге (из ансамбля моделей машинного обучения), алгоритмы реализуют последовательный процесс (в отличие от бэггинга, где он распараллелен), который генерирует слабые обучающие алгоритмы и комбинирует их с сильным (как и во всех методах ансамбля). В бустинге на каждой итерации процесса модель пытается адаптивно исправить ошибки предыдущей итерации, в отличие от бэггинга, в котором слабые обучающие алгоритмы обучаются независимо.



Один из алгоритмов бустинга, градиентный бустинг, использует градиентный спуск для минимизации функции потерь прямо в этих последовательных моделях (в отличие от алгоритма AdaBoost, где обучение происходит посредством изменения весов обучающих экземпляров).

Слабые обучающие алгоритмы, созданные при градиентном бустинге во время обучения, обычно реализуются в виде деревьев решений. Самое неэффективное в градиентном бустинге это последовательный процесс создания деревьев, поскольку в таком случае создается всего одно дерево за раз.

Чтобы обойти это ограничение Тяньцзи Ченом и Карлосом Гестрином было предложено улучшение алгоритма градиентного бустинга, которое называется XGBoost, что расшифровывается как Extreme Gradient Boosting или экстремальный градиентный бустинг. Это своего рода градиентный бустинг на стероидах, который используется в основном для классификации, но также порой для регрессии и ранжирования.

По сравнению со стандартным градиентным бустингом, новый метод существенно увеличивает производительность за счет гиперпараметров, поддержки GPU, кроссвалидации и регуляризации алгоритмов. В целом модель получается более эффективной, быстрее обучается и менее подвержена переобучению.

В последнее время XGBoost обрел большую популярность и выиграл множество соревнований по машинному обучению в Kaggle. Считается, что он обладает большой вычислительной мощностью и точностью.



XGBoost и Apache Spark


Во время стандартного workflow в ML используются такие системы как Spark для создания пайплайна машинного обучения, где вы предварительно обрабатываете и чистите данные, а затем результат передается на этап машинного обучения, зачастую с помощью Spark MLlib, если вы уже используете Spark.

В контексте этой статьи важно то, что в XGBoost есть распараллеливание процесса построения дерева, что позволяет производить между узлами распределенное обучение и прогнозирование. То есть если я, как пользователь Apache Spark MLlib, могу использовать его для расширения возможностей обучения XGBoost и работы на продакшене, то, по сути, я могу радоваться высокой производительности XGBoost и мощным механизмам работы Spark для инженерии признаков и построения ML-пайплайнов.

Встречайте XGBoost4J-Spark проект, который объединяет XGBoost и Apache Spark, добавляя XGBoost к фреймворку Apache Spark MLlib.

XGBoost4J-Spark дает возможность построить пайплайн MLlib, который предварительно обрабатывает данные перед обучением модели XGBoost, обучает ее и может использоваться для параллельного прогнозирования на продкшене. С помощью этой библиотеки каждый воркер XGBoost оборачивается в таск Spark, при этом обучающий датасет из памяти Spark отправляется воркерам XGBoost, которые невидимо существуют в исполнителях Spark.



Чтобы начать писать приложение с машинным обучением на XGBoost4J-Spark, вам нужно сначала добавить соответствующую зависимость:

<dependency>    <groupId>ml.dmlc</groupId>    <artifactId>xgboost4j-spark</artifactId>    <version>0.90</version></dependency>


Подготовка данных (пример с ирисами)


Как говорилось ранее, XGBoost4J-Spark позволяет подогнать данные под интерфейс XGBoost.

Как только мы считаем датасет Цветы Ириса в DataFrame, нам нужно будет:

  • Преобразовать столбцы из String к Double;
  • Объединить столбцы признаков в вектора, чтобы данные соответствовали интерфейсу фреймворка машинного обучения Spark.


import org.apache.spark.ml.feature.StringIndexerimport org.apache.spark.ml.feature.VectorAssemblerval stringIndexer = new StringIndexer().  setInputCol("class").  setOutputCol("classIndex").  fit(irisDF)val labelTransformed = stringIndexer.transform(irisDF).drop("class")val vectorAssembler = new VectorAssembler().  setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).  setOutputCol("features")val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")


В DataFrame выше в результате будут два столбца, features: вектор представляющий признаки ириса и classIndex: лейбл типа Double. Такой DataFrame можно спокойно скормить обучающему движку XGBoost4J-Spark.

Распределенное обучение


import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifierval xgbClassifier = new XGBoostClassifier().      setFeaturesCol("features").      setLabelCol("classIndex").      setObjective("multi:softmax")      setMaxDepth(2).      setNumClass(3).      setNumRound(100).      setNumWorkers(10).


Полный список параметров XGBoost вы можете найти здесь. Обратите внимание, что в XGBoost4J-Spark вы также можете использовать camelСase, как в примере выше.

Заметки:


  1. multi:softmax означает, что мы делаем многоклассовую классификацию с помощью функции softmax. Для этого нужно задать количество классов с помощью параметра num_class.
  2. max_depth это максимальная глубина дерева, созданного на каждой итерации бустинга. Увеличение этого значения сделает модель сложной и склонной к переобучению. При обучении глубоких деревьев XGBoost потребляет много памяти.
  3. num_rounds количество раундов бустинга.
  4. Параметр num_workers определяет сколько параллельных воркеров нам нужно при обучении XGBoostClassificationModel. Позже этот параметр станет отложенными тасками в Spark, которые в перспективе будут обрабатываться менеджером кластера (в большинстве случаев YARN).

Ранняя остановка поддерживается с помощью параметров num_early_stopping_rounds и maximize_evaluation_metrics.
Теперь мы можем создать трансформер, обучив классификатор XGBoost на входном DataFrame. В результате процесса обучения мы получаем модель, которую можно использовать для получения прогнозов.

val xgbClassificationModel = xgbClassifier.fit(xgbInput)


Параллельное прогнозирование


XGBoost4j-Spark поддерживает пакетное прогнозирование и точечное прогнозирование.

Для пакетного прогнозирования модель берет DataFrame со столбцом, содержащим векторы признаков, делает прогноз для каждого вектора признаков и выводит новый DataFrame с результатами. В этом процессе XGBoost4J-Spark запускает таск Spark с воркером XGBoost для каждой части входного DataFrame для параллельного пакетного прогнозирования.

val predictionsDf = xgbClassificationModel.transform(inputDF)predictionsDf.show()+----------------+----------+-------------+-------------+----------+|       features |classIndex|rawPrediction| probability |prediction|+----------------+----------+-------------+-------------+----------+|[5.1,3.5,1.2,.. |       0.0|[3.4556984...|[0.9957963...|       0.0||[4.7,3.2,1.3,.. |       0.0|[3.4556984...|[0.9961891...|       0.0||[5.7,4.4,1.5,.. |       0.0|[3.4556984...|[0.9964334...|       0.0|+----------------+----------+-------------+-------------+----------+


Для точечного прогнозирования модель принимает один вектор.

val features = xgbInput.head().getAs[Vector]("features")val result = xgbClassificationModel.predict(features)


Точечное прогнозирование с помощью XGBoost не рекомендуется из-за больших накладных расходов, поскольку они будут сравнимы с единичным прогнозом.

На данный момент последняя версия (0.9) XGBoost4J-Spark требует Spark 2.4.x., в основном потому, что теперь в нем используются средства org.apache.spark.ml.param.shared, которые доступны не полностью в более ранних версиях Spark.

Также эта версия включает в себя более последовательную обработку пропущенных значений, лучшую производительность для многоядерных процессоров, улучшение управления кэшированием разделенных обучающих данных для сокращения времени обучения и т.д.

Узнать больше вы можете в документации XGBoost.

Источники:


XGBoost с CUDA
XGBoost в Spark c GPU и RAPIDS XGboost4J-Spark



Узнать о курсе подробнее.


Подробнее..

Перевод Как Apache Spark 3.0 увеличивает производительность ваших SQL рабочих нагрузок

01.06.2021 12:20:22 | Автор: admin

Практически в каждом секторе, работающем со сложными данными, Spark "де-факто" быстро стал средой распределенных вычислений для команд на всех этапах жизненного цикла данных и аналитики. Одна из наиболее ожидаемых функций Spark 3.0 - это новая платформа Adaptive Query Execution (AQE), устраняющая проблемы, которые возникают при многих рабочих нагрузках Spark SQL. Они были задокументированы в начале 2018 года командой специалистов Intel и Baidu. Для более глубокого изучения фреймворка можно пройти наш обновленный курс по тюнингу производительности Apache Spark (Apache Spark Performance Tuning).

Наш опыт работы с Workload XM, безусловно, подтверждает реальность и серьезность этих проблем.

AQE был впервые представлен в Spark 2.4, но в Spark 3.0 и 3.1 он стал намного более развитым. Для начала, давайте посмотрим, какие проблемы решает AQE.

Недостаток первоначальной архитектуры Catalyst

На диаграмме ниже представлен вид распределенной обработки, которая происходит, когда вы выполняете простой group-by-count запрос с использованием DataFrames.

Spark определяет подходящее количество партиций для первого этапа, но для второго этапа использует по умолчанию "магическое число" - 200.

И это плохо по трем причинам:

1. 200 вряд ли будет идеальным количеством партиций, а именно их количество является одним из критических факторов, влияющих на производительность;

2. Если вы запишете результат этого второго этапа на диск, у вас может получиться 200 маленьких файлов;

3. Оптимизация и ее отсутствие имеют косвенный эффект: если обработка должна продолжаться после второго этапа, то вы можете упустить потенциальные возможности дополнительной оптимизации.

Что можно сделать? Вручную установить значение этого свойства перед выполнением запроса с помощью такого оператора:

spark.conf.set(spark.sql.shuffle.partitions,2)

Но это также создает некоторые проблемы:

  • Задавать данный параметр перед каждым запросом утомительно.

  • Эти значения станут устаревшими по мере эволюции ваших данных.

  • Этот параметр будет применяться ко всем шаффлингах в вашем запросе.

Перед первым этапом в предыдущем примере известно распределение и объем данных, и Spark может предложить разумное значение для количества партиций. Однако для второго этапа эта информация еще не известна, так как цена, которую нужно заплатить за ее получение, - это выполнение фактической обработки на первом этапе: отсюда и обращение к магическому числу.

Принцип работы Adaptive Query Execution

Основная идея AQE состоит в том, чтобы сделать план выполнения не окончательным и перепроверять статус после каждого этапа. Таким образом, план выполнения разбивается на новые абстракции этапов запроса, разделенных этапами.

Catalyst теперь останавливается на границе каждого этапа, чтобы попытаться применить дополнительную оптимизацию с учетом информации, доступной на основе промежуточных данных.

Поэтому AQE можно определить как слой поверх Spark Catalyst, который будет изменять план Spark "на лету".

Есть недостатки? Некоторые есть, но они второстепенные:

  • Выполнение останавливается на границе каждого этапа, чтобы Spark проверил свой план, но это компенсируется увеличением производительности.

  • Пользовательский интерфейс Spark труднее читать, потому что Spark создает для данного приложения больше заданий, и эти задания не подхватывают группу заданий и описание, которое вы задали.

Адаптивное количество перемешиваемых партиций

Эта функция AQE доступна, начиная с версии Spark 2.4.

Чтобы включить ее, вам нужно установить для spark.sql.adaptive.enabled значение true, значение по умолчанию - false. Когда AQE включено, количество партиций в случайном порядке регулируется автоматически и больше не равно 200 по умолчанию или заданному вручную значению.

Вот как выглядит выполнение первого запроса TPC-DS до и после включения AQE:

Динамическая конвертация Sort Merge Joins в Broadcast Joins

AQE преобразует соединения sort-merge в broadcast хэш-соединения, если статистика времени выполнения любой из сторон соединения меньше порога broadcast хэш-соединения.

Вот как выглядят последние этапы выполнения второго запроса TPC-DS до и после включения AQE:

Динамическое объединение shuffle партиций

Если количество разделов в случайном порядке больше, чем количество групп по ключам, то много циклов ЦП теряется из-за несбалансированного распределения ключей.

Когда оба:

spark.sql.adaptive.enabled и

spark.sql.adaptive.coalescePartitions.enabled

установлены на true, Spark объединит смежные перемешанные разделы в соответствии с целевым размером, указанным в spark.sql.adaptive.advisoryPartitionSizeInBytes. Это делается, чтобы избежать слишком большого количества мелких задач.

Динамическая оптимизация обьединений с перекосом

Skew (перекос) - это камень преткновения распределенной обработки. Это может задержать обработку буквально на несколько часов:

Без оптимизации время, необходимое для выполнения объединения, будет определяться самым большим разделом.

Оптимизация skew join, таким образом, разобъет раздел A0 на подразделы, используя значение, указанное park.sql.adaptive.advisoryPartitionSizeInBytes, и присоединит каждый из них к соответствующему разделу B0 таблицы B.

Следовательно, вам необходимо предоставить AQE свое определение перекоса.

Это включает в себя два параметра:

1. spark.sql.adaptive.skewJoin.skewedPartitionFactor является относительным: партиция считается с пересом, если ее размер больше, чем этот коэффициент, умноженный на средний размер партиции, а также, если он больше, чем

2. spark.sql.adaptive.skewedPartitionThresholdInBytes, который является абсолютным: это порог, ниже которого перекос будет игнорироваться.

Динамическое сокращение разделов

Идея динамического сокращения разделов (dynamic partition pruning, DPP) - один из наиболее эффективных методов оптимизации: считываются только те данные, которые вам нужны. Если в вашем запросе есть DPP, то AQE не запускается. DPP было перенесено в Spark 2.4 для CDP.

Эта оптимизация реализована как на логическом, так и на физическом уровне.

1. На логическом уровне фильтр размера идентифицируется и распространяется через обьединение на другую часть сканирования.

2. Затем на физическом уровне фильтр выполняется один раз в части измерения, и результат транслируется в основную таблицу, где также применяется фильтр.

DPP в действительности может работать с другими типами обьединений (например, SortMergeJoin), если вы отключите spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly.

В этом случае Spark оценит, действительно ли фильтр DPP улучшает производительность запроса.

DPP может привести к значительному увеличению производительности высокоселективных запросов, например, если ваш запрос фильтрует данные за один месяц из выборки за 5 лет.

Не все запросы получают такой впечатляющий прирост производительности, но 72 из 99 запросов TPC-DS положительно влияют на DPP.

Заключение

Spark прошел долгий путь от своей первоначальной базовой парадигмы: неспешного выполнения оптимизированного статического плана для статического набора данных.

Анализ статического набора данных был пересмотрен из-за потоковой передачи: команда Spark сначала создала довольно неуклюжий дизайн на основе RDD, прежде чем придумать лучшее решение с использованием DataFrames.

Часть статического плана подверглась сомнению со стороны SQL, а структура адаптивного выполнения запросов в некотором роде - это то, чем является структурированная потоковая передача для исходной потоковой библиотеки: элегантное решение, которым она должна была быть с самого начала.

Благодаря фреймворку AQE, DPP, усиленной поддержке графических процессоров и Kubernetes перспективы увеличения производительности теперь весьма многообещающие, поэтому мы и наблюдаем повсеместный переход на Spark 3.1

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru