Русский
Русский
English
Статистика
Реклама

Data lake

Recovery mode Создаём компанию мечты нет хайпу

01.06.2021 00:14:15 | Автор: admin
Наверняка в вашей компании уже не раз появлялись ребята в дорогих костюмах и с хорошо подвешенным языком, увлекательно рассказывающие, что без современных айти-штучек компания не проживет и несколько лет!

Все эти data lake (болото данных), КХД (корпоративное кладбище данных), data mining (смотри, не подорвись), data governance (стань рабом своих данных) и им подобные не исчезают из их рассказов, периодически сменяя друг друга. Срок жизни очередного хайпа редко превышает год-два, но при желании для вас с большим удовольствием откопают любую почти забытую технологию.

При этом биг-дату продают как такой волшебный сундук, из которого можно доставать разные чудеса: то ковер-самолёт, то сапоги-скороходы, а то и шамаханскую царицу (кому что актуально). Вот только, как правило, ковёр-самолёт проеден волшебной же молью и больше не летает, сапоги с отвалившейся подошвой и пешком-то в них ходить неудобно, а про дряхлую царицу и говорить нечего.

В этой статье я постараюсь рассказать о старых-добрых технологиях, которые всё ещё работают. О том, что можно извлечь из перечисленных выше хайповых технологий и как всем этим пользоваться простым смертным, типа нас, не нанимая толпу дата-сайентологов с зарплатами >10 тысяч $ в месяц.

image


Статья продолжает цикл:
Создаём компанию мечты: мастер-данные и интеграция
Создаём компанию мечты: управление качеством данных

Содержание


1. Big data: постановка проблемы
2. Мастер-данные: бессмертная классика
3. Как хранить данные: нужны ли КХД
4. Нормализация, или зачем вам болота данных
5. Почему дата-сайентист получает больше аналитика, а делает меньше?
6. Шина данных vs микросервисы
7. Как вообще не попасть на хайп?

1. Big data: постановка проблемы


Роль big data в развитии современной цивилизации впечатляющая. Но не по той причине, которая вам кажется.

Если интернет в каждой деревне и каждом телефоне появился благодаря порно и соцсетям (мессенджерам), то big data подарила триллионы долларов производителям жёстких дисков и оперативной памяти.

Проблема в том, что реальная польза от современной big data (в широком смысле слова) для всего человечества близка к пользе от порнографии, т.е. за несколькими исключениями нулевая!

Как же так, удивитесь вы. Ведь любой консультант и продажник рассказывает целую дюжину примеров, начиная с General Electric с их диагностикой состояния авиадвигателей, заканчивая таргетированной рекламой от Гугла!

Проблема, если быть точнее, в повторяемости результатов. Скажу по секрету, что скамейка запасных у продажников big data короткая. Если вы попросите их привести ещё несколько примеров, список закончится на втором десятке. Уверен, что мессенджеров и порносайтов они смогут назвать куда больше :) потому что их просто физически больше.

Результат от работы дата-сайентистов, конечно, есть, только он редко когда удовлетворяет заказчиков. Потому что, потратив год работы и несколько миллионов на оборудование и зарплату, на выходе они дают совершенно тривиальные выводы и закономерности, очевидные для любого линейного менеджера или специалиста на местах. Например, что лучше всего продаётся товар, размещённый на уровне человеческого глаза.

А General Electric сформировала своё конкурентное преимущество основываясь на методах математического анализа и статистики, которые можно найти в любом курсе математики для университета. Понятия big data тогда ещё не было.

Но на матанализе хайп не сделаешь, и слушать про двухсотлетние методы Фурье и Коши большие менеджеры вряд ли будут. Ведь там всё скучно, занудно, нужно много думать, и точно нет серебряной пули и волшебной таблетки.

Что же делать? Работать! Долго, скучно и уныло, стараясь создать такую атмосферу, которая поощряла бы активное думание. Как в канонических примерах от Bell Labs или той же GE. Это вполне возможно, более того, на это способны самые обычные люди, как вы и я, если их нужным образом замотивировать.

А начать нужно с

2. Мастер-данные: бессмертная классика


Мастер-данные это такой подход в структурировании информации, которая есть в компании. Если вы в какой-то момент обнаруживаете, что та или иная сущность используется одновременно в двух или большем количестве систем в вашей компании (например, список сотрудников на внутреннем сайте, в базе 1С-Бухгалтерии или CRM-системе), нужно выносить его в отдельную систему мастер-данных (MDM) и заставлять все системы пользоваться только этим справочником. По ходу понадобиться всем участникам договориться о нужных полях и атрибутах, а также придумать множество правил для контроля качества этих данных.

В среде дата-сайентистов младше 30 лет встречается убеждение, что окно для внедрения систем MDM началось примерно в 2008 и закончилось в районе 2012-15 годов. Что после этого появилось так много новых инструментов (всякие hadoop и spark), что заморачиваться с мастер-данными уже не нужно, не нужно ходить и договариваться с владельцами всех систем, думать о последствиях выбора архитектуры MDM и каждого конкретного реквизита в каждом справочнике.

К сожалению для них и к счастью для вас, это окно не закрылось. MDM-системы всё так же актуальны, как и системы бухучета или взаимодействия с клиентами. И думать и договариваться всё ещё нужно.

3. Как хранить данные: нужны ли КХД


Нет, корпоративные кладбища данных вам не нужны.

Идея, что для целей аналитики нужно иметь специальным образом подготовленные наборы всех данных (идеологи КХД не только выделяют это слово жирным, но и подчёркивают его двойной чертой) в вашей компании, абсурда. Коэффициент реального использования этих данных минимальный, 99% из них не используются никогда.

Тем не менее, идея предподготовленных наборов данных сама по себе годная. Только готовить их надо перед потенциальным использованием, не раньше. И, конечно, нужно иметь работающую методику такой подготовки.

4. Нормализация, или зачем вам болота данных


Это раздел про data lake, или болото данных. Легенды гласят, что можно сваливать все данные без разбору в одну большую кучу. Не нужно приводить все данные к одному формату, не нужно нормализовать и вычищать их!

И что существует такое специальное программное обеспечение, которое позволяет из этой свалки данных делать полезные именно вам выводы и доставать, как фокусник из рукава, нужные вам закономерности.

На практике самый ценный вывод, который вы сможете сделать из data lake что ваша компания почти не работает на январских каникулах.

И главный вопрос как какие-то жулики сумели убедить хоть кого-то в работоспособности этого подхода. Я склоняюсь к гипнозу :)

5. Почему дата-сайентист получает больше аналитика, а делает меньше?


Маркетинг, грамотная самопрезентация, максимум уверенности в себе. Не исключаю также гипноз :)

6. Шина данных vs микросервисы


Мой любимый пример использования технологии не по назначению. В любой достаточно крупной компании на определённом этапе развития возникает шина данных. Не обязательно единая и по науке, но сама функция реализуется успешно. Подробнее и системно о подходе можно прочитать в прошлой статье.

В качестве альтернативы молодым, успешно растущим компаниям предлагают использовать микросервисы или наборы открытых API, свои для каждой используемой системы.

Да, микросервисы очень удобны, когда вы пишите один монопродукт, с которым могут интегрироваться другие. Микросервисы, как правило, достаточно легко пишутся, их несложно тестировать, и в процессе их разработки вовсе не обязательно договариваться. За это их любят как разработчики, так и менеджеры.

Как показывает практика, любые две системы прекрасно интегрируются через микросервисы. Любые три хорошо. Любые пять терпимо, если очень тщательно всё задокументировать и обвешать автотестами.

Уже на десяти системах прекрасно выглядевшая на старте архитектура, подход превращается в некий такой клубок, паутину, когда определённые потоки отваливаются и не работают месяцами.

image

На нескольких десятках систем (цифра только кажется внушительной, в любом энтерпрайзе используется куда больше информационных систем) подход хоронит сам себя. И спустя несколько лет возникает какая-то централизация и шина. Делают её, как правило, уже другие люди.

7. Как вообще не попасть на хайп?


Вы познакомились с несколькими примерами хайпа, когда какой-то подход или технология могут оказаться бесполезными. И это с учётом того, что, по мировой статистике, доля успешно выполненных проектов по разработке и внедрению в IT редко превышает 40%.

Послевкусие от проваленных или бесполезных проектов может оказаться таким, что компания на время вообще откажется от IT-инициатив до тех пор, пока очередной влиятельный менеджер не оседлает очередной хайп.

Чтобы не попасть на хайп, перед очередным внедрением нужно выяснить следующее:

у технологии есть большая скамейка запасных. Количество приводимых примеров успешного применения должно превышать пару десятков, и от них не должно возникать ощущения, что тут происходит какая-то магия;
технология должна проходить тест на бабушку (объяснение сути должны быть настолько понятным, что его осилит даже ваша бабушка повторю, никакой магии);
у технологии должен быть конкретный, оцифрованный список ачивок, которые в результате получит ваша компания. Внедренцы MDM, CRM или той же 1С-бухгалтерии могут часами рассказывать о пользе их решения на примере ваших конкретных задач. Внедренцы Big data вообще начинают рассказывать, что сначала соберём кучу данных, а потом посмотрим, что с ней делать;
и, наконец, технология должна быть фальсифицируемой (в смысле критерия Поппера), т.е. внедренец должен чётко понимать область её применения и актуальности и суметь привести доводы против (!) внедрения. Не нужно забивать гвозди микроскопом, и вообще, например, если у вас мало клиентов, нужна ли вам супер-пупер CRM?

По большому счёту, этого уже достаточно, чтобы продолжить просто работать и не отвлекаться на хайпы.

Можете предложить ещё какие-нибудь критерии?
Приглашаю к обсуждению!
Подробнее..

Business Intelligence на больших данных наш опыт интеграции

20.01.2021 14:20:49 | Автор: admin

В вопросах производительности BI обычно приходится искать компромисс между скоростью работы аналитики и объемами данных, над которыми она реализована. Мы попробовали убить двух зайцев сразу, и сегодня я хочу поделиться нашим опытом интеграции Visiology с платформой Arenadata при построении гибридной модели работы BI.

Если вы читаете наш блог, то уже знаете о платформе Visiology хотя бы в общих чертах (если нет, это можно легко исправить, прочитав наш первый пост). Но сегодня речь пойдет не только о платформе Visiology и BI как таковых, но также о наших новых друзьях Arenadata. А точнее, об интеграции, которая позволила реализовать гибридную работу аналитики с большой скоростью и на больших объемах данных.

Зачем потребовалась интеграция Arenadata и Visiology?

Подходов к работе BI-систем на сегодняшний день несколько. Но когда речь идет о больших данных для самых разных задач, обычно используется ROLAP. Работает он достаточно просто: когда пользователь нажимает что-то на дашборде, например, выбирает какой-то фильтр, внутри платформы формируется SQL-запрос, который уходит на тот или иной бэкэнд. В принципе, под системой BI может лежать любая СУБД, которая поддерживает запросы от Postgres до Teradata. Подробнее о схемах работы OLAP я рассказывал здесь.

Преимущество интеграции BI с СУБД заключается в том, что для работы системы, по сути, нет ограничения по объему данных. Но при этом падает скорость выполнения запросов - конечно, если не использовать специализированную колоночную СУБД, например, ClickHouse или Vertica. И, хотя у ClickHouse спектр возможностей пока еще уже, чем у той же Vertica, система развивается и выглядит очень многообещающей.

Но даже с колоночной СУБД есть свои минусы при работе с BI, и самый первый из них это более низкая эффективность использования кэша на уровне платформы в целом, потому что СУБД, в отличие от самой BI-платформы, "не знает" многого о поведении пользователей и не может использовать эту информацию для оптимизации. Когда большое количество пользователей начинают работать, по-разному делать запросы и обращаться к дашбордам, требования к железу, на котором крутится СУБД даже хорошая, аналитическая и колоночная могут оказаться очень серьезными.

Второй момент это ограничение аналитической функциональности: все, что не укладывается в SQL-запрос, поддерживаемый распределенной СУБД, отсекается автоматически (например, в случае ClickHouse - это оконные функции). И это проблема, потому что в BI есть много вещей, которые с трудом транслируются в SQL-запросы или выполняются неоптимально.

Второй вариант это In-memory OLAP. Он подразумевает перенос всех обрабатываемых данных в специальный движок, который молниеносно прорабатывает базу в 200-300 Гб это порядок единицы миллиардов записей. Кстати, подробнее про ограничения In-Memory OLAP я уже рассказывал здесь. На практике встречаются инсталляции In-Memory OLAP, укомплектованные 1-2-3 терабайтами оперативной памяти, но это скорее экзотика, причем дорогостоящая.

Практика показывает, что далеко не всегда можно обойтись тем или иным подходом. Когда требуются одновременно гибкость, возможность работы с большим объемом данных и поддержка значительного количества пользователей, возникает потребность в гибридной системе, которая с одной стороны загружает данные в движок In-Memory OLAP, а с другой постоянно подтягивает нужные записи из СУБД. В этом случае движок OLAP используется для доступа ко всему массиву данных, без всяких задержек. И в отличие от чистого In-Memory OLAP, который нужно периодически перезагружать, в гибридной модели мы всегда получаем актуальные данные.

Такое разделение данных на горячие и холодные объединяет плюсы обоих подходов ROLAP и In-Memory, но усложняет проект внедрения BI. Например, разделение данных происходит вручную, на уровне ETL процедур. Поэтому для эффективной работы всего комплекса очень важна совместимость между бэкэндом и самой BI-системой. При том, что SQL-запросы остаются стандартными, в реальности всегда есть аспекты их выполнения, нюансы производительности.

Arenadata и Arenadata QuickMarts

Платформа данных Arenadata состоит из нескольких компонентов, построенных на базе открытых технологий, и используется многими российскими и зарубежными компаниями. В состав решения входит собственное MPP решение на базе Greenplum, дистрибутив Hadoop для хранения и обработки неструктурированных и слабоструктурированных данных, система централизованного управления ADCM (Сluster Management) на базе Ansible и другие полезные компоненты, в том числе Arenadata QuickMarts (ADQM).

СУБД ADQM это колоночная СУБД от Arenadata, построенная на базе ClickHouse, аналитической СУБД, которую развивает Яндекс. Изначально ClickHouse создавалась для внутреннего проекта Яндекс.Метрика, но эта СУБД очень понравилась сообществу. В результате исходный код ClickHouse был переведен в OpenSource (лицензия Apache-2) и стал популярен по всему миру. На сегодняшний день насчитывается порядка 1000 инсталляций ClickHouse по всему миру, и только 1/3 из них в России. И хотя Яндекс остается основным контрибьютором развития СУБД, лицензия Apache-2 позволяет абсолютно свободно использовать продукт и вносить изменения в проект.

Современная колоночная СУБД использует аппаратную оптимизацию CPU (SSE). ClickHouse может очень быстро выполнять запросы за счет векторных оптимизаций и утилизации всего ресурса многоядерных CPU. На базе ClickHouse работают огромные кластера сам Яндекс растягивает эту СУБД на несколько сотен серверов. Это гарантирует, что вместе с этим решением вы можете масштабироваться в достаточно больших объемах.

Но главная фича ClickHouse в нашем контексте это эффективная работа с достаточно специфическими аналитическими запросами. Если витрины уже отстроены и вам нужно предоставить доступ пользователей к BI с минимальной латентностью, эта история как раз для ClickHouse. Эта СУБД прекрасно справляется с запросами без джойнов и соединений.

Во многих сравнениях ClickHouse дает серьезную фору даже классическим СУБД, например, той же Oracle Exadata. Результаты этих тестов можно найти на ресурсах Яндекса.

Производительность QuickMarts

  • Типичные запросы быстрей чем за секунду

  • > 100 раз быстрей чем Hadoop и обычные СУБД

  • 100 млн - 1 миллиард строк в секунду на одной ноде

  • До 2 терабайт в секунду для кластера на 400 нод

Но вернемся к Arenadata QuickMarts. Это сборка ClickHouse, которая немного отличается от сборки Яндекса. Наши коллеги из Arenadata даже позже выпускают релизы, потому что проводят больше тестов, чтобы серьезные задачи в продакшене работали только на стабильных версиях.

При этом установка и настройка ADQM происходит из Arenadata Cluster Manager. Кастомизированная СУБД обладает расширенными механизмами авторизации пользователей, a также средствами мониторинга на базе Graphite и Grafana. Но самое главное, что QuickMarts изначально располагает готовыми коннекторами и прозрачно взаимодействует с другими компонентами платформы, в т.ч. с ADB (Greenplum), что позволяет по мере необходимости подгружать данные из ADB в ADQM.

В нашем случае QuickMarts используется для работы с витринами, к которым через BI обращаются сотни или тысячи пользователей. Архитектура системы позволяет выдать им данные здесь и сейчас, а не ждать 20-30 секунд, когда обработается их запрос по витринам в более медленной СУБД.

Как работает интеграция Arenadata и Visiology

Когда Visiology используется вместе с Arenadata, схема работы системы выглядит следующим образом. Основное хранилище данных может быть реализовано на базе ADB (GreenPlum), из которой создаются витрины данных, хранящиеся уже в ADQM. За счет интеграции между компонентами решения система работает как единое целое, а необходимые для запросов данные поднимаются на нужный уровень автоматически.

Фактически в аналитической системе создается только один дашборд, а графику обрабатывает движок In-Memory ViQube ядро платформы Visiology. Пользователь лишь выбирает те или иные фильтры, а задача по выгрузке самих транзакций выполняется уже на бэкенде ресурсами QuickMarts.

Раньше подобная интеграция была только с Vertica, но сейчас мы совместно с коллегами сделали интеграцию для Arenadata QuickMarts. Это радостная новость для сторонников ClickHouse, потому что BI работает с популярной СУБД по гибридной схеме. При этом Arenadata DB, выполняющая функцию корпоративного хранилища данных, обеспечивает необходимую трансформацию данных для дальнейшей работы QuickMarts и Visiology.

Все запросы BI обрабатываются движком ViQube. Если пользователь обращается к тем данным, которых нет в памяти, система автоматически генерирует SQL-запрос, который выполняется на Arenadata QuickMarts.

Чтобы все это заработало, мы реализовали поддержку диалекта ClickHouse для основных аналитических функций и добавили автоматическое переключение между режимами работы OLAP в зависимости от того, где находятся данные на самом деле. Однако для пользователя все остается предельно прозрачным: он даже не знает, как работает система просто делает запросы в интерфейсе BI и получает результаты, причем достаточно быстро.

Конечно, у такой схемы есть и свои минусы. Например, учитывая ограничения SQL, не все аналитические функции будут доступны на полном объеме данных. Но зато появляется возможность отрабатывать огромное количество транзакций, для большого количества людей, которые в основном изучают тренды на готовых дашбордах, и лишь иногда ищут конкретные записи.

Развиваемся дальше

Сейчас интеграция находится на стадии версии v1.0, и мы планируем дальнейшие доработки. В частности, уже сейчас речь идет о том, чтобы расширить набор доступных аналитических возможностей, а также об интеграции в единую консоль управления (например, у Arenadata есть решение Cluster Manager (ADCM), которое позволяет управлять всеми компонентами ландшафта из единой консоли, рассматриваем это как один из вариантов).

Также на повестке дня остро стоит задача автоматизации настройки метаданных. Сейчас их нужно размечать в специальном интерфейсе - это довольно трудоемко. Хочется, чтобы система самостоятельно распознавала бы все необходимые параметры, получив путь к той или иной витрине.

В целом, мы остались очень довольны и сотрудничеством с Arenadata, и той интеграцией с ClickHouse и ADQM, которая получилась. Теперь в аналитической платформе Visiology можно одновременно работать с источниками данных любого масштаба - от Small Data (ручной ввод, Excel) до Big Data (миллиардов или даже сотни миллиардов транзакций из распределенных хранилищ данных). А гибридный режим работы, который мы реализовали вместе с Arenadata, еще и позволяет сделать это с разумными затратами на оборудование.

Будем признательны, если вы напишете в комментариях, с какими сценариями запуска BI на больших данных вы сталкивались. Ну а мы всегда готовы поделиться своим опытом в деталях!

Подробнее..

Как мы выбирали Data Catalog, но в итоге оставили все как есть

09.04.2021 12:18:12 | Автор: admin

Меня зовут Никита Василюк, я инженер по работе с данными в департаменте данных и аналитики Lamoda. Я и моя команда занимаемся всем, что связано с распределенной системой хранения и обработки данных.


Периодически нам приходится отвечать на вопросы, где у нас лежат те или иные данные. Поэтому однажды мы решили провести эксперимент и внедрить Data Catalog, чтобы запросы приходили уже не к нам, а в систему. Например, если человеку понадобилась информация, связанная с заказами, он может перейти в систему, ввести слово order и найти все, что ему нужно по этой теме. Мы рассмотрели три инструмента и в итоге не стали ничего менять. Рассказываю почему.



В идеальном мире Data Catalog это инструмент, в котором можно найти краткую сводку по данным в хранилище, увидеть их структуру, проследить lineage (путь данных от системы-источника до целевой таблицы), посмотреть profiling (краткую статистику по полям таблицы) и историю проверок качества данных, увидеть владельцев данных и запросить доступ. Сейчас у нас есть подобие этого каталога: все таблицы нашего хранилища описываются вручную аналитиками в Confluence.


Мы решили поставить небольшой эксперимент и представить, что было бы, если роль Data Catalog исполнял не Confluence, а другая система.


Требования к системе


Мы определили несколько важных требований к потенциальной системе, в которой бы начали строить Data Catalog:


  • Автоматический сбор данных из разных СУБД. Это позволит нам избавить аналитиков от ручного обновления описаний таблиц.
  • Отображение структуры датасета с понятными описаниями и полнотекстовым поиском по этой информации.
  • Web UI с поиском. Это очень важное требование, поскольку в первую очередь Data Catalog задумывается как инструмент для поиска метаданных.
  • Визуализация data lineage от системы-источника до отчета в BI-системе.
  • Отображение data owner. С помощью этого можно понять, к какому человеку обратиться по всем вопросам, связанных с данными.

Остальные требования входят в разряд хотелок их наличие упростило бы жизнь, однако отсутствие не так критично:


  • SSO SAML авторизация;
  • визуализация Data Profiling;
  • визуализация Data Quality;
  • добавление кастомной информации для отображения;
  • трекинг изменения датасетов.

Мы решили рассмотреть три популярных open source проекта: Amundsen, LinkedIn DataHub и Marquez.


Amundsen



Amundsen это типичный справочник. То есть просто хорошая штука, чтобы поискать информацию по имеющимся таблицам. Он состоит из следующих сервисов:


  • neo4j хранилище метаданных (также может использоваться Apache Atlas);
  • elasticsearch поисковый движок;
  • amundsensearch сервис для поиска по данным в Elasticsearch;
  • amundsenfrontendlibrary Web UI (написан на Flask);
  • amundsenmetadatalibrary отвечает за работу с метаданными в Neo4j или Atlas;
  • amundsendatabuilder библиотека для извлечения данных из различных СУБД.


Принцип работы довольно простой. ETL-процесс сбора метаданных состоит из извлечения записей из источника при помощи выполнения SQL-запросов, преобразования записей и их загрузки в хранилище метаданных. Extractor выполняет запрос к хранилищу метаданных и преобразует их в набор вершин и связей между ними. Промежуточные результаты сохраняются в локальную директорию. Transformer преобразует загруженные данные в нужную структуру. Loader подхватывает промежуточные данные и складывает их либо во временный слой, либо сразу в финальное хранилище. Publisher подхватывает промежуточные данные и отправляет в хранилище.


В целом Amundsen хороший справочник, который может отображать текущее состояние данных, но, к сожалению, он не способен хранить историю. Мы не можем отследить, когда таблица или колонка была добавлена, удалена или модифицирована.


Во время тестирования Amundsen показался достаточно сырым например, из коробки не было авторизации, а поиск работал только по тегам и названиям баз, таблиц и колонок, не было возможности искать по описаниям. Но он действительно хорошо работает, когда нужно посмотреть, какие данные есть у нас в схемах.


Плюсы:


  • автоматический сбор данных из разных СУБД;
  • API для добавления или редактирования данных в автоматическом режиме за счет обращения напрямую к Metastore/information_schema;
  • Web UI с полнотекстовым поиском;
  • поиск по базам, таблицам, полям и тэгам;
  • добавление кастомной информации для отображения (programmatic description)
  • визуализация data profiling (например, количество записей, дата последнего обновления, исторические значения);
  • визуализация data quality (какие проверки навешаны на датасет, история результатов проверок);
  • отображение data owner.

Минусы:


  • нет трекинга изменения датасетов (хранит только актуальное состояние и работает как справочник);
  • нет data lineage (источник можно идентифицировать только в блоке с кастомной информацией);
  • не нашли SSO-аутентификацию, доступна только OIDC;
  • полнотекстовой поиск работает только для тегов, таблиц, баз и колонок (нет возможности искать по описаниям колонок).

LinkedIn DataHub



Как можно понять из названия, это платформа поиска и обнаружения метаданных от LinkedIn. Из коробки она состоит из целого зоопарка сервисов:


  • kafka-broker брокер Kafka;
  • zookeeper координатор для Kafka;
  • kafka-rest-proxy RESTful интерфейс для Kafka;
  • kafka-topics-ui Web UI для топиков Kafka;
  • schema-registry Kafka Schema Registry;
  • schema-registry-ui Kafka Schema Registry UI;
  • elasticsearch поисковый движок;
  • kibana дашборд для Elasticsearch;
  • neo4j графовая база данных;
  • datahub-gms Generalized Metadata Store;
  • datahub-frontend Web UI;
  • datahub-mae-consumer сервис для обработки сообщений Metadata Audit Events;
  • datahub-mce-consumer сервис для обработки сообщений Metadata Change Events;
  • mysql база данных для хранения метаданных.


Основная сущность DataHub dataset. Он может включать в себя таблицы (RDBMS и не только), топики в Kafka, директории на HDFS или другие сущности, имеющие схему.


Датасет имеет:


  • схему (включая типы и комментарии к полям),
  • статус (active или deprecated),
  • владельцев,
  • relationships (он же lineage),
  • docs с указанием ссылок на документацию.

Метаданные обновляются через отправку сообщений Metadata Change Event (MCE) в Kafka. MCE это сообщение в формате AVRO с указанием пунктов, которые необходимо обновить. Гибкость обновления данных в системе достигается за счет возможности в одном сообщении обновить владельцев датасета, в другом обновить схему, в третьем upstream datasets.


Отличительная особенность DataHub приятный веб-интерфейс. Он нам сразу понравился и запал в душу. У него все хорошо в плане поиска, обновлений типов таблиц и типов датасетов, информация о схеме датасета выглядит очень приятно. Можно добавлять владельцев датасетов, можно зайти в профиль пользователя и посмотреть, какими датасетами он владеет. У DataHub есть lineage, для каждого датасета можно наблюдать его взаимосвязи с другими объектами. Также есть возможность к каждому датасету прикладывать ссылки на документацию или исходный код.


Самый большой минус DataHub он состоит из огромного числа компонентов. Плохо это тем, что за каждым надо следить и для каждого из них нужно настроить отказоустойчивость.


Плюсы:


  • удобный UI с поиском;
  • автоматический сбор данных из разных СУБД (большая гибкость, поддерживает сбор данных не только из СУБД, работает для всего, у чего есть схема);
  • добавление или редактирование данных в автоматическом режиме через отправку AVRO-сообщений в Kafka;
  • добавление ссылок на документацию к датасету;
  • визуализация data lineage от источника до отчета в BI-системе (однако нет возможности отобразить всю цепочку сразу, отображается только upstream и downstream датасеты на один уровень вверх и вниз);
  • отображение data owner;
  • есть возможность сделать связку с интранетом компании.

Минусы:


  • огромное количество внутренних сервисов, за каждым из которых нужно следить;
  • отсутствует трекинг изменения датасетов;
  • data lineage показывает только upstream и downstream датасеты;
  • отсутствие визуализации data profiling;
  • отсутствие визуализации data quality (в roadmap на Q2 2021 есть пункт про отображение визуализаций и интеграцию с такими системами, как Great Expectations и deequ);
  • нет возможности добавить кастомную информацию для датасета;
  • нет возможности прослеживать изменения в датасетах;
  • поиск работает только для датасетов и пользователей.

Marquez



Третий инструмент Marquez. Он состоит из основного приложения, базы данных и веб-интерфейса для отображения датасетов, джобов и связей между ними.


Метаданные в Marquez отправляются с помощью REST API. Еще он поддерживает создание следующих типов объектов:


  • data source системы-источники;
  • dataset таблицы, из которых читаются и в которые пишутся данные, обрабатываемые джобами;
  • job абстракция над процессом трансформации данных, которая принимает таблицы на вход и записывают в них данные;
  • job run запуск конкретной джобы.


Marquez на самом деле очень простой и не имеет в себе ничего лишнего. У него хорошая модель данных: абстракции, которые заложили в него разработчики, позволяют довольно полно описывать процессы обработки и трансформации данных.


Его самый главный минус слишком минималистичный интерфейс, он плохо справляется с отображением lineage, в котором есть много таблиц и ветвлений. Нет возможности отображать владельца данных, нельзя в режиме справочника посмотреть, какие таблицы у нас есть. Нет возможности отображать информацию по качеству данных, по профилированию, невозможно добавить кастомную информацию. То есть Marquez максимально простой инструмент, который может подойти для каких-то простых use-caseов, но не подойдет для чего-то масштабного.


Плюсы:


  • быстрый и минималистичный UI;
  • поддержка airflow из коробки;
  • простая, но гибкая модель данных, позволяет с минимальным набором абстракций описывать данные;
  • понятный и простой API для добавления или редактирования данных;
  • Web UI с поиском;
  • есть lineage;
  • минимум компонентов.

Минусы:


  • слишком минималистичный UI;
  • отсутствует авторизация;
  • плохо работает в режиме пробежаться глазами и посмотреть, какие данные вообще есть;
  • не отображается data owner;
  • поиск работает только для датасетов и джобов;
  • нет возможности прослеживать изменения в датасетах;
  • отсутствует трекинг изменения датасетов;
  • отсутствует визуализация data profiling;
  • отсутствует визуализация data quality;
  • нет возможности добавить кастомную информацию для датасета.

Бонус: загоняем lineage из DWH в Neo4j


В качестве бонуса мы решили попробовать графовую базу данных Neo4j для отображения lineage. Источником данных стала сервисная таблица в нашем хранилище, в которой для каждой другой таблицы указано, какие объекты участвовали в ее формировании. Мы взяли три самых массивных представления и прошлись по их lineage вплоть до систем-источников.



В первом подходе мы решили действовать в лоб: прошлись по всем таблицам в цепочке и соединили их промежуточными вершинами-джобами aka SQL-запросами, которые заполняют таблицу данными. Таким образом, получилось большое дерево связей, которое невозможно внятно читать (зато его забавно рассматривать и двигать).


Очевидно, что ничего дельного из этого графа мы не вычленим: вершин слишком много, для просмотра полного названия каждой вершины на нее нужно сначала нажать и не промазать, а поиск интересующей таблицы в графе может занять много времени.



Во втором подходе мы попробовали убрать джобы и просто связать таблицы между собой. Вершин в графе стало очевидно меньше, однако читать его легче не стало.
После этого мы загнали данные из Neo4j в инструмент под названием neo4j-explorer, который создан для более структурированного отображения графа из Neo4j.



Зеленые блоки джобы, серые таблицы. Можно выделить джоб или таблицу и подсветить его зависимости в обе стороны. Несмотря на то, что выглядит это мощно (и напоминает кусок производства из игры Factorio), ничего полезного из этого мы вынести тоже не можем.


Что мы выбрали в итоге и почему не стали внедрять


В результате нашим фаворитом стал LinkedIn DataHub. Но мы поняли, что большинство текущих хотелок полностью покрываются Confluence, а у команд аналитиков сложились устоявшиеся процессы по работе с данными. Внедрять новую сложную систему и изменять текущие подходы к работе стоит только ради очень серьезных улучшений. Помимо этого, плюсы систем и их ограничения не перевешивают для нас трудоемкости внедрения и перехода.


Проведя Customer Development среди потенциальных пользователей, мы пришли к выводу, что ни одна из систем не поможет сэкономить рабочее время тех людей, которые работают с данными. При этом сложность внедрения и перестройки процессов будет существенной. Поэтому мы решили на какое-то время отложить выбор.


Мы отслеживаем развитие рассмотренных в статье сервисов, изучаем платные варианты Data Catalog и их возможности. Если у вас есть успешный (или не очень) опыт внедрения подобных систем, то поделитесь им в комментариях.

Подробнее..

Что нам стоит загрузить JSON в Data Platform

16.06.2021 16:13:28 | Автор: admin

Всем привет!

В недавней статье мы рассказали, как мы шли к построению нашей Data Platform. Сегодня хотелось бы глубже погрузиться в желудок нашей платформы и попутно рассказать вам о том, как мы решали одну из задач, которая возникла в связи с ростом разнообразия интегрируемых источников данных.

То есть, если возвращаться к финальной картинке из упомянутой выше статьи (специально дублирую ее, чтобы уважаемым читателям было удобнее), то сегодня мы будем более углубленно говорить о реализации правой части схемы той, что лежит после Apache NiFi.

Схема из прошлой нашей статьи.Схема из прошлой нашей статьи.

Напомним, что в нашей компании более 350 реляционных баз данных. Естественно, не все они уникальны и многие представляют собой по сути разные экземпляры одной и той же системы, установленной во всех магазинах торговой сети, но все же зоопарк разнообразия присутствует. Поэтому без какого-либо Frameworkа, упрощающего и ускоряющего интеграцию источников в Data Platform, не обойтись.

Общая схема доставки данных из источников в ODS-слой Greenplum посредством разработанного нами frameworkа приведена ниже:

Общая схема доставки данных в ODS-слой Greenplum Общая схема доставки данных в ODS-слой Greenplum
  1. Данные из систем-источников пишутся в Kafka в AVRO-формате, обрабатываются в режиме реального времени Apache NiFi, который сохраняет их в формате parquet на S3.

  2. Затем эти файлы с сырыми данными с помощью Sparkа обрабатываются в два этапа:

    1. Compaction на данном этапе выполняется объединение для снижения количества выходных файлов с целью оптимизации записи и последующего чтения (то есть несколько более мелких файлов объединяются в несколько файлов побольше), а также производится дедубликация данных: простой distinct() и затем coalesce(). Результат сохраняется на S3. Эти файлы используются затем для parsing'а , а также являются своеобразным архивом сырых необработанных данных в формате как есть;

    2. Parsing на этой фазе производится разбор входных данных и сохранение их в плоские структуры согласно маппингу, описанному в метаданных. В общем случае из одного входного файла можно получить на выходе несколько плоских структур, которые в виде сжатых (как правило gzip) CSV-файлов сохраняются на S3.

  3. Заключительный этап загрузка данных CSV-файлов в ODS-слой хранилища: создается временная external table над данными в S3 через PXF S3 connector, после чего данные уже простым pgsql переливаются в таблицы ODS-слоя Greenplum

  4. Все это оркестрируется с помощью Airflow.

DAGи для Airflow у нас генерируются динамически на основании метаданных. Parsing файлов и разложение их в плоские структуры также производится с использованием метаданных. Это приводит к упрощению интеграции нового источника, так как, для этого необходимо всего лишь:

  • создать в ODS-слое Хранилища таблицы-приемники данных;

  • в репозитории метаданных в Git согласно принятым стандартам прописать в виде отдельных YAML-файлов:

    • общие настройки источника (такие как: расписание загрузки, входной и выходной формат файлов с данными, S3-бакет, имя сервисного пользователя, имя и email владельца для нотификации в случае сбоя загрузки и т.п.);

    • маппинг данных объектов источника на таблицы слоя ODS (при этом поддерживаются как плоские, так и вложенные структуры и массивы, а также есть возможность данные из одного объекта раскладывать в ODS-слое по нескольким таблицам). То есть описать, как необходимо сложную вложенную структуру разложить в плоские таблицы;

До недавнего времени такой подход удовлетворял текущие наши потребности, но количество и разнообразие источников данных растет. У нас стали появляться источники, которые не являются реляционными базами данных, а генерируют данные в виде потока JSON-объектов. Кроме того на горизонте уже маячила интеграция источника, который под собой имел MongoDB и поэтому будет использовать MongoDB Kafka source connector для записи данных в Kafka. Поэтому остро встала необходимость доработки нашего frameworkа для поддержки такого сценария. Хотелось, чтобы данные источника сразу попадали на S3 в формате JSON - то есть в формате "как есть", без лишнего шага конвертации в parquet посредством Apache NiFi.

В первую очередь необходимо было доработать шаг Compaction. Его код, если убрать всю обвязку и выделить только главное, очень простой:

df = spark.read.format(in_format) \               .options(**in_options) \               .load(path) \               .distinct()    new_df = df.coalesce(div)new_df.write.mode("overwrite") \             .format(out_format) \            .options(**out_options) \            .save(path)

Но если мы проделаем все те же манипуляции с JSON-данными, то волей-неволей внесем изменения во входные данные, так как при чтении JSONов Spark автоматом определит и сделает mergeSchema, т.е. мы тем самым можем исказить входные данные, чего не хотелось бы. Ведь наша задача на этом шаге только укрупнить файлы и дедублицировать данные, без какого-либо вмешательства в их структуру и наполнение. То есть сохранить их как есть.

По-хорошему, нам надо было просто прочитать данные как обычный текст, дедублицировать строки, укрупнить файлы и положить обратно на S3. Для этого был предложен достаточно изящный способ:

рассматривать файлы с JSON-объектами как DataFrame с одной колонкой, содержащей весь JSON-объект.

Попробуем сделать это. Допустим, мы имеем следующий файл данных:

file1:

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

Обратите внимание на формат этого файла. Это файл с JSON-объектами, где 1 строка = 1 объект. Оставаясь, по сути, JSON-ом, он при этом не пройдет синтаксическую JSON-валидацию. Именно в таком виде мы сохраняем JSON-данные на S3 (есть специальная "галочка в процессоре Apache NiFi).

Прочитаем файл предлагаемым способом:

# Читаем данныеdf = spark.read \          .format("csv") \          .option("sep", "\a") \          .load("file1.json")# Схема получившегося DataFramedf.printSchema()root |-- _c0: string (nullable = true)# Сами данныеdf.show()+--------------------+|                 _c0|+--------------------+|{"productId": 1, ...||{"productId": 2, ...|+--------------------+

То есть мы тут читаем JSON как обычный CSV, указывая разделитель, который никогда заведомо не встретится в наших данных. Например, Bell character. В итоге мы получим DataFrame из одного поля, к которому можно будет также применить dicstinct() и затем coalesce(), то есть менять существующий код не потребуется. Нам остается только определить опции в зависимости от формата:

# Для parquetin_format = "parquet"in_options = {}# Для JSONin_format = "csv"in_options = {"sep": "\a"}

Ну и при сохранении этого же DataFrame обратно на S3 в зависимости от формата данных опять применяем разные опции:

df.write.mode("overwrite") \           .format(out_format) \.options(**out_options) \  .save(path)  # для JSON     out_format = "text" out_options = {"compression": "gzip"}  # для parquet   out_format = input_format out_options = {"compression": "snappy"}

Следующей точкой доработки был шаг Parsing. В принципе, ничего сложного, если бы задача при этом упиралась в одну маленькую деталь: JSON -файл, в отличии от parquet, не содержит в себе схему данных. Для разовой загрузки это не является проблемой, так как при чтении JSON-файла Spark умеет сам определять схему, и даже в случае, если файл содержит несколько JSON-объектов с немного отличающимся набором полей, корректно выполнит mergeSchema. Но для регулярного процесса мы не могли уповать на это. Банально может случиться так, что во всех записях какого-то файла с данными может не оказаться некоего поля field_1, так как, например, в источнике оно заполняется не во всех случаях. Тогда в получившемся Spark DataFrame вообще не окажется этого поля, и наш Parsing, построенный на метаданных, просто-напросто упадет с ошибкой из-за того, что не найдет прописанное в маппинге поле.

Проиллюстрирую. Допустим,у нас есть два файла из одного источника со следующим наполнением:

file1 (тот же что и в примере выше):

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

file2:

{productId: 3, productName: ProductName 3, dimensions: {length: 10, width: 12, height: 12.5, package: [10, 20.5, 30]}}

Теперь прочитаем Sparkом их и посмотрим данные и схемы получившихся DataFrame:

df = spark.read \          .format("json") \          .option("multiline", "false") \          .load(path)df.printSchema()df.show()

Первый файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- width: long (nullable = true) |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) |-- tags: array (nullable = true) |    |-- element: string (containsNull = true)+--------------+-----+---------+-------------+--------------+|    dimensions|price|productId|  productName|          tags|+--------------+-----+---------+-------------+--------------+|[12.5, 10, 12]| null|        1|ProductName 1|[tag 1, tag 2]||[12.5, 10, 12]|10.01|        2|         null|[tag 1, tag 2]|+--------------+-----+---------+-------------+--------------+

Второй файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- package: array (nullable = true) |    |    |-- element: double (containsNull = true) |    |-- width: long (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)+--------------------+---------+-------------+|          dimensions|productId|  productName|+--------------------+---------+-------------+|[12.5, 10, [10.0,...|        3|ProductName 3|+--------------------+---------+-------------+

Как видно, Spark корректно выстроил схему отдельно для каждого файла. Если в какой-либо записи не было обнаружено поля, имеющегося в другой, то в DataFrame мы видим корректное проставление null (поля price и productName для первого файла).

Но в целом схемы получились разные, и если у нас в маппинге прописано, что нам нужно распарсить эти данные (то есть оба файла) в следующую плоскую структуру,

root |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)

а во входных данных у нас присутствуют только файлы а-ля file2, где поля price нет ни у одной записи, то Spark упадет с ошибкой, так как не найдет поля price для формирования выходного DataFrame. С parquet-файлами такой проблемы как правило не возникает, так как сам parquet-файл генерируется из AVRO, который уже содержит полную схему данных и, соответственно, эта полная схема есть и в parquet-файле.

Еще надо отметить, что, естественно, мы хотели по максимум переиспользовать уже существующий и зарекомендовавший себя код нашего frameworkа, а не городить какую-то полностью отдельную ветку для обработки JSONов то есть все изменения хотелось сделать на этапе чтения JSON-файлов с S3.

Таким образом очевидно, что для корректной загрузки данных из JSON-файлов необходимо предопределить схему JSON-файла с данными и читать файлы уже с применением этой схемы. Тогда у нас даже если в JSONе нет какого-то поля, то в самом DataFrame это поле будет, но его значение подставится как null:

df = spark.read \          .format("json") \          .option("multiline","false") \          .schema(df_schema) \          .load(path)

Первая мысль была использовать для хранения схемы имеющийся сервис метаданных - то есть описать схему в YAML-формате и сохранить в имеющемся репозитории. Но с учетом того, что все данные источников у нас проходят через Kafka, решили, что логично для хранения схем использовать имеющийся Kafka Schema Registry, а схему хранить в стандартном для JSON формате (другой формат, кстати говоря, Kafka Schema Registry не позволил бы хранить).

В общем, вырисовывалась следующая реализация:

  • Читаем из Kafka Schema Registry схему

  • Импортируем ее в pyspark.sql.types.StructType что-то типа такого:

# 1. получаем через Kafka Schema Registry REST API схему данных # 2. записываем ее в переменную schema и далее:df_schema = StructType.fromJson(schema)
  • Ну и с помощью полученной схемы читаем JSON-файлы

Звучит хорошо, если бы Давайте посмотрим на формат JSON-схемы, понятной Sparkу. Пусть имеем простой JSON из file2 выше. Посмотреть его схему в формате JSON можно, выполнив:

df.schema.json()  
Получившаяся схема
{    "fields":    [        {            "metadata": {},            "name": "dimensions",            "nullable": true,            "type":            {                "fields":                [                    {"metadata":{},"name":"height","nullable":true,"type":"double"},                    {"metadata":{},"name":"length","nullable":true,"type":"long"},                    {"metadata":{},"name":"width","nullable":true,"type":"long"}                ],                "type": "struct"            }        },        {            "metadata": {},            "name": "price",            "nullable": true,            "type": "double"        },        {            "metadata": {},            "name": "productId",            "nullable": true,            "type": "long"        },        {            "metadata": {},            "name": "productName",            "nullable": true,            "type": "string"        },        {            "metadata": {},            "name": "tags",            "nullable": true,            "type":            {                "containsNull": true,                "elementType": "string",                "type": "array"            }        }    ],    "type": "struct"}

Как видно, это совсем не стандартный формат JSON-схемы.

Но мы же наверняка не первые, у кого встала задача конвертировать стандартную JSON-схему в формат, понятный Sparkу - подумали мы и В принципе, не ошиблись, но несколько часов поиска упорно выводили исключительно на примеры, из серии:

как сохранить схему уже прочитанного DataFrame в JSON, затем использовать повторно

либо на репозиторий https://github.com/zalando-incubator/spark-json-schema, который нам бы подошел, если мы использовали Scala, а не pySpark

В общем, на горизонте маячила перспектива писать SchemaConverter. Сперва решили отделаться малой кровью и написать простенький конвертер никаких ссылок и прочих сложных конструкций. Конвертер был успешно протестирован на синтетических данных, после чего захотелось скормить ему что-то приближенное к реальности.

К счастью, у нас уже был один источник, генерирующий данные в формате JSON. Как временное решение схема его интеграции в DataPlatform была незамысловата: NiFi читал данные из Kafka, преобразовывал их в parquet, использую прибитую гвоздями в NiFi схему в формате AVRO-schema, и складывал на S3. Схема данных была действительно непростой и с кучей вложенных структур и нескольких десятков полей - неплохой тест-кейс в общем-то:

Посмотреть длинную портянку, если кому интересно :)
root |-- taskId: string (nullable = true) |-- extOrderId: string (nullable = true) |-- taskStatus: string (nullable = true) |-- taskControlStatus: string (nullable = true) |-- documentVersion: long (nullable = true) |-- buId: long (nullable = true) |-- storeId: long (nullable = true) |-- priority: string (nullable = true) |-- created: struct (nullable = true) |    |-- createdBy: string (nullable = true) |    |-- created: string (nullable = true) |-- lastUpdateInformation: struct (nullable = true) |    |-- updatedBy: string (nullable = true) |    |-- updated: string (nullable = true) |-- customerId: string (nullable = true) |-- employeeId: string (nullable = true) |-- pointOfGiveAway: struct (nullable = true) |    |-- selected: string (nullable = true) |    |-- available: array (nullable = true) |    |    |-- element: string (containsNull = true) |-- dateOfGiveAway: string (nullable = true) |-- dateOfGiveAwayEnd: string (nullable = true) |-- pickingDeadline: string (nullable = true) |-- storageLocation: string (nullable = true) |-- currentStorageLocations: array (nullable = true) |    |-- element: string (containsNull = true) |-- customerType: string (nullable = true) |-- comment: string (nullable = true) |-- totalAmount: double (nullable = true) |-- currency: string (nullable = true) |-- stockDecrease: boolean (nullable = true) |-- offline: boolean (nullable = true) |-- trackId: string (nullable = true) |-- transportationType: string (nullable = true) |-- stockRebook: boolean (nullable = true) |-- notificationStatus: string (nullable = true) |-- lines: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- lineId: string (nullable = true) |    |    |-- extOrderLineId: string (nullable = true) |    |    |-- productId: string (nullable = true) |    |    |-- lineStatus: string (nullable = true) |    |    |-- lineControlStatus: string (nullable = true) |    |    |-- orderedQuantity: double (nullable = true) |    |    |-- confirmedQuantity: double (nullable = true) |    |    |-- assignedQuantity: double (nullable = true) |    |    |-- pickedQuantity: double (nullable = true) |    |    |-- controlledQuantity: double (nullable = true) |    |    |-- allowedForGiveAwayQuantity: double (nullable = true) |    |    |-- givenAwayQuantity: double (nullable = true) |    |    |-- returnedQuantity: double (nullable = true) |    |    |-- sellingScheme: string (nullable = true) |    |    |-- stockSource: string (nullable = true) |    |    |-- productPrice: double (nullable = true) |    |    |-- lineAmount: double (nullable = true) |    |    |-- currency: string (nullable = true) |    |    |-- markingFlag: string (nullable = true) |    |    |-- operations: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- operationId: string (nullable = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- reason: string (nullable = true) |    |    |    |    |-- quantity: double (nullable = true) |    |    |    |    |-- dmCodes: array (nullable = true) |    |    |    |    |    |-- element: string (containsNull = true) |    |    |    |    |-- timeStamp: string (nullable = true) |    |    |    |    |-- updatedBy: string (nullable = true) |    |    |-- source: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- items: array (nullable = true) |    |    |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |    |    |-- assignedQuantity: double (nullable = true) |-- linkedObjects: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- objectType: string (nullable = true) |    |    |-- objectId: string (nullable = true) |    |    |-- objectStatus: string (nullable = true) |    |    |-- objectLines: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- objectLineId: string (nullable = true) |    |    |    |    |-- taskLineId: string (nullable = true)

Естественно, я не захотел перебивать руками захардкоженную схему, а воспользовался одним из многочисленных онлайн-конвертеров, позволяющих из Avro-схемы сделать JSON-схему. И тут меня ждал неприятный сюрприз: все перепробованные мною конвертеры на выходе использовали гораздо больше синтаксических конструкций, чем понимала первая версия конвертера. Дополнительно пришло осознание, что также как и я, наши пользователи (а для нас пользователями в данном контексте являются владельцы источников данных) с большой вероятностью могут использовать подобные конвертеры для того, чтобы получить JSON-схему, которую надо зарегистрировать в Kafka Schema Registry, из того, что у них есть.

В результате наш SparkJsonSchemaConverter был доработан появилась поддержка более сложных конструкций, таких как definitions, refs (только внутренние) и oneOf. Сам же парсер был оформлен уже в отдельный класс, который сразу собирал на основании JSON-схемы объект pyspark.sql.types.StructType

У нас почти сразу же родилась мысль, что хорошо бы было поделиться им с сообществом, так как мы в Леруа Мерлен сами активно используем продукты Open Source, созданные и поддерживаемые энтузиастами со всего мира, и хотим не просто их использовать, но и контрибьютить обратно, участвуя в разработке Open Source продуктов и поддерживая сообщество. В настоящий момент мы решаем внутренние орг.вопросы по схеме выкладывания данного конвертера в Open Source и, уверен, что в ближайшее время поделимся с сообществом этой наработкой!

В итоге благодаря написанному SparkJsonSchemaConverterу доработка шага Parsing свелась только к небольшому тюнингу чтения данных с S3: в зависимости от формата входных данных источника (получаем из сервиса метаданных) читаем файлы с S3 немного по-разному:

# Для JSONdf = spark.read.format(in_format)\            .option("multiline", "false")\            .schema(json_schema) \            .load(path)# Для parquet:df = spark.read.format(in_format)\            .load(path)

А дальше отрабатывает уже существующий код, раскрывающий все вложенные структуры согласно маппингу и сохраняющий данные DataFrameа в несколько плоских CSV-файлов.

В итоге мы смогли при относительном минимуме внесенных изменений в код текущего frameworkа добавить в него поддержку интеграции в нашу Data Platform JSON-источников данных. И результат нашей работы уже заметен:

  • Всего через месяц после внедрения доработки у нас на ПРОДе проинтегрировано 4 новых JSON-источника!

  • Все текущие интеграции даже не заметили расширения функционала frameworkа, а значит, мы точно ничего не сломали произведенной доработкой.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru