Русский
Русский
English
Статистика
Реклама

Rabbitmq

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду

21.01.2021 20:07:17 | Автор: admin

Однажды я вел вебинар про то, как принимать 10 000 ивентов в секунду. Показал вот такую картинку, зрители увидели сиреневый слой, и началось: Ребят, а зачем нам все эти кафки и рэббиты, неужели без них не обойтись? Мы и ответили: Зачем-зачем, чтобы пройти собес!

Очень смешно, но давайте я все-таки объясню.


Мы можем принимать ивенты сразу в зеленой области и заставить наши приложения писать их в кликхаус.

Но кликхаус любит, когда в него пишут сообщения пачками

Другими словами, в него лучше запихнуть миллион сообщений, вместо того чтобы писать по одному. Kafka, Rabbit или Яндекс.Кью выступают как буфер, и мы можем контролировать с его помощью входящую нагрузку.

Как бывает: в одну секунду пришло 10 тысяч ивентов, в следующую тысяча, в другую 50 тысяч. Это нормально, пользователи рандомно включают свои мобильные приложения. В таком случае в кликхаус напрямую будет заходить то 2 тысячи, то 10 тысяч сообщений. Но с помощью буфера вы можете подкопить сообщения, потом достать из этой копилки миллион ивентов и направить в кликхаус. И вот она желанная стабильная нагрузка на ваш кластер.

Это все история про очереди

Во-первых, очереди можно использовать для передачи сообщений между различными сервисами.

Например, для бэкграунд задач. Вы заходите в админку магазина и генерируете отчет по продажам за год. Задача трудоемкая: нужно прочитать миллионы строк из базы, это хлопотно и очень долго. Если клиент будет висеть постоянно с открытым http-коннектом 5, 10 минут связь может оборваться, и он не получит файл.

Логично выполнить эту задачу асинхронно на фоне. Пользователь нажимает кнопку сгенерировать отчет, ему пишут: Все окей, отчет генерируется, он придет на вашу почту в течение часа. Задача автоматически попадает в очередь сообщений и далее на стол воркера, который выполнит ее и отправит пользователю на ящик.

Второй кейс про кучу микросервисов, которые общаются через шину.

Например, один сервис принимает ивенты от пользователей, передает их в очередь. Следующий сервис вытаскивает ивенты и нормализует их, к примеру, проверяет, чтобы у них был валидный e-mail или телефон. Если все хорошо, он перекладывает сообщение дальше, в следующую очередь, из которой данные будут записываться в базу.

Еще один поинт это падение дата-центра, в котором хостится база.

Конец, ничего не работает. Что будет с сообщениями? Если писать без буфера, сообщения потеряются. Кликхаус недоступен, клиенты отвалились. До какого-то предела выручит память, но с буфером безопаснее вы просто взяли и записали сообщения в очередь (например, в кафку). И сообщения будут храниться там, пока не закончится место или пока их не прочитают и не обработают.


Как автоматически добавлять новые виртуалки при увеличении нагрузки

Чтобы протестить нагрузку, я написал приложение и протестил автоматически масштабируемые группы.

Мы создаем инстанс-группу. Задаем ей имя и указываем сервисный аккаунт. Он будет использоваться для создания виртуалок.

