Русский
Русский
English
Статистика
Реклама

Pyspark

Big Data Tools EAP 12 экспериментальная поддержка Python, поиск по ноутбукам в Zeppelin

16.12.2020 18:10:53 | Автор: admin

Только что вышло очередное обновление EAP 12 для плагина под названием Big Data Tools, доступного для установки в IntelliJ IDEA Ultimate, PyCharm Professional и DataGrip. Можно установить его через страницу плагина или внутри IDE. Плагин позволяет работать с Zeppelin, загружать файлы в облачные хранилища и проводить мониторинг кластеров Hadoop и Spark.


В этом релизе мы добавили экспериментальную поддержку Python и поиск по ноутбукам Zeppelin. Если вы страдали от каких-то багов, их тоже починено множество. Давайте поговорим об этих изменениях более подробно.



Экспериментальная поддержка Python в Zeppelin


Поддержку Python хотелось добавить давно. Несмотря на то, что PySpark в Zeppelin сейчас на волне хайпа, всё что предоставляет нам веб-интерфейс Zeppelin простейшее автодополнение, в котором содержится какой-то наполовину случайный набор переменных и функций. Вряд ли это можно ставить в вину Zeppelin, он никогда не обещал нам умного анализа кода. В IDE хочется видеть что-то намного большее.


Добавить целый новый язык звучит как очень сложная задача. К счастью, в наших IDE уже есть отличная поддержка Python: либо в PyCharm, либо в Python-плагине для IntelliJ IDEA. Нам нужно было взять эту готовую функциональность и интегрировать внутрь Zeppelin. Вместе с этим возникает много нюансов, специфичных для Zeppelin: как нам проанализировать список зависимостей, как найти правильную версию Python, и тому подобное.



Начиная с EAP 12, код на Python нормально подсвечивается в нашем редакторе ноутбуков Zeppelin, отображаются грубые синтаксические ошибки. Можно перейти на определение переменной или функции, если они объявлены внутри ноутбука. Можно сделать привычные рефакторинги вроде rename или change signature. Работают Zeppelin-специфичные таблицы и графики в конце концов, зачастую ради них люди и используют Zeppelin.



Конечно, многие вещи ещё предстоит сделать. Например, очень хотелось бы видеть умное автодополнение по функциям Spark API и другому внешнему коду. Сейчас мы нормально автодополняем только то, что написано внутри ноутбука. У нас есть хорошие идеи, как это реализовать в следующих релизах. Иначе говоря, не надо ждать какого-то чуда: теперь у вас есть полнофункциональный редактор Python, но это всё. Поддержка Python получилась довольно экспериментальной, но, как говорится, путь в тысячу ли начинается с первого шага. А ещё, даже имеющейся функциональности может оказаться достаточно, чтобы писать код в вашем любимом IDE и не переключаться на веб-интерфейс Zeppelin.


Смешиваем Scala и Python


Иногда, в одном и том же ноутбуке вам хочется одновременно использовать и Python, и Scala. Например, это бывает полезно из соображений производительности в вычислительных задачах.


Смешивать разные языки вполне возможно. Но не забывайте, что для полноценной поддержки Scala вам понадобится IntelliJ IDEA с плагинами Scala и Python. В PyCharm этот Scala-код хоть и будет выполняться, но его поддержка в редакторе останется на уровне plain text.



Поиск по ноутбукам Zeppelin


У нас всегда была возможность найти, в каком же файле на диске находится нужный нам текст (например, с помощью Find in Path, Ctrl+Shift+F). Но этот стандартный интерфейс поиска не работает с ноутбуками, ведь они не файлы!


Начиная с EAP 12 мы добавили отдельную панель поиска по ноутбукам. Откройте панель Big Data Tools, выделите какое-нибудь из подключений к Zeppelin и нажмите на кнопку с изображением лупы (или используйте сочетание клавиш Ctrl+F на клавиатуре). В результате, вы попадёте в окно под названием Find in Zeppelin Connections. Активация одного из результатов поиска приведет к открытию этого ноутбука и переходу на нужный параграф.



Похожий поиск есть и в веб-интерфейсе Zeppelin. Для получения результатов поиска мы используем стандартный HTTP API, поэтому результаты должны совпадать с тем, что вы видите в интерфейсе Zeppeliln по аналогичному поисковому запросу. Если вы раньше пользовались веб-интерфейсом и привыкли к поиску по ноутбукам, теперь он имеется и в Big Data Tools.


Функция небольшая, но очень полезная. Непонятно, как мы без неё жили раньше.


Исправления ошибок


Плагин Big Data Tools активно развивается, и при бурном росте неизбежны некоторые проблемы. В этом релизе мы провели много работы над правильной работой с удаленными хранилищами, отображением графиков и параграфов, переработали часть интерефейсов (например, SSH-туннели). Переработаны кое-какие системные вещи (например, несколько проектов теперь используют общее подключение к Zeppelin), вывезли множество ошибок в неожиданных местах. В целом, теперь пользоваться плагином намного приятней.


Если вам интересен обзор основных улучшений, то их можно найти в разделе What's New на странице плагина. Если вы ищете какую-то конкретную проблему, вам может подойти полный отчет из YouTrack.


Спасибо, что пользуетесь нашим плагином! Напоминаю, что установить свежую версию можно либо в браузере, на странице плагина, или внутри IDE по названию Big Data Tools. На странице плагина можно оставлять ваши отзывы и предложения (которые мы всегда читаем), и ставить оценки в виде звёздочек.


Документация и социальные сети


Ну и наконец, если вам нужно разобраться функциональностью Big Data Tools, у нас есть подробная документация в вариантах для IntelliJ IDEA и PyCharm. Если хочется задать вопрос, можно сделать это прямо в комментариях на Хабре или перейти в наш Twitter.


Надеемся, что все эти улучшения окажутся полезными, позволят вам делать более интересыне вещи, и более приятным способом.


Команда Big Data Tools

Подробнее..

Что нам стоит загрузить JSON в Data Platform

16.06.2021 16:13:28 | Автор: admin

Всем привет!

В недавней статье мы рассказали, как мы шли к построению нашей Data Platform. Сегодня хотелось бы глубже погрузиться в желудок нашей платформы и попутно рассказать вам о том, как мы решали одну из задач, которая возникла в связи с ростом разнообразия интегрируемых источников данных.

То есть, если возвращаться к финальной картинке из упомянутой выше статьи (специально дублирую ее, чтобы уважаемым читателям было удобнее), то сегодня мы будем более углубленно говорить о реализации правой части схемы той, что лежит после Apache NiFi.

Схема из прошлой нашей статьи.Схема из прошлой нашей статьи.

Напомним, что в нашей компании более 350 реляционных баз данных. Естественно, не все они уникальны и многие представляют собой по сути разные экземпляры одной и той же системы, установленной во всех магазинах торговой сети, но все же зоопарк разнообразия присутствует. Поэтому без какого-либо Frameworkа, упрощающего и ускоряющего интеграцию источников в Data Platform, не обойтись.

Общая схема доставки данных из источников в ODS-слой Greenplum посредством разработанного нами frameworkа приведена ниже:

Общая схема доставки данных в ODS-слой Greenplum Общая схема доставки данных в ODS-слой Greenplum
  1. Данные из систем-источников пишутся в Kafka в AVRO-формате, обрабатываются в режиме реального времени Apache NiFi, который сохраняет их в формате parquet на S3.

  2. Затем эти файлы с сырыми данными с помощью Sparkа обрабатываются в два этапа:

    1. Compaction на данном этапе выполняется объединение для снижения количества выходных файлов с целью оптимизации записи и последующего чтения (то есть несколько более мелких файлов объединяются в несколько файлов побольше), а также производится дедубликация данных: простой distinct() и затем coalesce(). Результат сохраняется на S3. Эти файлы используются затем для parsing'а , а также являются своеобразным архивом сырых необработанных данных в формате как есть;

    2. Parsing на этой фазе производится разбор входных данных и сохранение их в плоские структуры согласно маппингу, описанному в метаданных. В общем случае из одного входного файла можно получить на выходе несколько плоских структур, которые в виде сжатых (как правило gzip) CSV-файлов сохраняются на S3.

  3. Заключительный этап загрузка данных CSV-файлов в ODS-слой хранилища: создается временная external table над данными в S3 через PXF S3 connector, после чего данные уже простым pgsql переливаются в таблицы ODS-слоя Greenplum

  4. Все это оркестрируется с помощью Airflow.

DAGи для Airflow у нас генерируются динамически на основании метаданных. Parsing файлов и разложение их в плоские структуры также производится с использованием метаданных. Это приводит к упрощению интеграции нового источника, так как, для этого необходимо всего лишь:

  • создать в ODS-слое Хранилища таблицы-приемники данных;

  • в репозитории метаданных в Git согласно принятым стандартам прописать в виде отдельных YAML-файлов:

    • общие настройки источника (такие как: расписание загрузки, входной и выходной формат файлов с данными, S3-бакет, имя сервисного пользователя, имя и email владельца для нотификации в случае сбоя загрузки и т.п.);

    • маппинг данных объектов источника на таблицы слоя ODS (при этом поддерживаются как плоские, так и вложенные структуры и массивы, а также есть возможность данные из одного объекта раскладывать в ODS-слое по нескольким таблицам). То есть описать, как необходимо сложную вложенную структуру разложить в плоские таблицы;

До недавнего времени такой подход удовлетворял текущие наши потребности, но количество и разнообразие источников данных растет. У нас стали появляться источники, которые не являются реляционными базами данных, а генерируют данные в виде потока JSON-объектов. Кроме того на горизонте уже маячила интеграция источника, который под собой имел MongoDB и поэтому будет использовать MongoDB Kafka source connector для записи данных в Kafka. Поэтому остро встала необходимость доработки нашего frameworkа для поддержки такого сценария. Хотелось, чтобы данные источника сразу попадали на S3 в формате JSON - то есть в формате "как есть", без лишнего шага конвертации в parquet посредством Apache NiFi.

В первую очередь необходимо было доработать шаг Compaction. Его код, если убрать всю обвязку и выделить только главное, очень простой:

df = spark.read.format(in_format) \               .options(**in_options) \               .load(path) \               .distinct()    new_df = df.coalesce(div)new_df.write.mode("overwrite") \             .format(out_format) \            .options(**out_options) \            .save(path)

Но если мы проделаем все те же манипуляции с JSON-данными, то волей-неволей внесем изменения во входные данные, так как при чтении JSONов Spark автоматом определит и сделает mergeSchema, т.е. мы тем самым можем исказить входные данные, чего не хотелось бы. Ведь наша задача на этом шаге только укрупнить файлы и дедублицировать данные, без какого-либо вмешательства в их структуру и наполнение. То есть сохранить их как есть.

По-хорошему, нам надо было просто прочитать данные как обычный текст, дедублицировать строки, укрупнить файлы и положить обратно на S3. Для этого был предложен достаточно изящный способ:

рассматривать файлы с JSON-объектами как DataFrame с одной колонкой, содержащей весь JSON-объект.

Попробуем сделать это. Допустим, мы имеем следующий файл данных:

file1:

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

Обратите внимание на формат этого файла. Это файл с JSON-объектами, где 1 строка = 1 объект. Оставаясь, по сути, JSON-ом, он при этом не пройдет синтаксическую JSON-валидацию. Именно в таком виде мы сохраняем JSON-данные на S3 (есть специальная "галочка в процессоре Apache NiFi).

Прочитаем файл предлагаемым способом:

# Читаем данныеdf = spark.read \          .format("csv") \          .option("sep", "\a") \          .load("file1.json")# Схема получившегося DataFramedf.printSchema()root |-- _c0: string (nullable = true)# Сами данныеdf.show()+--------------------+|                 _c0|+--------------------+|{"productId": 1, ...||{"productId": 2, ...|+--------------------+

То есть мы тут читаем JSON как обычный CSV, указывая разделитель, который никогда заведомо не встретится в наших данных. Например, Bell character. В итоге мы получим DataFrame из одного поля, к которому можно будет также применить dicstinct() и затем coalesce(), то есть менять существующий код не потребуется. Нам остается только определить опции в зависимости от формата:

# Для parquetin_format = "parquet"in_options = {}# Для JSONin_format = "csv"in_options = {"sep": "\a"}

Ну и при сохранении этого же DataFrame обратно на S3 в зависимости от формата данных опять применяем разные опции:

df.write.mode("overwrite") \           .format(out_format) \.options(**out_options) \  .save(path)  # для JSON     out_format = "text" out_options = {"compression": "gzip"}  # для parquet   out_format = input_format out_options = {"compression": "snappy"}

Следующей точкой доработки был шаг Parsing. В принципе, ничего сложного, если бы задача при этом упиралась в одну маленькую деталь: JSON -файл, в отличии от parquet, не содержит в себе схему данных. Для разовой загрузки это не является проблемой, так как при чтении JSON-файла Spark умеет сам определять схему, и даже в случае, если файл содержит несколько JSON-объектов с немного отличающимся набором полей, корректно выполнит mergeSchema. Но для регулярного процесса мы не могли уповать на это. Банально может случиться так, что во всех записях какого-то файла с данными может не оказаться некоего поля field_1, так как, например, в источнике оно заполняется не во всех случаях. Тогда в получившемся Spark DataFrame вообще не окажется этого поля, и наш Parsing, построенный на метаданных, просто-напросто упадет с ошибкой из-за того, что не найдет прописанное в маппинге поле.

Проиллюстрирую. Допустим,у нас есть два файла из одного источника со следующим наполнением:

file1 (тот же что и в примере выше):

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

file2:

{productId: 3, productName: ProductName 3, dimensions: {length: 10, width: 12, height: 12.5, package: [10, 20.5, 30]}}

Теперь прочитаем Sparkом их и посмотрим данные и схемы получившихся DataFrame:

df = spark.read \          .format("json") \          .option("multiline", "false") \          .load(path)df.printSchema()df.show()

Первый файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- width: long (nullable = true) |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) |-- tags: array (nullable = true) |    |-- element: string (containsNull = true)+--------------+-----+---------+-------------+--------------+|    dimensions|price|productId|  productName|          tags|+--------------+-----+---------+-------------+--------------+|[12.5, 10, 12]| null|        1|ProductName 1|[tag 1, tag 2]||[12.5, 10, 12]|10.01|        2|         null|[tag 1, tag 2]|+--------------+-----+---------+-------------+--------------+

Второй файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- package: array (nullable = true) |    |    |-- element: double (containsNull = true) |    |-- width: long (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)+--------------------+---------+-------------+|          dimensions|productId|  productName|+--------------------+---------+-------------+|[12.5, 10, [10.0,...|        3|ProductName 3|+--------------------+---------+-------------+

Как видно, Spark корректно выстроил схему отдельно для каждого файла. Если в какой-либо записи не было обнаружено поля, имеющегося в другой, то в DataFrame мы видим корректное проставление null (поля price и productName для первого файла).

Но в целом схемы получились разные, и если у нас в маппинге прописано, что нам нужно распарсить эти данные (то есть оба файла) в следующую плоскую структуру,

root |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)

а во входных данных у нас присутствуют только файлы а-ля file2, где поля price нет ни у одной записи, то Spark упадет с ошибкой, так как не найдет поля price для формирования выходного DataFrame. С parquet-файлами такой проблемы как правило не возникает, так как сам parquet-файл генерируется из AVRO, который уже содержит полную схему данных и, соответственно, эта полная схема есть и в parquet-файле.

Еще надо отметить, что, естественно, мы хотели по максимум переиспользовать уже существующий и зарекомендовавший себя код нашего frameworkа, а не городить какую-то полностью отдельную ветку для обработки JSONов то есть все изменения хотелось сделать на этапе чтения JSON-файлов с S3.

Таким образом очевидно, что для корректной загрузки данных из JSON-файлов необходимо предопределить схему JSON-файла с данными и читать файлы уже с применением этой схемы. Тогда у нас даже если в JSONе нет какого-то поля, то в самом DataFrame это поле будет, но его значение подставится как null:

df = spark.read \          .format("json") \          .option("multiline","false") \          .schema(df_schema) \          .load(path)

Первая мысль была использовать для хранения схемы имеющийся сервис метаданных - то есть описать схему в YAML-формате и сохранить в имеющемся репозитории. Но с учетом того, что все данные источников у нас проходят через Kafka, решили, что логично для хранения схем использовать имеющийся Kafka Schema Registry, а схему хранить в стандартном для JSON формате (другой формат, кстати говоря, Kafka Schema Registry не позволил бы хранить).

В общем, вырисовывалась следующая реализация:

  • Читаем из Kafka Schema Registry схему

  • Импортируем ее в pyspark.sql.types.StructType что-то типа такого:

# 1. получаем через Kafka Schema Registry REST API схему данных # 2. записываем ее в переменную schema и далее:df_schema = StructType.fromJson(schema)
  • Ну и с помощью полученной схемы читаем JSON-файлы

Звучит хорошо, если бы Давайте посмотрим на формат JSON-схемы, понятной Sparkу. Пусть имеем простой JSON из file2 выше. Посмотреть его схему в формате JSON можно, выполнив:

df.schema.json()  
Получившаяся схема
{    "fields":    [        {            "metadata": {},            "name": "dimensions",            "nullable": true,            "type":            {                "fields":                [                    {"metadata":{},"name":"height","nullable":true,"type":"double"},                    {"metadata":{},"name":"length","nullable":true,"type":"long"},                    {"metadata":{},"name":"width","nullable":true,"type":"long"}                ],                "type": "struct"            }        },        {            "metadata": {},            "name": "price",            "nullable": true,            "type": "double"        },        {            "metadata": {},            "name": "productId",            "nullable": true,            "type": "long"        },        {            "metadata": {},            "name": "productName",            "nullable": true,            "type": "string"        },        {            "metadata": {},            "name": "tags",            "nullable": true,            "type":            {                "containsNull": true,                "elementType": "string",                "type": "array"            }        }    ],    "type": "struct"}

Как видно, это совсем не стандартный формат JSON-схемы.

Но мы же наверняка не первые, у кого встала задача конвертировать стандартную JSON-схему в формат, понятный Sparkу - подумали мы и В принципе, не ошиблись, но несколько часов поиска упорно выводили исключительно на примеры, из серии:

как сохранить схему уже прочитанного DataFrame в JSON, затем использовать повторно

либо на репозиторий https://github.com/zalando-incubator/spark-json-schema, который нам бы подошел, если мы использовали Scala, а не pySpark

В общем, на горизонте маячила перспектива писать SchemaConverter. Сперва решили отделаться малой кровью и написать простенький конвертер никаких ссылок и прочих сложных конструкций. Конвертер был успешно протестирован на синтетических данных, после чего захотелось скормить ему что-то приближенное к реальности.

К счастью, у нас уже был один источник, генерирующий данные в формате JSON. Как временное решение схема его интеграции в DataPlatform была незамысловата: NiFi читал данные из Kafka, преобразовывал их в parquet, использую прибитую гвоздями в NiFi схему в формате AVRO-schema, и складывал на S3. Схема данных была действительно непростой и с кучей вложенных структур и нескольких десятков полей - неплохой тест-кейс в общем-то:

Посмотреть длинную портянку, если кому интересно :)
root |-- taskId: string (nullable = true) |-- extOrderId: string (nullable = true) |-- taskStatus: string (nullable = true) |-- taskControlStatus: string (nullable = true) |-- documentVersion: long (nullable = true) |-- buId: long (nullable = true) |-- storeId: long (nullable = true) |-- priority: string (nullable = true) |-- created: struct (nullable = true) |    |-- createdBy: string (nullable = true) |    |-- created: string (nullable = true) |-- lastUpdateInformation: struct (nullable = true) |    |-- updatedBy: string (nullable = true) |    |-- updated: string (nullable = true) |-- customerId: string (nullable = true) |-- employeeId: string (nullable = true) |-- pointOfGiveAway: struct (nullable = true) |    |-- selected: string (nullable = true) |    |-- available: array (nullable = true) |    |    |-- element: string (containsNull = true) |-- dateOfGiveAway: string (nullable = true) |-- dateOfGiveAwayEnd: string (nullable = true) |-- pickingDeadline: string (nullable = true) |-- storageLocation: string (nullable = true) |-- currentStorageLocations: array (nullable = true) |    |-- element: string (containsNull = true) |-- customerType: string (nullable = true) |-- comment: string (nullable = true) |-- totalAmount: double (nullable = true) |-- currency: string (nullable = true) |-- stockDecrease: boolean (nullable = true) |-- offline: boolean (nullable = true) |-- trackId: string (nullable = true) |-- transportationType: string (nullable = true) |-- stockRebook: boolean (nullable = true) |-- notificationStatus: string (nullable = true) |-- lines: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- lineId: string (nullable = true) |    |    |-- extOrderLineId: string (nullable = true) |    |    |-- productId: string (nullable = true) |    |    |-- lineStatus: string (nullable = true) |    |    |-- lineControlStatus: string (nullable = true) |    |    |-- orderedQuantity: double (nullable = true) |    |    |-- confirmedQuantity: double (nullable = true) |    |    |-- assignedQuantity: double (nullable = true) |    |    |-- pickedQuantity: double (nullable = true) |    |    |-- controlledQuantity: double (nullable = true) |    |    |-- allowedForGiveAwayQuantity: double (nullable = true) |    |    |-- givenAwayQuantity: double (nullable = true) |    |    |-- returnedQuantity: double (nullable = true) |    |    |-- sellingScheme: string (nullable = true) |    |    |-- stockSource: string (nullable = true) |    |    |-- productPrice: double (nullable = true) |    |    |-- lineAmount: double (nullable = true) |    |    |-- currency: string (nullable = true) |    |    |-- markingFlag: string (nullable = true) |    |    |-- operations: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- operationId: string (nullable = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- reason: string (nullable = true) |    |    |    |    |-- quantity: double (nullable = true) |    |    |    |    |-- dmCodes: array (nullable = true) |    |    |    |    |    |-- element: string (containsNull = true) |    |    |    |    |-- timeStamp: string (nullable = true) |    |    |    |    |-- updatedBy: string (nullable = true) |    |    |-- source: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- items: array (nullable = true) |    |    |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |    |    |-- assignedQuantity: double (nullable = true) |-- linkedObjects: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- objectType: string (nullable = true) |    |    |-- objectId: string (nullable = true) |    |    |-- objectStatus: string (nullable = true) |    |    |-- objectLines: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- objectLineId: string (nullable = true) |    |    |    |    |-- taskLineId: string (nullable = true)

Естественно, я не захотел перебивать руками захардкоженную схему, а воспользовался одним из многочисленных онлайн-конвертеров, позволяющих из Avro-схемы сделать JSON-схему. И тут меня ждал неприятный сюрприз: все перепробованные мною конвертеры на выходе использовали гораздо больше синтаксических конструкций, чем понимала первая версия конвертера. Дополнительно пришло осознание, что также как и я, наши пользователи (а для нас пользователями в данном контексте являются владельцы источников данных) с большой вероятностью могут использовать подобные конвертеры для того, чтобы получить JSON-схему, которую надо зарегистрировать в Kafka Schema Registry, из того, что у них есть.

В результате наш SparkJsonSchemaConverter был доработан появилась поддержка более сложных конструкций, таких как definitions, refs (только внутренние) и oneOf. Сам же парсер был оформлен уже в отдельный класс, который сразу собирал на основании JSON-схемы объект pyspark.sql.types.StructType

У нас почти сразу же родилась мысль, что хорошо бы было поделиться им с сообществом, так как мы в Леруа Мерлен сами активно используем продукты Open Source, созданные и поддерживаемые энтузиастами со всего мира, и хотим не просто их использовать, но и контрибьютить обратно, участвуя в разработке Open Source продуктов и поддерживая сообщество. В настоящий момент мы решаем внутренние орг.вопросы по схеме выкладывания данного конвертера в Open Source и, уверен, что в ближайшее время поделимся с сообществом этой наработкой!

В итоге благодаря написанному SparkJsonSchemaConverterу доработка шага Parsing свелась только к небольшому тюнингу чтения данных с S3: в зависимости от формата входных данных источника (получаем из сервиса метаданных) читаем файлы с S3 немного по-разному:

# Для JSONdf = spark.read.format(in_format)\            .option("multiline", "false")\            .schema(json_schema) \            .load(path)# Для parquet:df = spark.read.format(in_format)\            .load(path)

А дальше отрабатывает уже существующий код, раскрывающий все вложенные структуры согласно маппингу и сохраняющий данные DataFrameа в несколько плоских CSV-файлов.

В итоге мы смогли при относительном минимуме внесенных изменений в код текущего frameworkа добавить в него поддержку интеграции в нашу Data Platform JSON-источников данных. И результат нашей работы уже заметен:

  • Всего через месяц после внедрения доработки у нас на ПРОДе проинтегрировано 4 новых JSON-источника!

  • Все текущие интеграции даже не заметили расширения функционала frameworkа, а значит, мы точно ничего не сломали произведенной доработкой.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru