Русский
Русский
English
Статистика
Реклама

Что нам стоит загрузить JSON в Data Platform

Всем привет!

В недавней статье мы рассказали, как мы шли к построению нашей Data Platform. Сегодня хотелось бы глубже погрузиться в желудок нашей платформы и попутно рассказать вам о том, как мы решали одну из задач, которая возникла в связи с ростом разнообразия интегрируемых источников данных.

То есть, если возвращаться к финальной картинке из упомянутой выше статьи (специально дублирую ее, чтобы уважаемым читателям было удобнее), то сегодня мы будем более углубленно говорить о реализации правой части схемы той, что лежит после Apache NiFi.

Схема из прошлой нашей статьи.Схема из прошлой нашей статьи.

Напомним, что в нашей компании более 350 реляционных баз данных. Естественно, не все они уникальны и многие представляют собой по сути разные экземпляры одной и той же системы, установленной во всех магазинах торговой сети, но все же зоопарк разнообразия присутствует. Поэтому без какого-либо Frameworkа, упрощающего и ускоряющего интеграцию источников в Data Platform, не обойтись.

Общая схема доставки данных из источников в ODS-слой Greenplum посредством разработанного нами frameworkа приведена ниже:

Общая схема доставки данных в ODS-слой Greenplum Общая схема доставки данных в ODS-слой Greenplum
  1. Данные из систем-источников пишутся в Kafka в AVRO-формате, обрабатываются в режиме реального времени Apache NiFi, который сохраняет их в формате parquet на S3.

  2. Затем эти файлы с сырыми данными с помощью Sparkа обрабатываются в два этапа:

    1. Compaction на данном этапе выполняется объединение для снижения количества выходных файлов с целью оптимизации записи и последующего чтения (то есть несколько более мелких файлов объединяются в несколько файлов побольше), а также производится дедубликация данных: простой distinct() и затем coalesce(). Результат сохраняется на S3. Эти файлы используются затем для parsing'а , а также являются своеобразным архивом сырых необработанных данных в формате как есть;

    2. Parsing на этой фазе производится разбор входных данных и сохранение их в плоские структуры согласно маппингу, описанному в метаданных. В общем случае из одного входного файла можно получить на выходе несколько плоских структур, которые в виде сжатых (как правило gzip) CSV-файлов сохраняются на S3.

  3. Заключительный этап загрузка данных CSV-файлов в ODS-слой хранилища: создается временная external table над данными в S3 через PXF S3 connector, после чего данные уже простым pgsql переливаются в таблицы ODS-слоя Greenplum

  4. Все это оркестрируется с помощью Airflow.

DAGи для Airflow у нас генерируются динамически на основании метаданных. Parsing файлов и разложение их в плоские структуры также производится с использованием метаданных. Это приводит к упрощению интеграции нового источника, так как, для этого необходимо всего лишь:

  • создать в ODS-слое Хранилища таблицы-приемники данных;

  • в репозитории метаданных в Git согласно принятым стандартам прописать в виде отдельных YAML-файлов:

    • общие настройки источника (такие как: расписание загрузки, входной и выходной формат файлов с данными, S3-бакет, имя сервисного пользователя, имя и email владельца для нотификации в случае сбоя загрузки и т.п.);

    • маппинг данных объектов источника на таблицы слоя ODS (при этом поддерживаются как плоские, так и вложенные структуры и массивы, а также есть возможность данные из одного объекта раскладывать в ODS-слое по нескольким таблицам). То есть описать, как необходимо сложную вложенную структуру разложить в плоские таблицы;

До недавнего времени такой подход удовлетворял текущие наши потребности, но количество и разнообразие источников данных растет. У нас стали появляться источники, которые не являются реляционными базами данных, а генерируют данные в виде потока JSON-объектов. Кроме того на горизонте уже маячила интеграция источника, который под собой имел MongoDB и поэтому будет использовать MongoDB Kafka source connector для записи данных в Kafka. Поэтому остро встала необходимость доработки нашего frameworkа для поддержки такого сценария. Хотелось, чтобы данные источника сразу попадали на S3 в формате JSON - то есть в формате "как есть", без лишнего шага конвертации в parquet посредством Apache NiFi.

В первую очередь необходимо было доработать шаг Compaction. Его код, если убрать всю обвязку и выделить только главное, очень простой:

df = spark.read.format(in_format) \               .options(**in_options) \               .load(path) \               .distinct()    new_df = df.coalesce(div)new_df.write.mode("overwrite") \             .format(out_format) \            .options(**out_options) \            .save(path)

Но если мы проделаем все те же манипуляции с JSON-данными, то волей-неволей внесем изменения во входные данные, так как при чтении JSONов Spark автоматом определит и сделает mergeSchema, т.е. мы тем самым можем исказить входные данные, чего не хотелось бы. Ведь наша задача на этом шаге только укрупнить файлы и дедублицировать данные, без какого-либо вмешательства в их структуру и наполнение. То есть сохранить их как есть.

По-хорошему, нам надо было просто прочитать данные как обычный текст, дедублицировать строки, укрупнить файлы и положить обратно на S3. Для этого был предложен достаточно изящный способ:

рассматривать файлы с JSON-объектами как DataFrame с одной колонкой, содержащей весь JSON-объект.

Попробуем сделать это. Допустим, мы имеем следующий файл данных:

file1:

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

Обратите внимание на формат этого файла. Это файл с JSON-объектами, где 1 строка = 1 объект. Оставаясь, по сути, JSON-ом, он при этом не пройдет синтаксическую JSON-валидацию. Именно в таком виде мы сохраняем JSON-данные на S3 (есть специальная "галочка в процессоре Apache NiFi).

Прочитаем файл предлагаемым способом:

# Читаем данныеdf = spark.read \          .format("csv") \          .option("sep", "\a") \          .load("file1.json")# Схема получившегося DataFramedf.printSchema()root |-- _c0: string (nullable = true)# Сами данныеdf.show()+--------------------+|                 _c0|+--------------------+|{"productId": 1, ...||{"productId": 2, ...|+--------------------+

То есть мы тут читаем JSON как обычный CSV, указывая разделитель, который никогда заведомо не встретится в наших данных. Например, Bell character. В итоге мы получим DataFrame из одного поля, к которому можно будет также применить dicstinct() и затем coalesce(), то есть менять существующий код не потребуется. Нам остается только определить опции в зависимости от формата:

# Для parquetin_format = "parquet"in_options = {}# Для JSONin_format = "csv"in_options = {"sep": "\a"}

Ну и при сохранении этого же DataFrame обратно на S3 в зависимости от формата данных опять применяем разные опции:

df.write.mode("overwrite") \           .format(out_format) \.options(**out_options) \  .save(path)  # для JSON     out_format = "text" out_options = {"compression": "gzip"}  # для parquet   out_format = input_format out_options = {"compression": "snappy"}

Следующей точкой доработки был шаг Parsing. В принципе, ничего сложного, если бы задача при этом упиралась в одну маленькую деталь: JSON -файл, в отличии от parquet, не содержит в себе схему данных. Для разовой загрузки это не является проблемой, так как при чтении JSON-файла Spark умеет сам определять схему, и даже в случае, если файл содержит несколько JSON-объектов с немного отличающимся набором полей, корректно выполнит mergeSchema. Но для регулярного процесса мы не могли уповать на это. Банально может случиться так, что во всех записях какого-то файла с данными может не оказаться некоего поля field_1, так как, например, в источнике оно заполняется не во всех случаях. Тогда в получившемся Spark DataFrame вообще не окажется этого поля, и наш Parsing, построенный на метаданных, просто-напросто упадет с ошибкой из-за того, что не найдет прописанное в маппинге поле.

Проиллюстрирую. Допустим,у нас есть два файла из одного источника со следующим наполнением:

file1 (тот же что и в примере выше):

{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}

file2:

{productId: 3, productName: ProductName 3, dimensions: {length: 10, width: 12, height: 12.5, package: [10, 20.5, 30]}}

Теперь прочитаем Sparkом их и посмотрим данные и схемы получившихся DataFrame:

df = spark.read \          .format("json") \          .option("multiline", "false") \          .load(path)df.printSchema()df.show()

Первый файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- width: long (nullable = true) |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) |-- tags: array (nullable = true) |    |-- element: string (containsNull = true)+--------------+-----+---------+-------------+--------------+|    dimensions|price|productId|  productName|          tags|+--------------+-----+---------+-------------+--------------+|[12.5, 10, 12]| null|        1|ProductName 1|[tag 1, tag 2]||[12.5, 10, 12]|10.01|        2|         null|[tag 1, tag 2]|+--------------+-----+---------+-------------+--------------+

Второй файл (схема и данные):

root |-- dimensions: struct (nullable = true) |    |-- height: double (nullable = true) |    |-- length: long (nullable = true) |    |-- package: array (nullable = true) |    |    |-- element: double (containsNull = true) |    |-- width: long (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)+--------------------+---------+-------------+|          dimensions|productId|  productName|+--------------------+---------+-------------+|[12.5, 10, [10.0,...|        3|ProductName 3|+--------------------+---------+-------------+

Как видно, Spark корректно выстроил схему отдельно для каждого файла. Если в какой-либо записи не было обнаружено поля, имеющегося в другой, то в DataFrame мы видим корректное проставление null (поля price и productName для первого файла).

Но в целом схемы получились разные, и если у нас в маппинге прописано, что нам нужно распарсить эти данные (то есть оба файла) в следующую плоскую структуру,

root |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)

а во входных данных у нас присутствуют только файлы а-ля file2, где поля price нет ни у одной записи, то Spark упадет с ошибкой, так как не найдет поля price для формирования выходного DataFrame. С parquet-файлами такой проблемы как правило не возникает, так как сам parquet-файл генерируется из AVRO, который уже содержит полную схему данных и, соответственно, эта полная схема есть и в parquet-файле.

Еще надо отметить, что, естественно, мы хотели по максимум переиспользовать уже существующий и зарекомендовавший себя код нашего frameworkа, а не городить какую-то полностью отдельную ветку для обработки JSONов то есть все изменения хотелось сделать на этапе чтения JSON-файлов с S3.

Таким образом очевидно, что для корректной загрузки данных из JSON-файлов необходимо предопределить схему JSON-файла с данными и читать файлы уже с применением этой схемы. Тогда у нас даже если в JSONе нет какого-то поля, то в самом DataFrame это поле будет, но его значение подставится как null:

df = spark.read \          .format("json") \          .option("multiline","false") \          .schema(df_schema) \          .load(path)

Первая мысль была использовать для хранения схемы имеющийся сервис метаданных - то есть описать схему в YAML-формате и сохранить в имеющемся репозитории. Но с учетом того, что все данные источников у нас проходят через Kafka, решили, что логично для хранения схем использовать имеющийся Kafka Schema Registry, а схему хранить в стандартном для JSON формате (другой формат, кстати говоря, Kafka Schema Registry не позволил бы хранить).

В общем, вырисовывалась следующая реализация:

  • Читаем из Kafka Schema Registry схему

  • Импортируем ее в pyspark.sql.types.StructType что-то типа такого:

# 1. получаем через Kafka Schema Registry REST API схему данных # 2. записываем ее в переменную schema и далее:df_schema = StructType.fromJson(schema)
  • Ну и с помощью полученной схемы читаем JSON-файлы

Звучит хорошо, если бы Давайте посмотрим на формат JSON-схемы, понятной Sparkу. Пусть имеем простой JSON из file2 выше. Посмотреть его схему в формате JSON можно, выполнив:

df.schema.json()  
Получившаяся схема
{    "fields":    [        {            "metadata": {},            "name": "dimensions",            "nullable": true,            "type":            {                "fields":                [                    {"metadata":{},"name":"height","nullable":true,"type":"double"},                    {"metadata":{},"name":"length","nullable":true,"type":"long"},                    {"metadata":{},"name":"width","nullable":true,"type":"long"}                ],                "type": "struct"            }        },        {            "metadata": {},            "name": "price",            "nullable": true,            "type": "double"        },        {            "metadata": {},            "name": "productId",            "nullable": true,            "type": "long"        },        {            "metadata": {},            "name": "productName",            "nullable": true,            "type": "string"        },        {            "metadata": {},            "name": "tags",            "nullable": true,            "type":            {                "containsNull": true,                "elementType": "string",                "type": "array"            }        }    ],    "type": "struct"}

Как видно, это совсем не стандартный формат JSON-схемы.

Но мы же наверняка не первые, у кого встала задача конвертировать стандартную JSON-схему в формат, понятный Sparkу - подумали мы и В принципе, не ошиблись, но несколько часов поиска упорно выводили исключительно на примеры, из серии:

как сохранить схему уже прочитанного DataFrame в JSON, затем использовать повторно

либо на репозиторий https://github.com/zalando-incubator/spark-json-schema, который нам бы подошел, если мы использовали Scala, а не pySpark

В общем, на горизонте маячила перспектива писать SchemaConverter. Сперва решили отделаться малой кровью и написать простенький конвертер никаких ссылок и прочих сложных конструкций. Конвертер был успешно протестирован на синтетических данных, после чего захотелось скормить ему что-то приближенное к реальности.

К счастью, у нас уже был один источник, генерирующий данные в формате JSON. Как временное решение схема его интеграции в DataPlatform была незамысловата: NiFi читал данные из Kafka, преобразовывал их в parquet, использую прибитую гвоздями в NiFi схему в формате AVRO-schema, и складывал на S3. Схема данных была действительно непростой и с кучей вложенных структур и нескольких десятков полей - неплохой тест-кейс в общем-то:

Посмотреть длинную портянку, если кому интересно :)
root |-- taskId: string (nullable = true) |-- extOrderId: string (nullable = true) |-- taskStatus: string (nullable = true) |-- taskControlStatus: string (nullable = true) |-- documentVersion: long (nullable = true) |-- buId: long (nullable = true) |-- storeId: long (nullable = true) |-- priority: string (nullable = true) |-- created: struct (nullable = true) |    |-- createdBy: string (nullable = true) |    |-- created: string (nullable = true) |-- lastUpdateInformation: struct (nullable = true) |    |-- updatedBy: string (nullable = true) |    |-- updated: string (nullable = true) |-- customerId: string (nullable = true) |-- employeeId: string (nullable = true) |-- pointOfGiveAway: struct (nullable = true) |    |-- selected: string (nullable = true) |    |-- available: array (nullable = true) |    |    |-- element: string (containsNull = true) |-- dateOfGiveAway: string (nullable = true) |-- dateOfGiveAwayEnd: string (nullable = true) |-- pickingDeadline: string (nullable = true) |-- storageLocation: string (nullable = true) |-- currentStorageLocations: array (nullable = true) |    |-- element: string (containsNull = true) |-- customerType: string (nullable = true) |-- comment: string (nullable = true) |-- totalAmount: double (nullable = true) |-- currency: string (nullable = true) |-- stockDecrease: boolean (nullable = true) |-- offline: boolean (nullable = true) |-- trackId: string (nullable = true) |-- transportationType: string (nullable = true) |-- stockRebook: boolean (nullable = true) |-- notificationStatus: string (nullable = true) |-- lines: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- lineId: string (nullable = true) |    |    |-- extOrderLineId: string (nullable = true) |    |    |-- productId: string (nullable = true) |    |    |-- lineStatus: string (nullable = true) |    |    |-- lineControlStatus: string (nullable = true) |    |    |-- orderedQuantity: double (nullable = true) |    |    |-- confirmedQuantity: double (nullable = true) |    |    |-- assignedQuantity: double (nullable = true) |    |    |-- pickedQuantity: double (nullable = true) |    |    |-- controlledQuantity: double (nullable = true) |    |    |-- allowedForGiveAwayQuantity: double (nullable = true) |    |    |-- givenAwayQuantity: double (nullable = true) |    |    |-- returnedQuantity: double (nullable = true) |    |    |-- sellingScheme: string (nullable = true) |    |    |-- stockSource: string (nullable = true) |    |    |-- productPrice: double (nullable = true) |    |    |-- lineAmount: double (nullable = true) |    |    |-- currency: string (nullable = true) |    |    |-- markingFlag: string (nullable = true) |    |    |-- operations: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- operationId: string (nullable = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- reason: string (nullable = true) |    |    |    |    |-- quantity: double (nullable = true) |    |    |    |    |-- dmCodes: array (nullable = true) |    |    |    |    |    |-- element: string (containsNull = true) |    |    |    |    |-- timeStamp: string (nullable = true) |    |    |    |    |-- updatedBy: string (nullable = true) |    |    |-- source: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- type: string (nullable = true) |    |    |    |    |-- items: array (nullable = true) |    |    |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |    |    |-- assignedQuantity: double (nullable = true) |-- linkedObjects: array (nullable = true) |    |-- element: struct (containsNull = true) |    |    |-- objectType: string (nullable = true) |    |    |-- objectId: string (nullable = true) |    |    |-- objectStatus: string (nullable = true) |    |    |-- objectLines: array (nullable = true) |    |    |    |-- element: struct (containsNull = true) |    |    |    |    |-- objectLineId: string (nullable = true) |    |    |    |    |-- taskLineId: string (nullable = true)

Естественно, я не захотел перебивать руками захардкоженную схему, а воспользовался одним из многочисленных онлайн-конвертеров, позволяющих из Avro-схемы сделать JSON-схему. И тут меня ждал неприятный сюрприз: все перепробованные мною конвертеры на выходе использовали гораздо больше синтаксических конструкций, чем понимала первая версия конвертера. Дополнительно пришло осознание, что также как и я, наши пользователи (а для нас пользователями в данном контексте являются владельцы источников данных) с большой вероятностью могут использовать подобные конвертеры для того, чтобы получить JSON-схему, которую надо зарегистрировать в Kafka Schema Registry, из того, что у них есть.

В результате наш SparkJsonSchemaConverter был доработан появилась поддержка более сложных конструкций, таких как definitions, refs (только внутренние) и oneOf. Сам же парсер был оформлен уже в отдельный класс, который сразу собирал на основании JSON-схемы объект pyspark.sql.types.StructType

У нас почти сразу же родилась мысль, что хорошо бы было поделиться им с сообществом, так как мы в Леруа Мерлен сами активно используем продукты Open Source, созданные и поддерживаемые энтузиастами со всего мира, и хотим не просто их использовать, но и контрибьютить обратно, участвуя в разработке Open Source продуктов и поддерживая сообщество. В настоящий момент мы решаем внутренние орг.вопросы по схеме выкладывания данного конвертера в Open Source и, уверен, что в ближайшее время поделимся с сообществом этой наработкой!

В итоге благодаря написанному SparkJsonSchemaConverterу доработка шага Parsing свелась только к небольшому тюнингу чтения данных с S3: в зависимости от формата входных данных источника (получаем из сервиса метаданных) читаем файлы с S3 немного по-разному:

# Для JSONdf = spark.read.format(in_format)\            .option("multiline", "false")\            .schema(json_schema) \            .load(path)# Для parquet:df = spark.read.format(in_format)\            .load(path)

А дальше отрабатывает уже существующий код, раскрывающий все вложенные структуры согласно маппингу и сохраняющий данные DataFrameа в несколько плоских CSV-файлов.

В итоге мы смогли при относительном минимуме внесенных изменений в код текущего frameworkа добавить в него поддержку интеграции в нашу Data Platform JSON-источников данных. И результат нашей работы уже заметен:

  • Всего через месяц после внедрения доработки у нас на ПРОДе проинтегрировано 4 новых JSON-источника!

  • Все текущие интеграции даже не заметили расширения функционала frameworkа, а значит, мы точно ничего не сломали произведенной доработкой.

Источник: habr.com
К списку статей
Опубликовано: 16.06.2021 16:13:28
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Блог компании леруа мерлен

Big data

Хранение данных

Хранилища данных

Data engineering

Spark

Json

Data platform

Data lake

Pyspark

Категории

Последние комментарии

© 2006-2021, personeltest.ru