resource "yandex_compute_instance_group" "events-api-ig" {  name               = "events-api-ig"  service_account_id = yandex_iam_service_account.instances.id

Затем указываем шаблон виртуалки. Указываем CPU, память, размер диска и т.д.

instance_template {    platform_id = "standard-v2"    resources {      memory = 2      cores  = 2    }    boot_disk {      mode = "READ_WRITE"      initialize_params {        image_id = data.yandex_compute_image.container-optimized-image.id        size = 10      }

Указываем, к какому сетевому интерфейсу его подрубить.

}    network_interface {      network_id = yandex_vpc_network.internal.id      subnet_ids = [yandex_vpc_subnet.internal-a.id, yandex_vpc_subnet.internal-b.id, yandex_vpc_subnet.internal-c.id]      nat = true    }

Самое интересное это scale_policy.

Можно задать группу фиксированного размера fixed scale с тремя инстансами A, B, C.

scale_policy {    fixed_scale {      size = 3    }  }  allocation_policy {    zones = ["ru-central1-a", "ru-central1-b", "ru-central1-c"]  }

Либо использовать auto_scale тогда группа будет автоматически масштабироваться в зависимости от нагрузки и параметров.

scale_policy {auto_scale {    initial_size = 3    measurment_duration = 60    cpu_utilization_target = 60    min_zone_size = 1    max_size = 6    warmup_duration = 60    stabilization_duration = 180}

Главный параметр, на который надо обратить внимание, это cpu utilization target. Можно выставить значение, при превышении которого Яндекс.Облако автоматически создаст нам новую виртуалку.

Теперь протестируем автомасштабирование при увеличении нагрузки

Наше приложение принимает различные ивенты, проверяет джейсонку и направляет в кафку.

Перед нашей инстанс-группой стоит load-балансер. Он принимает все запросы, которые приходят на адрес 84.201.147.84 на порту 80, и направляет их на нашу инстанс-группу на порт 8080.

У меня есть виртуалка, которая с помощью Yandex.Tank делает тестовую нагрузку. Для теста я установил 20 тысяч запросов в течение 5 минут.


Итак, нагрузка пошла.

Сначала все ноды будут загружены во всех трех зонах (A, B и C), но когда мы превысим нагрузку, Яндекс.Облако должно развернуть дополнительные инстансы.

По логам будет видно, что нагрузка выросла и в каждом регионе количество нод увеличилось до двух. В балансировку добавилась еще одна машина, количество инстансов тоже везде увеличилось.

При этом у меня был интересный момент. Один инстанс, который находится в регионе С, записывал данные (от момента приема данных до записи) за 23 миллисекунды, а у инстанса из региона А было 12,8 миллисекунд. Такое происходит из-за расположения кафки. Кафка находится в регионе А, поэтому в нее записи идут быстрее.

Ставить все инстансы кафки в одном регионе не надо.

Когда добавилась еще одна машина, новая нагрузка спала, показатель CPU вернулся к норме. Полную аналитику по тестовому запуску можно посмотреть по ссылке: overload.yandex.net/256194.


Как написать приложение для работы с очередями и буферами обмена

Приложение написано на golang. Сначала мы импортируем встроенные модули.

package mainimport (    "encoding/json"    "flag"    "io"    "io/ioutil"    "log"    "net/http"    "strings")

Затем подключаем github.com/Shopify/sarama это библиотека для работы с кафкой.

Прописываем github.com/prometheus/client_golang/prometheus, чтобы метрики передавались в API Metrics.

Также подключаем github.com/streadway/amqp для работы с rabbitmq.

Затем следуют параметры бэкендов, в которые мы будем записывать.

var (    // Config options    addr     = flag.String("addr", ":8080", "TCP address to listen to")    kafka    = flag.String("kafka", "127.0.0.1:9092", "Kafka endpoints")    enableKafka    = flag.Bool("enable-kafka", false, "Enable Kafka or not")amqp    = flag.String("amqp", "amqp://guest:guest@127.0.0.1:5672/", "AMQP URI")enableAmqp    = flag.Bool("enable-amqp", false, "Enable AMQP or not")sqsUri    = flag.String("sqs-uri", "", "SQS URI")sqsId    = flag.String("sqs-id", "", SQS Access id")sqsSecret    = flag.String("sqs-secret", "", "SQS Secret key")enableSqs    = flag.Bool("enable-sqs", false, "Enable SQS or not")        // Declaring prometheus metrics    apiDurations = prometheus.NewSummary(        prometheus.SummaryOpts{            Name:       "api_durations_seconds",            Help:       "API duration seconds",            Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},        },    )

Адрес кафки (строка).

Включить кафку или нет поскольку приложение может писать в несколько разных бэкендов.

В приложении реализована возможность работы с тремя очередям.

Первое это кафка.

Второе amqp для рэббита.

И третья очередь sqs для Яндекс.Кью.

Дальше мы открываем и задаем общие глобальные переменные для работы с нашим бэкендом. Прописываем настройки prometheus для отображения и визуализации.

В main мы включаем кафку, рэббит и создаем очередь с названием Load.

И если у нас включен sqs, мы создаем клиент для Яндекс.Кью.

Дальше наше приложение по http принимает несколько инпоинтов:

/status просто отдает okey, это сигнал для load-балансера, что наше приложение работает.

Если вы кидаете запрос на /post/kafka, ваша джейсонка попадет в кафку. Также работают /post/amqp и /post/sqs.

Как работает кафка

Кафка простой, некапризный и очень эффективный инструмент. Если вам нужно быстро принять и сохранить много сообщений, кафка к вашим услугам.

Как-то на одном из проектов важно было уложиться в маленький бюджет. И вот представьте, мы берем самые дешевые машины без SSD (а кафка пишет последовательно и читает последовательно, так что можно не тратиться на дорогие диски), ставим кафку и zookeeper. Наше скромное решение на три ноды спокойно выдерживает нагрузку 200 тысяч сообщений в секунду! Кафка это про поставил и забыл, за пару лет работы кластер ни разу нас не потревожил. И стоил 120 евро в месяц.

Единственное, что нужно запомнить кафка очень требовательна к CPU, и ей очень не нравится, когда кто-то рядом поджирает проц. Если у нее будет сосед под боком, она начнет тормозить.

Кафка устроена так: у вас есть topic, можно сказать, что это название очереди. Каждый topic бьется на части до 50 partitions. Эти партиции размещаются на разных серверах.

Как вы видите на схемке, topic load разбит на 3 партиции. Partition 1 оказывается на Kafka 1, вторая партиция на кафка 2, третья на 3. Тем самым нагрузка полностью распределяется. Когда кластер начинает принимать нагрузку, сообщения пишутся в один топик, а кафка раскидывает их по партициям, гоняет их по кругу. В итоге все ноды нагружаются равномерно.

Можно заморочиться и разбить топик на 50 партиций, поставить 50 серверов и расположить на каждом сервере по 1 партиции нагрузка распределится на 50 нод. И это очень круто.

Партиции могут реплицироваться благодаря zookeeper. Кафке необходимо минимум 3 ноды зукипера. Например, вы хотите, чтобы ваша партиция реплицировались на 2 ноды. Указываете репликейшн фактор 2 и каждая партиция будет закинута 2 раза на рандомные хосты. И если ваша нода упадет, то благодаря зукиперу кафка это увидит: ага, первая нода в дауне, кафка 2 заберет себе первую партицию.

Как я разворачивал кафку с помощью Terraform

В репозитории у нас есть terraform-файл, он называется kafka.tf .

Вначале мы поднимем 3 зукипера: resource yandex compute instance zookeeper count = 3.

Потом находим zookeeper_deploy, который деплоит наш зукипер. Хорошо, если он будет вынесен на отдельные машины, где кроме него ничего нет. Далее собираем айдишники нод и генерируем файл. Запускаем ansible для настройки зукипера.

Кафку поднимаем аналогично зукиперу и, что важно, после него.

Как работает RabbitMQ

Благодаря тому, что кафка по сути просто сохраняет сообщения на диск и по запросу отдает клиенту с нужного места, она очень и очень шустрая. Производительность рэббита значительно ниже, но он напичкан фичами под завязку! Рэббит пытается сделать очень многое, что естественным образом влечет за собой потребление ресурсов.

Рэббит уже не так прост тут вам и exchanges с роутингом, и куча плагинов для delayed messages, deadletter и прочего хлама. За сообщениями следит сам кролик. Как только консьюмер подтвердил обработку сообщения, оно удаляется. Если консьюмер отвалился посередине рэббит вернет сообщение в очередь. В общем, хороший комбайн, когда нужно перекидывать сообщения между сервисами. Цена этого производительность.

Рэббит практически все делает внутри себя. В нем есть много встроенных инструментов, можно подключать разные плагины, много настроек для работы с сообщениями и очередями.

Если вам нужно перекидывать сообщения между сервисами в небольшом количестве ваш выбор однозначно RabbitMQ. Если вам необходимо быстро сохранять кучу событий метрики от клиентов, логи, аналитика и т.д. ваш выбор kafka. Подробнее о сравнении двух инструментов можно прочитать в моей статье.

И еще: рэббиту не нужен зукипер.

Подробнее..

Паттерн сага как способ обеспечения консистентности данных

18.09.2020 14:13:41 | Автор: admin
Всем привет. Уже сейчас в OTUS открывает набор в новую группу курса Highload Architect. В связи с этим я продолжаю серию своих публикаций, написанных специально для этого курса, а также приглашаю вас на свой бесплатный демо урок по теме: Индексы в MySQL: best practices и подводные камни. Записаться на вебинар можно тут.



Введение


Как известно, переход от монолита к микросервисной архитектуре вызывает ряд сложностей, связанных как с технической частью проекта, так и с человеческим фактором. Одной из самых сложных технических проблем вызывает обеспечение согласованности в распределенной системе.

В прошлый раз мы обсудили причины возникновения проблем с согласованностью в микросервисной архитектуре, оптимистичный подход в обеспечению согласованности и обеспечение согласованности с применением двухфазного коммита.

Паттерн Сага


Сага это механизм, обеспечивающий согласованность данных в микросервисной архитектуры без применения распределенных транзакций.

Для каждой системной команды, которой надо обновлять данные в нескольких сервисах, создается некоторая сага. Сага представляет из себя некоторый чек-лист, состоящий из последовательных локальных ACID-транзакций, каждая из которых обновляет данные в одном сервисе. Для обработки сбоев применяется компенсирующая транзакция. Такие транзакции выполняются в случае сбоя на всех сервисах, на которых локальные транзакции выполнились успешно.

Типов транзакций в саге несколько, целых четыре:

  • Компенсирующая отменяет изменение, сделанное локальной транзакцией.
  • Компенсируемая это транзакция, которую необходимо компенсировать (отменить) в случае, если последующие транзакции завершаются неудачей.
  • Поворотная транзакция, опеределяющая успешность всей саги. Если она выполняется успешно, то сага гарантированно дойдет до конца.
  • Повторяемая идет после поворотной и гарантированно завершается успехом.

Организовывать сагу можно с помощью хореографии или оркестрации.

В случае с хореографической саги выделенный оркестратор отсутствует. На примере сервиса заказов и пользователей она может выглядеть так: сервис заказов получает запрос и создает заказ в состоянии PENDING, а затем публикует событие Заказ создан. Обработчик событий в сервисе пользователей обрабатывает данное событие, пытается зарезервировать товар и публикует результат в виде события. Сервис заказов обрабывает данное событие, подтверждая или отменяя заказ в зависимости от прочитанного результата.

Сага с оркестрацией выглядит чуть более интересно. На примере указанных выше сервисов может получиться так: сервис заказов получает запрос, создает сагу, которая создает заказ в состоянии PENDING, а затем отправляет команду резервирования товара для сервиса пользователей. Сервис пользователей пытается зарезервировать товар и отправляет ответное сообщение с указанием результата. Сага одобряет или отменяет заказ.

Паттерн сага позволяет приложению поддерживать согласованность данных между нескольких сервисов без использования распределенных транзакций (двухфазных коммитов) и с избежанием проблем, обсужденных в предыдущей статье. Но с другой стороны, сильно осложняется модель программирования: например, разработчик для каждой транзакции должен писать компенсирующую транзакцию, которая отменяет изменения, сделанные внутри саги ранее.

Сага позволяет добиться ACD-модели (Atomicity + Consistency + Durability в терминах ACID), но одну букву мы потеряли. Недостаток буквы I приводит к известным проблемам недостатка изолированности. К ним относятся: потерянные обновления (lost updates) одна сага перезаписывает изменения, внесенные другой, не читая их при этом, грязное чтение (dirty reads) транзакция или сага читают незавершенные обновления другой саги, нечеткое/неповторяемое чтение (fuzzy/nonrepeatable reads) два разных этапа саги читают одни и те же данные, но получают разные результаты, потому что другая сага внесла изменения. Существует ряд паттернов, позволяющих пофиксить те или иные аномалии: семантическая блокировка, коммутативные обновления, пессимистическое представление, повторное чтение значения, файл изменений и по значению. Вопрос обеспечения изоляции остается открытым.

Еще одна интересная проблема заключается в невозможности атомарных обновления базы данных и публикации сообщения в брокер сообщений для запуска дальнейших шагов саги.

Заключение


Мы поговорили о способах организации саги с применением хореографии и оркестрации, а также о проблемах, которые влечет применения данного паттерна. Далее мы поговорим о способах исправления некоторых аномалий и транзакционной отправки сообщений в брокер сообщений.

Подробнее..

Проблематика распределенных транзакций в контексте микросервисной архитектуры

27.08.2020 10:09:54 | Автор: admin
Всем привет. Уже в сентябре OTUS открывает набор в новую группу курса Highload Architect. В связи с этим я продолжаю серию своих публикаций, написанных специально для этого курса, а также приглашаю вас на свой бесплатный вебинар, в рамках которого я подробно расскажу о программе курса и формате обучения в OTUS. Записаться на вебинар можно тут.



Введение


Как известно, переход от монолита к микросервисной архитектуре вызывает ряд сложностей, связанных как с технической частью проекта, так и с человеческим фактором. Одной из самых сложных технических проблем вызывает обеспечение согласованности в распределенной системе.

Согласованность


Достаточно тонким моментом является то, что согласованность в контексте распределенных систем отличается от согласованности в контексте баз данных. Далее под согласованностью мы будем понимать именно первое: незавершенная (ошибочная) операция не вносит никаких эффектов и не меняет данные, при конкурентном доступе к данным все операции рассматриваются как атомарные (нельзя увидеть промежуточный результат операции), если у данных имеется несколько копий (репликация), то последовательность применения операций на всех копиях одна и та же. То есть на самом деле мы хотим получить ACID транзакцию, но только распределенную.

Причина проблемы


Почему обеспечение согласованности затруднено именно в микросервисной архитектуре? Дело в том, что данный архитектурный стиль зачастую предполагает применение паттерна database per service. Позволю себе напомнить, что этот паттерн заключается в том, что у каждого микросервиса своя независимая база или базы (базы, потому что помимо первичного источника данных может использоваться, например, кеш). Такой подход позволяет с одной стороны не добавлять неявные связи по формату данных между микросервисами (микросервисы взаимодействуют только явно через API), с другой стороны по максимуму использовать такое преимущество микросервисной архитектуры как technology agnostic (мы можем выбирать подходящую под особую нагрузку на микросервис технологию хранения данных). Но при всем при этом мы потеряли гарантию согласованности данных. Посудите сами, монолит общался с одной большой базой, которая предоставляла возможности по обеспечению ACID транзакций. Теперь баз данных стало много, а вместо одной большой ACID транзакции у нас много небольших ACID транзакций. Нашей задачей будет объединение всех этих транзакций в одну распределенную.

Оптимистичная согласованность


Первое что может прийти в голову это концепция оптимистичной согласованности: мы совершаем столько транзакций сколько хотим на столько механизмов хранения сколько нужно. При этом мы ждем, что все будет хорошо, а если все плохо, то говорим, что все будет хорошо в итоге. Если все плохо в итоге, то говорим: Да, такое случается, но с крайне низкой вероятностью.

Если без шуток, то пренебрежение обеспечением согласованности в случае ее некритичности для бизнеса является хорошим решением, особенно если учесть то, каких усилий нам будет стоить ее обеспечение (в чем, как я надеюсь, вы убедитесь несколько позже).

Варианты обеспечения консистентности


Если для бизнеса все-таки согласованность является критичной, можно попытаться ее обеспечить несколькими способами. Если мы говорим о ситуации, когда данные обновляются одним сервисом (например имеет место репликация базы данных), то можно применить стандартные алгоритмы обеспечения консистентности такие как Paxos или Raft. Такие транзакции называются гомогенными. Если данные обновляются несколькими сервисами (то есть имеет место гетерогенная транзакция), то как тут как раз и начинаются сложности, о которых мы говорили выше.

С одной стороны мы можем все-таки обойти необходимость обеспечения распределенной транзакции путем стремления к service-based архитектуры (объединяем сервисы таким образом, чтобы транзакция была гомогенная). Такое решение не очень каноничное с точки зрения принципов микросервисной архитектуры, но зато оно технически намного проще из-за чего часто применяется на практике. С другой стороны мы можем оставить каноничные микросервисы, но при этом применить один из механизмов обеспечения распределенных транзакций: двухфазный коммит или сагу. В этой статье будет изучен первый вариант, а второй мы обсудим в следующий раз.

Двухфазный коммит


Механизм предельно прост: есть некоторый transaction manager, который собственно оркестрирует транзакцию. На первом этапе (prepare) transaction manager подает соответствующую команду для resource manager'ов, по которой они в свои журналы записывают данные, которые будут закоммичены. Получив подтверждение ото всех resource manager'ов об успешном завершении первого этапа, transaction manager начинает второй этап и подает следующую команду (commit), по которой resource manager'ы применяют принятые ранее изменения.

Несмотря на кажущуюся простоту, такой подход обладает рядом недостатков. Во-первых, если хотя бы один resource manager даст сбой на втором этапе, вся транзакция должна быть отменена. Таким образом, нарушается один из принципов микросервисной архитектуры устойчивость к отказам (когда мы приходили к распределенной системе, мы сразу закладывались на то, что отказ в ней является нормой а не исключительной ситуацией). Более того, если отказов будет много (а их будет много), то процесс отмены транзакций необходимо будет автоматизировать (в том числе и писать транзакции, откатывающие транзакции). Во-вторых, сам transaction manager является единой точкой отказа. Он должен уметь транзакционно выдавать id-шники транзакциям. В-третьих, поскольку хранилищу подаются специальные команды, логично предположить, что хранилище должно уметь это делать, то есть соответствовать стандарту XA, а не все современные технологии ему соответствуют (такие брокеры как Kafka, RabbitMQ и NoSQL решения как MongoDB и Cassandra не поддерживают двухфазные коммиты).

Вывод, напрашивающийся из всех этих факторов, был отлично сформулирован Крисом Ричардсоном: 2PC not an option (двухфазный коммит не вариант).

Вывод


Мы разобрались почему распределенные транзакции являются основной технической болью микросервисной архитектуры и поговорили о различных вариантах решения данной задачи, детально обсудили механизм двухфазного коммита.



Приглашаю всех записаться на свой вебинар о курсе, в рамках которого я подробно расскажу о формате обучения и ознакомлю всех желающих с программой обучения.




Читать ещё:


Подробнее..

Оркестратор бесконечных задач

11.01.2021 20:23:30 | Автор: admin

В данной статье мы поговорим отом,как реализовать оркестратор бесконечных задач с использованием очередей. Как конечная цель- нам необходимо реализовать систему, способную управлять задачами с длительным сроком жизни, систему распределённую, где группа задачхостятсяна определенном сервере и в случае отказа этого сервера, задачи автоматически перераспределяютсяна свободные.

В большинстве случаев всяenterpriseразработка сводится к выполнению одних и тех же требований: создается заявка, в зависимости от типа заявки у нее есть какой-то жизненный цикл, по завершению жизни заявки мы получаем (или не получаем) желаемое. Под заявкой мы можем подразумевать все что угодно, начиная с покупки в интернет-магазине товара, денежного перевода или расчета траектории баллистической ракеты. У каждой заявки есть свой жизненный путь и что важно отметить -время жизни, и чем меньше это время, тем лучше. Иными словами, чем быстрее мой банковский перевод осуществится, тем лучше. Требования тоже схожи, побольшеRPCoperationspersecond, поменьшеLatency, система должна быть отказоустойчивой, масштабируемой и должна быть готовавчера. Есть миллион инструментов, сотни баз данных, различные подходы и паттерны. И все уже давно написано, нам остается лишь правильно использовать готовые технологии в наших проектах.

Темаоркестрациизадач не нова, но к моему удивлению, готовых решений по управлению бесконечными задачами (время жизни которых неограниченно велико), с возможностью перераспределения задач по активным серверам, попросту нет. Поэтому реализуем собственное решение . Но обо всем по порядку.


Давайте сначала поймем, что значит бесконечная задача и где в природе такое вообще может встречаться. Бесконечная задача это некий процесс (Job), который выполняет работу до тех пор, пока ему не скажут прекратить это. Аналогию можно провести с бесконечными циклами. В природе же подобное встречается, когда нам нужны наблюдатели, которые следят и реагируют на определённые события. Например: нам необходимо следить за изменениями цен на бирже, повышением или понижением цены актива. Представим нам нужно следить за всеми валютами, всеми активами, на разных биржах, тогда количество наблюдателей может превышать десятки тысяч единиц. Что же из себя может представлять наблюдатель- это может быть отдельноеWebSocketсоединение, которое должно быть постоянноconnected. Этот наблюдатель, может получать данные,денормализовывать, производить расчеты, сохранять и много чего еще. Для удобства, наблюдателем я буду называть неObserverиз известного паттерна, а модуль, который постоянно в работе и бесконечно долго выполняет полезную работу.

Очевидно, нам нужна распределенная система, так как на ноутбуке ее явно не запустишь. Сформулируемтребование длянаших наблюдателей:

  • Наблюдатели должны быть управляемы, то есть мы можем как добавить нового, так и прекратить работу существующего.

  • Наблюдатели должны быть изолированными, работа одного, никак не должна сказываться на работе других.

  • Система должна быть отказоустойчивой, и горизонтально масштабируемой, мы должны иметь возможность распределить всех наблюдателей на разных серверах. Причем при отказе одного сервера, наблюдатели, которые на нем находились, должны перераспределиться на другие, работающие сервера.

  • Сервис/приложение, должно иметь ограничение на количество наблюдателей, которые могут быть в работе и задаваться данное значение должно через файл конфигурации или рассчитываться исходя из мощности сервера (количество ядер, RAM и т.д.), на котором сервис работает.

Упрощенно: имеем N Сервисов, каждый сервис имеет несколько наблюдателей. Пока не будем задаваться вопросом о том, как это все работает и каким образом компоненты взаимодействуют, разберем это чуть позднее.

Статья описана в 3 актах. Все листинги с кодом на С#, но в процессе написания старался уделять меньше внимания примерам с кодом и больше самой идеи. Поэтому листинги должны быть понятны даже тем людям, которые вообще не писали на C# и не знакомы с .Net.

  1. Все естьTask. Тут мы поговорим о теории и некоторых базовых концепциях. Разберем что естьTaskи что общего между таской и наблюдателем.

  2. Schedulers. Возьмем готовое решение, разберем его и проанализируем. Понимание концепции работы планировщика с возможностью запуска задач на удаленных серверах, может нам лучше понять, что будет происходить в третей части.

  3. Очередь, которая думает, что она планировщик. Финальная часть статьи, где мы реализуем системуоркестрациизадач через очереди сообщений. Я использовалRabbitMq, и какFramework-MassTransit, поэтому все примеры будут тесно связаны с данными инструментами. Но принцип будет оставаться тот же.

Всё естьTask

Наш наблюдатель это ни что иное какTask. И что делать если мы хотим запустить и контролировать таску, не дожидаясь получения результата (ведь если наша таска будет работать бесконечно, то она и не даст нам никогда результат).

Рассмотрим на простом примере. Возьмём метод, которыйпишет HelloWord в консольотправляет письмо:

public async Task SendEmailAsync(Email email, CancellationToken token) {    // отправляем письмо }

Чтобы отправить письмо, не дожидаясь получения результата, нам достаточно просто забыть поставитьawaitперед вызовомSendEmailAsync.

foreach (var email in emails {    if(token.IsCancellationRequested)         break;     _emailSender.SendEmailAsync(email, token); //нет await } 

Минусов у данного подхода много:

  • Мы никак не гарантируем выполнение отправки письма.

  • FireAndForgetи как следствие о возникновенииExceptionмы не узнаем.

  • Так же не узнаем и о выполнении.

  • Многие считают, что это грех большой, вообщеантипаттерни я с ними согласен.

Более детально о том почему желательно рано или поздноawait-ить таску, можно почитатьпро async/await антипаттерны.

Наша задача во многом похожа на отправкуemail, только внутри у нас будет подобие бесконечного цикла и метод закончит работу естественным путем только тогда, когда будет вызванCancellationToken. Мы можем, конечно, написать свои костыли, которые позволят нам отслеживать состояние задачи и уведомлять, когда она завершилась. У нее будутRetryPolicyи много чего ещё, но зачем?! Когда есть уже готовые планировщики задач, которые заточены под данные требования.

Schedulers

На .NET есть как минимум два планировщика задач с промежуточным хранением задач в базе данных, поддержкой отложенного выполнения и возможностью распределенного выполнения на разных серверах.

Тутесть неплохое сравнение планировщиков. Больше всего нас интересует возможность иметь неограниченное количествосерверов, (тут может быть недопонимание, сервер это не физическая машина, где выполняется наше приложение, этоinstanceпланировщика)где будут исполнятся наши задачи/Tasks. Лично я отдал предпочтениеHangfire, по большей части из-за хорошо описанной документации и встроенного UI, который позволяет не только отображать метрики по задачам, но и вручную запускать их. Всё это весьма приятные бонусы.

А теперь посмотрим на то, как отправить наше письмо с использованиемHangfire. В этом нам поможет статический методBackgroundJob.Enqueue(Expression<Action>methodCall).

var jobIds = new List<string>(); foreach (var email in emails) {    if(token.IsCancellationRequested)       break;    jobIds.Add(BackgroundJob.Enqueue(       async () => await _emailSender.SendEmailAsync(email, token))); } 

Мы не дожидаемся отправки всех писем, а кладем их в очередь и можем не переживать за выполнение, обо всем позаботиться планировщик. Есть настройкаRetryPolicy, через которую мы можем задать количество повторений вызова метода в случае ошибок. В итоге мы знаем сколько задач было выполнено успешно, сколько с ошибками, сколько времени потребовалось на выполнение каждой.

Но нас же интересует не просто запустить задачу и гарантировать ее исполнение, а иметь возможность запустить ее на другом сервере. Забудем уже про отправку писем и представим, что наш наблюдатель запускается через вызов метода:

_observer.DoWork(observerArg,newCancellationToken())

Мы передаем какие-то аргументы для работы и главное, передаем токен отмены.Для этого нам потребуется указать еще имя очереди в созданномBackgroundJobClient.

var client = new BackgroundJobClient(JobStorage.Current);//задаем имя очереди, где будет хоститься задача.var state = new EnqueuedState(unique-queue-name); client.Create(() =>_observer.DoWork(observerArg,newCancellationToken()), state);

И конечно же мы должны иметь сервис, который займется обработкой данной очереди. В настройках которого будет указано имя той самой очереди-unique-queue-name.

// Настраиваем instance hangfire сервера. _server = new BackgroundJobServer(new BackgroundJobServerOptions() {       WorkerCount = 10,     Queues = new[] { unique-queue-name },     ServerName = _serverOptions.ServerName }); 

WorkerCount- отвечает за то, сколько сервер может одновременно обрабатывать задач. Запомним ее, так как в последствии мы о ней будем много говорить.

Теперь у нас есть возможность запускать любое количество задач на разных серверах, указывая очередь, которую слушает сервис. Пока нам не хватает только одного: возможности мониторинга. Мы должны понимать какой сервер свободен, а какой нет и запускать задачи на свободном. Для этого вHangfireесть статический класс, которой предоставляет все метрики, начиная с того сколько серверов сейчас активно и заканчивая информированием о том сколько раз задача была выпалена с ошибкой.

_monitoringApi = JobStorage.Current.GetMonitoringApi(); 

Наша система с планировщиком теперь будет выглядеть следующим образом:

Observer-service - сервис, который может выполнять одновременно несколько задач, количество задается через конфиг или рассчитывается с учетом количества ядер и мощности сервера (ВHangFilreэтоWorkerCount).

Observer-manager- сервис, который отвечает за... наблюдателей.Валидируетзапросы, решает на каком сервисе будет запущен наблюдатель, а также имеет возможность удалить его. Он знает сколько сейчас доступно сервисов и на сколько каждый из них загружен.

Schedulercommondbпсевдо-очередьи хранилище всей информации по задачам,Hangfireподдерживает какMsSql, такPostgreSqlи дажеRedis.

Отправка задачи в очередь это сохранение в базу данных с предварительнойсериализациейее вместе со значениями входных параметров. Поэтому менеджер и сервис должны иметь доступ к одной сборке с кодом нашего наблюдателя.

Если зависимости, при проектировании системы в общей сборке, имплементированы в сервисе, то их не обязательно копировать со всем функционалом в менеджер, достаточно имплементировать как заглуши, без логики, ведь исполняться они будут на стороне сервиса.

С помощью планировщика мы можем запускать задачи на удаленных сервисах, мониторить состояние задач и останавливать их, когда нам это потребуется. Но поговорим о проблемах, с которым столкнулся и подведем итог с учетом наших изначальных требований.Так же перед использованием планировщика обязательно прочитайте статью оподдержке очередей в Hangfire. Так вот:

1)Общая сборка для менеджера и сервиса. Не могу сказать, что это прям минус, главное помнить это при проектировании системы.

2)Высокая нагрузка на сервер. Каждый сервис опрашивает базу данных на предмет изменений. Можно, конечно, увеличить интервал между запросами, но это ухудшит отклик системы.

3)Добавление задачи в очередь возвращает числовой идентификатор и идентифицировать задачу можно только по нему. Нельзя задать свойcustom-id, например поиск по названию. Поэтому получив идентификатор задачи нужно сохранить связку идентификатор-название в собственное хранилище.

4)В случае ошибки во время исполнения задачи, она автоматически будет перемещена в default очередь. Крайне неприятный момент, о котором узнал уже на этапе тестирования, так как в документации о таком не рассказали. Решается черезjob-filtersили черезатрибуты. Второй вариант делает код более связанным и не подходит, так как значение атрибута не может задаваться динамически.

5)В случае если сервер откажет, задачи, которые на нем исполнялись, не будут перераспределены между работающими. Можно, конечно, реализовать данную логику в менеджере и сделать его ответственным за это, но хотелось бы чтобыframeworkумел это из коробки.

6)Отсутствие транзакционности, ВедьHangfireуниверсален как дляMsSql, так и дляRedis, а в нем транзакции не предусмотрены.

На протяжении всей разработки, меня не покидало ощущение что систему можно реализовать гораздо проще. Некоторые фичи, такие как перераспределение задач, не предусмотрены в планировщике, приходилось обходить ограничение, путем добавления собственных костылей и посему было принято решение реализовать собственное решение.

Очередь, которая думает, что она планировщик

Намучившись, пытаясь использовать верблюда как лошадь, перейдем к описанию того, как это можно сделать через очередь. Но сперва зададимся вопросом. Что мы знаем о планировщиках? Мы знаем, что планировщики это системы, которые выполняют задачи с учетом заданных правил, расписания. А что мы знаем об очередях, шинах данных? Мы используем очереди как транспорт данных, как средство доставки сообщений. Конечно же это все очень абстрактно, и тут можно говорить часами, но пока ограничимся этим. Давай те изменим шаблонное мышление и на время представим, что очередь тоже может быть планировщиком.

Как же сделать из очереди сообщений планировщик задач? Хотя тут корректней был бы термин оркестратор.Весь вышеописанный функционал решается использованием только одной настройки-PrefetchCountи особенностью обработки сообщений.

  • Когда сообщение попадает в очередь оно имеет состояниеReady.

  • КогдаConumerобрабатывает сообщение, оно переходит в состояниеUnacked. И другойConsumerможет взять следующие сообщение из очереди.

  • Если в момент обработки сообщения происходит ошибка, оно помещается в _Errorочередь.

  • Если сообщение после обработки не былоacknowledged, то оно возвращается обратно в очередь и его может прочитать любой другойConsumer.

И теперь главное -PrefetchCountэто количество одновременно обрабатываемых сообщений в очереди, а если сообщение никогда не будет дочитано (бесконечно обрабатывается), то его можно воспринимать какWorkerCount, прям как уHangfire.

Разберем на пальцах:

На данной схеме у нас есть триObserver-services, каждый из них слушает очередь в ожидании поступления сообщения.PrefetchCountу каждого стоит 1. Это значит, что за раз каждый сервис будет обрабатывать одно сообщение. А так как мы знаем, что сообщение это запуск бесконечной задачи, то оно никогда не прочитается и всегда будет в состоянииUnacked.

Дадим команду на создание двух "наблюдателей, таким образом в очереди у нас окажется два сообщения:

Так какObserver-servicesслушают одну и ту же очередь, то сообщения между ними будут распределятся равномерно, черезRound-robin.

  • msg1поступает в очередь. Его начинает обрабатывать один из свободныхконсьюмеров, допустим Observer1. Сообщение переходит в состояниеUnackedи теперь новые сообщения, которые поступят в очередь будут доступны для другихконсьюмеров.

  • msg2поступает в очередь. Observer1у нас уже занят, и поэтому сообщение на обработку достанется всем свободнымконсьюмерам, в данном случае оно достаетсяObserver2.

Давайте теперь представим, что Observer-service1у нас сломался, например он находится на отдельном сервере и сервер вышел из строя (самый популярный контраргумент - а что... если свет вырубили?).

Ошибка произошла не в самомконсьюмере, и поэтому сообщение, по которому не былоacknowledgementпопадает обратно в очередь и переходит из статусаUnackedвReady. Теперь его может считать любой свободныйконсьюмер. Второй у нас занят, первый умер, свободен только третий и поэтому ему задача и достаётся.

Резонным будет замечание - что будет если ошибка произойдет в самомконсьюмере, в процессе обработки сообщения. Оно в таком случае прейдёт в очередь с пометкой _Error, чтобы этого избежать можем настроитьRetryPolicy. И тогда в случае ошибки сообщение не попадет обратно в очередь, аконсьюмерпопытается заново обработать это сообщение.
Правила дляRetryPolicyмогут быть гибкими:

  • Попробовать 1000 раза и положить в очередь ошибок.

  • Попробовать 5 раз с интервалом 1,4,10...минут и потом положить в очередь ошибок.

  • Вообще попробоватьint.MaxValueраз.

Что же мы имеем в итоге? Мы можем иметь абсолютно любое количество наблюдателей, каждый из которых смотрит на одну очередь и каждый обрабатывает свою задачу/сообщение. Мы можем увеличитьPrefetchCount, допустим до 10, и тогда у сервиса будет 10 свободных консьюмеров, которые будут ждать команды на работу. Сервисы можно распределять по разным серверамиесли мы допускаем что кой-то сервер может выйти из строя, нужно просто иметь свободный сервис, который в случае поломки возьмет задачибольного.Например, если у нас есть 10 серверов, мощностей каждого из которых достаточно для обработки 5 наблюдателей, и шанс того, что один процент из них может выйти из строя, нужнозадеплоитьодин 11-ый сервер с той же мощностью, который будет на подстраховку.

А как жеконсистентность? Да и как вообще всем этим управлять? Да, мы можем добавить сообщения в очередь, но как убрать их оттуда... не очищать же очередь вручную?! Тем более, в идеале, наши "наблюдатели" должны закончить свою жизнь естественным образом, то есть через вызовCancellationToken.

И тут нам снова потребуетсяManager. Менеджеру неплохо бы знать об активных сервисах в системе. Это позволит понимать, перед запуском задачи, сколько свободных сервисов и может ли один из них взять в работу новую задачу. Так же это даст возможность отображать сколько их в системе, на сколько каждый из них загружен и какие задачи обрабатывает. Поэтому, когда сервис только поднимается он отправляет сообщение о своем рождении, которое содержит:

  • Id(Идентификатор) -Guidгенерируемый при рождении.

  • Name(Имя), которое мы сами дали ему, когда сервис деплоили, оно уникальное для каждого сервиса.

  • CreatedAt/ModifyAt(Дата создания/Дата изменения).

  • WorkersCount, это будетPrefetchCount- его мощность, сколько он может обрабатывать задач одновременно.

Managerпринимает эти сообщения с делает записи в базу данных о новых активных сервисах.

Id

Name

WorkerCount

CreatedAt

ModifyAt

IsDeleted

{Uniqueid}

Observerservice1

10

{somedate}

null

false

{Uniqueid}

Observerservice2

10

{somedate}

null

false

{Uniqueid}

Observerservice3

10

{somedate}

null

false

И нам не важно работает ли менеджер или мы вообще забыли егозадеплоить. В тот момент, когда он заработает ему сразу придет информация о том, что в системе есть 3 сервиса с такими-то параметрами.

Перед отключением сервисы так же отправят сообщения о том, что они закончили работать и большенедоступны, менеджербудет знать, что теперь на N сервисов у него меньше в строю. Тем самым он сделает пометку в базе данных и проставит каждому удаленному значениеIsDeleted=true.

Есть вероятность с тем, что сервис может не успеть отправить свое последнее сообщение о прекращении работы (Kill9, все тот же свет вырубили). За работоспособность компонентов у нас должна отвечать инфраструктура, напримерDocker. Мы должны быть уверены, если сервис непредвиденно прекратил работу, контейнер пере поднимется и сервис заново начнет работу. В таком случае при рождении, он заново отправит сообщение, но уже с новым идентификатором, но старым именем. Менеджеру достаточно будет данной информации чтобы привести данные в консистентное состояние и понять, что со старым сервисом случилось что-то страшное.

А теперь попробуем создать нового наблюдателя через API. Отправляем команду на создание (Мы должны позаботиться о том, что менеджер в процессе инициализации прочитает все сообщения из Statequeue и будет содержать последние актуальные данные о состоянии сервисов). Менеджер проверяет есть ли наблюдатель с таким именем уже, если нет, он проверяет наличие свободных сервисов, а они пока все свободны, далее он дает команду на создание - кладет сообщение в очередь.

Мы уже знаем, как распределится данное сообщение, оно просто достанется одному из наблюдателей. И тогда, когда он его получит, задача будет запущена и наблюдатель отправит сообщение в очередь состояний о том, что он получил на обработку определенную задачу, указав метаданные задачи и сервиса в рамках которого она работает.

Менеджер при отправке сообщения делает пометку в базу данных, записывая информацию отом,что был создан новый наблюдатель и находится в статусеCreated.

Id

Name

CreatedAt

ModifyAt

ServiceId

Status

{Observerid}

My_new_observer

{created date}

null

null

Created

Менеджер, дождавшись ответа от сервиса, которому досталась задача, изменяет статус наProcessingи связывает задачу с сервисом.

Id

Name

CreatedAt

ModifyAt

ServiceId

Status

{Observerid}

My_new_observer

{created date}

{modifydate}

{Observerservice1id}

Processing

Мы можем получить список всех наблюдателей и узнать кто на каком сервисе работает и в каком статусе находится.

Перечень статусов:

  • Created

  • Processing

  • OnDeleting

  • Deleted

Разберем теперь как удалить "наблюдателя", тут можно пойти двумя путями:

1) направить конкретному сервису сообщение о том, что нужно найти у себя наблюдателя с указанным идентификатором и вызватьCancellationToken.

2) Направить сообщение всем доступным сервисам, черезFanOut.Сервис,у которого есть наблюдатель с нужным идентификатором будет удален, а все остальные сервисы просто проигнорируют это сообщение.

Лично я отдал предпочтение второму варианту, одна из причин это то, что нет необходимости хранить адрес сервиса...но тут как говорится ап ту ю.

КаждыйObserver-serviceимеет свою очередь, где он ожидает получить команду на остановку работы наблюдателя. Когда приходит такая команда, сервис проверяет наличие такого наблюдателя и в случае обнаружения вызываетCancellationToken. Тем самым завершая работу наблюдателя естественным путем.

Пошагово завершение работы Наблюдателя выглядит следующим образом. Пользователь отправляет команду на завершение работы, для этого ему нужно знать лишьidнаблюдателя. Менеджер проверят если ли такой наблюдатель в системе, и его статус.

  • ЕслиCreated, пользователю возвращается ответ что наблюдатель еще не активирован. Из-за гонки условий сообщений об удалении может прийти раньше, чем сервису придет сообщение о запуске наблюдателя.

  • ЕслиOnDeletingилиDeleted, то возвращается ответ - запрос на удаление уже был отправлен или наблюдатель удален, соответственно.

  • ЕслиProcessing, то менеджер переводит наблюдателя в статусOnDeletingи отправляет сообщения на удаление в очередь. Сообщениеброадкаститсявсем сервисам. Сервис, у которого был нужный наблюдатель, вызываетCancellationTokenи оправляет сообщение в statequeue. Менеджер же, получив данное сообщение актуализирует данные и делает пометку переводя изOnDeletingвDeleted.

Id

Name

CreatedAt

ModifyAt

ServiceId

Status

{Observerid}

My_new_observer

{created date}

{modifydate}

{Observerservice1id}

Deleted

Рассмотрим критичные сценарии:

1) Отказала шина данных.

Вся инфраструктура, будь то шина данных или база данных должна находится опосредованно от нашей системы и бытькластеризированной. От себя добавлю следующий тезис, который как бритва Оккама отсечет ряд критичных сценариев -MsSql,RabbitMq,Kafka, дажеKubernetesсюда можно добавить, все это надежные системы, и при соблюдении SLA будут работать без отказа. За спиной у них огромные компании или комьюнити, сотни разработчиков. А вот собственную систему нужно воспринимать как что-то ненадежное, где любой компонент в любой момент времени может выйти из строя.

2) Полныйblackout, везде нет света.

Тогда, когда свет все-таки включат, докер поднимет все контейнеры, а вместе с ними сервисы, каждый сервис сообщает о том, что он заново родился и начинает разбирать из очереди свободные сообщения, запускает наблюдателей, они отправляют метаданные в менеджер. Менеджер актуализирует информацию о наблюдателях, когда они были созданы и на каких сервисах они теперь расположены. (Сообщения записаны на диск, поэтому никуда не делись.)

3) Вылетел один сервер.

Сценарий мы уже описывали выше, "наблюдатели, которые выполнялись на нем просто перераспределятся по всем свободным сервисам. Когда менеджеру придут сообщения от новых "наблюдателей, которые были перераспределены, он запишет на каких сервисах они теперь работают.

4) Отказал менеджер. В процессе того пока он был неактивен, ломались сервера с "наблюдателями.

Сервисы будут класть сообщения в очередь о своем состоянии. Когда менеджер все-таки придет в себя, он примет сразу пачку сообщения, и в конце концов дойдет до консистентного состояния.

5) Попытка удалить конкретного наблюдателя, в том момент, пока он перераспределяется.

Есть маленькое окно, когда пользователь отправляет команду на удаление, наблюдатель может перераспределяться. Тем самым его нет в моменте на конкретном сервисе. И сообщение об удалении может быть проигнорировано сервисами. В таком случае, менеджер, не получив ответа, по истечению времени, отправляет сообщение на удаление, повторно.

Итог

Мы реализовали оркестратор задач, на базе механизма отправки сообщений. Где сообщение это задача, с двумя статусами, в работе -Unacked, и в ожидании работы -Ready. Очередь сама распределяет задачи между исполнителями, делая это событийно, а не черезpollingсостояния, как это делают планировщики. Система масштабируемая - мы можем иметь неограниченно количество "наблюдателей, которые могут быть распределены на разных серверах. Более того мы можем масштабировать как горизонтально, так и вертикально, увеличивая количество одновременно обрабатываемых сервисом задач, просто увеличиваяPrefetchCount. И последнее, время на разработку оказалось меньше, чем время на изучение и внедрение планировщика.

Подробнее..

Перевод Как настроить мультинодовый кластер Airflow с помощью Celery и RabbitMQ

25.01.2021 10:05:20 | Автор: admin

Что такое Airflow?


Apache Airflow это продвинутый workflow менеджер и незаменимый инструмент в арсенале современного дата инженера.


Airflow позволяет создавать рабочие процессы в виде направленных ациклических графов (DAG) задач. Разнообразные служебные программы командной строки выполняют сложные операции на DAG. Пользовательский интерфейс легко визуализирует конвейеры, работающие в производственной среде, отслеживает ход выполнения и при необходимости устраняет неполадки.


Программно создавайте, планируйте и контролируйте рабочий процесс. Он предоставляет функциональную абстракцию в виде идемпотентного DAG (направленного ациклического графа). Функция как служба абстракции для выполнения задач с заданными интервалами.


Кластер с одним узлом Airflow


В одноузловом кластере Airflow все компоненты (рабочий, планировщик, веб-сервер) установлены на одном узле, известном как "Master нода". Чтобы масштабировать кластер с одним узлом, Airflow должен быть настроен в режиме LocalExecutor. Worker берет (pull) задачу из очереди IPC (межпроцессное взаимодействие), это очень хорошо масштабируется до тех пор, пока ресурсы доступны на Master нода. Чтобы масштабировать Airflow на много нод, необходимо включить Celery Executor.



Архитектура с одной нодой Airflow


Мультинодовый кластер Airflow


В мультинодовой архитектуре Airflow его демоны распределены по всем рабочим нодам. Поскольку веб-сервер и планировщик будут установлены на главной ноде, а рабочие будут установлены на каждом отдельном рабочей ноде, поэтому он может хорошо масштабироваться как по горизонтали, так и по вертикали. Чтобы использовать этот режим архитектуры, необходимо настроить Airflow с помощью CeleryExecutor.


Серверную часть Celery необходимо настроить для включения режима CeleryExecutor в архитектуре Airflow. Популярными фреймворками / приложениями для бэкэнда Celery являются Redis и RabbitMQ. RabbitMQ это брокер сообщений. Его задача управлять обменом данными между несколькими службами задач путем управления очередями сообщений. Вместо канала связи IPC, который был бы в архитектуре с одной нодой, RabbitMQ предоставляет модель механизма публикации подписки для обмена сообщениями в разных очередях. Каждая очередь в RabbitMQ опубликована с событиями / сообщениями в виде команд задач, работники Celery будут извлекать команды задач из каждой очереди и выполнять их как действительно распределенные и параллельные способы. Что действительно может ускорить действительно мощное одновременное и параллельное выполнение задач в кластере.



Мультинодовая архитектура Airflow


Celery:


Celery это асинхронная очередь задач, основанная на распределенной передаче сообщений. Он ориентирован на работу в реальном времени, но также поддерживает планирование. Airflow использует его для выполнения нескольких параллельных операций на уровне задач на нескольких рабочих узлах с использованием многопроцессорности и многозадачности. Мультинодовая архитектура Airflow позволяет масштабировать Airflow, легко добавляя новые воркеры.


Устновка мультинодового кластера Airflow и настройка Celery:


Примечание. Мы используем операционную систему CentOS 7 Linux.


  1. Установка RabbitMQ

yum install epel-releaseyum install rabbitmq-server

  1. Включение и запуск RabbitMQ Server

systemctl enable rabbitmq-server.servicesystemctl start rabbitmq-server.service

  1. Включение интерфейса веб-консоли управления RabbitMQ

rabbitmq-plugins enable rabbitmq_management


Номер порта сервера rabbitmq по умолчанию 15672, имя пользователя и пароль по умолчанию для веб-консоли управления admin/admin.



  1. Установка протокола транспорта pyamqp для RabbitMQ и адаптера PostGreSQL

pip install pyamqp

amqp:// это псевдоним, который использует librabbitmq, если он доступен, или py-amqp, если его нет.


Вы должны использовать pyamqp:// или librabbitmq://, если хотите точно указать, какой протокол передачи данных использовать. Протокол pyamqp:// использует библиотеку amqp (http://github.com/celery/py-amqp)


Установка адаптера PostGreSQL: psycopg2


Psycopg это адаптер PostgreSQL для языка программирования Python.


pip install psycopg2

  1. Установка Airflow.

pip install 'apache-airflow[all]'

Проверьте версию airflow


airflow version


Мы используем версию Airflow v1.10.0, рекомендованную и стабильную в настоящее время.


  1. Инициализация базы данных

airflow initdb

После установки и настройки вам необходимо инициализировать базу данных, прежде чем вы сможете запустить группы обеспечения доступности баз данных и ее задачу. Поэтому последние изменения будут отражены в метаданных Airflow из конфигурации.


  1. Установка Celery

Celery должен быть установлен на главной ноде и на всех рабочих нодах.


pip install celery==4.3.0

Проверка версии Celery


celery --version4.3.0 (rhubarb)

  1. Изменение файла airflow.cfg для Celery Executor.

executor = CeleryExecutorsql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow dags_are_paused_at_creation = Trueload_examples = False

После внесения этих изменений в файл конфигурации airflow.cfg необходимо обновить метаданные airflow с помощью команды airflow initdb, а затем перезапустить airflow.


Теперь вы можете запустить веб-сервер airflow с помощью следующей команды


# default port is 8080airflow webserver -p 8000

Вы можете запустить планировщик


# start the schedulerairflow scheduler

Вы также должны запустить airflow на каждом рабочем узле.


airflow worker

Как только вы закончите запускать различные службы airflow, вы можете проверить фантастический интерфейс airflow при помощи команды:


http://<IP-ADDRESS/HOSTNAME>:8000

поскольку мы указали порт 8000 в нашей команде запуска службы веб-сервера, в противном случае номер порта по умолчанию 8080.


Да! Мы закончили создание кластера с мультинодовый архитектурой Airflow. :)

Подробнее..

Гибриды побеждают или холивары дорого

11.01.2021 02:05:19 | Автор: admin

Мотивом для написания данной статьи послужил тот факт, что на habr.com участилось появление материалов маркетингового характера про Apache Kafka. А также тот факт, что из статей складывается впечатление что пишут их немного далекие от реального использования люди это конечно же только впечателение, но почему-то в большинстве своем статьи обязательно содержат сравнение Apache Kafka с RabbitMQ, причем не в пользу последнего. Что самое интересное читая подобные статьи управленцы без технического бэкграунда начинают тратить деньги на внутренние исследования, чтобы ведущие разработчики и технические директора выбрали одно из решений. Так как я очень жадный/домовитый, а также так как я сторонник тезиса "В споре НЕ рождается истина" предлагаю вам ознакомится с другим подходом почти без сравнения разных брокеров.


Без сравнения никуда


Вообще, по правильному, я должен был сделать статью в формате Kafka+RabbitMQ+Nats+ActiveMQ+Mosquito+etc, но мне кажется, что для Вас дорогие читатели это будет перебор, хотя обычно в моих архитектурных решениях присутствуют все вышеуказанные сервисы (и не только). И это я еще не рассказываю про AzureServiceBus/AmazonServiceBus которые также участвуют в "гибридах" при крупных программах проектов. Поэтому пока остановимся на связке Kafka+RabbitMQ и далее вы поймете почему: по аналогии можно подключить любой сервис с его протоколом. Потому что:


сравнивая Apache Kafka и RabbitMQ вы сравниваете 2 (два) бренда, а точнее 2 коммерческие компании Confluent и vmWare, и немножко Apache Software Foundation (но это не компания)

то есть формально при сравнении мы должны сравнивать бизнес-модели компаний которые являются основными драйверами развития наших сегодняшних подоопытных. Так как Хабр все таки не портал экономических исследований, поэтому мы для начала должны вспомнить не бренды, а те описания которые стоят за этими брендами (то как сами себя называют наши сегодняшние участники).


  • RabbitMQ мультипротокольный и расширяемый брокер сообщений
  • Apache Kafka платформа для распределенной потоковой передачи событий
  • Confluent Platform платформа потоковой передачи событий с возможностью создания высокопроизводительных конвейеров обработки данных для целей аналитики и интеграции в бизнес-сценариях

Я не зря третьим пунктом выделяю наработки компании Confluent те кто собирается использовать Apache Kafka в продуктиве должны хотя бы видеть какую функциональность дополнительно добавляет Confluent к Apache Kafka. А это SchemeRegistry, RestProxy, kSQL и еще несколько интересных штук, о одной из которых мы поговорим ниже, она называется Kafka-Connect.


Но вернемся к сравнению внимательный читатель видит, что RabbitMQ сам себя называет брокером сообщений выделяя свою главную фишку "мультипротокольность", а товарищи из экосистемы Kafka почему-то называют себя аж платформой (завышенное самомнение оно такое).


Итак чтобы было совсем понятно, куда я веду.


  • ключевая особенность RabbitMQ мультипротокольность и расширяемость. (основной язык якобы Erlang)
  • ключевая особенность экосистемы Kafka потоковая передача с обработкой (основной язык якобы Scala/Java)

Отсюда и возникают минусы каждого из решений


  • для RabbitMQ мы не сможем построить нормального решения для потоковой обработки. Точнее сможем, но НЕ штатно.
  • а для Kafka мы не сможем сделать мультипротокольность, точнее сможем но НЕ штатно.

Сократ не говорил, что в споре рождается истина


Еще одна новость: действительно если почитать источник, то Сократ вообще-то в итоге пришел к тому, что нужно обеспечить диалог, а если по научному то истина рождается в научном споре, который формально представляет собой процесс публикация со ссылкой на источники -> а затем научная критика оппонентов -> истина


А значит перейдем к ссылкам для начала их будет три. Когда 14 лет назад я совместно с коллегами начинал использовать брокеры сообщений в качестве основы для построения своих интеграционных решений, мы сразу обратили внимание, что фактически с точки зрения "клиента" (конечного приложения), под разные задачи подходят разные протоколы интеграции.


  • ODBC
  • AMQP
  • MSMQ
  • XMPP
  • IP over Avian Carriers

так как тогда наша задача была интегрировать всякое (python, C#, java) и 1С был придуман проект One-S-Connectors (https://code.google.com/archive/p/one-c-connectors/source/default/source). Сейчас он имеет сугубо академический интерес (так как в 1С мире моя персона достаточно известна и на Хабре много 1С специалистов из сообщества "воинствующих 1С-ников" эта ссылка специально для них).
Однако уже тогда (в 2006 году) стало понятно, что по большому счету конечному разработчику придется менять/выбирать протокол под бизнес-задачу. А инфраструктурщикам придется обеспечить максимально широкий спектр интеграционных протоколов. От ODBC до Kafka/NATs/ModBus.


Но вернемся к дню сегодняшнему когда я начал использовать в проектах уровня ГИС (госсударственные информационные системы) различные транспорта данных внезапно выяснилось, что универсальные адаптеры это не только концепт воинствующих 1С-ников, но и соседей. Поэтому многие идеи при внедрении черпались из еще двух интересных проектов



маленькое примечание для менеджеров про Kombu как то так получилось, что имплементация протокола Apache Kafka до сих пор открыта https://github.com/celery/kombu/issues/301 и почему-то перешла в разряд "Дайте денег", поэтому для Python проектов приходится использовать дополнительно https://github.com/confluentinc/confluent-kafka-python

Когда вы дочитаете до этого момента предполагаю, что вы зададите вопрос про остальные языки: Java, GoLang, RUST, etc. Но во первых я не зря выше указал что по серьезному в наш обсуждаемый сегодня гибрид нужно добавить историю про NATs и ActiveMQ и внезапно JMS поэтому просьба дочитать до конца: Java будет, а во вторых мы переходим к еще трем полезным ссылкам



Прокоментируем их? Дело в том, что как бы вы не хотели, а для полноценного использования "в длинную" вам придется подписаться на историю релизов как сервера RabbitMQ и самое главное на те самые расширения (лежат в каталоге /deps) которые постоянно добавляются в ядро RabbitMQ, так и на портал компании Confluent где она публикует приложения полезные для конечного бизнеса использующего Apache Kafka в продуктиве.


подход к расширяемости за счет активируемых расширений также используется в экосистеме PostgreSQL тот который CREATE EXTENSION hypopg, так что подход реализованный компанией Pivotal/vmWare далеко не новый в нашем чудесном мире архитектуры программного обеспечения

Дополнительно на чудесном рынке облачных услуг в формате "Серьезная штука как сервис" есть еще один игрок это компания 84Codes https://github.com/84codes. Когда в рамках проектов внедрения нет нормальных инженеров по инфраструктуре именно 84Codes спасает пилотные проекты, потому как у них можно легко арендовать бесплатные/сильнодешевые контура CloudAMQP и CloudKarafka.


Я как бы обещал, что не буду ничего говорить про деньги, однако придется отразить 2 ключевых момента:


  • компания vmWare зарабатывает известно на чем, поэтому RabbitMQ ей развивается как часть своей платформы то есть они инвестируют в открытый проект не особо занимаясь его монетизацией. Возврат их инвестиций происходит в других местах, ну и также за счет контрибьторов на GitHub.
  • а вот компания Confuent собирается монетизировать свою платформу через Enterprise лицензию в которую включает те самые коннекторы Enterprise-Kafka-Connect, а также GUI для управления платформой.

Когда-то давно существовал https://github.com/jcustenborder/kafka-connect-rabbitmq, примечателен тот факт что товарищ Джереми его скрыл, оставив только свои наработки для Java разработчиков в виде Maven Archetype https://github.com/jcustenborder/kafka-connect-archtype еще раз обращаю Ваше внимание, что компания Confluent будет и дальше пытаться монетизировать свою деятельность, так что переводить всю интеграцию только на Kafka я бы на вашем месте поостерегся.

Поэтому когда вам топят за Kafka учитывайте, что вы либо изучаете Java, либо платите за Enterprise лицензию. А когда вам топят за RabbitMQ учитывайте, что либо вы изучаете системное администрирование (Erlang накладывает особенности системного администрирования), либо покупаете сервис у провайдеров типа 84Codes. Кодить на Erlang никогда не придется там это не нужно, если только вы не контрибьюторы OpenStack.


Поставил и забыл уже не работает


Приближаемся к дальнейшему пониманию. Данный раздел уже будет полезен инфраструктурщикам, хотя и разработчикам важно знать, что в эпоху когда семимильными шагами развивается имплементация ITILv4, для того чтобы перейти от текста и менеджерских хитростей про риски и деньги к реальности нам придется осознать 3 тезиса


  • использование только одного протокола интеграции приводит к появлению ProtocolLock и как следствие к VendorLock я же не зря выше написал, что за каждым открытым продуктом, стоит какой-то ключевой комплект вендоров как они себя поведут: мы не знаем.
  • в мире ИТ больше нет серьезных продуктов, которые бы представляли собой монолитную службу все приложения давно стали композитными.
  • все нормальные вендоры сокращают свои релизные циклы по ключевым продуктам нормальной практикой стало выпускать редакции раз в 3 месяца TDD, BDD, CICD, ScallableAgile и DevOps (DocOps, DevSecOps) эти инженерные практики и методики управления не просто так развиваются. Всем очень хочется сокращать себестоимость и TimeToMarket.

Абзац выше важен, как финальный аккорд, прежде чем мы перейдем к Docker-Compose. А именно к нему я вел чтобы и разработчики и инфраструктурщики понимали что такое гибридная инфраструктура в режиме мультипротокольности (с) нужно сделать так, чтобы каждый мог поэкспериментировать с предлагаемым контуром. Как я уже указал выше первично подобное применительно к Kafka+RabbitMQ было подсмотрено именно у коллег из 84Codes (хорошие ребята всем советую https://www.84codes.com/).


Чтобы вы смогли поэкспериментировать сами


Итак подходим к примерам, так как обоснования и вводных уже хватит. Предположим вы уже поняли, что вам также нужна мультипротокольность, однако мы же помним, что все рекламные материалы про Apache Kafka нам рассказывают что это единственное решение с реализацией exactly-ones доставки сообщений от отправителя получателю. Собственно на самом деле нам и нужен гибрид, чтобы сделать из связки ТочкаОбмена->Очередь журнал Kafka (это тот который Topic) чтобы возникла сущность под называнием Offsets у нашей очереди событий.


exactly-ones

проверка на внимательность читающего exactly-ones это шутка в формате "Хотя бы один раз из 1С", а имеется в виду концепт Exactly once строго однократная доставка сообщений получателю, без необходимости повторной отправки от отправителя.


Предлагаю попробовать. Концепт для проверки Вашими руками будет состоять из:


  • Zookeper
  • KafkaBroker
  • RabbitMQ
  • KafkaConnect

и трех приложений приложений


  • отправитель на Python по протоколу AMQP 0.9
  • получатель на С# по протоколу AMQP 1.0
  • получатель на C# по протоколу Kafka

Еще интересное замечание: когда вы смотрите на всякие обучающие видео по Apache Kafka авторы часто (но не всегда) старательно пишут примеры на Java, это они делают скорее всего для того, чтобы скрыть от вас особенности использования librdkafka C++ библиотеки на основе которой сделаны многие не-джава адаптеры,. Я же наоборот предлагаю вам начинать исследование интеграции с Kafka именно с неё, чтобы четко оценивать риски "куда вы ввязываетесь": очень примечательно что там работает фактически один разработчик, формально в одиночку https://github.com/edenhill/librdkafka/pulse/monthly, а допустим wmWare старается поддерживать свою линейку клиентов под своим брендом https://github.com/rabbitmq

ну и самое главное и тяжелое:


контур содержит открытый форк старого RabbitMQ-Kafka-Sinc-Connector того самого который товарищи из Confluent в своё время скрыли с Github.


Докер контура для экспериментов


Для показательного эксперимента мы сделаем 2 композитных приложения инфраструктурное-трансформационное и непосредственно бизнес-приложения.


Развертываем RabbitMQ и Kafka


контур инфраструктуры который нам понадобится запускается достаточно просто


docker-compose -f dockers/infra.yml up -d

Если вам интересно что же там внутри, нашего композитного приложения, то в конце статьи дается ссылка на полный комплект исходников, наиболее интересен в нем Kafka-UI и непосредственно RabbitMQ-Sinc, все остальное обычно и штатно для всех известных примеров по Kafka или RabbitMQ


    image: provectuslabs/kafka-ui:latest    ports:      - 8080:8080    depends_on:      - kafka-broker      - zookeeper    environment:      KAFKA_CLUSTERS_0_NAME: local      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181      KAFKA_CLUSTERS_0_JMXPORT: 9101

Но самое главное кроется в репозитории Java


    <parent>        <groupId>com.github.jcustenborder.kafka.connect</groupId>        <artifactId>kafka-connect-parent</artifactId>        <version>1.0.0</version>    </parent>

Если подробно изучить pom.xml то выяснится, что существует заглавный проект для всех конекторов к Кафка https://github.com/jcustenborder/kafka-connect-parent, в котором используется Java-Kafka-Adapter


И непосредственно синхронизацией c RMQ занимается штатный Java клиент https://www.rabbitmq.com/java-client.html


            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>${rabbitmq.version}</version>

Таким образом по правильному, чтобы получились повторить тот же эксперимент что и у меня, необходимо выполнить:


  • собрать из исходников java синхронизатор -1-build-connect-jar.bat
  • собрать контейнер с синхрозатором 00-build-connect-image.sh

и уже потом запустить полный инфраструктурный контур


  • стартуем полный инфраструктурный контур 01-start-infra.sh

обратите внимание так как Docker использует разное поведение при работе с PWD для Windows и Linux приходится делать дубликаты скриптов. В остальных случаях под обоими операционными системами используется интерпретатор sh

В итоге вы получите следующий комплект сервисов



На картинке можно увидеть как подключаются конфигурационные файлы к RabbitMQ и какая топология сетевых портов у нас будет участвовать в эксперименте:


Назначение портов:


  • 9092 будет использоваться для Kafka протокола
  • 8080 используется для отображения красивой картинки состояния Apache Kafka UI
  • 5672 будет использоваться для протокола AMQP 0.9 и он же будет работать и как AMQP 1.0
  • 15672 используется для красивой картинки управления RabbitMQ
  • 28082 отладочный порт для управления через curl трансформатором протоколов

В этот момент нужно остановится и прокомментировать особенность развертывания RabbitMQ в Docker:


  • хорошей практикой является версионирование включенных плагинов расширений enabled-rmq-plugins

[    rabbitmq_management,     rabbitmq_amqp1_0,     rabbitmq_mqtt,     rabbitmq_federation,     rabbitmq_federation_management,    rabbitmq_shovel,    rabbitmq_shovel_management,    rabbitmq_prometheus].

  • а также в крупных проектах когда нужно передать разработчику преднастроенную топологию точек обмена и очередей, можно и нужно добавлять это в виде конфигурационного файла rmq_definitions.json

     "bindings":[        {           "source":"orders-send",           "vhost":"/",           "destination":"orders-amqp-10-consumer",           "destination_type":"queue",           "routing_key":"",           "arguments":{

Запускаем наши приложения


Остается только запустить наши приложения эмулирующие подключения


docker-compose -f dockers/infra.yml restart protocol-connect-syncdocker-compose -f applications.yml builddocker-compose -f applications.yml up

Топология наших тестовых приложений достаточно простая



Исходный код также максимально упрощён:


  • отправляется как-будто бы заказ Васи с периодичностью в 2 секунды

        producer = conn.Producer(serializer='json')        producer.publish({'client': 'Вася', 'count': 10, 'good': 'АйФончик'},                      exchange=order_exchange,                      declare=[kafka_queue, amqp10_queue])        time.sleep(2)

RUN python -m pip install \    kombu \    librabbitmq

причем используется для этого максимально производительная библиотека на Си для AMQP 0.9 librabbitmq наследуется именно от неё https://github.com/alanxz/rabbitmq-c


  • создан подписчик который уже по протоколу AMQP 1.0 смотрит в свою очередь и получает события, соответственно очередь очищается и больше мы заказов Васи не получим. В этом потоке нам это и не нужно.

            Attach recvAttach = new Attach()            {                Source = new Source()                {                    Address = "orders-amqp-10-consumer",                    Durable = 1,                },

            ReceiverLink receiver =                 new ReceiverLink(session,"netcore_amqp_10_consumer", recvAttach, null);            Console.WriteLine("Receiver connected to broker.");            while (true) {                Message message = receiver.Receive();                if (message == null)                {                    Console.WriteLine("Client exiting.");                    break;                }                Console.WriteLine("Received "                   + System.Text.Encoding.UTF8.GetString((byte[])message.Body)

Причем в качестве драйвера выбран


  <ItemGroup>    <PackageReference Include="AMQPNetLite.Core" Version="2.4.1" />  </ItemGroup>

именно его https://github.com/Azure/amqpnetlite Microsoft использует для маркетинга своей реализации сервисной шины. Собственно именно AMQP 1.0 как протокол они и рекламируют https://docs.microsoft.com/ru-ru/azure/service-bus-messaging/service-bus-amqp-overview


Ну и финально


  • создан подписчик по протоколу Kafka который при каждом старте перечитывает с нуля журнал отправленных заказов Васи. Тот самый Exactly once.

                AutoOffsetReset = AutoOffsetReset.Earliest

                c.Subscribe("orders-from-amqp");

                    while (true)                    {                        try                        {                            var cr = c.Consume(cts.Token);

Выглядит наш контур в итоге следующим образом:


  • 5 инфраструктурных контейнеров


  • 3 контейнера с приложениями


  • готовый журнал транзакций заказов который можно посмотреть через Kafka-Ui


  • и готовый контур связей для RabbitMQ


А где же Java ?


Не волнуйтесь при таком гибридном подходе, без неё никуда, для того чтобы всё вышеуказанное заработало пришлось сделать форк и актуализировать версии Kafka-Connect-Base


[submodule "dockers/rabbitmq-kafka-sink"]    path = dockers/rabbitmq-kafka-sink    url = https://github.com/aliczin/kafka-connect-rabbitmq

Но самое интересное не это, самое интересное что в этом самом Kafka-Connect нет по сути никакой магии только код трансформации.


По сути нам предлагают:


  • создать наследника абстрактной задачи Источника

public class RabbitMQSourceTask extends SourceTask {

  • выполнить подписку на очередь сообщений

        this.channel.basicConsume(queue, this.consumer);        log.info("Setting channel.basicQos({}, {});", this.config.prefetchCount, this.config.prefetchGlobal);        this.channel.basicQos(this.config.prefetchCount, this.config.prefetchGlobal);

  • трасформировать полученные сообщения в абстрактные записи причем с буфером.

  @Override  public List<SourceRecord> poll() throws InterruptedException {    List<SourceRecord> batch = new ArrayList<>(4096);    while (!this.records.drain(batch)) {

Отдельно можно выделить чудесный трансформатор сообщений из AMQP 0.9 в Кафка. У несведующего в Java глаз может задергаться. У автора чувствуется многолетный опыт работы в J2EE.


  private static final Logger log = LoggerFactory.getLogger(MessageConverter.class);  static final String FIELD_ENVELOPE_DELIVERYTAG = "deliveryTag";  static final String FIELD_ENVELOPE_ISREDELIVER = "isRedeliver";  static final String FIELD_ENVELOPE_EXCHANGE = "exchange";  static final String FIELD_ENVELOPE_ROUTINGKEY = "routingKey";  static final Schema SCHEMA_ENVELOPE = SchemaBuilder.struct()      .name("com.github.jcustenborder.kafka.connect.rabbitmq.Envelope")      .doc("Encapsulates a group of parameters used for AMQP's Basic methods. See " +          "`Envelope <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html>`_")      .field(FIELD_ENVELOPE_DELIVERYTAG, SchemaBuilder.int64().doc("The delivery tag included in this parameter envelope. See `Envelope.getDeliveryTag() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getDeliveryTag-->`_").build())      .field(FIELD_ENVELOPE_ISREDELIVER, SchemaBuilder.bool().doc("The redelivery flag included in this parameter envelope. See `Envelope.isRedeliver() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#isRedeliver-->`_").build())      .field(FIELD_ENVELOPE_EXCHANGE, SchemaBuilder.string().optional().doc("The name of the exchange included in this parameter envelope. See `Envelope.getExchange() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getExchange-->`_"))      .field(FIELD_ENVELOPE_ROUTINGKEY, SchemaBuilder.string().optional().doc("The routing key included in this parameter envelope. See `Envelope.getRoutingKey() <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Envelope.html#getRoutingKey-->`_").build())      .build();

Но Не будем критиковать, мы же в самом начале договорились что наша главная задача добиться конечного результата удобным на сегодня способом. А итоги у нас получаются следующие.


Итоги


Все что здесь продемонстрировано естественно лежит на Github.


В репозитории https://github.com/aliczin/hybrid-eventing. Лицензия выставленна простая до невозможности Creative Commons Attribution 4.0 International.


Полезно использовать в обучающих целях для команд разработки и инфраструктуры и поиграться с DevOps и поиграться с мультипротокольными приложениями. Ничего особо экстравагантного в данном концепте конечно нет, ключевое тут я как я написал в самом начале мы делаем избыточное количество интеграционных протоколов, добавляя транформаторов между потоками интеграции.


Схема коммуникации в итоге для "разработчика интеграционных потоков" (с) выглядит следующим образом для источника и брокеров


orderEventsApp->Amqp09: send orderAmqp09->Amqp10: fanout\n copy eventAmqp09->KafkaQ: fanout\n copy eventKafkaQ->KafkaConnect: consume\n on messageKafkaConnect->KafkaConnect: transform\n messageKafkaConnect->Kafka: publish to topic


а для приемников все упрощается


Amqp10->orderEventSubApp: subcribe\n for eventorderJournalApp->Kafka: read kafka journal


Приемники берут нужные им данные только по нужному им протоколу

Ключевые посылы


Ключевые моменты которые я хотел расскрыть данной статьей


  • стройте эксперименты и продуктивы с Apache Kafka не со штатным Java клиентом, а librdkafka и базирующихся на ней адаптерах это позволит вам отладить сценарии разных версий протоколов KafkaAPI. Java вам пригодится в другом месте.


  • не ввязывайтесь с священные войны, что лучше RabbitMQ/Kafka/Nats/ActiveMQ просто развертывайте сервисы и публикуйте протоколы и пробуйте свои бизнес-сценарии.


  • начните уже внедрять продуктивный Docker, или хотя бы пилотные и разработческие контура.


  • реальный ИТ ландшафт почти всегда будет мультипротокольным



Примечание для понимающих


чтобы гибриды развивались дальше:


  • Mosquito очень удобен как встраиваемый брокер на уровне контролера SCADA для преобразования из ModBus/OPC-UA. Хотя как вы уже поняли из статьи интересны реализации "мостов из протокола в протокол" пример https://github.com/mainflux/mainflux


  • ActiveMQ удобен для Java разработчиков, потому что у них есть боязнь Erlang, но как мы выше уже сказали мост RabbitMQ AMQP 1.0 -> ActiveMQ легко организуется средствами RabbitMQ, кстати также как и JMS.


  • NATs интересен как часть OpenFaaS платформы, при внедрении "своего маленького" Amazon Lambda с преферансом. И опять же подход будет всё тот же мультипротокольные мосты с трансформацией: https://github.com/nats-io/nats-kafka если Вам не страшно посмотрите эксперименты с OpenFaaS веселых 1С-ников 2.5 часа примеров https://youtu.be/8sF-oGGVa9M



Надеюсь мой архитектурный подход Вам придется по душе и вы перестанете тратить деньги заказчика (инвестора/свои если вы стартапщик: Маша это замечание специально для тебя) на бессмысленные обсуждения что же выбрать в качестве брокера/платформы, и начнете наконец-то делать функциональность, которая будет использовать тот протокол, который удобен прямо сейчас. С возможностью переключения в случае "если чё"


Функциональность: Мультипротокольный адаптер    Как разработчик я хочу иметь абстракцию Produser/Consumer    С возможность изменения протокола интеграции    Чтобы под каждую задачу выбирать разные протоколы     и единый интерфейс вызова для обеспечения независимости от вендора предоставляющего транспортСценарий: vmWare реализует протокол Stream средствами RabbitMQ     Когда vmWare закончит свой плагин для потоков    Тогда я активирую новый протокол     И быстро воткну его в приложение    И так как у меня есть продуктивный кластер RabbitMQ    И мне нужно будет просто поменять канал для отдельных бизнес сценариевСценарий: Завтра придут 1С-ники со своим ActiveMQ из Шины для 1С    Когда мне нужно быстро включить очереди 1С в общий контур    И чтобы на Питоне использовать старые наработки с Kafka API    Тогда я добавляю трансформацию ActivemeMQ2Kafka    и живу по старому а события ходят уже и из 1Сetc

А чтобы вы не думали, что данный подход это нечто уникальное вот Вам еще интересная ссылка: https://github.com/fclairamb/ftpserver/pull/34 это когда нужен FTP сервер, а хочется S3.


Ну и в качестве финального момента обратите внимание: есть и риски данного подхода: но они я думаю Вам и так понятны.


  • Придется оркестрировать такой комплект сервисов и вручную это почти невозможно. Придется использовать DevOps штуки типа k8s, OpenShift, etc но если вы уже решились на интеграцию в режимах слабой связаности приложений в режиме онлайн, у вас что-то на эту тему уже скорее всего есть.
  • Трансформаторы между протоколами приходится дорабатывать ничего готового открытого и PRODUCTION-READY на данный момент найти почти невозможно.

Финальное примечение для любителей писать ТЗ по ГОСТу


так как Хабр читают любители цифровой трансформации (чтобы кто не понимал под этим словом) советую в техническое задание добавлять не упоминание конкретных реализации серверов, а что-то примерно следующее:


комплект программ для интеграции должен реализовывать коммуникацию конечных приложений по открытым протоколам HTTP, AMQP 0.9, AMQP 1.0, Apache Kafka не ниже версии 23, MQTT, WebSockets, <ЛюбойДругойХотьSOAPХотяЭтоЖуть> с возможность преобразования между протоколами дополнительными средствами администрирования

Надеюсь моя публикация после долгого перерыва Вам будет полезна в ваших интеграционных проектах. Предполагаю что будет вопрос про 1С и тут у меня совет только один. Используйте Google по ключевым словам 1С+RabbitMQ или 1С+Kafka или 1С+OpenFaas и RabbitMQ и Kafka "в 1С" давно и непринужденно используются. Потому что 1С это не только язык, но и несколько сообществ где уже давно сделаны все возможные адаптеры и платные и бесплатные. Собственно как и в Java/C#/Python/C++/Rust/etc.


Данная статья написана с применением расширения https://shd101wyy.github.io/markdown-preview-enhanced для Visual Studio Code за что автору летят дополнительные лучи добра.


Ну и в качестве финального момента хотел бы заметить, что выбор Cunfluent Inc в качестве платформы разработки Kafka-Connect экосистемы JDK выглядит все таки странно. Не удивлюсь если их конкуренты сделают такое же, но на GoLang, NodeJS (что-нибудь типа Kafka-Beats-Hub)



Красивые картинки в формате GraphViz я делаю при помощи хитрого проекта Docker2GraphViz помогает поддерживать актуальный контур и техническую документацию в формате Markdown


set CURPATH=%~dp0set DOCKER_DIR=%CURPATH%\dockersdocker run --rm -it --name dcv -v %DOCKER_DIR%\:/input pmsipilot/docker-compose-viz render -m image --force --output-file=infra-topology.png infra.ymldocker run --rm -it --name dcv -v %CURPATH%\:/input pmsipilot/docker-compose-viz render -m image --force --output-file=apps-topology.png applications.ymlcopy /b/v/y dockers\infra-topology.png content\assets\infra-topology.pngcopy /b/v/y apps-topology.png content\assets\apps-topology.png
Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru