Русский
Русский
English
Статистика
Реклама

Apache airflow

Перевод CICD-конвейеры с охватом нескольких кластеров OpenShift

29.04.2021 16:13:12 | Автор: admin

Традиционно конвейеры CI/CD строились поверх физических серверов и виртуальных машин. Контейнерные платформы, вроде Kubernetes и OpenShift, появились на этой сцене гораздо позже. Однако по мере того, как все больше и больше рабочих нагрузок переводится на платформу OpenShift, туда же движутся и конвейеры CI/CD, а конвейерные задания все чаще выполняются внутри контейнеров, работающих на кластере.

Этот пост представляет собой перевод блога нашего друга Алеса Носика, автора блога имени себя ("Helping you navigate the world of Kubernetes"). Девиз Алеса: "Software developer and architect passionate about new technologies, open-source and the DevOps culture. Making the world a better place with software".

Вы можете задать любой вопрос после прочтения Алесу в комментариях. Ну и куда без этого Алес хочет подарить за самые каверзные вопросы набор пешего туриста с картой маршрутов от Red Hat. И шляпу.

В реальном мире компании не ограничиваются одним-единственным кластером OpenShift, а имеют их сразу несколько. Почему? Чтобы запускать рабочие нагрузки как в различных публичных облаках, так и на своих собственных ИТ-мощностях. Либо если организация хранит верность какому-то одному поставщику платформ для того, чтобы эффективно работать в различных регионах мира. А иногда необходимо иметь несколько кластеров и в одном регионе, например, при наличии нескольких зон безопасности, в каждой из которых должен быть свой кластер.

Как видим, у организаций есть масса причин иметь нескольких кластеров OpenShift. Соответственно, конвейеры CI/CD должны уметь работать в таких мультикластерных системах, и ниже мы расскажем, как проектировать такие конвейеры.

CI/CD-конвейер Jenkins

Jenkins это настоящая легенда в мире CI/CD. А ведь когда-то давно он назывался Hudson, впрочем, это совсем уже древняя история. Вернемся к нашей теме и зададимся вопросом, как построить конвейер Jenkins, охватывающий сразу несколько кластеров OpenShift? При проектировании такого конвейера очень важно предусмотреть единую информационную панель, где отображались бы результаты выполнения всех заданий, задействованных в этом конвейере. Если немного подумать, то для этого надо иметь некий главный Jenkins (назовем его мастер-Jenkins), который будет связан с каждым кластером OpenShift. При выполнении конвейера мастер-Jenkins сможет запускать отдельные задания на любом из подключенных кластеров, а журналы вывода заданий будут собираться отправляться на этот мастер-Jenkins, как обычно. Если у нас есть три кластера OpenShift (Dev, Test и Prod), то схема будет выглядеть следующим образом:

Для подключения Jenkins к OpenShift есть замечательный Kubernetes-plugin, с помощью которого мастер-Jenkins сможет создавать эфемерные worker-ы на кластере. Каждому кластеру можно присвоить свою метку узла, чтобы иметь возможность запускать каждый этап конвейера на разных кластерах, просто задавая соответствующую метку. Вот как выглядит простое определение такого конвейера:

stage ('Build') {  node ("dev") {    // running on dev cluster  }}stage ('Test') {  node ("test") {    // running on test cluster  }}stage ('Prod') {  node ("prod") {    // running on prod cluster  }}

В OpenShift есть специальный шаблон для Jenkins, который можно найти в проекте openshift. Этот шаблон позволяет создать мастер-Jenkins, который преднастроен для запуска worker-podов на одном кластере. А дальше придется приложить определенные усилия, чтобы подключить этот мастер-Jenkins к другим кластерам OpenShift, и вся хитрость здесь заключается в настройке сети. Jenkins-овский worker-pod после запуска подключается к мастер-Jenkins. Поэтому нам надо, чтобы мастер-Jenkins был доступен для worker-ов, работающих на любом из наших кластеров OpenShift.

Кроме того, стоит обсудить безопасность. Если мастер-Jenkins может запускать worker-овские pod-ы на OpenShift, то значит он может запускать на этих worker-ах произвольный код. Кластер OpenShift не имеет возможности контролировать, какой код запустит Jenkins-овский worker. Определения заданий управляется Jenkins, поэтому только Jenkins-овские средства контроля доступа решают, можно ли выполнять то или иное задание на том или ином кластере.

Kubernetes-нативный конвейер Tekton

Теперь посмотрим, как организовать мультикластерный конвейер CI/CDВ с помощью Tekton. В отличие от Jenkins, Tekton это Kubernetes-нативное решение: он реализован на основе строительных блоков Kubernetes и тесно интегрирован с Kubernetes. Естественная граница Tekton это кластер. И как тут построить конвейер, который будет охватывать сразу несколько кластеров OpenShift?

Для этого можно скомпоновать схему из несколько конвейеров Tekton. Чтобы объединить несколько конвейеров в один, мы создадим задачу execute-remote-pipeline, которая будет запускать Tekton-конвейер, расположенный на удаленном кластере OpenShift. При выполнении удаленного конвейера эта задача будет подхватывать его вывод. Теперь с помощью этой задачи мы можем объединять несколько конвейеров Tekton на разных кластерах OpenShift и запускать их как один конвейер. Например, на диаграмме ниже показана композиция из трех конвейеров, где каждый из них расположен на своем кластере OpenShift (Dev, Test и Prod):

Выполнение этого конвейера начинается на кластере Dev. Конвейер Dev запустит конвейер Test, который, в свою очередь, запустит конвейер Prod. Объединенные логи можно отследить к окне терминала:

$ tkn pipeline start dev --showlogPipelinerun started: dev-run-bd5fsWaiting for logs to be available...[execute-remote-pipeline : execute-remote-pipeline-step] Logged into "https://api.cluster-affc.sandbox1480.opentlc.com:6443" as "system:serviceaccount:test-pipeline:pipeline-starter" using the token provided.[execute-remote-pipeline : execute-remote-pipeline-step][execute-remote-pipeline : execute-remote-pipeline-step] You have one project on this server: "test-pipeline"[execute-remote-pipeline : execute-remote-pipeline-step][execute-remote-pipeline : execute-remote-pipeline-step] Using project "test-pipeline".[execute-remote-pipeline : execute-remote-pipeline-step] Welcome! See 'oc help' to get started.[execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step] Logged into "https://api.cluster-affc.sandbox1480.opentlc.com:6443" as "system:serviceaccount:prod-pipeline:pipeline-starter" using the token provided.[execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step][execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step] You have one project on this server: "prod-pipeline"[execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step][execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step] Using project "prod-pipeline".[execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step] Welcome! See 'oc help' to get started.[execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step] [prod : prod-step] Running on prod cluster[execute-remote-pipeline : execute-remote-pipeline-step] [execute-remote-pipeline : execute-remote-pipeline-step][execute-remote-pipeline : execute-remote-pipeline-step]

Обратите внимание, что в этом примере показано каскадное выполнение конвейеров Tekton. Есть и способ компоновки последовательное выполнение нескольких удаленных конвейеров.

Прежде чем переходить к заключению, кратко обсудим компоновку конвейеров с точки зрения безопасности. Kubernetes-нативность Tekton означает, что все управление доступом в нем осуществляется через RBAC. Поэтому, чтобы задача, запущенная на локальном кластере (допустим, на кластере Test на нашей схеме), могла запустить конвейер на удаленном кластере (Prod), у нее должны быть соответствующие разрешения, которые задаются на удаленном кластере (Prod). Таким образом, удаленный кластер, работающий в среде более высоко уровня (Prod), может ограничивать доступ для задач, работающие в среде более низкого уровня (Test). Например, Prod может разрешать Test-у запускать только штатные продакшн-конвейеры, поэтому Test не сможет создавать новые конвейеры в кластере Prod.

Заключение

Итак, мы показали, как в Jenkins и Tekton создавать конвейеры CI/CD, охватывающие сразу несколько кластеров OpenShift, обсудив некоторые аспекты их проектирования, безопасности и реализации.

Нет нужды говорить, что контейнеризированные конвейеры будут работать одним и тем же образом на любом кластере OpenShift, неважно, расположен ли он в публичном облаке или в корпоративном дата-центре, что хорошо иллюстрирует преимущества гибридного облака.

Автор: Ales Nosek

13 мая Алес расскажет про Apache Airflow на OpenShift, и там тоже можно будет задавать вопросы, получать подарки и все такое.

Подробнее..

Apache Airflow делаем ETL проще

27.07.2020 12:16:39 | Автор: admin

Привет, я Дмитрий Логвиненко Data Engineer отдела аналитики группы компаний Везёт.


Я расскажу вам о замечательном инструменте для разработки ETL-процессов Apache Airflow. Но Airflow настолько универсален и многогранен, что вам стоит присмотреться к нему даже если вы не занимаетесь потоками данных, а имеете потребность периодически запускать какие-либо процессы и следить за их выполнением.


И да, я буду не только рассказывать, но и показывать: в программе много кода, скриншотов и рекомендаций.



Что обычно видишь, когда гуглишь слово Airflow / Wikimedia Commons


Введение


Apache Airflow он прямо как Django:


  • написан на Python,
  • есть отличная админка,
  • неограниченно расширяем,

только лучше, да и сделан совсем для других целей, а именно (как написано до ката):


  • запуск и мониторинг задач на неограниченном количестве машин (сколько вам позволит Celery/Kubernetes и ваша совесть)
  • с динамической генерацией workflow из очень легкого для написания и восприятия Python-кода
  • и возможностью связывать друг с друг любые базы данных и API с помощью как готовых компонентов, так и самодельных плагинов (что делается чрезвычайно просто).

Мы используем Apache Airflow так:


  • собираем данные из различных источников (множество инстансов SQL Server и PostgreSQL, различные API с метриками приложений, даже 1С) в DWH и ODS (у нас это Vertica и Clickhouse).
  • как продвинутый cron, который запускает процессы консолидации данных на ODS, а также следит за их обслуживанием.

До недавнего времени наши потребности покрывал один небольшой сервер на 32 ядрах и 50 GB оперативки. В Airflow при этом работает:


  • более 200 дагов (собственно workflows, в которые мы набили задачки),
  • в каждом в среднем по 70 тасков,
  • запускается это добро (тоже в среднем) раз в час.

А о том, как мы расширялись, я напишу ниже, а сейчас давайте определим ber-задачу, которую мы будем решать:


Есть три исходных SQL Serverа, на каждом по 50 баз данных инстансов одного проекта, соответственно, структура у них одинаковая (почти везде, муа-ха-ха), а значит в каждой есть таблица Orders (благо таблицу с таким названием можно затолкать в любой бизнес). Мы забираем данные, добавляя служебные поля (сервер-источник, база-источник, идентификатор ETL-задачи) и наивным образом бросим их в, скажем, Vertica.

Поехали!


Часть основная, практическая (и немного теоретическая)


Зачем оно нам (и вам)


Когда деревья были большими, а я был простым SQL-щиком в одном российском ритейле, мы шпарили ETL-процессы aka потоки данных с помощью двух доступных нам средств:


  • Informatica Power Center крайне развесистая система, чрезвычайно производительная, со своими железками, собственным версионированием. Использовал я дай бог 1% её возможностей. Почему? Ну, во-первых, этот интерфейс где-то из нулевых психически давил на нас. Во-вторых, эта штуковина заточена под чрезвычайно навороченные процессы, яростное переиспользование компонентов и другие очень-важные-энтерпрайз-фишечки. Про то что стоит она, как крыло Airbus A380/год, мы промолчим.


    Осторожно, скриншот может сделать людям младше 30 немного больно




  • SQL Server Integration Server этим товарищем мы пользовались в своих внутрипроектных потоках. Ну а в самом деле: SQL Server мы уже используем, и не юзать его ETL-тулзы было бы как-то неразумно. Всё в нём в хорошо: и интерфейс красивый, и отчётики выполнения Но не за это мы любим программные продукты, ох не за это. Версионировать его dtsx (который представляет собой XML с перемешивающимися при сохранении нодами) мы можем, а толку? А сделать пакет тасков, который перетащит сотню таблиц с одного сервера на другой? Да что сотню, у вас от двадцати штук отвалится указательный палец, щёлкающий по мышиной кнопке. Но выглядит он, определенно, более модно:




Мы безусловно искали выходы. Дело даже почти дошло до самописного генератора SSIS-пакетов...


а потом меня нашла новая работа. А на ней меня настиг Apache Airflow.


Когда я узнал, что описания ETL-процессов это простой Python-код, я только что не плясал от радости. Вот так потоки данных подверглись версионированию и диффу, а ссыпать таблицы с единой структурой из сотни баз данных в один таргет стало делом Python-кода в полтора-два 13 экрана.


Собираем кластер


Давайте не устраивать совсем уж детский сад, и не говорить тут о совершенно очевидных вещах, вроде установки Airflow, выбранной вами БД, Celery и других дел, описанных в доках.


Чтобы мы могли сразу приступить к экспериментам, я набросал docker-compose.yml в котором:


  • Поднимем собственно Airflow: Scheduler, Webserver. Там же будет крутится Flower для мониторинга Celery-задач (потому что его уже затолкали в apache/airflow:1.10.10-python3.7, а мы и не против);
  • PostgreSQL, в который Airflow будет писать свою служебную информацию (данные планировщика, статистика выполнения и т. д.), а Celery отмечать завершенные таски;
  • Redis, который будет выступать брокером задач для Celery;
  • Celery worker, который и займется непосредственным выполнением задачек.
  • В папку ./dags мы будет складывать наши файлы с описанием дагов. Они будут подхватываться на лету, поэтому передёргивать весь стек после каждого чиха не нужно.

Кое-где код в примерах приведен не полностью (чтобы не загромождать текст), а где-то он модифицируется в процессе. Цельные работающие примеры кода можно посмотреть в репозитории https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml
version: '3.4'x-airflow-config: &airflow-config  AIRFLOW__CORE__DAGS_FOLDER: /dags  AIRFLOW__CORE__EXECUTOR: CeleryExecutor  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow  AIRFLOW__CORE__PARALLELISM: 128  AIRFLOW__CORE__DAG_CONCURRENCY: 16  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflowx-airflow-base: &airflow-base  image: apache/airflow:1.10.10-python3.7  entrypoint: /bin/bash  restart: always  volumes:    - ./dags:/dags    - ./requirements.txt:/requirements.txtservices:  # Redis as a Celery broker  broker:    image: redis:6.0.5-alpine  # DB for the Airflow metadata  airflow-db:    image: postgres:10.13-alpine    environment:      - POSTGRES_USER=airflow      - POSTGRES_PASSWORD=airflow      - POSTGRES_DB=airflow    volumes:      - ./db:/var/lib/postgresql/data  # Main container with Airflow Webserver, Scheduler, Celery Flower  airflow:    <<: *airflow-base    environment:      <<: *airflow-config      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'      AIRFLOW__SCHEDULER__MAX_THREADS: 8      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10    depends_on:      - airflow-db      - broker    command: >      -c " sleep 10 &&           pip install --user -r /requirements.txt &&           /entrypoint initdb &&          (/entrypoint webserver &) &&          (/entrypoint flower &) &&           /entrypoint scheduler"    ports:      # Celery Flower      - 5555:5555      # Airflow Webserver      - 8080:8080  # Celery worker, will be scaled using `--scale=n`  worker:    <<: *airflow-base    environment:      <<: *airflow-config    command: >      -c " sleep 10 &&           pip install --user -r /requirements.txt &&           /entrypoint worker"    depends_on:      - airflow      - airflow-db      - broker

Примечания:


  • В сборке композа я во многом опирался на известный образ puckel/docker-airflow обязательно посмотрите. Может, вам в жизни больше ничего и не понадобится.
  • Все настройки Airflow доступны не только через airflow.cfg, но и через переменные среды (слава разработчикам), чем я злостно воспользовался.
  • Естественно, он не production-ready: я намеренно не ставил heartbeats на контейнеры, не заморачивался с безопасностью. Но минимум, подходящий для наших экспериментиков я сделал.
  • Обратите внимание, что:
    • Папка с дагами должна быть доступна как планировщику, так и воркерам.
    • То же самое касается и всех сторонних библиотек они все должны быть установлены на машины с шедулером и воркерами.

Ну а теперь просто:


$ docker-compose up --scale worker=3

После того, как всё поднимется, можно смотреть на веб-интерфейсы:



Основные понятия


Если вы ничего не поняли во всех этих дагах, то вот краткий словарик:


  • Scheduler самый главный дядька в Airflow, контролирующий, чтобы вкалывали роботы, а не человек: следит за расписанием, обновляет даги, запускает таски.


    Вообще, в старых версиях, у него были проблемы с памятью (нет, не амнезия, а утечки) и в конфигах даже остался легаси-параметр run_duration интервал его перезапуска. Но сейчас всё хорошо.


  • DAG (он же даг) направленный ацикличный граф, но такое определение мало кому что скажет, а по сути это контейнер для взаимодействующих друг с другом тасков (см. ниже) или аналог Package в SSIS и Workflow в Informatica.


    Помимо дагов еще могут быть сабдаги, но мы до них скорее всего не доберёмся.


  • DAG Run инициализированный даг, которому присвоен свой execution_date. Даграны одного дага могут вполне работать параллельно (если вы, конечно, сделали свои таски идемпотентными).


  • Operator это кусочки кода, ответственные за выполнение какого-либо конкретного действия. Есть три типа операторов:


    • action, как например наш любимый PythonOperator, который в силах выполнить любой (валидный) Python-код;
    • transfer, которые перевозят данные с места на место, скажем, MsSqlToHiveTransfer;
    • sensor же позволит реагировать или притормозить дальнейшее выполнение дага до наступления какого-либо события. HttpSensor может дергать указанный эндпойнт, и когда дождется нужный ответ, запустить трансфер GoogleCloudStorageToS3Operator. Пытливый ум спросит: зачем? Ведь можно делать повторы прямо в операторе! А затем, чтобы не забивать пул тасков подвисшими операторами. Сенсор запускается, проверяет и умирает до следующей попытки.

  • Task объявленные операторы вне зависимости от типа и прикрепленные к дагу повышаются до чина таска.


  • Task instance когда генерал-планировщик решил, что таски пора отправлять в бой на исполнители-воркеры (прямо на месте, если мы используем LocalExecutor или на удалённую ноду в случае с CeleryExecutor), он назначает им контекст (т. е. комплект переменных параметров выполнения), разворачивает шаблоны команд или запросов и складывает их в пул.



Генерируем таски


Сперва обозначим общую схему нашего дага, а затем будем всё больше и больше погружаться в детали, потому что мы применяем некоторые нетривиальные решения.


Итак, в простейшем виде подобный даг будет выглядеть так:


from datetime import timedelta, datetimefrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorfrom commons.datasources import sql_server_dsdag = DAG('orders',          schedule_interval=timedelta(hours=6),          start_date=datetime(2020, 7, 8, 0))def workflow(**context):    print(context)for conn_id, schema in sql_server_ds:    PythonOperator(        task_id=schema,        python_callable=workflow,        provide_context=True,        dag=dag)

Давайте разбираться:


  • Сперва импортируем нужные либы и кое что ещё;
  • sql_server_ds это List[namedtuple[str, str]] с именами коннектов из Airflow Connections и базами данных из которых мы будем забирать нашу табличку;
  • dag объявление нашего дага, которое обязательно должно лежать в globals(), иначе Airflow его не найдет. Дагу также нужно сказать:
    • что его зовут orders это имя потом будет маячить в веб-интерфейсе,
    • что работать он будет, начиная с полуночи восьмого июля,
    • а запускать он должен, примерно каждые 6 часов (для крутых парней здесь вместо timedelta() допустима cron-строка 0 0 0/6 ? * * *, для менее крутых выражение вроде @daily);
  • workflow() будет делать основную работу, но не сейчас. Сейчас мы просто высыпем наш контекст в лог.
  • А теперь простая магия создания тасков:
    • пробегаем по нашим источникам;
    • инициализируем PythonOperator, который будет выполнять нашу пустышку workflow(). Не забывайте указывать уникальное (в рамках дага) имя таска и подвязывать сам даг. Флаг provide_context в свою очередь насыпет в функцию дополнительных аргументов, которые мы бережно соберём с помощью **context.

Пока на этом всё. Что мы получили:


  • новый даг в веб-интерфейсе,
  • полторы сотни тасков, которые будут выполняться параллельно (если то позволят настройки Airflow, Celery и мощности серверов).

Ну, почти получили.



Зависимости кто будет ставить?


Чтобы всё это дело упростить я вкорячил в docker-compose.yml обработку requirements.txt на всех нодах.


Вот теперь понеслась:



Серые квадратики task instances, обработанные планировщиком.


Немного ждем, задачи расхватывают воркеры:



Зеленые, понятное дело, успешно отработавшие. Красные не очень успешно.


Кстати, на нашем проде никакой папки ./dags, синхронизирующейся между машинами нет всё даги лежат в git на нашем Gitlab, а Gitlab CI раскладывает обновления на машины при мёрдже в master.

Немного о Flower


Пока воркеры молотят наши тасочки-пустышки, вспомним про еще один инструмент, который может нам кое-что показать Flower.


Самая первая страничка с суммарной информацией по нодам-воркерам:



Самая насыщенная страничка с задачами, отправившимися в работу:



Самая скучная страничка с состоянием нашего брокера:



Самая яркая страничка с графиками состояния тасков и их временем выполнения:



Догружаем недогруженное


Итак, все таски отработали, можно уносить раненых.



А раненых оказалось немало по тем или иным причинами. В случае правильного использования Airflow вот эти самые квадраты говорят о том, что данные определенно не доехали.


Нужно смотреть лог и перезапускать упавшие task instances.


Жмякнув на любой квадрат, увидим доступные нам действия:



Можно взять, и сделать Clear упавшему. То есть, мы забываем о том, что там что-то завалилось, и тот же самый инстанс таска уйдет планировщику.



Понятно, что делать так мышкой со всеми красными квадратами не очень гуманно не этого мы ждем от Airflow. Естественно, у нас есть оружие массового поражения: Browse/Task Instances



Выберем всё разом и обнулим нажмем правильный пункт:



После очистки наши такси выглядят так (они уже ждут не дождутся, когда шедулер их запланирует):



Соединения, хуки и прочие переменные


Самое время посмотреть на следующий DAG, update_reports.py:


from collections import namedtuplefrom datetime import datetime, timedeltafrom textwrap import dedentfrom airflow import DAGfrom airflow.contrib.operators.vertica_operator import VerticaOperatorfrom airflow.operators.email_operator import EmailOperatorfrom airflow.utils.trigger_rule import TriggerRulefrom commons.operators import TelegramBotSendMessagedag = DAG('update_reports',          start_date=datetime(2020, 6, 7, 6),          schedule_interval=timedelta(days=1),          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})Report = namedtuple('Report', 'source target')reports = [Report(f'{table}_view', table) for table in [    'reports.city_orders',    'reports.client_calls',    'reports.client_rates',    'reports.daily_orders',    'reports.order_duration']]email = EmailOperator(    task_id='email_success', dag=dag,    to='{{ var.value.all_the_kings_men }}',    subject='DWH Reports updated',    html_content=dedent("""Господа хорошие, отчеты обновлены"""),    trigger_rule=TriggerRule.ALL_SUCCESS)tg = TelegramBotSendMessage(    task_id='telegram_fail', dag=dag,    tg_bot_conn_id='tg_main',    chat_id='{{ var.value.failures_chat }}',    message=dedent("""\         Наташ, просыпайся, мы {{ dag.dag_id }} уронили        """),    trigger_rule=TriggerRule.ONE_FAILED)for source, target in reports:    queries = [f"TRUNCATE TABLE {target}",               f"INSERT INTO {target} SELECT * FROM {source}"]    report_update = VerticaOperator(        task_id=target.replace('reports.', ''),        sql=queries, vertica_conn_id='dwh',        task_concurrency=1, dag=dag)    report_update >> [email, tg]

Все ведь когда-нибудь делали обновлялку отчетов? Это снова она: есть список источников, откуда забрать данные; есть список, куда положить; не забываем посигналить, когда всё случилось или сломалось (ну это не про нас, нет).


Давайте снова пройдемся по файлу и посмотрим на новые непонятные штуки:


  • from commons.operators import TelegramBotSendMessage нам ничто не мешает делать свои операторы, чем мы и воспользовались, сделав небольшую обёрточку для отправки сообщений в Разблокированный. (Об этом операторе мы еще поговорим ниже);
  • default_args={} даг может раздавать одни и те же аргументы всем своим операторам;
  • to='{{ var.value.all_the_kings_men }}' поле to у нас будет не захардкоженным, а формируемым динамически с помощью Jinja и переменной со списком email-ов, которую я заботливо положил в Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS условие запуска оператора. В нашем случае, письмо полетит боссам только если все зависимости отработали успешно;
  • tg_bot_conn_id='tg_main' аргументы conn_id принимают в себя идентификаторы соединений, которые мы создаем в Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED сообщения в Telegram улетят только при наличии упавших тасков;
  • task_concurrency=1 запрещаем одновременный запуск нескольких task instances одного таска. В противном случае, мы получим одновременный запуск нескольких VerticaOperator (смотрящих на одну таблицу);
  • report_update >> [email, tg] все VerticaOperator сойдутся в отправке письма и сообщения, вот так:


    Но так как у операторов-нотификаторов стоят разные условия запуска, работать будет только один. В Tree View всё выглядит несколько менее наглядно:



Скажу пару слов о макросах и их друзьях переменных.


Макросы это Jinja-плейсхолдеры, которые могут подставлять разную полезную информацию в аргументы операторов. Например, так:


SELECT    id,    payment_dtm,    payment_type,    client_idFROM orders.paymentsWHERE    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} развернется в содержимое переменной контекста execution_date в формате YYYY-MM-DD: 2020-07-14. Самое приятное, что переменные контекста прибиваются гвоздями к определенному инстансу таска (квадратику в Tree View), и при перезапуске плейсхолдеры раскроются в те же самые значения.


Присвоенные значения можно смотреть с помощью кнопки Rendered на каждом таск-инстансе. Вот так у таска с отправкой письма:



А так у таски с отправкой сообщения:



Полный список встроенных макросов для последней доступной версии доступен здесь: Macros Reference


Более того, с помощью плагинов, мы можем объявлять собственные макросы, но это уже совсем другая история.


Помимо предопределенных штук, мы можем подставлять значения своих переменных (выше в коде я уже этим воспользовался). Создадим в Admin/Variables пару штук:



Всё, можно пользоваться:


TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

В значении может быть скаляр, а может лежать и JSON. В случае JSON-а:


bot_config{    "bot": {        "token": 881hskdfASDA16641,        "name": "Verter"    },    "service": "TG"}

просто используем путь к нужному ключу: {{ var.json.bot_config.bot.token }}.


Скажу буквально одно слово и покажу один скриншот про соединения. Тут всё элементарно: на странице Admin/Connections создаем соединение, складываем туда наши логины/пароли и более специфичные параметры. Вот так:



Пароли можно шифровать (более тщательно, чем в варианте по умолчанию), а можно не указывать тип соединения (как я сделал для tg_main) дело в том, что список типов зашит в моделях Airflow и расширению без влезания в исходники не поддается (если вдруг я чего-то не догуглил прошу меня поправить), но получить креды просто по имени нам ничто не помешает.


А еще можно сделать несколько соединений с одним именем: в таком случае метод BaseHook.get_connection(), который достает нам соединения по имени, будет отдавать случайного из нескольких тёзок (было бы логичнее сделать Round Robin, но оставим это на совести разработчиков Airflow).


Variables и Connections, безусловно, классные средства, но важно не потерять баланс: какие части ваших потоков вы храните собственно в коде, а какие отдаете на хранение Airflow. C одной стороны быстро поменять значение, например, ящик рассылки, может быть удобно через UI. А с другой это всё-таки возврат к мышеклику, от которого мы (я) хотели избавиться.

Работа с соединениями это одна из задач хуков. Вообще хуки Airflow это точки подключения его к сторонним сервисам и библиотекам. К примеру, JiraHook откроет для нас клиент для взаимодействия с Jira (можно задачки подвигать туда-сюда), а с помощью SambaHook можно запушить локальный файл на smb-точку.


Разбираем кастомный оператор


И мы вплотную подобрались к тому, чтобы посмотреть на то, как сделан TelegramBotSendMessage


Код commons/operators.py с собственно оператором:


from typing import Unionfrom airflow.operators import BaseOperatorfrom commons.hooks import TelegramBotHook, TelegramBotclass TelegramBotSendMessage(BaseOperator):    """Send message to chat_id using TelegramBotHook    Example:        >>> TelegramBotSendMessage(        ...     task_id='telegram_fail', dag=dag,        ...     tg_bot_conn_id='tg_bot_default',        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',        ...     message='{{ dag.dag_id }} failed :(',        ...     trigger_rule=TriggerRule.ONE_FAILED)    """    template_fields = ['chat_id', 'message']    def __init__(self,                 chat_id: Union[int, str],                 message: str,                 tg_bot_conn_id: str = 'tg_bot_default',                 *args, **kwargs):        super().__init__(*args, **kwargs)        self._hook = TelegramBotHook(tg_bot_conn_id)        self.client: TelegramBot = self._hook.client        self.chat_id = chat_id        self.message = message    def execute(self, context):        print(f'Send "{self.message}" to the chat {self.chat_id}')        self.client.send_message(chat_id=self.chat_id,                                 message=self.message)

Здесь, как и остальное в Airflow, всё очень просто:


  • Отнаследовались от BaseOperator, который реализует довольно много Airflow-специфичных штук (посмотрите на досуге)
  • Объявили поля template_fields, в которых Jinja будет искать макросы для обработки.
  • Организовали правильные аргументы для __init__(), расставили умолчания, где надо.
  • Об инициализации предка тоже не забыли.
  • Открыли соответствующий хук TelegramBotHook, получили от него объект-клиент.
  • Оверрайднули (переопределили) метод BaseOperator.execute(), который Airfow будет подергивать, когда наступит время запускать оператор в нем мы и реализуем основное действие, на забыв залогироваться. (Логируемся, кстати, прямо в stdout и stderr Airflow всё перехватит, красиво обернет, разложит, куда надо.)

Давайте смотреть, что у нас в commons/hooks.py. Первая часть файлика, с самим хуком:


from typing import Unionfrom airflow.hooks.base_hook import BaseHookfrom requests_toolbelt.sessions import BaseUrlSessionclass TelegramBotHook(BaseHook):    """Telegram Bot API hook    Note: add a connection with empty Conn Type and don't forget    to fill Extra:        {"bot_token": "YOuRAwEsomeBOtToKen"}    """    def __init__(self,                 tg_bot_conn_id='tg_bot_default'):        super().__init__(tg_bot_conn_id)        self.tg_bot_conn_id = tg_bot_conn_id        self.tg_bot_token = None        self.client = None        self.get_conn()    def get_conn(self):        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson        self.tg_bot_token = extra['bot_token']        self.client = TelegramBot(self.tg_bot_token)        return self.client

Я даже не знаю, что тут можно объяснять, просто отмечу важные моменты:


  • Наследуемся, думаем над аргументами в большинстве случаев он будет один: conn_id;
  • Переопределяем стандартные методы: я ограничился get_conn(), в котором я получаю параметры соединения по имени и всего-навсего достаю секцию extra (это поле для JSON), в которую я (по своей же инструкции!) положил токен Telegram-бота: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Создаю экземпляр нашего TelegramBot, отдавая ему уже конкретный токен.

Вот и всё. Получить клиент из хука можно c помощью TelegramBotHook().clent или TelegramBotHook().get_conn().


И вторая часть файлика, в котором я сделать микрообёрточку для Telegram REST API, чтобы не тащить тот же python-telegram-bot ради одного метода sendMessage.


class TelegramBot:    """Telegram Bot API wrapper    Examples:        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)    """    API_ENDPOINT = 'https://api.telegram.org/bot{}/'    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)        self.session = BaseUrlSession(self._base_url)        self.chat_id = chat_id    def send_message(self, message: str, chat_id: Union[int, str] = None):        method = 'sendMessage'        payload = {'chat_id': chat_id or self.chat_id,                   'text': message,                   'parse_mode': 'MarkdownV2'}        response = self.session.post(method, data=payload).json()        if not response.get('ok'):            raise TelegramBotException(response)class TelegramBotException(Exception):    def __init__(self, *args, **kwargs):        super().__init__((args, kwargs))

Правильный путь сложить всё это: TelegramBotSendMessage, TelegramBotHook, TelegramBot в плагин, положить в общедоступный репозиторий, и отдать в Open Source.

Пока мы всё это изучали, наши обновления отчетов успели успешно завалиться и отправить мне в канал сообщение об ошибке. Пойду проверять, что опять не так...



В нашем даге что-то сломалось! А ни этого ли мы ждали? Именно!


Наливать-то будешь?


Чувствуете, что-то я пропустил? Вроде бы обещал данные из SQL Server в Vertica переливать, и тут взял и съехал с темы, негодяй!


Злодеяние это было намеренным, я просто обязан был расшифровать вам кое-какую терминологию. Теперь можно ехать дальше.


План у нас был такой:


  1. Сделать даг
  2. Нагенерить таски
  3. Посмотреть, как всё красиво
  4. Присваивать заливкам номера сессий
  5. Забрать данные из SQL Server
  6. Положить данные в Vertica
  7. Собрать статистику

Итак, чтобы всё это запустить, я сделал маленькое дополнение к нашему docker-compose.yml:


docker-compose.db.yml
version: '3.4'x-mssql-base: &mssql-base  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04  restart: always  environment:    ACCEPT_EULA: Y    MSSQL_PID: Express    SA_PASSWORD: SayThanksToSatiaAt2020    MSSQL_MEMORY_LIMIT_MB: 1024services:  dwh:    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04  mssql_0:    <<: *mssql-base  mssql_1:    <<: *mssql-base  mssql_2:    <<: *mssql-base  mssql_init:    image: mio101/py3-sql-db-client-base    command: python3 ./mssql_init.py    depends_on:      - mssql_0      - mssql_1      - mssql_2    environment:      SA_PASSWORD: SayThanksToSatiaAt2020    volumes:      - ./mssql_init.py:/mssql_init.py      - ./dags/commons/datasources.py:/commons/datasources.py

Там мы поднимаем:


  • Vertica как хост dwh с самыми дефолтными настройками,
  • три экземпляра SQL Server,
  • наполняем базы в последних кое-какими данными (ни в коем случае не заглядывайте в mssql_init.py!)

Запускаем всё добро с помощью чуть более сложной, чем в прошлый раз, команды:


$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Что нагенерировал наш чудорандомайзер, можно, воспользовавшись пунктом Data Profiling/Ad Hoc Query:



Главное, не показывать это аналитикам


Подробно останавливаться на ETL-сессиях я не буду, там всё тривиально: делаем базу, в ней табличку, оборачиваем всё менеджером контекста, и теперь делаем так:


with Session(task_name) as session:    print('Load', session.id, 'started')    # Load worflow    ...    session.successful = True    session.loaded_rows = 15

session.py
from sys import stderrclass Session:    """ETL workflow session    Example:        with Session(task_name) as session:            print(session.id)            session.successful = True            session.loaded_rows = 15            session.comment = 'Well done'    """    def __init__(self, connection, task_name):        self.connection = connection        self.connection.autocommit = True        self._task_name = task_name        self._id = None        self.loaded_rows = None        self.successful = None        self.comment = None    def __enter__(self):        return self.open()    def __exit__(self, exc_type, exc_val, exc_tb):        if any(exc_type, exc_val, exc_tb):            self.successful = False            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'            print(exc_type, exc_val, exc_tb, file=stderr)        self.close()    def __repr__(self):        return (f'<{self.__class__.__name__} '                 f'id={self.id} '                 f'task_name="{self.task_name}">')    @property    def task_name(self):        return self._task_name    @property    def id(self):        return self._id    def _execute(self, query, *args):        with self.connection.cursor() as cursor:            cursor.execute(query, args)            return cursor.fetchone()[0]    def _create(self):        query = """            CREATE TABLE IF NOT EXISTS sessions (                id          SERIAL       NOT NULL PRIMARY KEY,                task_name   VARCHAR(200) NOT NULL,                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,                finished    TIMESTAMPTZ           DEFAULT current_timestamp,                successful  BOOL,                loaded_rows INT,                comment     VARCHAR(500)            );            """        self._execute(query)    def open(self):        query = """            INSERT INTO sessions (task_name, finished)            VALUES (%s, NULL)            RETURNING id;            """        self._id = self._execute(query, self.task_name)        print(self, 'opened')        return self    def close(self):        if not self._id:            raise SessionClosedError('Session is not open')        query = """            UPDATE sessions            SET                finished    = DEFAULT,                successful  = %s,                loaded_rows = %s,                comment     = %s            WHERE                id = %s            RETURNING id;            """        self._execute(query, self.successful, self.loaded_rows,                      self.comment, self.id)        print(self, 'closed',              ', successful: ', self.successful,              ', Loaded: ', self.loaded_rows,              ', comment:', self.comment)class SessionError(Exception):    passclass SessionClosedError(SessionError):    pass

Настала пора забрать наши данные из наших полутора сотен таблиц. Сделаем это с помощью очень незатейливых строчек:


source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()query = f"""    SELECT         id, start_time, end_time, type, data    FROM dbo.Orders    WHERE        CONVERT(DATE, start_time) = '{dt}'    """df = pd.read_sql_query(query, source_conn)

  1. С помощью хука получим из Airflow pymssql-коннект
  2. В запрос подставим ограничение в виде даты в функцию её подбросит шаблонизатор.
  3. Скармливаем наш запрос pandas, который достанет для нас DataFrame он нам пригодится в дальнейшем.

Я использую подстановку {dt} вместо параметра запроса %s не потому, что я злобный Буратино, а потому что pandas не может совладать с pymssql и подсовывает последнему params: List, хотя тот очень хочет tuple.
Также обратите внимание, что разработчик pymssql решил больше его не поддерживать, и самое время съехать на pyodbc.

Посмотрим, чем Airflow нашпиговал аргументы наших функций:



Если данных не оказалось, то продолжать смысла нет. Но считать заливку успешной тоже странно. Но это и не ошибка. А-а-а, что делать?! А вот что:


if df.empty:    raise AirflowSkipException('No rows to load')

AirflowSkipException скажет Airflow, что ошибки, собственно нет, а таск мы пропускаем. В интерфейсе будет не зеленый и не красный квадратик, а цвета pink.


Подбросим нашим данным несколько колонок:


df['etl_source'] = src_schemadf['etl_id'] = session.iddf['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

А именно:


  • БД, из которой мы забрали заказы,
  • Идентификатор нашей заливающей сессии (она будет разной на каждый таск),
  • Хэш от источника и идентификатора заказа чтобы в конечной базе (где всё ссыпется в одну таблицу) у нас был уникальный идентификатор заказа.

Остался предпоследний шаг: залить всё в Vertica. А, как ни странно, один из самых эффектных эффективных способов сделать это через CSV!


# Export data to CSV bufferbuffer = StringIO()df.to_csv(buffer,          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,          header=False, float_format='%.8f', doublequote=False, escapechar='\\')buffer.seek(0)# Push CSVtarget_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()copy_stmt = f"""    COPY {target_table}({df.columns.to_list()})     FROM STDIN     DELIMITER '|'     ENCLOSED '"'     ABORT ON ERROR     NULL 'NUL'    """cursor = target_conn.cursor()cursor.copy(copy_stmt, buffer)

  1. Мы делаем спецприёмник StringIO.
  2. pandas любезно сложит в него наш DataFrame в виде CSV-строк.
  3. Откроем соединение к нашей любимой Vertica хуком.
  4. А теперь с помощью copy() отправим наши данные прямо в Вертику!

Из драйвера заберем, сколько строчек засыпалось, и скажем менеджеру сессии, что всё ОК:


session.loaded_rows = cursor.rowcountsession.successful = True

Вот и всё.


На проде мы создаем целевую табличку вручную. Здесь же я позволил себе небольшой автомат:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'create_table_query = f"""    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (         id         INT,         start_time TIMESTAMP,         end_time   TIMESTAMP,         type       INT,         data       VARCHAR(32),         etl_source VARCHAR(200),         etl_id     INT,         hash_id    INT PRIMARY KEY     );"""create_table = VerticaOperator(    task_id='create_target',    sql=[create_schema_query,         create_table_query],    vertica_conn_id=target_conn_id,    task_concurrency=1,    dag=dag)

Я с помощью VerticaOperator() создаю схему БД и таблицу (если их еще нет, естественно). Главное, правильно расставить зависимости:

for conn_id, schema in sql_server_ds:    load = PythonOperator(        task_id=schema,        python_callable=workflow,        op_kwargs={            'src_conn_id': conn_id,            'src_schema': schema,            'dt': '{{ ds }}',            'target_conn_id': target_conn_id,            'target_table': f'{target_schema}.{target_table}'},        dag=dag)    create_table >> load

Подводим итоги


Ну вот, сказал мышонок, не правда ли, теперь
Ты убедился, что в лесу я самый страшный зверь?

Джулия Дональдсон, Груффало


Думаю, если бы мы с моими коллегами устроили соревнование: кто быстрее составит и запустит с нуля ETL-процесс: они со своими SSIS и мышкой и я с Airflow А потом бы мы еще сравнили удобство сопровождения Ух, думаю, вы согласитесь, что я обойду их по всем фронтам!


Если же чуть-чуть посерьезнее, то Apache Airflow за счет описания процессов в виде программного кода сделал мою работу гораздо удобнее и приятнее.


Его же неограниченная расширяемость: как в плане плагинов, так и предрасположенность к масштабируемости даёт вам возможность применять Airflow практически в любой области: хоть в полном цикле сбора, подготовки и обработки данных, хоть в запуске ракет (на Марс, конечно же).


Часть заключительная, справочно-информационная


Грабли, которые мы собрали за вас


  • start_date. Да, это уже локальный мемасик. Через главный аргумент дага start_date проходят все. Кратко, если указать в start_date текущую дату, а в schedule_interval один день, то DAG запустится завтра не раньше.


    start_date = datetime(2020, 7, 7, 0, 1, 2)
    

    И больше никаких проблем.


    С ним же связана и еще одна ошибка выполнения: Task is missing the start_date parameter, которая чаще всего говорит о том, что вы забыли привязать к оператору даг.


  • Всё на одной машине. Да, и базы (самого Airflow и нашей обмазки), и веб-сервер, и планировщик, и воркеры. И оно даже работало. Но со временем количество задач у сервисов росло, и когда PostgreSQL стал отдавать ответ по индексу за 20 с вместо 5 мс, мы его взяли и унесли.


  • LocalExecutor. Да, мы сидим на нём до сих пор, и мы уже подошли к краю пропасти. LocalExecutorа нам до сих пор хватало, но сейчас пришла пора расшириться минимум одним воркером, и придется поднапрячься, чтобы переехать на CeleryExecutor. А ввиду того, что с ним можно работать и на одной машиной, то ничего не останавливает от использования Celery даже не сервере, который естественно, никогда не пойдет в прод, чесслово!


  • Неиспользование встроенных средств:


    • Connections для хранения учетных данных сервисов,
    • SLA Misses для реагирования на таски, которые не отработали вовремя,
    • XCom для обмена метаданными (я сказал метаданными!) между тасками дага.

  • Злоупотребление почтой. Ну что тут сказать? Были настроены оповещения на все повторы упавших тасков. Теперь в моём рабочем Gmail >90k писем от Airflow, и веб-морда почты отказывается брать и удалять больше чем по 100 штук за раз.



Больше подводных камней: Apache Airflow Pitfails

Средства ещё большей автоматизации


Для того чтобы нам еще больше работать головой, а не руками, Airflow заготовила для нас вот что:


  • REST API он до сих пор имеет статус Experimental, что не мешает ему работать. С его помощью можно не только получать информацию о дагах и тасках, но остановить/запустить даг, создать DAG Run или пул.


  • CLI через командную строку доступны многие средства, которые не просто неудобны в обращении через WebUI, а вообще отсутствуют. Например:


    • backfill нужен для повторного запуска инстансов тасков.
      Например, пришли аналитики, говорят: А у вас, товарищ, ерунда в данных с 1 по 13 января! Чини-чини-чини-чини!. А ты такой хоба:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
      
    • Обслуживание базы: initdb, resetdb, upgradedb, checkdb.
    • run, который позволяет запустить один инстанс таска, да еще и забить на всё зависимости. Более того, можно запустить его через LocalExecutor, даже если у вас Celery-кластер.
    • Примерно то же самое делает test, только еще и в баз ничего не пишет.
    • connections позволяет массово создавать подключения из шелла.

  • Python API довольно хардкорный способ взаимодействия, который предназначен для плагинов, а не копошения в нём ручёнками. Но кто ж нам помешает пойти в /home/airflow/dags, запустить ipython и начать беспредельничать? Можно, например, экспортировать все подключения таком кодом:


    from airflow import settingsfrom airflow.models import Connectionfields = 'conn_id conn_type host port schema login password extra'.split()session = settings.Session()for conn in session.query(Connection).order_by(Connection.conn_id):  d = {field: getattr(conn, field) for field in fields}  print(conn.conn_id, '=', d)
    

  • Подключение к базе метаданных Airflow. Писать в неё я не рекомендую, а вот доставать состояния тасков для различных специфических метрик можно значительно быстрее и проще, чем через любой из API.


    Скажем, далеко не все наши таски идемпотентны, а могут иногда падать и это нормально. Но несколько завалов это уже подозрительно, и надо бы проверить.


    Осторожно, SQL!
    WITH last_executions AS (SELECT    task_id,    dag_id,    execution_date,    state,        row_number()        OVER (            PARTITION BY task_id, dag_id            ORDER BY execution_date DESC) AS rnFROM public.task_instanceWHERE    execution_date > now() - INTERVAL '2' DAY),failed AS (    SELECT        task_id,        dag_id,        execution_date,        state,        CASE WHEN rn = row_number() OVER (            PARTITION BY task_id, dag_id            ORDER BY execution_date DESC)                 THEN TRUE END AS last_fail_seq    FROM last_executions    WHERE        state IN ('failed', 'up_for_retry'))SELECT    task_id,    dag_id,    count(last_fail_seq)                       AS unsuccessful,    count(CASE WHEN last_fail_seq        AND state = 'failed' THEN 1 END)       AS failed,    count(CASE WHEN last_fail_seq        AND state = 'up_for_retry' THEN 1 END) AS up_for_retryFROM failedGROUP BY    task_id,    dag_idHAVING    count(last_fail_seq) > 0
    



Ссылки


Ну и естественно первые десять ссылок из выдачи гугла содержимое папки Airflow из моих закладок.



И ссылки, задействованные в статье:


Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru