Русский
Русский
English
Статистика
Реклама

Performance optimization

Из песочницы Удав укрощает Graal VM

04.11.2020 22:14:20 | Автор: admin


В мире Java за последнее время произошло много интересных событий. Одним из таких событий стал выход первой production ready версии Graal VM.


Лично у меня Graal давно вызывает нескрываемый интерес и я пристально слежу за докладами и последними новостями в этой области. Одно время попался на глаза доклад Криса Талингера. В нём Крис рассказывает как в Twitter удалось получить значительный выигрыш в производительности, применив для настройки Graal алгоритмы машинного обучения. У меня появилось стойкое желание попробовать подобное самому. В этой статье хочу поделится тем, что в итоге получилось.


Эксперимент


Для реализации эксперимента мне понадобились:


  1. cвежий Graal VM Community Edition. На момент написания статьи это 20.2.0
  2. выделенное облачное окружение для нагрузочного тестирования
  3. NewRelic для сбора метрик
  4. генератор тестовой нагрузки
  5. программа на Python и набор скриптов, для реализации самого алгоритма ML

Если описать задачу сухим языком математики, то она будет выглядеть так


Найти такие значения параметров $inline$A = (a_1,a_2,..,a_n)$inline$ при которых функция
$inline$f(x_1,x_2,..,x_n)$inline$ принимает максимальное значение.


Я решил минимизировать потребление процессорного времени и выбрал такую целевую функцию:


$$display$$f=1/mean(CPUUtilization)$$display$$


Чем меньше нагружен процессор, тем ближе целевая функция к единице.
В качестве параметров которые нужно найти взял те же, что и в докладе:


-Dgraal.MaximumInliningSize -Dgraal.TrivialInliningSize  -Dgraal.SmallCompiledLowLevelGraphSize

Все они отвечают за инлайнинг. Это важная оптимизация, которая позволяет
сэкономить на вызове методов.


С постановкой задачи оптимизации разобрались. Теперь пришло время пройтись по шагам
алгоритма оптимизации:


  1. алгоритм делает предположение о том какие параметры оптимальные
  2. меняет конфигурацию JVM и запускает нагрузочный тест
  3. снимает метрики и вычисляет значение целевой функции
  4. делает новое предположение на основе значения целевой функции

Процесс повторяется несколько раз.



В качестве алгоритма поиска Twitter предлагает использовать байесовскую оптимизацию. Она лучше справляется с зашумленными функциями и не требует большого количества итераций.


Байесовская оптимизация работает путем построения апостериорного распределения функций,
которое наилучшим образом её описывает. По мере роста количества наблюдений улучшается апостериорное распределение и алгоритм становится более определённым в том, какие регионы в пространстве параметров стоит изучить.


Получение метрик из NewRelic


Для работы с NewRelic REST API необходимо узнать свои APP_ID и API_KEY.
APP_ID это уникальный идентификатор приложения в системе. Его можно найти в разделе APM.
API_KEY необходимо создать или узнать из настроек профиля в NewRelic.


Структура ответа для всех метрик приблизительно одинакова и имеет следующий вид:


{  "metric_data": {    "from": "time",    "to": "time",    "metrics_not_found": "string",    "metrics_found": "string",    "metrics": [      {        "name": "string",        "timeslices": [          {            "from": "time",            "to": "time",            "values": "hash"          }        ]      }    ]  }}

В итоге метод для получения метрик будет таким:


def request_metrics(params):    app_id = "APP_ID"    url = "https://api.newrelic.com/v2/applications/"+ app_id + "/metrics/data.json"    headers = {'X-Api-Key':"API_KEY"}    response = requests.get(url, headers=headers, params=params)    return response.json()

Для получения CPU Utilzation значение params следующее:


params = {    'names[]': "CPU/User/Utilization",    'values[]': "percent",    'from': timerange[0],    'to': timerange[1],    'raw': "false"}

timerange хранит значения времени начала и конца нагрузочного теста.


Далее парсим запрос и извлекаем необходимые метрики


def get_timeslices(response_json, value_name):    metrics = response_json['metric_data']['metrics'][0]    timeslices = metrics['timeslices']    values = []    for t in timeslices:        values.append(t['values'][value_name])    return values

Алгоритм оптимизации


Перейдем к самому интересному поиску оптимальных параметров.


Для реализации байесовской оптимизации взял уже готовую библиотеку BayesianOptimization.


Сначала создадим метод для вычисления целевой функции.


def objective_function(maximumInliningSize, trivialInliningSize, smallCompiledLowLevelGraphSize):    update_config(int(maximumInliningSize), int(trivialInliningSize), int(smallCompiledLowLevelGraphSize))    timerange = do_test()    data = get_results(timerange)    return calculate(data)

Метод _updateconfig вызывает скрипт, который обновляет конфиг приложения. Далее в _dotest происходит вызов скрипта для запуска нагрузочного теста.


Каждое изменение конфигурации требует перезапуска JVM и первые несколько минут идёт фаза прогрева. Эту фазу необходимо отфильтровать, откинув первые минуты.


В методе calculate вычисляем целевую функцию:


    value = 1 / (mean(filtered_data))

Необходимо ограничить поиск


pbounds = {            'maximumInliningSize': (200, 500),           'trivialInliningSize': (10, 25),           'smallCompiledLowLevelGraphSize': (200, 650)           }

Так как улучшение должно быть относительно настроек по умолчанию, то я добавил соответствующую точку


  optimizer.probe(        params={"maximumInliningSize": 300.0,                "trivialInliningSize": 10.0,                "smallCompiledLowLevelGraphSize": 300.0},        lazy=True,        )

Окончательный метод ниже


def autotune():    pbounds = {                'maximumInliningSize': (200, 500),               'trivialInliningSize': (10, 25),               'smallCompiledLowLevelGraphSize': (200, 650)               }    optimizer = BayesianOptimization(        f=objective_function,        pbounds=pbounds,        random_state=1,    )    optimizer.probe(    params={"maximumInliningSize": 300.0,            "trivialInliningSize": 10.0,            "smallCompiledLowLevelGraphSize": 300.0},    lazy=True,    )    optimizer.maximize(        init_points=2,        n_iter=10,    )    print(optimizer.max)

В данном примере _objectivefunction выполнится 12 раз и в конце выведет значение
параметров, при которых наша целевая функция была максимальная. Чем больше итераций, то тем точнее мы приближаемся к максимуму функции.


При необходимости все итерации можно вывести вот так:


for i, res in enumerate(optimizer.res):    print("Iteration {}: \n\t{}".format(i, res))

Код выведет значения целевой функции и параметров для каждой итерации.


Iteration 0:    {'target': 0.02612330198537095, 'params': {'maximumInliningSize': 300.0, 'smallCompiledLowLevelGraphSize': 300.0, 'trivialInliningSize': 10.0}}Iteration 1:    {'target': 0.02666666666666667, 'params': {'maximumInliningSize': 325.1066014107722, 'smallCompiledLowLevelGraphSize': 524.1460220489712, 'trivialInliningSize': 10.001715622260173}}...

Результаты


Сравнил два прогона нагрузочного теста с настройками по умолчанию и вычисленными в ходе эксперимента.


На графике можно заметить, что CPU Utilization уменьшился для случая Graal



Среднее время отклика также незначительно снизилось:



Пропускная способность ограничена сверху и никак не менялась для обоих прогонов.



Заключение


В итоге удалось получить снижение нагрузки CPU в среднем на 4-5%.


Для нашего проекта такая экономия на CPU не существенна, но для proof of concept
результат достаточно неплохой.


С2 много лет оптимизировали под Java и поэтому соревноваться Graal с С2 пока сложно. Больше выгоды можно получить от связки Graal с другими JVM языками, такими как Scala и Kotlin.

Подробнее..
Категории: Python , Java , Graalvm , Performance optimization

Перевод Как Apache Spark 3.0 увеличивает производительность ваших SQL рабочих нагрузок

01.06.2021 12:20:22 | Автор: admin

Практически в каждом секторе, работающем со сложными данными, Spark "де-факто" быстро стал средой распределенных вычислений для команд на всех этапах жизненного цикла данных и аналитики. Одна из наиболее ожидаемых функций Spark 3.0 - это новая платформа Adaptive Query Execution (AQE), устраняющая проблемы, которые возникают при многих рабочих нагрузках Spark SQL. Они были задокументированы в начале 2018 года командой специалистов Intel и Baidu. Для более глубокого изучения фреймворка можно пройти наш обновленный курс по тюнингу производительности Apache Spark (Apache Spark Performance Tuning).

Наш опыт работы с Workload XM, безусловно, подтверждает реальность и серьезность этих проблем.

AQE был впервые представлен в Spark 2.4, но в Spark 3.0 и 3.1 он стал намного более развитым. Для начала, давайте посмотрим, какие проблемы решает AQE.

Недостаток первоначальной архитектуры Catalyst

На диаграмме ниже представлен вид распределенной обработки, которая происходит, когда вы выполняете простой group-by-count запрос с использованием DataFrames.

Spark определяет подходящее количество партиций для первого этапа, но для второго этапа использует по умолчанию "магическое число" - 200.

И это плохо по трем причинам:

1. 200 вряд ли будет идеальным количеством партиций, а именно их количество является одним из критических факторов, влияющих на производительность;

2. Если вы запишете результат этого второго этапа на диск, у вас может получиться 200 маленьких файлов;

3. Оптимизация и ее отсутствие имеют косвенный эффект: если обработка должна продолжаться после второго этапа, то вы можете упустить потенциальные возможности дополнительной оптимизации.

Что можно сделать? Вручную установить значение этого свойства перед выполнением запроса с помощью такого оператора:

spark.conf.set(spark.sql.shuffle.partitions,2)

Но это также создает некоторые проблемы:

  • Задавать данный параметр перед каждым запросом утомительно.

  • Эти значения станут устаревшими по мере эволюции ваших данных.

  • Этот параметр будет применяться ко всем шаффлингах в вашем запросе.

Перед первым этапом в предыдущем примере известно распределение и объем данных, и Spark может предложить разумное значение для количества партиций. Однако для второго этапа эта информация еще не известна, так как цена, которую нужно заплатить за ее получение, - это выполнение фактической обработки на первом этапе: отсюда и обращение к магическому числу.

Принцип работы Adaptive Query Execution

Основная идея AQE состоит в том, чтобы сделать план выполнения не окончательным и перепроверять статус после каждого этапа. Таким образом, план выполнения разбивается на новые абстракции этапов запроса, разделенных этапами.

Catalyst теперь останавливается на границе каждого этапа, чтобы попытаться применить дополнительную оптимизацию с учетом информации, доступной на основе промежуточных данных.

Поэтому AQE можно определить как слой поверх Spark Catalyst, который будет изменять план Spark "на лету".

Есть недостатки? Некоторые есть, но они второстепенные:

  • Выполнение останавливается на границе каждого этапа, чтобы Spark проверил свой план, но это компенсируется увеличением производительности.

  • Пользовательский интерфейс Spark труднее читать, потому что Spark создает для данного приложения больше заданий, и эти задания не подхватывают группу заданий и описание, которое вы задали.

Адаптивное количество перемешиваемых партиций

Эта функция AQE доступна, начиная с версии Spark 2.4.

Чтобы включить ее, вам нужно установить для spark.sql.adaptive.enabled значение true, значение по умолчанию - false. Когда AQE включено, количество партиций в случайном порядке регулируется автоматически и больше не равно 200 по умолчанию или заданному вручную значению.

Вот как выглядит выполнение первого запроса TPC-DS до и после включения AQE:

Динамическая конвертация Sort Merge Joins в Broadcast Joins

AQE преобразует соединения sort-merge в broadcast хэш-соединения, если статистика времени выполнения любой из сторон соединения меньше порога broadcast хэш-соединения.

Вот как выглядят последние этапы выполнения второго запроса TPC-DS до и после включения AQE:

Динамическое объединение shuffle партиций

Если количество разделов в случайном порядке больше, чем количество групп по ключам, то много циклов ЦП теряется из-за несбалансированного распределения ключей.

Когда оба:

spark.sql.adaptive.enabled и

spark.sql.adaptive.coalescePartitions.enabled

установлены на true, Spark объединит смежные перемешанные разделы в соответствии с целевым размером, указанным в spark.sql.adaptive.advisoryPartitionSizeInBytes. Это делается, чтобы избежать слишком большого количества мелких задач.

Динамическая оптимизация обьединений с перекосом

Skew (перекос) - это камень преткновения распределенной обработки. Это может задержать обработку буквально на несколько часов:

Без оптимизации время, необходимое для выполнения объединения, будет определяться самым большим разделом.

Оптимизация skew join, таким образом, разобъет раздел A0 на подразделы, используя значение, указанное park.sql.adaptive.advisoryPartitionSizeInBytes, и присоединит каждый из них к соответствующему разделу B0 таблицы B.

Следовательно, вам необходимо предоставить AQE свое определение перекоса.

Это включает в себя два параметра:

1. spark.sql.adaptive.skewJoin.skewedPartitionFactor является относительным: партиция считается с пересом, если ее размер больше, чем этот коэффициент, умноженный на средний размер партиции, а также, если он больше, чем

2. spark.sql.adaptive.skewedPartitionThresholdInBytes, который является абсолютным: это порог, ниже которого перекос будет игнорироваться.

Динамическое сокращение разделов

Идея динамического сокращения разделов (dynamic partition pruning, DPP) - один из наиболее эффективных методов оптимизации: считываются только те данные, которые вам нужны. Если в вашем запросе есть DPP, то AQE не запускается. DPP было перенесено в Spark 2.4 для CDP.

Эта оптимизация реализована как на логическом, так и на физическом уровне.

1. На логическом уровне фильтр размера идентифицируется и распространяется через обьединение на другую часть сканирования.

2. Затем на физическом уровне фильтр выполняется один раз в части измерения, и результат транслируется в основную таблицу, где также применяется фильтр.

DPP в действительности может работать с другими типами обьединений (например, SortMergeJoin), если вы отключите spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly.

В этом случае Spark оценит, действительно ли фильтр DPP улучшает производительность запроса.

DPP может привести к значительному увеличению производительности высокоселективных запросов, например, если ваш запрос фильтрует данные за один месяц из выборки за 5 лет.

Не все запросы получают такой впечатляющий прирост производительности, но 72 из 99 запросов TPC-DS положительно влияют на DPP.

Заключение

Spark прошел долгий путь от своей первоначальной базовой парадигмы: неспешного выполнения оптимизированного статического плана для статического набора данных.

Анализ статического набора данных был пересмотрен из-за потоковой передачи: команда Spark сначала создала довольно неуклюжий дизайн на основе RDD, прежде чем придумать лучшее решение с использованием DataFrames.

Часть статического плана подверглась сомнению со стороны SQL, а структура адаптивного выполнения запросов в некотором роде - это то, чем является структурированная потоковая передача для исходной потоковой библиотеки: элегантное решение, которым она должна была быть с самого начала.

Благодаря фреймворку AQE, DPP, усиленной поддержке графических процессоров и Kubernetes перспективы увеличения производительности теперь весьма многообещающие, поэтому мы и наблюдаем повсеместный переход на Spark 3.1

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru