Русский
Русский
English
Статистика
Реклама

Dag

Apache Airflow делаем ETL проще

27.07.2020 12:16:39 | Автор: admin

Привет, я Дмитрий Логвиненко Data Engineer отдела аналитики группы компаний Везёт.


Я расскажу вам о замечательном инструменте для разработки ETL-процессов Apache Airflow. Но Airflow настолько универсален и многогранен, что вам стоит присмотреться к нему даже если вы не занимаетесь потоками данных, а имеете потребность периодически запускать какие-либо процессы и следить за их выполнением.


И да, я буду не только рассказывать, но и показывать: в программе много кода, скриншотов и рекомендаций.



Что обычно видишь, когда гуглишь слово Airflow / Wikimedia Commons


Введение


Apache Airflow он прямо как Django:


  • написан на Python,
  • есть отличная админка,
  • неограниченно расширяем,

только лучше, да и сделан совсем для других целей, а именно (как написано до ката):


  • запуск и мониторинг задач на неограниченном количестве машин (сколько вам позволит Celery/Kubernetes и ваша совесть)
  • с динамической генерацией workflow из очень легкого для написания и восприятия Python-кода
  • и возможностью связывать друг с друг любые базы данных и API с помощью как готовых компонентов, так и самодельных плагинов (что делается чрезвычайно просто).

Мы используем Apache Airflow так:


  • собираем данные из различных источников (множество инстансов SQL Server и PostgreSQL, различные API с метриками приложений, даже 1С) в DWH и ODS (у нас это Vertica и Clickhouse).
  • как продвинутый cron, который запускает процессы консолидации данных на ODS, а также следит за их обслуживанием.

До недавнего времени наши потребности покрывал один небольшой сервер на 32 ядрах и 50 GB оперативки. В Airflow при этом работает:


  • более 200 дагов (собственно workflows, в которые мы набили задачки),
  • в каждом в среднем по 70 тасков,
  • запускается это добро (тоже в среднем) раз в час.

А о том, как мы расширялись, я напишу ниже, а сейчас давайте определим ber-задачу, которую мы будем решать:


Есть три исходных SQL Serverа, на каждом по 50 баз данных инстансов одного проекта, соответственно, структура у них одинаковая (почти везде, муа-ха-ха), а значит в каждой есть таблица Orders (благо таблицу с таким названием можно затолкать в любой бизнес). Мы забираем данные, добавляя служебные поля (сервер-источник, база-источник, идентификатор ETL-задачи) и наивным образом бросим их в, скажем, Vertica.

Поехали!


Часть основная, практическая (и немного теоретическая)


Зачем оно нам (и вам)


Когда деревья были большими, а я был простым SQL-щиком в одном российском ритейле, мы шпарили ETL-процессы aka потоки данных с помощью двух доступных нам средств:


  • Informatica Power Center крайне развесистая система, чрезвычайно производительная, со своими железками, собственным версионированием. Использовал я дай бог 1% её возможностей. Почему? Ну, во-первых, этот интерфейс где-то из нулевых психически давил на нас. Во-вторых, эта штуковина заточена под чрезвычайно навороченные процессы, яростное переиспользование компонентов и другие очень-важные-энтерпрайз-фишечки. Про то что стоит она, как крыло Airbus A380/год, мы промолчим.


    Осторожно, скриншот может сделать людям младше 30 немного больно




  • SQL Server Integration Server этим товарищем мы пользовались в своих внутрипроектных потоках. Ну а в самом деле: SQL Server мы уже используем, и не юзать его ETL-тулзы было бы как-то неразумно. Всё в нём в хорошо: и интерфейс красивый, и отчётики выполнения Но не за это мы любим программные продукты, ох не за это. Версионировать его dtsx (который представляет собой XML с перемешивающимися при сохранении нодами) мы можем, а толку? А сделать пакет тасков, который перетащит сотню таблиц с одного сервера на другой? Да что сотню, у вас от двадцати штук отвалится указательный палец, щёлкающий по мышиной кнопке. Но выглядит он, определенно, более модно:




Мы безусловно искали выходы. Дело даже почти дошло до самописного генератора SSIS-пакетов...


а потом меня нашла новая работа. А на ней меня настиг Apache Airflow.


Когда я узнал, что описания ETL-процессов это простой Python-код, я только что не плясал от радости. Вот так потоки данных подверглись версионированию и диффу, а ссыпать таблицы с единой структурой из сотни баз данных в один таргет стало делом Python-кода в полтора-два 13 экрана.


Собираем кластер


Давайте не устраивать совсем уж детский сад, и не говорить тут о совершенно очевидных вещах, вроде установки Airflow, выбранной вами БД, Celery и других дел, описанных в доках.


Чтобы мы могли сразу приступить к экспериментам, я набросал docker-compose.yml в котором:


  • Поднимем собственно Airflow: Scheduler, Webserver. Там же будет крутится Flower для мониторинга Celery-задач (потому что его уже затолкали в apache/airflow:1.10.10-python3.7, а мы и не против);
  • PostgreSQL, в который Airflow будет писать свою служебную информацию (данные планировщика, статистика выполнения и т. д.), а Celery отмечать завершенные таски;
  • Redis, который будет выступать брокером задач для Celery;
  • Celery worker, который и займется непосредственным выполнением задачек.
  • В папку ./dags мы будет складывать наши файлы с описанием дагов. Они будут подхватываться на лету, поэтому передёргивать весь стек после каждого чиха не нужно.

Кое-где код в примерах приведен не полностью (чтобы не загромождать текст), а где-то он модифицируется в процессе. Цельные работающие примеры кода можно посмотреть в репозитории https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml
version: '3.4'x-airflow-config: &airflow-config  AIRFLOW__CORE__DAGS_FOLDER: /dags  AIRFLOW__CORE__EXECUTOR: CeleryExecutor  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow  AIRFLOW__CORE__PARALLELISM: 128  AIRFLOW__CORE__DAG_CONCURRENCY: 16  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflowx-airflow-base: &airflow-base  image: apache/airflow:1.10.10-python3.7  entrypoint: /bin/bash  restart: always  volumes:    - ./dags:/dags    - ./requirements.txt:/requirements.txtservices:  # Redis as a Celery broker  broker:    image: redis:6.0.5-alpine  # DB for the Airflow metadata  airflow-db:    image: postgres:10.13-alpine    environment:      - POSTGRES_USER=airflow      - POSTGRES_PASSWORD=airflow      - POSTGRES_DB=airflow    volumes:      - ./db:/var/lib/postgresql/data  # Main container with Airflow Webserver, Scheduler, Celery Flower  airflow:    <<: *airflow-base    environment:      <<: *airflow-config      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'      AIRFLOW__SCHEDULER__MAX_THREADS: 8      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10    depends_on:      - airflow-db      - broker    command: >      -c " sleep 10 &&           pip install --user -r /requirements.txt &&           /entrypoint initdb &&          (/entrypoint webserver &) &&          (/entrypoint flower &) &&           /entrypoint scheduler"    ports:      # Celery Flower      - 5555:5555      # Airflow Webserver      - 8080:8080  # Celery worker, will be scaled using `--scale=n`  worker:    <<: *airflow-base    environment:      <<: *airflow-config    command: >      -c " sleep 10 &&           pip install --user -r /requirements.txt &&           /entrypoint worker"    depends_on:      - airflow      - airflow-db      - broker

Примечания:


  • В сборке композа я во многом опирался на известный образ puckel/docker-airflow обязательно посмотрите. Может, вам в жизни больше ничего и не понадобится.
  • Все настройки Airflow доступны не только через airflow.cfg, но и через переменные среды (слава разработчикам), чем я злостно воспользовался.
  • Естественно, он не production-ready: я намеренно не ставил heartbeats на контейнеры, не заморачивался с безопасностью. Но минимум, подходящий для наших экспериментиков я сделал.
  • Обратите внимание, что:
    • Папка с дагами должна быть доступна как планировщику, так и воркерам.
    • То же самое касается и всех сторонних библиотек они все должны быть установлены на машины с шедулером и воркерами.

Ну а теперь просто:


$ docker-compose up --scale worker=3

После того, как всё поднимется, можно смотреть на веб-интерфейсы:



Основные понятия


Если вы ничего не поняли во всех этих дагах, то вот краткий словарик:


  • Scheduler самый главный дядька в Airflow, контролирующий, чтобы вкалывали роботы, а не человек: следит за расписанием, обновляет даги, запускает таски.


    Вообще, в старых версиях, у него были проблемы с памятью (нет, не амнезия, а утечки) и в конфигах даже остался легаси-параметр run_duration интервал его перезапуска. Но сейчас всё хорошо.


  • DAG (он же даг) направленный ацикличный граф, но такое определение мало кому что скажет, а по сути это контейнер для взаимодействующих друг с другом тасков (см. ниже) или аналог Package в SSIS и Workflow в Informatica.


    Помимо дагов еще могут быть сабдаги, но мы до них скорее всего не доберёмся.


  • DAG Run инициализированный даг, которому присвоен свой execution_date. Даграны одного дага могут вполне работать параллельно (если вы, конечно, сделали свои таски идемпотентными).


  • Operator это кусочки кода, ответственные за выполнение какого-либо конкретного действия. Есть три типа операторов:


    • action, как например наш любимый PythonOperator, который в силах выполнить любой (валидный) Python-код;
    • transfer, которые перевозят данные с места на место, скажем, MsSqlToHiveTransfer;
    • sensor же позволит реагировать или притормозить дальнейшее выполнение дага до наступления какого-либо события. HttpSensor может дергать указанный эндпойнт, и когда дождется нужный ответ, запустить трансфер GoogleCloudStorageToS3Operator. Пытливый ум спросит: зачем? Ведь можно делать повторы прямо в операторе! А затем, чтобы не забивать пул тасков подвисшими операторами. Сенсор запускается, проверяет и умирает до следующей попытки.

  • Task объявленные операторы вне зависимости от типа и прикрепленные к дагу повышаются до чина таска.


  • Task instance когда генерал-планировщик решил, что таски пора отправлять в бой на исполнители-воркеры (прямо на месте, если мы используем LocalExecutor или на удалённую ноду в случае с CeleryExecutor), он назначает им контекст (т. е. комплект переменных параметров выполнения), разворачивает шаблоны команд или запросов и складывает их в пул.



Генерируем таски


Сперва обозначим общую схему нашего дага, а затем будем всё больше и больше погружаться в детали, потому что мы применяем некоторые нетривиальные решения.


Итак, в простейшем виде подобный даг будет выглядеть так:


from datetime import timedelta, datetimefrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorfrom commons.datasources import sql_server_dsdag = DAG('orders',          schedule_interval=timedelta(hours=6),          start_date=datetime(2020, 7, 8, 0))def workflow(**context):    print(context)for conn_id, schema in sql_server_ds:    PythonOperator(        task_id=schema,        python_callable=workflow,        provide_context=True,        dag=dag)

Давайте разбираться:


  • Сперва импортируем нужные либы и кое что ещё;
  • sql_server_ds это List[namedtuple[str, str]] с именами коннектов из Airflow Connections и базами данных из которых мы будем забирать нашу табличку;
  • dag объявление нашего дага, которое обязательно должно лежать в globals(), иначе Airflow его не найдет. Дагу также нужно сказать:
    • что его зовут orders это имя потом будет маячить в веб-интерфейсе,
    • что работать он будет, начиная с полуночи восьмого июля,
    • а запускать он должен, примерно каждые 6 часов (для крутых парней здесь вместо timedelta() допустима cron-строка 0 0 0/6 ? * * *, для менее крутых выражение вроде @daily);
  • workflow() будет делать основную работу, но не сейчас. Сейчас мы просто высыпем наш контекст в лог.
  • А теперь простая магия создания тасков:
    • пробегаем по нашим источникам;
    • инициализируем PythonOperator, который будет выполнять нашу пустышку workflow(). Не забывайте указывать уникальное (в рамках дага) имя таска и подвязывать сам даг. Флаг provide_context в свою очередь насыпет в функцию дополнительных аргументов, которые мы бережно соберём с помощью **context.

Пока на этом всё. Что мы получили:


  • новый даг в веб-интерфейсе,
  • полторы сотни тасков, которые будут выполняться параллельно (если то позволят настройки Airflow, Celery и мощности серверов).

Ну, почти получили.



Зависимости кто будет ставить?


Чтобы всё это дело упростить я вкорячил в docker-compose.yml обработку requirements.txt на всех нодах.


Вот теперь понеслась:



Серые квадратики task instances, обработанные планировщиком.


Немного ждем, задачи расхватывают воркеры:



Зеленые, понятное дело, успешно отработавшие. Красные не очень успешно.


Кстати, на нашем проде никакой папки ./dags, синхронизирующейся между машинами нет всё даги лежат в git на нашем Gitlab, а Gitlab CI раскладывает обновления на машины при мёрдже в master.

Немного о Flower


Пока воркеры молотят наши тасочки-пустышки, вспомним про еще один инструмент, который может нам кое-что показать Flower.


Самая первая страничка с суммарной информацией по нодам-воркерам:



Самая насыщенная страничка с задачами, отправившимися в работу:



Самая скучная страничка с состоянием нашего брокера:



Самая яркая страничка с графиками состояния тасков и их временем выполнения:



Догружаем недогруженное


Итак, все таски отработали, можно уносить раненых.



А раненых оказалось немало по тем или иным причинами. В случае правильного использования Airflow вот эти самые квадраты говорят о том, что данные определенно не доехали.


Нужно смотреть лог и перезапускать упавшие task instances.


Жмякнув на любой квадрат, увидим доступные нам действия:



Можно взять, и сделать Clear упавшему. То есть, мы забываем о том, что там что-то завалилось, и тот же самый инстанс таска уйдет планировщику.



Понятно, что делать так мышкой со всеми красными квадратами не очень гуманно не этого мы ждем от Airflow. Естественно, у нас есть оружие массового поражения: Browse/Task Instances



Выберем всё разом и обнулим нажмем правильный пункт:



После очистки наши такси выглядят так (они уже ждут не дождутся, когда шедулер их запланирует):



Соединения, хуки и прочие переменные


Самое время посмотреть на следующий DAG, update_reports.py:


from collections import namedtuplefrom datetime import datetime, timedeltafrom textwrap import dedentfrom airflow import DAGfrom airflow.contrib.operators.vertica_operator import VerticaOperatorfrom airflow.operators.email_operator import EmailOperatorfrom airflow.utils.trigger_rule import TriggerRulefrom commons.operators import TelegramBotSendMessagedag = DAG('update_reports',          start_date=datetime(2020, 6, 7, 6),          schedule_interval=timedelta(days=1),          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})Report = namedtuple('Report', 'source target')reports = [Report(f'{table}_view', table) for table in [    'reports.city_orders',    'reports.client_calls',    'reports.client_rates',    'reports.daily_orders',    'reports.order_duration']]email = EmailOperator(    task_id='email_success', dag=dag,    to='{{ var.value.all_the_kings_men }}',    subject='DWH Reports updated',    html_content=dedent("""Господа хорошие, отчеты обновлены"""),    trigger_rule=TriggerRule.ALL_SUCCESS)tg = TelegramBotSendMessage(    task_id='telegram_fail', dag=dag,    tg_bot_conn_id='tg_main',    chat_id='{{ var.value.failures_chat }}',    message=dedent("""\         Наташ, просыпайся, мы {{ dag.dag_id }} уронили        """),    trigger_rule=TriggerRule.ONE_FAILED)for source, target in reports:    queries = [f"TRUNCATE TABLE {target}",               f"INSERT INTO {target} SELECT * FROM {source}"]    report_update = VerticaOperator(        task_id=target.replace('reports.', ''),        sql=queries, vertica_conn_id='dwh',        task_concurrency=1, dag=dag)    report_update >> [email, tg]

Все ведь когда-нибудь делали обновлялку отчетов? Это снова она: есть список источников, откуда забрать данные; есть список, куда положить; не забываем посигналить, когда всё случилось или сломалось (ну это не про нас, нет).


Давайте снова пройдемся по файлу и посмотрим на новые непонятные штуки:


  • from commons.operators import TelegramBotSendMessage нам ничто не мешает делать свои операторы, чем мы и воспользовались, сделав небольшую обёрточку для отправки сообщений в Разблокированный. (Об этом операторе мы еще поговорим ниже);
  • default_args={} даг может раздавать одни и те же аргументы всем своим операторам;
  • to='{{ var.value.all_the_kings_men }}' поле to у нас будет не захардкоженным, а формируемым динамически с помощью Jinja и переменной со списком email-ов, которую я заботливо положил в Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS условие запуска оператора. В нашем случае, письмо полетит боссам только если все зависимости отработали успешно;
  • tg_bot_conn_id='tg_main' аргументы conn_id принимают в себя идентификаторы соединений, которые мы создаем в Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED сообщения в Telegram улетят только при наличии упавших тасков;
  • task_concurrency=1 запрещаем одновременный запуск нескольких task instances одного таска. В противном случае, мы получим одновременный запуск нескольких VerticaOperator (смотрящих на одну таблицу);
  • report_update >> [email, tg] все VerticaOperator сойдутся в отправке письма и сообщения, вот так:


    Но так как у операторов-нотификаторов стоят разные условия запуска, работать будет только один. В Tree View всё выглядит несколько менее наглядно:



Скажу пару слов о макросах и их друзьях переменных.


Макросы это Jinja-плейсхолдеры, которые могут подставлять разную полезную информацию в аргументы операторов. Например, так:


SELECT    id,    payment_dtm,    payment_type,    client_idFROM orders.paymentsWHERE    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} развернется в содержимое переменной контекста execution_date в формате YYYY-MM-DD: 2020-07-14. Самое приятное, что переменные контекста прибиваются гвоздями к определенному инстансу таска (квадратику в Tree View), и при перезапуске плейсхолдеры раскроются в те же самые значения.


Присвоенные значения можно смотреть с помощью кнопки Rendered на каждом таск-инстансе. Вот так у таска с отправкой письма:



А так у таски с отправкой сообщения:



Полный список встроенных макросов для последней доступной версии доступен здесь: Macros Reference


Более того, с помощью плагинов, мы можем объявлять собственные макросы, но это уже совсем другая история.


Помимо предопределенных штук, мы можем подставлять значения своих переменных (выше в коде я уже этим воспользовался). Создадим в Admin/Variables пару штук:



Всё, можно пользоваться:


TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

В значении может быть скаляр, а может лежать и JSON. В случае JSON-а:


bot_config{    "bot": {        "token": 881hskdfASDA16641,        "name": "Verter"    },    "service": "TG"}

просто используем путь к нужному ключу: {{ var.json.bot_config.bot.token }}.


Скажу буквально одно слово и покажу один скриншот про соединения. Тут всё элементарно: на странице Admin/Connections создаем соединение, складываем туда наши логины/пароли и более специфичные параметры. Вот так:



Пароли можно шифровать (более тщательно, чем в варианте по умолчанию), а можно не указывать тип соединения (как я сделал для tg_main) дело в том, что список типов зашит в моделях Airflow и расширению без влезания в исходники не поддается (если вдруг я чего-то не догуглил прошу меня поправить), но получить креды просто по имени нам ничто не помешает.


А еще можно сделать несколько соединений с одним именем: в таком случае метод BaseHook.get_connection(), который достает нам соединения по имени, будет отдавать случайного из нескольких тёзок (было бы логичнее сделать Round Robin, но оставим это на совести разработчиков Airflow).


Variables и Connections, безусловно, классные средства, но важно не потерять баланс: какие части ваших потоков вы храните собственно в коде, а какие отдаете на хранение Airflow. C одной стороны быстро поменять значение, например, ящик рассылки, может быть удобно через UI. А с другой это всё-таки возврат к мышеклику, от которого мы (я) хотели избавиться.

Работа с соединениями это одна из задач хуков. Вообще хуки Airflow это точки подключения его к сторонним сервисам и библиотекам. К примеру, JiraHook откроет для нас клиент для взаимодействия с Jira (можно задачки подвигать туда-сюда), а с помощью SambaHook можно запушить локальный файл на smb-точку.


Разбираем кастомный оператор


И мы вплотную подобрались к тому, чтобы посмотреть на то, как сделан TelegramBotSendMessage


Код commons/operators.py с собственно оператором:


from typing import Unionfrom airflow.operators import BaseOperatorfrom commons.hooks import TelegramBotHook, TelegramBotclass TelegramBotSendMessage(BaseOperator):    """Send message to chat_id using TelegramBotHook    Example:        >>> TelegramBotSendMessage(        ...     task_id='telegram_fail', dag=dag,        ...     tg_bot_conn_id='tg_bot_default',        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',        ...     message='{{ dag.dag_id }} failed :(',        ...     trigger_rule=TriggerRule.ONE_FAILED)    """    template_fields = ['chat_id', 'message']    def __init__(self,                 chat_id: Union[int, str],                 message: str,                 tg_bot_conn_id: str = 'tg_bot_default',                 *args, **kwargs):        super().__init__(*args, **kwargs)        self._hook = TelegramBotHook(tg_bot_conn_id)        self.client: TelegramBot = self._hook.client        self.chat_id = chat_id        self.message = message    def execute(self, context):        print(f'Send "{self.message}" to the chat {self.chat_id}')        self.client.send_message(chat_id=self.chat_id,                                 message=self.message)

Здесь, как и остальное в Airflow, всё очень просто:


  • Отнаследовались от BaseOperator, который реализует довольно много Airflow-специфичных штук (посмотрите на досуге)
  • Объявили поля template_fields, в которых Jinja будет искать макросы для обработки.
  • Организовали правильные аргументы для __init__(), расставили умолчания, где надо.
  • Об инициализации предка тоже не забыли.
  • Открыли соответствующий хук TelegramBotHook, получили от него объект-клиент.
  • Оверрайднули (переопределили) метод BaseOperator.execute(), который Airfow будет подергивать, когда наступит время запускать оператор в нем мы и реализуем основное действие, на забыв залогироваться. (Логируемся, кстати, прямо в stdout и stderr Airflow всё перехватит, красиво обернет, разложит, куда надо.)

Давайте смотреть, что у нас в commons/hooks.py. Первая часть файлика, с самим хуком:


from typing import Unionfrom airflow.hooks.base_hook import BaseHookfrom requests_toolbelt.sessions import BaseUrlSessionclass TelegramBotHook(BaseHook):    """Telegram Bot API hook    Note: add a connection with empty Conn Type and don't forget    to fill Extra:        {"bot_token": "YOuRAwEsomeBOtToKen"}    """    def __init__(self,                 tg_bot_conn_id='tg_bot_default'):        super().__init__(tg_bot_conn_id)        self.tg_bot_conn_id = tg_bot_conn_id        self.tg_bot_token = None        self.client = None        self.get_conn()    def get_conn(self):        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson        self.tg_bot_token = extra['bot_token']        self.client = TelegramBot(self.tg_bot_token)        return self.client

Я даже не знаю, что тут можно объяснять, просто отмечу важные моменты:


  • Наследуемся, думаем над аргументами в большинстве случаев он будет один: conn_id;
  • Переопределяем стандартные методы: я ограничился get_conn(), в котором я получаю параметры соединения по имени и всего-навсего достаю секцию extra (это поле для JSON), в которую я (по своей же инструкции!) положил токен Telegram-бота: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Создаю экземпляр нашего TelegramBot, отдавая ему уже конкретный токен.

Вот и всё. Получить клиент из хука можно c помощью TelegramBotHook().clent или TelegramBotHook().get_conn().


И вторая часть файлика, в котором я сделать микрообёрточку для Telegram REST API, чтобы не тащить тот же python-telegram-bot ради одного метода sendMessage.


class TelegramBot:    """Telegram Bot API wrapper    Examples:        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)    """    API_ENDPOINT = 'https://api.telegram.org/bot{}/'    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)        self.session = BaseUrlSession(self._base_url)        self.chat_id = chat_id    def send_message(self, message: str, chat_id: Union[int, str] = None):        method = 'sendMessage'        payload = {'chat_id': chat_id or self.chat_id,                   'text': message,                   'parse_mode': 'MarkdownV2'}        response = self.session.post(method, data=payload).json()        if not response.get('ok'):            raise TelegramBotException(response)class TelegramBotException(Exception):    def __init__(self, *args, **kwargs):        super().__init__((args, kwargs))

Правильный путь сложить всё это: TelegramBotSendMessage, TelegramBotHook, TelegramBot в плагин, положить в общедоступный репозиторий, и отдать в Open Source.

Пока мы всё это изучали, наши обновления отчетов успели успешно завалиться и отправить мне в канал сообщение об ошибке. Пойду проверять, что опять не так...



В нашем даге что-то сломалось! А ни этого ли мы ждали? Именно!


Наливать-то будешь?


Чувствуете, что-то я пропустил? Вроде бы обещал данные из SQL Server в Vertica переливать, и тут взял и съехал с темы, негодяй!


Злодеяние это было намеренным, я просто обязан был расшифровать вам кое-какую терминологию. Теперь можно ехать дальше.


План у нас был такой:


  1. Сделать даг
  2. Нагенерить таски
  3. Посмотреть, как всё красиво
  4. Присваивать заливкам номера сессий
  5. Забрать данные из SQL Server
  6. Положить данные в Vertica
  7. Собрать статистику

Итак, чтобы всё это запустить, я сделал маленькое дополнение к нашему docker-compose.yml:


docker-compose.db.yml
version: '3.4'x-mssql-base: &mssql-base  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04  restart: always  environment:    ACCEPT_EULA: Y    MSSQL_PID: Express    SA_PASSWORD: SayThanksToSatiaAt2020    MSSQL_MEMORY_LIMIT_MB: 1024services:  dwh:    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04  mssql_0:    <<: *mssql-base  mssql_1:    <<: *mssql-base  mssql_2:    <<: *mssql-base  mssql_init:    image: mio101/py3-sql-db-client-base    command: python3 ./mssql_init.py    depends_on:      - mssql_0      - mssql_1      - mssql_2    environment:      SA_PASSWORD: SayThanksToSatiaAt2020    volumes:      - ./mssql_init.py:/mssql_init.py      - ./dags/commons/datasources.py:/commons/datasources.py

Там мы поднимаем:


  • Vertica как хост dwh с самыми дефолтными настройками,
  • три экземпляра SQL Server,
  • наполняем базы в последних кое-какими данными (ни в коем случае не заглядывайте в mssql_init.py!)

Запускаем всё добро с помощью чуть более сложной, чем в прошлый раз, команды:


$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Что нагенерировал наш чудорандомайзер, можно, воспользовавшись пунктом Data Profiling/Ad Hoc Query:



Главное, не показывать это аналитикам


Подробно останавливаться на ETL-сессиях я не буду, там всё тривиально: делаем базу, в ней табличку, оборачиваем всё менеджером контекста, и теперь делаем так:


with Session(task_name) as session:    print('Load', session.id, 'started')    # Load worflow    ...    session.successful = True    session.loaded_rows = 15

session.py
from sys import stderrclass Session:    """ETL workflow session    Example:        with Session(task_name) as session:            print(session.id)            session.successful = True            session.loaded_rows = 15            session.comment = 'Well done'    """    def __init__(self, connection, task_name):        self.connection = connection        self.connection.autocommit = True        self._task_name = task_name        self._id = None        self.loaded_rows = None        self.successful = None        self.comment = None    def __enter__(self):        return self.open()    def __exit__(self, exc_type, exc_val, exc_tb):        if any(exc_type, exc_val, exc_tb):            self.successful = False            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'            print(exc_type, exc_val, exc_tb, file=stderr)        self.close()    def __repr__(self):        return (f'<{self.__class__.__name__} '                 f'id={self.id} '                 f'task_name="{self.task_name}">')    @property    def task_name(self):        return self._task_name    @property    def id(self):        return self._id    def _execute(self, query, *args):        with self.connection.cursor() as cursor:            cursor.execute(query, args)            return cursor.fetchone()[0]    def _create(self):        query = """            CREATE TABLE IF NOT EXISTS sessions (                id          SERIAL       NOT NULL PRIMARY KEY,                task_name   VARCHAR(200) NOT NULL,                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,                finished    TIMESTAMPTZ           DEFAULT current_timestamp,                successful  BOOL,                loaded_rows INT,                comment     VARCHAR(500)            );            """        self._execute(query)    def open(self):        query = """            INSERT INTO sessions (task_name, finished)            VALUES (%s, NULL)            RETURNING id;            """        self._id = self._execute(query, self.task_name)        print(self, 'opened')        return self    def close(self):        if not self._id:            raise SessionClosedError('Session is not open')        query = """            UPDATE sessions            SET                finished    = DEFAULT,                successful  = %s,                loaded_rows = %s,                comment     = %s            WHERE                id = %s            RETURNING id;            """        self._execute(query, self.successful, self.loaded_rows,                      self.comment, self.id)        print(self, 'closed',              ', successful: ', self.successful,              ', Loaded: ', self.loaded_rows,              ', comment:', self.comment)class SessionError(Exception):    passclass SessionClosedError(SessionError):    pass

Настала пора забрать наши данные из наших полутора сотен таблиц. Сделаем это с помощью очень незатейливых строчек:


source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()query = f"""    SELECT         id, start_time, end_time, type, data    FROM dbo.Orders    WHERE        CONVERT(DATE, start_time) = '{dt}'    """df = pd.read_sql_query(query, source_conn)

  1. С помощью хука получим из Airflow pymssql-коннект
  2. В запрос подставим ограничение в виде даты в функцию её подбросит шаблонизатор.
  3. Скармливаем наш запрос pandas, который достанет для нас DataFrame он нам пригодится в дальнейшем.

Я использую подстановку {dt} вместо параметра запроса %s не потому, что я злобный Буратино, а потому что pandas не может совладать с pymssql и подсовывает последнему params: List, хотя тот очень хочет tuple.
Также обратите внимание, что разработчик pymssql решил больше его не поддерживать, и самое время съехать на pyodbc.

Посмотрим, чем Airflow нашпиговал аргументы наших функций:



Если данных не оказалось, то продолжать смысла нет. Но считать заливку успешной тоже странно. Но это и не ошибка. А-а-а, что делать?! А вот что:


if df.empty:    raise AirflowSkipException('No rows to load')

AirflowSkipException скажет Airflow, что ошибки, собственно нет, а таск мы пропускаем. В интерфейсе будет не зеленый и не красный квадратик, а цвета pink.


Подбросим нашим данным несколько колонок:


df['etl_source'] = src_schemadf['etl_id'] = session.iddf['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

А именно:


  • БД, из которой мы забрали заказы,
  • Идентификатор нашей заливающей сессии (она будет разной на каждый таск),
  • Хэш от источника и идентификатора заказа чтобы в конечной базе (где всё ссыпется в одну таблицу) у нас был уникальный идентификатор заказа.

Остался предпоследний шаг: залить всё в Vertica. А, как ни странно, один из самых эффектных эффективных способов сделать это через CSV!


# Export data to CSV bufferbuffer = StringIO()df.to_csv(buffer,          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,          header=False, float_format='%.8f', doublequote=False, escapechar='\\')buffer.seek(0)# Push CSVtarget_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()copy_stmt = f"""    COPY {target_table}({df.columns.to_list()})     FROM STDIN     DELIMITER '|'     ENCLOSED '"'     ABORT ON ERROR     NULL 'NUL'    """cursor = target_conn.cursor()cursor.copy(copy_stmt, buffer)

  1. Мы делаем спецприёмник StringIO.
  2. pandas любезно сложит в него наш DataFrame в виде CSV-строк.
  3. Откроем соединение к нашей любимой Vertica хуком.
  4. А теперь с помощью copy() отправим наши данные прямо в Вертику!

Из драйвера заберем, сколько строчек засыпалось, и скажем менеджеру сессии, что всё ОК:


session.loaded_rows = cursor.rowcountsession.successful = True

Вот и всё.


На проде мы создаем целевую табличку вручную. Здесь же я позволил себе небольшой автомат:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'create_table_query = f"""    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (         id         INT,         start_time TIMESTAMP,         end_time   TIMESTAMP,         type       INT,         data       VARCHAR(32),         etl_source VARCHAR(200),         etl_id     INT,         hash_id    INT PRIMARY KEY     );"""create_table = VerticaOperator(    task_id='create_target',    sql=[create_schema_query,         create_table_query],    vertica_conn_id=target_conn_id,    task_concurrency=1,    dag=dag)

Я с помощью VerticaOperator() создаю схему БД и таблицу (если их еще нет, естественно). Главное, правильно расставить зависимости:

for conn_id, schema in sql_server_ds:    load = PythonOperator(        task_id=schema,        python_callable=workflow,        op_kwargs={            'src_conn_id': conn_id,            'src_schema': schema,            'dt': '{{ ds }}',            'target_conn_id': target_conn_id,            'target_table': f'{target_schema}.{target_table}'},        dag=dag)    create_table >> load

Подводим итоги


Ну вот, сказал мышонок, не правда ли, теперь
Ты убедился, что в лесу я самый страшный зверь?

Джулия Дональдсон, Груффало


Думаю, если бы мы с моими коллегами устроили соревнование: кто быстрее составит и запустит с нуля ETL-процесс: они со своими SSIS и мышкой и я с Airflow А потом бы мы еще сравнили удобство сопровождения Ух, думаю, вы согласитесь, что я обойду их по всем фронтам!


Если же чуть-чуть посерьезнее, то Apache Airflow за счет описания процессов в виде программного кода сделал мою работу гораздо удобнее и приятнее.


Его же неограниченная расширяемость: как в плане плагинов, так и предрасположенность к масштабируемости даёт вам возможность применять Airflow практически в любой области: хоть в полном цикле сбора, подготовки и обработки данных, хоть в запуске ракет (на Марс, конечно же).


Часть заключительная, справочно-информационная


Грабли, которые мы собрали за вас


  • start_date. Да, это уже локальный мемасик. Через главный аргумент дага start_date проходят все. Кратко, если указать в start_date текущую дату, а в schedule_interval один день, то DAG запустится завтра не раньше.


    start_date = datetime(2020, 7, 7, 0, 1, 2)
    

    И больше никаких проблем.


    С ним же связана и еще одна ошибка выполнения: Task is missing the start_date parameter, которая чаще всего говорит о том, что вы забыли привязать к оператору даг.


  • Всё на одной машине. Да, и базы (самого Airflow и нашей обмазки), и веб-сервер, и планировщик, и воркеры. И оно даже работало. Но со временем количество задач у сервисов росло, и когда PostgreSQL стал отдавать ответ по индексу за 20 с вместо 5 мс, мы его взяли и унесли.


  • LocalExecutor. Да, мы сидим на нём до сих пор, и мы уже подошли к краю пропасти. LocalExecutorа нам до сих пор хватало, но сейчас пришла пора расшириться минимум одним воркером, и придется поднапрячься, чтобы переехать на CeleryExecutor. А ввиду того, что с ним можно работать и на одной машиной, то ничего не останавливает от использования Celery даже не сервере, который естественно, никогда не пойдет в прод, чесслово!


  • Неиспользование встроенных средств:


    • Connections для хранения учетных данных сервисов,
    • SLA Misses для реагирования на таски, которые не отработали вовремя,
    • XCom для обмена метаданными (я сказал метаданными!) между тасками дага.

  • Злоупотребление почтой. Ну что тут сказать? Были настроены оповещения на все повторы упавших тасков. Теперь в моём рабочем Gmail >90k писем от Airflow, и веб-морда почты отказывается брать и удалять больше чем по 100 штук за раз.



Больше подводных камней: Apache Airflow Pitfails

Средства ещё большей автоматизации


Для того чтобы нам еще больше работать головой, а не руками, Airflow заготовила для нас вот что:


  • REST API он до сих пор имеет статус Experimental, что не мешает ему работать. С его помощью можно не только получать информацию о дагах и тасках, но остановить/запустить даг, создать DAG Run или пул.


  • CLI через командную строку доступны многие средства, которые не просто неудобны в обращении через WebUI, а вообще отсутствуют. Например:


    • backfill нужен для повторного запуска инстансов тасков.
      Например, пришли аналитики, говорят: А у вас, товарищ, ерунда в данных с 1 по 13 января! Чини-чини-чини-чини!. А ты такой хоба:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
      
    • Обслуживание базы: initdb, resetdb, upgradedb, checkdb.
    • run, который позволяет запустить один инстанс таска, да еще и забить на всё зависимости. Более того, можно запустить его через LocalExecutor, даже если у вас Celery-кластер.
    • Примерно то же самое делает test, только еще и в баз ничего не пишет.
    • connections позволяет массово создавать подключения из шелла.

  • Python API довольно хардкорный способ взаимодействия, который предназначен для плагинов, а не копошения в нём ручёнками. Но кто ж нам помешает пойти в /home/airflow/dags, запустить ipython и начать беспредельничать? Можно, например, экспортировать все подключения таком кодом:


    from airflow import settingsfrom airflow.models import Connectionfields = 'conn_id conn_type host port schema login password extra'.split()session = settings.Session()for conn in session.query(Connection).order_by(Connection.conn_id):  d = {field: getattr(conn, field) for field in fields}  print(conn.conn_id, '=', d)
    

  • Подключение к базе метаданных Airflow. Писать в неё я не рекомендую, а вот доставать состояния тасков для различных специфических метрик можно значительно быстрее и проще, чем через любой из API.


    Скажем, далеко не все наши таски идемпотентны, а могут иногда падать и это нормально. Но несколько завалов это уже подозрительно, и надо бы проверить.


    Осторожно, SQL!
    WITH last_executions AS (SELECT    task_id,    dag_id,    execution_date,    state,        row_number()        OVER (            PARTITION BY task_id, dag_id            ORDER BY execution_date DESC) AS rnFROM public.task_instanceWHERE    execution_date > now() - INTERVAL '2' DAY),failed AS (    SELECT        task_id,        dag_id,        execution_date,        state,        CASE WHEN rn = row_number() OVER (            PARTITION BY task_id, dag_id            ORDER BY execution_date DESC)                 THEN TRUE END AS last_fail_seq    FROM last_executions    WHERE        state IN ('failed', 'up_for_retry'))SELECT    task_id,    dag_id,    count(last_fail_seq)                       AS unsuccessful,    count(CASE WHEN last_fail_seq        AND state = 'failed' THEN 1 END)       AS failed,    count(CASE WHEN last_fail_seq        AND state = 'up_for_retry' THEN 1 END) AS up_for_retryFROM failedGROUP BY    task_id,    dag_idHAVING    count(last_fail_seq) > 0
    



Ссылки


Ну и естественно первые десять ссылок из выдачи гугла содержимое папки Airflow из моих закладок.



И ссылки, задействованные в статье:


Подробнее..

Деньги это всегда технология

21.02.2021 16:09:36 | Автор: admin

Чувствую, что должен написать статью про будущее Биткоина и криптовалют, не часто бывает время, когда людям это интересно. И сразу сообщу главное. Нет вообще никаких причин, почему капитализация Биткоина должна быть меньше капитализации золота. Дальше без картинок философское объяснение этой точки зрения.

Нет вообще никаких причин, почему капитализация Биткоина должна быть меньше капитализации золота. Более того, именно физические свойства золота, а конкретнее, его физические ограничения, дали жизнь такому (новому в масштабе истории денег) явлению как фиатные валюты. Задайте себе вопрос - смогли бы существовать фиатные валюты, если бы золото обладало свойством нематериальной передачи и передачи на любое расстояние? Нет конечно, не смогли бы. Потому что фиатные валюты это и есть изначально долговые расписки под золото - более удобная, "бумажная" форма золота. И только потом деньги получили развитие в виде золотовалютных и иных деривативов, коими сейчас и являются. Если бы золото не было таким плохим и неудобным, то никаких валют не возникло бы вовсе. А сегодня даже бумажные расписки не столь удобны, как записи в системе VISA. Позавчера золото было неудобно переправлять через океан. Вчера было бумагу не удобно переправлять через океан. Сегодня даже записи в базах данных не удобно переправлять через океан, если эта база данных не блокчейн. Вся нынешняя финансовая система с множеством суверенных денег всех сортов мало того что новодел (сколько лет она существует?), так еще и новодел с трехэтажными подпорками и подпорками подпорок. Не верите что финансовая система спроектирована дефективно? По моему, уже никто в этом не сомневается. Посмотрите что происходит. Уже даже акции начинают обладать свойствами денег. Уже сырье начинает частично обладать свойствами денег. Даже не говорю про недвижимость, которая почти сразу стала выполнять роль больших денег. Все это не нормально. При золоте такого не было. При золоте все было понятно - золото обладало свойствами максимальной ликвидности и вдоль и поперек. Не абсолютной ликвидностью, но возле того. А сейчас золото обладает ликвидностью только "в каком то смысле", весьма условном.

Я изучаю рыночную ликвидность ежедневно и могу сказать точно - именно ликвидность определяет цену. Высокая ликвидность - высокая цена, низкая ликвидность - низкая цена. Это факт. Не "динамика" спроса и предложения определяет цену, нет на рынке никакой динамики (и быть не может на сегодняшних скоростях передачи информации). Только ликвидность определяет цену. Миллиард долларов однодолларовыми банкнотами и одной банкнотой обладали бы разной ликвидностью. И точно так же ваш банковский счет с тысячей долларов и с миллиардом долларов обладает разной ликвидностью. Более того, один и тот же миллион долларов выигранный в лотерею или полученный преступным путем также обладает разной ликвидностью и вы это понимаете, когда ведете бизнес честно, уступая в доходах.

И вот к чему я подвожу. Проблема существует. Она существовала до золота и её решали. Она существовала во время золота, её решали. Она существовала во время фиатных бумажных денег, её решали. Ровно та же проблема существует сегодня, во время фиатных электронных денег со всеми службами в виде VISA, Master, SWIFT, UnionPay, WesterUnion, Revolut - все это подпорки того или иного сорта ликвидности. И вот скажите мне после этого, после всей этой истории от ракушек до PayPass, что блокчейн не является финальным решением. Любой человек, который способен мыслить деньги с инженерной точки зрения, которого денежные знаки не заставляют присягнуть богу Меркурию, понимает, что самыми дорогими деньгами будет те, которые лучше всего решают понятную технологическую проблему ликвидности и решают ее на любом масштабе - от билета в метро до космической миссии.

И вот скажите мне, почему блокчейн, или DAG Биткоина, внутри которого будет существовать рынок скорости и вероятности транзакции (которого сегодня нет), не является финальным решением?

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru