Всем привет!
В недавней статье мы рассказали, как мы шли к построению
нашей Data Platform. Сегодня хотелось бы глубже погрузиться в
желудок нашей платформы и попутно рассказать вам о том, как мы
решали одну из задач, которая возникла в связи с ростом
разнообразия интегрируемых источников данных.
То есть, если возвращаться к финальной картинке из упомянутой
выше статьи (специально дублирую ее, чтобы уважаемым читателям было
удобнее), то сегодня мы будем более углубленно говорить о
реализации правой части схемы той, что лежит после Apache NiFi.
Схема из
прошлой нашей статьи.
Напомним, что в нашей компании более 350 реляционных баз данных.
Естественно, не все они уникальны и многие представляют собой по
сути разные экземпляры одной и той же системы, установленной во
всех магазинах торговой сети, но все же зоопарк разнообразия
присутствует. Поэтому без какого-либо Frameworkа, упрощающего и
ускоряющего интеграцию источников в Data Platform, не обойтись.
Общая схема доставки данных из источников в ODS-слой Greenplum
посредством разработанного нами frameworkа приведена ниже:
Общая схема доставки данных в ODS-слой Greenplum
-
Данные из систем-источников пишутся в Kafka в AVRO-формате,
обрабатываются в режиме реального времени Apache NiFi, который
сохраняет их в формате parquet на S3.
-
Затем эти файлы с сырыми данными с помощью Sparkа обрабатываются
в два этапа:
-
Compaction на данном этапе выполняется объединение для снижения
количества выходных файлов с целью оптимизации записи и
последующего чтения (то есть несколько более мелких файлов
объединяются в несколько файлов побольше), а также производится
дедубликация данных: простой distinct() и затем
coalesce(). Результат сохраняется на S3. Эти файлы
используются затем для parsing'а , а также являются своеобразным
архивом сырых необработанных данных в формате как есть;
-
Parsing на этой фазе производится разбор входных данных и
сохранение их в плоские структуры согласно маппингу, описанному в
метаданных. В общем случае из одного входного файла можно получить
на выходе несколько плоских структур, которые в виде сжатых (как
правило gzip) CSV-файлов сохраняются на S3.
-
Заключительный этап загрузка данных CSV-файлов в ODS-слой
хранилища: создается временная external table над данными в S3
через
PXF S3 connector, после чего данные уже простым pgsql
переливаются в таблицы ODS-слоя Greenplum
-
Все это оркестрируется с помощью Airflow.
DAGи для Airflow у нас генерируются динамически на основании
метаданных. Parsing файлов и разложение их в плоские структуры
также производится с использованием метаданных. Это приводит к
упрощению интеграции нового источника, так как, для этого
необходимо всего лишь:
До недавнего времени такой подход удовлетворял текущие наши
потребности, но количество и разнообразие источников данных растет.
У нас стали появляться источники, которые не являются реляционными
базами данных, а генерируют данные в виде потока JSON-объектов.
Кроме того на горизонте уже маячила интеграция источника, который
под собой имел MongoDB и поэтому будет использовать MongoDB Kafka
source connector для записи данных в Kafka. Поэтому остро встала
необходимость доработки нашего frameworkа для поддержки такого
сценария. Хотелось, чтобы данные источника сразу попадали на S3 в
формате JSON - то есть в формате "как есть", без лишнего шага
конвертации в parquet посредством Apache NiFi.
В первую очередь необходимо было доработать шаг Compaction. Его
код, если убрать всю обвязку и выделить только главное, очень
простой:
df = spark.read.format(in_format) \ .options(**in_options) \ .load(path) \ .distinct() new_df = df.coalesce(div)new_df.write.mode("overwrite") \ .format(out_format) \ .options(**out_options) \ .save(path)
Но если мы проделаем все те же манипуляции с JSON-данными, то
волей-неволей внесем изменения во входные данные, так как при
чтении JSONов Spark автоматом определит и сделает mergeSchema, т.е.
мы тем самым можем исказить входные данные, чего не хотелось бы.
Ведь наша задача на этом шаге только укрупнить файлы и
дедублицировать данные, без какого-либо вмешательства в их
структуру и наполнение. То есть сохранить их как есть.
По-хорошему, нам надо было просто прочитать данные как обычный
текст, дедублицировать строки, укрупнить файлы и положить обратно
на S3. Для этого был предложен достаточно изящный способ:
рассматривать файлы с JSON-объектами как DataFrame с одной
колонкой, содержащей весь JSON-объект.
Попробуем сделать это. Допустим, мы имеем следующий файл
данных:
file1:
{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}
Обратите внимание на формат этого файла. Это файл с
JSON-объектами, где 1 строка = 1 объект. Оставаясь, по сути,
JSON-ом, он при этом не пройдет синтаксическую JSON-валидацию.
Именно в таком виде мы сохраняем JSON-данные на S3 (есть
специальная "галочка в процессоре Apache NiFi).
Прочитаем файл предлагаемым способом:
# Читаем данныеdf = spark.read \ .format("csv") \ .option("sep", "\a") \ .load("file1.json")# Схема получившегося DataFramedf.printSchema()root |-- _c0: string (nullable = true)# Сами данныеdf.show()+--------------------+| _c0|+--------------------+|{"productId": 1, ...||{"productId": 2, ...|+--------------------+
То есть мы тут читаем JSON как обычный CSV, указывая
разделитель, который никогда заведомо не встретится в наших
данных. Например,
Bell character. В итоге мы получим DataFrame из одного поля, к
которому можно будет также применить dicstinct() и затем
coalesce(), то есть менять существующий код не
потребуется. Нам остается только определить опции в зависимости от
формата:
# Для parquetin_format = "parquet"in_options = {}# Для JSONin_format = "csv"in_options = {"sep": "\a"}
Ну и при сохранении этого же DataFrame обратно на S3 в
зависимости от формата данных опять применяем разные опции:
df.write.mode("overwrite") \ .format(out_format) \.options(**out_options) \ .save(path) # для JSON out_format = "text" out_options = {"compression": "gzip"} # для parquet out_format = input_format out_options = {"compression": "snappy"}
Следующей точкой доработки был шаг Parsing. В принципе, ничего
сложного, если бы задача при этом упиралась в одну маленькую
деталь: JSON -файл, в отличии от parquet, не содержит в себе схему
данных. Для разовой загрузки это не является проблемой, так как при
чтении JSON-файла Spark умеет сам определять схему, и даже в
случае, если файл содержит несколько JSON-объектов с немного
отличающимся набором полей, корректно выполнит mergeSchema. Но для
регулярного процесса мы не могли уповать на это. Банально может
случиться так, что во всех записях какого-то файла с данными может
не оказаться некоего поля field_1, так как, например, в
источнике оно заполняется не во всех случаях. Тогда в получившемся
Spark DataFrame вообще не окажется этого поля, и наш Parsing,
построенный на метаданных, просто-напросто упадет с ошибкой из-за
того, что не найдет прописанное в маппинге поле.
Проиллюстрирую. Допустим,у нас есть два файла из одного
источника со следующим наполнением:
file1 (тот же что и в примере выше):
{productId: 1, productName: ProductName 1, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}{productId: 2, price: 10.01, tags: [tag 1, tag 2], dimensions: {length: 10, width: 12, height: 12.5}}
file2:
{productId: 3, productName: ProductName 3, dimensions: {length: 10, width: 12, height: 12.5, package: [10, 20.5, 30]}}
Теперь прочитаем Sparkом их и посмотрим данные и схемы
получившихся DataFrame:
df = spark.read \ .format("json") \ .option("multiline", "false") \ .load(path)df.printSchema()df.show()
Первый файл (схема и данные):
root |-- dimensions: struct (nullable = true) | |-- height: double (nullable = true) | |-- length: long (nullable = true) | |-- width: long (nullable = true) |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) |-- tags: array (nullable = true) | |-- element: string (containsNull = true)+--------------+-----+---------+-------------+--------------+| dimensions|price|productId| productName| tags|+--------------+-----+---------+-------------+--------------+|[12.5, 10, 12]| null| 1|ProductName 1|[tag 1, tag 2]||[12.5, 10, 12]|10.01| 2| null|[tag 1, tag 2]|+--------------+-----+---------+-------------+--------------+
Второй файл (схема и данные):
root |-- dimensions: struct (nullable = true) | |-- height: double (nullable = true) | |-- length: long (nullable = true) | |-- package: array (nullable = true) | | |-- element: double (containsNull = true) | |-- width: long (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)+--------------------+---------+-------------+| dimensions|productId| productName|+--------------------+---------+-------------+|[12.5, 10, [10.0,...| 3|ProductName 3|+--------------------+---------+-------------+
Как видно, Spark корректно выстроил схему отдельно для каждого
файла. Если в какой-либо записи не было обнаружено поля, имеющегося
в другой, то в DataFrame мы видим корректное проставление null
(поля price и productName для первого файла).
Но в целом схемы получились разные, и если у нас в маппинге
прописано, что нам нужно распарсить эти данные (то есть оба файла)
в следующую плоскую структуру,
root |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true)
а во входных данных у нас присутствуют только файлы а-ля
file2, где поля price нет ни у одной записи, то
Spark упадет с ошибкой, так как не найдет поля price для
формирования выходного DataFrame. С parquet-файлами такой проблемы
как правило не возникает, так как сам parquet-файл генерируется из
AVRO, который уже содержит полную схему данных и,
соответственно, эта полная схема есть и в parquet-файле.
Еще надо отметить, что, естественно, мы хотели по максимум
переиспользовать уже существующий и зарекомендовавший себя код
нашего frameworkа, а не городить какую-то полностью отдельную ветку
для обработки JSONов то есть все изменения хотелось сделать на
этапе чтения JSON-файлов с S3.
Таким образом очевидно, что для корректной загрузки данных из
JSON-файлов необходимо предопределить схему JSON-файла с данными и
читать файлы уже с применением этой схемы. Тогда у нас даже если в
JSONе нет какого-то поля, то в самом DataFrame это поле будет, но
его значение подставится как null:
df = spark.read \ .format("json") \ .option("multiline","false") \ .schema(df_schema) \ .load(path)
Первая мысль была использовать для хранения схемы имеющийся
сервис метаданных - то есть описать схему в YAML-формате и
сохранить в имеющемся репозитории. Но с учетом того, что все данные
источников у нас проходят через Kafka, решили, что логично для
хранения схем использовать имеющийся Kafka Schema Registry, а схему
хранить в стандартном для
JSON формате (другой формат, кстати говоря, Kafka Schema
Registry не позволил бы хранить).
В общем, вырисовывалась следующая реализация:
# 1. получаем через Kafka Schema Registry REST API схему данных # 2. записываем ее в переменную schema и далее:df_schema = StructType.fromJson(schema)
Звучит хорошо, если бы Давайте посмотрим на формат JSON-схемы,
понятной Sparkу. Пусть имеем простой JSON из file2 выше.
Посмотреть его схему в формате JSON можно, выполнив:
df.schema.json()
Получившаяся схема
{ "fields": [ { "metadata": {}, "name": "dimensions", "nullable": true, "type": { "fields": [ {"metadata":{},"name":"height","nullable":true,"type":"double"}, {"metadata":{},"name":"length","nullable":true,"type":"long"}, {"metadata":{},"name":"width","nullable":true,"type":"long"} ], "type": "struct" } }, { "metadata": {}, "name": "price", "nullable": true, "type": "double" }, { "metadata": {}, "name": "productId", "nullable": true, "type": "long" }, { "metadata": {}, "name": "productName", "nullable": true, "type": "string" }, { "metadata": {}, "name": "tags", "nullable": true, "type": { "containsNull": true, "elementType": "string", "type": "array" } } ], "type": "struct"}
Как видно, это совсем не стандартный формат
JSON-схемы.
Но мы же наверняка не первые, у кого встала задача
конвертировать стандартную JSON-схему в формат, понятный Sparkу -
подумали мы и В принципе, не ошиблись, но несколько часов поиска
упорно выводили исключительно на примеры, из серии:
как сохранить схему уже прочитанного DataFrame в JSON, затем
использовать повторно
либо на репозиторий
https://github.com/zalando-incubator/spark-json-schema, который
нам бы подошел, если мы использовали Scala, а не pySpark
В общем, на горизонте маячила перспектива писать
SchemaConverter. Сперва решили отделаться малой кровью и написать
простенький конвертер никаких ссылок и прочих сложных конструкций.
Конвертер был успешно протестирован на синтетических данных, после
чего захотелось скормить ему что-то приближенное к реальности.
К счастью, у нас уже был один источник, генерирующий данные в
формате JSON. Как временное решение схема его интеграции в
DataPlatform была незамысловата: NiFi читал данные из Kafka,
преобразовывал их в parquet, использую прибитую гвоздями в NiFi
схему в формате AVRO-schema, и складывал на S3. Схема данных была
действительно непростой и с кучей вложенных структур и нескольких
десятков полей - неплохой тест-кейс в общем-то:
Посмотреть длинную портянку, если кому интересно :)
root |-- taskId: string (nullable = true) |-- extOrderId: string (nullable = true) |-- taskStatus: string (nullable = true) |-- taskControlStatus: string (nullable = true) |-- documentVersion: long (nullable = true) |-- buId: long (nullable = true) |-- storeId: long (nullable = true) |-- priority: string (nullable = true) |-- created: struct (nullable = true) | |-- createdBy: string (nullable = true) | |-- created: string (nullable = true) |-- lastUpdateInformation: struct (nullable = true) | |-- updatedBy: string (nullable = true) | |-- updated: string (nullable = true) |-- customerId: string (nullable = true) |-- employeeId: string (nullable = true) |-- pointOfGiveAway: struct (nullable = true) | |-- selected: string (nullable = true) | |-- available: array (nullable = true) | | |-- element: string (containsNull = true) |-- dateOfGiveAway: string (nullable = true) |-- dateOfGiveAwayEnd: string (nullable = true) |-- pickingDeadline: string (nullable = true) |-- storageLocation: string (nullable = true) |-- currentStorageLocations: array (nullable = true) | |-- element: string (containsNull = true) |-- customerType: string (nullable = true) |-- comment: string (nullable = true) |-- totalAmount: double (nullable = true) |-- currency: string (nullable = true) |-- stockDecrease: boolean (nullable = true) |-- offline: boolean (nullable = true) |-- trackId: string (nullable = true) |-- transportationType: string (nullable = true) |-- stockRebook: boolean (nullable = true) |-- notificationStatus: string (nullable = true) |-- lines: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- lineId: string (nullable = true) | | |-- extOrderLineId: string (nullable = true) | | |-- productId: string (nullable = true) | | |-- lineStatus: string (nullable = true) | | |-- lineControlStatus: string (nullable = true) | | |-- orderedQuantity: double (nullable = true) | | |-- confirmedQuantity: double (nullable = true) | | |-- assignedQuantity: double (nullable = true) | | |-- pickedQuantity: double (nullable = true) | | |-- controlledQuantity: double (nullable = true) | | |-- allowedForGiveAwayQuantity: double (nullable = true) | | |-- givenAwayQuantity: double (nullable = true) | | |-- returnedQuantity: double (nullable = true) | | |-- sellingScheme: string (nullable = true) | | |-- stockSource: string (nullable = true) | | |-- productPrice: double (nullable = true) | | |-- lineAmount: double (nullable = true) | | |-- currency: string (nullable = true) | | |-- markingFlag: string (nullable = true) | | |-- operations: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- operationId: string (nullable = true) | | | | |-- type: string (nullable = true) | | | | |-- reason: string (nullable = true) | | | | |-- quantity: double (nullable = true) | | | | |-- dmCodes: array (nullable = true) | | | | | |-- element: string (containsNull = true) | | | | |-- timeStamp: string (nullable = true) | | | | |-- updatedBy: string (nullable = true) | | |-- source: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- type: string (nullable = true) | | | | |-- items: array (nullable = true) | | | | | |-- element: struct (containsNull = true) | | | | | | |-- assignedQuantity: double (nullable = true) |-- linkedObjects: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- objectType: string (nullable = true) | | |-- objectId: string (nullable = true) | | |-- objectStatus: string (nullable = true) | | |-- objectLines: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- objectLineId: string (nullable = true) | | | | |-- taskLineId: string (nullable = true)
Естественно, я не захотел перебивать руками захардкоженную
схему, а воспользовался одним из многочисленных онлайн-конвертеров,
позволяющих из Avro-схемы сделать JSON-схему. И тут меня ждал
неприятный сюрприз: все перепробованные мною конвертеры на выходе
использовали гораздо больше синтаксических конструкций, чем
понимала первая версия конвертера. Дополнительно пришло осознание,
что также как и я, наши пользователи (а для нас пользователями в
данном контексте являются владельцы источников данных) с большой
вероятностью могут использовать подобные конвертеры для того, чтобы
получить JSON-схему, которую надо зарегистрировать в Kafka Schema
Registry, из того, что у них есть.
В результате наш SparkJsonSchemaConverter был доработан
появилась поддержка более сложных конструкций, таких как
definitions, refs (только внутренние) и
oneOf. Сам же парсер был оформлен уже в отдельный класс,
который сразу собирал на основании JSON-схемы объект
pyspark.sql.types.StructType
У нас почти сразу же родилась мысль, что хорошо бы было
поделиться им с сообществом, так как мы в Леруа Мерлен сами активно
используем продукты Open Source, созданные и поддерживаемые
энтузиастами со всего мира, и хотим не просто их использовать, но и
контрибьютить обратно, участвуя в разработке Open Source продуктов
и поддерживая сообщество. В настоящий момент мы решаем внутренние
орг.вопросы по схеме выкладывания данного конвертера в Open Source
и, уверен, что в ближайшее время поделимся с сообществом этой
наработкой!
В итоге благодаря написанному SparkJsonSchemaConverterу
доработка шага Parsing свелась только к небольшому тюнингу чтения
данных с S3: в зависимости от формата входных данных источника
(получаем из сервиса метаданных) читаем файлы с S3 немного
по-разному:
# Для JSONdf = spark.read.format(in_format)\ .option("multiline", "false")\ .schema(json_schema) \ .load(path)# Для parquet:df = spark.read.format(in_format)\ .load(path)
А дальше отрабатывает уже существующий код, раскрывающий все
вложенные структуры согласно маппингу и сохраняющий данные
DataFrameа в несколько плоских CSV-файлов.
В итоге мы смогли при относительном минимуме внесенных изменений
в код текущего frameworkа добавить в него поддержку интеграции в
нашу Data Platform JSON-источников данных. И результат нашей работы
уже заметен:
-
Всего через месяц после внедрения доработки у нас на ПРОДе
проинтегрировано 4 новых JSON-источника!
-
Все текущие интеграции даже не заметили расширения функционала
frameworkа, а значит, мы точно ничего не сломали произведенной
доработкой.