Русский
Русский
English
Статистика
Реклама

Dbt

Работа с dbt на базе Google BigQuery

11.02.2021 18:04:31 | Автор: admin

На днях смотрел вебинар OWOX, где Андрей Осипов (веб-аналитик, автор блога web-analytics.me и лектор Школы веб-аналитики Андрея Осипова) рассказал о своем опыте использования dbt. Говорил о том, кому будет полезен инструмент и какие проблемы решает, а самое главное как не свихнуться со сложной иерархией таблиц и быть уверенным, что все данные считаются корректно. Я решил расшифровать вебинар в статью, потому что так удобнее возвращаться к информации, а она тут, поверьте, того стоит.

Зачем нужен dbt

Зачем нужен еще один инструмент для управления SQL-запросами? Ведь у нас есть Google BigQuery, и вообще в Google Cloud много различных механизмов, которые могут решать задачи по формированию таблиц, например scheduled queries.

Если у вас маленький проект, пара источников данных (таблица с событиями плюс расходы) и вам нужно построить 2-4 отчета, то структура расчета этих таблиц будет довольно простой.

Но если мы говорим о реальном проекте, то все может выглядеть как на картинке ниже.

Здесь мы видим большое количество таблиц с исходными данными. Это могут быть данные из рекламных систем, транзакции из CRM, информация о пользователях, расширенные данные о продуктах, о ваших мерчантах и другие данные, которые попадают в Google BigQuery разными путями.

Первая задача, которая возникает в таком случае автоматизация отчетности. Нужно все данные собрать, посчитать, проверить, валидировать и построить отчеты, который отвечали бы на ваши вопросы.

Чтобы это сделать, необходимо связать исходные таблицы с данными между собой. То есть финальная таблица формируется из десятка других, которые, в свою очередь из третьих.

При такой сложной структуре стандартные механизмы управления (типа scheduled queries) не подходят по нескольким причинам:

  1. Зависимости. Каждый scheduled query нужно запускать в определенное время, то есть у вас должна быть четкая иерархия запуска расчета таблиц. Нельзя, чтобы результирующая таблица считалась раньше, чем исходная. К примеру, в 5 утра у вас формируется отчет. После этого происходит какой-то сбой, и только в 7 утра данные о нем попадают в исходную таблицу. В результате вы теряете данные за вчера и нужно пересчитывать весь проект. А если у вас десятка два-три запросов, то пересчет займет минут 40.

  2. Отсутствие документации. В теории можно в том же Google Docs описывать каждую таблицу, правила ее формирования, поля и т.д. Но это занимает много времени, никак не автоматизировано и по факту мало кто это делает. В итоге у вас есть большая сложная структура и совершенно непонятно, как она работает. При возникновении ошибки будет сложно найти ее причину.

  3. Тестирование. Данные, которые попадают в исходные таблицы, не всегда корректны, а в Google BigQuery нет встроенных инструментов для их тестирования. Например, у вас была таблица с заказами. Потом в нее добавили рейтинг, который пользователи могут менять. Из-за этого могут дублироваться транзакции. То есть в таблице будет две записи с одинаковой транзакцией, но с разными timestamp и рейтингами.

  4. Много одинаковых кусков SQL. Когда у вас несколько финальных таблиц, которые генерируются из базовых, одни и те же агрегации происходят много раз. Это сильно увеличивает стоимость использования BigQuery. На нашем вебинаре Андрей Осипов поделился примером, как с помощью dbt ему удалось снизить стоимость использования GBQ в 20 раз для одного из проектов, с которым он работал. Только за счет того, что обращение к базовой таблице с событиями происходило всего один раз.

Для решения описанных проблем есть большое количество похожих инструментов. В этой статье мы рассмотрим, как с ними справляется dbt.

Что такое dbt (data build tool)

Инструментов для управления SQL довольно много, все они похожи. Почему стоит выбрать dbt? Во-первых, о нем больше всего информации, во-вторых, он очень активно развивается, постоянно добавляется новый функционал. На наш взгляд, из всех доступных инструментов он самый юзабельный и универсальный.

Как выглядит процесс обработки данных:

Схема из презентации dbt.

На схеме мы видим:

  1. Источники данных.

  2. Сервисы, которые достают данные из этих источников и складывают в хранилище. Это могут быть Cloud Functions, Cloud Run, OWOX BI Pipeline и др.

  3. Хранилище, куда мы складываем данные и где потом их нужно преобразовать: объединить с другими данными, проверить, валидировать.

  4. После обработки мы можем отправлять данные в инструменты визуализации, BI-системы или использовать как-либо еще.

dbt это инструмент, который трансформирует сырые исходные данные, проверяет и валидирует. Он делает это на базе собственной логики, а не на базе scheduled queries.

Структура dbt

Фактически dbt состоит из двух сущностей: модель и файл конфигурации.

Модель это сам запрос, то есть отдельный файл, на базе которого будут формироваться view или table.

Модель (.sql) единица трансформации, выраженная SELECT-запросом.

Также есть отдельный конфигурационный файл, из которого потом генерируется документация. Все важные моменты, которые необходимо учесть при формировании таблиц (например, описание полей) вы можете прописать в descriptions в этом файле.

Вы можете настроить все таким образом, чтобы дескрипшн или лейблы таблицы считывались из этого конфигурационного файла и при формировании таблицы записывались прямо в BigQuery. Это удобно, так как вы получаете и документацию, и краткое описание таблицы внутри GBQ.

Файл конфигурации (.yml) параметры, настройки, тесты, документация.

dbt CLI

dbt поставляется в двух версиях: консоль и cloud. То есть инструмент может быть как локальным, так и размещаться в Google Cloud и быть полноценным микросервисом.

Как в любой консоли здесь есть набор команд. Базовая это dbt run, которая как раз просчитывает и формирует подряд с учетом зависимостей все ваши модели. Как результат она пишет количество обработанных байтов или строк. Кроме dbt run, есть dbt test, чтобы проверить, корректно ли все посчиталось, и другие команды.

Также у вас есть возможность привязать свой dbt проект к Cloud Source Repositories или GitHub. То есть фактически все будет вертеться вокруг вашего cloud-проекта, и сам dbt будет там работать.

Как работать с dbt консолью:

  • В текстовом редакторе, например Atom, вы формируете новые таблички, локально рассчитываете, проверяете, корректно ли все посчиталось. В случае каких-либо проблем можете все это дело поправить.

  • После этого используете команду git push для выгрузки содержимого локального репозитория в удаленный репозиторий. И сам dbt с вашими новыми моделями через Cloud Build билдятся в новый контейнер и запускаются.

Процесс работы довольно удобный, контролируемый и мы всегда можем увидеть, если что-то пошло не так.

dbt Cloud

Если использовать консоль для вас сложно или нецелесообразно, вы можете попробовать dbt Cloud. По сути это веб-интерфейс той же самой консоли. Простой сайт, на котором можно вносить правки в свои модели, добавлять макросы. Вы можете запускать как весь проект, так и отдельные модели. Здесь же хостится документация по проекту. Регистрация для первого пользователя бесплатна, для каждого следующего $50 в месяц.

Функционал dbt

Зависимости (Refs)

О них мы говорили в начале статьи у вас должна быть четкая иерархия запуска расчета таблиц. dbt позволяет указывать эти зависимости прямо внутри каждого запроса. На базе этих зависимостей, во-первых, происходит расчет подряд необходимых табличек, а во-вторых, строится прямой асинхронный граф (Directed Acyclic Graph).

Прямой асинхронный граф (Directed Acyclic Graph)

Это часть документации, которая также формируется самим dbt. Благодаря этому графу вы можете посмотреть, как именно формируются ваши таблицы. Этого, конечно, очень не хватает в Google BigQuery. Потому что, если в вашем проекте больше 2-3 датасетов с несколькими таблицами, довольно сложно разобраться, как они все формируются.

В dbt вы можете кликнуть на конкретную таблицу и посмотреть, из каких таблиц она собирается. На рисунке совсем простая схема. Но чем она больше, тем удобнее работать с нею благодаря графу. При каком-то дебаге можно идти сверху вниз и на каждом этапе проверять, где возникла проблема.

Кроме того, вы можете запускать не весь проект, а только какую-то часть. Например, можно пересчитать одну табличку в графе (и все, из которых она формируется), не изменяя вашу конечную таблицу. И это классно, потому что большой проект может считаться несколько минут.

Шаблоны (Loops)

Вторая полезная вещь это использование Jinja. Это такой язык шаблонов. Шаблонизатор, в котором есть циклы, переменные и все остальное. Их можно указывать для похожих сущностей.

Например, в Google Analytics есть параметры событий, и у вас есть набор кастомных параметров, которые описываются в событии. Чтобы каждый раз не прописывать для каждого параметра одинаковые куски кода, в которых меняется одно-два значения, вы можете все это сделать в цикле. Список необходимых параметров можно указать в самом запросе или задать переменные в проекте.

Это удобно, потому что при создании нового проекта вы можете в переменных самого dbt прописать все нужные значения событий, параметров событий и т.д. Вы один раз прописываете в шаблонах все необходимые вычисления и та же самая модель, которая работала на старом проекте, сработает и на новом. Это значительно экономит время, если у нас много повторяющихся задач. Сам запрос становится меньше, а значит, его легче читать и проверять.

Переменные (Variables)

Переменные в dbt двух типов: можно создать переменную в рамках всего проекта или в рамках конкретной модели.

Макросы (Macros)

Это кусочки запроса, которые можно прописать отдельно и сложить в папку Macros. Они будут выполнять полезные преобразования данных. Например, в каждой модели вы можете прописать, в какой конкретно датасет будет складываться таблица. И не важно, в какой папке это датасет у вас лежит.

Благодаря макросам вы можете преобразовывать базовые вещи, подстраивать dbt под себя и формировать результат расчета на порядок проще и удобнее.

Пакеты макросов

Кроме того, есть библиотеки макросов, которые содержат массу полезных пакетов для привычных операций по работе с датами, геолокацией, логированием и т.д. То есть это готовые блоки, которые делают что-то полезное с вашими моделями.

Materializations

Еще одна полезная функция, которая значительно экономит ресурсы.

Допустим, вы не используете dbt. Когда вы обращаетесь в BigQuery к своей партиционной таблице, например делаете select * from [название вашей таблицы], то вы обращаетесь ко всему датасету. Если у вас немного трафика, GA 4 или стриминг OWOX BI настроены недавно, то вы обработаете небольшое количество данных. Если же у вас много данных и большие таблицы, то каждый такой запрос будет дорого стоить.

По факту инкрементальная модель dbt позволяет вам сначала удалять данные в формируемой таблице (например, по дате или по order_id), а потом записывать новые.

Например, в GA 4 данные за вчера, которые образовались в табличке events, еще не полностью готовы. Если вы посмотрите на логи, то вы увидите, что система перезатирает данные. Она формирует из таблицы intraday таблицу events и перезаписывает данные еще на два дня назад.

Используя инкрементальные модели, вы можете нивелировать этот фактор и на каждый день перезаписывать вчера, позавчера и позапозавчера. Вы не работает со всем датасетом, а только с определенной частью. Соответственно данные, которые у вас после этого будут формироваться (какие-нибудь агрегированные таблицы и отчеты), также могут работать по этой логике. То есть у вас на каждый день будет просчет не только вчерашнего дня, не всего периода, а только последних трех дней.

Обычно объем данных, которые при этом формируются, не очень большой. Это позволяет вам работать с меньшим количеством данных и значительно экономит ресурсы и деньги.

Тесты

По умолчанию при формировании таблиц вы можете тестировать данные, которые в них появляются, а также настроить отправку уведомлений об обнаруженных ошибках.

Примеры тестов, которые можно проводить в dbt:

  • Not Null.

  • Unique.

  • Reference Integrity ссылочная целостность (например, customer_id в таблице orders соответствует id в таблице customers).

  • Соответствие списку допустимых значений.

  • Custom data tests.

Тестирование полезная штука, потому что данных все больше и больше, и контролировать их вручную все сложнее.

Документация

dbt позволяет формировать документацию по всем вашим моделям. Это descriptions, описания полей, тесты, как формируются таблицы, возможность посмотреть сам запрос и то, что получается после обработки, то есть вызывается непосредственно в Google BigQuery.

Такая документация очень упрощает ввод в курс дела нового сотрудника или поиск ошибок и их причин.

DEV TEST PROD

В dbt есть возможность разделения расчетов на разные среды. Если ваш проект на этапе тестирования, вы можете прямо в модели описать, что нужно работать не со всем массивом данных, а с выбранным куском, например, за последние три дня. В плане тестов и разработки это полезный функционал, который позволяет сэкономить ресурсы при процессинге данных в Google BigQuery.

Подключение Git

Вы можете подключить к своему dbt проекту любой Git, например GitHub или Google Cloud Source Repositories, и полноценно управлять всеми моделями, которые у вас есть.

Логгинг через вебхуки (Logging via webhooks)

Вы можете складывать результаты тестов и расчетов в какой-нибудь pop-up и потом отправлять через Google Cloud, куда вам надо. Или складывать их в отдельную таблицу GBQ и выводить визуализацию, как все посчиталось.

Как использовать dbt

dbt Сloud

В dbt cloud есть две опции: сама модель, конфигурационный файл, и что-то типа cron. Вы можете указать конкретную модель либо сделать для модели тег или лебл. Это позволит просчитывать, не определенную логически сгруппированную папку, а все папки по какому-то тегу. Например, все, что касается заказов или источников трафика.

Также здесь есть schedule, с помощью которого можно задать периодичность расчета данных.

Google Cloud Cloud Shell

Еще один вариант использования dbt. В Google Cloud есть такой инструмент, как Cloud Shell. Если вы его откроете, то фактически окажетесь внутри apenjin. То есть у вас есть возможность развернуть dbt не в доке, а прямо в apenjin.

Это не очень удобно с точки зрения полноценной работы, в плане использования запросов, кронов и всего остального, но как вариант работы в облаке вполне сгодится. Все необходимые изменения можно делать внутри облака, не выгружая данные локально на компьютер.

Google Cloud Cloud Run

Схема работы с dbt, Google Cloud и Cloud Run выглядит примерно так:

  1. В Atom или другом редакторе вносим правки, делаем для новых правок отдельные бренчи.

  2. После этого коммитим изменения и пушим в Cloud репозиторий.

  3. По факту пуша через Cloud Build формируется новая версия нашего Cloud Run и запускается через Cloud Scheduler по необходимому расписанию.

  4. Как результат работы dbt, который находится в Cloud Run, все это рассчитывается в BigQuery, откуда идет уже в Data Studio.

  5. Логи можно складывать через pop-up в Telegram по определенным правилам например, пушить не все, а только какие-то важные изменения.

Такая инфраструктура позволяет довольно легко переносить запросы с одного Cloud проекта на другой и контролировать все происходящее с расчетами в dbt. Благодаря использованию Git вы четко понимаете, кто в вашей команде что запушил, куда, зачем и почему.

Подробнее..

Курсы валют и аналитика использование обменных курсов в Хранилище Данных

19.05.2021 16:06:46 | Автор: admin

Привет! На связи Артемий Analytics Engineer из Wheely.

Сегодня хотел бы поговорить о вопросах конвертирования финансовых показателей в разные валюты. Вопрос достаточно актуальный, так как большое количество компаний имеют мультинациональные зоны присутствия, строят аналитику глобального масштаба, готовят отчетность по международным стандартам.

Покажу как этот вопрос решается с помощью современных подходов на примере кейса Wheely:

  • Расширение списка базовых валют

  • Регулярное обновление и получения актуальных курсов

  • Обеспечение корректности исторических показателей

  • Максимальное удобство и простота использования в аналитических инструментах

Велком под кат для разбора решения проблемы учета мультивалютных метрик и показателей: Open Exchange Rate, Airflow, Redshift Spectrum, dbt.


Новые требования к сервису валютных курсов

В качестве legacy-источника использовался веб-сервис ЦБ РФ. Однако с изменяющимися требованиями и расширением зон присутствия компании его стало недостаточно. Например, по причине отсутствия котировки AED (дирхам ОАЭ). Для кого-то могут быть актуальны курсы криптовалют BTC, ETH, которые в веб-сервисе ЦБ РФ тоже отсутствуют.

Новые требования можно суммировать следующим образом:

  • Поддержка расширенного набора базовых валют, которые отсутствуют в API ЦБ РФ

  • Получение самых актуальных котировок, включая внутридневные курсы

  • Минимизация трансформаций данных вне Хранилища Данных (лучше если их вообще нет)

Матрица новых требований к работе с курсами валютМатрица новых требований к работе с курсами валют

Задачи, которые предстоит решить легко визуализировать в виде матрицы. Красным помечены области, поддержку которых предстоит добавить:

  • Интеграция нового API для уже использующихся курсов

  • Добавление новых базовых валют в выгрузку

  • Получение ретроспективных (исторических) данных по новым валютам за прошлые периоды

  • Архивирование курсов из legacy-источника

Легаси приложение по выгрузке курсов валют формировало pivot-таблицу с коэффициентом для каждой пары в отдельном столбце. Это удобно, когда у нас есть строго фиксированный набор валют и наименования колонок, но превращается в головную боль если список валют необходимо расширить.

Появилось желание уйти от всех трансформаций и формирований таблиц в pandas до того как данные попадают в Хранилище. Здесь я придерживаюсь принципа применения всех трансформаций (T в ELT) в одном месте, и помогает мне в этом замечательный инструмент dbt.

Интеграция с новым поставщиком данных

Как уже стало понятно, без внешнего поставщика данных обойтись не получится, поэтому предлагаю рассмотреть один из ряда провайдеров курсов валют https://openexchangerates.org/

Минимальный необходимый план Developer включает в себя:

  • 10.000 запросов ежемесячно (более чем достаточно)

  • Ежечасные внутридневные обновления курсов

  • Широкий набор базовых валют, включая криптовалюты

Доступные методы API:

Для получения актуальных курсов валют воспользуемся API endpoint /latest.json

Простой запрос-ответ может выглядеть следующим образом:

Установка на расписание в Airflow

Для регулярного получения актуальных курсов валют я воспользуюсь инструментом Airflow. Apache Airflow де-факто стандарт в области оркестрации данных, data engineering и управления пайплайнами.

Смысловая составляющая графа задачи (DAG):

  • Сделать запрос к API

  • Сохранить полученный ответ (например, в виде уникального ключа на S3)

  • Уведомить в Slack в случае ошибки

Конфигурация DAG:

  • Базовые валюты (base currency), от которых отсчитываем курсы

  • Синхронизация расписание запусков с расчетом витрин в Хранилище Данных

  • Токен доступа к сервису

Самый простой DAG состоит из одного таска с вызовом простого shell-скрипта:

TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"` curl -H "Authorization: Token $OXR_TOKEN" \ "https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \ | aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json

Вот как выглядит результат регулярной работы скрипта в S3:

Сегодня в штатном режиме выполняется около 25 обращений к сервису в сутки, статистика выглядит следующим образом:

Выгрузка истории по новым валютам

После обеспечения регулярной выгрузки всех необходимых валют, можно приступить к формированию истории по новым базовым валютам (которой, очевидно, нет). Это позволит переводить в новые валюты суммы транзакций прошлых периодов.

К сожалению, план Developer не включает обращения к API endpoint /time-series.json, и только ради этой разовой задачи не имеет смысла делать upgrade на более дорогостоящую версию.

Воспользуемся методом /historical/*.json и простым опросом API в цикле для формирования исторической выгрузки:

#!/bin/bash d=2011-01-01while [ "$d" != 2021-02-19 ]; do echo $d curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)done

Пиковая нагрузка вызвала вопросы у коллег, которые тоже пользуются сервисом, но это была разовая акция:

Архивирование исторических курсов валют

Вся история обменных курсов полученная из legacy-источника ЦБ РФ до даты X (перехода на новый сервис-провайдер) подлежит архивированию в неизменном виде.

Я хочу сохранить все те курсы, которые мы показывали в своих аналитических инструментах без изменений. То есть чтобы суммы в дашбордах и отчетах бизнес-пользователей не были изменены ни на копейку.

Для этого я выполню выгрузку накопленных значений обменных курсов за весь исторический период в Data Lake. Более детально, я произведу:

  • Трансформацию legacy pivot-таблицы в двумерную

  • Запись в колоночный формат PARQUET в AWS S3

Формирование архива в S3 в формате PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrfSTORED AS PARQUETLOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' ASWITH base AS (   SELECT 'EUR' AS base_currency   UNION ALL   SELECT 'GBP'   UNION ALL   SELECT 'RUB'   UNION ALL   SELECT 'USD')SELECT   "day" AS business_dt   ,b.base_currency   ,CASE b.base_currency       WHEN 'EUR' THEN 1       WHEN 'GBP' THEN gbp_to_eur       WHEN 'RUB' THEN rub_to_eur       WHEN 'USD' THEN usd_to_eur       ELSE NULL     END AS eur   ,CASE b.base_currency       WHEN 'EUR' THEN eur_to_gbp       WHEN 'GBP' THEN 1       WHEN 'RUB' THEN rub_to_gbp       WHEN 'USD' THEN usd_to_gbp       ELSE NULL     END AS gbp   ,CASE b.base_currency       WHEN 'EUR' THEN eur_to_rub       WHEN 'GBP' THEN gbp_to_rub       WHEN 'RUB' THEN 1       WHEN 'USD' THEN usd_to_rub       ELSE NULL     END AS rub   ,CASE b.base_currency       WHEN 'EUR' THEN eur_to_usd       WHEN 'GBP' THEN gbp_to_usd       WHEN 'RUB' THEN rub_to_usd       WHEN 'USD' THEN 1       ELSE NULL     END AS usd     FROM ext.currencies c   CROSS JOIN base b;

Таким образом, в хранилище S3 у меня теперь есть статический снимок всех обменных курсов, когда-либо использованных в аналитических приложениях, сериализованный в оптимизированный колоночный формат со сжатием. В случае необходимости пересчета витрин и исторических данных я запросто смогу воспользоваться этими курсами.

Доступ к данным из DWH через S3 External Table

А теперь самое интересное из своего аналитического движка Amazon Redshift я хочу иметь возможность просто и быстро обращаться к самым актуальным курсам валют, использовать их в своих трансформациях.

Оптимальное решение создание внешних таблиц EXTERNAL TABLE, которые обеспечивают SQL-доступ к данным, хранящимся в S3. При этом нам доступно чтение полуструктурированных данных в формате JSON, бинарных данных в форматах AVRO, ORC, PARQUET и другие опции. Продукт имеет название Redshift Spectrum и тесно связан с SQL-движком Amazon Athena, который имеет много общего с Presto.

CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (   "timestamp" bigint   , base varchar(3)   , rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>)ROW format serde 'org.openx.data.jsonserde.JsonSerDe'LOCATION 's3://<BUCKET>/dwh/currencies/';

Обратите внимание на обращение ко вложенному документу rates с помощью создания типа данных struct.

Теперь добавим к этой задаче секретную силу dbt. Модуль dbt-external-tables позволяет автоматизировать создание EXTERNAL TABLES и зарегистрировать их в качестве источников данных:

   - name: external     schema: spectrum     tags: ["spectrum"]     loader: S3     description: "External data stored in S3 accessed vith Redshift Spectrum"     tables:       - name: currencies_oxr         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"         freshness:           error_after: {count: 15, period: hour}         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'         external:           location: "s3://<BUCKET>/dwh/currencies/"           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"         columns:           - name: timestamp             data_type: bigint           - name: base             data_type: varchar(3)           - name: rates             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>

Немаловажным элементом является проверка своевременности данных source freshness test на курсы валют. Тем самым мы будем постоянно держать руку на пульсе поступления актуальных данных в Хранилище. Очень важно рассчитывать все финансовые метрики корректно и в срок, а без актуальных значений курсов задачу решить невозможно.

В случае отставания данных более 15 часов без свежих обменных курсов мы тут же получаем уведомление в Slack.

Для прозрачности и простоты пользователей объединим исторические данные (архив) и постоянно поступающие актуальные курсы (новый API) в одну модель currencies:

Объединение исторических и новых данных в единый справочник
{{   config(       materialized='table',       dist='all',       sort=["business_dt", "base_currency"]   )}} with cbrf as (  select      business_dt   , null as business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from {{ source('external', 'currencies_cbrf') }} where business_dt <= '2021-02-18' ), oxr_all as (    select      (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt   , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts   , o.base as base_currency   , o.rates.aed::decimal(10,4) as aed   , o.rates.eur::decimal(10,4) as eur   , o.rates.gbp::decimal(10,4) as gbp   , o.rates.rub::decimal(10,4) as rub   , o.rates.usd::decimal(10,4) as usd   , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn    from {{ source('external', 'currencies_oxr') }} as o   where business_dt > '2021-02-18' ), oxr as (  select      business_dt   , business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from {{ ref('stg_currencies_oxr_all') }} where rn = 1 ), united as (  select      business_dt   , business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from cbrf  union all  select      business_dt   , business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from oxr ) select    business_dt , business_ts , base_currency , aed , eur , gbp , rub , usd from united

При этом физически справочник с курсами валют копируется на каждую ноду аналитического кластера Redshift и хранится в отсортированном по дате и базовой валюте виде для ускорения работы запросов.

Использование курсов в моделировании данных

В целом, работа с курсами валют для аналитиков и инженеров, которые развивают Хранилище Данных не изменилась и осталась весьма простой. Все детали использования нового API, обращения к внешним полу-структурированным документам JSON в S3, объединению с архивными данными скрыты . В своих трансформациях достаточно сделать простой джоин на таблицу с курсами валют:

   select        -- price_details       , r.currency       , {{ convert_currency('price', 'currency') }}       , {{ convert_currency('discount', 'currency') }}       , {{ convert_currency('insurance', 'currency') }}       , {{ convert_currency('tips', 'currency') }}       , {{ convert_currency('parking', 'currency') }}       , {{ convert_currency('toll_road', 'currency') }}    from {{ ref('requests') }} r       left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt           and r.currency = currencies.base_currency

Сами метрики конвертируются при помощи простого макроса, который на вход принимает колонку с исходной суммой и колонку с исходным кодом валюты:

-- currency conversion macro{% macro convert_currency(convert_column, currency_code_column) -%}      ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd {%- endmacro %}

Практико-ориентированное развитие

Работа с данными одно из наиболее востребованных и бурно развивающихся направлений. Каждый день я нахожу новые интересные задачи и придумываю решения для них. Это захватывающий и интересный путь, расширяющий горизонты.

В конце мая состоится юбилейный запуск курса Data Engineer в ОТУС, в котором я принимаю участие в роли преподавателя.

По прошествии двух лет программа постоянно менялась, адаптировалась. Ближайший запуск принесет ряд нововведений и будет построен вокруг кейсов реальных прикладных проблем инженеров:

  • Data Architecture

  • Data Lake

  • Data Warehouse

  • NoSQL / NewSQL

  • MLOps

Детально с программой можно ознакомиться на лендинге курса.

Также я делюсь своими авторскими заметками и планами в телеграм-канале Technology Enthusiast.

Благодарю за внимание.

Подробнее..

Моделирование данных зачем оно нужно и какие преимущества дает бизнесу

14.04.2021 12:16:12 | Автор: admin

Когда маркетологам нужно получить новый отчет или изменить существующий, они вынуждены обращаться к аналитикам и ждать, пока те подготовят данные. Аналитики строят отчеты с помощью SQL. Со временем таких SQL запросов становится все больше, а логика в них все сложнее. В результате маркетологи теряют время и упускают возможности, а аналитикам приходится заниматься рутиной вместо интересных задач. Как трансформация и моделирование данных могут существенно облегчить жизнь и тем, и другим читайте под катом.

Что такое трансформация и моделирование данных?

Чтобы ответить на этот вопрос, разберемся с основными терминами, которые вы встретите в этой статье.

Модель данных это описание объектов и их свойств; связей между объектами и ссылок на источники данных. Связь между объектами реализуется через специальные свойства ключи.

Схема данных это шаблон модели, который разработан под конкретную бизнес-нишу. Схема позволяет создавать модель не с чистого листа, а учитывая наш предыдущий опыт.

Схема данных описывает некую предметную область, например, электронную коммерцию. В ней есть заказы, пользователи, сессии. У SaaS-бизнесов могут быть дополнительные сущности, например, подписка или тарифный план. У телеком компаний и банков подтвержденные заявки и т.д.

Как выглядит модель данных пример для Ecommerce:

Трансформация данных это процесс преобразования данных из одной структуры в другую.

Моделирование данных это трансформация данных в структуру, соответствующую требованиям модели.

Какую проблему решает моделирование данных?

Если представить основные этапы работы с данными в виде графика...

...можно увидеть, что на между этапами Preparation Reporting (Подготовка данных Создание отчетов) и возникает больше всего проблем.

Когда маркетологу нужно: получить данные для отчета в новой структуре; добавить колонку в готовый отчет; изменить логику объединения данных в готовом отчете он вынужден обращаться к аналитику и ждать.

Когда аналитику нужно: приджойнить в готовый отчет дополнительные данные; обновить логику расчета определенной метрики; заменить источник данных на источник с другой структурой он тратит значительное время на понимание и рутинное изменение надцати SQL-запросов.

Чтобы понять, как именно моделирование данных решает эти проблемы, рассмотрим подробнее процесс подготовки данных. Его можно разбить на три этапа:

1. Моделирование. На этом этапе аналитик задается вопросами: Как собранные данные соотносятся с бизнес-сущностями?, Какие способы объединения данных допустимы?.

Моделирование достаточно универсальная задача. Например, когда маркетолог спрашивает у аналитика: А мы вообще сможем объединить данные из источников X и Y?. Это задача моделирования.

2. Виртуализация. На этом этапе решается вопрос: Какая структура и вариант объединения данных нужны для конкретного отчета?. Структура может быть получена с помощью разных вариантов объединения, но какой нам нужен в данном случае? Нам нужны источники последней сессии или всех? Одну и ту же структуру мы можем получить разными способами. Для виртуализации аналитик должен ответить на эти вопросы и создать такой SQL-запрос, который эти данные вернет.

Задача виртуализации данных, когда нам нужно получить их в каком-то новом срезе, используется регулярно. Потому что каждый раз, когда нам нужно что-то поменять в отчете, мы сталкиваемся именно с этой задачей.

3. Трансформация. На этом этапе аналитик решает, в какой структуре необходимо подготовить данные для их объединения в отчете. Готовит данные: занимается дедупликацией, отсеиванием всплесков, нормализацией. То есть это все манипуляции с входным потоком для того, чтобы потом эти данные можно было использовать.

Задача трансформации достаточно нетривиальна. Потому что у каждого бизнеса есть уникальные требования. Она в целом менее универсальная и используется как минимум на этапе запуска. То есть мы сделали какой-то отчет и один раз как минимум определили правила трансформации.

Маркетологи зависят от аналитиков, потому что у них нет готового продукта, который решал бы задачу виртуализации. Из-за этого для каждого нового отчета, когда нужно добавить колонку, аналитик трансформирует данные снова и снова. Чтобы этого избежать, нужно решение, которое позволит трансформировать данные и хранить их в структуре, пригодной для многократного использования. Таким решением может выступать связка dbt + Google BigQuery.

Какое преимущество дает бизнесу моделирование данных?

Понятно, что маркетологи в большинстве своем далеки от темы трансформации и моделирования данных. Но без этого невозможно переиспользовать данные, подключенные к продукту, в котором вы создаете отчеты.

Как сейчас выглядит построение отчетов? Маркетолог рассказал аналитику, что нужно сделать. Тот написал SQL-запросы и построил на их результатах дашборд. Нашли расхождения, добавили условия в запросы, сделали пару итераций, получили обратную связь и таким образом довели отчет до ума. Но все решения были в контексте отчета. Это значит, что когда понадобится другой отчет с другими источниками, новыми колонками, несколькими срезами данных операцию проверки сходимости данных, валидацию нужно будет проходить заново.

С трансформацией и моделированием данных эта задача решается до того, как строится отчет. Когда мы еще не знаем, какие нам понадобятся отчеты. Но ответ на вопрос, что такое сессия, пользователь, расходы, мы сформулировали на уровне модели. То есть трансформация это преобразование данных в структуру, которая соответствует требованиям модели. Она позволяет строить отчеты, не тратя время на очередную обработку данных. При этом вы будете уверены, что отчеты построены на сходимых данных.

Способы трансформации данных для моделирования

Есть встроенный механизм регулярных запросов, которые выполняются в Google BigQuery, Scheduled Queries и AppScript. Их легко можно освоить, потому что это привычный SQL, но проводить отладку в Scheduled Queries практически нереально. Особенно, если это какой-то сложный запрос или каскад запросов.

Есть специализированные инструменты для управления SQL-запросами, например, dbt и Dataform.

dbt (data build tool) это фреймворк с открытым исходным кодом для выполнения, тестирования и документирования SQL-запросов, который позволяет привнести

элемент программной инженерии в процесс анализа данных. Он помогает оптимизировать работу с SQL-запросами: использовать макросы и шаблоны JINJA, чтобы не повторять в сотый раз одни и те же фрагменты кода.

Главная проблема, которую решают специализированные инструменты это уменьшение времени, необходимого на поддержку и обновление. Это достигается за счет удобства отладки.

Сравнение способов трансформации данных:

Если вы хотите получать больше пользы от своих данных, рекомендуем уже сейчас осваивать dbt или Dataform. Они помогут значительно упростить и ускорить создание отчетов для вашей компании.

Подробнее..

Мультитул для управления Хранилищем Данных кейс Wheely dbt

30.03.2021 00:12:15 | Автор: admin

Уже более двух лет data build tool активно используется в компании Wheely для управления Хранилищем Данных. За это время накоплен немалый опыт, мы на тернистом пути проб и ошибок к совершенству в Analytics Engineering.

Несмотря на то, что в русскоязычном сегменте уже есть несколько публикаций, посвященных применению dbt, всё ещё нельзя говорить о широкой популярности и интересе, которые продукт стремительно обретает на Западе.

Поэтому сегодня я предлагаю вам экскурсию по Хранилищу Данных Wheely. В формат публикации я попытался уложить самые яркие моменты и впечатления от использования dbt, снабдив реальными примерами, практиками и опытом. Добро пожаловать под кат.

Структура превыше всего

Измерять сложность Хранилища Данных в количестве гигабайт сегодня - дурной тон

Налить кучу тяжело интерпретируемых данных без метаинформации (читай мусора) не составит большого труда. Гораздо сложнее из этих данных получить что-то осмысленное. То, на что с уверенностью могут опираться business stakeholders, принимая решения. То, что регулярно измеряется на предмет качества и актуальности. Наконец, то, что соответствует принципам Keep it simple (KISS) и Dont repeat yourself (DRY).

Первостепенным элементом я считаю прозрачность структуры Хранилища Данных. Чаще всего DWH выстраивается согласно многослойной логике, где каждому этапу соответствует набор преобразований, детали реализации которого скрыты для последующих слоев (элемент абстракции).

Схема слоев Хранилища ДанныхСхема слоев Хранилища Данных

Зеленым цветом слой источников данных sources. Это реплики структур и таблиц из исходных систем, которые поддерживаются ELT-сервисом. Данные синхронизируются 1:1 с источником, без каких-либо преобразований. Опциональный слой flatten позволяет вложенные иерархические структуры (JSON) превратить в плоские таблицы.

Слой staging предназначен для простых преобразований: переименование полей, преобразование типов, расчет новых колонок с помощью конструкции case. На этом этапе мы готовим почву для дальнейших преобразований, приводим всё к единому виду и неймингу.

Intermediate или промежуточный слой отвечает за формирование предварительных таблиц и агрегатов, где происходит обогащение данных. Для ряда бизнес-областей мы не используем этот слой, для других логика может насчитывать до 5-10 взаимосвязанных моделей.

Кульминацией являются data marts или Витрины Данных, которые используются Data Scientists / Business Users / BI tools. Слой, в свою очередь, делится на:

  • dimensions: пользователи, компании, машины, водители, календарь

  • facts: поездки, транзакции, сеансы, продвижения, коммуникации

  • looker: материализованные представления и витрины, оптимизированные под чтение из BI-системы

Число 120 из заголовка публикации относится только к витринам данных:

Running with dbt=0.19.0Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures

На текущий момент в проекте:

  • 273 модели во всех перечисленных слоях

  • 493 теста на эти модели, включая not null, unique, foreign key, accepted values

  • 6 снапшотов для ведения истории SCD (slowly changing dimensions)

  • 532 макроса (большая часть из которых импортирована из сторонних модулей)

  • 7 operations включая vacuum + analyze

  • 81 источник данных

Помимо разбиения на логические слои, Хранилище можно нарезать по бизнес-областям. В случае необходимости есть возможность пересчитать или протестировать витрины, относящиеся к вертикалям Marketing / Supply / Growth / B2B. Например, в случае late arriving data или ручных корректировках маппингов/справочников.

Осуществляется это за счет присвоения моделям и витринам тегов, а также за счет богатых возможностей синтаксиса выбора моделей. Запустить расчет всех витрин вертикали Marketing и их вышестоящие зависимости:

dbt run -m +tag:marketing

Этот же принцип лежит в основе организации кодой базы. Все скрипты объединены в директории с общей логикой и понятными наименованиями. Сложно потеряться даже при огромном количестве моделей и витрин:

Иерархия проекта dbt
.|____staging| |____webhook| |____receipt_prod| |____core| |____wheely_prod| |____flights_prod| |____online_hours_prod| |____external| |____financial_service|____marts| |____looker| |____dim| |____snapshots| |____facts|____flatten| |____webhook| |____receipt_prod| |____wheely_prod| |____communication_prod|____audit|____sources|____aux| |____dq| | |____marts| | |____external|____intermediate

Оптимизация физической модели

Логическое разделение на слои и области - это замечательно. Но не менее важно и то, как эта логика ложится на конкретную СУБД. В случае Wheely это Amazon Redshift.

Подход с декомпозицией позволит разбить логику на понятные части, которые можно рефакторить по отдельности. Одновременно это помогает оптимизатору запросов подобрать лучший план выполнения. По такому принципу реализована одна из центральных витрин journeys (поездки).

Цепочка зависимостей витрины поездок (journeys)Цепочка зависимостей витрины поездок (journeys)

На этапе обогащения данных важна скорость склейки таблиц (join performance), поэтому данные сегментированы и отсортированы в одинаковом ключе, начиная с sources. Это позволит использовать самый быстрый вид соединения - sort merge join:

Конфигурация для оптимального соединения sort merge join
{{config(materialized='table',unique_key='request_id',dist="request_id",sort="request_id")}}

Витрина же хранится отсортированной по самым популярным колонкам доступа: city, country, completed timestamp, service group. В случае правильного подбора колонок Interleaved key позволяет значительно оптимизировать I/O и ускорить отрисовку графиков в BI-системах.

Конфигурация для быстрого чтения витрины interleaved sortkey
{{config(materialized='table',unique_key='request_id',dist="request_id",sort_type='interleaved',sort=["completed_ts_loc", "city", "country", "service_group", "is_airport", "is_wheely_journey"])}}

При этом часть моделей есть смысл материализовать в виде views (виртуальных таблиц), не занимающих дисковое пространство в СУБД. Так, слой staging, не содержащий сложных преобразований, конфигурируется на создание в виде представлений на уровне проекта:

staging:+materialized: view+schema: staging+tags: ["staging"]

Другой интересный пример результаты проверки качества данных. Выбранный тип материализации ephemeral, т.е. на уровне СУБД не будет создано ни таблицы, ни представления. При каждом обращении к такой модели будет выполнен лишь запрос. Результат такого запроса является слагаемым в суммарной таблице, содержащей метрики всех проверяемых объектов.

В свою очередь большие таблицы фактов имеет смысл наполнять инкрементально. Особенно при условии того, что факт, случившийся однажды, больше не меняет своих характеристик. Таким образом мы процессим только изменения (delta) новые факты, произошедшие после последнего обновления витрины. Обратите внимание на условие where:

Пример инкрементального наполнения витрины
{{config(materialized='incremental',sort='metadata_timestamp',dist='fine_id',unique_key='id')}}with fines as (selectfine_id, city_id, amount, details, metadata_timestamp, created_ts_utc, updated_ts_utc, created_dt_utcfrom {{ ref('stg_fines') }}where true-- filter fines arrived since last processed time{% if is_incremental() -%}and metadata_timestamp > (select max(metadata_timestamp) from {{ this }}){%- endif %}),...

Кстати, о принципах MPP и о том, как выжать максимум из аналитических СУБД я рассказываю на курсах Data Engineer и Data Warehouse Analyst (скоро первый запуск!).

SQL + Jinja = Flexibility

Высокоуровневый декларативный язык SQL прекрасен сам по себе, но вкупе с движком шаблонизации Jinja он способен творить чудеса.

Любой код, который вы используете с dbt проходит этапы compile & run. На этапе компиляции интерпретируются все шаблонизированные выражения и переменные. На этапе запуска код оборачивается в конструкцию CREATE в зависимости от выбранного типа материализации и фишек используемой СУБД: clustered by / distributed by / sorted by. Рассмотрим пример:

Model code:
{{config(materialized='table',dist="fine_id",sort="created_ts_utc")}}with details as (  select{{dbt_utils.star(from=ref('fine_details_flatten'),except=["fine_amount", "metadata_timestamp", "generated_number"])}}from {{ ref('fine_details_flatten') }}where fine_amount > 0)select * from details
Compiled code:
with details as (select  "id","fine_id","city_id","amount","description","created_ts_utc","updated_ts_utc","created_dt_utc"from "wheely"."dbt_test_akozyr"."fine_details_flatten"where fine_amount > 0)select * from details
Run code:
create table"wheely"."dbt_test_akozyr"."f_chauffeurs_fines"diststyle key distkey (fine_id)compound sortkey(created_ts_utc)as (with details as (select"id","fine_id","city_id","amount","description","created_ts_utc","updated_ts_utc","created_dt_utc"from "wheely"."dbt_test_akozyr"."fine_details_flatten"where fine_amount > 0)select * from details);

Ключевым моментом является тот факт, что пишете вы только лаконичный шаблонизированный код, а остальным занимается движок dbt. Написание boilerplate code сведено к минимуму. Фокус инженера или аналитика остается преимущественно на реализуемой логике.

Во-вторых, как происходит выстраивание цепочки связей и очередности создания витрин, продемонстрированные на картинках выше? Внимательный читатель уже заметил, что в рамках написания кода при ссылках на другие модели нет хардкода, но есть конструкция {{ ref('fine_details_flatten') }} ссылка на наименование другой модели. Она и позволяет распарсить весь проект и построить граф связей и зависимостей. Так что это тоже делается абсолютно прозрачным и органичным способом.

С помощью шаблонизации Jinja в проекте Wheely мы гибко управляем схемами данных и разделением сред dev / test / prod. В зависимости от метаданных подключения к СУБД будет выбрана схема и период исторических данных. Продакшн модели создаются в целевых схемах под технической учетной записью. Аналитики же ведут разработку каждый в своей личной песочнице, ограниченной объемом данных в 3-е последних суток. Это реализуется с помощью макроса:

Макрос управления схемами для подключений:
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}{%- set default_schema = target.schema -%}{%- if target.name == 'prod' and custom_schema_name is not none -%}{{ custom_schema_name | trim }}{%- else -%}{{ default_schema }}{%- endif -%}{%- endmacro %}

Еще одним важным преимуществом является самодокументируемый код. Иными словами, из репозитория проекта автоматически можно собрать статический сайт с документацией: перечень слоев, моделей, атрибутный состав, метаинформацию о таблицах в СУБД и даже визуализировать граф зависимостей (да-да, картинки выше именно оттуда).

Не повторяйся лучше подготовь макрос

Однотипный код, повторяющиеся обращения и действия, зачастую реализуемые по принципу copy-paste нередко являются причиной ошибок и багов. В Wheely мы придерживаемся принципа Do not repeat yourself и любой сколько-нибудь похожий код шаблонизируем в макрос с параметрами. Писать и поддерживать такой код становится сплошным удовольствием.

Простой пример с конвертацией валют:
-- currency conversion macro{% macro convert_currency(convert_column, currency_code_column) -%}( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed, ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur, ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp, ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub, ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd{%- endmacro %}
Вызов макроса из модели:
select...-- price_details, r.currency, {{ convert_currency('price', 'currency') }}, {{ convert_currency('transfer_min_price', 'currency') }}, {{ convert_currency('discount', 'currency') }}, {{ convert_currency('insurance', 'currency') }}, {{ convert_currency('tips', 'currency') }}, {{ convert_currency('parking', 'currency') }}, {{ convert_currency('toll_road', 'currency') }}, {{ convert_currency('pickup_charge', 'currency') }}, {{ convert_currency('cancel_fee', 'currency') }}, {{ convert_currency('net_bookings', 'currency') }}, {{ convert_currency('gross_revenue', 'currency') }}, {{ convert_currency('service_charge', 'currency') }}...from {{ ref('requests_joined') }} r

По большому счету, макрос это просто вызов функции с передачей аргументов, на уже знакомом вам диалекте Jinja. Результатом работы макроса является готовый к исполнению SQL-скрипт. Макрос для кросс-сверки значений в колонках:

Сравнить значения двух колонок
-- compare two columns{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}{%- if is_numeric == true -%}{%- set src_column = 'round(' + src_column + ', 2)' -%}{%- set trg_column = 'round(' + trg_column + ', 2)' -%}{%- endif -%}CASEWHEN {{ src_column }} = {{ trg_column }} THEN 'match'WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'WHEN {{ src_column }} IS NULL THEN 'missing in source'WHEN {{ trg_column }} IS NULL THEN 'missing in target'WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'ELSE 'unknown'END{%- endmacro %}

В макрос можно запросто записать даже создание UDF-функций:

Создать UDF
-- cast epoch as human-readable timestamp{% macro create_udf() -%}{% set sql %}CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)RETURNS VARCHAR(512)STABLEAS $$# Convert column to binary, strip "0b" prefix, pad out with zeroesif bitwise_column is not None:b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]return belse:None$$ LANGUAGE plpythonu;CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)RETURNS VARCHAR(128)STABLEAS $$SELECT nvl(DECODE($2, null, null, 'deleted'), DECODE(LEN(analytics.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown'), DECODE(analytics.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null), DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null), DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null), DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null), DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null), DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null), DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null), DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null))$$ LANGUAGE SQL;{% endset %}{% set table = run_query(sql) %}{%- endmacro %}

Параметризовать можно и довольно сложные вещи, такие как работа с nested structures (иерархическими структурами) и выгрузка во внешние таблицы (external tables) в S3 в формате parquet. Эти примеры вполне достойны отдельных публикаций.

Не изобретай велосипед импортируй модули

Модуль или package - это набор макросов, моделей, тестов, который можно импортировать в свой проект в виде готовой к использованию библиотеки. На портале dbt hub есть неплохая подборка модулей на любой вкус, и, что самое главное, их список постоянно пополняется.

С помощью модуля логирования и добавления 2 простых hooks на каждый запуск dbt у меня как на ладони появляется статистическая информация о времени, продолжительности, флагах и параметрах развертывания. Я наглядно вижу модели анти-лидеры по потребляемым ресурсам (первые кандидаты на рефакторинг):

models:+pre-hook: "{{ logging.log_model_start_event() }}"+post-hook: "{{ logging.log_model_end_event() }}"
Мониторинг развертывания dbt моделей на кластере RedshiftМониторинг развертывания dbt моделей на кластере Redshift

Измерение календаря собирается в одну строку, при этом набор колонок поражает:

{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
Измерение календарь, сгенерированное макросомИзмерение календарь, сгенерированное макросом

С помощью модуля dbt_external_tables я уже выстраиваю полноценный Lakehouse, обращаясь из Хранилища к данным, расположенным в файловом хранилище S3. К примеру, самые свежие курсы валют, получаемые через API Open Exchange Rates в формате JSON:

External data stored in S3 accessed vith Redshift Spectrum
- name: externalschema: spectrumtags: ["spectrum"]description: "External data stored in S3 accessed vith Redshift Spectrum"tables:- name: currencies_oxrdescription: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"freshness:error_after: {count: 15, period: hour}loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'external:location: "s3://data-analytics.wheely.com/dwh/currencies/"row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"columns:- name: timestampdata_type: bigint- name: basedata_type: varchar(3)- name: ratesdata_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>

Ну и, конечно, ночью по расписанию работает VACUUM + ANALYZE, ведь Redshift это форк PostgreSQL. Дефрагментация, сортировка данных в таблицах, сбор статистик. Иначе говоря поддержание кластера в тонусе, пока dba спит.

dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
VACUUM + ANALYZEVACUUM + ANALYZE

Running in production: используем dbt Cloud в Wheely

dbt Cloud это платный сервис для управления проектами, основанными на движке dbt. За небольшие деньги команда получает возможность создавать окружения, конфигурировать джобы и таски, устанавливать расписание запусков, и даже полноценную IDE (среду разработки!) в браузере.

Прежде всего речь идет об удобстве использования: приятный и понятный визуальный интерфейс, простота поиска и ориентирования, акцентирование ключевой информации при разборе ошибок и чтении логов:

Во-вторых, это гибкие настройки условий запуска джобов. Начиная от простых условий с выбором дня недели и времени, продолжая кастомными cron-выражениями, и заканчивая триггером запуска через webhook. Например, именно через вебхук мы связываем в цепочку завершение выгрузок для кросс-сверки и начало расчета соответствующих витрин в Хранилище (kicked off from Airflow):

В третьих, это консолидация всех важных уведомлений одном месте. Для нашей команды это канал в Slack и любые проблемы связанные с Production-запусками. В режиме реального времени мы получаем все уведомления об инцидентах с деталями и ссылками на подробный лог.

Сам dbt является проектом с открытым исходным кодом, и использование продукта dbt Cloud представляется очень удобным, но не обязательным. В качестве альтернативных способов можно выбрать любой другой оркестратор: Airflow, Prefect, Dagster, и даже просто cron. В своем проекте Сквозная Аналитика я организую оркестрацию при помощи Github Actions. Выходит очень занятно.

Вместо заключения

В команде аналитики Wheely мы стремимся к тому, чтобы работа была наполнена смыслом и приносила удовлетворение и пользу, но не раздражение и негодование. Все перечисленные в публикации достоинства не могут не вызвать симпатию со стороны новых членов команды и значительно ускоряют процессы адаптации и onboarding.

Сегодня бизнес и команда активно растут. Доступен ряд интересных позиций:

У тебя есть возможность узнать детали из первых уст и получить прямую рекомендацию.

Также время от времени я провожу вебинары и выступления, на которых подробнее рассказываю о своей работе и проектах. Следить за моими публикациями можно в телеграм-канале Technology Enthusiast https://t.me/enthusiastech

Пишите, задавайте вопросы и, конечно, пробуйте dbt в своих проектах!

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru