Русский
Русский
English
Статистика
Реклама

Currency

Курсы валют и аналитика использование обменных курсов в Хранилище Данных

19.05.2021 16:06:46 | Автор: admin

Привет! На связи Артемий Analytics Engineer из Wheely.

Сегодня хотел бы поговорить о вопросах конвертирования финансовых показателей в разные валюты. Вопрос достаточно актуальный, так как большое количество компаний имеют мультинациональные зоны присутствия, строят аналитику глобального масштаба, готовят отчетность по международным стандартам.

Покажу как этот вопрос решается с помощью современных подходов на примере кейса Wheely:

  • Расширение списка базовых валют

  • Регулярное обновление и получения актуальных курсов

  • Обеспечение корректности исторических показателей

  • Максимальное удобство и простота использования в аналитических инструментах

Велком под кат для разбора решения проблемы учета мультивалютных метрик и показателей: Open Exchange Rate, Airflow, Redshift Spectrum, dbt.


Новые требования к сервису валютных курсов

В качестве legacy-источника использовался веб-сервис ЦБ РФ. Однако с изменяющимися требованиями и расширением зон присутствия компании его стало недостаточно. Например, по причине отсутствия котировки AED (дирхам ОАЭ). Для кого-то могут быть актуальны курсы криптовалют BTC, ETH, которые в веб-сервисе ЦБ РФ тоже отсутствуют.

Новые требования можно суммировать следующим образом:

  • Поддержка расширенного набора базовых валют, которые отсутствуют в API ЦБ РФ

  • Получение самых актуальных котировок, включая внутридневные курсы

  • Минимизация трансформаций данных вне Хранилища Данных (лучше если их вообще нет)

Матрица новых требований к работе с курсами валютМатрица новых требований к работе с курсами валют

Задачи, которые предстоит решить легко визуализировать в виде матрицы. Красным помечены области, поддержку которых предстоит добавить:

  • Интеграция нового API для уже использующихся курсов

  • Добавление новых базовых валют в выгрузку

  • Получение ретроспективных (исторических) данных по новым валютам за прошлые периоды

  • Архивирование курсов из legacy-источника

Легаси приложение по выгрузке курсов валют формировало pivot-таблицу с коэффициентом для каждой пары в отдельном столбце. Это удобно, когда у нас есть строго фиксированный набор валют и наименования колонок, но превращается в головную боль если список валют необходимо расширить.

Появилось желание уйти от всех трансформаций и формирований таблиц в pandas до того как данные попадают в Хранилище. Здесь я придерживаюсь принципа применения всех трансформаций (T в ELT) в одном месте, и помогает мне в этом замечательный инструмент dbt.

Интеграция с новым поставщиком данных

Как уже стало понятно, без внешнего поставщика данных обойтись не получится, поэтому предлагаю рассмотреть один из ряда провайдеров курсов валют https://openexchangerates.org/

Минимальный необходимый план Developer включает в себя:

  • 10.000 запросов ежемесячно (более чем достаточно)

  • Ежечасные внутридневные обновления курсов

  • Широкий набор базовых валют, включая криптовалюты

Доступные методы API:

Для получения актуальных курсов валют воспользуемся API endpoint /latest.json

Простой запрос-ответ может выглядеть следующим образом:

Установка на расписание в Airflow

Для регулярного получения актуальных курсов валют я воспользуюсь инструментом Airflow. Apache Airflow де-факто стандарт в области оркестрации данных, data engineering и управления пайплайнами.

Смысловая составляющая графа задачи (DAG):

  • Сделать запрос к API

  • Сохранить полученный ответ (например, в виде уникального ключа на S3)

  • Уведомить в Slack в случае ошибки

Конфигурация DAG:

  • Базовые валюты (base currency), от которых отсчитываем курсы

  • Синхронизация расписание запусков с расчетом витрин в Хранилище Данных

  • Токен доступа к сервису

Самый простой DAG состоит из одного таска с вызовом простого shell-скрипта:

TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"` curl -H "Authorization: Token $OXR_TOKEN" \ "https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \ | aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json

Вот как выглядит результат регулярной работы скрипта в S3:

Сегодня в штатном режиме выполняется около 25 обращений к сервису в сутки, статистика выглядит следующим образом:

Выгрузка истории по новым валютам

После обеспечения регулярной выгрузки всех необходимых валют, можно приступить к формированию истории по новым базовым валютам (которой, очевидно, нет). Это позволит переводить в новые валюты суммы транзакций прошлых периодов.

К сожалению, план Developer не включает обращения к API endpoint /time-series.json, и только ради этой разовой задачи не имеет смысла делать upgrade на более дорогостоящую версию.

Воспользуемся методом /historical/*.json и простым опросом API в цикле для формирования исторической выгрузки:

#!/bin/bash d=2011-01-01while [ "$d" != 2021-02-19 ]; do echo $d curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)done

Пиковая нагрузка вызвала вопросы у коллег, которые тоже пользуются сервисом, но это была разовая акция:

Архивирование исторических курсов валют

Вся история обменных курсов полученная из legacy-источника ЦБ РФ до даты X (перехода на новый сервис-провайдер) подлежит архивированию в неизменном виде.

Я хочу сохранить все те курсы, которые мы показывали в своих аналитических инструментах без изменений. То есть чтобы суммы в дашбордах и отчетах бизнес-пользователей не были изменены ни на копейку.

Для этого я выполню выгрузку накопленных значений обменных курсов за весь исторический период в Data Lake. Более детально, я произведу:

  • Трансформацию legacy pivot-таблицы в двумерную

  • Запись в колоночный формат PARQUET в AWS S3

Формирование архива в S3 в формате PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrfSTORED AS PARQUETLOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' ASWITH base AS (   SELECT 'EUR' AS base_currency   UNION ALL   SELECT 'GBP'   UNION ALL   SELECT 'RUB'   UNION ALL   SELECT 'USD')SELECT   "day" AS business_dt   ,b.base_currency   ,CASE b.base_currency       WHEN 'EUR' THEN 1       WHEN 'GBP' THEN gbp_to_eur       WHEN 'RUB' THEN rub_to_eur       WHEN 'USD' THEN usd_to_eur       ELSE NULL     END AS eur   ,CASE b.base_currency       WHEN 'EUR' THEN eur_to_gbp       WHEN 'GBP' THEN 1       WHEN 'RUB' THEN rub_to_gbp       WHEN 'USD' THEN usd_to_gbp       ELSE NULL     END AS gbp   ,CASE b.base_currency       WHEN 'EUR' THEN eur_to_rub       WHEN 'GBP' THEN gbp_to_rub       WHEN 'RUB' THEN 1       WHEN 'USD' THEN usd_to_rub       ELSE NULL     END AS rub   ,CASE b.base_currency       WHEN 'EUR' THEN eur_to_usd       WHEN 'GBP' THEN gbp_to_usd       WHEN 'RUB' THEN rub_to_usd       WHEN 'USD' THEN 1       ELSE NULL     END AS usd     FROM ext.currencies c   CROSS JOIN base b;

Таким образом, в хранилище S3 у меня теперь есть статический снимок всех обменных курсов, когда-либо использованных в аналитических приложениях, сериализованный в оптимизированный колоночный формат со сжатием. В случае необходимости пересчета витрин и исторических данных я запросто смогу воспользоваться этими курсами.

Доступ к данным из DWH через S3 External Table

А теперь самое интересное из своего аналитического движка Amazon Redshift я хочу иметь возможность просто и быстро обращаться к самым актуальным курсам валют, использовать их в своих трансформациях.

Оптимальное решение создание внешних таблиц EXTERNAL TABLE, которые обеспечивают SQL-доступ к данным, хранящимся в S3. При этом нам доступно чтение полуструктурированных данных в формате JSON, бинарных данных в форматах AVRO, ORC, PARQUET и другие опции. Продукт имеет название Redshift Spectrum и тесно связан с SQL-движком Amazon Athena, который имеет много общего с Presto.

CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (   "timestamp" bigint   , base varchar(3)   , rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>)ROW format serde 'org.openx.data.jsonserde.JsonSerDe'LOCATION 's3://<BUCKET>/dwh/currencies/';

Обратите внимание на обращение ко вложенному документу rates с помощью создания типа данных struct.

Теперь добавим к этой задаче секретную силу dbt. Модуль dbt-external-tables позволяет автоматизировать создание EXTERNAL TABLES и зарегистрировать их в качестве источников данных:

   - name: external     schema: spectrum     tags: ["spectrum"]     loader: S3     description: "External data stored in S3 accessed vith Redshift Spectrum"     tables:       - name: currencies_oxr         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"         freshness:           error_after: {count: 15, period: hour}         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'         external:           location: "s3://<BUCKET>/dwh/currencies/"           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"         columns:           - name: timestamp             data_type: bigint           - name: base             data_type: varchar(3)           - name: rates             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>

Немаловажным элементом является проверка своевременности данных source freshness test на курсы валют. Тем самым мы будем постоянно держать руку на пульсе поступления актуальных данных в Хранилище. Очень важно рассчитывать все финансовые метрики корректно и в срок, а без актуальных значений курсов задачу решить невозможно.

В случае отставания данных более 15 часов без свежих обменных курсов мы тут же получаем уведомление в Slack.

Для прозрачности и простоты пользователей объединим исторические данные (архив) и постоянно поступающие актуальные курсы (новый API) в одну модель currencies:

Объединение исторических и новых данных в единый справочник
{{   config(       materialized='table',       dist='all',       sort=["business_dt", "base_currency"]   )}} with cbrf as (  select      business_dt   , null as business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from {{ source('external', 'currencies_cbrf') }} where business_dt <= '2021-02-18' ), oxr_all as (    select      (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt   , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts   , o.base as base_currency   , o.rates.aed::decimal(10,4) as aed   , o.rates.eur::decimal(10,4) as eur   , o.rates.gbp::decimal(10,4) as gbp   , o.rates.rub::decimal(10,4) as rub   , o.rates.usd::decimal(10,4) as usd   , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn    from {{ source('external', 'currencies_oxr') }} as o   where business_dt > '2021-02-18' ), oxr as (  select      business_dt   , business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from {{ ref('stg_currencies_oxr_all') }} where rn = 1 ), united as (  select      business_dt   , business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from cbrf  union all  select      business_dt   , business_ts   , base_currency   , aed   , eur   , gbp   , rub   , usd  from oxr ) select    business_dt , business_ts , base_currency , aed , eur , gbp , rub , usd from united

При этом физически справочник с курсами валют копируется на каждую ноду аналитического кластера Redshift и хранится в отсортированном по дате и базовой валюте виде для ускорения работы запросов.

Использование курсов в моделировании данных

В целом, работа с курсами валют для аналитиков и инженеров, которые развивают Хранилище Данных не изменилась и осталась весьма простой. Все детали использования нового API, обращения к внешним полу-структурированным документам JSON в S3, объединению с архивными данными скрыты . В своих трансформациях достаточно сделать простой джоин на таблицу с курсами валют:

   select        -- price_details       , r.currency       , {{ convert_currency('price', 'currency') }}       , {{ convert_currency('discount', 'currency') }}       , {{ convert_currency('insurance', 'currency') }}       , {{ convert_currency('tips', 'currency') }}       , {{ convert_currency('parking', 'currency') }}       , {{ convert_currency('toll_road', 'currency') }}    from {{ ref('requests') }} r       left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt           and r.currency = currencies.base_currency

Сами метрики конвертируются при помощи простого макроса, который на вход принимает колонку с исходной суммой и колонку с исходным кодом валюты:

-- currency conversion macro{% macro convert_currency(convert_column, currency_code_column) -%}      ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd {%- endmacro %}

Практико-ориентированное развитие

Работа с данными одно из наиболее востребованных и бурно развивающихся направлений. Каждый день я нахожу новые интересные задачи и придумываю решения для них. Это захватывающий и интересный путь, расширяющий горизонты.

В конце мая состоится юбилейный запуск курса Data Engineer в ОТУС, в котором я принимаю участие в роли преподавателя.

По прошествии двух лет программа постоянно менялась, адаптировалась. Ближайший запуск принесет ряд нововведений и будет построен вокруг кейсов реальных прикладных проблем инженеров:

  • Data Architecture

  • Data Lake

  • Data Warehouse

  • NoSQL / NewSQL

  • MLOps

Детально с программой можно ознакомиться на лендинге курса.

Также я делюсь своими авторскими заметками и планами в телеграм-канале Technology Enthusiast.

Благодарю за внимание.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru