Русский
Русский
English
Статистика
Реклама

Spark

Паша Финкельштейн о Big Data, Apache Spark и DevRel

15.06.2021 10:13:02 | Автор: admin

Паша Финкельштейн разработчик, серийный спикер, автор и ведущий нескольких подкастов. На конференции Java Meeting Point он сделает доклад Spark: let's touch it, на котором познакомит участников с миром больших данных.

В этом интервью Паша рассказал, что ждет участников конференции, как он начал заниматься DevRel-активностями и что, по его мнению, должно быть в хорошем докладе.


Расскажи, чем ты занимаешься?

Я Developer Advocate в JetBrains, занимаюсь темой Big Data и дата-инжиниринга. Я пытаюсь рассказывать людям о том, как устроен мир Big Data, что там интересного, какие есть инструменты.

О чем будет твой доклад на Java Meeting Point?

Я расскажу, что такое большие данные и как они отличаются от маленьких. Разберемся, как работает Apache Spark, как устроен его API, и поймем, что ничего принципиально сложного в этом нет.

Кому будет полезно посмотреть доклад?

Полезно будет Java или JVM-разработчикам ровня Middle+ и тем, кому интересно узнать, как работает Apache Spark. Мы научимся писать простенькие пайплайны на этом фреймворке. Станет понятно как, например, взять и написать пайплайн обработки данных на Apache Spark или проанализировать данные в датасете.

Ты много занимаешься DevRel-активностями: выступаешь с докладами, вел подкасты. Как ты начал этим заниматься?

Мой друг Слава Семушин уехал работать в Чехию. Я не особенно умею поддерживать отношения на расстоянии, но Слава очень классный, с ним хотелось продолжать общаться. Однажды он написал: Слушай, давай запишем подкаст. Решили попробовать, и так родился подкаст Паша+Слава.

Раз в пару недель мы созванивались и говорили на околотехнические темы. Потом к нам присоединился еще один Слава Артемьев, с которым я работал в компании Домклик. Очень удобно не пришлось менять название подкаста. Так возникла моя первая публичность.

Потом появилась идея выступить на сцене, и я сделал доклад на Joker. В последствии этот доклад я переделывал 3 раза и выступил с ним 14 раз.

С тех пор я подсел на этот крючок и сделал десятки докладов. Со временем мне стало намного проще их готовить. Я могу подготовить выступление по теме, в которой я не разбираюсь за неделю, а теме, в которой разбираюсь за 2-4 дня.

Можешь посоветовать, как научиться так же быстро готовить выступления?

Я думаю, это так не работает. У всех людей свой темп, и нужно себя любить и работать в комфортном режиме. И рассказывать про то, что хочется. Доклады из-под палки никогда не получатся хорошими, прожитыми.

Что должно быть в хорошем докладе?

Он должен быть живым и интересным, в нем должны быть эмоции, шутеечки. В нем не должно быть слишком много мыслей, но те, которые есть, не должны быть заезженными. Неплохо, если информация из доклада применима на практике. Людям становится интереснее, когда они понимают, как они смогут использовать эту информацию.

Альтернативный вариант, это если доклад полностью теоретический. На такой люди придут вывернуть мозги. Так тоже можно, но сложно. Я один раз сделал доклад про внутренности Kotlin, людям не зашло.

Я люблю странные форматы: лайвкодинги, демки, парные доклады. Мне кажется, что у меня получается неплохо. Но не любить какой-то из этих форматов нормально, выбирайте для себя тот, который больше нравится.

Какой формат будет у твоего доклада на Java Meeting Point?

Это будет демо, на котором Java или JVM-разработчик на примере не очень больших данных сможет посмотреть, как работают большие. Мы посмотрим 5-6 слайдов и перейдем к программированию.

Подробнее..

Обогащение данных что это и почему без него никак

15.04.2021 06:04:33 | Автор: admin

Задача обогащения данных напрямую связана с темой их обработки и анализа. Обогащение нужно для того, чтобы конечные потребители данных получали качественную и полную информацию.

Сам термин "обогащение данных" это перевод англоязычного Data enrichment, который проводит аналогию между данными и... ураном. Точно так же, как промышленники насыщают урановую руду, увеличивая долю изотопа 235U, чтобы её можно было использовать (хочется надеяться, в мирных целях), в процессе обогащения данных мы насыщаем их информацией.

Обогащение данных это процесс дополнения сырых данных той информацией, которая в исходном виде в них отсутствует, но необходима для качественного анализа.

Конечными потребителями такой информации чаще всего являются аналитики, но могут быть и нейронные сети, пользователи приложения или предоставляемого API.

Заметим, что обогащение данных термин широкий, и получать данные из внешних источников можно весьма разнообразными способами. Например, представим бизнес-процесс регистрации нового клиента. Если в данных этого клиента отсутствует e-mail, то взаимодействие с внешним источником в этом случае может быть буквально следующим: взяли телефон, позвонили клиенту, узнали его e-mail. Получается, этот процесс может включать в себя такие совершенно не автоматизированные операции, как обычный телефонный разговор со всеми этими эс как доллар, "а" как русская. Это тоже можно считать обогащением данных, но данный пласт проблем в этой статье мы затрагивать не будем. Нас интересуют вполне конкретные случаи, когда данные хранятся в базе данных и именно БД служит внешним источником данных для обогащения.

Источниками сырых исходных данных могут быть:

  • Система кассовых платежей, которая отслеживает транзакции и отправляет в хранилище данных: время, id торговой точки, количество и стоимость купленного товара и т.д.

  • Логистическая система, которая отслеживает движение товара: id транспорта и его водителя, gps-координаты в заданные моменты времени, статус, маршрут и т.д.

  • Телеметрия с датчиков интернета вещей.

  • Система мониторинга инфраструктуры.

Все эти примеры объединяет то, что в них регулярно передаются относительно небольшие пакеты данных, каждый из которых имеет идентификатор. То есть каждый источник знает свой идентификатор и передает его вместе с пакетом данных. По этому идентификатору в хранилище данных можно получить справочную информацию об источнике.

Таким образом, типовую задачу можно сформулировать следующим образом: на вход поступают данные вида <таймстамп, идентификатор источника, содержимое пакета>, а конечному потребителю требуется соединить (в смысле join) справочную информацию об источнике из хранилища данных с идентификатором источника из сырых данных.

Как обогащаем данные

Один из вариантов решения задачи сопоставления данных по расписанию запускать скрипты, которые подготовят подходящие для аналитиков представления. Например, это можно делать с помощью Apache Airflow. Но в любом случае потребуется ждать и сохранять данные из внешних источников.

Другой вариант использовать инструменты потоковой обработки данных. В этом случае нужно определиться, где же всё-таки хранить справочную информацию и что будет являться Single Source of Truth (SSOT), или единым источником истины для справочных данных. Если хранить справочные данные в хранилище, то к нему придется каждый раз обращаться, и это может быть накладным, так как к сетевым издержкам добавится ещё и обращение к диску. Вероятно, оптимальнее хранить справочную информацию в оперативной памяти или другом горячем хранилище, например, в Tarantool.

Мы, очевидно, отдаём предпочтению именно последнему варианту, и наша схема приобретает завершенный вид.

Она отличается от первой иллюстрации процесса обогащения данных тем, что появилась конкретика: процесс обогащения происходит на лету с помощью потокового обработчика, справочные данные хранятся в горячем хранилище (оперативной памяти).

Давайте посмотрим, какие плюсы и минусы мы получим, если возьмем за основу данную схему.

Преимущества потокового обогащения данных:

  • В хранилище всегда актуальные данные, которые готовы к использованию. В нашем динамичном мире зачастую необходимо иметь доступ к реалтайм-данным и сразу их использовать, не тратя время на дополнительную подготовку, распаковку и т.д. Потоковый обработчик, можно сказать, приносит их нам на блюдечке.

  • Схема хорошо масштабируется, если подобрать соответствующие технологии. С правильными инструментами можно масштабироваться под требуемую пропускную способность платформы и под объемы справочной информации.

Недостатки:

  • Источник истины для справочной информации вынесен из надёжного дискового хранилища данных в менее надёжную оперативную память. При сбое это грозит потерей всех несинхронизированных данных.

  • Требуется гораздо больше памяти как оперативной, так и постоянной. В целом, для любой системы аналитики желательно иметь побольше дискового пространства и оперативной памяти, но здесь это становится обязательным условием.

Какие технологии используем

Что касается технологий, то мы выбираем более-менее устоявшиеся на рынке решения с открытым исходным кодом и хорошей документацией, учитывая совместимость.

Выбранный стек технологий:

  • Apache Kafka источник данных и брокер очередей;

  • Apache Spark потоковый обработчик данных;

  • Apache Ignite горячее хранение справочной информации;

  • Greenplum и Apache Hadoop хранилище данных.

В выборе Greenplum мы немного поступились совместимостью. Связать его со Spark не совсем тривиальная задача, для этого не существует стандартного open source коннектора (подробнее рассказывали в этой статье). Поэтому мы разрабатываем такой коннектор сами.

В остальном набор достаточно стандартный. Hadoop держим на всякий случай, например, чтобы использовать тот же Hive поверх HDFS вместо Greenplum.

Каждое из этих решений масштабируемо. Поэтому мы можем построить любую архитектуру, в зависимости от объемов справочной информации и ожидаемой пропускной способности.

Итак, вот окончательные очертания нашей схемы обогащения данных.

Версии, которые используем:

  • Apache Spark 2.4.6.

  • Apache Ignite 2.8.1.

  • Apache Kafka 2.4.1.

  • Greenplum 6.9.0.

  • Apache Hadoop 2.10.1.

Обращаю на это внимание, потому что от версий выбранных компонент зависит совместимость и возможность этих компонент взаимодействовать. Например, если нужно, чтобы Apache Spark писал данные в базу данных Greenplum, это может стать отдельной проблемой, и версии также будут важны. Наш коннектор на текущий момент работает именно со Spark v2, для Spark v3 его нужно отдельно дорабатывать.

Что в результате

Если объединять данные, применяя предложенную схему обогащения, в дальнейшем аналитикам не потребуется каждый раз делать JOIN-запрос, что сэкономит как ценное время людей, так и машинные ресурсы.

Но есть и ограничения, связанные с тем, что source of truth, по сути, находится в оперативной памяти. Поэтому при редактировании справочной информации надо напрямую работать с Ignite через интерфейсы самого Ignite. Кроме этого, нужен аккуратный механизм синхронизации, чтобы кэш Ignite был персистентным. У Ignite есть собственный механизм для записи на диск, но все же Ignite больше ориентирован на работу в ОЗУ, поэтому для резервирования справочной информации в хранилище данных лучше использовать что-нибудь специально для этого предназначенное, например, Airflow.

Полезные ссылки, чтобы понять, как строить взаимодействие между Spark и Ignite и насколько это уже проработанный механизм:

Пользуясь случаем: мы расширяем отдел систем обработки данных. Если вам интересно заниматься с подобного рода задачами, пишите мне в личку, в телеграм @its_ihoziainov или на job@itsumma.ru с пометкой data engineering.

Если вы уже сталкивались с такого рода задачами, делитесь мнением в комментариях, задавайте вопросы они, как известно, очень помогают находить лучшие решения. Спасибо.

Подробнее..

Перевод Экономичная конфигурация исполнителей Apache Spark

20.11.2020 16:08:20 | Автор: admin

Привет, Хабр! В преддверии старта курса "Экосистема Hadoop, Spark, Hive" подготовили для вас перевод полезной статьи. А также предлагаем посмотреть бесплатную запись демо-урока по теме: "Spark 3.0: Что нового?".


Ищем наиболее оптимальную конфигурацию исполнителей для вашего узла

Количество ЦП на узел

Первый этап в определении оптимальной конфигурации исполнителей (executor) - это выяснить, сколько фактических ЦП (т.е. не виртуальных ЦП) доступно на узлах (node) в вашем кластер. Для этого вам необходимо выяснить, какой тип инстанса EC2 использует ваш кластер. В этой статье мы будем использовать r5.4xlarge, который, согласно прейскуранту на инстансы AWS EC2, насчитывает 16 процессоров.

Когда мы запускаем наши задачи (job), нам нужно зарезервировать один процессор для операционной системы и системы управления кластерами (Cluster Manager). Поэтому мы не хотели бы задействовать под задачу сразу все 16 ЦП. Таким образом, когда Spark производит вычисления, на каждом узле у нас остается только 15 доступных для аллоцирования ЦП.

Количество ЦП на исполнителя

Теперь, когда мы узнали, сколько ЦП доступно для использования на каждом узле, нам нужно определить, сколько ядер (core) Spark мы хотим назначить каждому исполнителю. С помощью базовой математики (X * Y = 15), мы можем посчитать, что существует четыре различных комбинации ядер и исполнителей, которые могут подойти для нашего случая с 15 ядрам Spark на узел:

Возможные конфигурации исполнителейВозможные конфигурации исполнителей

Давайте исследуем целесообразность каждой из этих конфигураций.

Один исполнитель с пятнадцатью ядрами

Самое очевидное решение, которое приходит на ум, - создать одного исполнителя с 15 ядрами. Проблема с большими жирными исполнителями, подобными этому, заключается в том, что исполнитель, поддерживающий такое количество ядер, обычно будет иметь настолько большой пул памяти (64 ГБ+), что задержки на сборку мусора будут неоправданно замедлять вашу работу. Поэтому мы сразу исключаем эту конфигурацию.

Пятнадцать одноядерных исполнителей

Следующее очевидное решение, которое приходит на ум создать 15 исполнителей, каждый из которых имеет только одно ядро. Проблема здесь в том, что одноядерные исполнители неэффективны, потому что они не используют преимуществ параллелизма, которые обеспечивают несколько ядер внутри одного исполнителя. Кроме того, найти оптимальный объем служебной памяти для одноядерных исполнителей может быть достаточно сложно. Давайте немного поговорим о накладных расходах памяти.

Накладные расходы памяти для исполнителя по умолчанию составляют 10% от размера выделенной вашему исполнителю памяти или 384 MB (в зависимости от того, что больше). Однако на некоторых big data платформах, таких как Qubole, накладные расходы зафиксированы на определенном значении по умолчанию, вне зависимости от размера вашего исполнителя. Вы можете проверить ваш показатель накладных расходов, перейдя во вкладку Environments в логе Spark и выполнив поиск параметра spark.executor.memoryOverhead.

Накладные расходы памяти по умолчанию в Spark будут очень маленьким, что результирует в проблемах с вашими задачами. С другой стороны, фиксированное значение накладных расходов для всех исполнителей приведет к слишком большому объему служебной памяти и, следовательно, оставит меньше места самим исполнителям. Выверить идеальный размер служебной памяти сложно, поэтому это еще одна причина, по которой одноядерный исполнитель нам не подходит.

Пять исполнителей с тремя ядрами или три исполнителя с пятью ядрами

Итак, у нас осталось два варианта. Большинство руководств по настройке Spark сходятся во мнении, что 5 ядер на исполнителя это оптимальное количество ядер с точки зрения параллельной обработки. И я тоже пришел к выводу, что это правда, в том числе, на основании моих собственных изысканий в оптимизации. Еще одно преимущество использования пятиядерных исполнителей по сравнению с трехъядерными заключается в том, что меньшее количество исполнителей на узел требует меньшее количество служебной памяти. Поэтому мы остановимся на пятиядерных исполнителях, чтобы минимизировать накладные расходы памяти на узле и максимизировать параллелизм внутри каждого исполнителя.

--executor-cores 5

Объем памяти на узел

Наш следующий шаг определить, сколько памяти назначить каждому исполнителю. Прежде чем мы сможем это сделать, мы должны определить, сколько физической памяти на нашем узле нам вообще доступно. Это важно, потому что физическая память это жесткое ограничение для ваших исполнителей. Если вы знаете, какой инстанс EC2 используете, значит, вы знаете и общий объем памяти, доступной на узле. Про наш инстанс r5.4xlarge AWS сообщает, что у него 128 ГБ доступной памяти.

Однако доступными для использования вашими исполнителями будут не все 128 ГБ, так как память нужно будет выделить также и для вашей системы управления кластерами. На рисунке ниже показано, где в YARN искать сколько памяти доступно для использования после того, как память была выделена для системы управления кластерами.

Мы видим, что на узлах этого кластера исполнителям доступно 112 ГБ.

Объем памяти на исполнителя

Если мы хотим, чтобы три исполнителя использовали 112 ГБ доступной памяти, то нам следует определить оптимальный размер памяти для каждого исполнителя. Чтобы вычислить объем памяти доступной исполнителю, мы попросту делим доступную память на 3. Затем мы вычитаем накладные расходы на память и округляем до ближайшего целого числа.

Если служебная память у вас фиксированная (как в случае с Qubole), вы будете использовать эту формулу. (112/3) = 372,3 = 34,7 = 34.

Если вы используете дефолтный метод Spark для расчета накладных расходов на память, вы будете использовать эту формулу. (112/3) = 37 / 1,1 = 33,6 = 33.

В оставшейся части этой статьи мы будем использовать фиксированный объем накладных расходов памяти для Qubole.

--executor-memory 34G

Чтобы по настоящему начать экономить, опытным тюнерам Spark необходим следующий сдвиг в парадигме. Я рекомендую вам в ваших исполнителях для всех задач использовать фиксированные размер памяти и количество ядер. Понимаю, что использование фиксированной конфигурации исполнителей для большинства задач Spark кажется противоречащим надлежащей практике тюнинга Spark. Даже если вы настроены скептически, я прошу вас опробовать эту стратегию, чтобы убедиться, что она работает. Выясните, как рассчитать затраты на выполнение вашей задачи, как описано в Части 2, а затем проверьте это на практике. Считаю, что если вы это сделаете, вы обнаружите, что единственный способ добиться эффективной экономичности облачных затрат это использовать фиксированные размеры памяти для ваших исполнителей, которые оптимально используют ЦП.

С учетом вышесказанного, если при использовании эффективного объема памяти для ваших исполнителей у вас остается много неиспользуемой памяти, подумайте о переносе вашего процесса на другой тип инстанса EC2, у которого меньше памяти на ЦП узла. Этот инстанс обойдется вам дешевле и, следовательно, поможет снизить стоимость выполнения вашей задачи.

Наконец, будут моменты, когда эта экономичная конфигурация не будет обеспечивать достаточную пропускную способность для ваших данных в вашем исполнителе. В примерах, приведенных во второй части, было несколько задач, в которых мне приходилось уходить от использования оптимального размера памяти, потому что нагрузка на память была максимальной во время всего выполнения задачи.

В этом руководстве я все же рекомендую вам начинать с оптимального размера памяти при переносе ваших задач. Если при оптимальной конфигурации исполнителей у вас возникают ошибки памяти, я поделюсь конфигурациями, которые избегают этих ошибок позже в Части 5.

Количество исполнителей на задачу

Теперь, когда мы определились с конфигурацией исполнителей, мы готовы настроить количество исполнителей, которые мы хотим использовать для нашей задачи. Помните, что наша цель - убедиться, что используются все 15 доступных ЦП на узел, что означает, что мы хотим, чтобы каждому узлу было назначено по три исполнителя. Если мы установим количество наших исполнителей кратное 3, мы добьемся своей цели.

Однако с такой конфигурацией есть одна проблема. Нам также нужно назначить драйвер для обработки всех исполнителей в узле. Если мы используем количество исполнителей, кратное 3, то наш одноядерный драйвер будет размещен в своем собственном 16-ядерном узле, что означает, что аж 14 ядер на этом последнем узле не будут использоваться в течение всего выполнения задачи. Это не очень хорошая практика использования облака!

Мысль, которую я хочу здесь донести, заключается в том, что идеальное количество исполнителей должно быть кратным 3 минус один исполнитель, чтобы освободить место для нашего драйвера.

--num-executors (3x - 1)

В Части 4 я дам вам рекомендации о том, сколько исполнителей вы должны использовать при переносе существующей задачи в экономичную конфигурацию исполнителей.

Объем памяти на драйвер

Обычной практикой для data-инженеров является выделение относительно небольшого размера памяти под драйвер по сравнению с исполнителями. Однако AWS на самом деле рекомендует устанавливать размер памяти вашего драйвера таким же, как и у ваших исполнителей. Я обнаружил, что это также очень помогает при оптимизации затрат.

--driver-memory 34G

В редких случаях могут возникать ситуации, когда вам нужен драйвер, память которого больше, чем у исполнителя. В таких случаях устанавливайте размер памяти драйвера в 2 раза больше памяти исполнителя, а затем используйте формулу (3x - 2), чтобы определить количество исполнителей для вашей задачи.

Количество ядер на драйвер

По умолчанию количество ядер на драйвер равно одному. Однако я обнаружил, что задачи, использующие более 500 ядер Spark, могут повысить производительность, если количество ядер на драйвер установлено в соответствии с количеством ядер на исполнителя. Однако не стоит сразу менять дефолтное количество ядер в драйвере. Просто протестируйте это на своих наиболее крупных задачах, чтобы увидеть, ощутите ли вы прирост производительности.

--driver-cores 5

Конфигурация универсальна?

Таким образом, конфигурация исполнителей, которую я рекомендую для узла с 16 процессорами и 128 ГБ памяти, будет выглядеть следующим образом.

--driver-memory 34G --executor-memory 34G --num-executors (3x - 1) --executor-cores 5

Но помните:

Не существует универсальных конфигураций просто продолжайте экспериментировать и рано или поздно вы найдете конфигурацию, идеально подходящую для ваших задач.

Как я уже упоминал выше, эта конфигурация может выглядеть не подходящей для ваших конкретных нужд. Я рекомендую вам использовать эту конфигурацию в качестве отправной точки в процессе оптимизации затрат. Если в этой конфигурации у вас возникают проблемы с памятью, в следующих частях этой серии я порекомендую вам конфигурации, которые позволят вам решить большинство проблем с памятью, возникающих при переходе на экономичные конфигурации.

Поскольку конфигурация узла, используемая в этой статье, достаточно распространена в Expedia Group , я буду ссылаться на нее в остальной части серии. Если ваши узлы имеют другой размер, вам следует использовать метод, который я изложил здесь, чтобы определить идеальную конфигурацию.

Теперь, когда у вас есть оптимальная экономичная конфигурация исполнителей, вы можете попробовать перенести на нее текущие задачи. Но какие задачи вам следует перенести в первую очередь? И сколько исполнителей вы должны запустить с этой новой конфигурацией? А что произойдет, если задача с оптимизированной стоимостью выполняется дольше, чем неоптимизированная задача? И уместно ли когда-либо избыточное использование ЦП? Я отвечу на эти вопросы в Части 4: Как перенести существующие задачи Apache Spark на экономичные конфигурации исполнителей.


Подробнее о курсе "Экосистема Hadoop, Spark, Hive" можно узнать здесь. Также можно посмотреть запись открытого урока "Spark 3.0: что нового?".

Читать ещё:

Подробнее..

Перевод Секреты производительности Spark, или Почему важна компиляция запросов

24.11.2020 18:14:36 | Автор: admin

Для будущих студентов курсов "Data Engineer" и "Экосистема Hadoop, Spark, Hive" подготовили еще один перевод полезной статьи.


Criteo это компания, работа которой основана на данных. Каждый день через наши системы проходят десятки терабайт новых данных для обучения моделей рекомендаций, обрабатывающих запросы в масштабах всего Интернета. Spark наше основное средство обработки больших данных. Это мощный и гибкий инструмент, однако он отличается довольно высокой сложностью в освоении, а чтобы пользоваться им эффективно, зачастую требуется читать исходный код платформы.

Быстрая обработка больших данных имеет критическое значение для нашего бизнеса:

  • мы часто обновляем наши модели, повышая их производительность для наших клиентов;

  • мы быстро выводим новые продукты на базе машинного обучения на рынок за счет того, что можем быстро выполнять итерации;

  • от скорости обработки данных зависят затраты на инфраструктуру.

В этой статье я расскажу о написании эффективного кода Spark и на примерах продемонстрирую распространенные подводные камни. Я покажу, что в большинстве случаев Spark SQL (Datasets) следует отдавать предпочтение перед Spark Core API (RDD), и если сделать правильный выбор, можно повысить производительность обработки больших данных в 210 раз, а это очень значимо.

Конфигурация для экспериментов

Spark 2.4.6, Macbook Pro 2017 с процессором Intel Core i7 с частотой 3,5ГГц

Измерения всегда производятся на разогретой виртуальной Java-машине (выполняется 100 прогонов кода, и берется среднее значение за последние 90 прогонов). Приведенный в этой статье код написан на Scala, но ее выводы должны быть справедливыми и для Python.

Заблуждения, связанные с обработкой больших данных

Существует распространенное мнение, что в процессах обработки больших данных есть два основных узких места, влияющих на производительность:

  • перетасовка данных, поскольку для ее выполнения требуется отправлять данные по сети;

  • дисковый ввод-вывод, поскольку доступ к данным на диске всегда намного медленнее, чем доступ к данным в ОЗУ.

Эти представления имеют под собой исторические основания в 2006 году, когда впервые появилась библиотека Hadoop, обычные жесткие диски были медленными и ненадежными, а основной платформой для обработки больших данных была MapReduce. Именно медленная работа жестких дисков и подстегнула разработку таких средств обработки в памяти, как Spark. С того времени характеристики аппаратного обеспечения значительно улучшились.

В 2015 году висследовании Кей Остерхаут (Kay Ousterhout) и др. были проанализированы узкие места в заданиях Spark, и в результате выяснилось, что скорость их выполнения в большей степени определяется операциями, загружающими ЦП, а не вводом-выводом и передачей данных по сети. В частности, авторами этой научной работы был выполнен широкий спектр запросов к трем тестовым наборам данных, включаяTPC-DS, и было определено, что:

  • если бы пропускная способность сети была безграничной, время выполнения заданий можно было бы сократить на 2% (медианное значение);

  • если бы пропускная способность дискового ввода-вывода была безграничной, время выполнения стандартного аналитического процесса можно было бы сократить на 19% (медианное значение).

Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети. Этому есть несколько причин:

  • Spark использует дисковый ввод-вывод не только при считывании входного набора данных и записи результата, но и в ходе выполнения заданий для кэширования и переноса на диск данных, которые не умещаются в ОЗУ.

  • При выполнении аналитических заданий часто требуется производить агрегацию, поэтому объем данных, передаваемых по сети, обычно меньше, чем объем данных, которые первоначально считываются с диска.

Интересно, что специалисты Databricks примерно в 2016 году пришли к таким же заключениям, что заставило их переориентировать вектор развития Spark на оптимизацию использования процессора. Результатом стало внедрение поддержки SQL, а также API DataFrames и позднее Datasets.

Насколько быстро работает Spark?

Давайте рассмотрим простую задачу посчитаем наивным методом четные числа от 0 до 10. Для выполнения такого задания Spark, в принципе, не требуется, поэтому для начала напишем простую программу на Scala:

var res: Long = 0Lvar i: Long  = 0Lwhile (i < 1000L * 1000 * 1000) {  if (i % 2 == 0) res += 1  i += 1L}

Листинг1. Наивный подсчет

А теперь давайте также вычислим этот же результат с помощью Spark RDD и Spark Datasets. Чтобы эксперимент был честным, я запускаю Spark в локальном[1] режиме:

val res = spark.sparkContext  .range(0L, 1000L * 1000 * 1000)  .filter(_ % 2 == 0)  .count()

Листинг2. Подсчет с помощью RDD

val res = spark.range(1000L * 1000 * 1000)  .filter(col("id") % 2 === 0)  .select(count(col("id")))  .first().getAs[Long](0)

Листинг3. Подсчет с помощью Datasets

Время выполнения всех фрагментов кода приведено ниже. Неудивительно, что написанный вручную код является самым эффективным решением. Удивительно же то, что RDD в пять раз медленнее, тогда как у Datasets время вычисления почти такое же, как у написанного вручную кода.

Парадокс Datasets

Парадокс: API-интерфейс Datasets построен на основе RDD, однако работает намного быстрее, почти так же быстро, как код, написанный вручную для конкретной задачи. Как такое вообще возможно? Дело в новой модели выполнения.

Прошлое модель Volcano

Код, написанный с использованием RDD, выполняется с помощью модели выполнения Volcano. На практике это означает, что каждый RDD следует стандартному интерфейсу:

  • знает свой родительский RDD;

  • предоставляет посредством методаcomputeдоступ к итератору Iterator[T], который перебирает элементы данного RDD (он является private и должен использоваться только разработчиками Spark).

abstract class RDD[T: ClassTag]def compute(): Iterator[T]

Листинг4. RDD.scala

С учетом этих свойств упрощенная версия реализации функции подсчета для RDD, которая игнорирует разбиение, выглядит вот так:

def pseudo_rdd_count(rdd: RDD[T]): Long = {  val iter = rdd.compute  var result = 0  while (iter.hasNext) result += 1  result}

Листинг5. Псевдокод для действия подсчета на основе RDD

Почему этот код работает значительно медленнее, чем написанный вручную код, который приведен в листинге 1? Есть несколько причин:

  • Вызовы итераторов виртуальной функцией: вызовы Iterator.next() несут дополнительную нагрузку по сравнению с функциями, не являющимися виртуальными, которые могут выполняться компилятором илиJIT как встроенные (inline).

  • Отсутствие оптимизации на уровне ЦП: виртуальная Java-машина и JIT не могут оптимизировать байт-код, образуемый листингом 5, так же хорошо, как байт-код, получаемый при использовании листинга 1. В частности, написанный вручную код позволяет виртуальной Java-машине и JIT хранить промежуточные результаты вычислений в регистре ЦП, а не помещать их в основную память.

Настоящее формирование кода всего этапа

Код, написанный с помощью Spark SQL, выполняется не так, как код, написанный с использованием RDD. Когда запускается действие, Spark генерирует код, который сворачивает несколько трансформаций данных в одну функцию. Этот процесс называется формированием кода всего этапа (Whole-Stage Code Generation). Spark пытается воспроизвести процесс написания специального кода для конкретной задачи, в котором не используются вызовы виртуальных функций. Такой код может выполняться JVM/JIT более эффективно. На самом деле Spark генерирует довольно много кода, см., например, код Spark для листинга 3.

Технически Spark только формирует высокоуровневый код, а генерация байт-кода выполняется компилятором Janino. Именно это и делает Spark SQL настолько быстрым по сравнению с RDD.

Эффективное использование Spark

Сегодня в Spark есть 3 API-интерфейса Scala/Java: RDD, Datasets и DataFrames (который теперь объединен с Datasets). RDD все еще широко применяется в Spark в частности, из-за того, что этот API используется большинством созданных ранее заданий, и перспектива продолжать в том же духе весьма заманчива. Однако, как показывают тесты, переход на API-интерфейс Datasets может дать громадный прирост производительности за счет оптимизированного использования ЦП.

Неправильный подход классический способ

Самая распространенная проблема, с которой я сталкивался при использовании Spark SQL, это явное переключение на API RDD. Причина состоит в том, что программисту зачастую проще сформулировать вычисление в терминах объектов Java, чем с помощью ограниченного языка Spark SQL:

val res = spark.range(1000L * 1000 * 1000)    .rdd    .filter(_ %2 == 0)    .count()

Листинг6. Переключение с Dataset на RDD

Этот код выполняется в течение 43 секунд вместо исходных 2,1 секунды, при этом делая абсолютно то же самое. Явное переключение на RDD останавливает формирование кода всего этапа и запускает преобразование элементов наборов данных из примитивных типов в объекты Java, что оказывается очень затратным. Если мы сравним схемы этапов выполнения кода из листингов 3 и 6 (см. ниже), то увидим, что во втором случае появляется дополнительный этап.

Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)

Неправильный подход изысканный способ

Производительность Spark SQL является на удивление хрупкой. Это незначительное изменение приводит к увеличению времени выполнения запроса в три раза (до 6 секунд):

val res = spark  .range(1000L * 1000 * 1000)  .filter(x => x % 2 == 0) // note that the condition changed  .select(count(col("id")))   .first()  .getAs[Long](0)

Листинг7. Замена выражения Spark SQL функцией Scala

Spark не способен генерировать эффективный код для условия в фильтре. Условие является анонимной функцией Scala, а не выражением Spark SQL, и Spark выполнит десериализацию каждой записи из оптимизированного внутреннего представления, чтобы вызвать эту функцию. Причем вот что примечательно это изменение никак не сказывается на визуальном представлении этапов (рис. 1a), поэтому его невозможно обнаружить, анализируя направленный ациклический граф (DAG) задания в пользовательском интерфейсе Spark.

Высокая производительность Spark SQL обеспечивается за счет ограничения круга доступных операций чем-то все равно приходится жертвовать! Чтобы получить максимальную производительность, нужно использовать преобразования, которые работают со столбцами: используйтеfilter(condition: Column)вместоfilter(T => Boolean) иselect()вместоmap(). При этом Spark не придется перестраивать объект, представленный одной строкой набора данных (Dataset). И, разумеется, избегайте переключения на RDD.

Заключение и итоговые замечания

Приведенные в этой статье простые примеры демонстрируют, что большая часть времени выполнения заданий обработки больших данных не тратится на полезную работу. Хорошим решением этой проблемы является компиляция запросов, которая возможна с использованием Spark SQL и обеспечивает более эффективное использование современного аппаратного обеспечения. Последние исследования свидетельствуют, что использование эффективных запросов для стандартных процессов обработки больших данных важнее, чем оптимизация использования сети и дискового ввода-вывода.

Правильное применение компиляции запросов может сократить время обработки в 210 раз, а это означает ускорение экспериментов, снижение затрат на инфраструктуру и громадное удовольствие от элегантного выполнения своей работы!

Образцы кода из этой статьи можно найти здесь. С помощью этого репозитория можно анализировать производительность разных запросов Spark.

Использованные материалы

  1. Ousterhout, Kay, et al. Making sense of performance in data analytics frameworks (Анализ производительности платформ анализа данных).12-й симпозиум {USENIX} по проектированию и реализации сетевых систем ({NSDI} 15). 2015.

  2. http://www.tpc.org/tpcds/

  3. https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

4.https://janino-compiler.github.io/janino/

5.http://people.csail.mit.edu/matei/papers/2015/sigmodsparksql.pdf

6.https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html


Узнать подробнее о курсах "Data Engineer" и "Экосистема Hadoop, Spark, Hive".

Подробнее..

Big Data Tools EAP 10 SSH-туннели, фильтрация приложений, пользовательские модули и многое другое

01.09.2020 18:20:28 | Автор: admin

Только что вышла очередная версия плагина Big Data Tools плагина для IntelliJ IDEA Ultimate, DataGrip и PyCharm, который обеспечивает интеграцию с Hadoop и Spark, позволяет редактировать и запускать интерактивные блокноты в Zeppelin.


Основная задача этого релиза поправить как можно больше проблем и улучшить плагин изнутри, но два важных улучшения видно невооруженным глазом:


  • соединяться с Hadoop и Spark теперь можно через SSH-туннели, создающиеся парой щелчков мыши;
  • мониторинг Hadoop может ограничивать объем данных, загружаемых при просмотре списка приложений.


SSH-туннели


Зачастую нужный нам сервер недоступен напрямую, например если он находится внутри защищенного корпоративного контура или закрыт специальными правилами на файерволе. Чтобы пробраться внутрь, можно использовать какой-то туннель или VPN. Самый простой из туннелей, который всегда под руками, это SSH.


Проложить туннель можно одной-единственной консольной командой:


ssh -f -N -L 1005:127.0.0.1:8080 user@spark.server

Немного автоматизировать процесс поможет файл ~/.ssh/config, в который ты один раз сохраняешь параметры соединения и потом используешь:


Host spark    HostName spark.server    IdentityFile ~/.ssh/spark.server.key    LocalForward 1005 127.0.0.1:8080    User user

Теперь достаточно написать в консоли ssh -f -N spark и туннель поднимется сам по себе, без вписывания IP-адресов. Удобно.


Но с этими способами есть две очевидных проблемы.


Во-первых, у кого-то может возникнуть масса вопросов. Что такое -f -N -L? Какой порт писать слева, а какой справа? Как выбирать адреса для соединений? Для всех, кроме профессиональных системных администраторов, такие мучения не кажутся полезными.


Во-вторых, мы программируем не в эмуляторе терминала, а в IDE. Всё, что вы запустили в консоли, работает глобально по отношению ко всей операционной системе, а в IDE хотелось бы для каждого проекта иметь свой собственный набор туннелей.


К счастью, начиная с этой версии в Big Data Tools есть возможность создавать туннели без ручного управления SSH-соединениями.



На скриншоте видно, что можно не только вручную указать все адреса и порты, но и подключить заранее подготовленный файл конфигурации SSH.


Опция Enable tunneling работает для следующих типов соединений:


  • Zeppelin
  • HDFS
  • Hadoop
  • Spark Monitoring

Важно отметить, что под капотом у нее все то же самое, что делает SSH. Не стоит ждать особой магии: например, если ты пытаешься открыть туннель на локальный порт, который уже занят другим приложением или туннелем, то случится ошибка.


Это довольно полезная опция, которая облегчает жизнь в большинстве повседневных ситуаций. Если же тебе нужно сделать что-то действительно сложное и нестандартное, то можно по старинке вручную использовать SSH, VPN или другие способы работы с сетью.


Вся работа велась в рамках задачи BDIDE-1063 на нашем YouTrack.


Управляемые ограничения на отображение приложений


Люди делятся на тех, у кого на странице Spark Monitoring всего парочка приложений, и тех, у кого их сотни.



Загрузка огромного списка приложений может занимать десятки минут, и все это время о состоянии сервера можно только гадать.


В этой версии Big Data Tools вы можете существенно ограничить время ожидания, если вручную выберете диапазон загружаемых данных. Например, можно вызвать диалоговое окно редактирования диапазона дат и вручную выбрать только сегодняшний день.


Эта опция значительно экономит время тех, кто работает с большими продакшенами.


Работа велась в рамках задачи BDIDE-1077.


Подключение модулей в зависимости Zeppelin


У многих в Zeppelin используются зависимости на собственные JAR-файлы. Big Data Tools полезно знать о таких файлах, чтобы в IDE нормально работало автодополнение и другие функции.


При синхронизации с Zeppelin, Big Data Tools пытается получить все такие файлы. Но, по разным причинам, это не всегда возможно. Чтобы Big Data Tools узнал о существовании таких пропущенных файлов, необходимо вручную добавить их в IntelliJ IDEA.


Раньше в качестве зависимостей можно было использовать только артефакты из Maven либо отдельные JAR-файлы. Это не всегда удобно, ведь для получения этих артефактов и файлов нужно или их скачать откуда-то, или собрать весь проект.


Теперь любой модуль текущего проекта тоже можно использовать в качестве зависимости. Такие зависимости попадают в таблицу "User dependencies":



Работа велась в рамках задачи BDIDE-1087.


Множество свежих исправлений


Big Data Tools молодой, активно развивающийся проект. В таких условиях неизбежно появление проблем, которые мы стараемся оперативно устранять. В EAP 10 вошло множество исправлений, значительная часть которых посвящена повышению удобства работы со Spark Monitoring.


  • [BDIDE-1078] Раньше при сворачивании ячеек их заголовки просто не отображались. Теперь они правильно отображаются, но редактировать их из Big Data Tools все еще нельзя это тема для будущих исправлений.
  • [BDIDE-1137] Удаление соединения Spark Monitoring из Hadoop приводило к ошибке IncorrectOperationException.
  • [BDIDE-570] В таблице Jobs в Spark Monitoring у выделенной задачи могло исчезать выделение.
  • [BDIDE-706] При обновлении дерева задач в Spark Monitoring выделенная задача теряла выделение.
  • [BDIDE-737] Если компьютер заснул и вышел из сна, получение информации о приложении в Spark Monitoring требовало перезагрузки IDE.
  • [BDIDE-1049] Перезапуск IDE мог приводить к появлению ошибки DisposalException.
  • [BDIDE-1060] Перезапуск IDE с открытым Variable View (функциональность ZTools) мог привести к ошибке IllegalArgumentException.
  • [BDIDE-1066] Редактирование свойств неактивного соединения в Spark Monitoring приводило к его самопроизвольному включению на панели.
  • [BDIDE-1091] Удаление только что открытого соединения с Zeppelin приводило к ошибке ConcurrentModificationException.
  • [BDIDE-1092] Кнопка Refresh могла не обновлять задачи в Spark Monitoring.
  • [BDIDE-1093] После перезапуска Spark в Spark Monitoring отображалась ошибка подключения.
  • [BDIDE-1094] При отображении ошибки соединения в Spark Monitoring нельзя было изменить размеры окна с ошибкой.
  • [BDIDE-1099] В Spark Monitoring на вкладке SQL вместо сообщения "Loading" могло неверно отображаться сообщение "Empty List".
  • [BDIDE-1119] В Spark Monitoring свойства SQL продолжали отображаться даже при сбросе соединения или перезагрузке интерпретатора.
  • [BDIDE-1130] Если в списке приложений в Spark Monitoring фильтр скрывал вообще все приложения, возникала ошибка IndexOutOfBoundsException.
  • [BDIDE-1133] Таблицы отображали только один диапазон данных, даже если в свойствах таблицы было указано сразу несколько диапазонов.
  • [BDIDE-406] Раньше при соединении с некоторыми экземплярами Zeppelin отображалась ошибка синхронизации. В рамках этого же тикета включена поддержка Zeppelin 0.9, в частности collaborative mode.
  • [BDIDE-746] При отсутствии выбранного приложения или задачи в Spark Monitoring на странице с детализацией отображалась ошибка соединения.
  • [BDIDE-769] При переключении между различными соединениями к Spark Monitoring могла не отображаться информация об этом соединении.
  • [BDIDE-893] Время от времени список задач в Spark Monitoring исчезал, и вместо него отображалось некорректное сообщение о фильтрации.
  • [BDIDE-1010] После запуска ячейки, статус "Ready" отображался со слишком большой задержкой.
  • [BDIDE-1013] Локальные блокноты в Zeppelin раньше имели проблемы с переподключением.
  • [BDIDE-1020] В результате комбинации нескольких факторов могло сбиваться форматирование кода на SQL.
  • [BDIDE-1023] Раньше не отображался промежуточный вывод исполняющихся ячеек, теперь отображается под ними.
  • [BDIDE-1041] Непустые файлы на HDFS отображались как пустые, из-за чего их можно было случайно сохранить и стереть данные.
  • [BDIDE-1061] Исправлен баг в отображении отображением SQL-задач. Раньше было неясно, является ли сервер Spark привязанным к задаче или это History Server.
  • [BDIDE-1068] Временами ссылка на задачу в Spark терялась и появлялась вновь.
  • [BDIDE-1072], [BDIDE-838] Раньше в панели Big Data Tools не отображалась ошибка соединения с Hadoop и Spark.
  • [BDIDE-1083] Если при закрытии IDE работала хоть одна задача с индикатором прогресса, возникала ошибка "Memory leak detected".
  • [BDIDE-1089] В таблицах теперь поддерживается интернационализация.
  • [BDIDE-1103] При внезапном разрыве связи с Zeppelin не отображалось предупреждение о разрыве соединения.
  • [BDIDE-1104] Горизонтальные полосы прокрутки перекрывали текст.
  • [BDIDE-1120] При потере соединения к Spark Monitoring возникала ошибка RuntimeExceptionWithAttachments.
  • [BDIDE-1122] Перезапуск интерпретатора приводил к ошибке KotlinNullPointerException.
  • [BDIDE-1124] Подключение к Hadoop не могло использовать SOCKS-прокси.
Подробнее..

Big Data Tools EAP 11 Zeppelin в DataGrip и spark-submit во всех поддерживаемых IDE

09.10.2020 20:15:45 | Автор: admin

Только что вышло очередное обновление EAP 11 для плагина под названием Big Data Tools, доступного для установки в IntelliJ IDEA Ultimate, PyCharm, and DataGrip. Можно установить его через страницу плагина на сайте или внутри IDE.


Big Data Tools это плагин, позволяющий соединяться с кластерами Hadoop и Spark. Он предоставляет мониторинг узлов, приложений и отдельных задач. Кроме того, в IDEA и DataGrip можно создавать, запускать и редактировать ноутбуки Zeppelin. Можно не переключаться на веб-интерфейс Zeppelin и спокойно работать, не выходя из любимого IDE. Плагин позволяет удобно перемещаться по коду, делать умное автодополнение, рефакторинги и квик-фиксы прямо внутри ноутбука.



Новый тип конфигурации запуска для spark-submit


Одно из важнейших улучшений в этом релизе возможность запускать Spark-приложения из IDE, без необходимости набирать команды в консоли. Эта функциональность доступна для всех поддерживаемых IDE, включая PyCharm. Напоминаю, скрипт spark-submit лежит в директории bin дистрибутива Spark и используется для запуска приложений на кластере. Он использует все поддерживаемые типы кластеров через единый интерфейс: благодаря этому не нужно несколько раз по-разному перенастраивать своё приложение. Удобно.


Большая проблема пользователей spark-submit в том, что его использование сопряжено с рядом ритуальных рутинных действий. Нужно вручную собрать все артефакты, скопировать их на целевой сервер по SSH, запустить spark-submit с кучей параметров. Обычно всё это заканчивается написанием пачки bash-скриптов на все случаи жизни, которые ты теперь обязан поддерживать. В целом, эта рутина крадет время разработчиков, которое можно было бы потратить на что-то более полезное.


С плагином Big Data Tools этот кошмар можно если не прекратить, но значительно облегчить. Теперь вам достаточно создать новую run configuration, вписать в неё параметры spark-submit, параметры SSH, прописать артефакты-зависимости и нажать кнопку запуска. Дальше всё сработает автоматически.



Поддержка Apache Zeppelin в DataGrip


Теперь можно использовать интеграцию с Zeppelin внутри DataGrip. Для пользователей DataGrip это означает, что наконец-то можно нормально визуализировать данные. Кроме того, писать код на SQL куда легче с новым умным автодополнением прямо внутри интерактивного блокнота.


Надо понимать, что языковая поддержка сейчас ограничена только SQL. Например, Matplotlib или анализ кода на Scala в DataGrip у вас не заработают.


Изменения


В этом обновлении мы сконцентрировались на улучшении существующей функциональности. Полный список изменений можно прочитать по ссылке, а дальше будут перечислены только самые важные.


Новая функциональность


  • Поддержка Spark-submit для IntelliJ IDEA, DataGrip и PyCharm (BDIDE-843).
  • Поддержка Apache Zeppelin для DataGrip (BDIDE-1045).
  • Редактируемые заголовки параграфов в Zeppelin (BDIDE-906).
  • Расширенная фильтрация результатов поиска в Spark Monitoring (BDIDE-1159).
  • Псевдографические таблицы в текстовом ответе Zeppelin отображаются как полноценные таблицы (BDIDE-1172).

Улучшения в интерфейсе


  • Zeppelin: Функции экспорта теперь умеют запоминать путь, по которому в прошлый раз происходило сохранение (графики, таблицы и HTML в ответах Zeppelin) (BDIDE-1132).

Список исправленных багов


  • Zeppelin: Исправлено IntelliJ IDEA 2020.1 могла падать при просмотре ответа Zeppelin, содержащего встроенные изображения (BDIDE-1184).
  • Zeppelin: Исправлено Нельзя было подключиться к серверу, если эндпоинт /api/version требовал серверной аутентификации (BRIDE-1199).
  • Zeppelin: Исправлено Попытка остановить выполняющийся параграф останавливала вообще все параграфы в ноутбуке (BDIDE-1171).
  • Zeppelin: Исправлено Нумерация параграфов в окне Structure не совпадала с нумерацией внутри Zeppelin (BDIDE-1135).
  • Zeppelin: Fixed IDEA пыталась загружать исходники заново, при каждом новом запуске (BDIDE-1208).
  • Zeppelin: Исправлено Кнопка "Open in browser" для локальных ноутов была не нужна (BDIDE-1142).
  • Zeppelin: Исправлено Результаты выполнения могли стираться при перезапуске интерпретатора (BDIDE-1129).
  • Spark Monitoring: Исправлено Неправильный анализ JSON, в котором хранятся данные для списка Storages, приводил к очистке списка (BDIDE-1162).
  • Remote FS: Исправлено Копирование директорий внутри инстанса Azure иногда приводило к созданию пустых директорий (BDIDE-1141).
  • Remote FS: Исправлено Файлы с неизвестным содержимым стоит открывать как текстовые файлы (BDIDE-1192).
  • Remote FS: Исправлено Исключение возникало при попытке открывать файлы без расширения (BDIDE-1202).
  • General: Исправлено Проблемы при попытке копирования и вставки между различными проектами (BDIDE-1195).
  • HTTP Proxy: Исправлено Ошибка при соединении со Spark или Hadoop через SOCKS proxy с авторизацией (BDIDE-1209).
  • HTTP Proxy: Исправлено Проблема с параметрами авторизации для SCOKS proxy (BDIDE-1215).
  • HTTP Proxy: Исправлено Частные настройки прокси не должны перекрывать общие настройки для нового соединения (BDIDE-1216).

Документация и социальные сети


Ну и наконец, если вам нужно разобраться функциональностью Big Data Tools, у нас есть подробная документация. Если хочется задать вопрос, можно сделать это прямо в комментариях на Хабре или перейти в наш Twitter.


Надеемся, что все эти улучшения окажутся полезными, позволят вам делать более интересыне вещи, и более приятным способом.


Команда Big Data Tools

Подробнее..

Big Data Tools EAP 12 экспериментальная поддержка Python, поиск по ноутбукам в Zeppelin

16.12.2020 18:10:53 | Автор: admin

Только что вышло очередное обновление EAP 12 для плагина под названием Big Data Tools, доступного для установки в IntelliJ IDEA Ultimate, PyCharm Professional и DataGrip. Можно установить его через страницу плагина или внутри IDE. Плагин позволяет работать с Zeppelin, загружать файлы в облачные хранилища и проводить мониторинг кластеров Hadoop и Spark.


В этом релизе мы добавили экспериментальную поддержку Python и поиск по ноутбукам Zeppelin. Если вы страдали от каких-то багов, их тоже починено множество. Давайте поговорим об этих изменениях более подробно.



Экспериментальная поддержка Python в Zeppelin


Поддержку Python хотелось добавить давно. Несмотря на то, что PySpark в Zeppelin сейчас на волне хайпа, всё что предоставляет нам веб-интерфейс Zeppelin простейшее автодополнение, в котором содержится какой-то наполовину случайный набор переменных и функций. Вряд ли это можно ставить в вину Zeppelin, он никогда не обещал нам умного анализа кода. В IDE хочется видеть что-то намного большее.


Добавить целый новый язык звучит как очень сложная задача. К счастью, в наших IDE уже есть отличная поддержка Python: либо в PyCharm, либо в Python-плагине для IntelliJ IDEA. Нам нужно было взять эту готовую функциональность и интегрировать внутрь Zeppelin. Вместе с этим возникает много нюансов, специфичных для Zeppelin: как нам проанализировать список зависимостей, как найти правильную версию Python, и тому подобное.



Начиная с EAP 12, код на Python нормально подсвечивается в нашем редакторе ноутбуков Zeppelin, отображаются грубые синтаксические ошибки. Можно перейти на определение переменной или функции, если они объявлены внутри ноутбука. Можно сделать привычные рефакторинги вроде rename или change signature. Работают Zeppelin-специфичные таблицы и графики в конце концов, зачастую ради них люди и используют Zeppelin.



Конечно, многие вещи ещё предстоит сделать. Например, очень хотелось бы видеть умное автодополнение по функциям Spark API и другому внешнему коду. Сейчас мы нормально автодополняем только то, что написано внутри ноутбука. У нас есть хорошие идеи, как это реализовать в следующих релизах. Иначе говоря, не надо ждать какого-то чуда: теперь у вас есть полнофункциональный редактор Python, но это всё. Поддержка Python получилась довольно экспериментальной, но, как говорится, путь в тысячу ли начинается с первого шага. А ещё, даже имеющейся функциональности может оказаться достаточно, чтобы писать код в вашем любимом IDE и не переключаться на веб-интерфейс Zeppelin.


Смешиваем Scala и Python


Иногда, в одном и том же ноутбуке вам хочется одновременно использовать и Python, и Scala. Например, это бывает полезно из соображений производительности в вычислительных задачах.


Смешивать разные языки вполне возможно. Но не забывайте, что для полноценной поддержки Scala вам понадобится IntelliJ IDEA с плагинами Scala и Python. В PyCharm этот Scala-код хоть и будет выполняться, но его поддержка в редакторе останется на уровне plain text.



Поиск по ноутбукам Zeppelin


У нас всегда была возможность найти, в каком же файле на диске находится нужный нам текст (например, с помощью Find in Path, Ctrl+Shift+F). Но этот стандартный интерфейс поиска не работает с ноутбуками, ведь они не файлы!


Начиная с EAP 12 мы добавили отдельную панель поиска по ноутбукам. Откройте панель Big Data Tools, выделите какое-нибудь из подключений к Zeppelin и нажмите на кнопку с изображением лупы (или используйте сочетание клавиш Ctrl+F на клавиатуре). В результате, вы попадёте в окно под названием Find in Zeppelin Connections. Активация одного из результатов поиска приведет к открытию этого ноутбука и переходу на нужный параграф.



Похожий поиск есть и в веб-интерфейсе Zeppelin. Для получения результатов поиска мы используем стандартный HTTP API, поэтому результаты должны совпадать с тем, что вы видите в интерфейсе Zeppeliln по аналогичному поисковому запросу. Если вы раньше пользовались веб-интерфейсом и привыкли к поиску по ноутбукам, теперь он имеется и в Big Data Tools.


Функция небольшая, но очень полезная. Непонятно, как мы без неё жили раньше.


Исправления ошибок


Плагин Big Data Tools активно развивается, и при бурном росте неизбежны некоторые проблемы. В этом релизе мы провели много работы над правильной работой с удаленными хранилищами, отображением графиков и параграфов, переработали часть интерефейсов (например, SSH-туннели). Переработаны кое-какие системные вещи (например, несколько проектов теперь используют общее подключение к Zeppelin), вывезли множество ошибок в неожиданных местах. В целом, теперь пользоваться плагином намного приятней.


Если вам интересен обзор основных улучшений, то их можно найти в разделе What's New на странице плагина. Если вы ищете какую-то конкретную проблему, вам может подойти полный отчет из YouTrack.


Спасибо, что пользуетесь нашим плагином! Напоминаю, что установить свежую версию можно либо в браузере, на странице плагина, или внутри IDE по названию Big Data Tools. На странице плагина можно оставлять ваши отзывы и предложения (которые мы всегда читаем), и ставить оценки в виде звёздочек.


Документация и социальные сети


Ну и наконец, если вам нужно разобраться функциональностью Big Data Tools, у нас есть подробная документация в вариантах для IntelliJ IDEA и PyCharm. Если хочется задать вопрос, можно сделать это прямо в комментариях на Хабре или перейти в наш Twitter.


Надеемся, что все эти улучшения окажутся полезными, позволят вам делать более интересыне вещи, и более приятным способом.


Команда Big Data Tools

Подробнее..

Тестирование в Apache Spark Structured Streaming

02.01.2021 20:04:09 | Автор: admin

Введение


На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.


Все примеры используют: Apache Spark 3.0.1.


Подготовка


Необходимо установить:


  • Apache Spark 3.0.x
  • Python 3.7 и виртуальное окружение для него
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • В примерах для Scala используется версия 2.12.10.

  1. Загрузить Apache Spark
  2. Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7

Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.


SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;

Тесты


Пример с scikit-learn


При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.


Для написания тестов будет использоваться следующий пример: LinearRegression.


Итак, пусть код для тестирования использует следующий "шаблон" для Python:


class XService:    def __init__(self):        # Инициализация    def train(self, ds):        # Обучение    def predict(self, ds):        # Предсказание и вывод результатов

Для Scala шаблон выглядит соответственно.


Полный пример:


from sklearn import linear_modelclass LocalService:    def __init__(self):        self.model = linear_model.LinearRegression()    def train(self, ds):        X, y = ds        self.model.fit(X, y)    def predict(self, ds):        r = self.model.predict(ds)        print(r)

Тест.


Импорт:


import unittestimport numpy as np

Основной класс:


class RunTest(unittest.TestCase):

Запуск тестов:


if __name__ == "__main__":    unittest.main()

Подготовка данных:


X = np.array([    [1, 1],  # 6    [1, 2],  # 8    [2, 2],  # 9    [2, 3]  # 11])y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3

Создание модели и обучение:


service = local_service.LocalService()service.train((X, y))

Получение результатов:


service.predict(np.array([[3, 5]]))service.predict(np.array([[4, 6]]))

Ответ:


[16.][19.]

Все вместе:


import unittestimport numpy as npfrom spark_streaming_pp import local_serviceclass RunTest(unittest.TestCase):    def test_run(self):        # Prepare data.        X = np.array([            [1, 1],  # 6            [1, 2],  # 8            [2, 2],  # 9            [2, 3]  # 11        ])        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3        # Create model and train.        service = local_service.LocalService()        service.train((X, y))        # Predict and results.        service.predict(np.array([[3, 5]]))        service.predict(np.array([[4, 6]]))        # [16.]        # [19.]if __name__ == "__main__":    unittest.main()

Пример с Spark и Python


Будет использован аналогичный алгоритм LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.


Инициализация:


self.service = LinearRegression(maxIter=10, regParam=0.01)self.model = None

Обучение:


self.model = self.service.fit(ds)

Получение результатов:


transformed_ds = self.model.transform(ds)q = transformed_ds.select("label", "prediction").writeStream.format("console").start()return q

Все вместе:


from pyspark.ml.regression import LinearRegressionclass StructuredStreamingService:    def __init__(self):        self.service = LinearRegression(maxIter=10, regParam=0.01)        self.model = None    def train(self, ds):        self.model = self.service.fit(ds)    def predict(self, ds):        transformed_ds = self.model.transform(ds)        q = transformed_ds.select("label", "prediction").writeStream.format("console").start()        return q

Сам тест.


Обычно в тестах можно использовать данные, которые создаются прямо в тестах.


train_ds = spark.createDataFrame([    (6.0, Vectors.dense([1.0, 1.0])),    (8.0, Vectors.dense([1.0, 2.0])),    (9.0, Vectors.dense([2.0, 2.0])),    (11.0, Vectors.dense([2.0, 3.0]))],    ["label", "features"])

Это очень удобно и код получается компактным.


Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.


def test_stream_read_options_overwrite(self):    bad_schema = StructType([StructField("test", IntegerType(), False)])    schema = StructType([StructField("data", StringType(), False)])    df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \        .schema(bad_schema)\        .load(path='python/test_support/sql/streaming', schema=schema, format='text')    self.assertTrue(df.isStreaming)    self.assertEqual(df.schema.simpleString(), "struct<data:string>")

И так.


Создается контекст для работы:


spark = SparkSession.builder.enableHiveSupport().getOrCreate()spark.sparkContext.setLogLevel("ERROR")

Подготовка данных для обучения (можно сделать обычным способом):


train_ds = spark.createDataFrame([    (6.0, Vectors.dense([1.0, 1.0])),    (8.0, Vectors.dense([1.0, 2.0])),    (9.0, Vectors.dense([2.0, 2.0])),    (11.0, Vectors.dense([2.0, 3.0]))],    ["label", "features"])

Обучение:


service = structure_streaming_service.StructuredStreamingService()service.train(train_ds)

Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.


def extract_features(x):    values = x.split(",")    features_ = []    for i in values[1:]:        features_.append(float(i))    features = Vectors.dense(features_)    return featuresextract_features_udf = udf(extract_features, VectorUDT())def extract_label(x):    values = x.split(",")    label = float(values[0])    return labelextract_label_udf = udf(extract_label, FloatType())predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \    .withColumn("features", extract_features_udf(col("value"))) \    .withColumn("label", extract_label_udf(col("value")))service.predict(predict_ds).awaitTermination(3)

Ответ:


15.9669918.96138

Все вместе:


import unittestimport warningsfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, udffrom pyspark.sql.types import FloatTypefrom pyspark.ml.linalg import Vectors, VectorUDTfrom spark_streaming_pp import structure_streaming_serviceclass RunTest(unittest.TestCase):    def test_run(self):        spark = SparkSession.builder.enableHiveSupport().getOrCreate()        spark.sparkContext.setLogLevel("ERROR")        # Prepare data.        train_ds = spark.createDataFrame([            (6.0, Vectors.dense([1.0, 1.0])),            (8.0, Vectors.dense([1.0, 2.0])),            (9.0, Vectors.dense([2.0, 2.0])),            (11.0, Vectors.dense([2.0, 3.0]))        ],            ["label", "features"]        )        # Create model and train.        service = structure_streaming_service.StructuredStreamingService()        service.train(train_ds)        # Predict and results.        def extract_features(x):            values = x.split(",")            features_ = []            for i in values[1:]:                features_.append(float(i))            features = Vectors.dense(features_)            return features        extract_features_udf = udf(extract_features, VectorUDT())        def extract_label(x):            values = x.split(",")            label = float(values[0])            return label        extract_label_udf = udf(extract_label, FloatType())        predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() \            .withColumn("features", extract_features_udf(col("value"))) \            .withColumn("label", extract_label_udf(col("value")))        service.predict(predict_ds).awaitTermination(3)        # +-----+------------------+        # |label|        prediction|        # +-----+------------------+        # |  1.0|15.966990887541273|        # |  2.0|18.961384020443553|        # +-----+------------------+    def setUp(self):        warnings.filterwarnings("ignore", category=ResourceWarning)        warnings.filterwarnings("ignore", category=DeprecationWarning)if __name__ == "__main__":    unittest.main()

Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:


implicit val sqlCtx = spark.sqlContextimport spark.implicits._val source = MemoryStream[Record]source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))val predictDs = source.toDF()service.predict(predictDs).awaitTermination(2000)

Полный пример на Scala (здесь, для разнообразия, не используется sql):


package aaa.abc.dd.spark_streaming_pr.clusterimport org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}import org.apache.spark.sql.DataFrameimport org.apache.spark.sql.functions.udfimport org.apache.spark.sql.streaming.StreamingQueryclass StructuredStreamingService {  var service: LinearRegression = _  var model: LinearRegressionModel = _  def train(ds: DataFrame): Unit = {    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)    model = service.fit(ds)  }  def predict(ds: DataFrame): StreamingQuery = {    val m = ds.sparkSession.sparkContext.broadcast(model)    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {      m.value.predict(features)    }    val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun    val toUpperUdf = udf(transform)    val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))    predictionDs      .writeStream      .foreachBatch((r: DataFrame, i: Long) => {        r.show()        // scalastyle:off println        println(s"$i")        // scalastyle:on println      })      .start()  }}

Тест:


package aaa.abc.dd.spark_streaming_pr.clusterimport org.apache.spark.ml.linalg.Vectorsimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.execution.streaming.MemoryStreamimport org.scalatest.{Matchers, Outcome, fixture}class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {  test("run") { spark =>    // Prepare data.    val trainDs = spark.createDataFrame(Seq(      (6.0, Vectors.dense(1.0, 1.0)),      (8.0, Vectors.dense(1.0, 2.0)),      (9.0, Vectors.dense(2.0, 2.0)),      (11.0, Vectors.dense(2.0, 3.0))    )).toDF("label", "features")    // Create model and train.    val service = new StructuredStreamingService()    service.train(trainDs)    // Predict and results.    implicit val sqlCtx = spark.sqlContext    import spark.implicits._    val source = MemoryStream[Record]    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))    val predictDs = source.toDF()    service.predict(predictDs).awaitTermination(2000)    // +-----+---------+------------------+    // |label| features|        prediction|    // +-----+---------+------------------+    // |  1.0|[3.0,5.0]|15.966990887541273|    // |  2.0|[4.0,6.0]|18.961384020443553|    // +-----+---------+------------------+  }  override protected def withFixture(test: OneArgTest): Outcome = {    val spark = SparkSession.builder().master("local[2]").getOrCreate()    try withFixture(test.toNoArgTest(spark))    finally spark.stop()  }  override type FixtureParam = SparkSession  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)}

Выводы


При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.


Такие абстракции как DataFrame позволяют это сделать легко и просто.


При использовании Python данные придется хранить в файлах.


Ссылки и ресурсы


Подробнее..
Категории: Scala , Python , Testing , Apache , Spark , Apache spark , Kafka , Streaming

Перевод Масштабирование итеративных алгоритмов в Spark

26.01.2021 14:17:08 | Автор: admin

Итеративные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т.д. Эти алгоритмы усложняются итерациями, размеры данных на каждой итерации увеличивается, и сделать их отказоустойчивыми на каждой итерации непросто.

В этой статье я бы подробно остановился на некоторых моментах, которые необходимо учитывать при работе с этими задачами. Мы использовали Spark для реализации нескольких итерационных алгоритмов, таких как построение связанных компонентов, обход больших связанных компонентов и т.д. Ниже приведен мой опыт работы в лабораториях Walmart по построению связанных компонентов для 60 миллиардов узлов клиентской идентификации.

Количество итераций никогда не предопределено, всегда есть условие завершения ?.Количество итераций никогда не предопределено, всегда есть условие завершения ?.

Типы итеративных алгоритмов

Конвергентные данные: Здесь мы видим, что с каждой итерацией количество данных уменьшается, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основной задачей будет работа с огромными наборами данных в первых нескольких итерациях, и после того, как набор данных значительно уменьшится, можно будет легко справляться с дальнейшими итерациями до их завершения.

Расходящиеся данные: Количество данных увеличивается при каждой итерации, и иногда они могут появляться быстрее и сделать невозможной дальнейшую работу. Для работы этих алгоритмов необходимы такие ограничения, как ограничения по количеству итераций, начальному размеру данных, вычислительной мощности и т.д.

Аналогичные данные: На каждой итерации у нас были бы более или менее одинаковые данные, и с таким алгоритмом было бы очень легко работать.

Инкрементные данные: На каждой итерации у нас могут появляться новые данные, особенно в ML у нас могут появляться новые обучающие наборы данных с периодическими интервалами.

Препятствия

  1. RDD линии: Одним из распространенных способов сохранения отказоустойчивости системы является хранение копий данных в разных местах, чтобы в случае падения одного узла у нас была копия, которая помогала бы до тех пор, пока узел не восстановится. Но Spark не поддерживает дубликаты данных, а поддерживает линейный график преобразований, выполненных на данных в драйвере. Поэтому такой линейный график был бы полезен, если какой-либо фрагмент данных отсутствует, он может построить его обратно с помощью линейного графика, следовательно, Spark является отказоустойчивым. По мере того, как линейный график становится большим, становится трудно строить данные обратно, так как количество итераций увеличивается.

  2. Память и дисковый ввод/вывод: В Spark RDD являются непреложными, поэтому на каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование Памяти и Диска. По мере того, как итерации будут увеличивать использование диска/памяти исполнителями, это может привести к замедлению работы из-за нехватки памяти и ожиданию, пока GC выполнит очистку. В некоторых случаях куча памяти будет недостаточной и может привести к невозможности выполнения задачи.

  3. Размер задачи: В некоторых случаях может быть несколько задач, которые могут не подходить для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к препятствию.

Советы по преодолению вышеуказанных проблем

  1. Хранение большого линейного графика в памяти, и, в случае сбоя узла, восстановление потерянных наборов данных займет много времени. В таких случаях можно использовать кэширование или запись данных о состоянии в контрольной точке на каждой N итерации. Это сохранит рассчитанный RDD на каждой N итерации (кэширование будет храниться в памяти или на диске исполнителей, запись данных о состоянии в контрольной точке использует HDFS, мы должны принять решение исходя из нашей потребности, так как скорость будет различаться для каждой из них). В случае неудачи RDD вычисляется обратно от последней контрольной точки/кэширования. Вместо использования двух вышеупомянутых методов можно также создать временную таблицу и сохранить вычисленный набор данных, разделенный итерацией. В случае неудачи задания Spark, можно сделать перезапуск с последней N-ой итерации, а преимущество сохранения во временную таблицу состоит в том, что можно избавиться от линейного графика RDD до этой итерации и запустить свежий линейный график с этой точки. По мере того, как линейный график RDD растет в итерационных алгоритмах, нам необходимо строить гибридные решения с использованием кэширования, контрольных точек (см. ссылку [2]) и временных таблиц для различных вариантов использования.

  2. Как и выше, сохранение во временную таблицу и чтение из временной таблицы может помочь избавиться от линейного графика и очистить память и диск предыдущих RDD. Такая запись и чтение замедляет процесс, но это даст огромное преимущество при работе с большими наборами данных. Особенно при конвергировании наборов данных, нам может понадобиться выполнять этот процесс только в течение первых нескольких итераций и использовать кэширование, когда наборы данных становятся маленькими при работе с итерациями. Экономия во временной таблице в качестве контрольной точки выглядит тривиально, но она не просто действует как контрольная точка. Так как мы избавляемся от истории линейных графов, делая это на периодических итерациях, это уменьшит риск сбоя в работе и сократит время на построение ее обратной копии из потерянных данных.

  3. Работа с расходящимися данными сложна, так как размер каждой задачи будет увеличиваться с увеличением количества итераций и займет намного больше времени для каждого исполнителя. Поэтому нам нужен фактор для вычисления количества задач в ( i + 1) итерации по сравнению с i-й итерацией таким образом, чтобы размер задачи остался прежним. Например, скажем, количество задач в i-й итерации 100, и каждая задача обрабатывает около 100 МБ данных. В i+1 итерации размер каждой задачи увеличивается до 150 МБ, и мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ на каждую задачу. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач, переструктурировав и изменив перетасованные разделы на основе итерации.

  4. В случаях, когда размер spark задачи огромен, попробуйте увеличить память исполнителя в соответствии с размером задачи. А если нужно выполнить соединения на искаженных наборах данных, где 10% задач занимает 90% времени исполнения и 90% задач выполняются за 10% времени, эти задачи предлагается обрабатывать отдельно, выполняя их в виде двух разных запросов. Нужно определить причину больших задач, и можем ли мы разделить их на две группы, т.е. маленькие и большие задачи. В 1-м запросе мы бы обработали 90% задач, т.к. нет никаких препятствий для их обработки, и это заняло бы 10% времени, как и раньше. В другом запросе мы бы обрабатывали большие задачи (10% задач) с помощью всенаправленного соединения, так как количество таких задач меньше, а также избегали бы перетасовки данных.

Пример: Допустим, у нас есть таблица А и таблица Б. Таблица А это данные о населении со столбцами user_id, имя, город, штат. Таблица B это то, что группирует данные со столбцами user_id, group_id. Например, мы пытаемся найти 5 крупнейших городов с наибольшим количеством используемых групп. В этом примере могут быть тупиковые ситуации, как города с большим количеством населения могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Для решения этих тупиковых ситуаций, объединение между этими таблицами может быть сделано в двух запросах. Мы можем отфильтровать больших пользователей с большим количеством групп (скажем, порог в 1000 групп на пользователя) и относиться к ним как к большим задачам. И выполнять соединения отдельно для больших пользователей, используя всенаправленное объединение, так как количество больших пользователей будет мало по сравнению с общими данными. Аналогичным образом, для остальных пользователей выполняйте тасовку объединения и объединяйте результаты и агрегируйте по городам, чтобы найти 5 лучших городов.


А прямо сейчас в OTUS открыт набор на курс Экосистема Hadoop, Spark, Hive.

Всех желающих приглашаем записаться на бесплатный демо-урок по теме Spark Streaming.

ЗАБРАТЬ СКИДКУ

Подробнее..

Проектирование озера данных с открытым исходным кодом

08.08.2020 08:17:55 | Автор: admin


Озера данных (data lakes) фактически стали стандартом для предприятий и корпораций, которые стараются использовать всю имеющуюся у них информацию. Компоненты с открытым исходным кодом часто являются привлекательным вариантом при разработке озер данных значительного размера. Мы рассмотрим общие архитектурные паттерны необходимые для создания озера данных для облачных или гибридных решений, а также обратим внимание на ряд критически важных деталей которые не стоит упускать при внедрения ключевых компонентов.

Проектирование потока данных


Типичный логический поток озера данных включает следующие функциональные блоки:

  • Источники данных;
  • Получение данных;
  • Узел хранения;
  • Обработка и обогащение данных;
  • Анализ данных.

В этом контексте источники данных, как правило, представляют собой потоки или коллекции сырых событийных данных (например, логи, клики, телеметрия интернета вещей, транзакции).
Ключевой особенностью таких источников является то, что необработанные данные хранятся в их оригинальном виде. Шум в этих данных обычно состоит из дублированных или неполных записей с избыточными или ошибочными полями.



На этапе приёма сырые данные приходят из одного или нескольких источников данных. Механизм приёма чаще всего реализуется в виде одной или нескольких очередей сообщений с простым компонентом, направленным на первичную очистку и сохранение данных. Для того, чтобы построить эффективное, масштабируемое и целостное озеро данных, рекомендуется различать простую очистку данных и более сложные задачи обогащения данных. Одно из проверенных правил заключается в том, что задачи очистки требуют данных из одного источника в пределах скользящего окна.

Скрытый текст
Под обогащением данных подразумеваем насыщение данных дополнительной информацией из других источников (это могут быть какие-то справочники, информациях из сторонних сервисов и т.п.). Обогащение данных распространённый процесс, когда следует учесть все возможные аспекты для проведения дальнейшего анализа.

Как пример, удаление дублей, которое ограничено только сравнением ключей от событий, полученных в течение 60 секунд друг от друга из одного и того же источника, будет типичной задачей очистки. С другой стороны, задача слияния данных из нескольких источников данных в течение относительно длительного промежутка времени (например, за последние 24 часа), скорее соответствует фазе обогащения.


После того, как данные приняты и очищены, они сохраняются в распределенной файловой системе (для повышения отказоустойчивости). Часто данные записываются в табличном формате. Когда новая информация записывается в узел хранения, каталог данных, содержащий схему и метаданные, может обновляться с помощью автономного краулера. Запуск краулера обычно запускается событийно, например при поступлении нового объекта в хранилище. Хранилища обычно интегрированы со своими каталогами. Они выгружают базовую схему для того, чтобы к данным можно было обращаться.

Далее данные попадают в специальную область, предназначенную для золотых данных. С этого момента данные готовы для обогащения другими процессами.

Скрытый текст
Данные называют золотыми, потому что они остаются сырыми и полуструктурированными, и это основной источник знаний о бизнесе.

В процессе обогащения данные дополнительно изменяются и очищаются в соответствии с бизнес-логикой. В результате они сохраняются в структурированном формате в хранилище данных или в базе данных, которое используется для быстрого получения информации, аналитики или обучающей модели.

Наконец, использование данных аналитика и исследования. Именно здесь извлечённая информация преобразуется в бизнес-идеи при помощи визуализации, дашбордов и отчётов. Также эти данные источник для прогнозирования с помощью машинного обучения, результат которого помогает принимать лучшие решения.

Компоненты платформы


Облачная инфраструктура озёр данных требует надёжного, а в случае гибридных облачных систем ещё и унифицированного уровня абстракции, который поможет развёртывать, координировать и запускать вычислительные задачи без ограничений в API провайдеров.

Kubernetes отличный инструмент для такой работы. Он позволяет эффективно развёртывать, организовывать и запускать различные сервисы и вычислительные задачи озера данных надёжным и выгодным способом. Он предлагает унифицированный API, который будет работать и локально и в каком-либо публичном или приватном облаке.



Платформу можно условно разделить на несколько слоёв. Базовый слой это то место, где мы развертываем Kubernetes или его эквивалент. Базовый слой также можно использовать для обработки вычислительных задач вне компетенций озера данных. При использовании облачных провайдеров, перспективно было бы использовать уже наработанные практики облачных поставщиков (ведение журнала и аудит, проектирование минимального доступа, сканирование уязвимостей и отчетность, сетевая архитектура, архитектура IAM и т.д.) Это позволит достичь необходимого уровня безопасности и соответствия другим требованиям.

Над базовым уровнем есть два дополнительных само озеро данных и уровень вывода значений. Эти два уровня отвечают за основу бизнес-логики, а также за процессы обработки данных. Несмотря на то, что существует множество технологий для этих двух уровней, Kubernetes снова окажется хорошим вариантом из-за его гибкости для поддержки различных вычислительных задач.

Уровень озера данных включает в себя все необходимые сервисы для приёма (Kafka, Kafka Connect), фильтрации, обогащения и переработки (Flink и Spark), управления рабочим процессом (Airflow). Помимо этого, он включает хранилища данных и распределённые файловые системы (HDFS), а также базы данных RDBMS и NoSQL.

Самый верхний уровень получение значений данных. По сути, это уровень потребления. Он включает в себя такие компоненты, как инструменты визуализации для понимания бизнес-аналитики, инструменты для исследования данных (Jupyter Notebooks). Другим важным процессом, который происходит на этом уровне, является машинное обучение с использованием обучающей выборки из озера данных.

Важно отметить, что неотъемлемой частью каждого озера данных является внедрение распространённых практик DevOps: инфраструктура как код, наблюдаемость, аудит и безопасность. Они играют важную роль в решении повседневных задач и должны применяться на каждом отдельном уровне, чтобы обеспечивать стандартизацию, безопасность и быть удобными в использовании.

Скрытый текст
При выборе решений с открытым исходным кодом на любом из этапов проектирования облака данных, хорошим показателем является распространённое применение этого решения в индустрии, подробная документация и поддержка opensource-сообществом.

Взаимодействие компонентов платформы


Кластер Kafka будет получать неотфильтрованные и необработанные сообщения и будет функционировать как узел приёма в озере данных. Kafka обеспечивает высокую пропускную способность сообщений надёжным способом. Кластер обычно содержит несколько разделов для сырых данных, обработанных (для потоковой обработки) и недоставленных или искажённых данных.

Flink принимает сообщение из узла c необработанными данных от Kafka, фильтрует данные и делает, при необходимости, предварительное обогащение. Затем данные передаются обратно в Kafka (в отдельный раздел для отфильтрованных и обогащенных данных). В случае сбоя, или при изменении бизнес-логики, эти сообщения можно будет вызвать повторно, т.к. что они сохраняются в Kafka. Это распространенное решение для в потоковых процессов. Между тем, Flink записывает все неправильно сформированные сообщения в другой раздел для дальнейшего анализа.

Используя Kafka Connect получаем возможность сохранять данные в требуемые бекенды хранилищ данных (вроде золотой зоны в HDFS). Kafka Connect легко масштабируется и поможет быстро увеличить количество параллельных процессов, увеличив пропускную способность при большой загрузке:



При записи из Kafka Connect в HDFS рекомендуется выполнять разбиение контента для эффективности обращения с данными (чем меньше данных для сканирования тем меньше запросов и ответов). После того, как данные были записаны в HDFS, serverless функциональность (вроде OpenWhisk или Knative) будет периодически обновлять хранилище метаданных и параметров схемы. В результате, к обновленной схеме становится возможно обращаться через SQL-подобный интерфейс (например, Hive или Presto).



Для последующего data-flows и управления ETL-процессом можно использовать Apache Airflow. Он позволяет пользователям запускать многоступенчатые pipline обработки данных с использованием Python и объектов типа Directed Acyclic Graph (DAG). Пользователь может задавать зависимости, программировать сложные процессы и отслеживать выполнение задач через графический интерфейс. Apache Airflow также может служить для обработки всех внешних данных. Например, для получения данных через внешний API и сохранения их в постоянном хранилище.

Spark под управлением Apache Airflow через специальный плагин, может периодически обогащать сырые отфильтрованные данные в соответствии с бизнес-задачами, и подготавливать данные для исследования специалистами по данным, и бизнес-аналитикам. Специалисты по данным могут использовать JupyterHub для управления несколькими Jupyter Notebook. Поэтому стоит воспользоваться Spark для настройки многопользовательских интерфейсов для работы с данными, их сбором и анализом.



Для машинного обучения можно воспользоваться фреймворками, вроде Kubeflow, используя возможность масштабируемости Kubernetes. Получившиеся модели обучения могут быть возвращены в систему.

Если сложить пазл воедино, мы получим что-то вроде этого:



Операционное совершенство


Мы уже говорили, что принципы DevOps и DevSecOps являются важными компонентами любого озера данных и никогда не должны упускаться из виду. С большой властью приходит большая ответственность, особенно когда все структурированные и неструктурированные данные о вашем бизнесе находятся в одном месте.

Основные принципы будут следующими:

  1. Ограничить доступ пользователей;
  2. Ведение мониторинга;
  3. Шифрование данных;
  4. Serverless-решения;
  5. Использование процессов CI/CD.

Принципы DevOps и DevSecOps являются важными компонентами любого озера данных и никогда не должны упускаться из виду. С большой властью приходит большая ответственность, особенно когда все структурированные и неструктурированные данные о вашем бизнесе находятся в одном месте.

Один из рекомендуемых методов разрешить доступ только для определённых сервисов, распределив соответствующие права, и запретить прямой доступ пользователей, чтобы пользователи не смогли изменить данные (это касается и команды). Также для защиты данных важен полный мониторинг при помощи фиксации действий в журнал.

Шифрование данных ещё один механизм для защиты данных. Шифрование хранимых данных может производиться при помощи системы управления ключами (KMS). Это зашифрует вашу систему хранения и текущее состояние. В свою очередь, шифрование при передаче может производиться при помощи сертификатов для всех интерфейсов и эндпоинтов сервисов вроде Kafka и ElasticSearch.

А в случае систем поиска, которые могут не соответствуют политике безопасности, лучше отдавать предпочтение serverless-решениям. Необходимо также отказаться от ручных деплоев, ситуативных изменений в любом компоненте озера данных; каждое изменение должно приходить из системы контроля версий и проходить серию CI-тестов перед развертыванием на продуктовое озеро данных (smoke-тестирование, регрессия, и т. д.).

Эпилог


Мы рассмотрели основные принципы проектирования архитектуры озера данных с открытым исходным кодом. Как часто бывает, выбор подхода не всегда очевиден и может быть продиктован разными требованиями бизнеса, бюджета и времени. Но использование облачных технологий для создания озёр данных, вне зависимости от того, это гибридное или полностью облачное решение, является развивающейся тенденцией в отрасли. Это происходит благодаря огромному количеству преимуществ, предлагаемых таким подходом. Он обладает большим уровнем гибкости и не ограничивает развитие. Важно понимать, что гибкая модель работы несёт заметную экономическую выгоду, позволяя комбинировать, масштабировать и совершенствовать применяемые процессы.
Подробнее..

Перевод Apache Spark 3.1 Spark on Kubernetes теперь общедоступен

22.04.2021 12:11:11 | Автор: admin


С выходом Apache Spark 3.1 в марте 2021-го проект Spark on Kubernetes официально перешел в статус общедоступного и готового к эксплуатации. Это стало результатом трехлетней работы быстрорастущего сообщества, участники которого помогали в разработке и внедрении (изначально поддержка Spark on Kubernetes появилась в Spark 2.3 в феврале 2018 года). Команда Kubernetes aaS от Mail.ru Cloud Solutions перевела самое важное из статьи об основных возможностях Spark 3.1, в которой автор подробно остановился на улучшениях в Spark on Kubernetes.


Полезные источники информации:



Путь Spark on Kubernetes: от бета-поддержки в 2.3 до нового стандарта в 3.1


С выходом Spark 2.3 в начале 2018 года Kubernetes стал новым диспетчером для Spark (помимо YARN, Mesos и автономного режима) в крупных компаниях, возглавляющих проект: RedHat, Palantir, Google, Bloomberg и Lyft. Сначала поддержка имела статус экспериментальной, функций было мало, стабильность и производительность были невысокими.


С тех пор сообщество проекта получило поддержку многочисленных крупных и маленьких компаний, которых заинтересовали преимущества Kubernetes:


  1. Нативная контейнеризация. Упаковывайте зависимости (и сам Spark) с помощью Docker.
  2. Эффективное совместное использование ресурсов и ускорение запуска приложений.
  3. Обширная Open Source-экосистема уменьшает зависимость от облачных провайдеров и вендоров.

В проект было внесено несколько нововведений: от базовых требований вроде поддержки PySpark и R, клиентского режима и монтирования томов в версии 2.4 до мощных оптимизаций вроде динамического выделения (в 3.0) и улучшения обработки выключения нод (в 3.1). За последние три года вышло больше 500 патчей, сильно повысивших надежность и производительность Spark on Kubernetes.



График внесения улучшений в Spark с 2018 по 2021 годы


В результате в новых Spark-проектах в 2021 году Kubernetes все чаще рассматривается в роли стандартного менеджера ресурсов: это следует из популярности Open Source-проекта оператора Spark on Kubernetes и объявлений крупных вендоров, внедряющих Kubernetes вместо Hadoop YARN.


С выходом Spark 3.1 проект Spark on Kubernetes получил статус общедоступного и готового к эксплуатации. В этом релизе было внесено больше 70 исправлений и улучшений производительности. Давайте рассмотрим самые важные функции, которых с нетерпением ждали заказчики.


Улучшенная обработка выключения нод: постепенное отключение исполнителя (новая функция в Spark 3.1)


Эту функцию (SPARK-20624) реализовал Holden Karau, и пока что она доступна только для автономных развертываний и в Kubernetes. Называется функция улучшенная обработка выключения нод, хотя еще одно подходящее название постепенное отключение исполнителя (Graceful Executor Decommissioning).


Эта функция повышает надежность и производительность Spark при использовании Spot-нод (вытесняемые ноды в GCP). Перед остановкой спота с него перемещаются shuffle-данные и содержимое кэша, поэтому влияние на работу Spark-приложения оказывается минимальное. Раньше, когда система убивала спот, все shuffle-файлы терялись, поэтому их приходилось вычислять заново (снова выполнять потенциально очень долгие задачи). Новая фича не требует настройки внешнего shuffle-сервиса, для которого нужно по запросу запускать дорогие ноды хранения и который совместим с Kubernetes.



Новая функция Spark предотвращает внезапное уничтожение спотов и постепенно выключает исполнитель без потери драгоценных данных!


Что делает эта функция?


  • Исполнитель, который нужно выключить, вносится в черный список: драйвер Spark не будет назначать ему новые задачи. Те задачи, которые сейчас исполняются, не будут принудительно прерваны, но если они сбоят из-за остановки исполнителя, то перезапустятся в другом исполнителе, как и сейчас, а их сбой не будет учитываться в максимальном количестве сбоев (новинка).
  • Shuffle-файлы и кэшированные данные мигрируют из выключаемого исполнителя в другой. Если другого нет, например, мы выключаем единственный исполнитель, то можно настроить объектное хранилище (вроде S3) в качестве запасного.
  • После завершения миграции исполнитель умирает, а Spark-приложение продолжает работать как ни в чем не бывало.

Когда работает эта функция?


  • При использовании Spot / вытесняемых нод облачный провайдер уведомляет о выключении за 60120 секунд. Spark может использовать это время для сохранения важных shuffle-файлов. Этот механизм используется и в тех случаях, когда экземпляр на стороне провайдера по каким-то причинам выключают, допустим, при обслуживании EC2.
  • Когда нода Kubernetes пустеет, например для технического обслуживания, или когда вытесняется под исполнителя Spark, например более высокоприоритетным подом.
  • Когда убранный исполнитель является частью динамического выделения при уменьшении размера системы из-за простоя исполнителя. В этом случае тоже будут сохранены кэш и shuffle-файлы.

Как включить функцию?


  • С помощью конфигурационных флагов. Нужно включить четыре основных флага Spark:


    • spark.decommission.enabled;
    • spark.storage.decommission.rddBlocks.enabled;
    • spark.storage.decommission.shuffleBlocks.enabled;
    • spark.storage.decommission.enabled.

    Другие доступные настройки рекомендую поискать прямо в исходном коде.


  • Для получения предупреждения от облачного провайдера о скором отключении ноды, например из-за выключения спота, нужна специфическая интеграция.



Новые опции с томами в Spark on Kubernetes


Начиная со Spark 2.4 при использовании Spark on Kubernetes можно монтировать три типа томов:


  1. emptyDir: изначально пустая директория, существующая, пока работает под. Полезна для временного хранения. Можно поддерживать ее с помощью диска ноды, SSD или сетевого хранилища.
  2. hostpath: в ваш под монтируется директория прямо из текущей ноды.
  3. Заранее статически создаваемый PersistentVolumeClaim. Это Kubernetes-абстракция для разных типов персистентного хранилища. Пользователь должен создать PersistentVolumeClaim заранее, существование тома не привязано к поду.

В Spark 3.1 появилось два новых варианта: NFS и динамически создаваемый PersistentVolumeClaims.


NFS это том, который могут одновременно использовать несколько подов и который можно заранее наполнить данными. Это один из способов обмена информацией, кодом и конфигурациями между Spark-приложениями либо между драйвером и исполнителем внутри какого-нибудь Spark-приложения. В Kubernetes нет NFS-сервера, вы можете запустить его самостоятельно или использовать облачный сервис.


После создания NFS вы можете легко монтировать этот том в Spark-приложение с помощью таких настроек:


spark.kubernetes.driver.volumes.nfs.myshare.mount.path=/sharedspark.kubernetes.driver.volumes.nfs.myshare.mount.readOnly=falsespark.kubernetes.driver.volumes.nfs.myshare.options.server=nfs.example.comspark.kubernetes.driver.volumes.nfs.myshare.options.path=/storage/shared


NFS (Network File System) популярный способ обмена данными между любыми Spark-приложениями. Теперь он работает и поверх Kubernetes


Второй новый вариант динамический PVC. Это более удобный способ использования персистентных томов. Раньше нужно было сначала создавать PVC, а затем монтировать их. Но при использовании динамического выделения вы не знаете, сколько можно создать исполнителей в ходе работы вашего приложения, поэтому старый способ был неудобен. К тому же приходилось самостоятельно вычищать ненужные PersistentVolumeClaims либо смиряться с потерей места в хранилище.


Со Spark 3.1 все стало динамическим и автоматизированным. Когда вы инициализируете Spark-приложение или в ходе динамического выделения запрашиваете новые исполнители, в Kubernetes динамически создается PersistentVolumeClaims, который автоматически предоставляет новый PersistentVolumes запрошенного вами класса хранилища. При удалении пода ассоциированные с ним ресурсы автоматически удаляются.


Другие функции Spark 3.1: PySpark UX, стейджинговая диспетчеризация, повышение производительности


В Spark 3.1 появилось два крупных улучшения в UX для разработчиков на PySpark:


  • Документация PySpark полностью переделана, теперь она больше соответствует Python и удобна для использования.
  • Теперь поддерживаются подсказки типов: получение в IDE бесплатного автозавершения кода и обнаружения статических ошибок.


Автозавершение кода в PySpark с выходом Apache Spark 3.1


Spark History Server, который отображает интерфейс Spark после завершения вашего приложения, теперь может показывать статистику выполненных вами запросов на Structured Streaming.


Стейджинговая диспетчеризация (SPARK-27495) применима только для YARN- и Kubernetes-развертываний при включенном динамическом выделении. Она позволяет вам управлять в коде количеством и типом запрашиваемых для исполнителя ресурсов с точностью на уровне стадий. В частности, вы можете настроить приложение под использование исполнителей с процессорными ресурсами в ходе первой стадии (например, при выполнении ETL или подготовке данных), а в ходе второй стадии на использование видеокарт (скажем, для моделей машинного обучения).


В Spark 3.1 улучшили производительность shuffle хеш-соединения, добавили новые правила для прерывания подвыражений и в оптимизатор Catalyst. Для пользователей PySpark будет полезно то, что в Spark теперь применяется колоночный формат in-memory хранения Apache Arrow 2.0.0 вместо 1.0.2. Это повысит скорость работы приложений, особенно если вам нужно преобразовывать данные между Spark и фреймами данных Pandas. Причем все эти улучшения производительности не потребуют от вас менять код или конфигурацию.


Что еще почитать по теме:


  1. Как и зачем разворачивать приложение на Apache Spark в Kubernetes.
  2. Наш телеграм-канал Вокруг Kubernetes в Mail.ru Group.
  3. MLOps без боли в облаке: как развернуть Kubeflow в production-кластере Kubernetes.
Подробнее..

PySpark. Решаем задачу на поиск сессий

07.03.2021 10:21:03 | Автор: admin

Добрый день уважаемые читатели! Несколько дней назад перечитывая книгу Энтони Молинаро SQL. Сборник рецептов, в одной из глав я наткнулся на тему, которая была посвящена определению начала и конца диапазона последовательных значений. Бегло ознакомившись с материалом, я сразу вспомнил, что уже сталкивался с данным вопросом в качестве одного из тестовых заданий, но тогда тема была заявлена как Задача на поиск сессий. Фишкой технического собеседования был не разбор выполненной работы, а один из вопросов интервьюера о том, как получить аналогичные значения с помощью Spark. Готовясь к собеседованию, я не знал, что в компании применяется (а может и не применяется) Apache Spark, и поэтому не собрал информацию по новому на тот момент для меня инструменту. Оставалось лишь выдвинуть гипотезу, что искомое решение может быть подобно скрипту, который можно написать c помощью библиотеки Pandas. Хотя очень отдалено я все-таки попал в цель, однако поработать в данной организации не получилось.

Справедливости ради хочу заметить, что за прошедшие годы я несильно продвинулся в изучении Apache Spark. Но я все равно хочу поделиться с читателями наработками, так как многие аналитики вообще не сталкивались с этим инструментом, а другим возможно предстоит подобное собеседование. Если вы являетесь профессионалом Spark, то всегда можно предложить более оптимальный код в комментариях к публикации.

Это была преамбула, приступим непосредственно к разбору данной темы. Пойдем сначала и напишем SQL скрипт. Но прежде создадим базу данных и заполним ее значениями. Так как это демо-пример предлагаю использовать SQLite. Данная БД уступает более мощным коллегам по цеху, но ее возможностей для разработки скрипта нам хватит сполна. Чтобы автоматизировать заявленные выше операции, я написал вот такой код на Python.

# Импорт библиотекimport sqlite3# Данные для записи в БДprojects = [    ('2020-01-01', '2020-01-02'),    ('2020-01-02', '2020-01-03'),    ('2020-01-03', '2020-01-04'),    ('2020-01-04', '2020-01-05'),    ('2020-01-06', '2020-01-07'),    ('2020-01-16', '2020-01-17'),    ('2020-01-17', '2020-01-18'),    ('2020-01-18', '2020-01-19'),    ('2020-01-19', '2020-01-20'),    ('2020-01-21', '2020-01-22'),    ('2020-01-26', '2020-01-27'),    ('2020-01-27', '2020-01-28'),    ('2020-01-28', '2020-01-29'),    ('2020-01-29', '2020-01-30')]try:    # Создаем соединение    con = sqlite3.connect("projects.sqlite")    # Создаем курсор    cur = con.cursor()    # Создаем таблицу    cur.execute("""CREATE TABLE IF NOT EXISTS projects (                    proj_id INTEGER PRIMARY KEY AUTOINCREMENT,                    proj_start TEXT,                    proj_end TEXT)""")    # Добавляем записи    cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)    # Сохраняем транзакцию    con.commit()    # Закрываем курсор    cur.close()except sqlite3.Error as err:    print("Ошибка выполнения запроса", err)finally:    # Закрываем соединение    con.close()    print("Соединение успешно закрыто")

Решение тривиальное и не требует дополнительных комментариев. Для коммуникации с созданной БД я использовал DBeaver. Клиентское приложение хорошо себя зарекомендовало, поэтому я часто использую его для разработки SQL запросов.

В рукописи Молинаро приводится два варианта решения данной задачи, но, по сути, это один и тот же код. Просто в первом случае не применяются оконные функции, а во-втором они присутствуют. Я выбрал последний, чтобы упростить ход рассуждений. Текстовую версию скрипта и все вспомогательные файлы к публикации вы можете найти по адресу (ссылка).

select       p3.proj_group,       min(p3.proj_start) as date_start,      max(p3.proj_end) as date_end,      julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as deltafrom    (select      p2.*,     sum(p2.flag)over(order by p2.proj_id) as proj_groupfrom (select       p.proj_id ,       p.proj_start,       p.proj_end,       case       when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1       end as flagfrom projects as p) as p2) as p3group by p3.proj_group

Если вы раньше уже использовали оконные функции, то разобраться самостоятельно с написанной конструкцией не составит никакого труда. Я лишь кратко опишу логику. Первоначальная таблица представляет собой последовательные шаги, для которых заданы два параметра: дата начала и дата конца. Если дата начала шага соответствует дате конца предыдущего шага, то два шага считаются одной сессией. Следовательно, начинать расчеты нужно со смещения, за это отвечает оконная функция lag. На следующем этапе сравниваем дату старта текущего шага и дату конца предыдущего и выводим либо 0, либо 1. Если к новому столбцу применить суммирование с нарастающим итогом, то получим номера сессий. Стандартная группировка по номерам с агрегирующими функциями позволит извлечь начало и конец диапазона значений. Я также рассчитал дельту между двумя датами на случай, если потребуется установить самую длинную или короткую сессию. Приведенный код будет актуален и для других БД. Ошибка будет выводиться только на строчке, где находится разница между двумя датами (функция julianday это прерогатива SQLite). На этом первая часть тестового задания выполнена. Переходим к Spark.

Если верить Википедии, то Apache Spark это фреймворксоткрытым исходным кодомдля реализации распределённой обработкинеструктурированныхи слабоструктурированных данных, входящий в экосистему проектовHadoop. Так как я не пишу на Java, Scala или R, то для получения функциональности Spark решил использовать PySpark. Устанавливать на компьютер все необходимые для работы компоненты я не стал. Для экспериментов выбрал облачный сервис Google Colab, так как у меня уже был заведенный аккаунт. Основной минус - при каждом новом сеансе работы нужно заново скачивать файлы, связанные с запуском нашего инструмента. На просторах Интернета я встречал вариант с фиксированной установкой, но пока не пробовал его на практике.

С помощью базовых команд Linux мы устанавливаем OpenJDK, скачиваем и разархивируем файлы Spark. Затем прописываем две переменные среды. Нужно не забыть о вспомогательной библиотеке findspark. Подготовительная работа закончена, осталось только открыть сессию.

В идеале следует импортировать файл с БД SQLite в облако и подключаться к нему, но я решил облегчить себе жизнь и сформировал датафрейм прямо в ноутбуке. Чтобы даты воспринимались как даты, потребовалось написать собственную функцию.

Так как операций в Spark довольно много, рекомендую сразу обзавестись шпаргалками. Если говорить о литературе для изучения данного инструмента, то радует два факта. Во-первых, есть как англоязычные, так и переводные издания, а во-вторых, источников информации предостаточно. Если вы не владеете языком Шекспира, то могу порекомендовать в первую очередь Изучаем Spark. Молниеносный анализ данных, авторы Холден Карау, Энди Конвински, Патрик Венделл, Матей Захария.

После того, как датафрейм подготовлен, можно пойти сначала самым простым путем и использовать уже имеющийся скрипт SQL. В код нужно лишь внести две правки: изменить имя базовой таблицы, изменить способ нахождения дельты (функция datediff).

Строго говоря, мы уже выполнили вторую часть тестового задания. Но, что-то мне подсказывает, что на собеседовании к данному способу могут придраться, аргументируя это тем, что это все тот же скрипт SQL без специфики Spark. Поэтому в конце статьи приведу псевдокод, который скорее всего уступает по производительности предыдущему решению, но лучше отражает способности соискателя программировать. Полную версию скрипта можно найти в ноутбуке.

from pyspark.sql.functions import lagfrom pyspark.sql import functions as Ffrom pyspark.sql.window import Window# Equivalent of Pandas.dataframe.shift() methodw = Window().partitionBy().orderBy(col("proj_id"))df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))#...# Equivalent of SQL- CASE WHEN...THEN...ELSE... ENDdf_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))#...# Cumsum by column flagw = Window().partitionBy().orderBy(col("proj_id"))df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))#...# Equivalent of SQL - GROUP BYfrom pyspark.sql.functions import  min, maxdf_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \                                                  max("proj_end").alias("date_end"))df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))df_group.show()

Краткие выводы.

  1. Перед собеседованием внимательно изучайте список инструментов и технологий, с которыми работает компания. Не стоит ограничиваться позициями только из текста вакансии. В идеале нужно постоянно расширять свой профессиональный кругозор, чтобы отвечать на каверзные вопросы, которые задаются с цель всесторонне прощупать кандидата.

  2. Даже если вы раньше никогда не работали со Spark, это не повод отказываться от конкурса на вакантную позицию. Основы PySpark можно освоить в сжатые сроки, при условии, что в бэкграунде уже есть опыт программирования с использованием библиотеки Pandas.

  3. Недостатка в книгах по Spark не наблюдается.

На этом все. Всем здоровья, удачи и профессиональных успехов!

Подробнее..
Категории: Python , Sql , Big data , Data engineering , Sqlite , Spark

Обработка и анализ текстов на Python и Spark NLP

08.04.2021 22:07:23 | Автор: admin

В наше время без анализа и обработки текстов,не обходится ни один проект, и так уж сложилось что Python обладает широким спектром библиотек и фреймворков для задач NLP. Задачи могут быть как тривиальные: анализ тональности(sentiment) текста, настроение, распознавание сущностей(NER) так и более интересные как боты, сравнение диалогов в саппорт-чатах - мониторить следует ли ваша тех.поддержка или сейлз текстовым скриптам, или постобработка текста после SpeechToText.

Для решения задач NLP имеется огромное количество инструментов. Вот короткий список таковых:

Речь как вы понимаете пойдет о последнем, так как он включает в себя практически все что умеют выше перечисленные библиотеки. Существуют как бесплатные pre-trained модели так и платные, узкоспециализированные например для healthcare.

Для работы Spark NLP понадобится Java 8 - она нужна для фреймворка Apache Spark с помощью которого и работает Spark NLP. Для экспериментов на сервере или на локальной машине потребуется минимум 16Гб ОЗУ. Устанавливать лучше на каком нибудь дистрибутиве Linux (На macOS могут возникнуть трудности), лично я выбрал Ubuntu инстанс на AWS.

apt-get -qy install openjdk-8

Также нужно установить Python3 и сопутствующие библиотеки

apt-get -qy install build-essential python3 python3-pip python3-dev gnupg2

pip install nlu==1.1.3

pip install pyspark==2.4.7

pip install spark-nlp==2.7.4

Экспериментировать можно так же на colab. Работает Spark NLP по принципу конвейеров (pipeline), ваш текст проходит некоторое количество стадий которые вы описали в pipe-лайне, и каждая стадия производит описанные манипуляции, к примеру: пайплайн для получения именованных сущностей. Ниже на картинке схема наиболее часто встречающихся стадий которые будет проходить ваш входной текст, каждая стадия конвейера добавляет свою колонку с данными после ее выполнения.

Пример конвейера в Spark NLPПример конвейера в Spark NLP

Пример создания стадий для пайплайна. (весь код примера по ссылке на colab)

documentAssembler = DocumentAssembler() \    .setInputCol('text') \    .setOutputCol('document')tokenizer = Tokenizer() \    .setInputCols(['document']) \    .setOutputCol('token')embeddings = BertEmbeddings.pretrained(name='bert_base_cased', lang='en') \        .setInputCols(['document', 'token']) \        .setOutputCol('embeddings')ner_model = NerDLModel.pretrained('ner_dl_bert', 'en') \    .setInputCols(['document', 'token', 'embeddings']) \    .setOutputCol('ner')ner_converter = NerConverter() \    .setInputCols(['document', 'token', 'ner']) \    .setOutputCol('ner_chunk')nlp_pipeline = Pipeline(stages=[    documentAssembler,     tokenizer,    embeddings,    ner_model,    ner_converter])
  1. documentAssembler - создает аннотацию типаDocument,которая может использоваться аннотаторами в будущем

  2. tokenizer - разбивает текст и пунктуацию на массив строк

  3. embeddings -создает векторные представления для слов

  4. ner_model - распознаватель именованных сущностей. к примеру: October 28, 1955 = DATE

  5. ner_converter - добавляет колонку с отдельными распознанными сущностями October 28, 1955

И все конечно хорошо, но приходится как-то много кода писать - описывая стадии пайплайна и сам пайплайн, не говоря про подключение библиотек и инициализацию Spark NLP, поэтому разработчики SparkNLP (johnsnowlabs) сделали более высокоуровневую библиотеку или синтаксический сахар над SparkNLP - называйте как хотите, но когда мы попробуем повторить вышеприведенный пример:

import nlupipeline = nlu.load('ner')result = pipeline.predict(  text, output_level='document').to_dict(orient='records')

мы получим все те же самые NER, но написав на порядок меньше кода.

Хотелось бы еще отметить что оба варианта получения именованных сущностей, требуют некоторое время на инициализацию Apache Spark, предзагрузку моделей и установку связи интерпретатора Python c Spark через pyspark. Потому вам не особо захочется по 10-100 раз перезапускать скрипт с кодом выше, нужно предусмотреть предзагрузку и просто обрабатывать текст посредством вызова predict, в моем случае я сделал инициализацию нужных мне конвейеров во время инициализации Сelery воркеров.

# паттерн Реестрpipeline_registry = PipelineRegistry()def get_pipeline_registry():    pipeline_registry.register('sentiment', nlu.load('en.sentiment'))    pipeline_registry.register('ner', nlu.load('ner'))    pipeline_registry.register('stopwords', nlu.load('stopwords'))    pipeline_registry.register('stemmer', nlu.load('stemm'))    pipeline_registry.register('emotion', nlu.load('emotion'))    return pipeline_registry@worker_process_init.connectdef init_worker(**kwargs):    logging.info("Initializing pipeline_factory...")    get_pipeline_registry()

Таким образом можно выполнять NLP задачи без боли в мозгу и минимумом усилий.

Подробнее..
Категории: Python , Nlp , Spark , Nlu

Перевод Как дебажить запросы, используя только Spark UI

08.11.2020 12:22:14 | Автор: admin

Егор Матешук (CDO AdTech-компании Квант и преподаватель в OTUS) приглашает Data Engineer'ов принять участие в бесплатном Demo-уроке Spark 3.0: что нового?. Узнаете, за счет чего Spark 3.0 добивается высокой производительности, а также рассмотрите другие нововведения.

Также приглашаем посмотреть запись трансляции Demo-урока Написание эффективных пользовательских функций в Spark и пройти вступительное тестирование по курсу Экосистема Hadoop, Spark, Hive!

У вас уже есть все, что вам нужно для дебаггинга запросов

Spark - самый широко используемый фреймворк для big data вычислений, способный выполнять задачи на петабайтах данных. Spark предоставляет набор веб-UI, которые можно использовать для отслеживания потребления ресурсов и состояния кластера Spark. Большинство проблем, с которыми мы сталкиваемся при выполнении задачи (job), можно отладить, перейдя в UI Spark.

spark2-shell --queue=P0 --num-executors 20Spark context Web UI available at http://<hostname>:<port>Spark context available as 'sc'Spark session available as 'spark'

В этой статье я попытаюсь продемонстрировать, как дебажить задачу Spark, используя только Spark UI. Я запущу несколько задач Spark и покажу, как Spark UI отражает выполнение задачи. Также я поделюсь с вами несколькими советами и хитростями.

Вот как выглядит Spark UI.

Мы начнем с вкладки SQL, которая включает в себя достаточно много информации для первоначального обзора. При использовании RDD в некоторых случаях вкладки SQL может и не быть.

А вот запрос, который запускаю в качестве примера

spark.sql("select id, count(1) from table1 group by id).show(10, false)

Перевод разъяснений в правой части:

<-- В рамках запроса было запущено 3 задачи, а сам запрос был выполнен за 21с.

<-- Файлы Parquet отсканированы, они содержат в сумме 23.7М строк

<-- Это работа выполненная каждой партицией

1. генерирует хеш id, count

2. группирует id и суммирует count. Вот как это выглядит

1. id = hash(125), count=1000

2. id = hash(124), count=900

<-- Происходит обмен данных, приведенных выше, на основе хеша id колонки, чтобы в результате каждая партиция имела один хеш

<-- Данные каждой партиции суммируются и возвращается count

Теперь давайте сопоставим это с физическим планом запроса. Физический план можно найти под SQL DAG, когда вы раскрываете вкладку details. Мы должны читать план снизу вверх

== Physical Plan ==CollectLimit 11+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#79])+- Exchange hashpartitioning(id#1, 200)+- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#83L])+- *(1) FileScan parquet [id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://<node>:<port><location>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string>

Вот как следует читать план:

  1. Сканирование файла parquet. Обратите внимание на PushedFilters. Я продемонстрирую, что это означает позже

  2. Создание HashAggregate с ключами. Обратите внимание на partial_count. Это означает, что агрегированный count является частичным, поскольку агрегирование было выполнено в каждой отдельной задаче и не было смешанно для получения полного набора значений.

  3. Теперь сгенерированные данные агрегируются на основе ключа, в данном случае id.

  4. Теперь вычисляется вообще весь count.

  5. Полученный результат

Теперь, когда с этим мы разобрались, давайте посмотрим на данные PuedFilters. Spark оптимизирован для предикатов, и любые применяемые фильтры пушатся к источнику. Чтобы продемонстрировать это, давайте рассмотрим другую версию этого запроса

spark.sql("select id, count(1) from table1 where status = 'false' group by id).show(10, false)

А это его план

+- *(2) HashAggregate(keys=[id#1], functions=[count(1)], output=[id#1, count(1)#224])+- Exchange hashpartitioning(id#1, 200)+- *(1) HashAggregate(keys=[id#1], functions=[partial_count(1)], output=[id#1, count#228L])+- *(1) Project [id#1]+- *(1) Filter (isnotnull(status#3) && (status#3 = false))+- *(1) FileScan parquet [id#1,status#3] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://mr25p01if-ingx03010101.mr.if.apple.com:50001/home/hadoop/work/news/20200..., PartitionFilters: [], PushedFilters: [IsNotNull(status), EqualTo(status,false)], ReadSchema: struct<id:string,status:string>

Обратите внимание на изменения по сравнению с предыдущим планом.

Мы видим в PushedFilters уже кое-что другое проверка на null и проверка на равенство. Столбец, к которому мы применяем фильтр пушится к источнику, т.е. при чтении данных эти строки игнорируются. Результат этого переносится на следующие этапы.

Можем ли мы, применяя фильтры, уменьшить общее количество прочитанных данных (или файлов)?

Да мы можем. В обоих приведенных выше примерах общее количество прочитанных данных составляет ~ 23,8M. Чтобы уменьшить его, мы можем использовать магию файлов parquet. В Parquet есть группа строк, в которой есть статистика, которую можно использовать для игнорирования нескольких групп/файлов строк. Это приводит к тому, что эти файлы вообще не читаются. Вы можете прочитать о том, как это сделать, в другой моей статье на medium Insights Into Parquet Storage.

Вкладка Executor

Эта вкладка дает нам представление о количестве активных в настоящее время исполнителей в вашей сессии spark.

spark2-shell  queue=P0  driver-memory 20g  executor-memory 20g  num-executors 40

Я запросил 40 исполнителей для сессии, однако при запуске вы можете увидеть, что он предоставил мне всего 10 активных исполнителей. Это может быть связано с тем, что не работают хосты или Spark не нуждается в таком большом количестве исполнителей. Это также может вызвать задержку в планировании задач, поскольку у вас всего 10 исполнителей, а вам нужно 40, что скажется на параллелизме.

Вкладка Environment

Вкладка Environment содержит подробную информацию обо всех параметрах конфигурации, которые в данный момент использует сессия spark.

Посмотрите, как здесь отражены параметры, отраженные мной ранее. Это полезно хотя бы просто для того, чтобы убедиться, что предоставленная вами конфигурация принята.

Вкладка Storage

Здесь отображается информация об одной из наиболее обсуждаемых функций Spark - кэшировании. В Интернете доступно множество статей с разными мнениями относительно того, стоит ли кэшировать или нет. К счастью, эта статья не о том, когда стоит кэшировать и т. д. Он больше о том, что происходит, когда мы кэшируем.

Но перед этим давайте вернемся немного назад и потратим несколько минут на некоторые основы кэширования.

Есть два способа кэширования Dataframe:

df.persist

Для кэширования набора данных требуется несколько свойств.

df.cache

Под капотом это вызывает метод persist. Обратимся к исходному коду

def cache(): this.type = persist()/*** Persist this Dataset with the given storage level.* @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,`MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,`MEMORY_AND_DISK_2`, etc.* @group basic* @since 1.6.0*/
  • DISK_ONLY: хранить (persist) данные на диске только в сериализованном формате.

  • MEMORY_ONLY: [хранить данные в памяти только в десериализованном формате.

  • MEMORY_AND_DISK: хранить данные в памяти, а если памяти недостаточно, вытесненные блоки будут сохранены на диске.

  • MEMORY_ONLY_SER: этот уровень Spark хранит RDD как сериализованный объект Java (однобайтовый массив на партицию). Это более компактно по сравнению с десериализованными объектами. Но это увеличивает накладные расходы на CPU.

  • MEMORY_AND_DISK_SER: аналогично MEMORY_ONLY_SER, но с записью на диск, когда данные не помещаются в памяти.

  • Давайте воспользуемся df.cache в нашем примере и посмотрим, что произойдет a.cache() -> На вкладке Storage ничего не видно. Как вы можете догадаться, это из-за ленивого вычисления

Давайте воспользуемся df.cache в нашем примере и посмотрим, что произойдет

a.cache()

> На вкладке Storage ничего не видно. Как вы можете догадаться, это из-за ленивого вычисления

a.groupBy(id).count().show(10,false)

Мы видим какой-то кэш данных. Размер в памяти составляет 5,2 ГБ, а размер моего файла - 2 ГБ хммм что здесь произошло

hadoop dfs -dus <dirName>2,134,751,429 6,404,254,287 <dirName>

Это потому, что данные в памяти десериализованы и несжаты. Это результирует в большем объеме памяти по сравнению с диском.

Так что, когда вы хотите принимаете решение о том, кэшировать или нет, помните об этом.

Я видел несколько толковых статей о том, следует ли кэшировать или нет. Ознакомиться с ними - хорошая идея

Далее мы рассмотрим вкладки Jobs и Stages, причины множества проблем можно отдебажить с помощью этих вкладок.

spark.sql("select is_new_user,count(1) from table1 group by is_new_user").show(10,false)

Я вижу, что для указанного выше запроса запускаются 3 задачи. Но 2 из них пропущены. Обычно это означает, что данные были извлечены из кэша и не было необходимости повторно выполнять данный этап. Кроме того, Spark выполняет множество фиктивных задач для оценки данных. Пропуск задач мог быть связан и с этим.

Давайте же глубоко погрузимся в задачу, которая не была пропущена. Это визуализация DAG для задачи

Мы ясно видим, что эта задача состоит из двух этапов, разделенных операцией перемешивания/обмена. Stages означают, что данные были записаны на диск для использования в следующем процессе.

Давайте углубимся во вкладку stages.

Первое, что всегда нужно проверять, - это сводные метрики для задач. Вы можете нажать show additional metrics для получения дополнительных фактов. Это покажет множество необходимых параметров по минимуму, медиане и максимуму. В идеальном мире минимальное значение должно быть близко к максимальному.

Вот несколько моментов, которые следует отметить:

Продолжительность (duration): В нашем примере минимальная и максимальная продолжительность составляет 0,4 и 4 секунды соответственно. Это может быть связано с несколькими причинами, и мы постараемся отдебажить их в пунктах ниже.

Время десериализации задачи (Task deserialization time):

В нашем примере в рамках десериализации задачи некоторое время тратится и на другие задачи. Одной из основных причин было выполнение процессов сборки мусора в исполнителях. У меня выполнялись другие процессы, в которых были кэшированы некоторые данные, что приводило к сборке мусора. Процессам сборки мусора предоставляется наивысший приоритет, и они останавливают все запущенные процессы в угоду обслуживания процесса сборки мусора. Если вы видите, что ваш процесс не потребляет много памяти, первым шагом для решения такой проблемы может быть разговор с администратором/OPS.

Задержка планировщика (Scheduler delay): максимальная задержка планировщика составляет 0,4 секунды. Это означает, что одна из задач должна была ждать отправки еще 0,4 секунды. Большое это значение или маленькое, зависит от вашего конкретного юзкейса.

Размер ввода очень сильно распределен. Это очень хорошо, поскольку все задачи читают одинаковый объем данных. Это одна из самых важных вещей при поиске неверного/искаженного запроса. Это можно увидеть в столбце shuffle read в разделе Summary metrics for tasks. Самая простая логика для решения таких проблем - это добавление соли к группе, которая может распараллеливать данные, а затем, наконец, агрегирование данных без соли. Этот принцип может применяться во многих формах для решения проблемы асимметрии данных.

Еще одна вещь, на которую стоит обратить внимание, - это уровень локальности.

* PROCESSLOCAL Эта задача будет запущена в том же процессе, что и исходные данные

* NODELOCAL Эта задача будет запущена на том же компьютере, что и исходные данные

* RACKLOCAL Эта задача будет запущена в том же блоке, что и исходные данные

* NOPREF (Отображается как ANY) Эта задача не может быть запущена в том же процессе, что и исходные данные, или это не имеет значения.

Предположим, мы потребляем данные из узла Cassandra в кластере Spark, состоящем из трех узлов. Cassandra работает на машине X узлов Spark X, Y и Z. Для узла X все данные будут помечены как NODELOCAL. Это означает, что после того, как каждое ядро на X будет занято, мы останемся с задачами, предпочтительное расположение которых - X, но у нас есть пространство для выполнения только на Y и Z. У Spark есть только два варианта: дождаться, пока ядра станут доступны на X, или понизить уровень локальности задачи и попытаться найти место для них и принять любые штрафы за нелокальное выполнение.

Параметр spark.locality.wait описывает, как долго ждать перед понижением уровня задач, которые потенциально могут выполняться с более высокого уровня локальности до более низкого уровня. Этот параметр, по сути, является нашей оценкой того, сколько стоит ожидание локального места. Значение по умолчанию - 3 секунды, что означает, что в нашем примере с Cassandra, как только наш совместно расположенный узел X будет забит задачами, другие наши машины Y и Z будут простаивать в течение 3 секунд, прежде чем задачи, которые могли быть NODELOCAL, будут понижены до ANY* и запущены.

Вот пример кода для этого.

Я надеюсь, что эта статья послужит вам в качестве руководства по дебаггингу на Spark UI с целью устранения проблем с производительностью Spark. В Spark 3 есть много дополнительных функций, которые тоже стоит посмотреть.

Также хорошая идея почитать документацию Spark UI.

Вы также можете связаться со мной в Linkedin.

Хотите узнать, как Apache Druid индексирует данные для сверхбыстрых запросов? Узнайте об этом здесь:

Insights into Indexing using Bitmap Index


Интересно развиваться в данном направлении? Участвуйте в трансляции мастер-класса Spark 3.0: что нового? и оцените программу курса Экосистема Hadoop, Spark, Hive!

Подробнее..

Курс Промышленный ML на больших данных что это, для кого и каких навыков требует?

12.10.2020 16:11:28 | Автор: admin

Привет Хабр! Приглашаем на бесплатный Demo-урок Современные большие данные, анализ и оптимизация производительности распределенных приложений. А также в этой статье решили рассказать, как складывается ситуация на рынке специалистов Data Science и конкретно в Big Data и что вас ждет на курсе по промышленному машинному обучению.

В крупных компаниях Data science в терминах fit-predict отходит в прошлое

Первое, что стоит отметить, что джуниоров сейчас избыток, а среди компаний выделяется тренд на то, чтобы искать Middle/ Senior специалиста, давать ему какое-то время на изучение своей инфраструктуры и сразу поручать ему боевые задачи.

При этом пока еще значительная часть начинающих специалистов считает, что Дата сайнтисту достаточно готовой реализации модели обучить ее на каких-то данных и отдать ее Дата инженеру, ну а там дальше как-нибудь разберутся. Но сейчас все движется к тому, что сами процессы обучения и валидации настолько отстроены и понятны, что даже неспециалист может сделать fit-predict. Получается, что люди, которые умеют заниматься лишь этим, в отстроенных конвейерах не очень нужны.

Кроме того, существует проблема подготовки специалистов, которые обладали бы знаниями в инженерной области хотя бы на уровне bird view. В классических курсах немного информации по этой части, в том числе и потому, что тяжело сходу развернуть необходимую инфраструктуру, а задачки на Kaggle этого не требуют. Когда же вы приходите в большую компанию, вас встречает кластер на десятки петабайт, где надо писать распределенные алгоритмы на фреймворках, которые отличаются от стандартного набора Дата сайнтиста. С одной стороны многих это пугает, а с другой те, кто разбирается в этом хотя бы на базовом уровне, получают преимущество при найме.

Альтернативная специальность для Дата сайнтистов и Software инженеров

Курс Промышленный ML на больших данных предлагает симбиоз навыков Дата сайнтиста и Дата инженера. Как правило, такие специалисты требуются в крупные компании с масштабным цифровым продуктом, где нужно работать с потоковыми данными.

Соответственно, освоить этот профиль могут как специалисты из области машинного обучения, так и те, кто обладают бэкграундом в software инжиниринге. Причем вторым будет несколько проще, т.к. базовый ML изучать проще, чем полный стек инженерных технологий.

Навыки, необходимые для работы с Big Data и распределенными данными

Если кратко, то вам понадобится знать особенности обработки распределенных данных, освоить фреймворк Spark и научиться всем составляющим продакшена.

Мы все это (и немного больше) упаковали в онлайн-курс Промышленный ML на больших данных.

Программа рассчитана на 5 месяцев и состоит из 9 модулей:

  • Модуль 1 посвящен начальным знаниям, которые необходимы для освоения дальнейшей программы. Быстрое повторение ML: какие бывают модели, метрики и виды обучения, как мы учим модели, все меряем, валидируем и делаем из полученного выводы.

    Сюда же мы включили занятие по Scala. Хотя с большими данными с помощью фреймворка Spark можно общаться и на Python, мы все же предлагаем познакомиться и со Scala, чтобы вы могли контактировать со Spark через его нативное API. В завершение модуля вас ждет домашняя работа на Scala.

  • В модуле 2 вы познакомитесь с техническими основами распределенной обработки данных. Узнаете про хранилище, как развивались параллельные алгоритмы, какие есть менеджеры ресурсов в таких распределенных системах. Начнете работать со Spark и выполните на нем домашнее задание.

  • В модуле 3 начинаем погружаться в распределенный ML. Показываем, как учатся модельки в распределенной парадигме на Spark, как подбирать гиперпараметры. Т.е. мы переводим релевантный для Дата сайнтиста опыт локальных вычислений на распределенную парадигму.

  • Модуль 4 посвящен потоковой обработке. В первую очередь, с этим полезно познакомиться тем, кто занимался соревновательным анализом данных или работал в ограниченных ресурсах. Эти навыки больше относятся к работе в больших компаниях, где есть какой-то непрерывный поток входящих данных, которые надо обрабатывать, складывать, хранить, применять к ним на ходу ML.

  • Задача модуля 5 научить вас формировать долгосрочные и краткосрочные цели для ML-проекта. Вы будете понимать, как достигать этих целей и оценивать результаты. Пара занятий выделена специально под то, как проводить А/B тестирование.

  • Модуль 6 отвечает на вопросы, как и зачем обучать модели. Вы узнаете, как раскатывать модели в своей инфраструктуре: оборачивать, версионировать, воспроизводить, сервить и т.д. Все это для больших данных и распределенной парадигмы.

  • Модуль 7 отводится под Python. Вы освоите различные практики: как писать на нем в продакшн и как это все оборачивать, как вставлять модель на сервинг, делать для нее API, запаковывать в контейнеры и раскатывать на примере облачных систем вроде Amazon.

  • Модуль 8 мы выделили под продвинутые темы. Здесь разберем, как запускать в продакшн нейросети, обучение с подкреплением, а закончим модуль градиентным бустингом, где вы научитесь запускать его распределенно на кластере.

  • Модуль 9 посвящен проектной работе. Тут вам доступны два варианта действий:

  1. Можно взять свой рабочий кейс, над которым вы сейчас трудитесь. Тогда вы будете выполнять поставленную задачу end to end: начиная с данных, которые потоком приходят или в виде датасета отгружаются, и заканчивая результатом, которые дают ваши модели в виде сервиса, выгрузки и т.д.

  2. Можно сделать учебный проект: рекомендательную систему на базе данных OTUS.

Специальность, которую дает эта программа, не только максимально прикладная, но и с каждым годом будет становиться все перспективнее. Это связано и с тем, что все больше цифровых продуктов делают акцент на обработку данных и все чаще от специалистов требуется не только обучить модель, но и правильно подготовить ее в продакшн.

Если область промышленного ML вам интересна, сделать первые шаги в этом направлении вы сможете уже 19 октября на демо-уроке Вывод ML моделей в промышленную среду на примере онлайн-рекомендаций, который проведет Дмитрий Бугайченко управляющий директор в Сбербанке. Так как занятие рассчитано на специалистов с опытом в работе с данными, для регистрации понадобится пройти вступительное тестирование.

Сам курс Промышленный ML на больших данных стартует 30 октября. Познакомиться с преподавательским составом и программой можно здесь.

До встречи на занятиях!

Подробнее..

Spark schemaEvolution на практике

19.10.2020 16:12:07 | Автор: admin
Уважаемые читатели, доброго дня!

В данной статье ведущий консультант бизнес-направления Big Data Solutions компании Неофлекс, подробно описывает варианты построения витрин переменной структуры с использованием Apache Spark.

В рамках проекта по анализу данных, часто возникает задача построения витрин на основе слабо структурированных данных.

Обычно это логи, или ответы различных систем, сохраняемые в виде JSON или XML. Данные выгружаются в Hadoop, далее из них нужно построить витрину. Организовать доступ к созданной витрине можем, например, через Impala.

В этом случае схема целевой витрины предварительно неизвестна. Более того, схема еще и не может быть составлена заранее, так как зависит от данных, а мы имеем дело с этими самыми слабо структурированными данными.

Например, сегодня логируется такой ответ:

{source: "app1", error_code: ""}

а завтра от этой же системы приходит такой ответ:

{source: "app1", error_code: "error", description: "Network error"}

В результате в витрину должно добавиться еще одно поле description, и придет оно или нет, никто не знает.

Задача создания витрины на таких данных довольно стандартная, и у Spark для этого есть ряд инструментов. Для парсинга исходных данных есть поддержка и JSON, и XML, а для неизвестной заранее схемы предусмотрена поддержка schemaEvolution.

С первого взгляда решение выглядит просто. Надо взять папку с JSON и прочитать в dataframe. Spark создаст схему, вложенные данные превратит в структуры. Далее все нужно сохранить в parquet, который поддерживается в том числе и в Impala, зарегистрировав витрину в Hive metastore.

Вроде бы все просто.

Однако, из коротких примеров в документации не ясно, что делать с рядом проблем на практике.

В документации описывается подход не для создания витрины, а для чтения JSON или XML в dataframe.

А именно, просто приводится как прочитать и распарсить JSON:

df = spark.read.json(path...)

Этого достаточно, чтобы сделать данные доступными для Spark.

На практике сценарий гораздо сложнее, чем просто прочитать JSON файлы из папки, и создать dataframe. Ситуация выглядит так: уже есть определенная витрина, каждый день приходят новые данные, их нужно добавить в витрину, не забывая, что схема может отличаться.

Обычная схема построения витрины такая:

Шаг 1. Данные загружаются в Hadoop с последующей ежедневной дозагрузкой и складываются в новую партицию. Получается партиционированная по дням папка с исходными данными.

Шаг 2. В ходе инициализирующей загрузки эта папка читается и парсится средствами Spark. Полученный dataframe сохраняется в формат, доступный для анализа, например, в parquet, который потом можно импортировать в Impala. Так создается целевая витрина со всеми данными, которые накопились к этому моменту.

Шаг 3. Создается загрузка, которая будет каждый день обновлять витрину.
Возникает вопрос инкрементальной загрузки, необходимость партиционирования витрины, и вопрос поддержки общей схемы витрины.

Приведем пример. Допустим, реализован первый шаг построения хранилища, и настроена выгрузка JSON файлов в папку.

Создать из них dataframe, чтобы потом сохранить как витрину, проблем не составляет. Это тот самый первый шаг, который легко можно найти в документации Spark:

df = spark.read.option("mergeSchema", True).json(".../*") df.printSchema()root |-- a: long (nullable = true) |-- b: string (nullable = true) |-- c: struct (nullable = true) |    |-- d: long (nullable = true)

Вроде бы все хорошо.

Мы прочитали и распарсили JSON, далее сохраняем dataframe как parquet, регистрируя в Hive любым удобным способом:

df.write.format(parquet).option('path','<External Table Path>').saveAsTable('<Table Name>')

Получаем витрину.

Но, на другой день добавились новые данные из источника. У нас есть папка с JSON, и витрина, созданная на основе данной папки. После загрузки следующей порции данных из источника в витрине не хватает данных за один день.

Логичным решением будет партиционировать витрину по дням, что позволит каждый следующий день добавлять новую партицию. Механизм этого тоже хорошо известен, Spark позволяет записывать партиции отдельно.

Сначала делаем инициализирующую загрузку, сохраняя данные как было описано выше, добавив только партиционирование. Это действие называется инициализация витрины и делается только один раз:

df.write.<b>partitionBy("date_load")</b><u></u>.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

На следующий день загружаем только новую партицию:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Остается только перерегистрировать в Hive, чтобы обновить схему.
Однако, здесь и возникают проблемы.

Первая проблема. Рано или поздно получившийся parquet нельзя будет прочитать. Это связано с тем, как по-разному подходят parquet и JSON к пустым полям.

Рассмотрим типичную ситуацию. Например, вчера приходит JSON:

День 1: {"a": {"b": 1}},

а сегодня этот же JSON выглядит так:

День 2: {"a": null}

Допустим, у нас две разных партиции, в которых по одной строке.
Когда мы читаем исходные данные целиком, Spark сумеет определить тип, и поймет, что a это поле типа структура, со вложенным полем b типа INT. Но, если каждая партиция была сохранена по отдельности, то получается parquet с несовместимыми схемами партиций:

df1 (a: <struct<"b": INT>>)df2 (a: STRING NULLABLE)

Данная ситуация хорошо известна, поэтому специально добавлена опция при парсинге исходных данных удалять пустые поля:

df = spark.read.json("...", dropFieldIfAllNull=True)

В этом случае parquet будет состоять из партиций, которые можно будет прочитать вместе.
Хотя те, кто делал это на практике, тут горько усмехнутся. Почему? Да потому, что скорее всего возникнут еще две ситуации. Или три. Или четыре. Первая, которая возникнет почти наверняка, числовые типы будут по-разному выглядеть в разных JSON файлах. Например, {intField: 1} и {intField: 1.1}. Если такие поля попадутся в одной партции, то мерж схемы прочитает все правильно, приведя к самому точному типу. А вот если в разных, то в одной будет intField: int, а в другой intField: double.

Для обработки этой ситуации есть следующий флаг:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Теперь у нас есть папка, где находятся партиции, которые можно прочитать в единый dataframe и валидный parquet всей витрины. Да? Нет.

Надо вспомнить, что мы регистрировали таблицу в Hive. Hive не чувствителен к регистру в названиях полей, а parquet чувствителен. Поэтому партиции со схемами: field1: int, и Field1: int для Hive одинаковые, а для Spark нет. Надо не забыть привести названия полей к нижнему регистру.

Вот после этого, кажется, все хорошо.

Однако, не все так просто. Возникает вторая, тоже хорошо известная проблема. Так как каждая новая партиция сохраняется отдельно, то в папке партиции будут лежать служебные файлы Spark, например, флаг успешности операции _SUCCESS. Это приведет к ошибке при попытке parquet. Чтобы этого избежать, надо настроить конфигурацию, запретив Spark дописывать в папку служебные файлы:

hadoopConf = sc._jsc.hadoopConfiguration()hadoopConf.set("parquet.enable.summary-metadata", "false")hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Кажется, теперь каждый день в папку целевой витрины добавляется новая parquet партиция, где лежат распарсенные данные за день. Мы заранее озаботились тем, чтобы не было партиций с конфликтом типов данных.

Но, перед нами третья проблема. Теперь общая схема не известна, более того, в Hive таблица с неправильной схемой, так как каждая новая партиция, скорее всего, внесла искажение в схему.

Нужно перерегистрировать таблицу. Это можно сделать просто: снова прочитать parquet витрины, взять схему и создать на ее основе DDL, с которым заново зарегистрировать папку в Hive как внешнюю таблицу, обновив схему целевой витрины.

Перед нами возникает четвертая проблема. Когда мы регистрировали таблицу первый раз, мы опирались на Spark. Теперь делаем это сами, и нужно помнить, что поля parquet могут начинаться с символов, недопустимых для Hive. Например, Spark выкидывает строки, которые не смог распарсить в поле corrupt_record. Такое поле нельзя будет зарегистрировать в Hive без того, чтобы экранировать.

Зная это, получаем схему:

f_def = ""for f in pf.dtypes:  if f[0] != "date_load":    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"hc.sql("drop table if exists jsonevolvtable")hc.sql(table_define)

Код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace(array<`, array<) делает безопасный DDL, то есть вместо:

create table tname (_field1 string, 1field string)

С такими названиями полей как _field1, 1field, делается безопасный DDL, где названия полей экранированы: create table `tname` (`_field1` string, `1field` string).

Возникает вопрос: как правильно получить dataframe с полной схемой (в коде pf)? Как получить этот pf? Это пятая проблема. Перечитывать схему всех партиций из папки с parquet файлами целевой витрины? Это метод самый безопасный, но тяжелый.

Схема уже есть в Hive. Получить новую схему можно, объединив схему всей таблицы и новой партиции. Значит надо схему таблицы брать из Hive и объединить ее со схемой новой партиции. Это можно сделать, прочитав тестовые метаданные из Hive, сохранив их во временную папку и прочитав с помощью Spark обе партиции сразу.

По сути, есть все, что нужно: исходная схема таблицы в Hive и новая партиция. Данные у нас тоже есть. Остается лишь получить новую схему, в которой объединяется схема витрины и новые поля из созданной партиции:

from pyspark.sql import HiveContextfrom pyspark.sql.functions import lithc = HiveContext(spark)df = spark.read.json("...", dropFieldIfAllNull=True)df.write.mode("overwrite").parquet(".../date_load=12-12-2019")pe = hc.sql("select * from jsonevolvtable limit 1")pe.write.mode("overwrite").parquet(".../fakePartiton/")pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Далее создаем DDL регистрации таблицы, как в предыдущем фрагменте.
Если вся цепочка работает правильно, а именно была инициализирующая загрузка, и в Hive правильно созданная таблица, то мы получаем обновленную схему таблицы.

И последняя, проблема заключается в том, что нельзя так просто добавить партицию в таблицу Hive, так как она будет сломана. Необходимо заставить Hive починить у себя структуру партиций:

from pyspark.sql import HiveContexthc = HiveContext(spark) hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Простая задача чтения JSON и создания на его основе витрины выливается в преодоление ряда неявных трудностей, решения для которых приходится искать по отдельности. И хотя эти решения простые, но на их поиск уходит много времени.

Чтобы реализовать построение витрины пришлось:

Добавлять партиции в витрину, избавившись от служебных файлов
Разобраться с пустыми полями в исходных данных, которые Spark типизировал
Привести простые типы к строке
Привести названия полей к нижнему регистру
Разделить выгрузку данных и регистрацию таблицы в Hive (создание DDL)
Не забыть экранировать названия полей, которые могут быть несовместимы с Hive
Научиться обновлять регистрацию таблицы в Hive

Подводя итог, отметим, что решение по построению витрин таит много подводных камней. Поэтому при возникновении сложностей в реализации лучше обратиться к опытному партнеру с успешной экспертизой.

Благодарим за чтение данной статьи, надеемся, что информация окажется полезной.
Подробнее..

Перевод Что такое фильтр Блума?

09.02.2021 02:22:17 | Автор: admin

Всем привет! В этой статье я постараюсь описать, что такое фильтр Блума, рассказать о его назначении и показать сценарии, в которых его можно использовать. Я также реализую фильтр Блума на Python с нуля в целях облегчения понимания его внутреннего устройства.

Назначение фильтра Блума

Фильтр Блума это структура данных, цель которой быстро проверить, что элемент НЕ входит в множество (для тех, кто знаком с нотацией O большое, сложность вставки и проверки принадлежности элемента к множеству с помощью фильтра Блума O(1)). Он может быть очень полезен для предотвращения излишнего выполнения задач, требующих интенсивных вычислений, просто проверяя, что элемент совершенно точно не входит в множество. Важно понимать, что фильтр Блума это вероятностная структура данных: он может сказать вам со 100% вероятностью, что элемент отсутствует в наборе данных, но сказать со 100% вероятностью, что элемент находится в наборе, он не может (возможны ложно положительные результаты). Давайте же поговорим о сценариях, в которых можно использовать фильтр Блума с подробным объяснением его внутреннего устройства и реализацией на Python, и позже вы поймете, откуда фильтр Блума имеет такие показатели!

Фильтр Блума обычно используется перед поиском в множестве с достаточно медленным доступом к элементам. За счет его использования может быть уменьшено как количество поисковых операций над множеством, так и время поиска в целом.

Сценарии использования

Давайте поразмыслим о сценариях, в которых для ускорения вычисления некоторых задач такая структура данных могла бы оказаться очень полезной. Например, мы можем начать с маршрутизатора опорной сети (не такого, который вы можете найти у себя дома). От таких маршрутизаторов может требоваться скорость в uplink более 100 Гбит/с. Администратору может понадобиться создать черный список IP-адресов, чтобы заблокировать им доступ в сеть. Это означает, что каждый раз, когда маршрутизатор получает пакет на скорости более 100 Гбит/с, он должен обращаться к своей памяти и выполнять в лучшем случае логарифмический поиск (O(log(n))), чтобы проверить, заблокирован ли IP-адрес, учитывая, что большинство IP-адресов не заблокированы и что поиск не даст результатов для большинства пакетов. В этом случае фильтр Блума может быть реализован как раз перед доступом к памяти, чтобы гарантировать, что большинству пакетов не нужно дожидаться поиска IP-адреса для отправки в сеть.

Еще один сценарий это базы данных. Когда к базе данных происходят миллионы обращений в секунду, и большинство обращений представляют из себя поиск по ключу, которого в базе данных нет, максимально уменьшить нагрузку таких обращений на базу данных может быть важно по двум причинам: если сократить количество поисков, ядро базы данных будет быстрее отвечать на другие обращения; если клиент может не дожидаться поиска по базе данных и получить результат (например, не из памяти) без необходимости доступа к базе данных, достигнутое ускорение может быть весьма существенным.

Наконец, чтобы ускорить процесс поиска файла в папке с большим количеством файлов, можно использовать фильтр Блума, чтобы сначала проверить, отсутствует ли файл в этой папке.

Более типичные сценарии использования фильтра Блума можно найти здесь.

Что из себя представляет фильтр Блума?

Чтобы проиллюстрировать устройство фильтра Блума, мы будем использовать первый сценарий. Представьте, что вы внесли в черный список 100 IP-адресов. Самый простой способ пометить, есть ли IP-адрес в черном списке или нет, создать список из 100 бит, где каждый бит это один IP. Если IP-адрес занесен в черный список, мы отмечаем его позицию как 1, в противном случае 0.

В этом фильтре Блума 4-й IP-адрес занесен в черный список, а все остальные нет.

Сколько всего IP-адресов?

Эта реализация работает, если используются только 100 IP. В реальности же каждый IPv4-адрес имеет 32 бита, что означает, что существует 4 294 967 296 (2^32) возможных адресов (некоторые из них зарезервированы для приватных, бродкастных, мультикастных и других специальных сетей, но оставшихся адресов все еще огромное количество)! И количество IP-адресов в черном списке, вероятно, не превысит нескольких сотен в самом крайнем случае. Мы не можем позволить себе составлять такой большой список, чтобы использовать его для такого относительно небольшого количества записей. Нам нужно найти способ сопоставления IP-адреса и записей в списке. И вот тут-то и приходят на помощь хеш-функции.

Хеш-функция

Хеш-функция это функция, которая преобразует вход произвольной длины в значение фиксированного размера. Таким образом, мы можем создать массив фиксированного размера и вычислить результат хеш-функции по конкретному IP-адресу, и он всегда будет генерировать число, меньшее или равное размеру массива. Хеш-функция не является случайной функцией, что означает, что для одного и того же ввода вывод всегда будет одинаковым.

Хеш-функция получает ввод, который может быть любой строкой (в данном случае IP), и вычисляет числовое представление. В этом случае числовое представление будет позицией в фильтре Блума, соответствующей входным данным.

Но подождите Что-то не так. Вернемся к нашему сценарию. Представьте, что мы занесли в черный список 100 IP-адресов. Как хеш-функция точно сопоставит наши 100 из возможных 2^32 IP-адресов на 100 различных значений без сохранения какой-либо информации о них? Правда в том, что никак. Будут коллизии. Хэш-функция гарантирует, что каждый IP-адрес будет иметь собственное сопоставление с числом, но, поскольку может быть 4 294 967 296 (2^32) возможных IP-адресов, невозможно сопоставить их все с всего лишь сотней различных значений. Все, что может гарантировать хеш-функция, это то, что она скремблирует биты входных данных так, чтобы выходные данные согласовались с равномерным распределением. Это означает, что если вы измените ввод хеш-функции с 192.168.1.1 на 192.168.1.2, вывод, вероятно, будет совершенно другим, кажущимся случайным (но в действительности не случайным, поскольку каждому вводу всегда будет соответствовать один и тот же вывод).

Пример коллизии. Два разных IP-адреса имеют одинаковый хеш, а это означает, что их индекс в фильтре Блума будет одинаковым.

Хорошо, теперь с самого начала: мы заносим в черный список 100 IP-адресов. Каждый IP-адрес будет проходить через хеш-функцию, и результат хеш-функции вернет число, меньшее или равное размеру массива. Это число будет индексом массива, который отмечает, был ли IP-адрес в черном списке или нет. Но будут коллизии как нам с этим справиться?

Предположим, что IP-адреса 178.23.12.63 и 112.64.90.12 имеют одинаковый хеш. Первый IP попал в черный список, второй нет. Когда мы проверяем, находится ли хеш второго IP-адреса в фильтре Блума, мы его там найдем, даже если этот IP-адрес никогда не попадал в черный список. Означает ли это, что у нас есть ошибка?

Помните, что в начале я предупреждал, что цель фильтра Блума проверить, что элемент НЕ входит в набор данных. Если позиция элемента в фильтре Блума равна 0, этот элемент определенно НЕ входит в набор. Однако, если позиция элемента в фильтре Блума равна 1, то либо этот элемент может все-таки быть в наборе, либо это просто коллизия. Все, что мы можем сделать, это уменьшить вероятность коллизии, чтобы уменьшить количество обращений к памяти, необходимых для проверки, действительно ли IP находится в черном списке.

Снижение вероятности коллизий

Есть два основных способа снизить вероятность коллизий, и оба с нюансами. Одна из возможностей увеличить размер массива. Если мы увеличим размер массива (и, следовательно, заставим хеш-функцию возвращать число меньше или того же размера, что и новый размер массива), вероятность коллизий уменьшается. В частности, вероятность ложного срабатывания (фильтр Блума возвращает 1, когда элемент отсутствует в наборе) составляет (1-e^(m / n)), где m количество элементов, которые предполагается внести в фильтр, а n размер фильтра.

Другой способ уменьшить вероятность коллизии увеличить количество хеш-функций. Это означает, что в нашем сценарии для одного IP-адреса будет использоваться несколько различных хеш-функций, т.е. несколько различных мест в массиве будет помечаться как 1. Если мы используем k хеш-функций, вероятность ложного срабатывания теперь (1-e^(mk/n))^k, что означает, что оптимальное количество хеш-функций равно (n/m)*ln(2) (подробнее об этих уравнениях можно почитать здесь).

Пример фильтра Блума с двумя хеш-функциями. В одном из хешей этих IP-адресов мы наблюдаем коллизию, но все-равно можно проверить, что IP 112.64.90.12 не входит в набор, потому что одна из его позиций фильтра Блума не равна 1.

Ну что ж, а теперь давайте реализуем фильтр Блума в Python и посмотрим на результат! Это займет у нас всего около 50 строк кода.

Давайте начнем с создания класса BloomFilter (в следующем фрагменте кода). Конструктор получает размер фильтра Блума и (это опционально) количество ожидаемых элементов, которые будет хранить этот фильтр Блума. Для создания массива из битов мы будем использовать библиотеку bitarray, и мы установим их все в ноль. Наконец, мы устанавливаем количество хеш-функций в уравнение, которое возвращает оптимальное количество хеш-функций, учитывая размер фильтра Блума и количество ожидаемых элементов.

import mathfrom bitarray import bitarrayclass BloomFilter(object):    def __init__(self, size, number_expected_elements=100000):        self.size = size        self.number_expected_elements = number_expected_elements        self.bloom_filter = bitarray(self.size)        self.bloom_filter.setall(0)        self.number_hash_functions = round((self.size / self.number_expected_elements) * math.log(2))

Теперь давайте определим хеш-функцию для фильтра Блума. Используемая реализация (взятая отсюда) реализует алгоритм DJB2. Сейчас мы будем использовать его как черный ящик, поскольку объяснение это алгоритма выходит за рамки темы этой статьи.

def _hash_djb2(self, s):        hash = 5381        for x in s:            hash = ((hash << 5) + hash) + ord(x)        return hash % self.size

Теперь у нас есть хеш-функция, но как нам создать K хеш-функций? Мы можем сделать один простой фокус. Вместо того, чтобы создавать разные хеш-функции, мы просто будем добавлять число к каждому вводу в хеш-функцию. Число будет представлять из себя номер вызываемой хэш-функции. Поскольку любая небольшая разница во вводе хеш-функции результирует в совершенно другом хеше, результат можно рассматривать как другую хеш-функцию. Круто, правда?

def _hash(self, item, K):        return self._hash_djb2(str(K) + item)

Теперь давайте создадим функцию для добавления элемента в фильтр Блума. Для этого давайте проитерируем все хеш-функции, вычисляя хеш для элемента и, наконец, помещая 1 (или True) в индекс хеша.

def add_to_filter(self, item):        for i in range(self.number_hash_functions):            self.bloom_filter[self._hash(item, i)] = 1

Осталось только создать функцию, которая проверяет, что элемент НЕ находится в фильтре Блума. Для этого давайте еще раз проитерируем по всем хеш-функциям. Если какая-либо из позиций фильтра Блума имеет 0, мы можем сказать, что элемент определенно отсутствует в наборе. В противном случае, если все позиции имеют 1, мы не можем сказать, что элемента нет в наборе.

 def check_is_not_in_filter(self, item):        for i in range(self.number_hash_functions):            if self.bloom_filter[self._hash(item, i)] == 0:                return True        return False

И все! Мы реализовали наш фильтр Блума. Давай протестируем его!

Мы создадим простой тест, чтобы проверить, работает ли он. Давайте создадим фильтр Блума с 1 миллионом записей, а затем установим ожидаемое количество элементов равным 100 000. Мы собираемся добавить элемент 192.168.1.1 в наш фильтр Блума в качестве заблокированного IP-адреса.

bloom_filter = BloomFilter(1000000, 100000)base_ip = "192.168.1."bloom_filter.add_to_filter(base_ip + str(1))

Чтобы проверить это, мы будем итерировать i от 1 до 100 000 и проверять, находится ли IP 192.168.1.i в фильтре Блума (нет таких IP-адресов, где i>254, например 192.168.289, но в данном случае мы просто проводим тест). Мы выведем элементы, о которых мы не знаем, входят ли они в набор; все остальные элементы, которые не будут напечатаны, точно не входят в набор.

for i in range(1, 100000):    if not bloom_filter.check_is_not_in_filter(base_ip + str(i)):        print(base_ip+str(i))

192.168.1.1

Да! Наш фильтр Блума говорит, что из 100 000 IP-адресов единственный элемент, который может быть заблокированным, это действительно наш заблокированный IP-адрес. Никаких ложноположительных результатов!

Вот полный код нашего фильтра Блума:

import mathfrom bitarray import bitarrayclass BloomFilter(object):    def __init__(self, size, number_expected_elements=100000):        self.size = size        self.number_expected_elements = number_expected_elements        self.bloom_filter = bitarray(self.size)        self.bloom_filter.setall(0)        self.number_hash_functions = round((self.size / self.number_expected_elements) * math.log(2))    def _hash_djb2(self, s):        hash = 5381        for x in s:            hash = ((hash << 5) + hash) + ord(x)        return hash % self.size    def _hash(self, item, K):        return self._hash_djb2(str(K) + item)    def add_to_filter(self, item):        for i in range(self.number_hash_functions):            self.bloom_filter[self._hash(item, i)] = 1    def check_is_not_in_filter(self, item):        for i in range(self.number_hash_functions):            if self.bloom_filter[self._hash(item, i)] == 0:                return True        return Falsebloom_filter = BloomFilter(1000000, 100000)base_ip = "192.168.1."bloom_filter.add_to_filter(base_ip + str(1))for i in range(1, 100000):    if not bloom_filter.check_is_not_in_filter(base_ip + str(i)):        print(base_ip+str(i))

Вот и все, что касается фильтров Блума. Я надеюсь, что вам было интересно узнать, что такое фильтр Блума и как его реализовать. Спасибо за внимание!


Перевод статьи подготовлен в преддверии старта курса Data Engineer.

Также приглашаем всех желающих на бесплатный демо-урок по теме ML в Spark. На занятии участники вместе с экспертом узнают об особенностях ML в Spark, рассмотрят процесс разработки моделей и научатся переводить обученные модели в production

Подробнее..

Что нам стоит загрузить JSON в Data Platform

16.06.2021 16:13:28 | Автор: admin

Всем привет!

В недавней статье мы рассказали, как мы шли к построению нашей Data Platform. Сегодня хотелось бы глубже погрузиться в желудок нашей платформы и попутно рассказать вам о том, как мы решали одну из задач, которая возникла в связи с ростом разнообразия интегрируемых источников данных.

То есть, если возвращаться к финальной картинке из упомянутой выше статьи (специально дублирую ее, чтобы уважаемым читателям было удобнее), то сегодня мы будем более углубленно говорить о реализации правой части схемы той, что лежит после Apache NiFi.

Схема из прошлой нашей статьи.Схема из прошлой нашей статьи.

Напомним, что в нашей компании более 350 реляционных баз данных. Естественно, не все они уникальны и многие представляют собой по сути разные экземпляры одной и той же системы, установленной во всех магазинах торговой сети, но все же зоопарк разнообразия присутствует. Поэтому без какого-либо Frameworkа, упрощающего и ускоряющего интеграцию источников в Data Platform, не обойтись.

Общая схема доставки данных из источников в ODS-слой Greenplum посредством разработанного нами frameworkа приведена ниже:

Общая схема доставки данных в ODS-слой Greenplum Общая схема доставки данных в ODS-слой Greenplum
  1. Данные из систем-источников пишутся в Kafka в AVRO-формате, обрабатываются в режиме реального времени Apache NiFi, который сохраняет их в формате parquet на S3.

  2. Затем эти файлы с сырыми данными с помощью Sparkа обрабатываются в два этапа:

    1. Compaction на данном этапе выполняется объединение для снижения количества выходных файлов с целью оптимизации записи и последующего чтения (то есть несколько более мелких файлов объединяются в несколько файлов побольше), а также производится дедубликация данных: простой distinct() и затем coalesce(). Результат сохраняется на S3. Эти файлы используются затем для parsing'а , а также являются своеобразным архивом сырых необработанных данных в формате как есть;

    2. Parsing на этой фазе производится разбор входных данных и сохранение их в плоские структуры согласно маппингу, описанному в метаданных. В общем случае из одного входного файла можно получить на выходе несколько плоских структур, которые в виде сжатых (как правило gzip) CSV-файлов сохраняются на S3.

  3. Заключительный этап загрузка данных CSV-файлов в ODS-слой хранилища: создается временная external table над данными в S3 через PXF S3 connector, после чего данные уже простым pgsql переливаются в таблицы ODS-слоя Greenplum

  4. Все это оркестрируется с помощью Airflow.

DAGи для Airflow у нас генерируются динамически на основании метаданных. Parsing файлов и разложение их в плоские структуры также производится с использованием метаданных. Это приводит к упрощению интеграции нового источника, так как, для этого необходимо всего лишь:

  • создать в ODS-слое Хранилища таблицы-приемники данных;

  • в репозитории метаданных в Git согласно принятым стандартам прописать в виде отдельных YAML-файлов:

    • общие настройки источника (такие как: расписание загрузки, входной и выходной формат файлов с данными, S3-бакет, имя сервисного пользователя, имя и email владельца для нотификации в случае сбоя загрузки и т.п.);

    • маппинг данных объектов источника на таблицы слоя ODS (при этом поддерживаются как плоские, так и вложенные структуры и массивы, а также есть возможность данные из одного объекта раскладывать в ODS-слое по нескольким таблицам). То есть описать, как необходимо сложную вложенную структуру разложить в плоские таблицы;

До недавнего времени такой подход удовлетворял текущие наши потребности, но количество и разнообразие источников данных растет. У нас стали появляться источники, которые не являются реляционными базами данных, а генерируют данные в виде потока JSON-объектов. Кроме того на горизонте уже маячила интеграция источника, который под собой имел MongoDB и поэтому будет использовать MongoDB Kafka source connector для записи данных в Kafka. Поэтому остро встала необходимость доработки нашего frameworkа для поддержки такого сценария. Хотелось, чтобы данные источника сразу попадали на S3 в формате JSON - то есть в формате "как есть", без лишнего шага конвертации в parquet посредством Apache NiFi.

В первую очередь необходимо было доработать шаг Compaction. Его код, если убрать всю обвязку и выделить только главное, очень простой:

df = spark.read.format(in_format) \               .options(**in_options) \               .load(path) \               .distinct()    new_df = df.coalesce(div)new_df.write.mode("overwrite") \             .format(out_format) \            .options(**out_options) \            .save(path)

Но если мы проделаем все те же манипуляции с JSON-данными, то волей-неволей внесем изменения во входные данные, так как при чтении JSONов Spark автоматом определит и сделает mergeSchema, т.е. мы тем самым можем исказить входные данные, чего не хотелось бы. Ведь наша задача на этом шаге только укрупнить файлы и дедублицировать данные, без какого-либо вмешательства в их структуру и наполнение. То есть сохранить их как есть.

По-хорошему, нам надо было просто прочитать данные как обычный текст, дедублицировать строки, укрупнить файлы и положить обратно на S3. Для этого был предложен достаточно изящный способ:

рассматривать файлы с JSON-объектами как DataFrame с одной колонкой, содержащей весь JSON-объект.

Попробуем сделать это. Допустим, мы имеем следующий файл данных:

file1:

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

Обратите внимание на формат этого файла. Это файл с JSON-объектами, где 1 строка = 1 объект. Оставаясь, по сути, JSON-ом, он при этом не пройдет синтаксическую JSON-валидацию. Именно в таком виде мы сохраняем JSON-данные на S3 (есть специальная "галочка в процессоре Apache NiFi).

Прочитаем файл предлагаемым способом:

# Читаем данныеdf = spark.read \          .format("csv") \          .option("sep", "\a") \          .load("file1.json")# Схема получившегося DataFramedf.printSchema()root |-- _c0: string (nullable = true)# Сами данныеdf.show()+--------------------+|                 _c0|+--------------------+|{"productId": 1, ...||{"productId": 2, ...|+--------------------+

То есть мы тут читаем JSON как обычный CSV, указывая разделитель, который никогда заведомо не встретится в наших данных. Например, Bell character. В итоге мы получим DataFrame из одного поля, к которому можно будет также применить dicstinct() и затем coalesce(), то есть менять существующий код не потребуется. Нам остается только определить опции в зависимости от формата:

# Для parquetin_format = "parquet"in_options = {}# Для JSONin_format = "csv"in_options = {"sep": "\a"}

Ну и при сохранении этого же DataFrame обратно на S3 в зависимости от формата данных опять применяем разные опции:

df.write.mode("overwrite") \           .format(out_format) \.options(**out_options) \  .save(path)  # для JSON     out_format = "text" out_options = {"compression": "gzip"}  # для parquet   out_format = input_format out_options = {"compression": "snappy"}

Следующей точкой доработки был шаг Parsing. В принципе, ничего сложного, если бы задача при этом упиралась в одну маленькую деталь: JSON -файл, в отличии от parquet, не содержит в себе схему данных. Для разовой загрузки это не является проблемой, так как при чтении JSON-файла Spark умеет сам определять схему, и даже в случае, если файл содержит несколько JSON-объектов с немного отличающимся набором полей, корректно выполнит mergeSchema. Но для регулярного процесса мы не могли уповать на это. Банально может случиться так, что во всех записях какого-то файла с данными может не оказаться некоего поля field_1, так как, например, в источнике оно заполняется не во всех случаях. Тогда в получившемся Spark DataFrame вообще не окажется этого поля, и наш Parsing, построенный на метаданных, просто-напросто упадет с ошибкой из-за того, что не найдет прописанное в маппинге поле.

Проиллюстрирую. Допустим,у нас есть два файла из одного источника со следующим наполнением:

file1 (тот же что и в примере выше):

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

file2:

{productId: 3, productName: ProductName 3, dimensions: {length: 10, width: 12, height: 12.5, package: [10, 20.5, 30]}}

Теперь прочитаем Sparkом их и посмотрим данные и схемы получившихся DataFrame:

df = spark.read \          .format("json") \          .option("multiline", "false") \          .load(path)df.printSchema()df.show()

Первый файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- width: long (nullable = true) |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) |-- tags: array (nullable = true) |    |-- element: string (containsNull = true)+--------------+-----+---------+-------------+--------------+|    dimensions|price|productId|  productName|          tags|+--------------+-----+---------+-------------+--------------+|[12.5, 10, 12]| null|        1|ProductName 1|[tag 1, tag 2]||[12.5, 10, 12]|10.01|        2|         null|[tag 1, tag 2]|+--------------+-----+---------+-------------+--------------+

Второй файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- package: array (nullable = true) |    |    |-- element: double (containsNull = true) |    |-- width: long (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)+--------------------+---------+-------------+|          dimensions|productId|  productName|+--------------------+---------+-------------+|[12.5, 10, [10.0,...|        3|ProductName 3|+--------------------+---------+-------------+

Как видно, Spark корректно выстроил схему отдельно для каждого файла. Если в какой-либо записи не было обнаружено поля, имеющегося в другой, то в DataFrame мы видим корректное проставление null (поля price и productName для первого файла).

Но в целом схемы получились разные, и если у нас в маппинге прописано, что нам нужно распарсить эти данные (то есть оба файла) в следующую плоскую структуру,

root |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)

а во входных данных у нас присутствуют только файлы а-ля file2, где поля price нет ни у одной записи, то Spark упадет с ошибкой, так как не найдет поля price для формирования выходного DataFrame. С parquet-файлами такой проблемы как правило не возникает, так как сам parquet-файл генерируется из AVRO, который уже содержит полную схему данных и, соответственно, эта полная схема есть и в parquet-файле.

Еще надо отметить, что, естественно, мы хотели по максимум переиспользовать уже существующий и зарекомендовавший себя код нашего frameworkа, а не городить какую-то полностью отдельную ветку для обработки JSONов то есть все изменения хотелось сделать на этапе чтения JSON-файлов с S3.

Таким образом очевидно, что для корректной загрузки данных из JSON-файлов необходимо предопределить схему JSON-файла с данными и читать файлы уже с применением этой схемы. Тогда у нас даже если в JSONе нет какого-то поля, то в самом DataFrame это поле будет, но его значение подставится как null:

df = spark.read \          .format("json") \          .option("multiline","false") \          .schema(df_schema) \          .load(path)

Первая мысль была использовать для хранения схемы имеющийся сервис метаданных - то есть описать схему в YAML-формате и сохранить в имеющемся репозитории. Но с учетом того, что все данные источников у нас проходят через Kafka, решили, что логично для хранения схем использовать имеющийся Kafka Schema Registry, а схему хранить в стандартном для JSON формате (другой формат, кстати говоря, Kafka Schema Registry не позволил бы хранить).

В общем, вырисовывалась следующая реализация:

  • Читаем из Kafka Schema Registry схему

  • Импортируем ее в pyspark.sql.types.StructType что-то типа такого:

# 1. получаем через Kafka Schema Registry REST API схему данных # 2. записываем ее в переменную schema и далее:df_schema = StructType.fromJson(schema)
  • Ну и с помощью полученной схемы читаем JSON-файлы

Звучит хорошо, если бы Давайте посмотрим на формат JSON-схемы, понятной Sparkу. Пусть имеем простой JSON из file2 выше. Посмотреть его схему в формате JSON можно, выполнив:

df.schema.json()  
Получившаяся схема
{    "fields":    [        {            "metadata": {},            "name": "dimensions",            "nullable": true,            "type":            {                "fields":                [                    {"metadata":{},"name":"height","nullable":true,"type":"double"},                    {"metadata":{},"name":"length","nullable":true,"type":"long"},                    {"metadata":{},"name":"width","nullable":true,"type":"long"}                ],                "type": "struct"            }        },        {            "metadata": {},            "name": "price",            "nullable": true,            "type": "double"        },        {            "metadata": {},            "name": "productId",            "nullable": true,            "type": "long"        },        {            "metadata": {},            "name": "productName",            "nullable": true,            "type": "string"        },        {            "metadata": {},            "name": "tags",            "nullable": true,            "type":            {                "containsNull": true,                "elementType": "string",                "type": "array"            }        }    ],    "type": "struct"}

Как видно, это совсем не стандартный формат JSON-схемы.

Но мы же наверняка не первые, у кого встала задача конвертировать стандартную JSON-схему в формат, понятный Sparkу - подумали мы и В принципе, не ошиблись, но несколько часов поиска упорно выводили исключительно на примеры, из серии:

как сохранить схему уже прочитанного DataFrame в JSON, затем использовать повторно

либо на репозиторий https://github.com/zalando-incubator/spark-json-schema, который нам бы подошел, если мы использовали Scala, а не pySpark

В общем, на горизонте маячила перспектива писать SchemaConverter. Сперва решили отделаться малой кровью и написать простенький конвертер никаких ссылок и прочих сложных конструкций. Конвертер был успешно протестирован на синтетических данных, после чего захотелось скормить ему что-то приближенное к реальности.

К счастью, у нас уже был один источник, генерирующий данные в формате JSON. Как временное решение схема его интеграции в DataPlatform была незамысловата: NiFi читал данные из Kafka, преобразовывал их в parquet, использую прибитую гвоздями в NiFi схему в формате AVRO-schema, и складывал на S3. Схема данных была действительно непростой и с кучей вложенных структур и нескольких десятков полей - неплохой тест-кейс в общем-то:

Посмотреть длинную портянку, если кому интересно :)
root |-- taskId: string (nullable = true) |-- extOrderId: string (nullable = true) |-- taskStatus: string (nullable = true) |-- taskControlStatus: string (nullable = true) |-- documentVersion: long (nullable = true) |-- buId: long (nullable = true) |-- storeId: long (nullable = true) |-- priority: string (nullable = true) |-- created: struct (nullable = true) |    |-- createdBy: string (nullable = true) |    |-- created: string (nullable = true) |-- lastUpdateInformation: struct (nullable = true) |    |-- updatedBy: string (nullable = true) |    |-- updated: string (nullable = true) |-- customerId: string (nullable = true) |-- employeeId: string (nullable = true) |-- pointOfGiveAway: struct (nullable = true) |    |-- selected: string (nullable = true) |    |-- available: array (nullable = true) |    |    |-- element: string (containsNull = true) |-- dateOfGiveAway: string (nullable = true) |-- dateOfGiveAwayEnd: string (nullable = true) |-- pickingDeadline: string (nullable = true) |-- storageLocation: string (nullable = true) |-- currentStorageLocations: array (nullable = true) |    |-- element: string (containsNull = true) |-- customerType: string (nullable = true) |-- comment: string (nullable = true) |-- totalAmount: double (nullable = true) |-- currency: string (nullable = true) |-- stockDecrease: boolean (nullable = true) |-- offline: boolean (nullable = true) |-- trackId: string (nullable = true) |-- transportationType: string (nullable = true) |-- stockRebook: boolean (nullable = true) |-- notificationStatus: string (nullable = true) |-- lines: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- lineId: string (nullable = true) |    |    |-- extOrderLineId: string (nullable = true) |    |    |-- productId: string (nullable = true) |    |    |-- lineStatus: string (nullable = true) |    |    |-- lineControlStatus: string (nullable = true) |    |    |-- orderedQuantity: double (nullable = true) |    |    |-- confirmedQuantity: double (nullable = true) |    |    |-- assignedQuantity: double (nullable = true) |    |    |-- pickedQuantity: double (nullable = true) |    |    |-- controlledQuantity: double (nullable = true) |    |    |-- allowedForGiveAwayQuantity: double (nullable = true) |    |    |-- givenAwayQuantity: double (nullable = true) |    |    |-- returnedQuantity: double (nullable = true) |    |    |-- sellingScheme: string (nullable = true) |    |    |-- stockSource: string (nullable = true) |    |    |-- productPrice: double (nullable = true) |    |    |-- lineAmount: double (nullable = true) |    |    |-- currency: string (nullable = true) |    |    |-- markingFlag: string (nullable = true) |    |    |-- operations: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- operationId: string (nullable = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- reason: string (nullable = true) |    |    |    |    |-- quantity: double (nullable = true) |    |    |    |    |-- dmCodes: array (nullable = true) |    |    |    |    |    |-- element: string (containsNull = true) |    |    |    |    |-- timeStamp: string (nullable = true) |    |    |    |    |-- updatedBy: string (nullable = true) |    |    |-- source: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- items: array (nullable = true) |    |    |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |    |    |-- assignedQuantity: double (nullable = true) |-- linkedObjects: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- objectType: string (nullable = true) |    |    |-- objectId: string (nullable = true) |    |    |-- objectStatus: string (nullable = true) |    |    |-- objectLines: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- objectLineId: string (nullable = true) |    |    |    |    |-- taskLineId: string (nullable = true)

Естественно, я не захотел перебивать руками захардкоженную схему, а воспользовался одним из многочисленных онлайн-конвертеров, позволяющих из Avro-схемы сделать JSON-схему. И тут меня ждал неприятный сюрприз: все перепробованные мною конвертеры на выходе использовали гораздо больше синтаксических конструкций, чем понимала первая версия конвертера. Дополнительно пришло осознание, что также как и я, наши пользователи (а для нас пользователями в данном контексте являются владельцы источников данных) с большой вероятностью могут использовать подобные конвертеры для того, чтобы получить JSON-схему, которую надо зарегистрировать в Kafka Schema Registry, из того, что у них есть.

В результате наш SparkJsonSchemaConverter был доработан появилась поддержка более сложных конструкций, таких как definitions, refs (только внутренние) и oneOf. Сам же парсер был оформлен уже в отдельный класс, который сразу собирал на основании JSON-схемы объект pyspark.sql.types.StructType

У нас почти сразу же родилась мысль, что хорошо бы было поделиться им с сообществом, так как мы в Леруа Мерлен сами активно используем продукты Open Source, созданные и поддерживаемые энтузиастами со всего мира, и хотим не просто их использовать, но и контрибьютить обратно, участвуя в разработке Open Source продуктов и поддерживая сообщество. В настоящий момент мы решаем внутренние орг.вопросы по схеме выкладывания данного конвертера в Open Source и, уверен, что в ближайшее время поделимся с сообществом этой наработкой!

В итоге благодаря написанному SparkJsonSchemaConverterу доработка шага Parsing свелась только к небольшому тюнингу чтения данных с S3: в зависимости от формата входных данных источника (получаем из сервиса метаданных) читаем файлы с S3 немного по-разному:

# Для JSONdf = spark.read.format(in_format)\            .option("multiline", "false")\            .schema(json_schema) \            .load(path)# Для parquet:df = spark.read.format(in_format)\            .load(path)

А дальше отрабатывает уже существующий код, раскрывающий все вложенные структуры согласно маппингу и сохраняющий данные DataFrameа в несколько плоских CSV-файлов.

В итоге мы смогли при относительном минимуме внесенных изменений в код текущего frameworkа добавить в него поддержку интеграции в нашу Data Platform JSON-источников данных. И результат нашей работы уже заметен:

  • Всего через месяц после внедрения доработки у нас на ПРОДе проинтегрировано 4 новых JSON-источника!

  • Все текущие интеграции даже не заметили расширения функционала frameworkа, а значит, мы точно ничего не сломали произведенной доработкой.

Подробнее..

Перевод Руководство по столбчатым форматам файлов в Spark и Hadoop для начинающих

25.01.2021 18:10:11 | Автор: admin

Эта статья является продолжением руководства по форматам файлов в Spark и Hadoop, которое послужит хорошей отправной точкой, если вы еще ничего не знаете о форматах файлов big data.

Что из себя представляет столбчатый формат файла?

Этот термин часто используется, но я не уверен, что всем до конца ясно, что он означает на практике.

Определение из учебника гласит, что столбчатые (колоночные, многоколоночные, columnar) форматы файлов хранят данные по столбцам, а не по строкам. CSV, TSV, JSON и Avro традиционные строковые форматы файлов. Файл Parquet и ORC это столбчатые форматы файлов.

Давайте проиллюстрируем различия между этими двумя концепциями, используя примеры некоторых данных и простой наглядный столбчатый формат файла, который я только что придумал.

Вот схема данных набора люди. Она довольно проста:

{  id: Integer,  first_name: String,  last_name: String,  age: Integer,  cool: Boolean,  favorite_fruit: Array[String]}

Данные в строковом формате строки

Мы могли бы представить этот набор данных в нескольких форматах, например, вот неструктурированный JSON-файл:

{"id": 1, "first_name": "Matthew", "last_name": "Rathbone", "age": 19, "cool": true, "favorite_fruit": ["bananas", "apples"]}{"id": 2, "first_name": "Joe", "last_name": "Bloggs", "age": 102, "cool": true, "favorite_fruit": null}

Вот те же данные во всеми любимом формате CSV

1, Matthew, Rathbone, 19, True, ['bananas', 'apples']2, Joe, Bloggs, 102, True,

В обоих этих форматах, каждая строчка файла содержит полную строку данных. Именно так люди представляют себе данные: аккуратные строки выстроены в линию, которые легко сканировать, легко редактировать в Excel, легко читать в терминале или текстовом редакторе.

Данные в столбчатом формате

В целях демонстрации я собираюсь ввести новый столбчатый (псевдо) формат файла. Назовем его CCSV (Столбчатый CSV).

Каждая строка нашего CCSV-файла имеет следующее содержимое:

Field Name/Field Type/Number of Characters:[data in csv format]

Вот те же данные представленные в CCSV:

ID/INT/3:1,2FIRST_NAME/STRING/11:Matthew,JoeLAST_NAME/STRING/15:Rathbone,BloggsAGE/INT/6:19,102COOL/BOOL/3:1,1FAVORITE_FRUIT/ARRAY[STRING]/19:[bananas,apples],[]

Обратите внимание, что все имена, возраст и любимые фрукты хранятся рядом друг с другом, а не вместе с другими данными из той же записи. Чтобы каждый раздел столбца не становился слишком большим, CCSV будет повторять этот паттерн каждые 1000 записей. Таким образом, файл из 10 000 записей будет содержать 10 разделов сгруппированных столбцовых данных.

Могут ли люди легко читать CCSV-файлы? В принципе да, но вам будет сложно составить целостное представление о данных, если вы откроете их в Excel. Однако CCSV имеет несколько полезных свойств, которые делают его предпочтительнее для компьютеров, и сейчас я о них расскажу.

Преимущества столбчатых форматов

Оптимизация чтения

Представим, что я хочу выполнить SQL-запрос к этим данным, например:

SELECT COUNT(1) from people where last_name = "Rathbone"

С обычным CSV-файлом SQL-движку пришлось бы сканировать каждую строку, анализировать каждый столбец, извлекать значение last_name, а затем пересчитать все значения Rathbone, которые он увидит.

В CCSV SQL-движок может смело пропустить первые два поля и просто просканировать третью строку, которая содержит все доступные значения фамилий.

Почему это хорошо? Теперь SQL-движок обрабатывает только около 1/6 данных, т.е. CCSV только что обеспечил (теоретическое и совершенно необоснованное) увеличение производительности на 600% по сравнению с обычными CSV-файлами.

Представьте себе такой же выигрыш на наборе данных петабайтного масштаба. Нетрудно оценить оптимизацию столбчатых форматов файлов, позволяющую сэкономить массу вычислительной мощности (и денег) по сравнению с обычными наборами данных в JSON. Это основная ценность столбчатых форматов файлов.

Конечно, на самом деле CCSV необходимо проделать еще немалый путь, чтобы стать жизнеспособным файловым форматом, но это немного уводит нас от темы, поэтому я не буду здесь на этом заостряться.

Улучшения сжатия

Совместное хранение однотипных данных также сулит преимущества для кодеков сжатия. Многие кодеки сжатия (включая GZIP и Snappy) имеют более высокий коэффициент сжатия при сжатии последовательностей схожих данных. Сохраняя записи столбец за столбцом, во многих случаях каждый раздел данных столбца будет содержать похожие значения, что делает крайне пригодным для сжатия. Фактически, каждый столбец можно сжать независимо от других для дальнейшей оптимизации.

Недостатки столбчатых форматов

Самый большой недостаток столбчатых форматов состоит в том, что восстановление полной записи происходит медленнее и требует чтения сегментов из каждой строки, один за другим. Именно по этой причине столбчатые форматы файлов изначально подходят для рабочих процессов аналитики, а не для рабочих процессов в стиле Map/Reduce, которые по умолчанию работают с целыми строками данных за раз.

Для реальных столбчатых форматов файлов (например, Parquet) этот недостаток сводится к минимуму с помощью некоторых хитрых приемов, таких как разбиение файла на группы строк и создание обширных метаданных, хотя для особенно широких наборов данных (например, 200+ столбцов) влияние на скорость может быть значительным.

Другой недостаток состоит в том, что они требуют больше ресурсов ЦП и оперативной памяти для записи, поскольку средство записи файлов должно собрать целую кучу метаданных и реорганизовать строки, прежде чем он сможет записать файл.

Попутно замечу, что я почти всегда рекомендую использовать столбчатый формат файла, если это возможно, чтобы иметь возможность быстро заглянуть в файл и собрать некоторые простые показатели.

Реальные столбчатые форматы файлов

Теперь, когда вы знаете, чего ожидать, давайте быстро рассмотрим реальный формат файла Parquet. Есть гораздо более подробные руководства по parquet, и я рекомендую вам прочитать официальную документацию parquet, в частности, чтобы понять, как все это работает.

Вот диаграмма из документации Parquet, показывающая структуру Parquet-файла. Она немного ошеломляет, но я думаю, что ключевым выводом является важность организации данных и метаданных. Я приведу здесь только общую схему, ведь документация являются исчерпывающим и в то же время достаточно доступными источником, если хотя бы в какой-то степени понимаете, что происходит (а вы, вероятно, к этому моменту уже понимаете!).

Выводы

Для Spark, Spark SQL, Hive, Impala и других подобных технологий столбчатое хранение данных может дать 100-кратное, а иногда и 1000-кратное повышение производительности, особенно для разреженных запросов к очень широким наборам данных. Несмотря на свою сложность, файлы оптимизированы для компьютеров, и хотя это затрудняет их чтение человеком, они действительно кажутся большим шагом вперед по сравнению с такими форматами, как JSON или Avro.

Ого, вы дочитали аж до сюда?

Спасибо, что дочитали до самого конца! Полагаю, вы глубоко увлечены работой с big data, если добрались до конца. Если вам когда-нибудь понадобится поделиться идеями или получить совет по поводу того, над чем вы работаете, не стесняйтесь отправить мне электронное письмо matthew (at) rathbonelabs (dot com). Я люблю говорить о big data с умными людьми.


Всех, кому интересен курс Data Engineer приглашаем посетить бесплатный демо-урок курса на тему ML в Spark. На уроке участники узнают об особенностях ML в Spark, рассмотрят процесс разработки моделей и научатся переводить обученные модели в production.


ЗАБРАТЬ СКИДКУ

Подробнее..

Перевод Почему ваши Spark приложения медленно работаютили не работают вообще. Часть 1 Управление памятью

02.02.2021 02:11:38 | Автор: admin

Будущих учащихся на курсе Экосистема Hadoop, Spark, Hive приглашаем на открытый вебинар по теме Spark Streaming. На вебинаре участники вместе с экспертом познакомятся со Spark Streaming и Structured Streaming, изучат их особенности и напишут простое приложение обработки потоков.

А сейчас делимся с вами традиционным переводом полезного материала.


Spark приложения легко писать и легко понять, когда все идет по плану. Однако, это становится очень сложно, когда приложения Spark начинают медленно запускаться или выходить из строя. Порой хорошо настроенное приложение может выйти из строя из-за изменения данных или изменения компоновки данных. Иногда приложение, которое до сих пор работало хорошо, начинает вести себя плохо из-за нехватки ресурсов. Список можно продолжать и продолжать.

Важно понимать не только приложение Spark, но также и его базовые компоненты среды выполнения, такие как использование диска, сети, конфликт доступа и т.д., чтобы мы могли принимать обоснованные решения, когда дела идут плохо.

В этой серии статей я хочу рассказать о некоторых наиболее распространенных причинах, по которым приложение Spark выходит из строя или замедляется. Первая и наиболее распространенная это управление памятью.

Если бы мы заставили всех разработчиков Spark проголосовать, то условия отсутствия памяти (OOM) наверняка стали бы проблемой номер один, с которой все столкнулись.Это неудивительно, так как архитектура Spark ориентирована на память. Некоторые из наиболее распространенных причин OOM:

  • неправильное использование Spark

  • высокая степень многопоточности (high concurrency)

  • неэффективные запросы

  • неправильная конфигурация

Чтобы избежать этих проблем, нам необходимо базовое понимание Spark и наших данных. Есть определенные вещи, которые могут быть сделаны, чтобы либо предотвратить OOM, либо настроить приложение, которое вышло из строя из-за OOM.Стандартная конфигурация Spark может быть достаточной или не подходящей для ваших приложений. Иногда даже хорошо настроенное приложение может выйти из строя по причине OOM, когда происходят изменения базовых данных.

Переполнение памяти может быть в узлах драйвера, исполнителя и управления. Рассмотрим каждый случай.

НЕДОСТАТОЧНО ПАМЯТИ ПРИ РАБОТЕ ДРАЙВЕРА

Драйвер в Spark это JVM (Java Virtual Machine) процесс, в котором работает основной поток управления приложения. Чаще всего драйвер выходит из строя с ошибкой OutOfMemory OOM (недостаточно памяти из-за неправильного использования Spark. Spark это механизм распределения нагрузки между рабочим оборудованием. Драйвер должен рассматриваться только как дирижер. В типовых установках драйверу предоставляется меньше памяти, чем исполнителям. Поэтому мы должны быть осторожны с тем, что мы делаем с драйвером.

Обычными причинами, приводящими к OutOfMemory OOM (недостаточно памяти) драйвера, являются:

  • rdd.collect()

  • sparkContext.broadcast

  • Низкий уровень памяти драйвера, настроенный в соответствии с требованиями приложения

  • Неправильная настройка Spark.sql.autoBroadcastJoinThreshold.

Spark использует этот лимит для распределения связей ко всем узлам в случае операции соединения. При самом первом использовании, все связи реализуются на узле драйвера. Иногда многочисленные таблицы также транслируются как часть осуществления запроса.

Попробуйте написать свое приложение таким образом, чтобы в драйвере можно было избежать полного сбора всех результатов. Вы вполне можете делегировать эту задачу одной из управляющих программ. Например, если вы хотите сохранить результаты в определенном файле, вы можете либо собрать их в драйвере, или назначить программу, которая сделает это за вас.

Если вы используете SQL (Structured Query Language) от Spark, а драйвер находится в состоянии OOM из-за распределения связей, то вы можете либо увеличить память драйвера, если это возможно; либо уменьшить значение "spark.sql.autoBroadcastJoinThreshold" (неправильная настройка порога подключения) так, чтобы ваши операции по объединению использовали более удобные для памяти операции слияния соединений.

НЕДОСТАТОЧНО ПАМЯТИ ПРИ РАБОТЕ УПРАВЛЯЮЩЕЙ ПРОГРАММ

Это очень распространенная проблема с приложениями Spark, которая может быть вызвана различными причинами. Некоторые из наиболее распространенных причин высокая степень многопоточности, неэффективные запросы и неправильная конфигурация. Рассмотрим каждую по очереди.

ВСОКАЯ СТЕПЕНЬ МНОГОПОТОЧНОСТИ

Прежде чем понять, почему высокая степень многопоточности может быть причиной OOM, давайте попробуем понять, как Spark выполняет запрос или задание и какие компоненты способствуют потреблению памяти.

Spark задания или запросы разбиваются на несколько этапов, и каждый этап далее делится на задачи. Количество задач зависит от различных факторов, например, на какой стадии выполняется, какой источник данных читается и т.д. Если это этап map-stage (фаза сканирования в SQL), то, как правило, соблюдаются базовые разделы источника данных.

Например, если реестр таблицы ORC (Optimized Row Columnar) имеет 2000 разделов, то для этапа map-stage создается 2000 заданий для чтения таблицы, предполагая, что обработка разделовещё не началась. Если это этап reduce-stage (стадия Shuffle), то для определения количества задач Spark будет использовать либо настройку "spark.default.parallelism" для RDD (Resilient Distributed Dataset), либо "spark.sql.shuffle.partitions" для DataSet (набор данных). Сколько задач будет выполняться параллельно каждой управляющей программе, будет зависеть от свойства "spark.executor.cores". Если это значение установить больше без учета памяти, то программы могут отказать и привести к ситуации OOM (недостаточно памяти). Теперь посмотрим на то, что происходит, как говорится, за кадром, при выполнении задачи и на некоторые вероятные причины OOM.

Допустим, мы реализуем задачу создания схемы (map) или этап сканирования SQL из файла HDFS (распределенная файловая система Hadoop distributed file system) или таблицы Parquet/ORC. Для файлов HDFS каждая задача Spark будет считывать блок данных размером 128 МБ.Таким образом, если выполняется 10 параллельных задач, то потребность в памяти составляет не менее 128*10 только для хранения разбитых на разделы данных. При этом опять же игнорируется любое сжатие данных, которое может привести к резкому скачку данных в зависимости от алгоритмов сжатия.

Spark читает Parquet (формат файлов с открытым исходным кодом) в векторном формате. Проще говоря, каждая задача Spark считывает данные из файла Parquet пакет за пакетом. Так как Parquet является столбцом, то эти пакеты строятся для каждого из столбцов.Она накапливает определенный объем данных по столбцам в памяти перед выполнением любой операции над этим столбцом. Это означает, что для хранения такого количества данных Spark необходимы некоторые структуры данных и учет. Кроме того, такие методы кодирования, как словарное кодирование, имеют некоторое состояние, сохраненное в памяти. Все они требуют памяти.

Spark задачи и компоненты памяти во время сканирования таблицыSpark задачи и компоненты памяти во время сканирования таблицы

Так что, при большем количестве параллелей, потребление ресурсов увеличивается. Кроме того, если речь идет о широковещательное соединении (broadcast join), то широковещательные переменные (broadcast variables) также займут некоторое количество памяти. На приведенной выше диаграмме показан простой случай, когда каждый исполнитель выполняет две задачи параллельно.

НЕЭФФЕКТИВНЕ ЗАПРОС

Хотя программа Spark's Catalyst пытается максимально оптимизировать запрос, она не может помочь, если сам запрос плохо написан. Например, выбор всех столбцов таблицы Parquet/ORC. Как видно из предыдущего раздела, каждый столбец нуждается в некотором пакетном состоянии в памяти. Если выбрано больше столбцов, то больше будет потребляться ресурсов.

Постарайтесь считывать как можно меньше столбцов. Попробуйте использовать фильтры везде, где это возможно, чтобы меньше данных попадало к управляющим программам. Некоторые источники данных поддерживают обрезку разделов. Если ваш запрос может быть преобразован в столбец(ы) раздела, то это в значительной степени уменьшит перемещение данных.

НЕПРАВИЛЬНАЯ КОНФИГУРАЦИЯ

Неправильная конфигурация памяти и кэширования также может привести к сбоям и замедлению работы приложений Spark. Рассмотрим некоторые примеры.

ПАМЯТЬ ИСПОЛНИТЕЛЯ И ДРАЙВЕРА

Требования к памяти каждого приложения разные. В зависимости от требований, каждое приложение должно быть настроено по-разному. Вы должны обеспечить правильные значения памяти spark.executor.memory или spark.driver.memory в зависимости от загруженности. Как бы очевидно это ни казалось, это одна из самых трудных задач. Нам нужна помощь средств для мониторинга фактического использования памяти приложения. Unravel (Unravel Data Operations Platform) делает это довольно хорошо.

ПЕРЕГРУЗКА ПАМЯТИ

Иногда это не память управляющей программы, а перегруженная память модуля YARN (Yet Another Resource Negotiator еще один ресурсный посредник), которая вызывает OOM или узел перестает функционировать (killed) из-за YARN. Сообщения "YARN kill" обычно выглядят так:

YARN запускает каждый компонент Spark, как управляющие программы и драйвера внутри модулей. Переполненная память это off-heap память, используемая для JVM в режиме перегрузки, интернированных строк и других метаданных JVM. В этом случае необходимо настроить spark.yarn.executor.memoryOverhead на нужное значение. Обычно 10% общей памяти управляющей программы должно быть выделено под неизбежное потребление ресурсов.

КЭШИРОВАННАЯ ПАМЯТЬ

Если ваше приложение использует кэширование Spark для хранения некоторых наборов данных, то стоит обратить внимание на настройки менеджера памяти Spark. Менеджер памяти Spark разработан в очень общем стиле, чтобы удовлетворить основные рабочие нагрузки. Следовательно, есть несколько настроек, чтобы установить его правильно для определенной внеплановой нагрузки.

Spark определила требования к памяти как два типа: исполнение и хранение. Память хранения используется для кэширования, а память исполнения выделяется для временных структур, таких как хэш-таблицы для агрегирования, объединения и т. д.

Как память исполнения, так и память хранения можно получить из настраиваемой части (общий объем памяти 300МБ). Эта настройка называется "spark.memory.fraction". По умолчанию 60%. Из них по умолчанию 50% (настраивается параметром "spark.memory.storageFraction") выделяется на хранение и остаток выделяется на исполнение.

Бывают ситуации, когда каждый из вышеперечисленных резервов памяти, а именно исполнение и хранение, могут занимать друг у друга, если другой свободен. Кроме того, память в хранилище может быть уменьшена до предела, если она заимствовала память из исполнения. Однако, не вдаваясь в эти сложности, мы можем настроить нашу программу таким образом, чтобы наши кэшированные данные, которые помещаются в память хранилища, не создавали проблем для выполнения.

Если мы не хотим, чтобы все наши кэшированные данные оставались в памяти, то мы можем настроить "spark.memory.storageFraction" на меньшее значение, чтобы лишние данные были исключены и выполнение не столкнулось бы с нехваткой памяти.

ПЕРЕГРУЗКА ПАМЯТИ В МЕНЕДЖЕРЕ УЗЛА

Spark приложения, которые осуществляют перетасовку данных в рамках групповых операций или присоединяются к подобным операциям, испытывают значительные перегрузки. Обычно процесс перетасовки выполняется управляющей программой. Если управляющая программа (исполнитель) занята или завалена большим количеством (мусора) GC (Garbage Collector), то она не может обслуживать перетасовки запросов. Эта проблема в некоторой степени решается за счет использования внешнего сервиса обмена.

Внешний сервис обмена работает на каждом рабочем узле и обрабатывает поступающие от исполнителей запросы на переключение. Исполнители могут читать перемешанные файлы с этого сервиса, вместо того, чтобы не считывать файлы между собой. Это помогает запрашивающим исполнителям читать перемешанные файлы, даже если производящие их исполнители не работают или работают медленно. Также, когда включено динамическое распределение, его обязательным условием является включение внешнего сортировочного сервиса.

Когда внешний сервис обмена данными Spark настроен с помощью YARN, NodeManager (управляющий узел) запускает вспомогательный сервис, который действует как внешний провайдер обмена данными. По умолчанию память NodeManager составляет около 1 ГБ. Однако приложения, выполняющие значительную перестановку данных, могут выйти из строя из-за того, что память NodeManager исчерпана. Крайне важно правильно настроить NodeManager, если ваши приложения попадают в вышеуказанную категорию.

КОНЕЦ ЧАСТИ 1, СПАСИБО ЗА ВНИМАНИЕ

Процессинг внутренней памяти Spark ключевая часть ее мощности. Поэтому эффективное управление памятью является критически важным фактором для получения наилучшей производительности, расширяемости и стабильности ваших приложений Spark и каналов передачи данных. Однако настройки по умолчанию в Spark часто бывают недостаточными. В зависимости от приложения и среды, некоторые ключевые параметры конфигурации должны быть установлены правильно для достижения ваших целей производительности. Если иметь базовое представление о них и о том, как они могут повлиять на общее приложение, то это поможет в работе.

Я поделился некоторыми соображениями о том, на что следует обратить внимание при рассмотрении вопроса об управлении памятью Spark. Это область, которую платформа Unravel понимает и оптимизирует очень хорошо, с небольшим количеством, если таковое вообще потребуется, человеческого вмешательства. Я рекомендую вам заказать демо-версию, чтобы увидеть Unravel в действии. Мы видим довольно значительное ускорение работы приложений Spark.

Во второй части этой серии статьи напишу о том, почему ваши приложения Spark медленно работают или не работают: Во второй части цикла, посвященной искажению данных и сбору мусора, я расскажу о том, как структура данных, искажение данных и сбор мусора влияют на производительность Spark.


Узнать подробнее о курсе Экосистема Hadoop, Spark, Hive.

Записаться на открытый вебинар по теме Spark Streaming.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru