Русский
Русский
English
Статистика
Реклама

Перевод Python API в Delta Lake простые и надежные операции Upsert и Delete

Приглашаем всех желающих на бесплатный демо-урок, в рамках которого рассмотрим Ni-Fi и роль data ingestion инструментов в целом при построении систем обработки данных. А также решим простую задачку по построению пайплайна для загрузки файлов в хранилище данных с использованием Ni-Fi. Урок проведет эксперт OTUS - Егор Матешук.


А прямо сейчас традиционно делимся полезным переводом.


Delta Lake 0.4.0 включает Python API и преобразование Parquet в таблицу Delta Lake на месте

Мы рады объявить о релизе Delta Lake 0.4.0, в котором представлен Python API, улучшающий манипулирование и управление данными в Delta-таблицах. Ключевыми фичами этого релиза являются:

  • Convert-to-Delta (#78) - теперь вы можете преобразовать таблицу Parquet в таблицу Delta Lake на месте без перезаписи каких-либо данных. Эта функция отлично подходит для преобразования очень больших таблиц Parquet, которые было бы довольно затратно перезаписывать в Delta-таблицу. Более того, этот процесс обратим - вы можете преобразовать таблицу Parquet в таблицу Delta Lake, поработать с ней (например, удалить или объединить) и легко преобразовать ее обратно в таблицу Parquet. Для получения более подробной информации читайте документацию.

  • SQL для служебных операций - теперь вы можете использовать SQL для выполнения служебных операций vacuum и history. Смотрите документацию для получения дополнительных сведений о том, как настроить Spark для выполнения этих специфичных для Delta Lake команд SQL.

Больше информации вы можете найти в примечаниях к релизу Delta Lake 0.4.0 и в документации по Delta Lake > Удаление, обновление и слияние таблиц.

В этой статье мы продемонстрируем как использовать Python и новый Python API в Delta Lake 0.4.0 в контексте данных о вылетах и задержках рейсов на Apache Spark 2.4.3. Мы продемонстрируем, как производить операции upsert и delete, запрашивать старые версии данных с помощью механизма путешествия во времени (time travel) и подчищать старые версии данных с помощью vacuum.

Начало работы с Delta Lake

Пакет Delta Lake доступен через параметр в --packages. В нашем примере мы также продемонстрируем возможность делать VACUUM файлов и выполнять Delta Lake SQL команды в Apache Spark. Поскольку это короткая демонстрация, мы также подключим следующие конфигурации:

  • spark.databricks.delta.retentionDurationCheck.enabled=false, что позволит нам производить vacuum файлов, срок хранения которых меньше установленного по умолчанию срока в 7 дней. Эта настройка нужна только для команды SQL VACUUM.

  • spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension для подключения Delta Lake SQL команд в Apache Spark; для вызовов Python API или Scala этого не требуется.

# Подключение пакета в Spark./bin/pyspark --packages io.delta:delta-core2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

Загрузка и сохранение наших Delta Lake данных

В этой статье мы будем использовать набор летных данных о времени вылетов или задержках рейсов, сгенерированный на основе статистики вылетов RITA BTS; некоторые примеры этих данных в действии можно посмотреть здесь - Данные о вылетах рейсов за 2014 в d3.js Crossfilter, и здесь - Данные о вылетах и задержках рейсов в GraphFrames для Apache Spark. Этот набор данных можно загрузить с этого репозитория на github. Начнем с чтения набора данных в pyspark.

# Переменные местонахожденияtripdelaysFilePath = "/root/data/departuredelays.csv"pathToEventsTable = "/root/deltalake/departureDelays.delta"# Чтение данных о задержках рейсовdepartureDelays = spark.read \.option("header", "true") \.option("inferSchema", "true") \.csv(tripdelaysFilePath)

Затем давайте сохраним наш набор данных departureDelays в таблицу Delta Lake. Сохранив эту таблицу в хранилище Delta Lake, мы сможем воспользоваться его функциями, которые включают ACID транзакции, унифицированную пакетную обработку и потоковую передачу, а также путешествия во времени.

# Сохраняем данные о задержках рейсов в формате Delta LakedepartureDelays \.write \.format("delta") \.mode("overwrite") \.save("departureDelays.delta")

Обратите внимание, этот подход аналогичен обычному сохранению данных Parquet; вместо указания format("parquet") вы просто указываете format("delta"). Если вы заглянете в базовую файловую систему, вы заметите четыре файла, созданных для таблицы Delta Lake departureDelays.

/departureDelays.delta$ ls -l..._delta_logpart-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquetpart-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquetpart-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquetpart-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet

Отметим, что _delta_log - папка, которая содержит лог транзакций Delta Lake. Для получения более подробной информации читайте Погружение в Delta Lake: распаковка лога транзакций.

Теперь давайте перезагрузим данные, но на этот раз наш DataFrame будет поддерживаться Delta Lake.

# Загружаем данные о задержках рейсов в формате Delta Lakedelays_delta = spark \.read \.format("delta") \.load("departureDelays.delta")# Создаем временное представлениеdelays_delta.createOrReplaceTempView("delays_delta") # Сколько рейсов между Сиэтлом и Сан-Францискоspark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

Наконец, давайте определим количество рейсов из Сиэтла в Сан-Франциско; в этом наборе данных таких рейсов 1698.

Преобразование на месте в Delta Lake

Если у вас есть уже существующие таблицы Parquet, у вас появилась возможность выполнять преобразование ваших таблиц в Delta Lake на месте, т.е. вам не нужно переписывать таблицу. Чтобы преобразовать таблицу, вам достаточно выполнить следующие команды.

from delta.tables import *# Преобразовать не секционированную таблицу parquet по пути '/path/to/table'deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")# Преобразовать секционированную таблицу parquet по пути '/path/to/table', секционированную целочисленным столбцом с именем 'part'partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

Для получения более подробной информации, в том числе и о том, как сделать это преобразование в Scala и SQL, читайте Преобразование в Delta Lake.

Удаление наших летных данных

Чтобы удалить данные из традиционной Data Lake таблицы, вам необходимо:

  1. Выбрать все данные из вашей таблицы, за исключением строк, которые вы хотите удалить.

  2. Создать новую таблицу на основе предыдущего запроса.

  3. Удалить исходную таблицу.

  4. Назвать новую таблицу именем исходной таблицы для нисходящих зависимостей.

Вместо выполнения всех этих шагов в Delta Lake мы можем упростить этот процесс, выполнив оператор DELETE. Чтобы продемонстрировать это, давайте удалим все рейсы, которые прибыли вовремя или раньше назначенного времени (т.е. delay < 0).

from delta.tables import *from pyspark.sql.functions import *# Доступ к таблице Delta LakedeltaTable = DeltaTable.forPath(spark, pathToEventsTable)# Удаляем все рейсы пришедшие раньше или вовремяdeltaTable.delete("delay < 0") # Сколько рейсов между Сиэтлом и Сан-Францискоspark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

После удаления (подробнее об этом ниже) всех рейсов пришедших вовремя или раньше, как вы можете видеть из предыдущего запроса, осталось 837 опоздавших рейсов из Сиэтла в Сан-Франциско. Если вы заглянете в файловую систему, вы заметите, что файлов стало больше, даже если вы удалили данные.

/departureDelays.delta$ ls -l_delta_logpart-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquetpart-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquetpart-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquetpart-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquetpart-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquetpart-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquetpart-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquetpart-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet

В традиционных озерах данных удаление (операция delete) выполняется путем перезаписи всей таблицы, за исключением значений, которые должны быть удалены. В Delta Lake вместо этого удаление выполняется путем выборочной записи новых версий файлов, содержащих удаляемые данные, а предыдущие файлы только помечаются как удаленные. Это связано с тем, что для выполнения атомарных операций с таблицей Delta Lake использует управление параллельным доступом с помощью многоверсионности: например, пока один пользователь удаляет данные, другой пользователь может запрашивать предыдущую версию таблицы. Эта многоверсионная модель также позволяет нам путешествовать назад во времени (механизм путешествия во времени) и запрашивать предыдущие версии, как мы увидим позже.

Обновление наших летных данных

Чтобы обновить данные из вашей традиционной таблицы Data Lake, вам необходимо:

  1. Выбрать все данные из вашей таблицы, за исключением строк, которые вы хотите изменить.

  2. Изменить строки, которые необходимо обновить/изменить.

  3. Объединить эти две таблицы, чтобы создать новую таблицу.

  4. Удалить исходную таблицу.

  5. Назвать новую таблицу именем исходной таблицы для нисходящих зависимостей.

Вместо выполнения всех этих шагов в Delta Lake мы можем упростить этот процесс, выполнив оператор UPDATE. Чтобы продемонстрировать это, давайте обновим информацию обо всех рейсах из Детройта, заменив Детройт на Сиэтл.

# Обновляем все рейсы из Детройта, чтобы теперь они стали из СиэтлаdeltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) # Сколько рейсов между Сиэтлом и Сан-Францискоspark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

Теперь, когда рейсы из Детройта теперь помечены как рейсы Сиэтла, у нас насчитывается 986 рейсов, вылетающих из Сиэтла в Сан-Франциско. Если выведете содержимое папки файловой системы departureDelays (например, $../departureDelays/ls -l), вы заметите, что теперь там 11 файлов (вместо 8 сразу после удаления файлов и четырех файлов после создания таблицы).

Объединение наших летных данных

Распространенный сценарий при работе с озером данных - это постоянное добавление данных в вашу таблицу. Это часто приводит к дублированию данных (строки, которые вы бы не хотели снова вставлять в таблицу), появлению новых строк, которые необходимо вставить, и строк, которые необходимо обновить. В Delta Lake все это можно решить с помощью операции merge (аналогично оператору SQL MERGE).

Начнем с выборочного набора данных, который вы хотите обновить, вставить или дедуплицировать с помощью следующего запроса.

# Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти датыspark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

Результат этого запроса приведен таблице ниже. Обратите внимание, что для наглядности были добавлены цветные выделения, чтобы четко определить, какие строки дедуплицированы (синий), обновлены (желтый) и вставлены (зеленый).

Затем давайте сгенерируем нашу собственную merge_table, содержащую данные, которые мы будем вставлять, обновлять или дедуплицировать с помощью следующего фрагмента кода.

items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]cols = ['date', 'delay', 'distance', 'origin', 'destination']merge_table = spark.createDataFrame(items, cols)merge_table.toPandas()

В предыдущей таблице (merge_table) есть три строки, с уникальным значением даты:

  1. 1010521: эта строка обновит таблицу flights новым значением задержки (желтый)

  2. 1010710: эта строка является дубликатом (синий)

  3. 1010822: это новая строка будет вставлена (зеленый)

В Delta Lake все это можно легко сделать с помощью оператора merge, как указано в следующем фрагменте кода.

# Объединяем merge_table с flightsdeltaTable.alias("flights") \    .merge(merge_table.alias("updates"),"flights.date = updates.date") \    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \    .whenNotMatchedInsertAll() \    .execute()# Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти датыspark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

Все три действия: дедупликация, обновление и вставка были эффективно выполнены с помощью одного оператора.

Просмотр истории таблицы

Как уже отмечалось ранее, после каждой нашей транзакции (удаление, обновление) в файловой системе создавалось все больше файлов. Это связано с тем, что для каждой транзакции существуют разные версии таблицы Delta Lake. Это можно увидеть с помощью метода DeltaTable.history(), как показано ниже.

deltaTable.history().show()+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+|      2|2019-09-29 15:41:22|  null|    null|   UPDATE|[predicate -> (or...|null|    null|     null|          1|          null|        false||      1|2019-09-29 15:40:45|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false||      0|2019-09-29 15:40:14|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|       null|          null|        false|+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

Вы также можете выполнить это с помощью SQL:

spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()

Как видите, есть три строки, представляющие различные версии таблицы (ниже представлена сокращенная версия для облегчения чтения) для каждой операции (создание таблицы, удаление и обновление):

version

timestamp

operation

operationParameters

2

2019-09-29 15:41:22

UPDATE

[predicate -> (or

1

2019-09-29 15:40:45

DELETE

[predicate -> [(

0

2019-09-29 15:40:14

WRITE

[mode -> Overwrit

Путешествие назад во времени по истории таблицы

С помощью механизма путешествия во времени вы можете просмотреть таблицу Delta Lake указанной версии или для определенного таймстемпа. Для получения более подробной информации читайте Документацию Delta Lake > Чтение старых версии данных с помощью Time Travel. Чтобы просмотреть старые данные, укажите опцию version или Timestamp; в фрагменте кода ниже, мы указываем version

# Загружаем DataFrames для каждой версииdfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")# Рассчитываем количество полетов от Сиэтла (SEA) до Сан-Франциско (SFO) для каждой версии историиcnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()# Выводим значениеprint("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))## ВыводSEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

Очистка старых версий таблиц с помощью Vacuum

Метод vacuum Delta Lake по умолчанию удаляет все строки (и файлы) старше 7 дней (референс: Delta Lake Vacuum). Если бы вы заглянули в файловую систему, вы бы заметили 11 файлов для своей таблицы.

/departureDelays.delta$ ls -l_delta_logpart-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquetpart-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquetpart-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquetpart-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquetpart-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquetpart-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquetpart-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquetpart-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquetpart-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquetpart-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquetpart-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

Чтобы удалить все файлы и сохранить только текущий снапшот данных, укажите очень маленькое значение в методе vacuum (вместо 7 дней хранения по умолчанию).

Вы можете выполнить ту же задачу с помощью SQL синтаксиса: # Удаляем все файлы старше 0 часовspark.sql(VACUUM  + pathToEventsTable +  RETAIN 0 HOURS)

После завершения операции vacuum, при просмотре файловой системы вы заметите намного меньше файлов, так как старые данные были удалены.

/departureDelays.delta$ ls -l_delta_logpart-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquetpart-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquetpart-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquetpart-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

Обратите внимание, что возможность переместиться во времени обратно к версии старше периода хранения теряется после запуска vacuum.

Что дальше?

Попробуйте Delta Lake уже сегодня, запустив приведенные выше фрагменты кода на своем инстансе Apache Spark 2.4.3 (или выше). Используя Delta Lake, вы можете сделать свои озера данных более надежными (независимо от того, создаете ли вы новое или мигрируете существующее озеро данных). Чтобы узнать больше, посетите https://delta.io/ и присоединитесь к сообществу Delta Lake в Slack и Google Group. Вы можете отслеживать все предстоящие релизы и запланированные фичи в github milestones.


Записаться на бесплатный демо-урок.


Читать ещё:

Источник: habr.com
К списку статей
Опубликовано: 11.11.2020 20:10:19
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Блог компании otus. онлайн-образование

Python

Data engineering

Delta lake

Python api

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru